Merge pull request #270 from dafreels/develop
#266 Updated data connectors after stream testing.
dafreels committed Nov 9, 2021
2 parents a784aeb + cb8e505 commit b9f3681
Showing 20 changed files with 916 additions and 86 deletions.
6 changes: 5 additions & 1 deletion docs/applications.md
@@ -13,7 +13,7 @@ The _DefaultPipelineDriver_ class requires an additional parameter named ```driv

This is an extension point where developers may customize how applications are loaded.

-## Applicaton Driver Setup
+## Application Driver Setup
The Metalus application framework provides a default implementation that may be used:
```com.acxiom.pipeline.applications.DefaultApplicationDriverSetup```

@@ -90,6 +90,10 @@ By default, the library will attempt to instantiate _case classes_ with the values s
To serialize more complicated objects with _traits_ and _Enumerations_, custom serializers can be provided using the
_json4sSerializers_ object. More on custom serializers can be found [here](serialization.md).

In addition to _className_ and _object_, the _mapEmbeddedVariables_ attribute in the object indicates that the
map should be parsed for embedded values taken from the command line parameters. These values should start with the **!**
character.
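
Based on the attributes described above, a global object entry might look like the following sketch (the class name, global name, and command line parameter name are hypothetical):

```json
"myObject": {
  "className": "com.mycompany.MyCaseClass",
  "object": {
    "name": "!commandLineParameterName"
  },
  "mapEmbeddedVariables": true
}
```

Here the value of _name_ would be replaced with the matching command line parameter value before the object is instantiated.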

#### Arrays
Values of a global entry may be an array of any of the supported types except array. When embedding objects, refer to
the object section above.
3 changes: 3 additions & 0 deletions docs/readme.md
@@ -12,6 +12,8 @@
* [Flow Control](flow-control.md)
* [Fork/Join](fork-join.md)
* [Split/Merge](split-merge.md)
* [Streaming Monitor Step](../metalus-common/docs/flowutilssteps.md#streaming-monitor)
* [Streaming Query Monitors](../metalus-common/docs/streamingquerymonitor.md)
* [Parameter Mapping](parameter-mapping.md)
* [Reserved Globals](parameter-mapping.md#reserved-globals)
* [Execution and Data Flow](pipeline-flow.md)
@@ -34,6 +36,7 @@
* [JavascriptSteps](../metalus-common/docs/javascriptsteps.md)
* [JDBCSteps](../metalus-common/docs/jdbcsteps.md)
* [JSONSteps](../metalus-common/docs/jsonsteps.md)
* [LoggingSteps](../metalus-common/docs/loggingsteps.md)
* [QuerySteps](../metalus-common/docs/querysteps.md)
* [ScalaSteps](../metalus-common/docs/scalascriptsteps.md)
* [SFTPSteps](../metalus-common/docs/sftpsteps.md)
@@ -28,11 +28,7 @@ case class S3DataConnector(override val name: String,
val path = destination.getOrElse("")
setSecurity(pipelineContext, path)
     if (dataFrame.isStreaming) {
-      Some(dataFrame.writeStream
-        .format(writeOptions.format)
-        .option("path", destination.getOrElse(""))
-        .options(writeOptions.options.getOrElse(Map[String, String]()))
-        .start())
+      Some(DataConnectorUtilities.buildDataStreamWriter(dataFrame, writeOptions, path).start())
} else {
DataConnectorUtilities.buildDataFrameWriter(dataFrame, writeOptions)
.save(S3Utilities.replaceProtocol(path, S3Utilities.deriveProtocol(path)))
12 changes: 12 additions & 0 deletions metalus-common/docs/flowutilssteps.md
@@ -15,3 +15,15 @@ which provide a mechanism for retrying a single step when an exception occurs.
### Results
* **retry** - This branch is used to transition to the step that needs to be retried.
* **stop** - This branch is used to call the step when retries have been exhausted.

## Streaming Monitor
This step provides a mechanism to monitor a streaming query in a pipeline. It may safely be called without
providing the streaming query; in that case the _stop_ action is taken. The [StreamingQueryMonitor](streamingquerymonitor.md) framework provides an
integration point for controlling how this step handles the streaming query.

### Parameters
* **query** - The streaming query to monitor.
* **streamingMonitorClassName** - Fully qualified classname of the monitor class. The [default monitor class](streamingquerymonitor.md#basestreamingquerymonitor) never stops the query.
### Results
* **continue** - This branch is used to allow restarting the streaming query.
* **stop** - This branch is used to call the step when streaming should not continue.
5 changes: 5 additions & 0 deletions metalus-common/docs/loadtobronze.md
@@ -21,3 +21,8 @@ column names, add a unique record id and a static file id column.
* **destinationBronzeConnector** - The data connector to use as the destination.
* **destinationBronzePath** - The path to write the data.
* **destinationBronzeWriteOptions** - The [options](dataoptions.md#dataframe-writer-options) to use during the write.

## Streaming
This pipeline can be used with streaming connectors. By default, if a streaming connector is provided as the load connector,
the job will run indefinitely. The [Streaming Query Monitor](streamingquerymonitor.md) provides additional options for
writing [partitioned data](streamingquerymonitor.md#batchpartitionedstreamingquerymonitor-_comacxiompipelinestreamingbatchpartitionedstreamingquerymonitor_).
8 changes: 8 additions & 0 deletions metalus-common/docs/loggingsteps.md
@@ -0,0 +1,8 @@
[Documentation Home](../../docs/readme.md) | [Common Home](../readme.md)

# Logging Steps
LoggingSteps provides a means to log messages during the execution of a pipeline.
## Log Message
The log message step provides a simple mechanism for logging a message. Full parameter descriptions are listed below:
* **message** - The message to log.
* **level** - Log level at which to log. Should be a valid log4j level.
40 changes: 40 additions & 0 deletions metalus-common/docs/streamingquerymonitor.md
@@ -0,0 +1,40 @@
[Documentation Home](../../docs/readme.md) | [Common Home](../readme.md)

# Streaming Query Monitor
Streaming Query Monitors provide a method for interacting with a running Spark StreamingQuery object. Implementations
can be created to perform different types of monitoring, control whether the query is stopped or allowed to continue, and
provide a map of variables that will be placed on the globals for the next step. A [step is provided](flowutilssteps.md#streaming-monitor) which
is designed to use the monitors and provide a decision to _continue_ or _stop_.
## BaseStreamingQueryMonitor
This implementation does nothing except allow continuous streaming; the query is never actually monitored or stopped.
## BatchWriteStreamingQueryMonitor
This abstract class provides a base implementation for streaming queries that need to write to disk. Basic status
checking is provided, along with abstract functions that can be overridden to control behavior. This class uses the following
globals to determine behavior:
* **STREAMING_BATCH_MONITOR_TYPE** - Either _duration_ or _count_.
* **STREAMING_BATCH_MONITOR_DURATION** - A number representing the amount of time to process before stopping the query.
* **STREAMING_BATCH_MONITOR_DURATION_TYPE** - Either _milliseconds_, _seconds_ or _minutes_.
* **STREAMING_BATCH_MONITOR_COUNT** - A number representing the approximate number of records to process before stopping the query.
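
As a sketch, a job that should stop the query after roughly five minutes of processing might supply globals such as the following (the key names come from the list above; the values are illustrative):

```json
{
  "STREAMING_BATCH_MONITOR_TYPE": "duration",
  "STREAMING_BATCH_MONITOR_DURATION": 5,
  "STREAMING_BATCH_MONITOR_DURATION_TYPE": "minutes"
}
```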
## BatchPartitionedStreamingQueryMonitor _(com.acxiom.pipeline.streaming.BatchPartitionedStreamingQueryMonitor)_
This implementation will process until the limit is reached and then stop the query. The query should continue and a new
partition value will be provided on the globals. The partition value will be pushed to the global specified using the
_STREAMING_BATCH_PARTITION_GLOBAL_ global. **Note:** When using this monitor, the _checkpointLocation_ must be specified
so that a single location is used throughout.

* **STREAMING_BATCH_PARTITION_COUNTER** - Tracks the number of times this has been invoked.
* **STREAMING_BATCH_PARTITION_GLOBAL** - The name of the global key that contains the name of the partition value.
* **STREAMING_BATCH_PARTITION_TEMPLATE** - Indicates whether to use a _counter_ or _date_.

The date string will be formatted using this template: _yyyy-dd-MM HH:mm:ssZ_
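
As an illustrative sketch, a count based configuration for this monitor might combine the batch globals with the partition globals like so (key names come from this document; the values are hypothetical):

```json
{
  "STREAMING_BATCH_MONITOR_TYPE": "count",
  "STREAMING_BATCH_MONITOR_COUNT": 50000,
  "STREAMING_BATCH_PARTITION_GLOBAL": "partitionValue",
  "STREAMING_BATCH_PARTITION_TEMPLATE": "date"
}
```

With _date_ as the template, each new partition value would be formatted using the date template above.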
## (Experimental) BatchFileStreamingQueryMonitor _(com.acxiom.pipeline.streaming.BatchFileStreamingQueryMonitor)_
This implementation will process until the limit is reached and then stop the query. The query should continue and a new
destination value will be provided on the globals. The destination will be modified by appending an underscore and the
template value (counter or date). The destination global will then be updated with the new value.

The following parameters are required to use this monitor:
* **STREAMING_BATCH_OUTPUT_PATH_KEY** - Item in the path that needs to be tagged.
* **STREAMING_BATCH_OUTPUT_GLOBAL** - Provides the global key to get the path being used for output.
* **STREAMING_BATCH_OUTPUT_TEMPLATE** - Indicates whether to use a _counter_ or _date_.
* **STREAMING_BATCH_OUTPUT_COUNTER** - Tracks the number of times this has been invoked.

The date string will be formatted using this template: _yyyy_dd_MM_HH_mm_ss_SSS_
6 changes: 6 additions & 0 deletions metalus-common/pom.xml
@@ -19,6 +19,12 @@
<version>${parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.compat.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<!-- Test -->
<dependency>
<groupId>org.apache.derby</groupId>
