In [1]:
!pip3 install pyspark wordcloud matplotlib numpy textblob

Defaulting to user installation because normal site-packages is not writeable


In [2]:
import os
# Ask spark-submit to fetch the Kafka connectors; the trailing "pyspark-shell" token is required.
# The Spark version in these coordinates (3.1.1, Scala 2.12) must match the installed pyspark.
os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join([
    "--packages",
    ",".join([
        "org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.1",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1",
    ]),
    "pyspark-shell",
])

In [3]:
import pyspark
import json
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer, StopWordsRemover


In [14]:
# Connect to the standalone Spark master; schema inference is enabled for streaming sources.
spark = (
    SparkSession.builder
    .master("spark://192.168.56.110:7077")
    .config("spark.sql.streaming.schemaInference", True)
    .appName("Sentiment Analysis Spark")
    .getOrCreate()
)

# Enable Arrow-based columnar data transfers (used for pandas <-> Spark conversions).
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)

# Always show the results of DataFrames (formatted) when they are a cell's last expression.
# The key is spelled "eagerEval"; a misspelled key is accepted silently and has no effect.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

In [15]:
# Subscribe to the "twitter-data2" Kafka topic, replaying it from the earliest offset,
# and keep the record headers alongside key/value.
tweets = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "192.168.56.110:9092")
    .option("subscribe", "twitter-data2")
    .option("startingOffsets", "earliest")
    .option("includeHeaders", "true")
    .load()
)

# key/value arrive as binary; the remaining columns are Kafka metadata.
tweets.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
 |-- headers: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key: string (nullable = true)
 |    |    |-- value: binary (nullable = true)



In [16]:
from textblob import TextBlob
from deep_translator import GoogleTranslator

def preprocessing(tweets):
    """Reduce a raw Kafka micro-batch to a single cleaned ``text`` column.

    Parameters
    ----------
    tweets : pyspark.sql.DataFrame
        Rows from the Kafka source; only the ``value`` column is used.

    Returns
    -------
    pyspark.sql.DataFrame
        One string column ``text`` with URLs, ``#``, standalone ``RT`` markers and
        ``:`` removed and surrounding whitespace trimmed. Rows that are null or
        empty after cleaning are dropped.
    """
    # Kafka delivers ``value`` as binary. Cast it to string first: na.replace('', None)
    # only considers string columns, so on the raw binary column it did nothing.
    df = tweets.select(col("value").cast("string").alias("text"))
    df = df.withColumn('text', regexp_replace('text', r'http\S+', ''))
    df = df.withColumn('text', regexp_replace('text', '#', ''))
    # Strip the retweet marker only as a whole word; a bare 'RT' pattern also
    # removed those letters from inside words such as 'START' or 'ART'.
    df = df.withColumn('text', regexp_replace('text', r'\bRT\b', ''))
    df = df.withColumn('text', regexp_replace('text', ':', ''))
    df = df.withColumn('text', trim(col('text')))
    # Drop rows only after cleaning, so tweets that consisted solely of
    # URLs/markers do not survive as empty strings.
    df = df.na.replace('', None).na.drop()
    return df
def sentiment_analysis(text):
    """Label ``text`` as positive (1) or not positive (0) from its TextBlob polarity.

    Polarity strictly greater than 0 maps to 1; neutral (0) and negative polarity
    both map to 0. A missing text (None) returns None, so when this runs as a Spark
    UDF the row gets a null label instead of TextBlob raising and failing the batch.
    """
    if text is None:
        return None
    polarity = TextBlob(text).sentiment.polarity
    return 1 if polarity > 0 else 0
                             
def text_classification(df):
    """Add an integer ``label`` column derived from the ``text`` column.

    The label comes from ``sentiment_analysis`` (TextBlob polarity: 1 = positive,
    0 = not positive) applied row by row as a Python UDF. It is therefore a
    heuristic label, not ground truth.
    """
    # sentiment_analysis returns Python ints, so declare IntegerType. StringType only
    # worked because Spark stringified the ints, which then had to be cast back.
    sentiment_analysis_udf = udf(sentiment_analysis, IntegerType())
    return df.withColumn("label", sentiment_analysis_udf("text"))

