Refactor airline demo to use S3DownloadManager (#1030)
schrockn committed Mar 25, 2019
1 parent 3bc08ec commit 77e375f
Showing 6 changed files with 110 additions and 91 deletions.
3 changes: 3 additions & 0 deletions python_modules/airline-demo/airline_demo/pipelines.py
@@ -14,6 +14,7 @@
spark_session_local,
tempfile_resource,
unsigned_s3_session,
s3_download_manager,
)
from .solids import (
average_sfo_outbound_avg_delays_by_destination,
@@ -46,6 +47,7 @@
's3': unsigned_s3_session,
'db_info': redshift_db_info_resource,
'tempfile': tempfile_resource,
'download_manager': s3_download_manager,
},
)

@@ -57,6 +59,7 @@
's3': unsigned_s3_session,
'db_info': postgres_db_info_resource,
'tempfile': tempfile_resource,
'download_manager': s3_download_manager,
},
)

65 changes: 64 additions & 1 deletion python_modules/airline-demo/airline_demo/resources.py
@@ -5,7 +5,8 @@

from pyspark.sql import SparkSession

from dagster import Field, resource
from dagster import Field, resource, check, Dict, String, Path, Bool
from dagster.utils import safe_isfile, mkdir_p

from .types import DbInfo, PostgresConfigData, RedshiftConfigData
from .utils import (
@@ -14,9 +15,71 @@
create_redshift_db_url,
create_redshift_engine,
create_s3_session,
S3Logger,
)


class S3DownloadManager:
def __init__(self, bucket, key, target_folder, skip_if_present):
self.bucket = check.str_param(bucket, 'bucket')
self.key = check.str_param(key, 'key')
self.target_folder = check.str_param(target_folder, 'target_folder')
self.skip_if_present = check.bool_param(skip_if_present, 'skip_if_present')

def download_file(self, context, target_file):
check.str_param(target_file, 'target_file')

target_path = os.path.join(self.target_folder, target_file)

if self.skip_if_present and safe_isfile(target_path):
context.log.info(
'Skipping download, file already present at {target_path}'.format(
target_path=target_path
)
)
else:
full_key = self.key + '/' + target_file
if os.path.dirname(target_path):
mkdir_p(os.path.dirname(target_path))

context.log.info(
'Starting download of {bucket}/{key} to {target_path}'.format(
bucket=self.bucket, key=full_key, target_path=target_path
)
)

headers = context.resources.s3.head_object(Bucket=self.bucket, Key=full_key)
logger = S3Logger(
context.log.debug, self.bucket, full_key, target_path, int(headers['ContentLength'])
)
context.resources.s3.download_file(
Bucket=self.bucket, Key=full_key, Filename=target_path, Callback=logger
)

return target_path


@resource(
config_field=Field(
Dict(
{
'bucket': Field(String),
'key': Field(String),
'target_folder': Field(Path),
'skip_if_present': Field(Bool),
}
)
)
)
def s3_download_manager(init_context):
return S3DownloadManager(
bucket=init_context.resource_config['bucket'],
key=init_context.resource_config['key'],
target_folder=init_context.resource_config['target_folder'],
skip_if_present=init_context.resource_config['skip_if_present'],
)


@resource
def spark_session_local(_init_context):
# Need two versions of this, one for test/local and one with a
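
For reference, a minimal sketch (not part of this commit) of how a solid consumes the new download_manager resource; it mirrors the slimmed-down download_from_s3 solid later in this diff. The solid name fetch_one_file is hypothetical, and bucket, key, and target_folder now live in the resource config rather than in each solid's config.

from dagster import Dict, Field, Path, solid


# Illustrative sketch: fetch_one_file is a hypothetical solid, not part of the commit.
@solid(config_field=Field(Dict(fields={'target_file': Field(Path)})))
def fetch_one_file(context):
    # The manager owns bucket/key/target_folder; the solid only names the file
    # to fetch relative to them.
    return context.resources.download_manager.download_file(
        context, context.solid_config['target_file']
    )
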
56 changes: 6 additions & 50 deletions python_modules/airline-demo/airline_demo/solids.py
@@ -137,24 +137,9 @@ def transform_fn(context, _inputs):
config_field=Field(
Dict(
fields={
# Probably want to make the region configuable too
'bucket': Field(String, description='The S3 bucket in which to look for the key.'),
'key': Field(String, description='The key to download.'),
'skip_if_present': Field(
Bool,
description=(
'If True, and a file already exists at the path described by the '
'target_path config value, if present, or the key, then the solid '
'will no-op.'
),
default_value=False,
is_optional=True,
),
'target_path': Field(
Path,
description=('If present, specifies the path at which to download the object.'),
is_optional=True,
),
'target_file': Field(
Path, description=('Specifies the path at which to download the object.')
)
}
)
),
@@ -171,38 +156,9 @@ def download_from_s3(context):
str:
The path to the downloaded object.
'''
file_ = context.solid_config
bucket = file_['bucket']
key = file_['key']
target_path = file_.get('target_path') or key

if target_path is None:
target_path = context.resources.tempfile.tempfile().name

if file_['skip_if_present'] and safe_isfile(target_path):
context.log.info(
'Skipping download, file already present at {target_path}'.format(
target_path=target_path
)
)
else:
if os.path.dirname(target_path):
mkdir_p(os.path.dirname(target_path))

context.log.info(
'Starting download of {bucket}/{key} to {target_path}'.format(
bucket=bucket, key=key, target_path=target_path
)
)

headers = context.resources.s3.head_object(Bucket=bucket, Key=key)
logger = S3Logger(
context.log.debug, bucket, key, target_path, int(headers['ContentLength'])
)
context.resources.s3.download_file(
Bucket=bucket, Key=key, Filename=target_path, Callback=logger
)
return target_path
target_file = context.solid_config['target_file']
# TODO: error here caused terrible, terrible error message
return context.resources.download_manager.download_file(context, target_file)


@solid(
31 changes: 23 additions & 8 deletions python_modules/airline-demo/airline_demo_tests/test_solids.py
@@ -22,7 +22,12 @@
)

from airline_demo.solids import sql_solid, download_from_s3, ingest_csv_to_spark, unzip_file
from airline_demo.resources import spark_session_local, tempfile_resource, unsigned_s3_session
from airline_demo.resources import (
spark_session_local,
tempfile_resource,
unsigned_s3_session,
s3_download_manager,
)

from .marks import nettest, postgres, redshift, skip, spark

@@ -40,7 +45,11 @@ def _s3_context():
return {
'test': PipelineContextDefinition(
context_fn=lambda info: ExecutionContext.console_logging(log_level=logging.DEBUG),
resources={'s3': unsigned_s3_session, 'tempfile': tempfile_resource},
resources={
's3': unsigned_s3_session,
'tempfile': tempfile_resource,
'download_manager': s3_download_manager,
},
)
}

@@ -80,15 +89,21 @@ def test_download_from_s3():
PipelineDefinition([download_from_s3], context_definitions=_s3_context()),
'download_from_s3',
environment_dict={
'context': {'test': {}},
'solids': {
'download_from_s3': {
'config': {
'bucket': 'dagster-airline-demo-source-data',
'key': 'test/test_file',
'context': {
'test': {
'resources': {
'download_manager': {
'config': {
'skip_if_present': False,
'bucket': 'dagster-airline-demo-source-data',
'key': 'test',
'target_folder': 'test',
}
}
}
}
},
'solids': {'download_from_s3': {'config': {'target_file': 'test_file'}}},
},
)
assert result.success
6 changes: 6 additions & 0 deletions python_modules/airline-demo/environments/local_base.yml
@@ -7,3 +7,9 @@ context:
postgres_password: test
postgres_hostname: localhost
postgres_db_name: test
download_manager:
config:
bucket: dagster-airline-demo-source-data
key: test
skip_if_present: True
target_folder: source_data
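
To make the path handling concrete, this is how the manager combines the resource config above with a solid's target_file (an illustration using values that appear in this commit, not code from it):

import os

# Illustration only; values taken from the environment configs in this commit.
bucket, key, target_folder = 'dagster-airline-demo-source-data', 'test', 'source_data'
target_file = 'sfo_q2_weather.txt'

full_key = key + '/' + target_file                      # 'test/sfo_q2_weather.txt'
target_path = os.path.join(target_folder, target_file)  # 'source_data/sfo_q2_weather.txt'
# S3DownloadManager downloads s3://{bucket}/{full_key} to target_path, and skips
# the download when skip_if_present is true and the file already exists locally.
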
40 changes: 8 additions & 32 deletions python_modules/airline-demo/environments/local_fast_ingest.yml
@@ -1,52 +1,28 @@
solids:
download_april_on_time_data:
config:
bucket: dagster-airline-demo-source-data
key: test/On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2018_4.zip
skip_if_present: true
target_path: source_data/On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2018_4.zip
target_file: On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2018_4.zip
download_may_on_time_data:
config:
bucket: dagster-airline-demo-source-data
key: test/On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2018_5.zip
skip_if_present: true
target_path: source_data/On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2018_5.zip
target_file: On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2018_5.zip
download_june_on_time_data:
config:
bucket: dagster-airline-demo-source-data
key: test/On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2018_6.zip
skip_if_present: true
target_path: source_data/On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2018_6.zip
target_file: On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2018_6.zip
download_master_cord_data:
config:
bucket: dagster-airline-demo-source-data
key: test/954834304_T_MASTER_CORD.zip
skip_if_present: true
target_path: source_data/954834304_T_MASTER_CORD.zip
target_file: 954834304_T_MASTER_CORD.zip
download_q2_coupon_data:
config:
bucket: dagster-airline-demo-source-data
key: test/Origin_and_Destination_Survey_DB1BCoupon_2018_2.zip
skip_if_present: true
target_path: source_data/Origin_and_Destination_Survey_DB1BCoupon_2018_2.zip
target_file: Origin_and_Destination_Survey_DB1BCoupon_2018_2.zip
download_q2_market_data:
config:
bucket: dagster-airline-demo-source-data
key: test/Origin_and_Destination_Survey_DB1BMarket_2018_2.zip
skip_if_present: true
target_path: source_data/Origin_and_Destination_Survey_DB1BMarket_2018_2.zip
target_file: Origin_and_Destination_Survey_DB1BMarket_2018_2.zip
download_q2_ticket_data:
config:
bucket: dagster-airline-demo-source-data
key: test/Origin_and_Destination_Survey_DB1BTicket_2018_2.zip
skip_if_present: true
target_path: source_data/Origin_and_Destination_Survey_DB1BTicket_2018_2.zip
target_file: Origin_and_Destination_Survey_DB1BTicket_2018_2.zip
download_q2_sfo_weather:
config:
bucket: dagster-airline-demo-source-data
key: test/sfo_q2_weather.txt
skip_if_present: true
target_path: source_data/sfo_q2_weather.txt
target_file: sfo_q2_weather.txt
unzip_april_on_time_data:
config:
destination_dir: source_data
