# Execution order
1. Start main.py in tweepy2Spark.
2. Start the SocketListener in the same folder.
3. Run all cells in this notebook.

### Create SparkSession

In [1]:
from pyspark.sql import SparkSession

# Reuse the active SparkSession if one already exists, otherwise create one
# named after this application.
spark = (
    SparkSession.builder
    .appName("Predicting TweetWorld Emotion")
    .getOrCreate()
)

### UDF (convert a JSON string with single quotes into one with double quotes)

In [2]:
import ast 
import json 
from pyspark.sql.functions import udf

def convert_json_double(json_single):
    """Re-serialize a single-quoted (Python literal) dict string as JSON.

    The incoming records use single quotes, which JSON parsers reject.
    ``ast.literal_eval`` parses the literal safely (it evaluates literals
    only, never arbitrary code) and ``json.dumps`` writes it back as JSON:
    double quotes, ``null``/``true``/``false``, non-ASCII escaped.

    Args:
        json_single: String holding a Python literal, e.g. ``"{'a': 1}"``.

    Returns:
        The JSON text, or ``None`` when ``json_single`` is ``None`` or blank
        (for example an empty line from the socket). Inside a Spark UDF this
        yields a null value instead of an exception that would fail the
        whole streaming query.

    Raises:
        ValueError, SyntaxError: if a non-blank string is not a valid literal.
    """
    if json_single is None or not json_single.strip():
        return None
    json_dict = ast.literal_eval(json_single)
    return json.dumps(json_dict)
    
# Spark UDF wrapper (default string return type) around convert_json_double.
convert_json_double_udf = udf(
    lambda raw_record: convert_json_double(raw_record)
)

### Get the column names from a sample tweet.

In [3]:
import pyspark.sql.functions as F

# Read a single sample tweet and normalise it to double-quoted JSON.
with open("data/tweet.txt", 'r') as f:
    originSingleQuotes = f.readline()
    originDoubleQuotes = convert_json_double(originSingleQuotes)

# Let Spark infer a schema from that one record; its top-level column names
# are what json_tuple() extracts from the live stream.
sc = spark.sparkContext
originRDD = sc.parallelize([originDoubleQuotes])
originDF = spark.read.json(originRDD)
columns = list(originDF.columns)

### Send a connection request to the server socket.

In [4]:
# Streaming source: one row per line received from the socket server on
# localhost:9999.
socketDF = (
    spark.readStream
    .format("socket")
    .options(host="localhost", port=9999)
    .load()
)

### Transform the raw data and extract the fields to analyze

In [5]:
from pyspark.sql.functions import json_tuple

# Normalise each raw line to JSON, keeping the column name "value".
jsonDF = socketDF.select(
    convert_json_double_udf("value").alias("value")
)

# Split the JSON into one string column per field seen in the sample tweet.
extracted_fields = json_tuple("value", *columns)
multiColDF = jsonDF.select(extracted_fields).toDF(*columns)

In [6]:
# Only the timestamp and tweet body are needed downstream.
df = multiColDF.select(["created_at", "text"])

### Keep only English tweets before estimating emotion

In [7]:
from langdetect import detect

from langdetect.lang_detect_exception import LangDetectException


def detect_language(text):
    """Return the language code langdetect assigns to *text*, or None.

    ``langdetect.detect`` raises ``LangDetectException`` when the text has no
    usable features (e.g. a tweet made only of URLs, mentions or emoji), and
    it cannot handle ``None``. Inside a streaming UDF either error would
    abort the whole query, so both cases map to ``None``. A null language
    never equals "en", so a ``lang == "en"`` filter drops such rows.
    """
    if not text:
        return None
    try:
        return detect(text)
    except LangDetectException:
        return None


detect_language_udf = F.udf(lambda x: detect_language(x))

In [8]:
# Tag every tweet with the language detected from its text.
df = df.select(
    "created_at",
    "text",
    detect_language_udf("text").alias("lang"),
)

In [9]:
from pyspark.sql.functions import col
# Keep English tweets only (a null lang never matches).
df = df.where(col("lang") == "en")

### Estimating emotion

In [10]:
from textblob import TextBlob
# Sentiment codes. get_sentiment_utf emits them as strings ("0"/"1"/"2")
# because F.udf defaults to a string return type.
positive = 2
netural = 1
negotive = 0


def get_sentiment(text):
    """Map the TextBlob polarity of *text* to a sentiment code.

    Returns ``negotive`` for polarity below 0, ``positive`` for polarity
    above 0 and ``netural`` otherwise.
    """
    polarity = TextBlob(text).sentiment.polarity
    if polarity < 0:
        return negotive
    if polarity > 0:
        return positive
    return netural


