## Create session

In [1]:
import os
import sys
import numpy as np

# Interpreter and Spark distribution that PySpark should pick up on this cluster.
os.environ.update({
    "PYSPARK_PYTHON": "/opt/anaconda/envs/bd9/bin/python",
    "SPARK_HOME": "/usr/hdp/current/spark2-client",
})

spark_home = os.environ.get("SPARK_HOME")
if not spark_home:
    raise ValueError("SPARK_HOME environment variable is not set")

# Every entry is pushed to the front of sys.path, so the py4j archive
# (inserted last) ends up ahead of the PySpark sources.
for bundled_path in ("python", "python/lib/py4j-0.10.7-src.zip"):
    sys.path.insert(0, os.path.join(spark_home, bundled_path))

import pyspark
from pyspark import keyword_only
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *

from pyspark.ml import (
    Transformer, 
    Pipeline, 
    PipelineModel
)

from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.ml.param import Param, Params, TypeConverters

from pyspark.ml.feature import (
    VectorAssembler,
    StringIndexer,
    IndexToString,
    CountVectorizer
)

from pyspark.ml.classification import RandomForestClassifier

# Resource settings for the driver and executors; automatic broadcast joins
# are switched off by the -1 threshold.
spark_settings = [
    ("spark.driver.cores", "2"),
    ("spark.driver.memory", "1g"),
    ("spark.executor.instances", "5"),
    ("spark.executor.cores", "6"),
    ("spark.executor.memory", "4g"),
    ("spark.executor.memoryOverhead", "2g"),
    ("spark.sql.autoBroadcastJoinThreshold", -1),
]

conf = SparkConf().setAppName("artem.spitsin_lab04").setAll(spark_settings)

ss = (
    SparkSession.builder
    .appName("artem.spitsin_lab04")
    .config(conf=conf)
    .getOrCreate()
)

sc = ss.sparkContext

# Display the session summary as the cell output.
ss

## Paths

In [2]:
# Training dataset; it is read below as tab-separated text with a header row.
path_train_data = "/labs/slaba04/gender_age_dataset.txt"
# HDFS location the fitted pipeline is saved to and later loaded from.
path_model_pipeline = "hdfs:///user/artem.spitsin/lab04/model"

## Data schemas

In [3]:
# The JSON schema is kept in its string form so that it can be stored as a
# plain string param of the custom transformer and saved with it.
visit_record = StructType([
    StructField("timestamp", LongType(), True),
    StructField("url", StringType(), True),
])
json_schema = ArrayType(visit_record).simpleString()

# All columns of the training file are read as nullable strings.
schema_train_data = StructType([
    StructField(column_name, StringType(), True)
    for column_name in ("gender", "age", "uid", "user_json")
])

## Custom transformers

In [5]:
class HasBaseCols(Params):
    """
    Mixin that adds a ``baseCols`` param to a pipeline component.

    ``baseCols`` is the list of column names that must be carried through the
    component unchanged. The mixin derives from ``Params`` (the container of
    params), not from ``Param`` (a single parameter descriptor), so that
    components do not inherit ``Param``'s constructor, equality and hashing.
    """
    baseCols = Param(
        Params._dummy(), "baseCols",
        "list of fields to be saved after parsing",
        typeConverter=TypeConverters.toListString
    )

    def __init__(self):
        super(HasBaseCols, self).__init__()

    def setBaseCols(self, baseCols):
        """
        Sets ``baseCols``.

        :param baseCols: a list of column names, or a single column name as a
                         string (it is wrapped into a one-element list).
        :return: this instance, so that setters can be chained.
        :raises ValueError: if ``baseCols`` is neither a list nor a string.
        """
        if isinstance(baseCols, str):
            baseCols = [baseCols]
        elif not isinstance(baseCols, list):
            raise ValueError("Invalid data type for 'baseCols', a list or string was expected.")
        return self._set(baseCols=baseCols)

    def getBaseCols(self):
        """Returns the value of ``baseCols`` or its default value."""
        return self.getOrDefault("baseCols")

