# SparkContext creation

In [1]:
import os
import sys
os.environ["PYSPARK_PYTHON"]='/opt/anaconda/envs/bd9/bin/python'
os.environ["SPARK_HOME"]='/usr/hdp/current/spark2-client'
os.environ["PYSPARK_SUBMIT_ARGS"]='--num-executors 5 --executor-memory 4g --executor-cores 2 --driver-memory 8g pyspark-shell'



spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')

sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.10.7-src.zip'))
exec(open(os.path.join(spark_home, 'python/pyspark/shell.py')).read())

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.7
      /_/

Using Python version 3.6.5 (default, Apr 29 2018 16:14:56)
SparkSession available as 'spark'.


In [2]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark import Row
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
import pyspark.sql.types 
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer, IndexToString
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from urllib.parse import urlparse
import re
import json

conf = SparkConf()
spark = SparkSession.builder.config(conf=conf).appName("Streaming_lab").getOrCreate()

# EDA

In [3]:
!hdfs dfs -ls /labs/slaba04

Found 1 items
-rw-r--r--   3 hdfs hdfs  655090069 2022-01-06 18:46 /labs/slaba04/gender_age_dataset.txt


In [4]:
# define data path and shema
path = '/labs/slaba04/gender_age_dataset.txt'
schema = StructType([
    StructField('gender', StringType()),
    StructField('age', StringType()),
    StructField('uid', StringType()),
    StructField('user_json', StringType()) 
])

# read data and get rid of missing value in gender and age
train_data = spark.read.csv(
    path, header = True, schema = schema, sep = '\t'
).where((F.col('gender') != '-') & (F.col('age') != '-'))

In [5]:
# we have `user_json` feature and we should also define structure for it
user_json_schema = StructType([
    StructField('visits', ArrayType(
        StructType([
            StructField('url', StringType(), True),
            StructField('timestamp', LongType(), True)
        ])
    ))
])
train_data = train_data.withColumn('parsed_user', F.from_json('user_json', schema=user_json_schema))

# one more thing - explode `visit` feature
train_data_exploded = train_data.withColumn("visit", F.explode(F.col("parsed_user.visits")))
train_data_exploded = train_data_exploded.select('*', F.col('visit.url'), F.col('visit.timestamp'))

In [6]:
def extract_domain(url:str=None):
    '''
    Function extracts domains from url
        Args:
            url:str
                Current URL
        Returns:
            domain:str
                Domain of the particular URL
    '''
    parsed_url = urlparse(url)
    return parsed_url.netloc

domain_udf = F.udf(extract_domain, StringType())
cleared_urls = train_data_exploded.withColumn("clean_url", domain_udf(F.col("url")))

# Classifier

In [7]:
features = cleared_urls.select('age', 'gender', 'uid', 'clean_url')

# collect all visited links by user
visited_links = features.groupBy('uid').agg(F.collect_list('clean_url').alias('all_links'))

# define `unique` datas only and join with visited links
unique_data = features.select('uid', 'age', 'gender').dropDuplicates(['uid'])
features_n_target = visited_links.join(other=unique_data, on=['uid'], how='inner').cache()

In [8]:
# create HashingTF for all visited links
hashingTF = HashingTF(inputCol='all_links', outputCol='rawFeatures', numFeatures=30000)
features_n_target_preprocessed = hashingTF.transform(features_n_target).drop('all_links')

# define and apply IDF
idf = IDF(inputCol='rawFeatures', outputCol='features')
idfModel = idf.fit(features_n_target_preprocessed)
features_n_target_preprocessed = idfModel.transform(features_n_target_preprocessed)

In [9]:
# define StringIndexer for `gender` column 
label_indexer_gender = StringIndexer(inputCol="gender", outputCol="label_gender").fit(features_n_target_preprocessed)
features_n_target_preprocessed = label_indexer_gender.transform(features_n_target_preprocessed)

In [10]:
# define StringIndexer for `gender` column 
label_indexer_age = StringIndexer(inputCol="age", outputCol="label_age").fit(features_n_target_preprocessed)
features_n_target_preprocessed = label_indexer_age.transform(features_n_target_preprocessed)

In [11]:
# deffine train-test split
fractions_age = features_n_target_preprocessed.select("age").distinct().withColumn("fraction", F.lit(0.75)).rdd.collectAsMap()
train_age = features_n_target_preprocessed.stat.sampleBy("age", fractions_age, seed=2002)
test_age = features_n_target_preprocessed.join(train_age, on=["uid"], how="left_anti")

