In [350]:
from IPython.display import display, HTML
display(HTML("<style>.container { width:90% !important; }</style>"))
display(HTML("<style>#notebook { padding:0px !important; }</style>"))
display(HTML("<style>pre { white-space: pre !important; }</style>"))

# context

In [351]:
import os
import sys
os.environ["PYSPARK_PYTHON"]='/opt/anaconda/envs/bd9/bin/python'
os.environ["SPARK_HOME"]='/usr/hdp/current/spark2-client'
os.environ["PYSPARK_SUBMIT_ARGS"]='--num-executors 4 pyspark-shell'

spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')

sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.10.7-src.zip'))
exec(open(os.path.join(spark_home, 'python/pyspark/shell.py')).read())

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.7
      /_/

Using Python version 3.6.5 (default, Apr 29 2018 16:14:56)
SparkSession available as 'spark'.


In [352]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import *
from pyspark import Row
import json

conf = SparkConf()

spark = (SparkSession
         .builder
         .config(conf=conf)
         .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5")
         .appName("dont")
         .getOrCreate())

In [353]:
from pyspark.ml import PipelineModel
from pyspark.ml import Transformer

In [354]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# lab04

## train data

In [355]:
Structure_Schema = StructType([
    StructField('gender', StringType(), True),
    StructField('age', StringType(), True),
    StructField('uid', StringType(), True),
    StructField('user_json', MapType(StringType(), ArrayType(MapType(StringType(), StringType())), False), True)
    ])

In [356]:
data0 = spark.read.option("header", True).option("delimiter", "\t").option("inferSchema", True).csv("/labs/slaba04/gender_age_dataset.txt")

In [357]:
data1 = (
    data0
    .filter("gender <> '-' OR age <> '-'")
    .withColumn("target", F.concat(F.col("gender"), F.col("age")))
    .drop("gender", "age")
    .withColumn("parse_json", F.from_json("user_json", MapType(StringType(), ArrayType(MapType(StringType(), StringType())))))
)

In [375]:
data2 = (
    data1
    .withColumn("test", data1.parse_json.visits)
    .drop("user_json", "parse_json")
)

In [381]:
data3 = (
    data2
    .withColumn("urls", F.expr('transform(test, x -> x.url)'))
    .drop("test")
)

In [391]:
data4 = (
    data3
    .withColumn("urls", F.expr("transform(urls, x -> parse_url(x, 'HOST'))"))
)

In [392]:
data4.show(1, truncate=False)

+------------------------------------+------+-------------------------------------------------------------------------------+
|uid                                 |target|urls                                                                           |
+------------------------------------+------+-------------------------------------------------------------------------------+
|d50192e5-c44e-4ae8-ae7a-7cfe67c8b777|F18-24|[zebra-zoya.ru, news.yandex.ru, www.sotovik.ru, news.yandex.ru, www.sotovik.ru]|
+------------------------------------+------+-------------------------------------------------------------------------------+
only showing top 1 row



## kafka batch data

In [399]:
batch_kafka_params = {
    "kafka.bootstrap.servers": 'spark-master-1.newprolab.com:6667',
    "subscribe": "input_ravil.badamshin",
    "failOnDataLoss": "False"
}

In [400]:
kafka_batch0 = spark.read.format("kafka").options(**batch_kafka_params).load()

In [401]:
kafka_batch1 = (
    kafka_batch0
    .withColumn("value", F.col("value").cast("string"))
    .select("value")
    .withColumn("uid", F.get_json_object("value", "$.uid"))
    .withColumn("visits", F.get_json_object("value", "$.visits"))
    .drop("value")
)

In [403]:
kafka_batch2 = (
    kafka_batch1
    .withColumn("parse_visits", F.from_json("visits", ArrayType(MapType(StringType(), StringType()))))
    .drop("visits")
)

In [405]:
kafka_batch3 = (
    kafka_batch2
    .withColumn("urls", F.expr('transform(parse_visits, x -> x.url)'))
    .drop("parse_visits")
)

In [407]:
kafka_batch4 = (
    kafka_batch3
    .withColumn("urls", F.expr("transform(urls, x -> parse_url(x, 'HOST'))"))
)

## train model

In [409]:
target_indexer = StringIndexer(inputCol="target", outputCol="targetIndex")
target_indexer.setHandleInvalid("keep")
target_indexer_fit = target_indexer.fit(data4)
target_indexed = target_indexer_fit.transform(data4)

