In [None]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types._
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{CountVectorizer, StringIndexer, IndexToString}
import org.apache.spark.ml.{Pipeline, PipelineModel}

# Train

In [None]:
// Create (or reuse) the Spark session for this lab.
// Master / deploy mode are left to the environment:
//   .master("yarn")
//   .config("spark.submit.deployMode", "cluster")
val spark = {
    // Resource and session settings, applied to the builder in the order listed.
    val settings = Seq(
        "spark.driver.memory" -> "9g",
        "spark.driver.cores" -> "3",
        "spark.executor.instances" -> "6",
        "spark.executor.memory" -> "9g",
        "spark.executor.cores" -> "3",
        //"spark.sql.shuffle.partitions" -> "81",
        "spark.sql.session.timeZone" -> "UTC"
    )

    val namedBuilder = SparkSession.builder().appName("MLProject Lab07 DE")

    settings
        .foldLeft(namedBuilder) { case (builder, (key, value)) => builder.config(key, value) }
        .getOrCreate()
}

In [None]:
// Shut down the Spark session.
// NOTE(review): this cell sits between session creation and the first read; executed
// top-to-bottom it would presumably stop the session before any later cell uses it.
// Confirm it is only meant to be run manually when the work is finished.
spark.stop()

In [None]:
// Input dataset and output model locations.
// Each value can be overridden through the Spark conf key shown (e.g. `--conf` on
// spark-submit); when the key is not set, the lab default path is used.
val trainDir = spark.conf
    .getOption("spark.train.train_dir")
    .getOrElse("/labs/laba07/laba07.json")
val modelPath = spark.conf
    .getOption("spark.train.pipeline_dir")
    .getOrElse("/user/andrey.blednykh2/model")

In [None]:
// Flat layout of one training record: label, user id and the raw `visits` payload,
// all declared as nullable strings.
// Fix: the last two fields used a non-existent `StringField` constructor; the Spark
// type for a schema field is `StructField`.
val schema = StructType(Seq(
    StructField("gender_age", StringType, true),
    StructField("uid", StringType, true),
    StructField("visits", StringType, true)
))

In [None]:
// Load the training weblogs from `trainDir` as JSON; no explicit schema is passed.
// TODO(review): the original note here reads "sampling ratio" - presumably a reminder
// to tune the JSON reader's `samplingRatio` option; confirm.
val jsonWeblogs = spark.read
    .format("json")
    .load(trainDir)
//    .select(from_json(col("value"), schema))

In [None]:
// Inspect the schema of the loaded training data.
// Fix: `trainWeblogs` is not defined anywhere in this notebook; the DataFrame loaded
// above is `jsonWeblogs`.
jsonWeblogs.printSchema()

In [None]:
// Build one row per (uid, gender_age) carrying the list of domains that user visited.
val training = {
    // Generic URI regex; capture group 4 is the authority part, i.e. the text between
    // "//" and the next "/", "?" or "#".
    val uriPattern = "^(([^:\\/?#]+):)?(\\/\\/([^\\/?#]*))?([^?#]*)(\\?([^#]*))?(#(.*))?"

    // Normalise the scheme before extracting the host: https -> http, then collapse a
    // doubled "http://http://" prefix. (The author tried the built-in `parse_url` and
    // abandoned it because of problems with partially valid URLs.)
    val normalizedUrl = regexp_replace(
        regexp_replace(col("parsedVisits.url"), "https://", "http://"),
        "http://http://",
        "http://"
    )
    val host = regexp_extract(normalizedUrl, uriPattern, 4)

    // "^www." removes "www" plus ONE following character (the dot is not escaped):
    // "www." vanishes entirely, while "www1." leaves a leading "." that the second
    // replace strips.
    val domain = regexp_replace(regexp_replace(host, "^www.", ""), "^\\.", "")

    jsonWeblogs
        .select(col("uid"), col("gender_age"), explode(col("visits")).alias("parsedVisits"))
        .select(col("uid"), col("gender_age"), domain.alias("domain"))
        .groupBy("uid", "gender_age")
        .agg(collect_list("domain").alias("domains"))
}

