In [25]:
import os
import sys
os.environ["PYSPARK_PYTHON"]='/opt/anaconda/envs/bd9/bin/python'
os.environ["SPARK_HOME"]='/usr/hdp/current/spark2-client'
os.environ["PYSPARK_SUBMIT_ARGS"]='--num-executors 2 pyspark-shell'

spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')

sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.10.7-src.zip'))
exec(open(os.path.join(spark_home, 'python/pyspark/shell.py')).read())

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.7
      /_/

Using Python version 3.6.5 (default, Apr 29 2018 16:14:56)
SparkSession available as 'spark'.


In [26]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark import Row
import json

conf = SparkConf()

spark = (SparkSession
         .builder
         .config(conf=conf)
         .appName("test")
         .getOrCreate())

In [27]:
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, StringType
from pyspark.ml.feature import StringIndexer, VectorIndexer, OneHotEncoderEstimator, VectorAssembler
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
from pyspark.ml.classification import LogisticRegression
import json
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, IntegerType, StructType, StructField
from pyspark.ml.feature import CountVectorizer, StringIndexer, IndexToString
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml import Pipeline, PipelineModel
from pyspark.sql.types import LongType


def parse_json(array_str):
    json_obj = json.loads(array_str)
    for item in json_obj["visits"]:
        yield (item["url"], item["timestamp"])

In [28]:
df = spark.read.option("delimiter", "\t").csv("/labs/slaba04/gender_age_dataset.txt", header = True)

In [29]:
df.show(5)

+------+-----+--------------------+--------------------+
|gender|  age|                 uid|           user_json|
+------+-----+--------------------+--------------------+
|     F|18-24|d50192e5-c44e-4ae...|{"visits": [{"url...|
|     M|25-34|d502331d-621e-472...|{"visits": [{"url...|
|     F|25-34|d50237ea-747e-48a...|{"visits": [{"url...|
|     F|25-34|d502f29f-d57a-46b...|{"visits": [{"url...|
|     M| >=55|d503c3b2-a0c2-4f4...|{"visits": [{"url...|
+------+-----+--------------------+--------------------+
only showing top 5 rows



In [30]:
json_schema = ArrayType(StructType([StructField('url', StringType(), nullable=False), StructField('timestamp', StringType(), nullable=False)]))

udf_parse_json = udf(lambda str: parse_json(str), json_schema)

In [31]:
df = df.withColumn("visits", udf_parse_json(F.col("user_json")))
df = df.select(F.col("gender"), F.col("age"), F.col("uid"), F.col("visits"))
df = df.withColumn("visits", F.explode(F.col("visits")))
df = df.withColumn("url", F.col("visits.url"))
df = df.drop(F.col("visits"))

df = df.withColumn("url", F.lower(F.expr("parse_url(url, 'HOST')"))).withColumn("url", F.regexp_replace(F.col("url"), "www.", "")).withColumn("url", F.regexp_replace(F.col("url"), "[.]", "-"))
df = df.filter(F.col("url").isNotNull())
df = df.withColumn("gender_age", F.concat(F.col("gender"), F.lit(":"), F.col("age")))

df = df.groupBy(F.col("gender_age"), F.col("uid")).agg(F.collect_list(F.col("url")).alias("domains"))

df = df.filter(df.gender_age != F.lit("-:-"))

In [32]:
cv = CountVectorizer(inputCol="domains", outputCol="features")

indexer = StringIndexer(inputCol="gender_age", outputCol="label")
labels = indexer.fit(df).labels

rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=20, 
    maxBins=32, 
    maxDepth=5,
    seed=37)

converter = IndexToString(inputCol="prediction", labels=labels, outputCol="res")

pipeline = Pipeline(stages=[cv, indexer, rf, converter])

In [33]:
model = pipeline.fit(df)
model.write().overwrite().save("Lab_4_model")

In [45]:
# подключение к топику на чтение данных
read_kafka_params = {
    "kafka.bootstrap.servers": 'spark-master-1.newprolab.com:6667',
    "subscribe": "input_aleksandr.balandin",
    "startingOffsets": "latest"
}
dfInput = spark.readStream.format("kafka").options(**read_kafka_params).load()
query.awaitTermination()

KeyboardInterrupt: 

In [38]:
df = dfInput.selectExpr("CAST(value AS STRING)")
schema = StructType([
  StructField("uid", StringType(), True),
  StructField("visits", StringType(), True),
])

In [39]:
df = df.withColumn("jsonData", F.from_json(F.col("value"), schema)).select("jsonData.*")
df = df.withColumn("visits", F.concat(F.lit("{\"visits\": "), F.col("visits"), F.lit("}")))

json_schema = ArrayType(StructType([StructField('url', StringType(), nullable=False), StructField('timestamp', StringType(), nullable=False)]))

udf_parse_json = udf(lambda str: parse_json(str), json_schema)

In [40]:
df = df.withColumn("visits", udf_parse_json(F.col("visits")))

df = df.withColumn("visits", F.explode(F.col("visits")))
df = df.withColumn("url", F.col("visits.url"))
df = df.drop(F.col("visits"))


df = df.withColumn("url", F.lower(F.expr("parse_url(url, 'HOST')"))).withColumn("url", F.regexp_replace(F.col("url"), "www.", "")).withColumn("url", F.regexp_replace(F.col("url"), "[.]", "-"))

df = df.groupBy(F.col("uid")).agg(F.collect_list(F.col("url")).alias("domains"))

In [41]:
model = PipelineModel.load("Lab_4_model")
df = model.transform(df)
df = df.select(F.col("uid"), F.col("res").alias("gender_age"))

In [42]:
split_col = F.split(F.col("gender_age"), ':')
df = df.withColumn('gender', split_col.getItem(0))
df = df.withColumn('age', split_col.getItem(1))
df = df.select(F.col("uid"), F.col("gender"), F.col("age"))

In [44]:

write_kafka_params = {
   "kafka.bootstrap.servers": 'spark-master-1.newprolab.com:6667',
   "topic": "aleksandr.balandin"
}
query = df.selectExpr("CAST(uid AS STRING) AS key", "to_json(struct(*)) AS value")\
    .writeStream\
    .format("kafka").options(**write_kafka_params)\
    .option("checkpointLocation", "streaming/chk/chk_kafka")\
    .option("maxOffsetsPerTrigger", 200)\
    .outputMode("update").start()

query.awaitTermination()

Py4JJavaError: An error occurred while calling o3118.start.
: java.lang.IllegalStateException: Cannot start query with id 238c58ea-cc4f-4593-a7fa-f9754d920013 as another query with same id is already active. Perhaps you are attempting to restart a query from checkpoint that is already active.
	at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:345)
	at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:325)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:745)


In [46]:
sc.stop()