get_sentiment_utf = F.udf(lambda tweet_text: get_sentiment(tweet_text))

In [11]:
# Attach the sentiment code (string-typed) to every English tweet.
sentimentDF = df.select(
    "created_at",
    "text",
    get_sentiment_utf(col("text")).alias("sentiment_level"),
)

### Aggregate sentiment_level by time

In [12]:
# Convert created_at's type to datetime 
import datetime
from pyspark.sql.types import TimestampType

def from_created_at(x):
    """Convert a Twitter ``created_at`` string to an ISO-8601 string.

    Example input: ``'Thu Oct 21 07:02:44 +0000 2021'``, which becomes
    ``'2021-10-21T07:02:44+00:00'``. The UTC offset is kept, so the result
    is timezone-aware. Format codes are listed in the Python ``datetime``
    documentation (strftime/strptime behavior).
    """
    twitter_format = "%a %b %d %H:%M:%S %z %Y"
    parsed = datetime.datetime.strptime(x, twitter_format)
    return parsed.isoformat()

from_created_at_udf = udf(lambda created_at: from_created_at(created_at))

# Parse created_at into an ISO string, then cast it to a Spark timestamp so
# it can drive event-time windows and the watermark.
time_interval_col = (
    from_created_at_udf(col("created_at"))
    .cast(TimestampType())
    .alias("time_interval")
)
df = sentimentDF.select(time_interval_col, col("sentiment_level"))

df

DataFrame[time_interval: timestamp, sentiment_level: string]

In [13]:
from pyspark.sql.functions import window
from pyspark.sql.functions import count

def _count_by_window(frame, duration):
    """Count rows per (tumbling time window of *duration*, sentiment_level).

    A zero-second watermark on ``time_interval`` is applied first, as the
    streaming aggregation requires one to bound its state.
    """
    return (
        frame
        .withWatermark("time_interval", "0 seconds")
        .groupBy(window(col("time_interval"), duration), col("sentiment_level"))
        .count()
    )


windowedBySec = _count_by_window(df, "1 seconds")
windowedByMin = _count_by_window(df, "1 minutes")

### Run

In [14]:
import requests
import json

host = '127.0.0.1'
port = 5000
uri = 'update_react_num_per_sec'
new_name = ["positive_num", "neutral_num", "negative_num"]
# Seconds to wait for the web server before a POST gives up.
request_timeout = 10


def foreach_batch_json(df, epoch_id):
    """POST per-window sentiment counts from one micro-batch to the web server.

    Intended as a ``foreachBatch`` sink for the windowed counts. Each input
    row holds a ``window``, a ``sentiment_level`` and its ``count``. Rows are
    pivoted into one row per window whose columns are named by ``new_name``,
    sorted by window, and each row is sent as form data to
    ``http://{host}:{port}/{uri}``.

    Args:
        df: Micro-batch DataFrame with ``window``, ``sentiment_level`` and
            ``count`` columns.
        epoch_id: Micro-batch id supplied by Spark (unused).

    Raises:
        requests.RequestException: if a POST fails or exceeds
            ``request_timeout``; Spark then fails the micro-batch.
    """
    # toDF() renames the pivoted columns *positionally*, so the pivot values
    # must be listed in the same order as new_name. sentiment_level holds the
    # string form of get_sentiment()'s codes (the UDF returns strings), so
    # list positive, neutral, negative. The previous ["0", "1", "2"] order
    # put negative counts under "positive_num" and vice versa.
    level_order = [str(positive), str(netural), str(negotive)]

    count_by_level = (
        df.groupBy("window")
        .pivot("sentiment_level", level_order)
        .sum("count")
        .orderBy("window")  # send windows in chronological order
        .drop("window")
        .na.fill(0)  # a level absent from a window counts as 0
        .toDF(*new_name)
    )

    url = f'http://{host}:{port}/{uri}'
    for data in count_by_level.toJSON().collect():
        requests.post(url, data=json.loads(data), timeout=request_timeout)

In [15]:
# Run the per-second query as a single micro-batch (trigger once) and hand
# its result to foreach_batch_json.
secWriter = (
    windowedBySec.writeStream
    .trigger(once=True)
    .foreachBatch(foreach_batch_json)
)
ExecSec = secWriter.start()

# Per-minute variant, currently disabled:
# ExecMin = (
#     windowedByMin.writeStream
#     .foreachBatch(foreach_batch_json)
#     .start()
# )

In [16]:
# ExecSec.stop() 