In [1]:
import os 

# Maven coordinates of the Spark connectors the notebook needs (Kafka source, MongoDB sink).
kafka = "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1"
mongo = "org.mongodb.spark:mongo-spark-connector_2.12:3.0.0"

# Interpreter and spark-submit arguments for the PySpark JVM; this cell runs
# before the SparkSession is built so the settings are picked up at launch.
os.environ.update({
    "PYSPARK_PYTHON": "python3.7",
    "PYSPARK_DRIVER_PYTHON": "python3.7",
    "PYSPARK_SUBMIT_ARGS": "--packages " + ",".join((kafka, mongo)) + " pyspark-shell",
})

In [2]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import *
from pyspark.sql.types import *

KAFKA_BROKER = "kafka:9092"
KAFKA_TOPIC = "fixcer"

# MongoDB connector endpoints. The input URI is configured but no cell in this
# notebook reads from Mongo; the output URI is where the final write lands.
MONGO_INPUT_URI = "mongodb://172.16.0.11:27017/bigdata.apps"
MONGO_OUTPUT_URI = "mongodb://172.16.0.11:27017/bigdata.application"

# `spark.jars.packages` is derived from the same `kafka`/`mongo` coordinates
# used for PYSPARK_SUBMIT_ARGS in the first cell. Previously it hard-coded only
# the Mongo connector (a second copy of the version string) and omitted Kafka,
# contradicting the --packages list.
spark = SparkSession \
        .builder \
        .master("spark://spark-master:7077") \
        .appName("Spark") \
        .config("spark.mongodb.input.uri", MONGO_INPUT_URI) \
        .config("spark.mongodb.output.uri", MONGO_OUTPUT_URI) \
        .config("spark.jars.packages", ",".join((kafka, mongo))) \
        .getOrCreate()

In [3]:
# Batch (not streaming) read of the whole topic: every run re-reads from the
# earliest available offset up to the latest one at query time.
# `maxOffsetsPerTrigger` was removed: it only rate-limits streaming
# micro-batches and is ignored by `spark.read`, so it implied a limit that
# never applied. `endingOffsets="latest"` is the batch default, stated
# explicitly so the read range is visible.
df = spark.read.format("kafka") \
        .option("kafka.bootstrap.servers", KAFKA_BROKER) \
        .option("subscribe", KAFKA_TOPIC) \
        .option("startingOffsets", "earliest") \
        .option("endingOffsets", "latest") \
        .option("failOnDataLoss", False) \
        .load()

In [4]:
# Inspect the raw Kafka record layout; the message payload is the binary `value` column.
df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [5]:
# Expected JSON layout of each Kafka message `value` (one app record).
# Every field is nullable, so missing keys come through as nulls.
schema = (
    StructType()
    .add("androidVersion", IntegerType(), True)
    .add("category", StringType(), True)
    .add("comments", ArrayType(StringType()), True)
    .add("contentRating", StringType(), True)
    .add("currentVersion", IntegerType(), True)
    .add("installs", LongType(), True)
    .add("lastUpdate", LongType(), True)
    .add("price", DoubleType(), True)
    .add("ratings", LongType(), True)
    .add("reviews", LongType(), True)
    .add("score", DoubleType(), True)
    .add("size", IntegerType(), True)
    .add("title", StringType(), True)
)

In [6]:
# Decode the binary payload as text, parse it with `schema`, then flatten the
# resulting struct into one top-level column per field.
df = df.select(from_json(col("value").cast("string"), schema).alias("json")) \
        .select("json.*")

In [7]:
# Confirm the JSON payload was flattened into one column per `schema` field.
df.printSchema()

root
 |-- androidVersion: integer (nullable = true)
 |-- category: string (nullable = true)
 |-- comments: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- contentRating: string (nullable = true)
 |-- currentVersion: integer (nullable = true)
 |-- installs: long (nullable = true)
 |-- lastUpdate: long (nullable = true)
 |-- price: double (nullable = true)
 |-- ratings: long (nullable = true)
 |-- reviews: long (nullable = true)
 |-- score: double (nullable = true)
 |-- size: integer (nullable = true)
 |-- title: string (nullable = true)



