In [0]:
sc.version

In [1]:
import org.apache.spark.ml.{Transformer, Estimator, Model, Pipeline, PipelineModel}
import org.apache.spark.ml.param.{Params, ParamMap, Param, DoubleParam, StringArrayParam}
import org.apache.spark.ml.util.{Identifiable, DefaultParamsReadable, DefaultParamsWritable}
import org.apache.spark.ml.feature.{StringIndexer, Imputer, VectorAssembler, Interaction, StandardScaler}
import org.apache.spark.ml.linalg.{Vector, Vectors, DenseVector}
import org.apache.spark.ml.classification.{LogisticRegression}
import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator}
import org.apache.spark.sql.{SQLContext, DataFrame, Dataset, Encoder, Encoders}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit, explode, udf}
import org.apache.spark.sql.types.{DoubleType, DataTypes, StructType, StructField}
import scala.collection.mutable.{ArrayBuffer}

In [2]:
// Parameters shared by the null-column estimator and its fitted model.
trait NullColumnParams extends Params {
    // Each Param below comes as a triple: the Param itself, a getter and a setter.

    /** Names of the columns to read. Defaults to an empty array. */
    final val inputCols: StringArrayParam =
        new StringArrayParam(this, "inputCols", "Input column names")

    /** Names of the columns to write. Defaults to an empty array. */
    final val outputCols: StringArrayParam =
        new StringArrayParam(this, "outputCols", "Output column names")

    /** Value substituted for nulls. Defaults to -1.0. */
    final val missingValue: DoubleParam =
        new DoubleParam(this, "missingValue", "Replace null values")

    final def getInputCols: Array[String] = getOrDefault(inputCols)

    final def setInputCols(value: Array[String]): this.type = set(inputCols, value)

    final def getOutputCols: Array[String] = getOrDefault(outputCols)

    final def setOutputCols(value: Array[String]): this.type = set(outputCols, value)

    final def getMissingValue: Double = getOrDefault(missingValue)

    final def setMissingValue(value: Double): this.type = set(missingValue, value)

    // Defaults are registered only after all three Params above have been initialised.
    setDefault(
        inputCols -> Array.empty[String],
        outputCols -> Array.empty[String],
        missingValue -> -1.0
    )

}

In [3]:
/**
  * Fitted model of `NullColumnEncoder`.
  *
  * For every (inputCols(i), outputCols(i)) pair it writes the input column, cast to double,
  * into the output column and replaces the nulls of the output columns with `missingValue`.
  *
  * @param uid unique identifier of this pipeline stage
  */
class NullColumnModel(override val uid: String) extends Model[NullColumnModel]
    with DefaultParamsWritable // Enables Serialization
    with NullColumnParams {

    // Transformer requires 3 methods:
    //  - transform
    //  - transformSchema
    //  - copy

    /** Pairs inputCols(i) with outputCols(i), keeping declaration order and repeated inputs. */
    private def columnPairs: Array[(String, String)] = $(inputCols).zip($(outputCols))

    /**
      * Copies each input column into its output column and fills nulls with `missingValue`.
      *
      * The copy is cast to `DoubleType` so that the produced data agrees with the schema
      * declared by `transformSchema`.
      *
      * @param dataset data to transform; must contain every input column
      * @return the input data plus one double column per output column name
      */
    override def transform(dataset: Dataset[_]): DataFrame = {
        // Fail early on a missing column or mismatched params instead of deep inside Spark.
        transformSchema(dataset.schema, logging = true)

        columnPairs.foldLeft(dataset.toDF) {
            case (df, (inputCol, outputCol)) =>
                df.withColumn(outputCol, col(inputCol).cast(DoubleType))
        }.na.fill($(missingValue), $(outputCols))
    }

    /**
      * Validates the parameters against `schema` and derives the output schema.
      *
      * @param schema schema of the incoming data
      * @return `schema` followed by one non-nullable `DoubleType` field per output column
      * @throws IllegalArgumentException if the two column lists differ in length or an input
      *                                  column is absent from `schema`
      */
    override def transformSchema(schema: StructType): StructType = {
        require(
            $(inputCols).length == $(outputCols).length,
            s"inputCols (${$(inputCols).length}) and outputCols (${$(outputCols).length}) must have the same length."
        )
        $(inputCols).foreach { inputColName =>
            require(schema.fieldNames.contains(inputColName), s"Input column $inputColName must exist.")
        }

        StructType(
            schema.fields ++ $(outputCols).map(c => StructField(c, DoubleType, nullable = false))
        )
    }

    /**
      * Creates a copy of this instance with the same UID and Params.
      *
      * @param extra Param values which will overwrite Params in the copy.
      */
    override def copy(extra: ParamMap): NullColumnModel = defaultCopy(extra)
}

object NullColumnModel extends DefaultParamsReadable[NullColumnModel]

In [4]:
/**
  * Estimator that produces a `NullColumnModel`.
  *
  * Nothing is learned from the data: `fit` only validates the schema and hands the current
  * Params over to a new model.
  *
  * @param uid unique identifier of this pipeline stage
  */