class ParserJson(Transformer, HasBaseCols, DefaultParamsReadable, DefaultParamsWritable):
    """
    Converts nested data to a tabular view.

    The string column ``jsonCol`` is parsed with ``jsonSchema``, the resulting
    array is exploded into one row per element (rows with an empty or null
    array are kept, because ``explode_outer`` is used), and every struct
    column of the exploded frame is flattened into top-level columns next to
    ``baseCols``.
    """
    jsonSchema = Param(Params._dummy(), "jsonSchema", "json schema that is parsed", typeConverter=TypeConverters.toString)
    jsonCol = Param(
        Params._dummy(), "jsonCol", "name of the field with the json structure", typeConverter=TypeConverters.toString
    )

    @keyword_only
    def __init__(self, baseCols:list="uid", jsonCol:str="json_value", jsonSchema:str=None):
        super(ParserJson, self).__init__()
        # keyword_only records only the arguments that were passed explicitly,
        # so the defaults declared in the signature have to be registered here;
        # otherwise getOrDefault fails for every omitted argument.
        self._setDefault(baseCols=["uid"], jsonCol="json_value")

        kwargs = dict(self._input_kwargs)
        # baseCols is a list-of-strings param; a bare string is wrapped so it
        # is not rejected by the type converter.
        if isinstance(kwargs.get("baseCols"), str):
            kwargs["baseCols"] = [kwargs["baseCols"]]
        self._set(**kwargs)

    def _transform(self, data):
        """
        Parses, explodes and flattens the json column.

        :param data: DataFrame containing ``baseCols`` and ``jsonCol``.
        :return: DataFrame with ``baseCols`` followed by the flattened fields
                 of the parsed json elements.
        :raises ValueError: if ``jsonSchema`` has not been set.
        """
        if not self.isDefined(self.jsonSchema) or self.getOrDefault("jsonSchema") is None:
            raise ValueError("Param 'jsonSchema' must be set before calling transform.")

        json_col = self.getOrDefault("jsonCol")
        schema_ddl = self.getOrDefault("jsonSchema")
        base_cols = self.getBaseCols()

        exploded_data = data\
                        .withColumn(json_col, F.from_json(json_col, schema=schema_ddl))\
                        .select(*base_cols, F.explode_outer(json_col).alias("logs"))

        # isinstance (rather than an exact type comparison) also accepts
        # subclasses of StructType.
        flatten_structs = [
            f"{field.name}.*"
            for field in exploded_data.schema.fields
            if isinstance(field.dataType, StructType)
        ]

        return exploded_data.select(*base_cols, *flatten_structs)

class DataLogPreparer(Transformer, HasBaseCols, DefaultParamsReadable, DefaultParamsWritable):
    """
    Prepares data before generating features.

    For every visit a day-of-week and an hour are derived from ``logDateCol``
    and a host name is derived from ``urlCol``. The rows are then grouped by
    ``baseCols`` and aggregated into ``logged_hosts`` (list of hosts),
    ``mean_day_of_week`` and ``mean_hour``.
    """
    urlCol = Param(
        Params._dummy(), "urlCol",
        "name of the field with the url that the user visited",
        typeConverter=TypeConverters.toString
    )
    logDateCol = Param(
        Params._dummy(), "logDateCol",
        "name of the field with the value of the time when the user visited a particular url",
        typeConverter=TypeConverters.toString
    )

    @keyword_only
    def __init__(self, baseCols:list="uid", urlCol:str="url", logDateCol:str="timestamp"):
        super(DataLogPreparer, self).__init__()
        # keyword_only records only the arguments that were passed explicitly,
        # so the defaults declared in the signature have to be registered here.
        self._setDefault(baseCols=["uid"], urlCol="url", logDateCol="timestamp")

        kwargs = dict(self._input_kwargs)
        # baseCols is a list-of-strings param; a bare string is wrapped so it
        # is not rejected by the type converter.
        if isinstance(kwargs.get("baseCols"), str):
            kwargs["baseCols"] = [kwargs["baseCols"]]
        self._set(**kwargs)

    def _create_time_features(self, data):
        """
        Adds ``day_of_week`` and ``hour`` columns derived from ``logDateCol``.

        The value is divided by 1000 before conversion.
        NOTE(review): this assumes the timestamp is in epoch milliseconds, and
        the derived values follow the Spark session time zone - confirm both.
        """
        log_date_col = self.getOrDefault("logDateCol")
        return data\
               .withColumn("log_dt", F.from_unixtime(F.col(log_date_col) / 1000))\
               .withColumn("day_of_week", F.dayofweek("log_dt"))\
               .withColumn("hour", F.hour("log_dt"))\
               .drop("log_dt")

    def _get_clean_url(self, data):
        """
        Adds a ``host`` column: the third "/"-separated part of the url with a
        leading "www." removed.
        """
        url_col = self.getOrDefault("urlCol")
        raw_host = F.split(url_col, "/")[2]
        # The pattern is anchored and the dot is escaped, so only a literal
        # leading "www." is stripped; an unescaped, unanchored "www." would
        # also eat "www" plus any character in the middle of a host name.
        return data.withColumn("host", F.regexp_replace(raw_host, r"^www\.", ""))

    def _transform(self, data):
        """
        Builds per-group aggregates used as model features.

        :param data: DataFrame containing ``baseCols``, ``urlCol`` and
                     ``logDateCol``.
        :return: DataFrame with ``baseCols``, ``logged_hosts``,
                 ``mean_day_of_week`` and ``mean_hour``.
        """
        prepared_data = self._get_clean_url(self._create_time_features(data))

        return prepared_data\
               .groupby(*self.getBaseCols())\
               .agg(
                   F.collect_list("host").alias("logged_hosts"),
                   F.mean("day_of_week").alias("mean_day_of_week"),
                   F.mean("hour").alias("mean_hour")
               )