In [8]:
from textblob import TextBlob
from pyspark.ml import PipelineModel

# Load the persisted Spark ML pipeline from HDFS via its reader.
# NOTE(review): `model` is not referenced by any later cell in this notebook;
# either apply it (e.g. model.transform(df)) or drop this load.
model = PipelineModel.read().load("hdfs://namenode/user/root/model")

In [9]:
# Single-column schema for comment text. No later cell references it. It gets
# its own name so it no longer rebinds `schema`, the Kafka JSON schema used by
# the parsing cell. With the old name, re-running that cell after this one
# would parse every record with this one-field schema.
comment_schema = StructType([StructField('comment', StringType(), True)])

In [10]:
def to_sentiment(comments):
    """Count positive and negative comments by TextBlob polarity.

    Args:
        comments: the app's comment strings (the ``comments`` array column).
            The column is declared nullable, so the whole list may be
            ``None``, and individual elements may be ``None``.

    Returns:
        ``[positive, negative]``. Polarity ``>= 0`` (neutral 0.0 included)
        counts as positive and ``< 0`` as negative. ``None`` elements are
        skipped, and a ``None`` list yields ``[0, 0]``.
    """
    positive, negative = 0, 0
    if comments is None:
        # Iterating None would raise TypeError inside the Python UDF worker
        # and fail the Spark job; treat it as "no comments" instead.
        return [positive, negative]

    for comment in comments:
        if comment is None:
            # A null element is not a comment. Previously TextBlob rejected it
            # and the bare except silently counted it as negative.
            continue
        try:
            sentiment = TextBlob(comment).polarity
        except Exception:
            # Best effort, as before: a comment that cannot be scored counts
            # as negative. Narrowed from a bare `except:` so interrupts and
            # interpreter exits are not swallowed.
            negative += 1
            continue
        if sentiment >= 0:
            positive += 1
        else:
            negative += 1

    return [positive, negative]

# Wrap to_sentiment as a Spark UDF returning the [positive, negative] int pair.
convert = udf(f=to_sentiment, returnType=ArrayType(IntegerType()))

In [11]:
# Replace each app's comment array with its [positive, negative] counts.
df = df.withColumn("comments", convert("comments"))

In [12]:
# Unpack the [positive, negative] pair into two integer columns.
df = df \
        .withColumn("positive", col("comments").getItem(0)) \
        .withColumn("negative", col("comments").getItem(1))

In [13]:
# The packed count pair is redundant once unpacked into positive/negative.
df = df.drop(col("comments"))

In [14]:
# Keep apps with at least one positive AND at least one negative comment.
# The counts are compared separately rather than multiplied: the product of
# two 32-bit ints wraps silently (non-ANSI mode) for very large counts, which
# would flip the predicate.
# NOTE(review): this also drops apps whose comments are all positive or all
# negative. If the intent was only to drop apps without any comments, use
# (positive + negative) > 0 instead — confirm.
df = df.filter((col("positive") > 0) & (col("negative") > 0))

In [15]:
# Final shape before the MongoDB write: `comments` replaced by `positive`/`negative` counts.
df.printSchema()

root
 |-- androidVersion: integer (nullable = true)
 |-- category: string (nullable = true)
 |-- contentRating: string (nullable = true)
 |-- currentVersion: integer (nullable = true)
 |-- installs: long (nullable = true)
 |-- lastUpdate: long (nullable = true)
 |-- price: double (nullable = true)
 |-- ratings: long (nullable = true)
 |-- reviews: long (nullable = true)
 |-- score: double (nullable = true)
 |-- size: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- positive: integer (nullable = true)
 |-- negative: integer (nullable = true)



In [16]:
# Append the scored rows to the collection named by spark.mongodb.output.uri.
# NOTE(review): the Kafka read starts from the earliest offset on every run,
# so re-running this cell appends the same records again. Confirm duplicates
# are acceptable, or dedupe downstream.
df.write.save(format="mongo", mode="append")

In [17]:
# Shut down the session (and its SparkContext) once the write has finished.
spark.stop()