class NullColumnEncoder(override val uid: String) extends Estimator[NullColumnModel]
    with DefaultParamsWritable // Enables Serialization of the shared Params
    with NullColumnParams {

    def this() = this(Identifiable.randomUID("NullColumnEncoder"))

    /**
      * Creates a copy of this instance with the same UID and Params.
      *
      * @param extra Param values which will overwrite Params in the copy.
      */
    override def copy(extra: ParamMap): NullColumnEncoder = defaultCopy(extra)

    /**
      * Validates the parameters against `schema` and derives the schema the fitted model
      * will produce, so that later pipeline stages can see the output columns.
      *
      * @param schema schema of the incoming data
      * @return `schema` followed by one non-nullable `DoubleType` field per output column
      * @throws IllegalArgumentException if the two column lists differ in length or an input
      *                                  column is absent from `schema`
      */
    override def transformSchema(schema: StructType): StructType = {
        require(
            $(inputCols).length == $(outputCols).length,
            s"inputCols (${$(inputCols).length}) and outputCols (${$(outputCols).length}) must have the same length."
        )
        $(inputCols).foreach { inputColName =>
            require(schema.fieldNames.contains(inputColName), s"Input column $inputColName must exist.")
        }

        StructType(
            schema.fields ++ $(outputCols).map(c => StructField(c, DoubleType, nullable = false))
        )
    }

    /**
      * Builds the model. The dataset is used for schema validation only.
      *
      * @param dataset training data
      * @return a `NullColumnModel` carrying this estimator's Params, with this estimator as parent
      */
    override def fit(dataset: Dataset[_]): NullColumnModel = {
        transformSchema(dataset.schema, logging = true)

        copyValues(
            new NullColumnModel(
                uid + "_model"
            ).setParent(this)
        )
    }
}

// Companion object enables deserialization of the estimator's Params
object NullColumnEncoder extends DefaultParamsReadable[NullColumnEncoder]

In [5]:
// Parameters shared by the array one-hot estimator and its fitted model.
trait ArrayOneHotEncoderParams extends Params {
    // Each Param below comes as a triple: the Param itself, a getter and a setter.

    /** Names of the array columns to encode. Defaults to an empty array. */
    final val inputCols: StringArrayParam =
        new StringArrayParam(this, "inputCols", "Input column names")

    /** Output column names, paired by position with `inputCols`. Defaults to an empty array. */
    final val outputCols: StringArrayParam =
        new StringArrayParam(this, "outputCols", "Output column names")

    /**
      * Values that are already known for some columns, as "columnName:value" entries.
      * Columns without an entry get their values collected during fit.
      */
    final val knownCols: StringArrayParam =
        new StringArrayParam(this, "knownCols", "The scheme of the columns, if know(arrays of string like know_column_name:know_column_value)")

    /** Input columns for which one new column per value is created. */
    final val createColsByValues: StringArrayParam =
        new StringArrayParam(this, "createColsByValues", "The name of the columns for which you want to create new columns")

    final def getInputCols: Array[String] = getOrDefault(inputCols)

    final def setInputCols(value: Array[String]): this.type = set(inputCols, value)

    final def getOutputCols: Array[String] = getOrDefault(outputCols)

    final def setOutputCols(value: Array[String]): this.type = set(outputCols, value)

    final def getKnownCols: Array[String] = getOrDefault(knownCols)

    final def setKnownCols(value: Array[String]): this.type = set(knownCols, value)

    final def getCreateColsByValues: Array[String] = getOrDefault(createColsByValues)

    final def setCreateColsByValues(value: Array[String]): this.type = set(createColsByValues, value)

    // Defaults are registered only after all four Params above have been initialised.
    setDefault(
        inputCols -> Array.empty[String],
        outputCols -> Array.empty[String],
        knownCols -> Array.empty[String],
        createColsByValues -> Array.empty[String]
    )

}

In [6]:
/**
  * Fitted model of `ArrayOneHotEncoder`.
  *
  * Each string-array input column is encoded against its vocabulary in one of two ways:
  *  - columns listed in `createColsByValues` get one double column per vocabulary value, named
  *    `<outputCol>_<value>`, holding 1.0 when the array contains the value and 0.0 otherwise;
  *  - every other column gets a single dense vector column with one 0.0/1.0 slot per value.
  *
  * NOTE(review): `DefaultParamsWritable` saves Params only, so `knowColsMap` is presumably
  * lost on save, and loading looks like it needs a `(uid: String)` constructor that this class
  * does not have. Verify before relying on save/load.
  *
  * @param uid         unique identifier of this pipeline stage
  * @param knowColsMap vocabulary per input column (input column name -> values)
  */
