In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassificationModel


In [None]:
# Create (or reuse) the notebook's SparkSession. The Kafka SQL connector is
# requested through spark.jars.packages so the "kafka" stream format used
# later in the notebook can be resolved.
# NOTE(review): the connector is pinned to Scala 2.12 / 3.0.2 — confirm it
# matches the Spark version running on the cluster.
spark = (
    SparkSession.builder
    .appName("pyspark-notebook")
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.2",
    )
    .getOrCreate()
)
        

In [None]:
# Load the previously trained decision-tree classifier from HDFS.
# NOTE(review): the variable is named gender_model, but the path ends in
# "model_age_group_prediction" — confirm this is the intended model.
GENDER_MODEL_PATH = "hdfs://namenode:9000//user/data/spark_ml_101/ec_web_logs_analysis/models/model_age_group_prediction/"
gender_model = DecisionTreeClassificationModel.load(GENDER_MODEL_PATH)

In [None]:
# Connect to the Kafka topic that carries the raw web-log records.
raw_kafka_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:29092")
    .option("subscribe", "ec_web_logs_stream")
    .load()
)

# Schema: each Kafka value is read as one comma-separated string. The line is
# split once and fields are picked by position (positions 4 and 5 are not
# used here). Numeric fields are cast to int; a missing or non-numeric field
# becomes NULL rather than raising at this stage.
logs_stream = (
    raw_kafka_stream
    .select(raw_kafka_stream["value"].cast("string"))
    .selectExpr("split(value,',') as fields")
    .selectExpr(
        "fields[0] as device_id",
        "fields[1] as timestamp",
        "cast(fields[2] as int) as product_category_id",
        "fields[3] as ip",
        "cast(fields[6] as int) as device_type",
        "cast(fields[7] as int) as connection_type",
    )
)

# Keep only the identifying columns and the model inputs.
model_inputs = logs_stream.select(
    "device_id",
    "timestamp",
    "product_category_id",
    "device_type",
    "connection_type",
)

# Assemble the three numeric inputs into the single "features" vector column.
# handleInvalid="skip" drops rows whose inputs are NULL (malformed or short
# records). With the default "error" setting a single bad record would raise
# inside the assembler and terminate the whole streaming query.
feature_assembler = VectorAssembler(
    inputCols=["product_category_id", "device_type", "connection_type"],
    outputCol="features",
    handleInvalid="skip",
)
data_prep = feature_assembler.transform(model_inputs)

# Score every record and expose the model's prediction as "inferred_gender".
inferred_gender_added = gender_model.transform(data_prep).select(
    col("device_id"),
    col("timestamp"),
    col("prediction").alias("inferred_gender"),
    col("features"),
)

In [None]:
# Reshape the predictions into two string columns named "key" and "value":
# the key is the record timestamp, the value is a comma-joined line.
# NOTE(review): inferred_gender is concatenated twice in the value, and
# `result` is not consumed by any cell visible here — confirm both are
# intended (presumably this was prepared for a Kafka sink).
key_expr = "cast(timestamp as string) as key"
value_expr = "cast(concat(device_id, ',', timestamp, ',', inferred_gender, ',', inferred_gender) as string) as value"
result = inferred_gender_added.selectExpr(key_expr, value_expr)



In [None]:
 # Continuously append the scored records to HDFS as CSV files.
 # The StreamingQuery handle returned by start() is kept in `query` so the
 # stream can still be inspected or stopped; chaining awaitTermination()
 # onto the assignment would bind `query` to its return value (None) instead.
 query = (
     inferred_gender_added.select("device_id", "timestamp", "inferred_gender")
     .writeStream
     .format("csv")
     .option("path", "hdfs://namenode:9000//user/data/spark_ml_101/ec_web_logs_analysis/streaming")
     .option("checkpointLocation", "hdfs://namenode:9000//user/data/checkpoint")
     .outputMode("append")
     .start()
 )

 # Block this cell until the streaming query stops or fails.
 query.awaitTermination()


# query = inferred_gender_added.writeStream\
#                     .outputMode("append")\
#                     .format("console") \
#                     .option("truncate","false") \
#                     .start()\
#                     .awaitTermination()