Improve SPSS plugin (#483)
* Improved SPSS writing

* Improved SQL internals

* Improved SPSS internals

* Improved BigQuery internals

* Improved Pandas internals

* Fixed SPSS warnings

* Added SpssDialect/Parser

* Rebased parser.write on reusable `read_row_stream`

* Fixed travis

* Updated travis
roll committed Oct 13, 2020
1 parent 4194d34 commit abd0247
Showing 18 changed files with 385 additions and 366 deletions.
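The recurring edit in these diffs is the `Parser.write` signature: every parser now receives `read_row_stream`, a factory that returns a fresh row stream on each call, instead of an already-open stream, so a writer or storage backend (BigQuery below, for example) can re-create the rows whenever it needs them. A minimal, self-contained sketch of the two calling conventions — every name except the `read_row_stream` parameter is illustrative, not taken from the codebase:

def read_row_stream():
    # Factory: every call returns a brand-new iterator over the rows,
    # so a writer can restart iteration instead of consuming a one-shot stream.
    return iter([(1, "a"), (2, "b")])

def write_old(row_stream):
    # Pre-commit convention: a single, already-open stream was passed in.
    return list(row_stream)

def write_new(read_row_stream):
    # Post-commit convention: the factory itself is passed in; the writer
    # opens (and may re-open) the stream when it needs the rows.
    return list(read_row_stream())

write_old(read_row_stream())   # caller had to open the stream up front
write_new(read_row_stream)     # caller now hands over the factory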
2 changes: 1 addition & 1 deletion .travis.yml
@@ -20,7 +20,7 @@ before_install:
- cp .env.example .env
- psql -c 'CREATE DATABASE test;' -U postgres
- mysql -e 'CREATE DATABASE test CHARACTER SET utf8 COLLATE utf8_general_ci;'
- openssl aes-256-cbc -K $encrypted_4885f94bfb6d_key -iv $encrypted_4885f94bfb6d_iv -in .google.json.enc -out .google.json -d
- openssl aes-256-cbc -K $encrypted_4885f94bfb6d_key -iv $encrypted_4885f94bfb6d_iv -in .google.json.enc -out .google.json -d || echo 'Encrypted files have been removed for security reasons.'

install:
# TODO: remove after
4 changes: 2 additions & 2 deletions frictionless/parser.py
@@ -136,11 +136,11 @@ def read_data_stream_handle_errors(self, data_stream):

# Write

def write(self, row_stream):
def write(self, read_row_stream):
"""Write row stream into the resource
Parameters:
gen<Row[]>: row stream
read_row_stream (gen<Row[]>): row stream factory
"""
raise NotImplementedError

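For a concrete picture of the contract documented above, here is a hypothetical minimal parser written against it. Only `Parser.write` and its `read_row_stream` parameter come from this diff; the rest mirrors the inline parser further down and is an assumption:

from frictionless.parser import Parser

class ListParser(Parser):
    # Hypothetical example parser: collects rows into a plain list on write.
    def write(self, read_row_stream):
        data = []
        for row in read_row_stream():                 # call the factory to open the stream
            if row.row_number == 1:
                data.append(row.schema.field_names)   # header row from the schema
            data.append(list(row.values()))           # cell values for this row
        self.resource.data = data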
4 changes: 2 additions & 2 deletions frictionless/parsers/csv.py
@@ -49,15 +49,15 @@ def read_data_stream_infer_dialect(self):

# Write

def write(self, row_stream):
def write(self, read_row_stream):
options = {}
for name in vars(self.resource.dialect.to_python()):
value = getattr(self.resource.dialect, name, None)
if value is not None:
options[name] = value
with tempfile.NamedTemporaryFile(delete=False) as file:
writer = unicodecsv.writer(file, encoding=self.resource.encoding, **options)
for row in row_stream:
for row in read_row_stream():
schema = row.schema
if row.row_number == 1:
writer.writerow(schema.field_names)
8 changes: 4 additions & 4 deletions frictionless/parsers/excel.py
@@ -117,15 +117,15 @@ def read_data_stream_create(self):

# Write

def write(self, row_stream):
def write(self, read_row_stream):
dialect = self.resource.dialect
helpers.ensure_dir(self.resource.source)
book = openpyxl.Workbook(write_only=True)
title = dialect.sheet
if isinstance(title, int):
title = f"Sheet {dialect.sheet}"
sheet = book.create_sheet(title)
for row in row_stream:
for row in read_row_stream():
cells = []
if row.row_number == 1:
sheet.append(row.schema.field_names)
@@ -223,15 +223,15 @@ def type_value(ctype, value):

# Write

def write(self, row_stream):
def write(self, read_row_stream):
dialect = self.resource.dialect
helpers.ensure_dir(self.resource.source)
book = xlwt.Workbook()
title = dialect.sheet
if isinstance(title, int):
title = f"Sheet {dialect.sheet}"
sheet = book.add_sheet(title)
for row_index, row in enumerate(row_stream):
for row_index, row in enumerate(read_row_stream()):
if row.row_number == 1:
for field_index, name in enumerate(row.schema.field_names):
sheet.write(0, field_index, name)
4 changes: 2 additions & 2 deletions frictionless/parsers/inline.py
@@ -74,10 +74,10 @@ def read_data_stream_create(self):

# Write

def write(self, row_stream):
def write(self, read_row_stream):
dialect = self.resource.dialect
self.resource.data = []
for row in row_stream:
for row in read_row_stream():
item = row.to_dict() if dialect.keyed else list(row.values())
if not dialect.keyed and row.row_number == 1:
self.resource.data.append(row.schema.field_names)
8 changes: 4 additions & 4 deletions frictionless/parsers/json.py
@@ -53,10 +53,10 @@ def read_data_stream_create(self, dialect=None):

# Write

def write(self, row_stream):
def write(self, read_row_stream):
data = []
dialect = self.resource.dialect
for row in row_stream:
for row in read_row_stream():
cells = list(row.values())
cells, notes = row.schema.write_data(cells, native_types=self.native_types)
item = dict(zip(row.schema.field_names, cells)) if dialect.keyed else cells
@@ -103,11 +103,11 @@ def read_data_stream_create(self, dialect=None):

# Write

def write(self, row_stream):
def write(self, read_row_stream):
dialect = self.resource.dialect
with tempfile.NamedTemporaryFile(delete=False) as file:
writer = jsonlines.Writer(file)
for row in row_stream:
for row in read_row_stream():
schema = row.schema
cells = list(row.values())
cells, notes = schema.write_data(cells, native_types=self.native_types)
134 changes: 59 additions & 75 deletions frictionless/plugins/bigquery.py
@@ -147,15 +147,15 @@ def read_data_stream_create(self):

# Write

def write(self, row_stream):
def write(self, read_row_stream):
dialect = self.resource.dialect
schema = self.resource.schema
storage = BigqueryStorage(
service=self.resource.source,
project=dialect.project,
dataset=dialect.dataset,
)
resource = Resource(name=dialect.table, data=row_stream, schema=schema)
resource = Resource(name=dialect.table, data=read_row_stream, schema=schema)
storage.write_resource(resource)


@@ -227,7 +227,7 @@ def read_resource(self, name):

# Create resource
schema = self.__read_convert_schema(response["schema"])
data = partial(self.__read_data_stream, name, schema)
data = partial(self.__read_convert_data, name, schema)
resource = Resource(name=name, schema=schema, data=data)

return resource
@@ -257,28 +257,7 @@ def __read_convert_schema(self, bq_schema):

return schema

def __read_convert_type(self, bq_type):

# Mapping
mapping = {
"BOOLEAN": "boolean",
"DATE": "date",
"DATETIME": "datetime",
"INTEGER": "integer",
"FLOAT": "number",
"STRING": "string",
"TIME": "time",
}

# Return type
if bq_type in mapping:
return mapping[bq_type]

# Not supported
note = "Type %s is not supported" % type
raise exceptions.FrictionlessException(errors.StorageError(note=note))

def __read_data_stream(self, name, schema):
def __read_convert_data(self, name, schema):
bq_name = self.__write_convert_name(name)

# Get response
@@ -306,6 +285,26 @@ def __read_data_stream(self, name, schema):
yield schema.field_names
yield from data

def __read_convert_type(self, bq_type=None):

# Mapping
mapping = {
"BOOLEAN": "boolean",
"DATE": "date",
"DATETIME": "datetime",
"INTEGER": "integer",
"FLOAT": "number",
"STRING": "string",
"TIME": "time",
}

# Return type
if bq_type:
return mapping.get(bq_type, "string")

# Return mapping
return mapping

# Write

def write_resource(self, resource, *, force=False):
@@ -344,90 +343,53 @@ def write_package(self, package, *, force=False):
).execute()

# Write data
self.__write_row_stream(resource)
self.__write_convert_data(resource)

def __write_convert_name(self, name):
return _slugify_name(self.__prefix + name)

def __write_convert_schema(self, schema):
bq_schema = {"fields": []}

# Fields
bq_fields = []
for field in schema.fields:
bq_type = self.__write_convert_type(field.type)
if not bq_type:
bq_type = "STRING"
mode = "NULLABLE"
if field.required:
mode = "REQUIRED"
bq_fields.append(
bq_schema["fields"].append(
{
"name": _slugify_name(field.name),
"type": bq_type,
"mode": mode,
}
)

# Schema
bq_schema = {
"fields": bq_fields,
}

return bq_schema

def __write_convert_type(self, type):
mapping = self.__write_convert_types()

# Supported type
if type in mapping:
return mapping[type]

# Not supported
note = "Type %s is not supported" % type
raise exceptions.FrictionlessException(errors.StorageError(note=note))

def __write_convert_types(self):
return {
"any": "STRING",
"array": None,
"boolean": "BOOLEAN",
"date": "DATE",
"datetime": "DATETIME",
"duration": None,
"geojson": None,
"geopoint": None,
"integer": "INTEGER",
"number": "FLOAT",
"object": None,
"string": "STRING",
"time": "TIME",
"year": "INTEGER",
"yearmonth": None,
}

def __write_row_stream(self, resource):
mapping = self.__write_convert_types()
def __write_convert_data(self, resource):
mapping = self.__write_convert_type()

# Fallback fields
fallback_fields = []
mapping = self.__write_convert_types()
mapping = self.__write_convert_type()
for field in resource.schema.fields:
if mapping[field.type] is None:
if not mapping.get(field.type):
fallback_fields.append(field)

# Write data
buffer = []
for row in resource.read_rows():
for row in resource.read_row_stream():
for field in fallback_fields:
row[field.name], notes = field.write_cell(row[field.name])
buffer.append(row.to_list())
if len(buffer) > BUFFER_SIZE:
self.__write_row_stream_buffer(resource.name, buffer)
self.__write_convert_data_start_job(resource.name, buffer)
buffer = []
if len(buffer) > 0:
self.__write_row_stream_buffer(resource.name, buffer)
self.__write_convert_data_start_job(resource.name, buffer)

def __write_row_stream_buffer(self, name, buffer):
def __write_convert_data_start_job(self, name, buffer):
http = helpers.import_from_plugin("apiclient.http", plugin="bigquery")
bq_name = self.__write_convert_name(name)

@@ -465,14 +427,14 @@ def __write_row_stream_buffer(self, name, buffer):

# Wait the job
try:
self.__write_wait_job_is_done(response)
self.__write_convert_data_finish_job(response)
except Exception as exception:
if "not found: job" in str(exception).lower():
note = "BigQuery plugin supports only the US location of datasets"
raise exceptions.FrictionlessException(errors.StorageError(note=note))
raise

def __write_wait_job_is_done(self, response):
def __write_convert_data_finish_job(self, response):

# Get job instance
job = self.__service.jobs().get(
@@ -490,6 +452,28 @@ def __write_wait_job_is_done(self, response):
break
time.sleep(1)

def __write_convert_type(self, type=None):

# Mapping
mapping = {
"any": "STRING",
"boolean": "BOOLEAN",
"date": "DATE",
"datetime": "DATETIME",
"integer": "INTEGER",
"number": "FLOAT",
"string": "STRING",
"time": "TIME",
"year": "INTEGER",
}

# Return type
if type:
return mapping.get(type, "STRING")

# Return mapping
return mapping

# Delete

def delete_resource(self, name, *, ignore=False):
3 changes: 1 addition & 2 deletions frictionless/plugins/gsheet.py
@@ -83,7 +83,6 @@ def read_data_stream_create(self):

# Write

# NOTE: if we migrate to the native driver we can enable it
def write(self, row_stream, *, schema):
def write(self, read_row_stream):
error = errors.Error(note="Writing to Google Sheets is not supported")
raise exceptions.FrictionlessException(error)
4 changes: 2 additions & 2 deletions frictionless/plugins/html.py
@@ -141,9 +141,9 @@ def read_data_stream_create(self):

# NOTE: rebase on proper pyquery
# NOTE: take dialect into account
def write(self, row_stream):
def write(self, read_row_stream):
html = "<html><body><table>\n"
for row in row_stream:
for row in read_row_stream():
if row.row_number == 1:
html += "<tr>"
for name in row.schema.field_names:
4 changes: 2 additions & 2 deletions frictionless/plugins/ods.py
@@ -165,15 +165,15 @@ def type_value(cell):

# Write

def write(self, row_stream):
def write(self, read_row_stream):
dialect = self.resource.dialect
helpers.ensure_dir(self.resource.source)
ezodf = helpers.import_from_plugin("ezodf", plugin="ods")
book = ezodf.newdoc(doctype="ods", filename=self.resource.source)
title = f"Sheet {dialect.sheet}"
book.sheets += ezodf.Sheet(title)
sheet = book.sheets[title]
for row_index, row in enumerate(row_stream):
for row_index, row in enumerate(read_row_stream()):
if row.row_number == 1:
for field_index, name in enumerate(row.schema.field_names):
sheet[(0, field_index)].set_value(name)