class ArrayOneHotEncoderModel(override val uid: String, knowColsMap: Map[String, Array[String]]) extends Model[ArrayOneHotEncoderModel]
    with DefaultParamsWritable // Enables Serialization
    with ArrayOneHotEncoderParams {

    import org.apache.spark.sql.functions.array_contains

    /** Pairs inputCols(i) with outputCols(i), keeping declaration order and repeated inputs. */
    private def columnPairs: Array[(String, String)] = $(inputCols).zip($(outputCols))

    /** Vocabulary of an input column; empty when the column is unknown to this model. */
    private def getArgName(colName: String): Array[String] =
        knowColsMap.getOrElse(colName, Array.empty[String])

    /** True when the column is expanded into one output column per vocabulary value. */
    private def expandsByValue(inputColumn: String): Boolean =
        $(createColsByValues).contains(inputColumn)

    /** Fields written for one input/output pair; depends only on Params and the vocabulary. */
    private def outputFields(inputColumn: String, outputColumn: String): Array[StructField] = {
        if (expandsByValue(inputColumn)) {
            getArgName(inputColumn).map { value =>
                StructField(outputColumn + "_" + value, DoubleType, nullable = true)
            }
        } else {
            Array(StructField(outputColumn, org.apache.spark.ml.linalg.SQLDataTypes.VectorType, nullable = true))
        }
    }

    /** Adds the encoded column(s) for one input column to `df`. */
    private def arrayToCol(df: DataFrame, inputColumn: String, outputColumn: String, arrVal: Array[String]): DataFrame = {
        if (expandsByValue(inputColumn)) {
            arrVal.foldLeft(df) { (acc, value) =>
                acc.withColumn(outputColumn + "_" + value, array_contains(col(inputColumn), value).cast(DoubleType))
            }
        } else {
            // Spark hands array columns to Scala UDFs as Seq; the closure only captures the
            // local `arrVal`, not the model instance.
            val containArray = udf[Vector, Seq[String]] { values =>
                // A null array is encoded as the all-zero vector instead of failing the job.
                val present: Set[String] = if (values == null) Set.empty[String] else values.toSet
                Vectors.dense(arrVal.map(value => if (present.contains(value)) 1.0 else 0.0))
            }
            df.withColumn(outputColumn, containArray(col(inputColumn)))
        }
    }

    // Transformer requires 3 methods:
    //  - transform
    //  - transformSchema
    //  - copy

    /**
      * Encodes every configured input column.
      *
      * @param dataset data to transform; must contain every input column
      * @return the input data plus the encoded columns
      */
    override def transform(dataset: Dataset[_]): DataFrame = {
        // Fail early on a missing column or mismatched params instead of deep inside Spark.
        transformSchema(dataset.schema, logging = true)

        columnPairs.foldLeft(dataset.toDF) {
            case (acc, (inputColumn, outputColumn)) =>
                arrayToCol(df = acc, inputColumn = inputColumn, outputColumn = outputColumn, arrVal = getArgName(inputColumn))
        }
    }

    /**
      * Validates the parameters against `schema` and derives the output schema.
      *
      * The result is computed from the Params and the vocabulary alone, so it is the same
      * before and after `transform` and across repeated calls.
      *
      * @param schema schema of the incoming data
      * @return `schema` with the encoded fields added; a field whose name already exists
      *         replaces the existing one, mirroring `withColumn`
      * @throws IllegalArgumentException if the two column lists differ in length or an input
      *                                  column is absent from `schema`
      */
    override def transformSchema(schema: StructType): StructType = {
        require(
            $(inputCols).length == $(outputCols).length,
            s"inputCols (${$(inputCols).length}) and outputCols (${$(outputCols).length}) must have the same length."
        )
        $(inputCols).foreach { inputColName =>
            require(schema.fieldNames.contains(inputColName), s"Input column $inputColName must exist.")
        }

        val added = columnPairs.flatMap { case (inputColumn, outputColumn) => outputFields(inputColumn, outputColumn) }
        val addedByName = added.map(field => field.name -> field).toMap
        val existingNames = schema.fieldNames.toSet

        StructType(
            schema.fields.map(field => addedByName.getOrElse(field.name, field)) ++
                added.filterNot(field => existingNames.contains(field.name))
        )
    }

    /**
      * Creates a copy of this instance with the same UID, vocabulary, parent and Params.
      *
      * `defaultCopy` cannot be used here: it instantiates the class through a `(uid: String)`
      * constructor, which this class does not have.
      *
      * @param extra Param values which will overwrite Params in the copy.
      */
    override def copy(extra: ParamMap): ArrayOneHotEncoderModel =
        copyValues(new ArrayOneHotEncoderModel(uid, knowColsMap), extra).setParent(parent)
}

object ArrayOneHotEncoderModel extends DefaultParamsReadable[ArrayOneHotEncoderModel]

In [7]:
/**
  * Estimator that learns, for each string-array input column, the set of distinct values it
  * contains and produces an `ArrayOneHotEncoderModel`.
  *
  * Columns that have entries in `knownCols` use exactly those values and are not scanned.
  *
  * @param uid unique identifier of this pipeline stage
  */
class ArrayOneHotEncoder(override val uid: String) extends Estimator[ArrayOneHotEncoderModel]
    with DefaultParamsWritable // Enables Serialization of the shared Params
    with ArrayOneHotEncoderParams {

    def this() = this(Identifiable.randomUID("OneHotEncoderExtended"))

    /**
      * Parses "column:value" entries into column -> values, keeping the order of appearance.
      *
      * Only the first ':' separates the column from the value, so values may contain ':'.
      *
      * @throws IllegalArgumentException if an entry has no ':' or an empty column name
      */
    private def knownColsMapParser(entries: Array[String]): Map[String, Array[String]] = {
        entries.foldLeft(Map.empty[String, Array[String]]) { (parsed, entry) =>
            val separator = entry.indexOf(':')
            require(separator > 0, s"knownCols entry '$entry' must have the form column:value")
            val column = entry.substring(0, separator)
            val value = entry.substring(separator + 1)
            parsed.updated(column, parsed.getOrElse(column, Array.empty[String]) :+ value)
        }
    }

    /**
      * Creates a copy of this instance with the same UID and Params.
      *
      * @param extra Param values which will overwrite Params in the copy.
      */
    override def copy(extra: ParamMap): ArrayOneHotEncoder = defaultCopy(extra)

    /**
      * Validates the parameters against `schema`.
      *
      * The returned schema is the input schema: the columns added by the fitted model depend
      * on the vocabulary, which is only known after `fit`.
      *
      * @throws IllegalArgumentException if the two column lists differ in length or an input
      *                                  column is absent from `schema`
      */
    override def transformSchema(schema: StructType): StructType = {
        require(
            $(inputCols).length == $(outputCols).length,
            s"inputCols (${$(inputCols).length}) and outputCols (${$(outputCols).length}) must have the same length."
        )
        $(inputCols).foreach { inputColName =>
            require(schema.fieldNames.contains(inputColName), s"Input column $inputColName must exist.")
        }

        schema
    }

    /**
      * Collects the vocabulary of every input column that has no `knownCols` entries.
      *
      * @param dataset training data; must contain every input column
      * @return a model holding the vocabulary and this estimator's Params
      */
    override def fit(dataset: Dataset[_]): ArrayOneHotEncoderModel = {
        transformSchema(dataset.schema, logging = true)

        val knowColsMap: Map[String, Array[String]] = $(inputCols)
            .foldLeft(knownColsMapParser($(knownCols))) { (vocabularies, column) =>
                if (vocabularies.contains(column)) {
                    vocabularies
                } else {
                    val values: Array[String] = dataset.select(explode(col(column)))
                        .na.drop()                    // a null element cannot name a column or vector slot
                        .distinct()
                        .as[String](Encoders.STRING)  // explicit encoder: no dependency on a global session
                        .collect()
                        .sorted                       // stable column / vector-slot order across runs
                    vocabularies.updated(column, values)
                }
            }

        copyValues(
            new ArrayOneHotEncoderModel(
                uid + "_model",
                knowColsMap
            ).setParent(this)
        )
    }
}
// Companion object enables deserialization of the estimator's Params
object ArrayOneHotEncoder extends DefaultParamsReadable[ArrayOneHotEncoder]