In [None]:
// Preview two aggregated training rows, vertically, with cells truncated at 100 chars.
// Fix: `trainParsed` is not defined in this notebook; the aggregated frame is `training`.
training.show(numRows = 2, truncate = 100, vertical = true)

In [None]:
// Inspect the schema of the aggregated training set.
// Fix: `trainParsed` is not defined in this notebook; the aggregated frame is `training`.
training.printSchema()

#### Обучение

In [None]:
// Vectorizer stage: reads the `domains` array column and writes a `features` column.
val cv = {
    val vectorizer = new CountVectorizer()
    vectorizer.setInputCol("domains")
    vectorizer.setOutputCol("features")
    vectorizer
}

In [None]:
// Label indexer: maps the `gender_age` string to a numeric `label` column.
// It is fitted right here (not left to the pipeline) so that its `labels` are
// available when the IndexToString stage is configured.
val indexer = {
    val labelIndexer = new StringIndexer()
    labelIndexer.setInputCol("gender_age")
    labelIndexer.setOutputCol("label")
    labelIndexer.fit(training)
}

In [None]:
// Classifier stage: logistic regression with regularisation 0.001 and at most 1000
// iterations. Input/output column names are left at the estimator's defaults.
val lr = new LogisticRegression().setRegParam(0.001).setMaxIter(1000)

In [None]:
// Label decoder: turns the numeric `prediction` back into the original string label
// (`predictedLabel`) using the labels learned by `indexer`.
val lc = {
    val decoder = new IndexToString()
    decoder.setInputCol("prediction")
    decoder.setOutputCol("predictedLabel")
    decoder.setLabels(indexer.labels)
}

In [None]:
// Assemble the stages in execution order: vectorize -> index label -> classify -> decode.
val pipeline = {
    val stages = Array[org.apache.spark.ml.PipelineStage](cv, indexer, lr, lc)
    new Pipeline().setStages(stages)
}

In [None]:
// Fit the whole pipeline on the aggregated training set.
val model = pipeline.fit(training)

In [None]:
// Persist the fitted pipeline to `modelPath`, replacing any model already saved there.
model.write.overwrite().save(modelPath)

# Test

In [None]:
// Kafka source options: broker address, topic to subscribe to, and where to start reading.
val kafkaInputParams = Map(
    ("kafka.bootstrap.servers", "spark-master-1:6667"),
    ("subscribe", "andrey.blednykh2"),
    ("startingOffsets", "earliest")
)

In [None]:
// Open the raw Kafka stream described by `kafkaInputParams`.
// Fix: this stream was bound to `kafkaWeblogs`, but the next processing step reads it
// as `kafkaData` (which was never defined), and `kafkaWeblogs` is bound again later
// to the parsed records. Naming the raw stream `kafkaData` makes the cells line up.
val kafkaData = spark
    .readStream
    .format("kafka")
    .options(kafkaInputParams)
    .load()

In [None]:
// Layout of one incoming Kafka message: user id plus the raw `visits` payload, both
// declared as nullable strings (`visits` is parsed separately with its own schema).
// Fix: `StringField` does not exist; the Spark type for a schema field is `StructField`.
val schema = StructType(Seq(
    StructField("uid", StringType, true),
    StructField("visits", StringType, true)
))

In [None]:
// Schema of the `visits` payload: an array of {url, timestamp} records, both fields
// nullable strings.
val schemaTest = {
    val visit = new StructType()
        .add("url", StringType)
        .add("timestamp", StringType)
    ArrayType(visit)
}

In [None]:
// Keep only the Kafka message payload, cast to a string.
val kafkaValues = {
    val payload = kafkaData("value").cast(StringType)
    kafkaData.select(payload)
}

In [None]:
// Parse each JSON payload with `schema` and flatten the resulting struct into
// top-level columns.
val kafkaWeblogs = {
    val parsed = from_json(col("value"), schema).alias("value")
    kafkaValues.select(parsed).select("value.*")
}

