In [1]:
import os
import sys
os.environ["PYSPARK_PYTHON"]='/opt/anaconda/envs/bd9/bin/python'
os.environ["SPARK_HOME"]='/usr/hdp/current/spark2-client'
os.environ["PYSPARK_SUBMIT_ARGS"]='--num-executors 10 --executor-memory 5g --executor-cores 4 --driver-memory 3g pyspark-shell'

spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')

sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.10.7-src.zip'))
exec(open(os.path.join(spark_home, 'python/pyspark/shell.py')).read())

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.7
      /_/

Using Python version 3.6.5 (default, Apr 29 2018 16:14:56)
SparkSession available as 'spark'.


In [2]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark import Row
import json

conf = SparkConf()

spark = (SparkSession
         .builder
         .config(conf=conf)
         .appName("test")
         .getOrCreate())

In [76]:
spark

In [4]:
!hdfs dfs -ls /labs/slaba04/

Found 1 items
-rw-r--r--   3 hdfs hdfs  655090069 2022-01-06 18:46 /labs/slaba04/gender_age_dataset.txt


In [13]:
train_data = spark.read.csv('/labs/slaba04/gender_age_dataset.txt', sep='\t', header=True)
train_data.show(5)

+------+-----+--------------------+--------------------+
|gender|  age|                 uid|           user_json|
+------+-----+--------------------+--------------------+
|     F|18-24|d50192e5-c44e-4ae...|{"visits": [{"url...|
|     M|25-34|d502331d-621e-472...|{"visits": [{"url...|
|     F|25-34|d50237ea-747e-48a...|{"visits": [{"url...|
|     F|25-34|d502f29f-d57a-46b...|{"visits": [{"url...|
|     M| >=55|d503c3b2-a0c2-4f4...|{"visits": [{"url...|
+------+-----+--------------------+--------------------+
only showing top 5 rows



In [5]:
train_data.count()

41138

In [37]:
train_data.select("gender").distinct().show()

+------+
|gender|
+------+
|     F|
|     M|
|     -|
+------+



In [38]:
train_data.filter(train_data.gender=='-').count()

5000

In [39]:
train_data.select("age").distinct().show()

+-----+
|  age|
+-----+
| >=55|
|45-54|
|    -|
|35-44|
|25-34|
|18-24|
+-----+



In [14]:
train_data = train_data.filter((train_data.gender != '-') & (train_data.age != '-'))
train_data.count()

36138

In [15]:
gender_dict = {'F': '0', 'M': '1'}
age_dict = {'18-24': '0', '25-34': '1', '35-44': '2', '45-54': '3', '>=55': '4'}

In [16]:
train_data = train_data.replace(gender_dict, subset=["gender"])\
                       .replace(age_dict, subset=["age"])\
                       .withColumn('gender', F.col("gender").cast(IntegerType()))\
                       .withColumn('age', F.col("age").cast(IntegerType()))
train_data.show(5)

+------+---+--------------------+--------------------+
|gender|age|                 uid|           user_json|
+------+---+--------------------+--------------------+
|     0|  0|d50192e5-c44e-4ae...|{"visits": [{"url...|
|     1|  1|d502331d-621e-472...|{"visits": [{"url...|
|     0|  1|d50237ea-747e-48a...|{"visits": [{"url...|
|     0|  1|d502f29f-d57a-46b...|{"visits": [{"url...|
|     1|  4|d503c3b2-a0c2-4f4...|{"visits": [{"url...|
+------+---+--------------------+--------------------+
only showing top 5 rows



In [17]:
age_data = train_data.select("uid", "age")
gender_data = train_data.select("uid", "gender")

In [26]:
user_jsons = train_data.select("user_json").collect()
uids = train_data.select("uid").collect()

In [27]:
schema = StructType([StructField('timestamp', StringType()),\
                    StructField('url', StringType()),\
                    StructField('uid', StringType())])

In [28]:
def create_df(uid, user_json):
    data = json.loads(user_json[0])['visits']
    df = spark.createDataFrame(Row(**x) for x in data)
    df = df.withColumn('timestamp', F.col('timestamp').cast(LongType()))
    df = df.withColumn('uid', F.lit(uid[0]))
    return df

In [29]:
mydf = create_df(uids[10], user_jsons[10])
mydf.show()

+-------------+--------------------+--------------------+
|    timestamp|                 url|                 uid|
+-------------+--------------------+--------------------+
|1419755762980|http://muz4in.net...|d51294ed-1b95-4e4...|
|1419581810838|http://www.smachn...|d51294ed-1b95-4e4...|
|1418632266469|http://www.fotoco...|d51294ed-1b95-4e4...|
|1418632216018|http://fotocollag...|d51294ed-1b95-4e4...|
|1418123008253|http://www.labiri...|d51294ed-1b95-4e4...|
|1418123007649|http://www.labiri...|d51294ed-1b95-4e4...|
|1417932238909|http://kaluga.tax...|d51294ed-1b95-4e4...|
|1427126630000|http://love.mail....|d51294ed-1b95-4e4...|
|1427126509000|http://love.mail....|d51294ed-1b95-4e4...|
|1427126307000|http://go.youlame...|d51294ed-1b95-4e4...|
|1427121385001|http://angelreiki...|d51294ed-1b95-4e4...|
|1427121385000|http://google.ru/...|d51294ed-1b95-4e4...|
|1427121331000|http://www.angelr...|d51294ed-1b95-4e4...|
|1427121329001|http://www.angelr...|d51294ed-1b95-4e4...|
|1427121329000

In [30]:
from tqdm import tqdm

In [40]:
view_df = spark.createDataFrame([], schema).toPandas()

In [None]:
for i in tqdm(range(len(uids)), leave=True):
    df = create_df(uids[i], user_jsons[i]).toPandas()
    view_df = view_df.append(df)

In [None]:
view_df.to_csv('view_df.csv')

In [49]:
view_df.write.csv("view_df.csv")

In [4]:
schema_view = StructType(fields=[StructField("timestamp", StringType()),
                                StructField("url", StringType()),
                                StructField("uid", StringType())])


In [5]:
view_df = spark.read.csv('view_df.csv', schema=schema_view)
view_df.show(5)

+-------------+--------------------+--------------------+
|    timestamp|                 url|                 uid|
+-------------+--------------------+--------------------+
|1419688144068|http://zebra-zoya...|d50192e5-c44e-4ae...|
|1426666298001|http://news.yande...|d50192e5-c44e-4ae...|
|1426666298000|http://www.sotovi...|d50192e5-c44e-4ae...|
|1426661722001|http://news.yande...|d50192e5-c44e-4ae...|
|1426661722000|http://www.sotovi...|d50192e5-c44e-4ae...|
+-------------+--------------------+--------------------+
only showing top 5 rows



In [11]:
view_df.filter(view_df.url.isNull()).count()

656

In [12]:
view_df.count()

5314400

In [6]:
view_df = view_df.na.drop("any")
view_df.count()

5312174

In [7]:
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml.linalg import DenseVector, Vectors, VectorUDT

In [8]:
import re
regex = re.compile(u'[\w\d]{2,}', re.U)
def tokenizing(string):
    return regex.findall(string.lower())

def sparse_to_array(v):
    v = Vectors.dense(v)
    new_array = list([float(x) for x in v])
    return new_array

tokenization = F.udf(tokenizing, ArrayType(StringType()))
convert_dense = F.udf(lambda vector: Vectors.dense(vector), VectorUDT())
#convert_dense = F.udf(sparse_to_array, ArrayType(FloatType()))

In [9]:
hashingTF = HashingTF(numFeatures=750, inputCol="tokens", outputCol="tf")
idf = IDF(inputCol="tf", outputCol="features")

In [10]:
view_df = view_df.groupBy("uid")\
                        .agg(F.concat_ws(", ", F.collect_list(view_df.url)).alias("url"),\
                             F.count("timestamp").alias("count_time"))
view_df.show(3, vertical=True)

-RECORD 0--------------------------
 uid        | 0392f398-ea7e-4a1... 
 url        | http://www.adme.r... 
 count_time | 212                  
-RECORD 1--------------------------
 uid        | 094b1e7e-97a6-441... 
 url        | http://kinoclips.... 
 count_time | 62                   
-RECORD 2--------------------------
 uid        | 095544a2-64f7-422... 
 url        | http://well-good.... 
 count_time | 4                    
only showing top 3 rows



In [11]:
view_df = view_df.withColumn("tokens", tokenization("url"))
view_df = hashingTF.transform(view_df)
idf_model_view = idf.fit(view_df)
view_df = idf_model_view.transform(view_df)
view_df = view_df.withColumn("features", convert_dense("features"))
view_df.show(3, vertical=True)

-RECORD 0--------------------------
 uid        | 0392f398-ea7e-4a1... 
 url        | http://www.adme.r... 
 count_time | 212                  
 tokens     | [http, www, adme,... 
 tf         | (750,[0,7,8,10,12... 
 features   | [9.36918946316104... 
-RECORD 1--------------------------
 uid        | 094b1e7e-97a6-441... 
 url        | http://kinoclips.... 
 count_time | 62                   
 tokens     | [http, kinoclips,... 
 tf         | (750,[0,8,10,11,1... 
 features   | [11.2430273557932... 
-RECORD 2--------------------------
 uid        | 095544a2-64f7-422... 
 url        | http://well-good.... 
 count_time | 4                    
 tokens     | [http, well, good... 
 tf         | (750,[147,165,407... 
 features   | [0.0,0.0,0.0,0.0,... 
only showing top 3 rows



In [18]:
from pyspark.ml.feature import VectorAssembler

In [57]:
vec_assembler = VectorAssembler(inputCols=["features", "count_time"], outputCol="result_features")
view_df = vec_assembler.transform(view_df)

In [18]:
age_data = age_data.join(view_df, on="uid", how="inner")
age_data = age_data.select("features", "age")
age_data.show(5)

+--------------------+---+
|            features|age|
+--------------------+---+
|[9.36918946316104...|  4|
|[11.2430273557932...|  0|
|[0.0,0.0,0.0,0.0,...|  1|
|[0.0,0.0,0.0,2.02...|  4|
|[0.0,0.0,0.0,0.0,...|  4|
+--------------------+---+
only showing top 5 rows



In [25]:
age_data.count()

36138

In [19]:
gender_data = gender_data.join(view_df, on="uid", how="inner")
gender_data = gender_data.select("features", "gender")
gender_data.show(5)

+--------------------+------+
|            features|gender|
+--------------------+------+
|[9.36918946316104...|     0|
|[11.2430273557932...|     0|
|[0.0,0.0,0.0,0.0,...|     0|
|[0.0,0.0,0.0,2.02...|     0|
|[0.0,0.0,0.0,0.0,...|     1|
+--------------------+------+
only showing top 5 rows



In [27]:
gender_data.count()

36138

In [28]:
gender_data.dtypes

[('result_features', 'vector'), ('gender', 'int')]

In [45]:
gender_data.groupby("gender").count().collect()

[Row(gender=1, count=18698), Row(gender=0, count=17440)]

In [20]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import GBTClassifier, GBTClassificationModel
from pyspark.ml.classification import RandomForestClassifier, RandomForestClassificationModel

GBTClassifier для пола

In [22]:
gbt_gender = GBTClassifier(featuresCol='features', labelCol='gender', maxDepth=3, maxIter=25, stepSize=0.1)
evaluator_gender = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="gender", metricName='accuracy')

In [23]:
model_gbt_gender = gbt_gender.fit(gender_data)
predictions_train_gender = model_gbt_gender.transform(gender_data)
evaluator_gender.evaluate(predictions_train_gender)

0.6041562897780729

In [99]:
idf_model_view.save('idf_model_750')
model_gbt_gender.save('gbt_gender_750')

RandomForestClassifier для возраста

In [105]:
rf_age = RandomForestClassifier(featuresCol='features', labelCol='age', maxDepth=7, numTrees=30)
evaluator_age = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="age", metricName='accuracy')

In [106]:
model_rf_age = rf_age.fit(age_data)
predictions_train_age = model_rf_age.transform(age_data)
evaluator_age.evaluate(predictions_train_age)

0.460125076097183

In [107]:
model_rf_age.save('rf_age_750')

In [84]:
from pyspark.sql.functions import *

In [85]:
schema_kafka = StructType(fields=[StructField("uid", StringType()),
                                StructField("visits", StringType())])

In [86]:
def parse_json(string):
    data = json.loads(string)
    s =''
    for i in data:
        s = ' '.join([s, i['url']])
    return str(s)
parsing = F.udf(parse_json, StringType())

In [87]:
reverse_gender_dict = {'0': 'F', '1': 'M'}
reverse_age_dict = {'0': '18-24', '1': '25-34', '2': '35-44', '3': '45-54', '4': '>=55'}

In [88]:
rf_age_load = RandomForestClassificationModel.load('rf_age_750')
gbt_gender_load = GBTClassificationModel.load('gbt_gender_750')

In [113]:
def write_kafka(topic, data):
    write_kafka_params = {"kafka.bootstrap.servers": 'spark-master-1.newprolab.com:6667'}
    kafka_doc = to_json(struct(col("*")))
    data.select(kafka_doc.alias("value"))\
        .withColumn("topic", lit(topic))\
        .write.format("kafka")\
        .options(**write_kafka_params).save()
    

In [94]:
read_kafka_params = {
    "kafka.bootstrap.servers": 'spark-master-1.newprolab.com:6667',
    #"subscribe": "test_topic0, test_topic1, test_topic2"
    "subscribe": "input_kirill.kuznetsov",
    #"startingOffsets": "latest"
}
kafka_sdf = spark.readStream.format("kafka").options(**read_kafka_params).load()

kafka_df = kafka_sdf.select(F.col("value").cast("string"))
kafka_df = kafka_df.select(from_json(col("value").cast("string"), schema_kafka).alias("s")).select(col("s.*"))
kafka_df = kafka_df.withColumn('url', parsing('visits')).select('uid', 'url')
kafka_df = kafka_df.withColumn("tokens", tokenization("url"))
kafka_df = hashingTF.transform(kafka_df)
kafka_df = idf_model_view.transform(kafka_df)
kafka_df = kafka_df.withColumn("features", convert_dense("features"))
kafka_df = rf_age_load.transform(kafka_df).withColumn('age', F.col('prediction')).select('uid', 'features', 'age')
kafka_df = kafka_df.withColumn('age', F.col('age').cast(IntegerType()).cast(StringType()))
kafka_df = kafka_df.replace(reverse_age_dict, subset=["age"])
kafka_df = gbt_gender_load.transform(kafka_df).withColumn('gender', F.col('prediction')).select('uid', 'gender', 'age')
kafka_df = kafka_df.withColumn('gender', F.col('gender').cast(IntegerType()).cast(StringType()))
kafka_df = kafka_df.replace(reverse_gender_dict, subset=["gender"])

kafka_doc = to_json(struct(col("*")))
kafka_df = kafka_df.select(kafka_doc.alias('value'))

#kafka_df = kafka_df.withColumn('value', convert2str('uid', 'gender', 'age')).select('value')

write_kafka_params = {
   "kafka.bootstrap.servers": 'spark-node-1.newprolab.com:6667',
   "topic": "kirill.kuznetsov"
}

sink = kafka_df.writeStream.format("kafka").options(**write_kafka_params)\
    .option("checkpointLocation", "streaming/chk/chk_kafka")\
    .outputMode("append")


In [105]:
sq = sink.start()

In [109]:
sq.isActive

False

In [120]:
read_kafka_params = {
    "kafka.bootstrap.servers": 'spark-master-1.newprolab.com:6667',
    #"subscribe": "test_topic0, test_topic1, test_topic2"
    "subscribe": "input_kirill.kuznetsov",
    #"startingOffsets": "latest"
}

kafka_sdf = spark.read.format("kafka").options(**read_kafka_params).load()

kafka_df = kafka_sdf.select(F.col("value").cast("string"))
kafka_df = kafka_df.select(from_json(col("value").cast("string"), schema_kafka).alias("s")).select(col("s.*"))
kafka_df = kafka_df.withColumn('url', parsing('visits')).select('uid', 'url')
kafka_df = kafka_df.withColumn("tokens", tokenization("url"))
kafka_df = hashingTF.transform(kafka_df)
kafka_df = idf_model_view.transform(kafka_df)
kafka_df = kafka_df.withColumn("features", convert_dense("features"))
kafka_df = rf_age_load.transform(kafka_df).withColumn('age', F.col('prediction')).select('uid', 'features', 'age')
kafka_df = kafka_df.withColumn('age', F.col('age').cast(IntegerType()).cast(StringType()))
kafka_df = kafka_df.replace(reverse_age_dict, subset=["age"])
kafka_df = gbt_gender_load.transform(kafka_df).withColumn('gender', F.col('prediction')).select('uid', 'gender', 'age')
kafka_df = kafka_df.withColumn('gender', F.col('gender').cast(IntegerType()).cast(StringType()))
kafka_df = kafka_df.replace(reverse_gender_dict, subset=["gender"])
kafka_doc = to_json(struct(col("*")))
kafka_df = kafka_df.select(kafka_doc.alias('value'))
kafka_df.show(5000)
#kafka_df = kafka_df.withColumn('value', col('age'))
#kafka_df = kafka_df.withColumn('res', convert2str('uid', 'gender', 'age'))
write_kafka("kirill.kuznetsov", kafka_df)

KeyboardInterrupt: 

In [25]:
sq.stop()

NameError: name 'sq' is not defined

In [26]:
spark.stop()