In [8]:
// Load the training data from disk.
val dataDir: String = "./notebook/otus/homework/data/train"
val dfRaw: DataFrame = spark.read.load(dataDir)

In [9]:
// Notebook-wide constants.

// The label is derived from the array column `arrayLabelColumn`: a row is positive when the
// array contains the value `label`.
val label: String = "Liked"
val arrayLabelColumn: String = "feedback"

val labelColumn: String = s"${arrayLabelColumn}_${label}"

// A numeric column with fewer non-null values than this is treated as entirely null.
val nullTolerance: Int = 1
val defaultMissingValue: Double = -1.0

val strategyMissingValue: String = "mean"

val modelPath: String = "./notebook/otus/tmp/spark-logistic-regression-model"


# Allocation of columns for training

In [11]:
// Sort the columns of dfRaw into buckets according to their Spark data type name.

val Pattern = "(^Array.*)".r

val colsNumber  = ArrayBuffer.empty[String]
val colsString  = ArrayBuffer.empty[String]
val colsArray   = ArrayBuffer.empty[String]
val colsDate    = ArrayBuffer.empty[String]
val colsUnknown = ArrayBuffer.empty[String]

for ((columnName, typeName) <- dfRaw.dtypes) {
    typeName match {
        case "IntegerType" | "LongType" | "DoubleType" => colsNumber += columnName
        case "StringType"                              => colsString += columnName
        case "DateType"                                => colsDate += columnName
        // Any type name starting with "Array"
        case Pattern(_)                                => colsArray += columnName
        case _                                         => colsUnknown += columnName
    }
}

In [12]:
/**
  * Returns the columns of `inputCols` that hold fewer than `nullTolerance` non-null values.
  *
  * NOTE: despite its name, with `nullTolerance = 1` this returns the columns that are
  * entirely null. The name is kept because callers use it.
  *
  * @param dataset       data to inspect
  * @param inputCols     names of the columns to check
  * @param nullTolerance minimum number of non-null values a column needs to be left out
  * @return the matching column names, in the order they appear in `inputCols`
  */
def getNotFullNullColumns(dataset: Dataset[_], inputCols: Array[String], nullTolerance: Int): Array[String] = {
    import org.apache.spark.sql.functions.sum

    if (inputCols.isEmpty) {
        // Nothing to check; also avoids selecting zero columns, which yields no row to read
        // when the dataset is empty.
        Array.empty[String]
    } else {
        val nonNullCounts = inputCols.map(c => sum(col(c).isNotNull.cast("long")).as(c))
        val counts = dataset.select(nonNullCounts: _*).head

        // Read the counts by position so the result order follows inputCols.
        // A null sum (aggregate over no rows) counts as zero non-null values.
        inputCols.zipWithIndex.collect {
            case (name, i) if counts.isNullAt(i) || counts.getLong(i) < nullTolerance => name
        }
    }
}

In [13]:
// Split the numeric columns into those that are entirely null and the rest.

val colsNumberFullNull = getNotFullNullColumns(dfRaw, colsNumber.toArray, nullTolerance)
val colsNumberNotFullNull = colsNumber -- colsNumberFullNull

In [14]:
// Output column names of each feature stage; together they are the VectorAssembler inputs.
val outputColsStringIndexed: Array[String]     = colsString.toArray.map(c => s"${c}_indexed")
val outputColsFullNull: Array[String]          = colsNumberFullNull.map(c => s"${c}_null")
val outputColsNumberImputedMean: Array[String] = colsNumberNotFullNull.toArray.map(c => s"${c}_imputedMean")
val outputColsArray: Array[String]             = (colsArray - arrayLabelColumn).toArray.map(c => s"${c}_oh")

val inputColsVA: Array[String] =
    Array.concat(outputColsStringIndexed, outputColsFullNull, outputColsNumberImputedMean, outputColsArray)

# Create Label

