Merge pull request #4099 from snowplow/release/r116-madara-rider
Release 116
benjben committed Sep 13, 2019
2 parents 07a4ef8 + 6078f16 commit 69ff4de
Showing 20 changed files with 360 additions and 51 deletions.
7 changes: 6 additions & 1 deletion .travis.yml
@@ -45,6 +45,10 @@ env:
- secure: Pav87F57y6QVUs5/NMiH8sTa3u7UmCeQIgtLIRJprj4CFN6yBqnVXxF8VJ5ozVUikgJlEf4L7KagX+MTyxImId017/4In4A2vxql0wCT2R4qZ4pu4zEWF1gYm+ZqRB9llPrm85CjI5j1MYslrb4aglyi4l3iITksDI6RUXrHazM=
# BINTRAY_SNOWPLOW_DOCKER_API_KEY
- secure: LXoplmEfqUpjSD95jwOIysCJBn43OgUaZDvncX+a9LSGpSv8EenvJSdzVKEpywYPGdt0jAi/cKMRYMDjtms+DW4TXWkm+2qHGcWLG2SBiKJyQe486zNCdU/JpP24wXZ4FV+iK8d8oE9kSLi+Pi+hZvD9p1BKBTXP8IYpjoIYsP8=
# DOCKER_USERNAME
- secure: raLw23n0ruhIPOprVsq+AR6CcQZVqEXcfOBVGsQ1UfcP00gsxSP2H0HbCtHLTqonNSDvG6aZPsg2KgiB9oxoe+1TZAhSqYycPh6WBriTl+6dWHgeog9yVXxYWK4VlPeIvJNenkO53Cn8j/jVPrTNY9tdmLNtAwbrPfepfXwIBdk=
# DOCKER_PASSWORD
- secure: UhleYVVkQKnw8OfZjagQqLzHgV7pUXD6qYQfa2iOsIR9oazFmgbsbiPwThzAITcyP+USFXq8I2xMVN6mjPo0EzzPN4yzOx8a+ZVK/4Q37hGt210JbfOcgrlIxTIdTTEujcwcZRuv9L303ypL0LNe/0n0mF3KmYH5PSiIc+4qKro=
matrix:
- TEST_DIR=2-collectors/scala-stream-collector/
- TEST_DIR=3-enrich/scala-common-enrich/
@@ -63,6 +67,7 @@ before_install:
- rvm install jruby-9.1.6.0
- gem uninstall -i /home/travis/.rvm/gems/jruby-9.1.6.0 bundler -a -x
- gem install bundler -v 1.15.4
- docker login -u $DOCKER_USERNAME -p $DOCKER_PASSWORD
addons:
postgresql: '9.3'
services:
@@ -81,7 +86,7 @@ deploy:
tags: true
condition: '$([[ "$TEST_DIR" == "3-enrich/scala-common-enrich/" ]] && .travis/is_release_tag.sh scala_common_enrich $TRAVIS_TAG && [ $? -eq 0 ] && echo "Deploying")'
- provider: script
script: ./.travis/deploy.sh scala_stream_collector $TRAVIS_TAG
script: cd 2-collectors/scala-stream-collector/ && sbt "project kinesis" docker:publish && sbt "project pubsub" docker:publish && sbt "project kafka" docker:publish && sbt "project nsq" docker:publish && cd ../.. && ./.travis/deploy.sh scala_stream_collector $TRAVIS_TAG
skip_cleanup: true
on:
tags: true
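The new deploy script publishes a Docker image for each of the four collector modules before running the existing Bintray deployment. As a sketch only: the same sequence could be collapsed into a single sbt command alias in build.sbt (addCommandAlias is standard sbt; the alias name publishDockerImages is hypothetical and not part of this PR):

    // Hypothetical alias reproducing the chained deploy commands above.
    addCommandAlias(
      "publishDockerImages",
      ";project kinesis ;docker:publish" +
      ";project pubsub ;docker:publish" +
      ";project kafka ;docker:publish" +
      ";project nsq ;docker:publish"
    )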
22 changes: 20 additions & 2 deletions 2-collectors/scala-stream-collector/build.sbt
@@ -13,6 +13,8 @@
* governing permissions and limitations there under.
*/

import com.typesafe.sbt.packager.docker._

lazy val commonDependencies = Seq(
// Java
Dependencies.Libraries.yodaTime,
@@ -38,7 +40,7 @@ lazy val commonDependencies = Seq(
lazy val buildSettings = Seq(
organization := "com.snowplowanalytics",
name := "snowplow-stream-collector",
version := "0.15.0",
version := "0.16.0",
description := "Scala Stream Collector for Snowplow raw events",
scalaVersion := "2.11.11",
scalacOptions := BuildSettings.compilerOptions,
@@ -48,9 +50,17 @@
resolvers ++= Dependencies.resolutionRepos
)

lazy val dockerSettings = Seq(
maintainer in Docker := "Snowplow Analytics Ltd. <support@snowplowanalytics.com>",
dockerBaseImage := "snowplow-docker-registry.bintray.io/snowplow/base-debian:0.1.0",
daemonUser in Docker := "snowplow",
dockerUpdateLatest := true
)

lazy val allSettings = buildSettings ++
BuildSettings.sbtAssemblySettings ++
Seq(libraryDependencies ++= commonDependencies)
Seq(libraryDependencies ++= commonDependencies) ++
dockerSettings

lazy val root = project.in(file("."))
.settings(buildSettings)
@@ -69,25 +79,33 @@ lazy val core = project
lazy val kinesis = project
.settings(moduleName := "snowplow-stream-collector-kinesis")
.settings(allSettings)
.settings(packageName in Docker := "snowplow/scala-stream-collector-kinesis")
.settings(libraryDependencies ++= Seq(Dependencies.Libraries.kinesis))
.enablePlugins(JavaAppPackaging, DockerPlugin)
.dependsOn(core)

lazy val pubsub = project
.settings(moduleName := "snowplow-stream-collector-google-pubsub")
.settings(allSettings)
.settings(packageName in Docker := "snowplow/scala-stream-collector-pubsub")
.settings(libraryDependencies ++= Seq(Dependencies.Libraries.pubsub))
.enablePlugins(JavaAppPackaging, DockerPlugin)
.dependsOn(core)

lazy val kafka = project
.settings(moduleName := "snowplow-stream-collector-kafka")
.settings(allSettings)
.settings(packageName in Docker := "snowplow/scala-stream-collector-kafka")
.settings(libraryDependencies ++= Seq(Dependencies.Libraries.kafkaClients))
.enablePlugins(JavaAppPackaging, DockerPlugin)
.dependsOn(core)

lazy val nsq = project
.settings(moduleName := "snowplow-stream-collector-nsq")
.settings(allSettings)
.settings(packageName in Docker := "snowplow/scala-stream-collector-nsq")
.settings(libraryDependencies ++= Seq(Dependencies.Libraries.nsqClient))
.enablePlugins(JavaAppPackaging, DockerPlugin)
.dependsOn(core)

lazy val stdout = project
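Every sink module now follows the same recipe: enable JavaAppPackaging and DockerPlugin, inherit the shared dockerSettings through allSettings, and override packageName in Docker with its own image name. A hypothetical extra sink would be wired identically (the sqs module and Dependencies.Libraries.sqs below are assumed names, purely for illustration):

    // Sketch of an additional sink module following the pattern above.
    lazy val sqs = project
      .settings(moduleName := "snowplow-stream-collector-sqs")
      .settings(allSettings)
      .settings(packageName in Docker := "snowplow/scala-stream-collector-sqs")
      .settings(libraryDependencies ++= Seq(Dependencies.Libraries.sqs))
      .enablePlugins(JavaAppPackaging, DockerPlugin)
      .dependsOn(core)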
CollectorRoute.scala
@@ -14,12 +14,11 @@
*/
package com.snowplowanalytics.snowplow.collectors.scalastream

import akka.http.scaladsl.model.{ContentType, HttpResponse, StatusCode, StatusCodes}
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.HttpCookiePair
import akka.http.scaladsl.server.{Directive1, Route}
import akka.http.scaladsl.server.Directives._
import com.snowplowanalytics.snowplow.collectors.scalastream.model.DntCookieMatcher

import monitoring.BeanRegistry

trait CollectorRoute {
@@ -43,7 +42,7 @@ trait CollectorRoute {
extractors { (host, ip, request) =>
// get the adapter vendor and version from the path
path(Segment / Segment) { (vendor, version) =>
val path = s"/$vendor/$version"
val path = collectorService.determinePath(vendor, version)
post {
extractContentType { ct =>
entity(as[String]) { body =>
CollectorService.scala
@@ -15,14 +15,16 @@
package com.snowplowanalytics.snowplow
package collectors.scalastream


import java.util.UUID
import org.slf4j.LoggerFactory

import scala.collection.JavaConverters._

import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers._
import akka.http.scaladsl.model.headers.CacheDirectives._
import org.apache.commons.codec.binary.Base64
import org.slf4j.LoggerFactory
import scalaz._

import CollectorPayload.thrift.model1.CollectorPayload
@@ -55,6 +57,7 @@ trait Service {
): (HttpResponse, List[Array[Byte]])
def cookieName: Option[String]
def doNotTrackCookie: Option[DntCookieMatcher]
def determinePath(vendor: String, version: String): String
}

object CollectorService {
@@ -76,6 +79,15 @@ class CollectorService(
override val cookieName = config.cookieName
override val doNotTrackCookie = config.doNotTrackHttpCookie

/**
* Determines the path to be used in the response,
* based on whether a mapping can be found in the config for the original request path.
*/
override def determinePath(vendor: String, version: String): String = {
val original = s"/$vendor/$version"
config.paths.getOrElse(original, original)
}

override def cookie(
queryString: Option[String],
body: Option[String],
@@ -117,7 +129,8 @@ class CollectorService(
request,
config.cookieBounce,
bounce) ++
cookieHeader(config.cookieConfig, nuid, doNotTrack) ++
cookieHeader(request, config.cookieConfig, nuid, doNotTrack) ++
cacheControl(pixelExpected) ++
List(
RawHeader("P3P", "policyref=\"%s\", CP=\"%s\"".format(config.p3p.policyRef, config.p3p.CP)),
accessControlAllowOriginHeader(request),
@@ -280,6 +293,7 @@ class CollectorService(
* @return the build cookie wrapped in a header
*/
def cookieHeader(
request: HttpRequest,
cookieConfig: Option[CookieConfig],
networkUserId: String,
doNotTrack: Boolean
@@ -292,8 +306,11 @@
name = config.name,
value = networkUserId,
expires = Some(DateTime.now + config.expiration.toMillis),
domain = config.domain,
path = Some("/")
domain = cookieDomain(request.headers, config.domains, config.fallbackDomain),
path = Some("/"),
secure = config.secure,
httpOnly = config.httpOnly,
extension = config.sameSite.map(value => s"SameSite=$value")
)
`Set-Cookie`(responseCookie)
}
@@ -330,12 +347,50 @@
None
}

/** Retrieves all headers from the request except Remote-Address and Raw-Requet-URI */
/** Retrieves all headers from the request except Remote-Address and Raw-Request-URI */
def headers(request: HttpRequest): Seq[String] = request.headers.flatMap {
case _: `Remote-Address` | _: `Raw-Request-URI` => None
case other => Some(other.toString)
}

/** If the pixel is requested, this attaches cache control headers to the response to prevent any caching. */
def cacheControl(pixelExpected: Boolean): List[`Cache-Control`] =
if (pixelExpected) List(`Cache-Control`(`no-cache`, `no-store`, `must-revalidate`))
else Nil

/**
* Determines the cookie domain to be used by inspecting the Origin header of the request
* and trying to find a match in the list of domains specified in the config file.
* @param headers The headers from the http request.
* @param domains The list of cookie domains from the configuration.
* @param fallbackDomain The fallback domain from the configuration.
* @return The domain to be sent back in the response, unless no cookie domains are configured.
* The Origin header may include multiple domains. The first matching domain is returned.
* If no match is found, the fallback domain is used if configured. Otherwise, the cookie domain is not set.
*/
def cookieDomain(headers: Seq[HttpHeader], domains: Option[List[String]], fallbackDomain: Option[String]): Option[String] =
(for {
domainList <- domains
origins <- headers.collectFirst { case header: `Origin` => header.origins }
originHosts = extractHosts(origins)
domainToUse <- domainList.find(domain => originHosts.exists(validMatch(_, domain)))
} yield domainToUse).orElse(fallbackDomain)

/** Extracts the host names from a list of values in the request's Origin header. */
def extractHosts(origins: Seq[HttpOrigin]): Seq[String] =
origins.map(origin => origin.host.host.address())

/**
* Ensures a match is valid.
* We only want matches where:
* a.) the Origin host is exactly equal to the cookie domain from the config
* b.) the Origin host is a subdomain of the cookie domain from the config.
* But we want to avoid cases where the cookie domain from the config is randomly
* a substring of the Origin host, without any connection between them.
*/
def validMatch(host: String, domain: String): Boolean =
host == domain || host.endsWith("." + domain)

/**
* Gets the IP from a RemoteAddress. If ipAsPartitionKey is false, a UUID will be generated.
* @param remoteAddress Address extracted from an HTTP request
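Both determinePath and validMatch are pure string functions, so their behaviour is easy to pin down in isolation. A minimal, self-contained sketch (the paths mapping and host names are illustrative values, not from this PR):

    // Standalone restatement of the new matching rules, with example values.
    object MatchingRulesSketch {
      // Replace the request path when the config provides a mapping for it.
      def determinePath(paths: Map[String, String], vendor: String, version: String): String = {
        val original = s"/$vendor/$version"
        paths.getOrElse(original, original)
      }

      // Accept an exact domain or a true subdomain, never a bare substring.
      def validMatch(host: String, domain: String): Boolean =
        host == domain || host.endsWith("." + domain)

      def main(args: Array[String]): Unit = {
        val paths = Map("/com.acme/track" -> "/com.snowplowanalytics.snowplow/tp2")
        println(determinePath(paths, "com.acme", "track"))      // /com.snowplowanalytics.snowplow/tp2
        println(determinePath(paths, "com.acme", "redirect"))   // /com.acme/redirect (no mapping)
        println(validMatch("shop.example.com", "example.com"))  // true: subdomain
        println(validMatch("example.com", "example.com"))       // true: exact match
        println(validMatch("badexample.com", "example.com"))    // false: substring only
      }
    }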
model.scala
@@ -49,7 +49,11 @@ package model {
enabled: Boolean,
name: String,
expiration: FiniteDuration,
domain: Option[String]
domains: Option[List[String]],
fallbackDomain: Option[String],
secure: Boolean,
httpOnly: Boolean,
sameSite: Option[String]
)
final case class DoNotTrackCookieConfig(
enabled: Boolean,
@@ -126,6 +130,7 @@ package model {
final case class CollectorConfig(
interface: String,
port: Int,
paths: Map[String, String],
p3p: P3PConfig,
crossDomain: CrossDomainConfig,
cookie: CookieConfig,
@@ -145,7 +150,8 @@
None

def cookieName = cookieConfig.map(_.name)
def cookieDomain = cookieConfig.flatMap(_.domain)
def cookieDomain = cookieConfig.flatMap(_.domains)
def fallbackDomain = cookieConfig.flatMap(_.fallbackDomain)
def cookieExpiration = cookieConfig.map(_.expiration)
}
}
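The widened CookieConfig now carries everything cookieHeader needs to choose a domain and emit the Secure, HttpOnly and SameSite attributes. A sketch of constructing it directly, with illustrative values only; in the running collector these fields are read from the HOCON configuration file:

    import scala.concurrent.duration._

    // Illustrative values; real deployments set these in the config file.
    val cookieConfig = CookieConfig(
      enabled = true,
      name = "sp",
      expiration = 365.days,
      domains = Some(List("acme.com", "acme.co.uk")), // matched against Origin hosts
      fallbackDomain = Some("acme.com"),              // used when no Origin matches
      secure = true,                                  // adds the Secure attribute
      httpOnly = false,                               // cookie stays visible to JS trackers
      sameSite = Some("None")                         // rendered as SameSite=None
    )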
CollectorRouteSpec.scala
@@ -44,6 +44,7 @@ class CollectorRouteSpec extends Specification with Specs2RouteTest {
): (HttpResponse, List[Array[Byte]]) = (HttpResponse(200, entity = s"cookie"), List.empty)
def cookieName: Option[String] = Some("name")
def doNotTrackCookie: Option[DntCookieMatcher] = None
def determinePath(vendor: String, version: String): String = "/p1/p2"
}
}

