Skip to content

Commit

Permalink
Merge pull request #4218 from snowplow/release/r117-biskupin
Browse files Browse the repository at this point in the history
Release R117 Biskupin
  • Loading branch information
chuwy committed Dec 3, 2019
2 parents 69ff4de + 826f227 commit 95f938f
Show file tree
Hide file tree
Showing 41 changed files with 1,331 additions and 138 deletions.
6 changes: 4 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
dist: trusty
language: scala
scala:
- 2.11.12
Expand All @@ -12,6 +13,7 @@ script:
- python integration-tests/api-lookup-test.py &
- cd 3-enrich/scala-common-enrich && sbt publishLocal && cd ../..
- cd $TEST_DIR && sbt test && cd ../..
- (test "${TEST_DIR}" == "2-collectors/scala-stream-collector/" && cd $TRAVIS_BUILD_DIR/integration-tests && bash -c ./ssc-ssl-test.sh && cd -) || true
env:
global:
# OER_KEY
Expand Down Expand Up @@ -92,7 +94,7 @@ deploy:
tags: true
condition: '$([[ "$TEST_DIR" == "2-collectors/scala-stream-collector/" ]] && .travis/is_release_tag.sh scala_stream_collector $TRAVIS_TAG && [ $? -eq 0 ] && echo "Deploying")'
- provider: script
script: ./.travis/deploy.sh stream_enrich $TRAVIS_TAG
script: cd 3-enrich/stream-enrich/ && sbt "project kinesis" docker:publish && sbt "project kafka" docker:publish && sbt "project nsq" docker:publish && cd ../.. && ./.travis/deploy.sh stream_enrich $TRAVIS_TAG
skip_cleanup: true
on:
tags: true
Expand All @@ -104,7 +106,7 @@ deploy:
tags: true
condition: '$([[ "$TEST_DIR" == "3-enrich/spark-enrich/" ]] && .travis/is_release_tag.sh spark_enrich $TRAVIS_TAG && [ $? -eq 0 ] && echo "Deploying")'
- provider: script
script: ./.travis/deploy.sh beam_enrich $TRAVIS_TAG && ./.travis/deploy_docker.sh $TRAVIS_TAG
script: cd 3-enrich/beam-enrich/ && sbt docker:publish && cd ../../ && ./.travis/deploy.sh beam_enrich $TRAVIS_TAG
skip_cleanup: true
on:
tags: true
Expand Down
22 changes: 0 additions & 22 deletions .travis/deploy_docker.sh

This file was deleted.

7 changes: 4 additions & 3 deletions 2-collectors/scala-stream-collector/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
* implied. See the Apache License Version 2.0 for the specific language
* governing permissions and limitations there under.
*/

import com.typesafe.sbt.packager.docker._