In [16]:
// Derive the label column: expand `arrayLabelColumn` into the single column
// "<arrayLabelColumn>_<label>", using a known vocabulary so the data is not scanned for values.
val dfFit = new ArrayOneHotEncoder()
    .setKnownCols(Array(s"${arrayLabelColumn}:${label}"))
    .setCreateColsByValues(Array(arrayLabelColumn))
    .setInputCols(Array(arrayLabelColumn))
    .setOutputCols(Array(arrayLabelColumn))
    .fit(dfRaw)
    .transform(dfRaw)



In [17]:
// Display the labelled data in the notebook.
z.show(dfFit)

In [18]:
/**
  * Adds a class-weight column that compensates for label imbalance.
  *
  * With r = (rows whose label is not 0) / (all rows), rows with label 0 get weight r and all
  * other rows get weight 1 - r. Rows with a null label get a null weight.
  *
  * @param df         data containing the label column
  * @param labelName  name of the numeric label column
  * @param weightName name of the weight column to add
  * @return `df` with the additional `weightName` column
  */
def balanceDataFrame(df: DataFrame, labelName:String, weightName:String): DataFrame = {
    import org.apache.spark.sql.functions.{count, when}

    val labelValue = col(labelName)

    // One pass over the data yields both counts; `when` without `otherwise` gives null for
    // non-matching rows, and count skips nulls.
    val counts = df.agg(
        count(lit(1)),
        count(when(labelValue === 0, 1))
    ).head
    val dfSize = counts.getLong(0)
    val numNegatives = counts.getLong(1)
    val balancingRatio = (dfSize - numNegatives).toDouble / dfSize

    // Native column expression instead of a UDF; no final `otherwise`, so a null label
    // keeps a null weight.
    df.withColumn(
        weightName,
        when(labelValue === 0.0, balancingRatio)
            .when(labelValue.isNotNull, 1.0 - balancingRatio)
    )
}

In [19]:
// Attach the class weights used later by the logistic regression.
val dfBalanced: DataFrame = balanceDataFrame(df = dfFit, labelName = labelColumn, weightName = "classWeightCol")

In [20]:
// Display the weighted data in the notebook.
z.show(dfBalanced)

# Create and fit pipeline

In [22]:
// The whole weighted dataset is used for training.
val train: DataFrame = dfBalanced

In [23]:
// Imputes the numeric columns that are not entirely null, using `strategyMissingValue`.
val strategyNumberImputer: Imputer = new Imputer()
  .setStrategy(strategyMissingValue)
  .setInputCols(colsNumberNotFullNull.toArray)
  .setOutputCols(outputColsNumberImputedMean)

In [24]:
// Replaces the entirely-null numeric columns by `defaultMissingValue`.
val nullColumnEncoder: NullColumnEncoder = new NullColumnEncoder()
    .setMissingValue(defaultMissingValue)
    .setInputCols(colsNumberFullNull.toArray)
    .setOutputCols(outputColsFullNull)

In [25]:
// Indexes the string columns; invalid values are handled with the "keep" policy.
val stringIndexer: StringIndexer = new StringIndexer()
    .setHandleInvalid("keep")
    .setInputCols(colsString.toArray)
    .setOutputCols(outputColsStringIndexed)

In [26]:
// Encodes every array column except the one the label was derived from.
val arrayColumnsEncoder: ArrayOneHotEncoder = new ArrayOneHotEncoder()
    .setOutputCols(outputColsArray)
    .setInputCols((colsArray - arrayLabelColumn).toArray)

In [27]:
// Per-column feature transformations, applied in this order.
val pipelineFeatureTransform: Pipeline = new Pipeline().setStages(
    Array[org.apache.spark.ml.PipelineStage](
        nullColumnEncoder,
        strategyNumberImputer,
        arrayColumnsEncoder,
        stringIndexer
    )
)

In [28]:
// Collects all transformed feature columns into the single vector column "features".
val vectorAssembler: VectorAssembler = new VectorAssembler()
    .setOutputCol("features")
    .setInputCols(inputColsVA)

In [29]:
// Scales "features" by standard deviation only (no mean centering) into "scaledFeatures".
val scaler: StandardScaler = new StandardScaler()
    .setWithMean(false)
    .setWithStd(true)
    .setInputCol("features")
    .setOutputCol("scaledFeatures")

In [30]:
// Feature pipeline: the feature transformations (fitted here on `train`), then assembling
// and scaling.
val pipelineFeatureCreate: Pipeline = new Pipeline().setStages(
    Array[org.apache.spark.ml.PipelineStage](
        pipelineFeatureTransform.fit(train),
        vectorAssembler,
        scaler
    )
)

In [31]:
// Weighted logistic regression on the scaled features.
val lr: LogisticRegression = new LogisticRegression()
    .setFeaturesCol("scaledFeatures")
    .setLabelCol(labelColumn)
    .setWeightCol("classWeightCol")
    .setMaxIter(10)
    .setRegParam(0.09)
    .setElasticNetParam(0.4)



In [32]:
// Full pipeline: the feature pipeline (fitted here on `train`) followed by the classifier.
val pipeline: Pipeline = new Pipeline().setStages(
    Array[org.apache.spark.ml.PipelineStage](
        pipelineFeatureCreate.fit(train),
        lr
    )
)

In [33]:
// Train the classifier; the feature stages inside `pipeline` are already fitted.
val model: PipelineModel = pipeline.fit(train)

In [34]:
//save model
//model.write.overwrite().save(modelPath)

In [35]:
//load model
//val model = PipelineModel.load(modelPath)

In [36]:
// Score the labelled data (the same rows the model was trained on).
val predictions: DataFrame = model.transform(dfFit)