In [410]:
(train_data, test_data) = target_indexed.randomSplit([0.8, 0.2])

In [411]:
rf = RandomForestClassifier(labelCol="targetIndex", featuresCol="urls_vector", numTrees=8, maxBins=10000)

In [412]:
targetConverter = IndexToString(inputCol="prediction", 
                                outputCol="predicted_target",
                                labels=target_indexer_fit.labels)

In [413]:
from pyspark.ml.feature import CountVectorizer
urls_count_vectorizer = CountVectorizer(inputCol="urls", outputCol="urls_vector", vocabSize=10000)

In [416]:
pipeline = Pipeline(stages=[
    urls_count_vectorizer,
    rf,
    targetConverter
])

In [417]:
model = pipeline.fit(train_data)

In [418]:
predictions = model.transform(test_data)

In [419]:
predictions.show(5)

+--------------------+------+--------------------+-----------+--------------------+--------------------+--------------------+----------+----------------+
|                 uid|target|                urls|targetIndex|         urls_vector|       rawPrediction|         probability|prediction|predicted_target|
+--------------------+------+--------------------+-----------+--------------------+--------------------+--------------------+----------+----------------+
|05031311-38ba-46b...|M45-54|[frolnews.blogspo...|        6.0|(10000,[130,201,2...|[1.99684071322843...|[0.24960508915355...|       0.0|          M25-34|
|05060864-cd46-48d...|M35-44|      [www.mnogo.ru]|        2.0|  (10000,[65],[1.0])|[1.92095459613138...|[0.24011932451642...|       0.0|          M25-34|
|0507c191-2ed5-47d...|F45-54|[playcast.ru, www...|        5.0|(10000,[530,2336]...|[1.76507591767248...|[0.22063448970906...|       0.0|          M25-34|
|050a33e2-e45e-4b7...|M25-34|[agligator.ru, ag...|        0.0|(10000,[3,9,10

In [420]:
predictions.select("predicted_target", "targetIndex", "urls_vector").show(5)

+----------------+-----------+--------------------+
|predicted_target|targetIndex|         urls_vector|
+----------------+-----------+--------------------+
|          M25-34|        6.0|(10000,[130,201,2...|
|          M25-34|        2.0|  (10000,[65],[1.0])|
|          M25-34|        5.0|(10000,[530,2336]...|
|          M25-34|        0.0|(10000,[3,9,10,36...|
|          M25-34|        2.0|(10000,[121,201,6...|
+----------------+-----------+--------------------+
only showing top 5 rows



In [421]:
evaluator = MulticlassClassificationEvaluator(
    labelCol="targetIndex", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(model.transform(test_data))
print("Test Error = %g" % (1.0 - accuracy))

Test Error = 0.758423


In [422]:
model.save("/user/ravil.badamshin/model4")

In [423]:
saved_model = PipelineModel.load("/user/ravil.badamshin/model4")

In [424]:
saved_predictions = saved_model.transform(test_data)

In [425]:
saved_predictions.select("predicted_target", "targetIndex", "urls_vector").show(5)

+----------------+-----------+--------------------+
|predicted_target|targetIndex|         urls_vector|
+----------------+-----------+--------------------+
|          M25-34|        6.0|(10000,[130,201,2...|
|          M25-34|        2.0|  (10000,[65],[1.0])|
|          M25-34|        5.0|(10000,[530,2336]...|
|          M25-34|        0.0|(10000,[3,9,10,36...|
|          M25-34|        2.0|(10000,[121,201,6...|
+----------------+-----------+--------------------+
only showing top 5 rows



In [426]:
accuracy = evaluator.evaluate(saved_predictions)
print("Test Error = %g" % (1.0 - accuracy))

Test Error = 0.758423


## test on kafka batch

In [427]:
batch_predictions = saved_model.transform(kafka_batch4)

In [428]:
batch_predictions = (
    batch_predictions
    .select("*", F.col("predicted_target").substr(1, 1).alias("gender"), F.col("predicted_target").substr(2, 5).alias("age"))
)

In [429]:
batch_predictions.show(5)

+--------------------+--------------------+--------------------+--------------------+--------------------+----------+----------------+------+-----+
|                 uid|                urls|         urls_vector|       rawPrediction|         probability|prediction|predicted_target|gender|  age|
+--------------------+--------------------+--------------------+--------------------+--------------------+----------+----------------+------+-----+
|bd7a30e1-a25d-4cb...|[www.interfax.ru,...|(10000,[0,3,6,9,1...|[2.12060761675731...|[0.26507595209466...|       0.0|          M25-34|     M|25-34|
|bd7a6f52-45db-49b...|[www.packagetrack...|(10000,[0,3,9,10,...|[1.97377440285684...|[0.24672180035710...|       0.0|          M25-34|     M|25-34|
|bd7a7fd9-ab06-42f...|[www.mk.ru, www.m...|(10000,[78,1279],...|[1.92095459613138...|[0.24011932451642...|       0.0|          M25-34|     M|25-34|
|bd7c5d7a-0def-41d...|[www.24open.ru, w...|(10000,[0,3,6,39,...|[1.87514658062919...|[0.23439332257864...|      

In [430]:
batch_predictions.select("uid", "gender", "age").show(5)

+--------------------+------+-----+
|                 uid|gender|  age|
+--------------------+------+-----+
|bd7a30e1-a25d-4cb...|     M|25-34|
|bd7a6f52-45db-49b...|     M|25-34|
|bd7a7fd9-ab06-42f...|     M|25-34|
|bd7c5d7a-0def-41d...|     M|25-34|
|bd7e54a2-0215-45c...|     M|25-34|
+--------------------+------+-----+
only showing top 5 rows



In [431]:
batch_predictions.select("uid", "prediction").groupBy("prediction").count().show()

+----------+------+
|prediction| count|
+----------+------+
|       0.0|252654|
|       1.0|  1530|
|       3.0|    51|
|       2.0|   765|
+----------+------+



## kafka streaming

In [481]:
read_kafka_params = {
    "kafka.bootstrap.servers": 'spark-master-1.newprolab.com:6667',
    "subscribe": "input_ravil.badamshin",
    "startingOffsets": "latest",
    "failOnDataLoss": "False"
}
write_kafka_params = {
   "kafka.bootstrap.servers": 'spark-master-1.newprolab.com:6667',
   "topic": "ravil.badamshin"
}

In [482]:
streams = SparkSession.builder.getOrCreate().streams.active
if streams:
    for s in streams:
        desc = s.lastProgress["sources"][0]["description"]
        s.stop()
        print("Stopped {s}".format(s=desc))

In [499]:
SparkSession.builder.getOrCreate().streams.active

[<pyspark.sql.streaming.StreamingQuery at 0x7fc8b5efb160>]

In [484]:
kafka_sdf0 = spark.readStream.format("kafka").options(**read_kafka_params).load()

In [485]:
kafka_sdf1 = (
    kafka_sdf0
    .withColumn("value", F.col("value").cast("string"))
    .select("value")
    .withColumn("uid", F.get_json_object("value", "$.uid"))
    .withColumn("visits", F.get_json_object("value", "$.visits"))
    .drop("value")
)

In [486]:
kafka_sdf2 = (
    kafka_sdf1
    .withColumn("parse_visits", F.from_json("visits", ArrayType(MapType(StringType(), StringType()))))
    .drop("visits")
)

In [487]:
kafka_sdf3 = (
    kafka_sdf2
    .withColumn("urls", F.expr('transform(parse_visits, x -> x.url)'))
    .drop("parse_visits")
)

In [488]:
kafka_sdf4 = (
    kafka_sdf3
    .withColumn("urls", F.expr("transform(urls, x -> parse_url(x, 'HOST'))"))
)

In [493]:
kafka_predictions0 = model.transform(kafka_sdf4)

In [495]:
kafka_predictions1 = (
    kafka_predictions0
    .select("uid", 
            F.col("predicted_target").substr(1, 1).alias("gender"), 
            F.col("predicted_target").substr(2, 5).alias("age"))
)

In [496]:
kafka_out_df = (
    kafka_predictions1
    .select(F.to_json(F.struct(*kafka_predictions1.columns)).alias("value"))
)

In [497]:
(kafka_out_df
 .writeStream.format("kafka").options(**write_kafka_params)
 .option("checkpointLocation", "streaming/chk/chk_kafka")
 .outputMode("append")
 .start()
)

<pyspark.sql.streaming.StreamingQuery at 0x7fc8b5e6f198>

In [500]:
spark.stop()