Skip to content

Commit

Permalink
clean up build (#22)
Browse files Browse the repository at this point in the history
* clean up build

* clean up build

* optimize imports and reformat code

* fix tests

Co-authored-by: Robert Lyons <nextdude@gmail.com>
  • Loading branch information
nextdude-mdsol and nextdude committed Feb 17, 2021
1 parent 64c22fa commit e9c1466
Show file tree
Hide file tree
Showing 70 changed files with 980 additions and 801 deletions.
37 changes: 23 additions & 14 deletions .scalafmt.conf
@@ -1,20 +1,29 @@
style = intellij
project.git = true
maxColumn = 120
align = most
version = "2.7.5"
maxColumn = 75
align {
openParenCallSite = true
openParenDefnSite = true
tokens = [ "%", "%%",
{ code = "=>", owner = Case },
{ code = "->", owner = For }
preset = most
multiline = true
tokens.add = [
{code = "<-", owner = "For"},
{code = "%", owner = "Term.ApplyInfix"},
{code = "%%", owner = "Term.ApplyInfix"},
{code = "=", owner = "(Enumerator.Val|Defn.(Va(l|r)|Def|Type))"}
]
arrowEnumeratorGenerator = true
}
optIn.breakChainOnFirstMethodDot = true
verticalMultilineAtDefinitionSite = true
danglingParentheses = true
danglingParentheses {
defnSite = false
callSite = true
}
docstrings {
style = Asterisk
oneline = keep
wrap = yes
}
rewrite {
rules = [SortImports, RedundantBraces]
redundantBraces.maxLines = 1
rules = [SortImports, RedundantBraces, RedundantParens]
redundantBraces {
maxLines = 1
stringInterpolation = true
}
}
39 changes: 23 additions & 16 deletions build.sbt
Expand Up @@ -17,9 +17,8 @@ Test / fork := true
resolvers += "Local Maven Repository" at "file://" + Path.userHome.absolutePath + "/.m2/repository"

val V = new {
val flink = "1.11.2"
val flink = "1.11.3"
val logback = "1.2.3"
val log4jOverSlf4j = "1.7.30"
val scalaLogging = "3.9.2"
val scalaTest = "3.2.2"
val scalaCheck = "1.14.3"
Expand All @@ -29,9 +28,8 @@ val V = new {
val typesafeConfig = "1.4.0"
val guava = "29.0-jre"
val squants = "1.7.0"
val avro = "1.10.0"
val avro4s = "4.0.0"
val avro4s_211 = "3.0.0-RC2"
val avro = "1.10.1"
val avro4s = "4.0.4"
}

val flinkDeps = (
Expand All @@ -41,13 +39,9 @@ val flinkDeps = (
Seq("connector-kafka", "connector-kinesis", "connector-cassandra",
"connector-elasticsearch7", "statebackend-rocksdb").map(a => "org.apache.flink" %% s"flink-$a" % V.flink) ++
Seq("org.apache.flink" %% "flink-test-utils" % V.flink % Test)
).map(
_.excludeAll(ExclusionRule(organization = "log4j"), ExclusionRule(organization = "org.slf4j", name = "slf4j-log4j12"))
)

val loggingDeps = Seq("ch.qos.logback" % "logback-core" % V.logback % Provided,
"ch.qos.logback" % "logback-classic" % V.logback % Provided,
"org.slf4j" % "log4j-over-slf4j" % V.log4jOverSlf4j % Provided,
val loggingDeps = Seq("ch.qos.logback" % "logback-classic" % V.logback % Provided,
"com.typesafe.scala-logging" %% "scala-logging" % V.scalaLogging)

val http4sDeps =
Expand All @@ -62,20 +56,33 @@ val otherDeps = Seq("com.beachape" %% "enumeratum" % V.enumeratum,
"com.typesafe" % "config" % V.typesafeConfig,
"com.google.guava" % "guava" % V.guava,
"org.typelevel" %% "squants" % V.squants,
"com.sksamuel.avro4s" %% "avro4s-core" % V.avro4s,
"org.scalactic" %% "scalactic" % V.scalaTest % Test,
"org.scalatest" %% "scalatest" % V.scalaTest % Test,
"org.scalacheck" %% "scalacheck" % V.scalaCheck % Test)

/**
* Exclude any transitive deps using log4j
* @param m
* the module
* @return
* module with deps excluded
*/
def excludeLog4j(m: ModuleID) = m.excludeAll(
ExclusionRule(
organization = "org.apache.logging.log4j",
name = "*"
),
ExclusionRule(organization = "org.slf4j", name = "*")
)

lazy val flink_runner =
(project in file("."))
.settings(crossScalaVersions := supportedScalaVersions,
libraryDependencies ++=
flinkDeps ++ loggingDeps ++ http4sDeps ++ circeDeps ++ otherDeps ++ {
CrossVersion.partialVersion(scalaVersion.value) match {
case Some((2, scalaMinor)) if scalaMinor == 11 => Seq("com.sksamuel.avro4s" %% "avro4s-core" % V.avro4s_211)
case _ => Seq("com.sksamuel.avro4s" %% "avro4s-core" % V.avro4s)
}
})
(flinkDeps ++ http4sDeps ++ circeDeps ++ otherDeps).map(excludeLog4j)
++ loggingDeps
)

scalacOptions ++= Seq("-encoding",
"utf8",
Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
@@ -1 +1 @@
sbt.version=1.3.13
sbt.version=1.4.6
2 changes: 1 addition & 1 deletion project/plugins.sbt
@@ -1,3 +1,3 @@
addSbtPlugin("com.geirsson" % "sbt-ci-release" % "1.5.3")
addSbtPlugin("com.geirsson" % "sbt-ci-release" % "1.5.5")
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.6.1")
addSbtPlugin("org.scoverage" % "sbt-coveralls" % "1.2.7")
36 changes: 18 additions & 18 deletions src/main/scala/io/epiphanous/flinkrunner/FlinkRunner.scala
Expand Up @@ -7,11 +7,11 @@ import io.epiphanous.flinkrunner.model.{FlinkConfig, FlinkEvent}
* Flink Job Invoker
*/
class FlinkRunner[ADT <: FlinkEvent](
args: Array[String],
factory: FlinkRunnerFactory[ADT],
sources: Map[String, Seq[Array[Byte]]] = Map.empty,
optConfig: Option[String] = None)
extends LazyLogging {
args: Array[String],
factory: FlinkRunnerFactory[ADT],
sources: Map[String, Seq[Array[Byte]]] = Map.empty,
optConfig: Option[String] = None)
extends LazyLogging {

implicit val config: FlinkConfig = new FlinkConfig(args, factory, sources, optConfig)
implicit val env: SEE = config.configureStreamExecutionEnvironment
Expand All @@ -23,10 +23,10 @@ class FlinkRunner[ADT <: FlinkEvent](
* @param callback a function from an iterator to unit
*/
def process(
callback: PartialFunction[Stream[ADT], Unit] = {
case _ => ()
}
): Unit =
callback: PartialFunction[Stream[ADT], Unit] = {
case _ => ()
}
): Unit =
if (config.jobName == "help") showHelp()
else process1(callback)

Expand All @@ -41,22 +41,22 @@ class FlinkRunner[ADT <: FlinkEvent](
* from running flink job
*/
def process1(
callback: PartialFunction[Stream[ADT], Unit] = {
case _ => ()
}
): Unit = {
callback: PartialFunction[Stream[ADT], Unit] = {
case _ => ()
}
): Unit = {
if (config.jobArgs.headOption.exists(s => List("help", "--help", "-help", "-h").contains(s))) showJobHelp()
else {
config.getJobInstance.run match {
case Left(results) => callback(results.asInstanceOf[Iterator[ADT]].toStream)
case Right(_) => ()
case Right(_) => ()
}
}
}

/**
* Show help for a particular job
**/
* */
def showJobHelp(): Unit = {
val usage =
s"""|${config.jobName} - ${config.jobDescription}
Expand All @@ -77,9 +77,9 @@ class FlinkRunner[ADT <: FlinkEvent](
case s if s.isEmpty => " *** No jobs defined ***"
case s =>
s.map(jn => {
val desc = config.getString(s"jobs.$jn.description")
s" - $jn: $desc"
})
val desc = config.getString(s"jobs.$jn.description")
s" - $jn: $desc"
})
.mkString("\n")
}
val usage =
Expand Down
@@ -1,13 +1,14 @@
package io.epiphanous.flinkrunner
import java.util.Properties

import io.epiphanous.flinkrunner.flink.BaseFlinkJob
import io.epiphanous.flinkrunner.model.{FlinkEvent, KafkaSinkConfig, KafkaSourceConfig, SinkConfig, SourceConfig}
import io.epiphanous.flinkrunner.model._
import io.epiphanous.flinkrunner.operator.AddToJdbcBatchFunction
import org.apache.flink.api.common.serialization.{DeserializationSchema, Encoder, SerializationSchema}
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner
import org.apache.flink.streaming.connectors.kafka.{KafkaDeserializationSchema, KafkaSerializationSchema}

import java.util.Properties

trait FlinkRunnerFactory[ADT <: FlinkEvent] {

def getJobInstance(name: String): BaseFlinkJob[_, _ <: ADT]
Expand Down
@@ -1,4 +1,5 @@
package io.epiphanous.flinkrunner.algorithm.cardinality

import com.google.common.hash.Funnel
import com.google.common.hash.Hashing.murmur3_128

Expand All @@ -15,7 +16,7 @@ case class HyperLogLog[T](funnel: Funnel[T], b: Int) {

import HyperLogLog._

/** number of registers <code>m = 2**b</code>*/
/** number of registers <code>m = 2**b</code> */
val m = 1 << b

/** relativeError of cardinality estimates */
Expand Down Expand Up @@ -47,6 +48,7 @@ case class HyperLogLog[T](funnel: Funnel[T], b: Int) {

/**
* Incorporates an item into the registers, updates the cardinality estimate and returns it.
*
* @param item the item to add
* @return Long
*/
Expand All @@ -60,6 +62,7 @@ case class HyperLogLog[T](funnel: Funnel[T], b: Int) {

/**
* Compute the current distinct cardinality estimate.
*
* @return Long
*/
private def estimateCardinality: Long = {
Expand All @@ -82,6 +85,7 @@ case class HyperLogLog[T](funnel: Funnel[T], b: Int) {
/**
* Merge another HyperLogLog[T] instance into this instance. Note the other instance must have the same b
* parameter as this instance.
*
* @param another the other HyperLogLog[T] instance
*/
def merge(another: HyperLogLog[T]) = {
Expand All @@ -95,6 +99,7 @@ case class HyperLogLog[T](funnel: Funnel[T], b: Int) {

/**
* Computes positive integer hash of item
*
* @param item item to hash
* @return Int
*/
Expand All @@ -105,6 +110,7 @@ case class HyperLogLog[T](funnel: Funnel[T], b: Int) {

/**
* Computes most significant set bit of an integer, where returned bit in [0,32].
*
* @param i the non-negative Int to examine
* @return Int
*/
Expand Down

0 comments on commit e9c1466

Please sign in to comment.