lazy val commonDependencies = Seq(
Expand All @@ -26,23 +25,25 @@ lazy val commonDependencies = Seq(
// Scala
Dependencies.Libraries.scopt,
Dependencies.Libraries.scalaz7,
Dependencies.Libraries.akkaStream,
Dependencies.Libraries.akkaHttp,
Dependencies.Libraries.akkaSlf4j,
Dependencies.Libraries.json4sJackson,
Dependencies.Libraries.snowplowCommonEnrich,
Dependencies.Libraries.collectorPayload,
Dependencies.Libraries.pureconfig,
// Scala (test)
Dependencies.Libraries.akkaTestkit,
Dependencies.Libraries.akkaHttpTestkit,
Dependencies.Libraries.specs2
)

lazy val buildSettings = Seq(
organization := "com.snowplowanalytics",
name := "snowplow-stream-collector",
version := "0.16.0",
version := "0.17.0",
description := "Scala Stream Collector for Snowplow raw events",
scalaVersion := "2.11.11",
scalaVersion := "2.11.12",
scalacOptions := BuildSettings.compilerOptions,
scalacOptions in (Compile, console) ~= { _.filterNot(Set("-Ywarn-unused-import")) },
scalacOptions in (Test, console) := (scalacOptions in (Compile, console)).value,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@ package scalastream
import java.io.File

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.{ConnectionContext, Http}
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import com.snowplowanalytics.snowplow.collectors.scalastream.metrics._
import com.snowplowanalytics.snowplow.collectors.scalastream.model._
import com.typesafe.config.{Config, ConfigFactory}
import com.typesafe.sslconfig.akka.AkkaSSLConfig
import org.slf4j.LoggerFactory
import pureconfig._

Expand Down Expand Up @@ -87,12 +90,49 @@ trait Collector {
metricsRoute.metricsRoute ~ metricsDirectives.logRequest(collectorRoute.collectorRoute)
else collectorRoute.collectorRoute

Http().bindAndHandle(routes, collectorConf.interface, collectorConf.port)
.map { binding =>
log.info(s"REST interface bound to ${binding.localAddress}")
} recover { case ex =>
log.error("REST interface could not be bound to " +
s"${collectorConf.interface}:${collectorConf.port}", ex.getMessage)
lazy val redirectRoutes =
scheme("http") {
extract(_.request.uri) { uri =>
redirect(
uri.copy(scheme = "https").withPort(collectorConf.ssl.port),
StatusCodes.MovedPermanently
)
}
}

def bind(
rs: Route,
interface: String,
port: Int,
connectionContext: ConnectionContext = ConnectionContext.noEncryption()
) =
Http().bindAndHandle(rs, interface, port, connectionContext)
.map { binding =>
log.info(s"REST interface bound to ${binding.localAddress}")
} recover { case ex =>
log.error( "REST interface could not be bound to " +
s"${collectorConf.interface}:${collectorConf.port}", ex.getMessage)
}

lazy val secureEndpoint =
bind(routes,
collectorConf.interface,
collectorConf.ssl.port,
SSLConfig.secureConnectionContext(system, AkkaSSLConfig())
)

lazy val unsecureEndpoint = (routes: Route) =>
bind(routes, collectorConf.interface, collectorConf.port)

collectorConf.ssl match {
case SSLConfig(true, true, port) =>
unsecureEndpoint(redirectRoutes)
secureEndpoint
case SSLConfig(true, false, port) =>
unsecureEndpoint(routes)
secureEndpoint
case _ =>
unsecureEndpoint(routes)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,15 @@ trait CollectorRoute {
def extractContentType: Directive1[ContentType] =
extractRequestContext.map(_.request.entity.contentType)

def collectorRoute: Route =
def collectorRoute =
if (collectorService.enableDefaultRedirect) routes else rejectRedirect ~ routes

def rejectRedirect: Route =
path("r" / Segment) { _ =>
complete(StatusCodes.NotFound -> "redirects disabled")
}

def routes: Route =
doNotTrack(collectorService.doNotTrackCookie) { dnt =>
cookieIfWanted(collectorService.cookieName) { reqCookie =>
val cookie = reqCookie.map(_.toCookie)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ trait Service {
def cookieName: Option[String]
def doNotTrackCookie: Option[DntCookieMatcher]
def determinePath(vendor: String, version: String): String
def enableDefaultRedirect: Boolean
}

object CollectorService {
Expand All @@ -78,6 +79,7 @@ class CollectorService(

override val cookieName = config.cookieName
override val doNotTrackCookie = config.doNotTrackHttpCookie
override val enableDefaultRedirect = config.enableDefaultRedirect

/**
* Determines the path to be used in the response,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,16 @@
*/
package com.snowplowanalytics.snowplow.collectors.scalastream

import javax.net.ssl.SSLContext
import scala.concurrent.duration.FiniteDuration

import akka.actor.ActorSystem
import akka.stream.TLSClientAuth
import akka.http.scaladsl.ConnectionContext
import akka.http.scaladsl.model.headers.HttpCookiePair

import com.typesafe.sslconfig.akka.AkkaSSLConfig
import com.typesafe.sslconfig.akka.util.AkkaLoggerFactory
import com.typesafe.sslconfig.ssl.ConfigSSLContextBuilder
import com.typesafe.sslconfig.ssl.{ClientAuth => SslClientAuth}
import sinks.Sink

package model {
Expand Down Expand Up @@ -127,6 +133,11 @@ package model {
enabled: Boolean,
durationBucketsInSeconds: Option[List[Double]]
)
final case class SSLConfig(
enable: Boolean = false,
redirect: Boolean = false,
port: Int = 443
)
final case class CollectorConfig(
interface: String,
port: Int,
Expand All @@ -140,7 +151,9 @@ package model {
rootResponse: RootResponseConfig,
cors: CORSConfig,
streams: StreamsConfig,
prometheusMetrics: PrometheusMetricsConfig
prometheusMetrics: PrometheusMetricsConfig,
enableDefaultRedirect: Boolean = true,
ssl: SSLConfig = SSLConfig()
) {
val cookieConfig = if (cookie.enabled) Some(cookie) else None
val doNotTrackHttpCookie =
Expand All @@ -154,4 +167,55 @@ package model {
def fallbackDomain = cookieConfig.flatMap(_.fallbackDomain)
def cookieExpiration = cookieConfig.map(_.expiration)
}

object SSLConfig {
def secureConnectionContext(system: ActorSystem, sslConfig: AkkaSSLConfig) = {
val config = sslConfig.config

val sslContext = if (sslConfig.config.default) {
sslConfig.validateDefaultTrustManager(config)
SSLContext.getDefault
} else {
val mkLogger = new AkkaLoggerFactory(system)
val keyManagerFactory = sslConfig.buildKeyManagerFactory(config)
val trustManagerFactory = sslConfig.buildTrustManagerFactory(config)
new ConfigSSLContextBuilder(mkLogger, config, keyManagerFactory, trustManagerFactory).build()
}

val defaultParams = sslContext.getDefaultSSLParameters
val defaultProtocols = defaultParams.getProtocols
val protocols = sslConfig.configureProtocols(defaultProtocols, config)
defaultParams.setProtocols(protocols)

val defaultCiphers = defaultParams.getCipherSuites
val cipherSuites = sslConfig.configureCipherSuites(defaultCiphers, config)
defaultParams.setCipherSuites(cipherSuites)

val clientAuth: Option[TLSClientAuth] = config.sslParametersConfig.clientAuth match {
case SslClientAuth.Default => None
case SslClientAuth.Want =>
defaultParams.setWantClientAuth(true)
Some(TLSClientAuth.Want)
case SslClientAuth.Need =>
defaultParams.setNeedClientAuth(true)
Some(TLSClientAuth.Need)
case SslClientAuth.None =>
defaultParams.setNeedClientAuth(false)
Some(TLSClientAuth.None)
}

if (!sslConfig.config.loose.disableHostnameVerification) {
defaultParams.setEndpointIdentificationAlgorithm("HTTPS")
}

ConnectionContext.https(
sslContext,
Some(sslConfig),
Some(cipherSuites.toList),
Some(defaultProtocols.toList),
clientAuth,
Some(defaultParams)
)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import com.snowplowanalytics.snowplow.collectors.scalastream.model.DntCookieMatc
import org.specs2.mutable.Specification

class CollectorRouteSpec extends Specification with Specs2RouteTest {
val route = new CollectorRoute {
val mkRoute = (withRedirects: Boolean) => new CollectorRoute {
override val collectorService = new Service {
def preflightResponse(req: HttpRequest): HttpResponse =
HttpResponse(200, entity = "preflight response")
Expand All @@ -45,8 +45,11 @@ class CollectorRouteSpec extends Specification with Specs2RouteTest {
def cookieName: Option[String] = Some("name")
def doNotTrackCookie: Option[DntCookieMatcher] = None
def determinePath(vendor: String, version: String): String = "/p1/p2"
def enableDefaultRedirect = withRedirects
}
}
val route = mkRoute(true)
val routeWithoutRedirects = mkRoute(false)

"The collector route" should {
"respond to the cors route with a preflight response" in {
Expand Down Expand Up @@ -100,6 +103,11 @@ class CollectorRouteSpec extends Specification with Specs2RouteTest {
responseAs[String] shouldEqual "200 collector root"
}
}
"disallow redirect routes when redirects disabled" in {
Get("/r/abc") ~> routeWithoutRedirects.collectorRoute ~> check {
responseAs[String] shouldEqual "redirects disabled"
}
}
"respond to anything else with a not found" in {
Get("/something") ~> route.collectorRoute ~> check {
responseAs[String] shouldEqual "404 not found"
Expand Down
46 changes: 35 additions & 11 deletions 2-collectors/scala-stream-collector/examples/config.hocon.sample
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,17 @@ collector {
port = {{collectorPort}}
port = ${?COLLECTOR_PORT}

# optional SSL/TLS configuration
ssl {
enable = false
enable = ${?COLLECTOR_SSL}
# whether to redirect HTTP to HTTPS
redirect = false
redirect = ${?COLLECTOR_SSL_REDIRECT}
port = 9543
port = ${?COLLECTOR_SSL_PORT}
}

# The collector responds with a cookie to requests with a path that matches the 'vendor/version' protocol.
# The expected values are:
# - com.snowplowanalytics.snowplow/tp2 for Tracker Protocol 2
Expand Down Expand Up @@ -141,6 +152,12 @@ collector {
forwardedProtocolHeader = ${?COLLECTOR_COOKIE_BOUNCE_FORWARDED_PROTOCOL_HEADER}
}

# When enabled, redirect prefix `r/` will be enabled and its query parameters resolved.
# Otherwise the request prefixed with `r/` will be dropped with `404 Not Found`
# Custom redirects configured in `paths` can still be used.
enableDefaultRedirect = true
enableDefaultRedirect = ${?COLLECTOR_ALLOW_REDIRECTS}

# When enabled, the redirect url passed via the `u` query parameter is scanned for a placeholder
# token. All instances of that token are replaced withe the network ID. If the placeholder isn't
# specified, the default value is `${SP_NUID}`.
Expand Down Expand Up @@ -178,17 +195,6 @@ collector {
accessControlMaxAge = ${?COLLECTOR_CORS_ACCESS_CONTROL_MAX_AGE}
}

# Configuration of prometheus http metrics
prometheusMetrics {
# If metrics are enabled then all requests will be logged as prometheus metrics
# and '/metrics' endpoint will return the report about the requests
enabled = false
enabled = ${?COLLECTOR_PROMETHEUS_METRICS_ENABLED}
# Custom buckets for http_request_duration_seconds_bucket duration metric
#durationBucketsInSeconds = [0.1, 3, 10]
#durationbucketsInSeconds = ${?COLLECTOR_PROMETHEUS_METRICS_DURATION_BUCKETS_IN_SECONDS}
}

# Configuration of prometheus http metrics
prometheusMetrics {
# If metrics are enabled then all requests will be logged as prometheus metrics
Expand Down Expand Up @@ -334,4 +340,22 @@ akka {
uri-parsing-mode = ${?AKKA_HTTP_SERVER_PARSING_URI_PARSING_MODE}
}
}

# By default setting `collector.ssl` relies on JSSE (Java Secure Socket
# Extension) to enable secure communication.
# To override the default settings set the following section as per
# https://lightbend.github.io/ssl-config/ExampleSSLConfig.html
# ssl-config {
# debug = {
# ssl = true
# }
# keyManager = {
# stores = [
# {type = "PKCS12", classpath = false, path = "/etc/ssl/mycert.p12", password = "mypassword" }
# ]
# }
# loose {
# disableHostnameVerification = false
# }
# }
}

0 comments on commit 95f938f

Please sign in to comment.