fractions_gender = features_n_target_preprocessed.select("gender").distinct().withColumn(
    "fraction", F.lit(0.75)
).rdd.collectAsMap()
train_gender = features_n_target_preprocessed.stat.sampleBy("gender", fractions_gender, seed=2002)
test_gender = features_n_target_preprocessed.join(train_gender, on=["uid"], how="left_anti")

## Train model

### `Age` model

In [12]:
randomForestAge = RandomForestClassifier(numTrees=3, maxDepth=2, labelCol="label_age", featuresCol="features", seed=2002)
model_randomForestAge = randomForestAge.fit(train_age)

predictions_age = model_randomForestAge.transform(test_age)
evaluator = MulticlassClassificationEvaluator(labelCol="label_age", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions_age)
print("Accuracy = %g" % accuracy)

Accuracy = 0.433396


### `Gender` model

In [13]:
randomForestGender = RandomForestClassifier(numTrees=3, maxDepth=2, labelCol="label_gender", featuresCol="features", seed=2002)
model_randomForestGender = randomForestGender.fit(train_gender)

predictions_gender = model_randomForestGender.transform(test_gender)
evaluator = MulticlassClassificationEvaluator(labelCol="label_gender", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions_gender)
print("Accuracy = %g" % accuracy)

Accuracy = 0.525694


# Kafka

In [139]:
KAFKA_BOOTSTRAP_SERVER = 'spark-master-1.newprolab.com:6667'
KAFKA_INPUT_SERVER = 'input_nikita.ermishov'
KAFKA_OUTPUT_TOPIC = 'nikita.ermishov'

In [140]:
def create_console_sink(df):
    '''
    Function that allow to show df
    '''
    
    return df.writeStream.format("console").trigger(processingTime="5 seconds").option(
        "truncate", "false"
    ).option("numRows", "20")

kafka_read_sdf = spark.readStream.format('kafka').option(
    'kafka.bootstrap.servers', KAFKA_BOOTSTRAP_SERVER
).option('subscribe', KAFKA_INPUT_SERVER).option('startingOffsets', 'earliest').option('failOnDataLoss', 'False').load()

In [142]:
# kafka_read_sdf.show()

In [143]:
# define schemas
event_schema = StructType([
    StructField('uid', StringType(), True),
    StructField('visits', StringType(), True) 
])

visits_schema = ArrayType(
    StructType([
        StructField('url', StringType(), True),
        StructField('timestamp', LongType(), True)
    ])
)

# and streaming dataframe
clean_sdf = kafka_read_sdf.select(F.col('value').cast('string').alias('value')).select(
    F.from_json(F.col('value'), event_schema).alias('event')
).select('event.uid', F.from_json(F.col('event.visits'), visits_schema).alias('visits'))

In [144]:
sdf_explode = clean_sdf.withColumn("url", F.explode(F.col("visits.url"))).select('uid', 'url')
sdf_example = sdf_explode.withColumn("clean_url", domain_udf(F.col("url")))

In [145]:
sdf_collect = sdf_example.groupBy('uid').agg(F.collect_list('clean_url').alias('all_links'))

define all transforms as it was before

In [146]:
hashingTF = HashingTF(inputCol='all_links', outputCol='rawFeatures', numFeatures=30000)
sdf_features = hashingTF.transform(sdf_collect).drop('all_links')

In [151]:
sdf_features = sdf_features.withColumn('features', F.col('rawFeatures'))

In [152]:
prediction_test_age = model_randomForestAge.transform(sdf_features)
prediction_test_gender = model_randomForestGender.transform(sdf_features)

In [153]:
convert_age = IndexToString(inputCol = 'prediction', outputCol = 'age', labels = label_indexer_age.labels)
prediction_test_age = convert_age.transform(prediction_test_age)

In [154]:
convert_gender = IndexToString(inputCol = 'prediction', outputCol = 'gender', labels = label_indexer_gender.labels)
prediction_test_gender = convert_gender.transform(prediction_test_gender)

In [155]:
sdf_prediction = prediction_test_gender.join(prediction_test_age, 'uid', 'inner').select('uid', 'gender', 'age')

In [157]:
sdf_kafka_out = (
    sdf_prediction
    .select(F.to_json(F.struct(*sdf_prediction.columns)).alias('value'))
)

In [159]:
(
    sdf_kafka_out
    .writeStream
    .format('kafka')
    .outputMode('append')
    .option('kafka.bootstrap.servers', KAFKA_BOOTSTRAP_SERVER)
    .option('topic', KAFKA_OUTPUT_TOPIC)
    #.save()
)

<pyspark.sql.streaming.DataStreamWriter at 0x7f9290eac2e8>

In [275]:
# sq.stop()

In [186]:
sc.stop()