In [37]:
// Area under the ROC curve must be computed from the continuous scores in "rawPrediction".
// The thresholded 0/1 "prediction" column collapses the ROC curve to a single operating point.
val evaluator = new BinaryClassificationEvaluator()
  .setLabelCol(labelColumn)
  .setRawPredictionCol("rawPrediction")
  .setMetricName("areaUnderROC")

In [38]:
// The evaluator is configured with the "areaUnderROC" metric, so the value is an AUC, not an
// accuracy. The variable name is kept so that other cells referring to it still work.
val accuracy = evaluator.evaluate(predictions)
println(s"Area under ROC: ${accuracy}")

# Create test stream

In [40]:
import org.apache.spark.sql.execution.streaming.{MemoryStream, Sink}
import org.apache.spark.sql.streaming.{StreamingQuery, OutputMode}
import org.apache.spark.sql.sources.{StreamSinkProvider}
import org.apache.spark.sql.{Row, ForeachWriter}

In [41]:
// Load the test data from disk.
val testDir: String = "./notebook/otus/homework/data/test"
val dfTest: DataFrame = spark.read.load(testDir)

In [42]:
import org.apache.spark.sql.types._

/**
  * Renders a Spark schema as the source text of a Scala case class.
  */
class Schema2CaseClass {
  /** Maps a Spark data type to the name of the Scala type used in the generated source. */
  type TypeConverter = (DataType) => String

  /**
    * Generates the case class source for `schema`.
    *
    * @param schema    schema to render; nullable fields are wrapped in `Option[...]`
    * @param className name of the generated case class
    * @param tc        converter from Spark data types to Scala type names
    * @return the case class definition as text
    */
  def schemaToCaseClass(schema:StructType, className:String)(implicit tc:TypeConverter):String = {
    def genField(s:StructField):String = {
      val scalaType = tc(s.dataType)
      if (s.nullable) s"  ${s.name}:Option[$scalaType]"
      else s"  ${s.name}:$scalaType"
    }

    val fieldsStr = schema.fields.map(genField).mkString(",\n  ")

    s"""
       |case class $className (
       |  $fieldsStr
       |)
    """.stripMargin
  }

  object implicits {
    /** Default converter; array element types are converted recursively. */
    implicit val defaultTypeConverter:TypeConverter = {
      case _: ByteType => "Byte"
      case _: ShortType => "Short"
      case _: IntegerType => "Int"
      case _: LongType => "Long"
      case _: FloatType => "Float"
      case _: DoubleType => "Double"
      case _: DecimalType => "java.math.BigDecimal"
      case _: StringType => "String"
      case _: BinaryType => "Array[Byte]"
      case _: BooleanType => "Boolean"
      case _: TimestampType => "java.sql.Timestamp"
      case _: DateType => "java.sql.Date"
      case ArrayType(elementType, _) => s"Seq[${defaultTypeConverter(elementType)}]"
      case _: MapType => "scala.collection.Map"
      case _: StructType => "org.apache.spark.sql.Row"
      // Any other type is rendered as String
      case _ => "String"
    }
  }
}

In [43]:
// Print a case class definition matching the schema of the test data.
val s2cc: Schema2CaseClass = new Schema2CaseClass
val sh: String = s2cc.schemaToCaseClass(
    schema = dfTest.schema,
    className = "schemaClass"
)(s2cc.implicits.defaultTypeConverter)
println(sh)

