
(airline-demo-testability-3) Delete unused solids

Summary: These solids are no longer used in the airline-demo

Test Plan: Buildkite

Reviewers: max, natekupp

Reviewed By: max

Differential Revision: https://dagster.phacility.com/D364
schrockn committed Jun 11, 2019
1 parent 50a5153 commit 5b0777206145518f03c17f60010e3d94875e661e
@@ -2,15 +2,11 @@

import os
import re
import zipfile

from io import BytesIO

from pyspark.sql import DataFrame
from sqlalchemy import text

from dagster import (
    Bytes,
    composite_solid,
    Dict,
    ExpectationResult,
@@ -179,50 +175,10 @@ def ingest_csv_file_handle_to_spark(context, csv_file_handle):
    )


@solid(
    name='unzip_file',
    inputs=[
        InputDefinition('archive_file', Bytes, description='The archive to unzip'),
        InputDefinition('archive_member', String, description='The archive member to extract.'),
    ],
    description='Extracts an archive member from a zip archive.',
    outputs=[OutputDefinition(Bytes, description='The unzipped archive member.')],
)
def unzip_file(_context, archive_file, archive_member):

    with zipfile.ZipFile(archive_file) as zip_ref:
        return BytesIO(zip_ref.open(archive_member).read())


def rename_spark_dataframe_columns(data_frame, fn):
    return data_frame.toDF(*[fn(c) for c in data_frame.columns])


@solid(
    name='ingest_csv_to_spark',
    inputs=[InputDefinition('input_csv_file', Bytes)],
    outputs=[OutputDefinition(SparkDataFrameType)],
)
def ingest_csv_to_spark(context, input_csv_file):
    tf = context.resources.tempfile.tempfile()
    with open(tf.name, 'wb') as fd:
        fd.write(input_csv_file.read())

    data_frame = (
        context.resources.spark.read.format('csv')
        .options(
            header='true',
            # inferSchema='true',
        )
        .load(tf.name)
    )

    # parquet compat
    return rename_spark_dataframe_columns(
        data_frame, lambda x: re.sub(PARQUET_SPECIAL_CHARACTERS, '', x)
    )


def do_prefix_column_names(df, prefix):
    check.inst_param(df, 'df', DataFrame)
    check.str_param(prefix, 'prefix')
