In [None]:
import os
import sys
# Point PySpark at the cluster's Python interpreter and Spark installation,
# and ask spark-submit for two executors.
os.environ["PYSPARK_PYTHON"] = '/opt/anaconda/envs/bd9/bin/python'
os.environ["SPARK_HOME"] = '/usr/hdp/current/spark2-client'
os.environ["PYSPARK_SUBMIT_ARGS"] = '--num-executors 2 pyspark-shell'

spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')

# Make the pyspark and py4j packages shipped with this Spark install importable.
# NOTE(review): the py4j zip version is hard-coded and must match the install.
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.10.7-src.zip'))

# Execute pyspark's interactive bootstrap script in this namespace.
# The with-block closes the file handle (previously it was left open), and
# compiling with the real path makes tracebacks point at shell.py rather than
# at "<string>".
shell_path = os.path.join(spark_home, 'python/pyspark/shell.py')
with open(shell_path) as shell_file:
    exec(compile(shell_file.read(), shell_path, 'exec'))

In [None]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark import Row
import json
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import CountVectorizerModel
from pyspark.ml.classification import RandomForestClassificationModel



# Empty configuration object; passed to the builder so any settings added to it
# later are applied to the session.
conf = SparkConf()

# NOTE(review): the setup cell executes pyspark's shell.py, which presumably has
# already started a session; getOrCreate() would then return that session and
# static settings such as the app name may not take effect — confirm.
# The builder calls keep their original order: config(conf=...) is applied
# before appName so that the explicit app name wins over anything in `conf`.
spark = SparkSession.builder \
    .enableHiveSupport() \
    .config(conf=conf) \
    .appName("dash") \
    .getOrCreate()

In [None]:
# Load the previously fitted feature extractor and the two classifiers from
# the user's storage directory (loaded in this order: vectorizer, gender, age).
vectorizer_model, gender_classifier_model, age_classifier_model = (
    CountVectorizerModel.load("/user/daria.sharipova/lab04_vectorizer_model"),
    RandomForestClassificationModel.load("/user/daria.sharipova/lab04_gender_model"),
    RandomForestClassificationModel.load("/user/daria.sharipova/lab04_age_model"),
)

In [None]:
# Schema of each incoming Kafka message value: a user id plus a `visits` field
# kept as a raw string (it is re-parsed with `visits_schema` further down).
event_schema = StructType([
    StructField('uid', StringType()),
    StructField('visits', StringType()),
])

# Schema of the decoded `visits` payload: a list of {url, timestamp} records.
visits_schema = ArrayType(StructType([
    StructField('url', StringType()),
    StructField('timestamp', LongType()),
]))

# Subscribe to the input topic, reading only messages that arrive from now on.
read_kafka_params = {
    'kafka.bootstrap.servers': 'spark-master-1.newprolab.com:6667',
    'subscribe': 'input_daria.sharipova',
    'startingOffsets': 'latest',
    'failOnDataLoss': 'False',
}
kafka_sdf = (
    spark.readStream
    .format('kafka')
    .options(**read_kafka_params)
    .load()
)

In [None]:
# Decode each Kafka message: cast the value to a string, parse it with
# event_schema, pull out the user id, then parse the nested visits string into
# an array of {url, timestamp} structs (stored in `user_json`).
stream_df = (
    kafka_sdf
    .withColumn('value', F.col('value').cast('string'))
    .withColumn('event', F.from_json('value', event_schema))
    .withColumn('uid', F.col('event')['uid'])
    .withColumn('user_json', F.col('event')['visits'])
    .withColumn('user_json', F.from_json('user_json', visits_schema))
)

# Helper that extracts the host name from a URL (wrapped as a UDF below).
def parse_url(url):
    """Return the lower-cased host name of ``url``, or ``None``.

    Args:
        url: A URL string, or ``None`` / empty string.

    Returns:
        The ``hostname`` component parsed by ``urllib.parse.urlparse``, or
        ``None`` when the input is empty or cannot be parsed.
    """
    # The notebook never imported ``urlparse``; the previous bare ``except``
    # silently turned that NameError into ``None`` for every row. A
    # function-scope import keeps this UDF self-contained.
    from urllib.parse import urlparse

    if not url:
        return None
    try:
        return urlparse(url).hostname
    except (ValueError, TypeError, AttributeError):
        # e.g. ValueError("Invalid IPv6 URL") for a malformed bracketed netloc.
        return None

# Wrap the host-name extractor as a Spark UDF returning strings.
parse_url_udf = udf(parse_url, StringType())

# One row per visit -> host name per visit -> back to one row per user holding
# the list of visited hosts in `visits_urls`.
# NOTE(review): presumably `visits_urls` is the vectorizer's inputCol and the
# classifiers read the vectorizer's output column — confirm against training.
stream_df = (
    stream_df
    .withColumn('visit', F.explode('user_json'))
    .withColumn('visit_url', F.col('visit')['url'])
    .withColumn('parsed_visit_url', parse_url_udf('visit_url'))
    .groupBy('uid')
    .agg(F.collect_list('parsed_visit_url').alias('visits_urls'))
)

# Apply the fitted stages in order: vectorizer, gender model, age model.
stream_df = age_classifier_model.transform(
    gender_classifier_model.transform(
        vectorizer_model.transform(stream_df)
    )
)

In [None]:
# Decoder for the gender model's prediction (wrapped as a UDF below).
def decode_gender(gender):
    """Map a numeric gender prediction to a label.

    Returns 'F' when the prediction equals 0 (including 0.0) and 'M' for any
    other value, including None.

    NOTE(review): which class index means which gender depends on how labels
    were indexed at training time — confirm.
    """
    if gender == 0:
        return 'F'
    return 'M'

# Decoder for the age model's prediction (wrapped as a UDF below).
def decode_age(age):
    """Map a numeric age-class prediction to an age-bucket label.

    0 -> '18-24', 1 -> '25-34', 2 -> '35-44', 3 -> '45-54'; any other value
    (including None) -> '55+'.

    NOTE(review): the index-to-bucket mapping must match the label indexing
    used when the age model was trained — confirm.
    """
    buckets = (
        (0, '18-24'),
        (1, '25-34'),
        (2, '35-44'),
        (3, '45-54'),
    )
    for index, label in buckets:
        if age == index:
            return label
    return '55+'

# Wrap the decoders as Spark UDFs returning strings.
decode_gender_udf = udf(decode_gender, StringType())
decode_age_udf = udf(decode_age, StringType())

# Turn the numeric predictions into labels and keep only the output columns.
stream_df = (
    stream_df
    .withColumn('gender', decode_gender_udf('gender_prediction'))
    .withColumn('age', decode_age_udf('age_prediction'))
    .select('uid', 'gender', 'age')
)

In [None]:
# Target Kafka cluster and output topic.
write_kafka_params = {
    'kafka.bootstrap.servers': 'spark-master-1.newprolab.com:6667',
    'topic': 'daria.sharipova',
}

# Serialize each result row to a JSON string in the `value` column expected by
# the Kafka sink.
predictions_df = stream_df.select(
    F.to_json(F.struct(*stream_df.columns)).alias('value')
)

# "complete" mode re-emits the whole aggregated result table on every trigger.
streaming_query = (
    predictions_df.writeStream
    .format('kafka')
    .options(**write_kafka_params)
    .option('checkpointLocation', 'checkpoints/checkpoints_lab04')
    .outputMode('complete')
    .start()
)