In [17]:
def forEachBatch(df, df_id, train_fraction=0.7, seed=None):
    """Train and evaluate a sentiment classifier on one streaming micro-batch.

    Intended as the ``foreachBatch`` callback of the Kafka stream. The steps are:
    clean the batch (``preprocessing``), label it with TextBlob polarity
    (``text_classification``), split it into train/test sets, build hashed
    term-frequency features, and fit a logistic regression. The model is discarded
    afterwards; only the test accuracy is printed.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Micro-batch rows as produced by the Kafka source.
    df_id : int
        Micro-batch id passed by Spark; only used in log output.
    train_fraction : float, optional
        Share of rows used for training (default 0.7); the rest is the test set.
    seed : int, optional
        Seed for ``randomSplit``; ``None`` keeps the split unseeded.
    """
    labelled = (
        text_classification(preprocessing(df))
        .withColumn("label", col("label").cast(IntegerType()))
        .na.drop()
    )
    # The labelled batch feeds several Spark actions below. Caching it avoids
    # re-running the TextBlob UDF for every action and keeps randomSplit
    # consistent: on an uncached input each split recomputes the parent, so the
    # train and test halves can overlap or lose rows.
    labelled.persist()
    try:
        training_data, testing_data = labelled.randomSplit(
            [train_fraction, 1.0 - train_fraction], seed=seed
        )
        train_rows = training_data.count()
        test_rows = testing_data.count()
        # Small micro-batches can leave one side empty. Fitting on no rows fails,
        # and an empty test set used to raise ZeroDivisionError and stop the query.
        if train_rows == 0 or test_rows == 0:
            print("Skipping batch", df_id, "- not enough rows (train:", train_rows,
                  ", test:", test_rows, ")")
            return

        tokenizer = Tokenizer(inputCol="text", outputCol="SentimentWords")
        swr = StopWordsRemover(inputCol=tokenizer.getOutputCol(),
                               outputCol="MeaningfulWords")
        hash_tf = HashingTF(inputCol=swr.getOutputCol(), outputCol="features")

        def featurize(frame):
            # Same feature steps for train and test: tokenize -> drop stop words -> hashed TF.
            words = swr.transform(tokenizer.transform(frame))
            return hash_tf.transform(words).select("label", "MeaningfulWords", "features")

        lr = LogisticRegression(labelCol="label", featuresCol="features",
                                maxIter=10, regParam=0.01)
        model = lr.fit(featurize(training_data))
        print("Training is done!")

        prediction_final = model.transform(featurize(testing_data)).select(
            "MeaningfulWords", "prediction", "label"
        )
        correct_prediction = prediction_final.filter(
            col("prediction") == col("label")).count()
        # Featurizing and predicting keep every test row, so the test count is the total
        # (this saves a separate count job per batch).
        total_data = test_rows

        print("correct prediction:", correct_prediction, "total data:", total_data,
              ", accuracy:", correct_prediction / total_data * 100, "%")
    finally:
        labelled.unpersist()

In [18]:
# Start the streaming query: every 10-second micro-batch is passed to forEachBatch.
query_stream_memory = (
    tweets.writeStream
    .foreachBatch(forEachBatch)
    .trigger(processingTime="10 seconds")
    .outputMode("append")
    .start()
)

# start() returns immediately. Block here so that interrupting the kernel actually
# raises KeyboardInterrupt inside this try, and stop the query cleanly instead of
# leaving it running in the background. The message means "Stopping streaming...".
try:
    query_stream_memory.awaitTermination()
except KeyboardInterrupt:
    print("Finalizando streaming...")
    query_stream_memory.stop()

22/09/19 01:23:08 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-4efd00cf-fb15-4741-8cc8-2c40dfda0ae5. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
                                                                                

Training is done!


[Stage 38:>                                                         (0 + 1) / 1]                                                                                

correct prediction: 6 total data: 8 , accuracy: 75.0 %


22/09/19 01:23:20 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000 milliseconds, but spent 12129 milliseconds