In [None]:
// Build one row per uid carrying the list of visited domains, using the same URL
// cleaning steps as the training set so the features line up with the fitted model.
val testing = {
    // Generic URI regex; capture group 4 is the authority part, i.e. the text between
    // "//" and the next "/", "?" or "#".
    val uriPattern = "^(([^:\\/?#]+):)?(\\/\\/([^\\/?#]*))?([^?#]*)(\\?([^#]*))?(#(.*))?"

    // Normalise the scheme before extracting the host: https -> http, then collapse a
    // doubled "http://http://" prefix. (The author tried the built-in `parse_url` and
    // abandoned it because of problems with partially valid URLs.)
    val normalizedUrl = regexp_replace(
        regexp_replace(col("parsedVisits.url"), "https://", "http://"),
        "http://http://",
        "http://"
    )
    val host = regexp_extract(normalizedUrl, uriPattern, 4)

    // "^www." removes "www" plus ONE following character (the dot is not escaped):
    // "www." vanishes entirely, while "www1." leaves a leading "." that the second
    // replace strips.
    val domain = regexp_replace(regexp_replace(host, "^www.", ""), "^\\.", "")

    // `visits` arrives as a JSON string, so it is parsed into an array before exploding.
    val visit = explode(from_json(col("visits"), schemaTest)).alias("parsedVisits")

    kafkaWeblogs
        .select(col("uid"), visit)
        .select(col("uid"), domain.alias("domain"))
        .groupBy("uid")
        .agg(collect_list("domain").alias("domains"))
}

In [None]:
// Load the fitted pipeline back from `modelPath`.
// In a notebook session this rebinds `model` if the training cells were run earlier.
val model = PipelineModel.load(modelPath)

In [None]:
// Apply the loaded pipeline to the streaming per-user domain lists.
val logPrediction = model.transform(testing)

In [None]:
// Keep the user id and the decoded prediction, exposing the latter as `gender_age`.
// Debug helper: .show(20, 200, true)
val resultForKafka = logPrediction
    .select("uid", "predictedLabel")
    .withColumnRenamed("predictedLabel", "gender_age")

In [None]:
// Kafka sink options: broker address, output topic and streaming checkpoint directory.
// NOTE(review): the checkpoint path is relative - where it resolves depends on the
// default file system / working directory; confirm that is intended.
// NOTE(review): "truncate" looks like a console-sink option and presumably has no
// effect on the Kafka sink - verify before relying on it.
val kafkaOutputParams = Map(
    "kafka.bootstrap.servers" -> "spark-master-1:6667",
    "topic" -> "andrey_blednykh2_lab07_out",
    "checkpointLocation" -> "streaming/checkpoint/",
    "truncate" -> "false"
)

In [None]:
// Stream the predictions to Kafka: key = uid, value = JSON {uid, gender_age}.
// The cell blocks on awaitTermination() until the query stops or fails.
locally {
    val kafkaRecords = resultForKafka.select(
        col("uid").cast("string").alias("key"),
        to_json(struct("uid", "gender_age")).alias("value")
    )

    val query = kafkaRecords.writeStream
        .format("kafka")
        .options(kafkaOutputParams)
        .trigger(Trigger.ProcessingTime("5 seconds"))
        .outputMode("update")
        .start()

    query.awaitTermination()
}

In [None]:
// Stop every active streaming query of the current session and print what was stopped.
SparkSession
    .active
    .streams
    .active
    .foreach { query =>
        // `lastProgress` is null until the query has reported its first progress update,
        // and `sources` can be empty; fall back to the query id instead of failing with
        // a NullPointerException / NoSuchElementException before the query is stopped.
        val desc = Option(query.lastProgress)
            .flatMap(_.sources.headOption)
            .map(_.description)
            .getOrElse(s"query ${query.id}")
        query.stop()
        println(s"Stopped ${desc}")
    }