In [44]:
// Typed view of one row of the test data; used for `dfTest.as[schemaClass]` and as the element
// type of the memory stream. Every field is an Option.
// NOTE(review): presumably pasted from the definition printed by the previous cell - if the
// test data schema changes, regenerate it.
case class schemaClass (
    instanceId_userId:Option[Int],
    instanceId_objectType:Option[String],
    instanceId_objectId:Option[Int],
    audit_pos:Option[Long],
    audit_clientType:Option[String],
    audit_timestamp:Option[Long],
    audit_timePassed:Option[Long],
    audit_experiment:Option[String],
    audit_resourceType:Option[Long],
    metadata_ownerId:Option[Int],
    metadata_ownerType:Option[String],
    metadata_createdAt:Option[Long],
    metadata_authorId:Option[Int],
    metadata_applicationId:Option[Long],
    metadata_numCompanions:Option[Int],
    metadata_numPhotos:Option[Int],
    metadata_numPolls:Option[Int],
    metadata_numSymbols:Option[Int],
    metadata_numTokens:Option[Int],
    metadata_numVideos:Option[Int],
    metadata_platform:Option[String],
    metadata_totalVideoLength:Option[Int],
    metadata_options:Option[Seq[String]],
    relationsMask:Option[Long],
    userOwnerCounters_USER_FEED_REMOVE:Option[Double],
    userOwnerCounters_USER_PROFILE_VIEW:Option[Double],
    userOwnerCounters_VOTE_POLL:Option[Double],
    userOwnerCounters_USER_SEND_MESSAGE:Option[Double],
    userOwnerCounters_USER_DELETE_MESSAGE:Option[Double],
    userOwnerCounters_USER_INTERNAL_LIKE:Option[Double],
    userOwnerCounters_USER_INTERNAL_UNLIKE:Option[Double],
    userOwnerCounters_USER_STATUS_COMMENT_CREATE:Option[Double],
    userOwnerCounters_PHOTO_COMMENT_CREATE:Option[Double],
    userOwnerCounters_MOVIE_COMMENT_CREATE:Option[Double],
    userOwnerCounters_USER_PHOTO_ALBUM_COMMENT_CREATE:Option[Double],
    userOwnerCounters_COMMENT_INTERNAL_LIKE:Option[Double],
    userOwnerCounters_USER_FORUM_MESSAGE_CREATE:Option[Double],
    userOwnerCounters_PHOTO_MARK_CREATE:Option[Double],
    userOwnerCounters_PHOTO_VIEW:Option[Double],
    userOwnerCounters_PHOTO_PIN_BATCH_CREATE:Option[Double],
    userOwnerCounters_PHOTO_PIN_UPDATE:Option[Double],
    userOwnerCounters_USER_PRESENT_SEND:Option[Double],
    userOwnerCounters_UNKNOWN:Option[Double],
    userOwnerCounters_CREATE_TOPIC:Option[Double],
    userOwnerCounters_CREATE_IMAGE:Option[Double],
    userOwnerCounters_CREATE_MOVIE:Option[Double],
    userOwnerCounters_CREATE_COMMENT:Option[Double],
    userOwnerCounters_CREATE_LIKE:Option[Double],
    userOwnerCounters_TEXT:Option[Double],
    userOwnerCounters_IMAGE:Option[Double],
    userOwnerCounters_VIDEO:Option[Double],
    ownerUserCounters_USER_FEED_REMOVE:Option[Double],
    ownerUserCounters_USER_PROFILE_VIEW:Option[Double],
    ownerUserCounters_VOTE_POLL:Option[Double],
    ownerUserCounters_USER_SEND_MESSAGE:Option[Double],
    ownerUserCounters_USER_DELETE_MESSAGE:Option[Double],
    ownerUserCounters_USER_INTERNAL_LIKE:Option[Double],
    ownerUserCounters_USER_INTERNAL_UNLIKE:Option[Double],
    ownerUserCounters_USER_STATUS_COMMENT_CREATE:Option[Double],
    ownerUserCounters_PHOTO_COMMENT_CREATE:Option[Double],
    ownerUserCounters_MOVIE_COMMENT_CREATE:Option[Double],
    ownerUserCounters_USER_PHOTO_ALBUM_COMMENT_CREATE:Option[Double],
    ownerUserCounters_COMMENT_INTERNAL_LIKE:Option[Double],
    ownerUserCounters_USER_FORUM_MESSAGE_CREATE:Option[Double],
    ownerUserCounters_PHOTO_MARK_CREATE:Option[Double],
    ownerUserCounters_PHOTO_VIEW:Option[Double],
    ownerUserCounters_PHOTO_PIN_BATCH_CREATE:Option[Double],
    ownerUserCounters_PHOTO_PIN_UPDATE:Option[Double],
    ownerUserCounters_USER_PRESENT_SEND:Option[Double],
    ownerUserCounters_UNKNOWN:Option[Double],
    ownerUserCounters_CREATE_TOPIC:Option[Double],
    ownerUserCounters_CREATE_IMAGE:Option[Double],
    ownerUserCounters_CREATE_MOVIE:Option[Double],
    ownerUserCounters_CREATE_COMMENT:Option[Double],
    ownerUserCounters_CREATE_LIKE:Option[Double],
    ownerUserCounters_TEXT:Option[Double],
    ownerUserCounters_IMAGE:Option[Double],
    ownerUserCounters_VIDEO:Option[Double],
    membership_status:Option[String],
    membership_statusUpdateDate:Option[Long],
    membership_joinDate:Option[Long],
    membership_joinRequestDate:Option[Long],
    owner_create_date:Option[Long],
    owner_birth_date:Option[Int],
    owner_gender:Option[Int],
    owner_status:Option[Int],
    owner_ID_country:Option[Long],
    owner_ID_Location:Option[Int],
    owner_is_active:Option[Int],
    owner_is_deleted:Option[Int],
    owner_is_abused:Option[Int],
    owner_is_activated:Option[Int],
    owner_change_datime:Option[Long],
    owner_is_semiactivated:Option[Int],
    owner_region:Option[Int],
    user_create_date:Option[Long],
    user_birth_date:Option[Int],
    user_gender:Option[Int],
    user_status:Option[Int],
    user_ID_country:Option[Long],
    user_ID_Location:Option[Int],
    user_is_active:Option[Int],
    user_is_deleted:Option[Int],
    user_is_abused:Option[Int],
    user_is_activated:Option[Int],
    user_change_datime:Option[Long],
    user_is_semiactivated:Option[Int],
    user_region:Option[Int],
    objectId:Option[Int],
    auditweights_ageMs:Option[Double],
    auditweights_closed:Option[Double],
    auditweights_ctr_gender:Option[Double],
    auditweights_ctr_high:Option[Double],
    auditweights_ctr_negative:Option[Double],
    auditweights_dailyRecency:Option[Double],
    auditweights_feedOwner_RECOMMENDED_GROUP:Option[Double],
    auditweights_feedStats:Option[Double],
    auditweights_friendCommentFeeds:Option[Double],
    auditweights_friendCommenters:Option[Double],
    auditweights_friendLikes:Option[Double],
    auditweights_friendLikes_actors:Option[Double],
    auditweights_hasDetectedText:Option[Double],
    auditweights_hasText:Option[Double],
    auditweights_isPymk:Option[Double],
    auditweights_isRandom:Option[Double],
    auditweights_likersFeedStats_hyper:Option[Double],
    auditweights_likersSvd_prelaunch_hyper:Option[Double],
    auditweights_matrix:Option[Double],
    auditweights_notOriginalPhoto:Option[Double],
    auditweights_numDislikes:Option[Double],
    auditweights_numLikes:Option[Double],
    auditweights_numShows:Option[Double],
    auditweights_onlineVideo:Option[Double],
    auditweights_partAge:Option[Double],
    auditweights_partCtr:Option[Double],
    auditweights_partSvd:Option[Double],
    auditweights_processedVideo:Option[Double],
    auditweights_relationMasks:Option[Double],
    auditweights_source_LIVE_TOP:Option[Double],
    auditweights_source_MOVIE_TOP:Option[Double],
    auditweights_svd_prelaunch:Option[Double],
    auditweights_svd_spark:Option[Double],
    auditweights_userAge:Option[Double],
    auditweights_userOwner_CREATE_COMMENT:Option[Double],
    auditweights_userOwner_CREATE_IMAGE:Option[Double],
    auditweights_userOwner_CREATE_LIKE:Option[Double],
    auditweights_userOwner_IMAGE:Option[Double],
    auditweights_userOwner_MOVIE_COMMENT_CREATE:Option[Double],
    auditweights_userOwner_PHOTO_COMMENT_CREATE:Option[Double],
    auditweights_userOwner_PHOTO_MARK_CREATE:Option[Double],
    auditweights_userOwner_PHOTO_VIEW:Option[Double],
    auditweights_userOwner_TEXT:Option[Double],
    auditweights_userOwner_UNKNOWN:Option[Double],
    auditweights_userOwner_USER_DELETE_MESSAGE:Option[Double],
    auditweights_userOwner_USER_FEED_REMOVE:Option[Double],
    auditweights_userOwner_USER_FORUM_MESSAGE_CREATE:Option[Double],
    auditweights_userOwner_USER_INTERNAL_LIKE:Option[Double],
    auditweights_userOwner_USER_INTERNAL_UNLIKE:Option[Double],
    auditweights_userOwner_USER_PRESENT_SEND:Option[Double],
    auditweights_userOwner_USER_PROFILE_VIEW:Option[Double],
    auditweights_userOwner_USER_SEND_MESSAGE:Option[Double],
    auditweights_userOwner_USER_STATUS_COMMENT_CREATE:Option[Double],
    auditweights_userOwner_VIDEO:Option[Double],
    auditweights_userOwner_VOTE_POLL:Option[Double],
    auditweights_x_ActorsRelations:Option[Long],
    auditweights_likersSvd_spark_hyper:Option[Double],
    auditweights_source_PROMO:Option[Double],
    // `date` is declared as String here; the java.sql.Date variant is kept for reference:
    //date:Option[java.sql.Date]
    date:Option[String]
)

