In [None]:
# Stop a SparkSession left over from an earlier run of this notebook.
# On a fresh kernel `spark` is not defined yet, so there is nothing to stop.
try:
    spark.stop()
except NameError:
    pass

In [None]:
import os
import sys
# Environment consumed by PySpark when it launches: submit arguments,
# the Python interpreter to use, and the Spark installation directory.
os.environ.update({
    "PYSPARK_SUBMIT_ARGS": 'pyspark-shell',
    "PYSPARK_PYTHON": '/opt/anaconda/envs/bd9/bin/python',
    "SPARK_HOME": '/usr/hdp/current/spark2-client',
})

spark_home = os.environ.get('SPARK_HOME')
if spark_home is None or spark_home == '':
    raise ValueError('SPARK_HOME environment variable is not set')

# Put the bundled py4j archive first on sys.path, followed by Spark's own
# Python sources, so that `import pyspark` resolves to this installation.
sys.path[:0] = [
    os.path.join(spark_home, 'python/lib/py4j-0.10.7-src.zip'),
    os.path.join(spark_home, 'python'),
]

In [None]:
from pyspark import SparkConf
from pyspark.sql import SparkSession

# Create (or reuse) a local two-thread session with the Kafka SQL connector
# package requested via spark.jars.packages.
spark = (
    SparkSession.builder
    .master("local[2]")
    .appName("spark-course")
    .config("spark.driver.memory", "1024m")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5")
    .getOrCreate()
)
spark

In [None]:
from pyspark.ml import Transformer
from pyspark.ml.param import Param, Params
from pyspark.ml.param.shared import HasOutputCol, HasInputCol
from pyspark.ml.feature import CountVectorizer

from pyspark.sql.functions import *
from pyspark.sql.types import FloatType, StringType, ArrayType, MapType, IntegerType

from pyspark.ml.linalg import DenseVector, SparseVector
from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import Summarizer

import re

In [None]:
# List the lab directory on HDFS (sizes in human-readable form).
!hdfs dfs -ls -h /labs/slaba04/

In [None]:
# Load the tab-separated dataset; the first line supplies the column names.
dt = spark.read.csv(
    '/labs/slaba04/gender_age_dataset.txt',
    header=True,
    sep='\t'
)

In [None]:
# Preview the first five raw rows.
dt.show(5)

In [None]:
# Infer the schema used to parse the JSON payload from one sample record:
# an object whose "visits" key holds an array of {"url", "timestamp"} entries.
schema = schema_of_json('{"visits": [{"url": "http://zebra-zoya.ru/200028-chehol-organayzer-dlja-macbook-11-grid-it.html?utm_campaign=397720794&utm_content=397729344&utm_medium=cpc&utm_source=begun", "timestamp": 1419688144068}, {"url": "http://news.yandex.ru/yandsearch?cl4url=chezasite.com/htc/htc-one-m9-delay-86327.html&lr=213&rpt=story", "timestamp": 1426666298001}, {"url": "http://www.sotovik.ru/news/240283-htc-one-m9-zaderzhivaetsja.html", "timestamp": 1426666298000}, {"url": "http://news.yandex.ru/yandsearch?cl4url=chezasite.com/htc/htc-one-m9-delay-86327.html&lr=213&rpt=story", "timestamp": 1426661722001}, {"url": "http://www.sotovik.ru/news/240283-htc-one-m9-zaderzhivaetsja.html", "timestamp": 1426661722000}]}')

In [None]:
# Parse the user_json column with the inferred schema, then flatten the parsed
# struct so its fields sit next to the gender / age / uid columns.
parsed_json = from_json(col("user_json").cast("string"), schema=schema).alias("s")
df = (
    dt.select(parsed_json, 'gender', 'age', 'uid')
      .select(col("s.*"), 'gender', 'age', 'uid')
)
df.show(n=2, truncate=20)