## Loading and preparing training data

In [6]:
# Keep only the rows where both targets are known ('-' marks a missing value).
filter_targets = "gender != '-' and age != '-'"

labelled_rows = (
    ss.read
    .options(delimiter="\t", header=True)
    .schema(schema_train_data)
    .csv(path_train_data)
    .where(filter_targets)
)

# Pull the "visits" entry out of the raw json string into its own column.
train_data = labelled_rows.withColumn("visits", F.json_tuple("user_json", "visits"))

train_data.count(), len(train_data.columns)

(36138, 5)

## Creating pipeline and model

In [7]:
# Fixed seed so that repeated fits of the random forests give the same models.
RANDOM_SEED = 42

# Stages for data preparation
base_cols = ["uid", "gender", "age"]
parser_json = ParserJson(baseCols=base_cols, jsonCol="visits", jsonSchema=json_schema)
preparer_fields = DataLogPreparer(baseCols=base_cols, urlCol="url", logDateCol="timestamp")

# Stages for feature engineering
count_vectorizer = CountVectorizer(inputCol="logged_hosts", outputCol="hosts_features")
vector_assembler = VectorAssembler(inputCols=["hosts_features", "mean_day_of_week", "mean_hour"], outputCol="features")

# Stages for modeling: one label encoder, classifier and label decoder per target
target_encoders, target_decoders, models = {}, {}, {}
for type_target in ["gender", "age"]:

    # The encoder is fitted up front because its labels are needed to build
    # the matching decoder below.
    target_encoders[type_target] = StringIndexer(inputCol=type_target, outputCol=f"{type_target}_label")\
                                   .fit(train_data)

    models[type_target] = RandomForestClassifier(
        labelCol=target_encoders[type_target].getOutputCol(),
        probabilityCol=f"probability_{type_target}",
        rawPredictionCol=f"rawPrediction_{type_target}",
        predictionCol=f"prediction_{type_target}",
        featuresCol="features",
        seed=RANDOM_SEED
    )

    target_decoders[type_target] = IndexToString(
        inputCol=f"prediction_{type_target}", outputCol=f"prediction_{type_target}_decode",
        labels=target_encoders[type_target].labels
    )

# Creating the pipeline
pipeline = Pipeline(stages=[
    parser_json,
    preparer_fields,
    count_vectorizer,
    vector_assembler,
    *target_encoders.values(),
    *models.values(),
    *target_decoders.values()
])

In [8]:
%%time
# Saving fitted pipeline
pipeline.fit(train_data).write().overwrite().save(path_model_pipeline)

CPU times: user 94.6 ms, sys: 23.5 ms, total: 118 ms
Wall time: 4min 4s


## Starting streaming

In [9]:
read_kafka_params = {
    "kafka.bootstrap.servers": 'spark-master-1.newprolab.com:6667',
    "subscribe": "input_artem.spitsin",
    "startingOffsets": "latest",
    "failOnDataLoss": "False"
}

write_kafka_params = {
   "kafka.bootstrap.servers": 'spark-master-1.newprolab.com:6667',
   "topic": "artem.spitsin"
}

# Each Kafka message value is a json string; only "uid" and "visits" are extracted.
kafka_sdf = ss.readStream.format("kafka").options(**read_kafka_params).load()\
            .select(
                F.json_tuple(F.col("value").cast("string"), "uid", "visits")
            )\
            .select(F.col("c0").alias("uid"), F.col("c1").alias("visits"))

# Get predictions
fitted_pipeline = PipelineModel.load(path_model_pipeline)

# The streamed records carry only uid and visits, so every stage that exposes
# baseCols is switched to keep just uid. Stages are selected by type instead
# of by position, so reordering the pipeline does not break this cell.
for stage in fitted_pipeline.stages:
    if isinstance(stage, HasBaseCols):
        stage.setBaseCols(["uid"])

predictions = fitted_pipeline\
              .transform(kafka_sdf)\
              .select(
                  F.to_json(
                      F.struct(
                          "uid",
                          F.col("prediction_gender_decode").alias("gender"),
                          F.col("prediction_age_decode").alias("age")
                      )
                  ).alias("value")
              )

# NOTE(review): "complete" mode re-emits the whole result table on every
# trigger; "update" may be preferable - confirm with the topic's consumer.
# The query handle is kept so the stream can be monitored and stopped later.
streaming_query = predictions\
                  .writeStream\
                  .format("kafka")\
                  .options(**write_kafka_params)\
                  .outputMode("complete")\
                  .option("checkpointLocation", "/user/artem.spitsin/laba04/checkpoint")\
                  .start()

streaming_query

<pyspark.sql.streaming.StreamingQuery at 0x7fb01a142d68>

## Stopping session

In [10]:
# Stop any streaming queries that are still running before tearing the
# session down, instead of leaving them to fail when the session goes away.
for active_query in ss.streams.active:
    active_query.stop()

ss.catalog.clearCache()
ss.stop()