In [45]:
// Typed view of the test data.
val dataTest: Dataset[schemaClass] = dfTest.as[schemaClass]

In [46]:
/**
* Appends a non-nullable Long column named "index" holding, for each row, the position
* assigned to it by `zipWithIndex` on the underlying RDD.
*/
def addColumnIndex(df: DataFrame): DataFrame = {
  // Original fields followed by the new index field
  val indexedSchema = StructType(df.schema.fields :+ StructField("index", LongType, false))

  val indexedRows = df.rdd.zipWithIndex.map { rowWithIndex =>
    Row.fromSeq(rowWithIndex._1.toSeq :+ rowWithIndex._2)
  }

  spark.sqlContext.createDataFrame(indexedRows, indexedSchema)
}

In [47]:
// MemoryStream needs an implicit SQLContext in scope.
implicit val ctx: SQLContext = spark.sqlContext

// In-memory streaming source that the emulation thread feeds row by row.
val dataStream: MemoryStream[schemaClass] = MemoryStream[schemaClass]

In [48]:
// Streaming settings, shared with the cells below.
val threadSleepTime = 500
val awaitQueryTime = 300000
val idName = "id"

val stream = dataStream.toDF()

// Score every micro-batch with the trained model and print its predictions.
val query = stream
        .writeStream
        .foreachBatch(
            (batchDS: Dataset[_], batchId: Long) => {
                System.out.println(s"--- new batch $batchId ---")
                // `show()` prints the table itself and returns Unit; wrapping it in println
                // only appended a stray "()" after every batch.
                model.transform(batchDS).select("prediction").show()
            }
        )
        //.format("console")
        .start()

In [49]:
// Emulates a live source: pushes the test rows into the memory stream one at a time, in
// dataset order, for as long as the streaming query is active.
val emul = new Thread(new Runnable() {
    override def run(): Unit = {
        System.out.println("--- Thread Start ---")
        // Iterate the rows on the driver instead of filtering on a generated id per row.
        // monotonically increasing ids are unique but not consecutive across partitions, so
        // looking up id == n found no row (and `.first` threw) as soon as the data had more
        // than one partition; each lookup was also a full Spark job.
        val rows = dataTest.toLocalIterator()
        while (query.isActive && rows.hasNext) {
            dataStream.addData(rows.next())

            //Thread.sleep(threadSleepTime)
        }
    }
})

In [50]:
// Start feeding rows into the stream.
emul.start()

// Let the query run for at most `awaitQueryTime` milliseconds, then stop it. The feeding loop
// checks `query.isActive`, so stopping the query also ends the emulation thread's loop.
query.awaitTermination(awaitQueryTime)
query.stop()

In [51]:
// Signal the emulation thread to stop.
// NOTE(review): the thread body never checks the interrupt flag, so this presumably only has
// an effect if the thread is inside a blocking call - confirm.
emul.interrupt()