
Add event-pipeline-demo to CircleCI (#1103)

This PR adds the event-pipeline-demo to CircleCI.

It turns out sbt assembly doesn't work when building inside Docker because of this issue: https://github.com/milessabin/shapeless/wiki/Shapeless-with-SBT-Assembly-inside-Docker. So this PR also builds the shapeless library as described there and includes the resulting binary jar in scala_modules/lib/ - note that we'll have to repeat this process if the version of shapeless we depend on ever changes.
natekupp committed Apr 3, 2019
1 parent e4b148d commit 9e8bf3a7b25b11e68ec94113e2c715be301749e1
@@ -86,6 +86,15 @@ workflows:
context: coveralls
- airline-demo-py27:
context: coveralls
- event-pipeline-demo-py37:
context: coveralls
- event-pipeline-demo-py36:
context: coveralls
- event-pipeline-demo-py35:
context: coveralls
- event-pipeline-demo-py27:
context: coveralls

- coverage:
requires:
- dagster-py37
@@ -124,6 +133,10 @@ workflows:
- airline-demo-py36
- airline-demo-py35
- airline-demo-py27
- event-pipeline-demo-py37
- event-pipeline-demo-py36
- event-pipeline-demo-py35
- event-pipeline-demo-py27
context: coveralls
nightly:
triggers:
@@ -898,6 +911,72 @@ jobs:
paths:
- .coverage*

event-pipeline-demo-py36: &event-pipeline-demo-template
docker:
- image: dagster/cci-test:3.6
environment:
TOXENV: py36
working_directory: ~/repo
steps:
- checkout
- restore_cache:
key: sbt-cache
- run:
name: Create working directory for build
command: |
mkdir -p /tmp/dagster/events/
- run:
name: Build Scala jar
command: |
pushd scala_modules
/opt/sbt/bin/sbt events/assembly
cp ./events/target/scala-2.11/events-assembly-0.1.0-SNAPSHOT.jar /tmp/dagster/events/
popd
- save_cache:
key: sbt-cache
paths:
- "~/.ivy2/cache"
- "~/.sbt"
- "~/.m2"
- run:
name: Install Dependencies
command: |
sudo pip install tox
- run:
name: Run Dagster Tests
command: |
pushd python_modules/event-pipeline-demo
tox -e $TOXENV
popd
- run:
command: |
mv python_modules/event-pipeline-demo/.coverage python_modules/.coverage.event-pipeline-demo.${CIRCLE_BUILD_NUM}
- persist_to_workspace:
root: python_modules/
paths:
- .coverage*

event-pipeline-demo-py37:
<<: *event-pipeline-demo-template
docker:
- image: dagster/cci-test:3.7.2
environment:
TOXENV: py37

event-pipeline-demo-py35:
<<: *event-pipeline-demo-template
docker:
- image: dagster/cci-test:3.5
environment:
TOXENV: py35

event-pipeline-demo-py27:
<<: *event-pipeline-demo-template
docker:
- image: dagster/cci-test:2.7
environment:
TOXENV: py27

coverage:
docker:
- image: circleci/python:3.6.6
@@ -92,7 +92,7 @@ def define_snowflake_config():
validate_default_parameters = Field(
Bool,
description='''False by default. Raise an exception if either one of specified database,
schema or warehouse doesnt exists if True.''',
schema or warehouse doesn't exists if True.''',
is_optional=True,
)

@@ -27,6 +27,6 @@
'Operating System :: OS Independent',
],
packages=find_packages(exclude=['test']),
install_requires=['dagster', 'snowflake-connector-python==1.7.*'],
install_requires=['dagster', 'dagster_pandas', 'snowflake-connector-python==1.7.*'],
zip_safe=False,
)
@@ -59,10 +59,10 @@ def _spark_transform_fn(context, _):
)
]

check.not_none_param(
spark_home,
'No spark home set. You must either provide spark_home or set $SPARK_HOME in your '
'environment',
check.invariant(
spark_home is not None,
'No spark home set. You must either pass spark_home in config or set $SPARK_HOME '
'in your environment (got None).',
)

deploy_mode = ['--deploy-mode', '{}'.format(deploy_mode)] if deploy_mode else []
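
For context on this change: check.not_none_param validated a single value, while check.invariant asserts an arbitrary condition with a message, which fits better here since spark_home may come from config or from $SPARK_HOME. A minimal standalone sketch of the behavior being enforced, using a plain exception and hypothetical names (build_spark_submit_args), not the dagster-framework/spark module's actual code:

# Hypothetical sketch: resolve spark_home from config or the environment, fail
# loudly if neither is set, and fold optional flags into the spark-submit
# invocation the same way the hunk above does for --deploy-mode.
import os


def build_spark_submit_args(spark_home, main_class, application_jar, deploy_mode=None):
    spark_home = spark_home or os.environ.get('SPARK_HOME')
    if spark_home is None:
        raise ValueError(
            'No spark home set. You must either pass spark_home in config or set '
            '$SPARK_HOME in your environment (got None).'
        )

    deploy_mode_args = ['--deploy-mode', '{}'.format(deploy_mode)] if deploy_mode else []

    return (
        [os.path.join(spark_home, 'bin', 'spark-submit'), '--class', main_class]
        + deploy_mode_args
        + [application_jar]
    )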
@@ -1,8 +1,10 @@

#Introduction
# Event pipeline demo
This is intended to be a fully working example Spark pipeline, with the ability to kick off a Spark
job locally as well as on a remote Spark cluster.

Example events data here: https://s3.us-east-2.amazonaws.com/elementl-public/example-json.gz

## Requirements

You must have Spark installed, and you must set $SPARK_HOME.
@@ -0,0 +1,2 @@
pyspark==2.4.0
mock==2.0.0
@@ -9,7 +9,7 @@ solids:
bucket: "elementl-public"
key: "example-json.gz"
skip_if_present: true
target_folder: "/tmp/dagster/events"
target_folder: "/tmp/dagster/events/data"
snowflake_load:
config:
account: "<< SET ME >>"
@@ -20,11 +20,11 @@ solids:
schema: "TESTSCHEMA"
event_ingest:
config:
spark_outputs: ["/tmp/dagster/events"]
application_jar: "events-assembly-0.1.0-SNAPSHOT.jar"
spark_outputs: ["/tmp/dagster/events/data"]
application_jar: "/tmp/dagster/events/events-assembly-0.1.0-SNAPSHOT.jar"
deploy_mode: "client"
main_class: "io.dagster.events.EventPipeline"
application_arguments: "--local-path /tmp/dagster/events --date 2019-01-01"
application_arguments: "--local-path /tmp/dagster/events/data --date 2019-01-01"
master_url: "local[*]"
spark_conf:
spark:
@@ -45,7 +45,10 @@ def define_event_ingest_pipeline():

# TODO: express dependency of this solid on event_ingest
snowflake_load = SnowflakeLoadSolidDefinition(
'snowflake_load', src='file:///tmp/output/local/output/2019/01/01/*.parquet', table='events'
'snowflake_load',
# TODO: need to pull this out to a config
src='file:///tmp/dagster/events/data/output/2019/01/01/*.parquet',
table='events',
)

return PipelineDefinition(
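
A usage sketch for the pipeline defined here, mirroring the new test added later in this commit; it assumes the default.yml environment file from this PR and is run from inside the test directory:

# Run the event ingest pipeline against the checked-in environment file.
from dagster import execute_pipeline
from dagster.utils import load_yaml_from_globs, script_relative_path

from event_pipeline_demo.pipelines import define_event_ingest_pipeline

config = load_yaml_from_globs(script_relative_path('../environments/default.yml'))
result = execute_pipeline(define_event_ingest_pipeline(), config)
assert result.success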
@@ -1,25 +1,70 @@
import os
import subprocess

# another py2/3 difference
try:
import unittest.mock as mock
except ImportError:
import mock

import pytest

# from dagster import execute_pipeline
from dagster import execute_pipeline

# from dagster.utils import load_yaml_from_globs, script_relative_path
from dagster.utils import load_yaml_from_globs, script_relative_path

# from event_pipeline_demo.pipelines import define_event_ingest_pipeline
from event_pipeline_demo.pipelines import define_event_ingest_pipeline

spark = pytest.mark.spark
'''Tests that require Spark.'''

skip = pytest.mark.skip


@skip
# To support this test, we need to do the following:
# 1. Have CircleCI publish Scala/Spark jars when that code changes
# 2. Ensure we have Spark available to CircleCI
# 3. Include example / test data in this repository
@spark
def test_event_pipeline():
# config = load_yaml_from_globs(script_relative_path('../environments/default.yml'))
# result_pipeline = execute_pipeline(define_event_ingest_pipeline(), config)
# assert result_pipeline.success
# To support this test, we need to do the following:
# 1. Have CircleCI publish Scala/Spark jars when that code changes
# 2. Ensure we have Spark available to CircleCI
# 3. Include example / test data in this repository
raise NotImplementedError()
@mock.patch('snowflake.connector.connect')
def test_event_pipeline(snowflake_connect):
spark_home_set = True

if os.getenv('SPARK_HOME') is None:
spark_home_set = False

try:
if not spark_home_set:
try:
pyspark_show = subprocess.check_output(['pip', 'show', 'pyspark'])
except subprocess.CalledProcessError:
pass
else:
os.environ['SPARK_HOME'] = os.path.join(
list(
filter(lambda x: 'Location' in x, pyspark_show.decode('utf-8').split('\n'))
)[0].split(' ')[1],
'pyspark',
)

config = load_yaml_from_globs(script_relative_path('../environments/default.yml'))
result_pipeline = execute_pipeline(define_event_ingest_pipeline(), config)
assert result_pipeline.success

# We're not testing Snowflake loads here, so at least test that we called the connect
# appropriately
snowflake_connect.assert_called_once_with(
user='<< SET ME >>',
password='<< SET ME >>',
account='<< SET ME >>',
database='TESTDB',
schema='TESTSCHEMA',
warehouse='TINY_WAREHOUSE',
)

finally:
if not spark_home_set:
try:
del os.environ['SPARK_HOME']
except KeyError:
pass
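
The SPARK_HOME discovery above parses `pip show pyspark` output inline; the same idea factored into a small helper, as a sketch only (infer_spark_home is a hypothetical name, not something in the repo):

# Hypothetical helper equivalent to the inline SPARK_HOME discovery above:
# ask pip where pyspark is installed and point SPARK_HOME at its package dir.
import os
import subprocess


def infer_spark_home():
    try:
        pyspark_show = subprocess.check_output(['pip', 'show', 'pyspark'])
    except subprocess.CalledProcessError:
        return None  # pyspark is not pip-installed; leave SPARK_HOME alone
    for line in pyspark_show.decode('utf-8').split('\n'):
        if line.startswith('Location:'):
            # e.g. 'Location: /usr/local/lib/python3.6/site-packages'
            return os.path.join(line.split(' ', 1)[1].strip(), 'pyspark')
    return None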
@@ -0,0 +1,7 @@
from event_pipeline_demo.version import __version__


def test_version():
assert isinstance(__version__, str)
assert len(__version__.split('.')) == 3
assert all((isinstance(int(num), int) for num in __version__.split('.')))
@@ -0,0 +1,19 @@
[tox]
envlist = py37,py36,py35,py27

[testenv]
passenv = CIRCLECI CIRCLE_* CI_PULL_REQUEST COVERALLS_REPO_TOKEN TOXENV AWS_SECRET_ACCESS_KEY AWS_ACCESS_KEY_ID
deps =
-e ../dagster/
-r ../dagster/dev-requirements.txt
-e ../dagster-framework/aws
-e ../dagster-framework/snowflake
-e ../dagster-framework/spark
-e ../dagster-pandas/
-r ./dev-requirements.txt
commands =
coverage erase
pytest -vv --junitxml=test_results.xml --cov=event_pipeline_demo --cov-append --cov-report=
coverage report --omit='.tox/*,**/test_*.py' --skip-covered
coverage html --omit='.tox/*,**/test_*.py'
coverage xml --omit='.tox/*,**/test_*.py'
@@ -17,3 +17,4 @@ project/plugins/project/
.scala_dependencies
.worksheet

!lib
@@ -4,6 +4,7 @@ This project is our collection of Scala modules.
## Requirements
* IntelliJ IDEA
* IntelliJ Scalafmt Plugin
* Scala 2.11
* Spark 2.4.0+

Spark is expected to be a provided dependency, so you should have a working Spark install somewhere, and `$SPARK_HOME` should be set in your environment.
@@ -15,4 +16,9 @@ Some editor config to put in place: [Case Class Definition Style](https://stacko
We follow the Twitter [Effective Scala](http://twitter.github.io/effectivescala/) style guide.

Saving this here for future reference: [Spark + S3](https://medium.com/@subhojit20_27731/apache-spark-and-amazon-s3-gotchas-and-best-practices-a767242f3d98)


### Installing scala and sbt on Mac OS X
Use homebrew:

brew install scala@2.11
brew install sbt
@@ -19,7 +19,7 @@ lazy val awsVersion = "1.11.525"
lazy val events = project
.settings(
name := "events",
assemblyMergeStrategy in assembly := {
assembly / assemblyMergeStrategy := {
case x if x.endsWith("io.netty.versions.properties") => MergeStrategy.first
case x =>
val oldStrategy = (assemblyMergeStrategy in assembly).value
@@ -33,9 +33,14 @@ lazy val events = project
// version needed by Spark.
//
// These rules apply at jar assembly time, so code can still import com.amazonaws... as before.
assemblyShadeRules in assembly := Seq(
assembly / assemblyShadeRules := Seq(
ShadeRule.rename("com.amazonaws.**" -> "shaded.@0").inAll
),
// See: https://github.com/milessabin/shapeless/wiki/Shapeless-with-SBT-Assembly-inside-Docker
assembly / assemblyExcludedJars := {
val cp = (assembly / fullClasspath).value
cp filter {_.data.getName == "shapeless_2.11-2.3.3.jar"}
},
resolvers += Resolver.sonatypeRepo("releases"),
libraryDependencies ++= Seq(
scalaTest % Test,
@@ -46,5 +51,7 @@ lazy val events = project
"io.circe" %% "circe-parser" % "0.11.1",
"io.circe" %% "circe-generic" % "0.11.1",
"io.circe" %% "circe-generic-extras" % "0.11.1"
)
),
scalacOptions ++= Seq("-Xmax-classfile-name", "240"),
Compile / scalacOptions ++= Seq("-Xmax-classfile-name", "240")
)
Binary file not shown.
@@ -1,3 +1,4 @@
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.9")
addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "1.0.0")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.0.0-RC5")
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.9")
addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "1.0.0")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.0.0-RC5")
addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.9.2")

This file was deleted.
