Merge branch 'release/r114-polonnaruwa'
BenFradet committed May 17, 2019
2 parents dd8eec2 + d9f8ed0 commit 6c294da
Showing 69 changed files with 2,456 additions and 322 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
@@ -69,7 +69,7 @@ services:
- docker
- postgresql
before_deploy:
-- pip install --user release-manager==0.3.0
+- pip install --user release-manager==0.4.1
# necessary for the deploy section to work, see https://github.com/snowplow/snowplow/issues/3691
- rvm install 2.4.3
- rvm --default use 2.4.3
138 changes: 138 additions & 0 deletions 3-enrich/README.md
@@ -16,6 +16,144 @@ The **Enrich** process takes raw Snowplow events logged by a [Collector][collect
| [scala-common-enrich][e4] | A shared library for processing raw Snowplow events, used in (1) and (3) | Production-ready |
| [emr-etl-runner][e5] | A Ruby app for running (1) and (2) on Amazon Elastic MapReduce | Production-ready |

## How to add an enrichment

5 things to update:
1. [iglu-central](https://github.com/snowplow/iglu-central): holds the JSON schema(s) required for the enrichments.
2. [scala-common-enrich](./scala-common-enrich/): library containing all the enrichments. This library is then used in enrichment jobs like [spark-enrich](./spark-enrich/), [stream-enrich](./stream-enrich/) or [beam-enrich](./beam-enrich/).
3. [spark-enrich](./spark-enrich/): holds the integration tests.
4. [config/](./config/enrichments/): holds one example of JSON configuration for each enrichment.
5. [Wiki](https://github.com/snowplow/snowplow/wiki/configurable-enrichments): contains documentation for each enrichment (how to configure it, the fields that it adds, etc.).

### 1. Iglu

Files to create:
- If the new enrichment requires some configuration, the JSON schema of this configuration. Examples can be found [here](https://github.com/snowplow/iglu-central/tree/master/schemas/com.snowplowanalytics.snowplow.enrichments/). `vendor`, `name`, `format` and `version` will be reused in `scala-common-enrich` when parsing the configuration, as well as the parameter names.
- The JSON schema of the context added by the enrichment. An example can be found [here](https://github.com/snowplow/iglu-central/tree/master/schemas/com.iab.snowplow/spiders_and_robots/jsonschema/1-0-0). `vendor`, `name`, `format` and `version` will be added to the context data to create a self-describing JSON (sketched after this list). The enrichment process will check that the context added by the enrichment is valid against this schema.
Two more files need to be generated for the context, with `igluctl static generate --with-json-paths <contextSchemaPath>` (`igluctl` can be found [here](https://docs.snowplowanalytics.com/open-source/iglu/igluctl/)):
- DDL ([examples](https://github.com/snowplow/iglu-central/tree/master/sql/)): used to create a table in Redshift to store the context of the enrichment.
- JSON paths ([examples](https://github.com/snowplow/iglu-central/tree/master/jsonpaths/)): used to order the fields of the JSON in the same way as they appear in the DDL (because the fields of a JSON object are not ordered).
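
Here is what such a self-describing JSON looks like once emitted by an enrichment, as a runnable json4s sketch (json4s is the JSON library used by `scala-common-enrich`; the `com.acme` vendor and the field names are made up for illustration):
```scala
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods.{compact, render}

object SelfDescribingContextSketch extends App {
  // vendor / name / format / version of the context schema registered in Iglu
  val schemaUri = "iglu:com.acme/my_enrichment_context/jsonschema/1-0-0"

  // The enrichment wraps its output data in this envelope; the enrichment
  // process then validates the "data" part against the schema named in "schema".
  val selfDescribingContext =
    ("schema" -> schemaUri) ~
      ("data" -> ("myField" -> "myValue"))

  println(compact(render(selfDescribingContext)))
  // {"schema":"iglu:com.acme/my_enrichment_context/jsonschema/1-0-0","data":{"myField":"myValue"}}
}
```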

### 2. scala-common-enrich

#### 2.a. File with the new enrichment

This file should be created in [registry/](./scala-common-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/).

It should contain 2 things (a minimal sketch follows this list):
1) A case class that extends `Enrichment` ([here](./scala-common-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/enrichments.scala)) and holds the logic of the enrichment.
This class has a function (e.g. `getContext`) that takes parameters from the raw event and returns the result of the enrichment as a JSON holding the data as well as the name of the schema for this data (a [self-describing JSON](https://snowplowanalytics.com/blog/2014/05/15/introducing-self-describing-jsons/)).
2) A companion object that extends `ParseableEnrichment` ([here](./scala-common-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/enrichments.scala)) and has a function (e.g. `parse`) that can create an instance of the enrichment class from the configuration.
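
Below is a self-contained sketch of these two pieces, with hypothetical names (enrichment `MyEnrichment`, parameter `uppercase`, vendor `com.acme`); the real `Enrichment` and `ParseableEnrichment` traits linked above add Iglu schema validation and scalaz error handling on top of this:
```scala
import org.json4s._
import org.json4s.JsonDSL._

// 1) Case class holding the logic of the enrichment.
final case class MyEnrichment(uppercase: Boolean) {
  // Takes parameters from the raw event and returns the result as a
  // self-describing JSON: the context data plus the name of its schema.
  def getContext(useragent: String): JObject =
    ("schema" -> "iglu:com.acme/my_enrichment_context/jsonschema/1-0-0") ~
      ("data" -> ("agent" -> (if (uppercase) useragent.toUpperCase else useragent)))
}

// 2) Companion object able to instantiate the enrichment from its JSON
// configuration (the real parse function also validates the configuration
// against its Iglu schema and reports errors with scalaz Validation).
object MyEnrichment {
  implicit val formats: Formats = DefaultFormats

  def parse(config: JValue): Option[MyEnrichment] =
    (config \ "parameters" \ "uppercase").extractOpt[Boolean].map(MyEnrichment(_))
}
```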

A real example can be found [here](./scala-common-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/YauaaEnrichment.scala).

The unit tests for this example can be found [here](./scala-common-enrich/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/YauaaEnrichmentSpec.scala). The purpose of the unit tests is to make sure that:
- The functions used in the enrichment are working as expected.
- The enrichment can be correctly instantiated from the configuration.
- The self-describing JSON returned by the enrichment is correctly formatted and holds the correct values.

#### 2.b. [EnrichmentRegistry](./scala-common-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentRegistry.scala)

This class instantiates the enrichments based on the configuration and holds a map of `enrichment name -> enrichment instance`.

2 things to add:
1) In the method `buildEnrichmentConfig` of the companion object, add a case for the new enrichment that calls the `parse` function previously created in the companion object of the enrichment:
```scala
case "my_enrichment_config" =>
MyEnrichment.parse(enrichmentConfig, schemaKey).map((nm, _).some)
```
This instantiates the enrichment and puts it in the registry, if a configuration exists for the enrichment.

2) In the `EnrichmentRegistry` case class, create a function `getMyEnrichment`:
```scala
def getMyEnrichment: Option[MyEnrichment] =
  getEnrichment[MyEnrichment]("my_enrichment_config")
```
This function returns the instance of the enrichment from the registry.

`"my_enrichment_config"` should be the `name` field of the JSON schema for the configuration of the enrichment.

#### 2.c. [EnrichmentManager](./scala-common-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala)

This is where the events are actually enriched, in the `enrichEvent` function.

3 things to do:
1) Get the result of the enrichment (if it was instantiated in the registry).
```scala
val myEnrichmentContext = registry.getMyEnrichment match {
  // the enrichment was configured: run it on the relevant fields of the event
  case Some(myEnrichment) =>
    myEnrichment
      .getContext(event.useragent, event.otherparams)
      .map(_.some)
  // the enrichment was not configured: succeed with no context
  case None => None.success
}
```
2) Add the result (if any) to the derived contexts:
```scala
val preparedDerivedContexts =
  ...
  ++ List(myEnrichmentContext).collect {
    case Success(Some(context)) => context
  }
  ++ ...
```
3) Fail the event (it will be sent to bad rows) if the enrichment didn't work:
```scala
val third =
  ...
  myEnrichmentContext.toValidationNel |@|
  ...
```

### 3. spark-enrich

The integration tests for the new enrichment need to be added in this project.
The purpose of these tests is to make sure that the results of the new enrichment are correctly added to the derived contexts of the enriched event.

An example can be found [here](./spark-enrich/src/test/scala/com.snowplowanalytics.snowplow.enrich.spark/good/YauaaEnrichmentCfLineSpec.scala).

The idea is to create lines as they would be written by a collector (e.g. [CloudFront](https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/AccessLogs.html#BasicDistributionFileFormat)),
and then to run an enrich job on these lines with the new enrichment enabled.
It's then possible to check that the derived contexts of the enriched events in the job output contain the output of the new enrichment, with the correct values.

To enable the new enrichment in the test job, the function `getEnrichments`
in [EnrichJobSpec](./spark-enrich/src/test/scala/com.snowplowanalytics.snowplow.enrich.spark/EnrichJobSpec.scala)
needs to be updated with a new parameter telling whether the enrichment should be enabled,
and with the JSON configuration of the enrichment, as sketched below.
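
A hedged sketch of the pattern (simplified signature; the real `getEnrichments` already takes one such flag per existing enrichment, and `com.acme`/`my_enrichment_config` are placeholders):
```scala
// One boolean per enrichment; the enrichment's JSON configuration is embedded
// in the enrichments JSON returned to the test job.
def getEnrichments(myEnrichmentEnabled: Boolean): String =
  s"""|{
      |  "schema": "iglu:com.snowplowanalytics.snowplow/enrichments/jsonschema/1-0-0",
      |  "data": [
      |    {
      |      "schema": "iglu:com.acme/my_enrichment_config/jsonschema/1-0-0",
      |      "data": {
      |        "enabled": $myEnrichmentEnabled,
      |        "vendor": "com.acme",
      |        "name": "my_enrichment_config"
      |      }
      |    }
      |  ]
      |}""".stripMargin
```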

**What if the pull request on `iglu-central` with the schemas for the new enrichment has not been merged yet?**

It's still possible to use the new schemas in the integration tests.
When a new PR is created on `iglu-central`, a new registry is created at this address:
`http://iglucentral-dev.com.s3-website-us-east-1.amazonaws.com/<branch_of_PR>`,
with all the existing schemas and the new ones.

This registry needs to be added to the list of Iglu resolvers used in the tests.
In [EnrichJobSpec](./spark-enrich/src/test/scala/com.snowplowanalytics.snowplow.enrich.spark/EnrichJobSpec.scala) object,
add the registry in the value `igluCentralConfig`:
```scala
|{
|  "name": "Iglu with MyEnrichment schemas",
|  "priority": 1,
|  "vendorPrefixes": [ "com.snowplowanalytics" ],
|  "connection": {
|    "http": {
|      "uri": "http://iglucentral-dev.com.s3-website-us-east-1.amazonaws.com/<branch_of_PR>"
|    }
|  }
|}
```

### 4. config

Add an example of the JSON configuration of the new enrichment.

### 5. Wiki

Add a detailed page for the new enrichment.

--------------------------

## Find out more

| Technical Docs | Setup Guide | Roadmap & Contributing |
Expand Down
4 changes: 2 additions & 2 deletions 3-enrich/beam-enrich/build.sbt
@@ -33,7 +33,7 @@ import Tests._

lazy val commonSettings = Defaults.coreDefaultSettings ++ Seq(
organization := "com.snowplowanalytics",
version := "0.2.0",
version := "0.3.0",
scalaVersion := "2.11.12",
javacOptions ++= Seq("-source", "1.8", "-target", "1.8"),
scalacOptions ++= compilerOptions,
@@ -68,7 +68,7 @@ daemonUser in Docker := "snowplow"

lazy val scioVersion = "0.6.0"
lazy val beamVersion = "2.5.0"
lazy val sceVersion = "0.36.0"
lazy val sceVersion = "0.37.0"
lazy val scalaMacrosVersion = "2.1.0"
lazy val slf4jVersion = "1.7.25"
lazy val scalatestVersion = "3.0.5"
@@ -31,6 +31,7 @@ import scalaz._
import Scalaz._

import common.EtlPipeline
+import common.adapters.AdapterRegistry
import common.enrichments.EnrichmentRegistry
import common.loaders.ThriftLoader
import common.outputs.{EnrichedEvent, BadRow}
@@ -202,6 +203,7 @@ object Enrich {
implicit r: Resolver): List[Validation[BadRow, EnrichedEvent]] = {
val collectorPayload = ThriftLoader.toCollectorPayload(data)
val processedEvents = EtlPipeline.processEvents(
+new AdapterRegistry,
enrichmentRegistry,
s"beam-enrich-${generated.BuildInfo.version}",
new DateTime(System.currentTimeMillis),
@@ -61,6 +61,5 @@ object singleton {
}
instance
}
-
}
}
@@ -40,8 +40,8 @@ class EnrichWithLocalFileSpec extends PipelineSpec {
"L",
"Dublin",
"D02",
"53.3331",
"-6.2489",
"53.3338", //!\ after an update of MaxMind database this coordinate might change
"-6.2488", //!\ after an update of MaxMind database this coordinate might change
"Leinster",
"""{"schema":"iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0","data":{"schema":"iglu:com.snowplowanalytics.snowplow/screen_view/jsonschema/1-0-0","data":{"name":"hello from Snowplow"}}}""",
"curl/7.50.3",
@@ -51,7 +51,7 @@ class EnrichWithLocalFileSpec extends PipelineSpec {
"1-0-0"
)

"Enrich" should "enrich a unstruct event with geo ip information" in {
"Enrich" should "enrich a unstruct event with geo ip information (if failure, check coordinates)" in {
downloadLocalEnrichmentFile(
"http://snowplow-hosted-assets.s3.amazonaws.com/third-party/maxmind/GeoLite2-City.mmdb",
"./ip_geo"
9 changes: 9 additions & 0 deletions 3-enrich/config/enrichments/yauaa_enrichment_config.json
@@ -0,0 +1,9 @@
{
"schema": "iglu:com.snowplowanalytics.snowplow.enrichments/yauaa_enrichment_config/jsonschema/1-0-0",

"data": {
"enabled": true,
"vendor": "com.snowplowanalytics.snowplow.enrichments",
"name": "yauaa_enrichment_config"
}
}
2 changes: 1 addition & 1 deletion 3-enrich/emr-etl-runner/lib/snowplow-emr-etl-runner.rb
@@ -37,6 +37,6 @@
module Snowplow
module EmrEtlRunner
NAME = "snowplow-emr-etl-runner"
VERSION = "0.34.1"
VERSION = "0.34.2"
end
end
2 changes: 1 addition & 1 deletion 3-enrich/emr-etl-runner/lib/snowplow-emr-etl-runner/emr.rb
@@ -67,7 +67,7 @@ def list_clusters(client, marker)
client.list_clusters(options)
rescue Elasticity::ThrottlingException, RestClient::RequestTimeout, RestClient::InternalServerError, RestClient::ServiceUnavailable, RestClient::SSLCertificateNotVerified
retries += 1
-sleep(2 ** retries * 1)
+sleep(2 ** retries + 30)
retry if retries < 3
end
end
