clean up build #22

Merged · 5 commits · Feb 17, 2021
37 changes: 23 additions & 14 deletions .scalafmt.conf
@@ -1,20 +1,29 @@
-style = intellij
 project.git = true
-maxColumn = 120
-align = most
+version = "2.7.5"
+maxColumn = 75
 align {
-  openParenCallSite = true
-  openParenDefnSite = true
-  tokens = [ "%", "%%",
-    { code = "=>", owner = Case },
-    { code = "->", owner = For }
+  preset = most
+  multiline = true
+  tokens.add = [
+    {code = "<-", owner = "For"},
+    {code = "%", owner = "Term.ApplyInfix"},
+    {code = "%%", owner = "Term.ApplyInfix"},
+    {code = "=", owner = "(Enumerator.Val|Defn.(Va(l|r)|Def|Type))"}
   ]
-  arrowEnumeratorGenerator = true
 }
-optIn.breakChainOnFirstMethodDot = true
-verticalMultilineAtDefinitionSite = true
-danglingParentheses = true
+danglingParentheses {
+  defnSite = false
+  callSite = true
+}
+docstrings {
+  style = Asterisk
+  oneline = keep
+  wrap = yes
+}
 rewrite {
-  rules = [SortImports, RedundantBraces]
-  redundantBraces.maxLines = 1
+  rules = [SortImports, RedundantBraces, RedundantParens]
+  redundantBraces {
+    maxLines = 1
+    stringInterpolation = true
+  }
 }
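
For illustration only (not part of this diff): under scalafmt 2.7.5, the new align.tokens.add entries make the formatter line up infix % / %% operators in dependency lists and <- generators in for-comprehensions, roughly like this:

    libraryDependencies ++= Seq(
      "org.apache.flink" %% "flink-scala" % V.flink,
      "com.google.guava"  % "guava"       % V.guava
    )
    for {
      name  <- names
      email <- emails
    } yield (name, email)
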
39 changes: 23 additions & 16 deletions build.sbt
@@ -17,9 +17,8 @@ Test / fork := true
 resolvers += "Local Maven Repository" at "file://" + Path.userHome.absolutePath + "/.m2/repository"
 
 val V = new {
-  val flink = "1.11.2"
+  val flink = "1.11.3"
   val logback = "1.2.3"
-  val log4jOverSlf4j = "1.7.30"
   val scalaLogging = "3.9.2"
   val scalaTest = "3.2.2"
   val scalaCheck = "1.14.3"
@@ -29,9 +28,8 @@ val V = new {
   val typesafeConfig = "1.4.0"
   val guava = "29.0-jre"
   val squants = "1.7.0"
-  val avro = "1.10.0"
-  val avro4s = "4.0.0"
-  val avro4s_211 = "3.0.0-RC2"
+  val avro = "1.10.1"
+  val avro4s = "4.0.4"
 }
 
 val flinkDeps = (
@@ -41,13 +39,9 @@ val flinkDeps = (
Seq("connector-kafka", "connector-kinesis", "connector-cassandra",
"connector-elasticsearch7", "statebackend-rocksdb").map(a => "org.apache.flink" %% s"flink-$a" % V.flink) ++
Seq("org.apache.flink" %% "flink-test-utils" % V.flink % Test)
).map(
_.excludeAll(ExclusionRule(organization = "log4j"), ExclusionRule(organization = "org.slf4j", name = "slf4j-log4j12"))
)

val loggingDeps = Seq("ch.qos.logback" % "logback-core" % V.logback % Provided,
"ch.qos.logback" % "logback-classic" % V.logback % Provided,
"org.slf4j" % "log4j-over-slf4j" % V.log4jOverSlf4j % Provided,
val loggingDeps = Seq("ch.qos.logback" % "logback-classic" % V.logback % Provided,
"com.typesafe.scala-logging" %% "scala-logging" % V.scalaLogging)

val http4sDeps =
@@ -62,20 +56,33 @@ val otherDeps = Seq("com.beachape" %% "enumeratum" % V.enumeratum,
"com.typesafe" % "config" % V.typesafeConfig,
"com.google.guava" % "guava" % V.guava,
"org.typelevel" %% "squants" % V.squants,
"com.sksamuel.avro4s" %% "avro4s-core" % V.avro4s,
"org.scalactic" %% "scalactic" % V.scalaTest % Test,
"org.scalatest" %% "scalatest" % V.scalaTest % Test,
"org.scalacheck" %% "scalacheck" % V.scalaCheck % Test)

/**
* Exclude any transitive deps using log4j
* @param m
* the module
* @return
* module with deps excluded
*/
def excludeLog4j(m: ModuleID) = m.excludeAll(
ExclusionRule(
organization = "org.apache.logging.log4j",
name = "*"
),
ExclusionRule(organization = "org.slf4j", name = "*")
)

lazy val flink_runner =
(project in file("."))
.settings(crossScalaVersions := supportedScalaVersions,
libraryDependencies ++=
flinkDeps ++ loggingDeps ++ http4sDeps ++ circeDeps ++ otherDeps ++ {
CrossVersion.partialVersion(scalaVersion.value) match {
case Some((2, scalaMinor)) if scalaMinor == 11 => Seq("com.sksamuel.avro4s" %% "avro4s-core" % V.avro4s_211)
case _ => Seq("com.sksamuel.avro4s" %% "avro4s-core" % V.avro4s)
}
})
(flinkDeps ++ http4sDeps ++ circeDeps ++ otherDeps).map(excludeLog4j)
++ loggingDeps
)

scalacOptions ++= Seq("-encoding",
"utf8",
2 changes: 1 addition & 1 deletion project/build.properties
@@ -1 +1 @@
-sbt.version=1.3.13
+sbt.version=1.4.6
2 changes: 1 addition & 1 deletion project/plugins.sbt
@@ -1,3 +1,3 @@
addSbtPlugin("com.geirsson" % "sbt-ci-release" % "1.5.3")
addSbtPlugin("com.geirsson" % "sbt-ci-release" % "1.5.5")
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.6.1")
addSbtPlugin("org.scoverage" % "sbt-coveralls" % "1.2.7")
36 changes: 18 additions & 18 deletions src/main/scala/io/epiphanous/flinkrunner/FlinkRunner.scala
@@ -7,11 +7,11 @@ import io.epiphanous.flinkrunner.model.{FlinkConfig, FlinkEvent}
  * Flink Job Invoker
  */
 class FlinkRunner[ADT <: FlinkEvent](
-    args: Array[String],
-    factory: FlinkRunnerFactory[ADT],
-    sources: Map[String, Seq[Array[Byte]]] = Map.empty,
-    optConfig: Option[String] = None)
-    extends LazyLogging {
+  args: Array[String],
+  factory: FlinkRunnerFactory[ADT],
+  sources: Map[String, Seq[Array[Byte]]] = Map.empty,
+  optConfig: Option[String] = None)
+  extends LazyLogging {
 
   implicit val config: FlinkConfig = new FlinkConfig(args, factory, sources, optConfig)
   implicit val env: SEE = config.configureStreamExecutionEnvironment
@@ -23,10 +23,10 @@ class FlinkRunner[ADT <: FlinkEvent](
    * @param callback a function from an iterator to unit
    */
   def process(
-      callback: PartialFunction[Stream[ADT], Unit] = {
-        case _ => ()
-      }
-  ): Unit =
+    callback: PartialFunction[Stream[ADT], Unit] = {
+      case _ => ()
+    }
+  ): Unit =
     if (config.jobName == "help") showHelp()
     else process1(callback)
 
@@ -41,22 +41,22 @@
    * from running flink job
    */
   def process1(
-      callback: PartialFunction[Stream[ADT], Unit] = {
-        case _ => ()
-      }
-  ): Unit = {
+    callback: PartialFunction[Stream[ADT], Unit] = {
+      case _ => ()
+    }
+  ): Unit = {
     if (config.jobArgs.headOption.exists(s => List("help", "--help", "-help", "-h").contains(s))) showJobHelp()
     else {
       config.getJobInstance.run match {
         case Left(results) => callback(results.asInstanceOf[Iterator[ADT]].toStream)
-        case Right(_) => ()
+        case Right(_)     => ()
       }
     }
   }
 
   /**
    * Show help for a particular job
-   **/
+   * */
   def showJobHelp(): Unit = {
     val usage =
       s"""|${config.jobName} - ${config.jobDescription}
@@ -77,9 +77,9 @@ class FlinkRunner[ADT <: FlinkEvent](
       case s if s.isEmpty => " *** No jobs defined ***"
       case s =>
         s.map(jn => {
-            val desc = config.getString(s"jobs.$jn.description")
-            s" - $jn: $desc"
-          })
+          val desc = config.getString(s"jobs.$jn.description")
+          s" - $jn: $desc"
+        })
           .mkString("\n")
     }
     val usage =
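
For context, a minimal sketch (not from this PR; MyEvent and MyFactory are hypothetical stand-ins for a user-defined ADT and factory) of how an application drives FlinkRunner:

    object Main {
      def main(args: Array[String]): Unit =
        // dispatches to the named job, or prints help; the callback sees the
        // job's output Stream when the job returns results (the Left case)
        new FlinkRunner[MyEvent](args, new MyFactory()).process { case events =>
          events.take(10).foreach(println)
        }
    }
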
src/main/scala/io/epiphanous/flinkrunner/FlinkRunnerFactory.scala
@@ -1,13 +1,14 @@
 package io.epiphanous.flinkrunner
-import java.util.Properties
 
 import io.epiphanous.flinkrunner.flink.BaseFlinkJob
-import io.epiphanous.flinkrunner.model.{FlinkEvent, KafkaSinkConfig, KafkaSourceConfig, SinkConfig, SourceConfig}
+import io.epiphanous.flinkrunner.model._
 import io.epiphanous.flinkrunner.operator.AddToJdbcBatchFunction
 import org.apache.flink.api.common.serialization.{DeserializationSchema, Encoder, SerializationSchema}
 import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner
 import org.apache.flink.streaming.connectors.kafka.{KafkaDeserializationSchema, KafkaSerializationSchema}
 
+import java.util.Properties
+
 trait FlinkRunnerFactory[ADT <: FlinkEvent] {
 
   def getJobInstance(name: String): BaseFlinkJob[_, _ <: ADT]
src/main/scala/io/epiphanous/flinkrunner/algorithm/cardinality/HyperLogLog.scala
@@ -1,4 +1,5 @@
 package io.epiphanous.flinkrunner.algorithm.cardinality
+
 import com.google.common.hash.Funnel
 import com.google.common.hash.Hashing.murmur3_128
 
@@ -15,7 +16,7 @@ case class HyperLogLog[T](funnel: Funnel[T], b: Int) {

   import HyperLogLog._
 
-  /** number of registers <code>m = 2**b</code>*/
+  /** number of registers <code>m = 2**b</code> */
   val m = 1 << b
 
   /** relativeError of cardinality estimates */
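
For intuition (not stated in this diff): the standard HyperLogLog error bound is about 1.04 / sqrt(m), so b = 12 gives m = 1 << 12 = 4096 registers and a relative error near 1.04 / 64 ≈ 1.6%.
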
@@ -47,6 +48,7 @@

   /**
    * Incorporates an item into the registers, updates the cardinality estimate and returns it.
+   *
    * @param item the item to add
    * @return Long
    */
@@ -60,6 +62,7 @@ case class HyperLogLog[T](funnel: Funnel[T], b: Int) {

   /**
    * Compute the current distinct cardinality estimate.
+   *
    * @return Long
    */
   private def estimateCardinality: Long = {
@@ -82,6 +85,7 @@
   /**
    * Merge another HyperLogLog[T] instance into this instance. Note the other instance must have the same b
    * parameter as this instance.
+   *
    * @param another the other HyperLogLog[T] instance
    */
   def merge(another: HyperLogLog[T]) = {
@@ -95,6 +99,7 @@

   /**
    * Computes positive integer hash of item
+   *
    * @param item item to hash
    * @return Int
    */
@@ -105,6 +110,7 @@

   /**
    * Computes most significant set bit of an integer, where returned bit in [0,32].
+   *
    * @param i the non-negative Int to examine
    * @return Int
    */
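
A minimal usage sketch of the HyperLogLog API shown above (not from this PR; it assumes Guava's Funnels helper for the funnel argument):

    import java.nio.charset.StandardCharsets
    import com.google.common.hash.Funnels
    import io.epiphanous.flinkrunner.algorithm.cardinality.HyperLogLog

    // b = 12 gives m = 4096 registers (~1.6% relative error)
    val hll = HyperLogLog[CharSequence](Funnels.stringFunnel(StandardCharsets.UTF_8), 12)
    // add returns the updated cardinality estimate
    val estimate = Seq("a", "b", "a", "c").map(hll.add).last // ~3

    // merging requires the same b parameter on both instances
    val other = HyperLogLog[CharSequence](Funnels.stringFunnel(StandardCharsets.UTF_8), 12)
    other.add("d")
    hll.merge(other)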