@@ -6,23 +6,14 @@
We lever pytest marks to isolate subsets of tests with different requirements. E.g., to run only
those tests that don't require Spark, `pytest -m "not spark"`.
"""
import os

import pyspark
import pytest

from dagster import (
    DependencyDefinition,
    execute_solid,
    lambda_solid,
    ModeDefinition,
    PipelineDefinition,
    RunConfig,
)
from dagster import ModeDefinition

from dagster.core.storage.temp_file_manager import tempfile_resource

from dagster_examples.airline_demo.solids import sql_solid, ingest_csv_to_spark, unzip_file
from dagster_examples.airline_demo.solids import sql_solid
from dagster_examples.airline_demo.resources import spark_session_local
from dagster_aws.s3.resources import s3_resource
from dagster_aws.s3.system_storage import s3_plus_default_storage_defs
@@ -57,60 +48,6 @@ def test_sql_solid():
    # TODO: test execution?


def test_unzip_file_tempfile():
    @lambda_solid
    def nonce():
        return None

    with open(os.path.join(os.path.dirname(__file__), 'data/test.zip'), 'rb') as fd:
        archive_file = fd.read()

    result = execute_solid(
        PipelineDefinition(
            solids=[nonce, unzip_file],
            dependencies={
                'unzip_file': {
                    'archive_file': DependencyDefinition('nonce'),
                    'archive_member': DependencyDefinition('nonce'),
                }
            },
            mode_definitions=[tempfile_mode],
        ),
        'unzip_file',
        inputs={'archive_file': archive_file, 'archive_member': 'test/test_file'},
        environment_dict={},
        run_config=RunConfig(mode='tempfile'),
    )
    assert result.success
    assert result.result_value().read() == b'test\n'


@pytest.mark.spark
def test_ingest_csv_to_spark():
    @lambda_solid
    def nonce():
        return None

    with open(os.path.join(os.path.dirname(__file__), 'data/test.csv'), 'rb') as fd:
        input_csv_file = fd.read()
    result = execute_solid(
        PipelineDefinition(
            [nonce, ingest_csv_to_spark],
            dependencies={'ingest_csv_to_spark': {'input_csv_file': DependencyDefinition('nonce')}},
            mode_definitions=[spark_mode],
        ),
        'ingest_csv_to_spark',
        inputs={'input_csv_file': input_csv_file},
        environment_dict={},
        run_config=RunConfig(mode='spark'),
    )
    assert result.success
    assert isinstance(result.result_value(), pyspark.sql.dataframe.DataFrame)
    # We can't make this assertion because the tempfile is cleaned up after the solid executes --
    # a fuller test would have another solid make this assertion
    # assert result.result_value().head()[0] == '1'


@pytest.mark.postgres
@pytest.mark.skip
@pytest.mark.spark
@@ -12,55 +12,55 @@
from pyspark.sql import SparkSession, Row

from dagster import (
    DependencyDefinition,
    InputDefinition,
    LocalFileHandle,
    OutputDefinition,
    PipelineDefinition,
    RunConfig,
    execute_pipeline,
    execute_solid,
    file_relative_path,
    lambda_solid,
    pipeline,
    solid,
    InputDefinition,
    file_relative_path,
)

from dagster.core.storage.intermediate_store import FileSystemIntermediateStore

from dagster_aws.s3.intermediate_store import S3IntermediateStore

from dagster_examples.airline_demo.solids import ingest_csv_to_spark
from dagster_examples.airline_demo.solids import ingest_csv_file_handle_to_spark
from dagster_examples.airline_demo.types import SparkDataFrameType

from .test_solids import spark_mode


def test_spark_data_frame_serialization_file_system():
    with open(os.path.join(os.path.dirname(__file__), 'data/test.csv'), 'rb') as fd:
        input_csv_file = fd.read()

def test_spark_data_frame_serialization_file_system_file_handle():
    @lambda_solid
    def nonce():
        return input_csv_file
        return LocalFileHandle(file_relative_path(__file__, 'data/test.csv'))

    pipeline_def = PipelineDefinition(
        [nonce, ingest_csv_to_spark],
        dependencies={'ingest_csv_to_spark': {'input_csv_file': DependencyDefinition('nonce')}},
        mode_definitions=[spark_mode],
    )
    @pipeline(mode_definitions=[spark_mode])
    def spark_df_test_pipeline(_):
        # pylint: disable=no-value-for-parameter
        ingest_csv_file_handle_to_spark(nonce())

    run_id = str(uuid.uuid4())

    intermediate_store = FileSystemIntermediateStore(run_id=run_id)

    result = execute_pipeline(
        pipeline_def,
        spark_df_test_pipeline,
        run_config=RunConfig(run_id=run_id, mode='spark'),
        environment_dict={'storage': {'filesystem': {}}},
    )

    assert result.success
    result_dir = os.path.join(
        intermediate_store.root, 'intermediates', 'ingest_csv_to_spark.compute', 'result'
        intermediate_store.root,
        'intermediates',
        'ingest_csv_file_handle_to_spark.compute',
        'result',
    )

    assert '_SUCCESS' in os.listdir(result_dir)
@@ -72,26 +72,23 @@ def nonce():
    assert df.head()[0] == '1'


def test_spark_data_frame_serialization_s3():
    with open(os.path.join(os.path.dirname(__file__), 'data/test.csv'), 'rb') as fd:
        input_csv_file = fd.read()

def test_spark_data_frame_serialization_s3_file_handle():
    @solid
    def nonce(context):
        with open(os.path.join(os.path.dirname(__file__), 'data/test.csv'), 'rb') as fd:
            return context.file_manager.write_data(fd.read())

    @lambda_solid
    def nonce():
        return input_csv_file

    pipeline_def = PipelineDefinition(
        [nonce, ingest_csv_to_spark],
        dependencies={'ingest_csv_to_spark': {'input_csv_file': DependencyDefinition('nonce')}},
        mode_definitions=[spark_mode],
    )
    @pipeline(mode_definitions=[spark_mode])
    def spark_df_test_pipeline(_):
        # pylint: disable=no-value-for-parameter
        ingest_csv_file_handle_to_spark(nonce())

    run_id = str(uuid.uuid4())

    intermediate_store = S3IntermediateStore(s3_bucket='dagster-airflow-scratch', run_id=run_id)

    result = execute_pipeline(
        pipeline_def,
        spark_df_test_pipeline,
        environment_dict={'storage': {'s3': {'config': {'s3_bucket': 'dagster-airflow-scratch'}}}},
        run_config=RunConfig(run_id=run_id, mode='spark'),
    )
@@ -102,7 +99,7 @@ def nonce():
[
    intermediate_store.root.strip('/'),
    'intermediates',
    'ingest_csv_to_spark.compute',
    'ingest_csv_file_handle_to_spark.compute',
    'result',
    '_SUCCESS',
]
@@ -1,6 +1,4 @@
from dagster import (
    check,
    solid,
    Bool,
    Bytes,
    Dict,
@@ -10,7 +8,9 @@
    Path,
    Result,
    String,
    check,
    input_schema,
    solid,
)

from .configs import put_object_configs
@@ -48,22 +48,6 @@ def __init__(self):
        )


@solid(
    description='Downloads an object from S3.',
    inputs=[InputDefinition('bucket_data', S3BucketData)],
    outputs=[OutputDefinition(Bytes, description='The contents of the downloaded object.')],
    required_resources={'s3'},
)
def download_from_s3_to_bytes(context, bucket_data):
    '''Download an object from S3 as an in-memory bytes object.
    Returns:
        str:
            The path to the downloaded object.
    '''
    return context.resources.s3.download_from_s3_to_bytes(bucket_data['bucket'], bucket_data['key'])


@solid(
    name='download_from_s3_to_file',
    config_field=Field(
@@ -10,7 +10,7 @@

from dagster import execute_solid, PipelineDefinition, ModeDefinition
from dagster_aws.s3.resources import s3_resource
from dagster_aws.s3.solids import download_from_s3_to_bytes, download_from_s3_to_file
from dagster_aws.s3.solids import download_from_s3_to_file


@pytest.mark.nettest
@@ -37,28 +37,3 @@ def test_download_from_s3_to_file():
    )
    assert result.success
    assert result.result_value() == os.path.join(tmp_directory, 'test_file')


@pytest.mark.nettest
def test_download_from_s3_to_bytes():
    result = execute_solid(
        PipelineDefinition(
            [download_from_s3_to_bytes],
            mode_definitions=[ModeDefinition(resources={'s3': s3_resource})],
        ),
        'download_from_s3_to_bytes',
        environment_dict={
            'solids': {
                'download_from_s3_to_bytes': {
                    'inputs': {
                        'bucket_data': {
                            'bucket': 'dagster-airline-demo-source-data',
                            'key': 'test/test_file',
                        }
                    }
                }
            }
        },
    )
    assert result.success
    assert result.result_value().read() == b'test\n'