In [None]:
@udf(ArrayType(StringType()))
def get_url_array(x):
    """Extract the host name from every visited URL.

    Parameters
    ----------
    x : sequence of Row, or None
        The ``visits`` array produced by ``from_json``; each element exposes a
        ``url`` field. ``None`` (the JSON could not be parsed, or the field was
        missing) is treated as "no visits".

    Returns
    -------
    list of str
        One host per visit whose URL starts with ``http://`` or ``https://``,
        in visit order. Visits with a missing or non-matching URL are skipped.
    """
    hosts = []
    # from_json yields NULL for records it cannot parse; iterating over None
    # would raise inside the executor and fail the whole job.
    for visit in x or []:
        # Read the field by name rather than by position, so the result does
        # not depend on the order in which the inferred schema lists fields.
        link = visit['url'] if visit is not None else None
        if not link:
            continue
        match = re.search(r'(?<=http://)[\w\.-]+|(?<=https://)[\w\.-]+', link)
        if match:
            hosts.append(match.group(0))
    return hosts

# Attach the list of visited hosts to every user row.
visited_hosts = get_url_array(df['visits'])
df2 = df.withColumn('url_array', visited_hosts)
df2.show(n=3)

In [None]:
# Top sites: for each host, count the distinct users that visited it,
# then keep the 500 hosts with the highest count.
gdf_ = (
    df2.select('uid', explode(df2.url_array).alias('url'))
       .distinct()
       .groupBy('url')
       .agg(count('url').alias('cnt'))
)
#gdf_.show(3)
gdf2_ = gdf_.repartition(1)
gdf3_ = gdf2_.orderBy(col('cnt').desc()).limit(500)
gdf3_cache = gdf3_.cache()
gdf3_cache.show(n=5)

In [None]:
# One row per (user, visited host); the inner join keeps only top-500 hosts,
# after which the hosts are gathered back into a single list per user.
exploded_visits = df2.select('uid', 'age', 'gender', explode(df2.url_array).alias('url'))
df3 = (
    exploded_visits
    .join(gdf3_cache, on='url', how='inner')
    .groupBy('uid', 'age', 'gender')
    .agg(collect_list('url').alias('url'))
)

df3.show(n=3)

In [None]:
# Turn each user's host list into a term-count vector; the vocabulary is
# learned from df3 and the fitted model is reused later on the Kafka stream.
count_vectorizer = CountVectorizer(
    inputCol='url',
    outputCol='url_vector'
)
count_vectorizer_model = count_vectorizer.fit(df3)
df4 = count_vectorizer_model.transform(df3)

In [None]:
# Keep only rows whose age bucket and gender are among the known labels.
has_known_age = df4.age.isin('>=55', '45-54', '35-44', '25-34', '18-24')
has_known_gender = df4.gender.isin('F', 'M')
df5 = df4.filter(has_known_age & has_known_gender).select('gender', 'age', 'url_vector')

# Numeric label for the age bucket (1..5); anything else becomes NULL.
age_code = (
    when(col('age') == '18-24', 1)
    .when(col('age') == '25-34', 2)
    .when(col('age') == '35-44', 3)
    .when(col('age') == '45-54', 4)
    .when(col('age') == '>=55', 5)
)
# Numeric label for gender: F -> 1, M -> 0.
gender_code = when(col('gender') == 'F', 1).when(col('gender') == 'M', 0)

df6 = df5.withColumn('age_', age_code).withColumn('gender_', gender_code)
df6_cache = df6.cache()
df6_cache.show(n=2)

In [None]:
@udf(IntegerType())
def url_num(x):
    """Return the number of elements in the array ``x``.

    A NULL input yields NULL rather than raising inside the executor.
    """
    if x is None:
        return None
    return len(x)

# Number of extracted hosts per user.
df2_1 = df2.withColumn('url_num', url_num(df2.url_array))

In [None]:
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.classification import RandomForestClassifier

# Age-bucket classifier: random forest on the host count vectors.
age_estimator = RandomForestClassifier(labelCol='age_', featuresCol='url_vector')
model_age = age_estimator.fit(df6_cache)

# Gender classifier: gradient-boosted trees on the same features.
gender_estimator = GBTClassifier(labelCol='gender_', featuresCol='url_vector')
model_gender = gender_estimator.fit(df6_cache)

# From Kafka

