Merge pull request #5 from dafreels/develop
Documentation improvements and Build files
dafreels committed Jan 24, 2019
2 parents a93af74 + a52db92 commit 0a0bbc6
Showing 16 changed files with 1,088 additions and 929 deletions.
50 changes: 50 additions & 0 deletions .travis.yml
@@ -0,0 +1,50 @@
language: scala

jobs:
  allow_failures:
    - os: windows
  include:
    - stage: Build
      os: linux
      scala:
        - 2.11.12
      script: mvn clean install
    - os: osx
      osx_image: xcode9.3
      scala:
        - 2.11.12
      script: mvn clean install
    - os: windows
      language: shell
      before_script:
        - choco install jdk8
        - choco install maven
      script: mvn clean install
    - stage: Coverage
      if: branch = develop
      os: linux
      scala:
        - 2.11.12
      script: mvn -DrepoToken=$coverallsToken clean scoverage:report install scala:doc scalastyle:check coveralls:report
    - stage: Release
      if: branch = master
      os: linux
      scala:
        - 2.11.12
      before_script:
        - mvn versions:set -DremoveSnapshot
      script: mvn -DrepoToken=$coverallsToken clean scoverage:report install scala:doc scalastyle:check coveralls:report
      before_deploy:
        - export project_version=$(mvn -q -Dexec.executable="echo" -Dexec.args='${project.version}' --non-recursive exec:exec)
      deploy:
        provider: releases
        api_key: $GITHUB_OAUTH_TOKEN
        name: ${project_version}
        skip_cleanup: true
        file_glob: true
        file:
          - common-pipeline-steps/target/common-pipeline-steps_2.11-${project_version}.jar
          - spark-pipeline-engine/target/spark-pipeline-engine_2.11-${project_version}.jar
          - streaming-pipeline-drivers/target/streaming-pipeline-drivers_2.11-${project_version}.jar
99 changes: 99 additions & 0 deletions common-pipeline-steps/readme.md
@@ -0,0 +1,99 @@
# Common Pipeline Steps
The goal of this project is to provide a set of common steps to help application developers focus on building custom
steps and *DriverSetup* implementations. Any step accepted into this project must be annotated, include a unit test,
provide API documentation for all private functions, and update this readme with additional usage information.

## Steps
Here is a list of steps provided.

### JavascriptSteps
This step object provides a way for the application developer to define steps at runtime using the Javascript language
without the need to write and compile Scala steps. It should only be used for simple step constructs such as
branching steps or basic processing. Writing Javascript that interacts with Scala can be cumbersome, so the following
objects are made available to the script (a brief sketch follows the list):

* **pipelineContext** - The current *PipelineContext* is provided to allow access to current state. This object is read only.
* **ReflectionUtils** - This utility allows the script to extract values from Scala objects including *Options*.
* **userValue** - Optional user provided value.
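
A quick sketch, using only calls that appear in the larger example later in this readme, shows how a script reaches
the current *SparkSession* through these objects (note the *get()* call needed to unwrap the Scala *Option*):

```javascript
// pipelineContext.sparkSession() returns a Scala Option, so get() unwraps the SparkSession.
var sparkSession = pipelineContext.sparkSession().get();
```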

There are two step functions provided:

#### Execute script
This step function will simply execute the script and return a *PipelineStepResponse*.

#### Execute script with value
This step function will execute the script, making the value available to the script as the variable **userValue**,
and return a *PipelineStepResponse*.
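
As a minimal sketch, a script for this function could simply build a value from **userValue** (assumed here to be a
string) and let the final statement act as the return:

```javascript
// Minimal sketch: userValue is assumed to be a string supplied by the pipeline.
// The output of the final statement becomes the step's return value automatically.
'Processed value: ' + userValue;
```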

#### Example
This example demonstrates how to read a file into a *DataFrame* using nothing but Javascript. The script assumes the
step function that takes a provided value is being used, with the file path passed in as **userValue**.

Before the script can do anything, it needs to import some of the Spark types it will use:

```javascript
var MetaData = Java.type('org.apache.spark.sql.types.Metadata');
var StructType = Java.type('org.apache.spark.sql.types.StructType');
var StructField = Java.type('org.apache.spark.sql.types.StructField');
var DataTypes = Java.type('org.apache.spark.sql.types.DataTypes');
```

Now the script can create the schema to be used for the DataFrame using the structures imported above:

```javascript
var schema = new StructType(new Array(
new StructField('id', DataTypes.LongType, true, MetaData.empty()),
new StructField('first_name', DataTypes.StringType, true, MetaData.empty()),
new StructField('last_name', DataTypes.StringType, true, MetaData.empty()),
new StructField('email', DataTypes.StringType, true, MetaData.empty()),
new StructField('gender', DataTypes.StringType, true, MetaData.empty()),
new StructField('ein', DataTypes.StringType, true, MetaData.empty()),
new StructField('postal_code', DataTypes.StringType, true, MetaData.empty())
));
```

Next the script needs to get a handle to the *SparkSession* and create a *DataFrameReader*:

```javascript
var sparkSession = pipelineContext.sparkSession().get();
var dfReader = sparkSession.read();
```

The script can then set the newly created schema on the reader, set the separator character used by the file, and
disable schema inference:

```javascript
dfReader = dfReader.schema(schema).option('sep', ',').option("inferSchema", false)
```

Finally, the script informs the reader that the file has a header, sets the format to 'csv', and calls the load
function to create the *DataFrame*:

```javascript
dfReader.option("header", true).format('csv').load(userValue);
```

Note that the *return* keyword is not used. The output of the final statement is used as the return value automatically.

Here is the full script:

```javascript
var MetaData = Java.type('org.apache.spark.sql.types.Metadata');
var StructType = Java.type('org.apache.spark.sql.types.StructType');
var StructField = Java.type('org.apache.spark.sql.types.StructField');
var DataTypes = Java.type('org.apache.spark.sql.types.DataTypes');
var schema = new StructType(new Array(
new StructField('id', DataTypes.LongType, true, MetaData.empty()),
new StructField('first_name', DataTypes.StringType, true, MetaData.empty()),
new StructField('last_name', DataTypes.StringType, true, MetaData.empty()),
new StructField('email', DataTypes.StringType, true, MetaData.empty()),
new StructField('gender', DataTypes.StringType, true, MetaData.empty()),
new StructField('ein', DataTypes.StringType, true, MetaData.empty()),
new StructField('postal_code', DataTypes.StringType, true, MetaData.empty())
));
var sparkSession = pipelineContext.sparkSession().get();
var dfReader = sparkSession.read();
dfReader = dfReader.schema(schema).option('sep', ',').option("inferSchema", false)
dfReader.option("header", true).format('csv').load(userValue);
```
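
As another hedged sketch, a branching-style script could be built from the same calls shown above plus the standard
*DataFrame* *count* function. The decision strings returned here are hypothetical; the pipeline definition (not shown)
would map them to the appropriate next steps:

```javascript
// Hedged sketch of a branching-style script; the decision values below are hypothetical.
// userValue is assumed to be the path to a CSV file with a header row.
var sparkSession = pipelineContext.sparkSession().get();
var rowCount = sparkSession.read().option('header', true).format('csv').load(userValue).count();
// The final statement is returned; the pipeline would branch on this string.
rowCount > 0 ? 'PROCESS_DATA' : 'NO_DATA';
```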

Empty file removed docs/contributors-guide.md
Empty file.
43 changes: 24 additions & 19 deletions pipeline-drivers-examples/docs/kinesis-streaming-example.md
@@ -1,27 +1,32 @@
# Simple Streaming Example (Kinesis)
The [**SimpleKinesisDriverSetup**](../src/main/scala/com/acxiom/pipeline/SimpleKinesisDriverSetup.scala) is very similar
to the [**SimpleDataDriverSetup**](../src/main/scala/com/acxiom/pipeline/SimpleDataDriverSetup.scala) example that is
covered in detail on the main example page [here](../readme.md). The difference is the use of the
*com.acxiom.pipeline.drivers.KinesisPipelineDriver* driver. This will monitor a Kinesis stream and execute the
specified pipeline for every batch of records that comes through. The defined pipeline will count the number of records
in each *DataFrame*, then write them out to disk, continually appending to the same file.

### Grouping Steps
One extra grouping step was added that simply returns the number of records in a streaming *DataFrame*.

### Running
The code will need to be packaged as an 'uber-jar' that contains all of the dependencies (the example project does
this automatically when the Maven *package* phase is run). Once this is done, place the jar in a location that can be
read by Spark.

Submit a job:

```
spark-submit --class com.acxiom.pipeline.drivers.KinesisPipelineDriver \
--master spark://localhost:7077 \
--deploy-mode client \
--jars <jar_path>/spark-pipeline-engine_2.11-<VERSION>.jar,<jar_path>/streaming-pipeline-drivers_2.11-<VERSION>.jar <jar_path>/pipeline-drivers-examples_2.11-<VERSION>.jar \
--driverSetupClass com.acxiom.pipeline.SimpleKinesisDriverSetup \
--appName <Application name> \
--streamName <Stream name> \
--endPointURL <Endpoint URL. EG : kinesis.us-east-1.amazonaws.com> \
--regionName <Region. EG : us-east-1> \
--awsAccessKey <AWS Access Key> \
--awsAccessSecret <AWS Access Secret> \
--duration <Integer duration to collect each frame (in seconds)> \
--output_url <location to write the JSON file>
```
