Fix case clauses
Author: William Guo <guoyp@apache.org>

Closes #425 from guoyuepeng/fix_case_clauses.
guoyuepeng authored and bhlx3lyx7 committed Sep 29, 2018
1 parent 485c5cf commit 18fc4cf
Showing 88 changed files with 887 additions and 510 deletions.
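Before the file-by-file diff, a minimal sketch of the style change the commit title refers to: in Scala, a multi-statement case body needs no braces, because the body extends to the next case (or to the end of the match). The commit rewrites clauses such as the ones in Application.scala below accordingly. readConfig here is a hypothetical helper used only for illustration; it is not part of the commit.

import scala.util.{Failure, Success, Try}

object CaseClauseStyle {
  // Hypothetical helper, only to have something that returns a Try.
  def readConfig(path: String): Try[String] =
    Try(scala.io.Source.fromFile(path).mkString)

  // Before: the Failure body is wrapped in redundant braces.
  def loadWithBraces(path: String): String = readConfig(path) match {
    case Success(content) => content
    case Failure(ex) => {
      Console.err.println(ex.getMessage)
      ""
    }
  }

  // After: the body simply extends to the next case; no braces needed.
  def loadWithoutBraces(path: String): String = readConfig(path) match {
    case Success(content) => content
    case Failure(ex) =>
      Console.err.println(ex.getMessage)
      ""
  }
}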
36 changes: 14 additions & 22 deletions measure/src/main/scala/org/apache/griffin/measure/Application.scala
@@ -18,15 +18,16 @@ under the License.
*/
package org.apache.griffin.measure

import org.apache.griffin.measure.configuration.enums._
import scala.reflect.ClassTag
import scala.util.{Failure, Success, Try}

import org.apache.griffin.measure.configuration.dqdefinition.{DQConfig, EnvConfig, GriffinConfig, Param}
import org.apache.griffin.measure.configuration.dqdefinition.reader.ParamReaderFactory
import org.apache.griffin.measure.configuration.dqdefinition.{GriffinConfig, DQConfig, EnvConfig, Param}
import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.launch.DQApp
import org.apache.griffin.measure.launch.batch.BatchDQApp
import org.apache.griffin.measure.launch.streaming.StreamingDQApp

import scala.reflect.ClassTag
import scala.util.{Failure, Success, Try}

/**
* application entrance
@@ -49,17 +50,15 @@ object Application extends Loggable {
// read param files
val envParam = readParamFile[EnvConfig](envParamFile) match {
case Success(p) => p
case Failure(ex) => {
case Failure(ex) =>
error(ex.getMessage)
sys.exit(-2)
}
}
val dqParam = readParamFile[DQConfig](dqParamFile) match {
case Success(p) => p
case Failure(ex) => {
case Failure(ex) =>
error(ex.getMessage)
sys.exit(-2)
}
}
val allParam: GriffinConfig = GriffinConfig(envParam, dqParam)

@@ -68,32 +67,28 @@ object Application extends Loggable {
val dqApp: DQApp = procType match {
case BatchProcessType => BatchDQApp(allParam)
case StreamingProcessType => StreamingDQApp(allParam)
case _ => {
case _ =>
error(s"${procType} is unsupported process type!")
sys.exit(-4)
}
}

startup

// dq app init
dqApp.init match {
case Success(_) => {
case Success(_) =>
info("process init success")
}
case Failure(ex) => {
case Failure(ex) =>
error(s"process init error: ${ex.getMessage}")
shutdown
sys.exit(-5)
}
}

// dq app run
dqApp.run match {
case Success(_) => {
case Success(_) =>
info("process run success")
}
case Failure(ex) => {
case Failure(ex) =>
error(s"process run error: ${ex.getMessage}")

if (dqApp.retryable) {
@@ -102,19 +97,16 @@ object Application extends Loggable {
shutdown
sys.exit(-5)
}
}
}

// dq app end
dqApp.close match {
case Success(_) => {
case Success(_) =>
info("process end success")
}
case Failure(ex) => {
case Failure(ex) =>
error(s"process end error: ${ex.getMessage}")
shutdown
sys.exit(-5)
}
}

shutdown
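The other change that repeats across the files in this commit (visible in the imports at the top of Application.scala above, and in the reader and context classes below) is import regrouping: scala.* imports appear to be moved ahead of third-party and project imports, separated by blank lines. A minimal sketch of that ordering, assuming the usual group order of scala, third-party, then project packages:

// scala.* imports first
import scala.reflect.ClassTag
import scala.util.{Failure, Success, Try}

// third-party imports next
import org.apache.commons.lang.StringUtils

// project imports last
import org.apache.griffin.measure.configuration.enums._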
@@ -18,9 +18,10 @@ under the License.
*/
package org.apache.griffin.measure.configuration.dqdefinition

import com.fasterxml.jackson.annotation.JsonInclude.Include
import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
import com.fasterxml.jackson.annotation.JsonInclude.Include
import org.apache.commons.lang.StringUtils

import org.apache.griffin.measure.configuration.enums._

/**
@@ -46,7 +47,7 @@ case class DQConfig(@JsonProperty("name") private val name: String,
def getDataSources: Seq[DataSourceParam] = {
dataSources.foldLeft((Nil: Seq[DataSourceParam], Set[String]())) { (ret, ds) =>
val (seq, names) = ret
if (!names.contains(ds.getName)){
if (!names.contains(ds.getName)) {
(seq :+ ds, names + ds.getName)
} else ret
}._1
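For context on the block above: getDataSources de-duplicates data sources by name with a foldLeft over a (sequence, seen-names) pair, keeping only the first occurrence of each name. A minimal sketch of the same pattern, using a hypothetical Item class in place of DataSourceParam:

object DedupExample {
  case class Item(name: String)

  def dedupByName(items: Seq[Item]): Seq[Item] =
    items.foldLeft((Nil: Seq[Item], Set[String]())) { (ret, item) =>
      val (seq, names) = ret
      if (!names.contains(item.name)) (seq :+ item, names + item.name)
      else ret
    }._1

  // dedupByName(Seq(Item("src"), Item("tgt"), Item("src"))) keeps only the first "src".
}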
@@ -133,8 +134,9 @@ case class EvaluateRuleParam( @JsonProperty("rules") private val rules: List[Rul
* rule param
* @param dslType dsl type of this rule (must)
* @param dqType dq type of this rule (must if dsl type is "griffin-dsl")
* @param inDfName name of input dataframe of this rule, by default will be the previous rule output dataframe name
* @param outDfName name of output dataframe of this rule, by default will be generated as data connector dataframe name with index suffix
* @param inDfName name of input dataframe of this rule, by default will be the previous rule output dataframe name
* @param outDfName name of output dataframe of this rule, by default will be generated
* as data connector dataframe name with index suffix
* @param rule rule to define dq step calculation (must)
* @param details detail config of rule (optional)
* @param cache cache the result for multiple usage (optional, valid for "spark-sql" and "df-ops" mode)
@@ -206,4 +208,4 @@ case class RuleOutputParam( @JsonProperty("type") private val outputType: String
def getFlatten: FlattenType = if (StringUtils.isNotBlank(flatten)) FlattenType(flatten) else FlattenType("")

def validate(): Unit = {}
}
}
@@ -18,9 +18,10 @@ under the License.
*/
package org.apache.griffin.measure.configuration.dqdefinition

import com.fasterxml.jackson.annotation.JsonInclude.Include
import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
import com.fasterxml.jackson.annotation.JsonInclude.Include
import org.apache.commons.lang.StringUtils

import org.apache.griffin.measure.configuration.enums._

/**
@@ -109,4 +110,4 @@ case class CheckpointParam(@JsonProperty("type") private val cpType: String,
def validate(): Unit = {
assert(StringUtils.isNotBlank(cpType), "griffin checkpoint type should not be empty")
}
}
}
@@ -18,11 +18,13 @@ under the License.
*/
package org.apache.griffin.measure.configuration.dqdefinition.reader

import scala.reflect.ClassTag
import scala.util.Try

import org.apache.griffin.measure.configuration.dqdefinition.Param
import org.apache.griffin.measure.utils.{HdfsUtil, JsonUtil}

import scala.reflect.ClassTag
import scala.util.Try


/**
* read params from config file path
@@ -18,11 +18,12 @@ under the License.
*/
package org.apache.griffin.measure.configuration.dqdefinition.reader

import scala.reflect.ClassTag
import scala.util.Try

import org.apache.griffin.measure.configuration.dqdefinition.Param
import org.apache.griffin.measure.utils.JsonUtil

import scala.reflect.ClassTag
import scala.util.Try

/**
* read params from json string directly
@@ -18,11 +18,12 @@ under the License.
*/
package org.apache.griffin.measure.configuration.dqdefinition.reader

import scala.reflect.ClassTag
import scala.util.Try

import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.dqdefinition.Param

import scala.reflect.ClassTag
import scala.util.Try

trait ParamReader extends Loggable with Serializable {

@@ -31,7 +31,13 @@ sealed trait DqType {

object DqType {
private val dqTypes: List[DqType] = List(
AccuracyType, ProfilingType, UniquenessType, DistinctnessType, TimelinessType, CompletenessType, UnknownType
AccuracyType,
ProfilingType,
UniquenessType,
DistinctnessType,
TimelinessType,
CompletenessType,
UnknownType
)
def apply(ptn: String): DqType = {
dqTypes.find(dqType => ptn match {
@@ -29,7 +29,13 @@ sealed trait FlattenType {
}

object FlattenType {
private val flattenTypes: List[FlattenType] = List(DefaultFlattenType, EntriesFlattenType, ArrayFlattenType, MapFlattenType)
private val flattenTypes: List[FlattenType] = List(
DefaultFlattenType,
EntriesFlattenType,
ArrayFlattenType,
MapFlattenType
)

val default = DefaultFlattenType
def apply(ptn: String): FlattenType = {
flattenTypes.find(tp => ptn match {
@@ -29,7 +29,13 @@ sealed trait OutputType {
}

object OutputType {
private val outputTypes: List[OutputType] = List(MetricOutputType, RecordOutputType, DscUpdateOutputType, UnknownOutputType)
private val outputTypes: List[OutputType] = List(
MetricOutputType,
RecordOutputType,
DscUpdateOutputType,
UnknownOutputType
)

val default = UnknownOutputType
def apply(ptn: String): OutputType = {
outputTypes.find(tp => ptn match {
@@ -29,7 +29,11 @@ sealed trait ProcessType {
}

object ProcessType {
private val procTypes: List[ProcessType] = List(BatchProcessType, StreamingProcessType)
private val procTypes: List[ProcessType] = List(
BatchProcessType,
StreamingProcessType
)

def apply(ptn: String): ProcessType = {
procTypes.find(tp => ptn match {
case tp.idPattern() => true
@@ -30,15 +30,22 @@ sealed trait SinkType {

object SinkType {
private val sinkTypes: List[SinkType] = List(
ConsoleSinkType, HdfsSinkType, ElasticsearchSinkType, MongoSinkType, UnknownSinkType
ConsoleSinkType,
HdfsSinkType,
ElasticsearchSinkType,
MongoSinkType,
UnknownSinkType
)

def apply(ptn: String): SinkType = {
sinkTypes.find(tp => ptn match {
case tp.idPattern() => true
case _ => false
}).getOrElse(UnknownSinkType)
}

def unapply(pt: SinkType): Option[String] = Some(pt.desc)

def validSinkTypes(strs: Seq[String]): Seq[SinkType] = {
val seq = strs.map(s => SinkType(s)).filter(_ != UnknownSinkType).distinct
if (seq.size > 0) seq else Seq(ElasticsearchSinkType)
@@ -48,36 +55,36 @@ object SinkType {
/**
* console sink, will sink metric in console
*/
case object ConsoleSinkType extends SinkType {
case object ConsoleSinkType extends SinkType {
val idPattern = "^(?i)console|log$".r
val desc = "console"
}

/**
* hdfs sink, will sink metric and record in hdfs
*/
case object HdfsSinkType extends SinkType {
case object HdfsSinkType extends SinkType {
val idPattern = "^(?i)hdfs$".r
val desc = "hdfs"
}

/**
* elasticsearch sink, will sink metric in elasticsearch
*/
case object ElasticsearchSinkType extends SinkType {
case object ElasticsearchSinkType extends SinkType {
val idPattern = "^(?i)es|elasticsearch|http$".r
val desc = "elasticsearch"
}

/**
* mongo sink, will sink metric in mongo db
*/
case object MongoSinkType extends SinkType {
case object MongoSinkType extends SinkType {
val idPattern = "^(?i)mongo|mongodb$".r
val desc = "distinct"
}

case object UnknownSinkType extends SinkType {
case object UnknownSinkType extends SinkType {
val idPattern = "".r
val desc = "unknown"
}
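The SinkType diff above (and the DqType, FlattenType, OutputType, and ProcessType diffs before it) all follow the same lookup pattern: each case object carries a case-insensitive idPattern regex, and apply() resolves a free-form config string to the first matching object, falling back to an unknown value. A condensed, self-contained sketch of that pattern; MiniSink and its members are illustrative stand-ins, not part of the commit:

import scala.util.matching.Regex

object SinkLookupExample {
  sealed trait MiniSink { val idPattern: Regex; val desc: String }

  case object ConsoleMini extends MiniSink {
    val idPattern = "^(?i)console|log$".r
    val desc = "console"
  }

  case object UnknownMini extends MiniSink {
    val idPattern = "".r
    val desc = "unknown"
  }

  private val all: List[MiniSink] = List(ConsoleMini, UnknownMini)

  // Resolve a config string via regex matching, defaulting to the unknown value.
  def apply(s: String): MiniSink =
    all.find(tp => s match {
      case tp.idPattern() => true
      case _ => false
    }).getOrElse(UnknownMini)

  // SinkLookupExample("console") == ConsoleMini; SinkLookupExample("kafka") == UnknownMini
}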
@@ -18,11 +18,12 @@ under the License.
*/
package org.apache.griffin.measure.context

import org.apache.griffin.measure.configuration.enums._
import org.apache.spark.sql.{Encoders, SparkSession, SQLContext}

import org.apache.griffin.measure.configuration.dqdefinition._
import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.datasource._
import org.apache.griffin.measure.sink.{Sink, SinkFactory}
import org.apache.spark.sql.{Encoders, SQLContext, SparkSession}

/**
* dq context: the context of each calculation
@@ -58,6 +59,7 @@ case class DQContext(contextId: ContextId,
}
}
dataSourceNames.foreach(name => compileTableRegister.registerTable(name))

def getDataSourceName(index: Int): String = {
if (dataSourceNames.size > index) dataSourceNames(index) else ""
}
@@ -66,20 +68,25 @@ case class DQContext(contextId: ContextId,
val functionNames: Seq[String] = sparkSession.catalog.listFunctions.map(_.name).collect.toSeq

val dataSourceTimeRanges = loadDataSources()

def loadDataSources(): Map[String, TimeRange] = {
dataSources.map { ds =>
(ds.name, ds.loadData(this))
}.toMap
}

printTimeRanges

private val sinkFactory = SinkFactory(sinkParams, name)
private val defaultSink: Sink = createSink(contextId.timestamp)

def getSink(timestamp: Long): Sink = {
if (timestamp == contextId.timestamp) getSink()
else createSink(timestamp)
}

def getSink(): Sink = defaultSink

private def createSink(t: Long): Sink = {
procType match {
case BatchProcessType => sinkFactory.getSinks(t, true)
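For context on the DQContext diff above: the context keeps one default sink built for its own timestamp and only builds a new one when asked for a different timestamp. A minimal sketch of that caching pattern, with SimpleSink and buildSink as hypothetical stand-ins for Sink and SinkFactory.getSinks:

object SinkCacheExample {
  case class SimpleSink(timestamp: Long)

  class MiniContext(contextTimestamp: Long) {
    private def buildSink(t: Long): SimpleSink = SimpleSink(t)

    // Built once, for the context's own timestamp.
    private val defaultSink: SimpleSink = buildSink(contextTimestamp)

    // Reuse the default sink for the context timestamp; build a fresh one otherwise.
    def getSink(timestamp: Long): SimpleSink =
      if (timestamp == contextTimestamp) defaultSink else buildSink(timestamp)
  }
}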