In [None]:
#pipeline
def transform_batch(kafka_sdf):
    """Score Kafka messages with the fitted age and gender models.

    Relies on the notebook-level objects ``schema``, ``get_url_array``,
    ``count_vectorizer_model``, ``model_age`` and ``model_gender``.

    Parameters
    ----------
    kafka_sdf : DataFrame
        Frame with a ``value`` column holding a JSON document that has
        ``uid`` and ``visits`` keys.

    Returns
    -------
    DataFrame
        Columns ``uid``, ``age`` and ``gender``; a label is NULL when the
        predicted code matches none of the known codes.
    """
    # Pull uid and the raw visits JSON out of the message payload.
    extracted = kafka_sdf.select(
        json_tuple(col("value").cast("string"), "uid", "visits").alias("uid", "user_js")
    )

    # Wrap the visits array in an object so it can be parsed with `schema`.
    wrapped_json = concat(lit('{"visits": '), col('user_js'), lit('}')).alias('user_json')
    wrapped = extracted.select('uid', 'user_js', wrapped_json)

    parsed = (
        wrapped
        .select(from_json(col("user_json").cast("string"), schema=schema).alias("s"), 'uid')
        .select(col("s.*"), 'uid')
    )

    # The vectorizer reads the host list from the 'url' column.
    with_hosts = parsed.withColumn('url', get_url_array(parsed.visits))
    vectorized = count_vectorizer_model.transform(with_hosts)

    # Score age first, keeping the features so the gender model can reuse them.
    age_scored = model_age.transform(vectorized) \
        .select('uid', 'url_vector', col('prediction').alias('age_'))
    gender_scored = model_gender.transform(age_scored) \
        .select('uid', 'age_', col('prediction').alias('gender_'))

    # Map the numeric predictions back to the original string labels.
    age_label = (
        when(col('age_') == 1, '18-24')
        .when(col('age_') == 2, '25-34')
        .when(col('age_') == 3, '35-44')
        .when(col('age_') == 4, '45-54')
        .when(col('age_') == 5, '>=55')
    )
    gender_label = when(col('gender_') == 1, 'F').when(col('gender_') == 0, 'M')

    return (
        gender_scored
        .withColumn('age', age_label)
        .withColumn('gender', gender_label)
        .select('uid', 'age', 'gender')
    )

In [None]:
# Read from Kafka: subscribe to the input topic, starting at the latest offsets.
read_kafka_params = {
    "startingOffsets": "latest",
    "subscribe": "input_margarita.cherentsova",
    "kafka.bootstrap.servers": 'spark-master-1.newprolab.com:6667',
}
kafka_sdf = (
    spark.readStream
    .format("kafka")
    .options(**read_kafka_params)
    .load()
)

# Model: apply the scoring pipeline to the stream.
transfoem_kafka = transform_batch(kafka_sdf)

# Output: serialise every column of a row into one JSON string stored in "value".
kafka_doc = to_json(struct(col("*")))
raw = transfoem_kafka.select(kafka_doc.alias("value"))

# Write to Kafka: append the JSON documents to the output topic.
write_kafka_params = {
    "topic": "margarita.cherentsova",
    "kafka.bootstrap.servers": 'spark-master-1.newprolab.com:6667',
}
(
    raw.writeStream
    .format("kafka")
    .options(**write_kafka_params)
    .option("checkpointLocation", "streaming/chk/chk_kafka")
    .outputMode("append")
    .start()
)

In [None]:
def kill_all():
    """Stop every active streaming query of the current SparkSession.

    For each stopped query one line is printed. It carries the description
    of the query's first source when a progress report exists, otherwise the
    query's name or id.
    """
    for query in SparkSession.builder.getOrCreate().streams.active:
        # lastProgress is None until the query has reported progress at least
        # once; subscripting it blindly would raise and leave the query running.
        progress = query.lastProgress
        sources = progress.get("sources") if progress else None
        if sources:
            desc = sources[0]["description"]
        else:
            desc = query.name or query.id
        query.stop()
        print("Stopped stream {s}".format(s=desc))

In [None]:
# Stop every streaming query that is still active in this Spark session.
kill_all()