Skip to content

Commit

Permalink
Refactor write_file to use singer's Transformer()
Browse files: browse the repository at this point in the history
  • Loading branch information
CoopTang committed Feb 11, 2021
1 parent 4d437bd commit a59fdfd
Showing 1 changed file with 24 additions and 22 deletions.
46 changes: 24 additions & 22 deletions tap_spreadsheets_anywhere/file_utils.py
Original file line number | Diff line number | Diff line change
@@ -1,10 +1,10 @@
import re
import pytz
from datetime import datetime, timezone

import dateutil
import requests
import singer
from singer.transform import Transformer
import boto3
from google.cloud import storage
import os, logging
Expand All @@ -22,27 +22,29 @@ def write_file(target_filename, table_spec, schema, max_records=-1):
target_uri = table_spec['path'] + '/' + target_filename
records_synced = 0
try:
iterator = tap_spreadsheets_anywhere.format_handler.get_row_iterator(table_spec, target_uri)
for row in iterator:
metadata = {
'_smart_source_bucket': table_spec['path'],
'_smart_source_file': target_filename,
# index zero, +1 for header row
'_smart_source_lineno': records_synced + 2
}

try:
to_write = [{**conversion.convert_row(row, schema), **metadata}]
singer.write_records(table_spec['name'], to_write)
except BrokenPipeError as bpe:
LOGGER.error(
f'Pipe to loader broke after {records_synced} records were written from {target_filename}: troubled '
f'line was {to_write[0]}')
raise bpe

records_synced += 1
if 0 < max_records <= records_synced:
break
with Transformer() as transformer:
iterator = tap_spreadsheets_anywhere.format_handler.get_row_iterator(table_spec, target_uri)
for row in iterator:
metadata = {
'_smart_source_bucket': table_spec['path'],
'_smart_source_file': target_filename,
# index zero, +1 for header row
'_smart_source_lineno': records_synced + 2
}

try:
record = conversion.convert_row(row, schema)
transformed_record = transformer.transform(record, schema, metadata)
singer.write_record(table_spec['name'], transformed_record)
except BrokenPipeError as bpe:
LOGGER.error(
f'Pipe to loader broke after {records_synced} records were written from {target_filename}: troubled '
f'line was {record}')
raise bpe

records_synced += 1
if 0 < max_records <= records_synced:
break

except tap_spreadsheets_anywhere.format_handler.InvalidFormatError as ife:
if table_spec.get('invalid_format_action','fail').lower() == "ignore":
Expand Down

0 comments on commit a59fdfd

Please sign in to comment.