#CS 5540 Project Phase 2 - Design and implement your ideas using Apache Spark
##Install Spark (Steps explained in detail in other notebook - Aparche_Ye.ipynb)

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [2]:
!wget -q https://archive.apache.org/dist/spark/spark-3.3.0/spark-3.3.0-bin-hadoop3.tgz

In [3]:
!tar xf spark-3.3.0-bin-hadoop3.tgz

In [4]:
!pip install -q findspark

In [5]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.0-bin-hadoop3"

In [6]:
import findspark

findspark.init()
findspark.find()

'/content/spark-3.3.0-bin-hadoop3'

In [7]:
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
conf = SparkConf().setAppName("SparkWordCount").setMaster('local')
import findspark
sc = SparkContext(conf=conf)

In [8]:
!pip install pyarrow==8.0.

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyarrow==8.0.
  Downloading pyarrow-8.0.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (29.3 MB)
[K     |████████████████████████████████| 29.3 MB 55.5 MB/s 
Installing collected packages: pyarrow
  Attempting uninstall: pyarrow
    Found existing installation: pyarrow 6.0.1
    Uninstalling pyarrow-6.0.1:
      Successfully uninstalled pyarrow-6.0.1
Successfully installed pyarrow-8.0.0


In [9]:
from pyspark.sql import SQLContext
import pyarrow

conf = SparkConf().set("spark.sql.execution.arrow.pyspark.enabled", "true").setMaster('local')
sqlContext = SQLContext(sc)



##After the successful installation of Spark, read the csv file containing tweets

###Loading data into PySpark

In [10]:
#store the tweets in spark df
df = sqlContext.read.csv("tweets_10k.csv", header='True', sep=',',inferSchema=False, multiLine=True)
df.show()

+---+--------------------+--------------------+--------------------+--------------------+-------------------+--------------------+
|_c0|               tweet|          created_at|            hashtags|                urls|             source|                user|
+---+--------------------+--------------------+--------------------+--------------------+-------------------+--------------------+
|  0|RT @JoeBiden: The...|2022-07-15 20:42:...|                  []|                  []|   Twitter for iPad|User(_api=<tweepy...|
|  1|@Jacob_Rees_Mogg ...|2022-07-15 20:42:...|                  []|                  []|    Twitter Web App|User(_api=<tweepy...|
|  2|RT @Sbh08Mae: @Re...|2022-07-15 20:42:...|[{'text': 'WA08',...|                  []|    Twitter Web App|User(_api=<tweepy...|
|  3|RT @JoeBiden: The...|2022-07-15 20:42:...|                  []|                  []|   Twitter for iPad|User(_api=<tweepy...|
|  4|@RussianEmbassy @...|2022-07-15 20:42:...|                  []|               

In [11]:
#schema of the datafarame
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- tweet: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- hashtags: string (nullable = true)
 |-- urls: string (nullable = true)
 |-- source: string (nullable = true)
 |-- user: string (nullable = true)



#Analytic Queries:

##Query 1: Get the number tweets tweeeted every minute from the collected data

Step 1: Extract the 'created_at' column taht containe time stamp.

In [12]:
#extract created_at column to a new dates spark datafarame
dates = df.select("created_at")
dates.show()

+--------------------+
|          created_at|
+--------------------+
|2022-07-15 20:42:...|
|2022-07-15 20:42:...|
|2022-07-15 20:42:...|
|2022-07-15 20:42:...|
|2022-07-15 20:42:...|
|2022-07-15 20:42:...|
|2022-07-15 20:42:...|
|2022-07-15 20:42:...|
|2022-07-15 20:42:...|
|2022-07-15 20:41:...|
|2022-07-15 20:41:...|
|2022-07-15 20:41:...|
|2022-07-15 20:41:...|
|2022-07-15 20:41:...|
|2022-07-15 20:41:...|
|2022-07-15 20:41:...|
|2022-07-15 20:41:...|
|2022-07-15 20:41:...|
|2022-07-15 20:41:...|
|2022-07-15 20:41:...|
+--------------------+
only showing top 20 rows



Step 2: Format the 'created_at' column to get only hour and minute part of the time stamp

In [13]:
from pyspark.sql.functions import date_format

#use spark ql function date_format to extract the time part from the timestamp
q = df.withColumn('created_at', date_format('created_at', 'HH:mm'))
dates = q.select("created_at")
dates.show()

+----------+
|created_at|
+----------+
|     20:42|
|     20:42|
|     20:42|
|     20:42|
|     20:42|
|     20:42|
|     20:42|
|     20:42|
|     20:42|
|     20:41|
|     20:41|
|     20:41|
|     20:41|
|     20:41|
|     20:41|
|     20:41|
|     20:41|
|     20:41|
|     20:41|
|     20:41|
+----------+
only showing top 20 rows



Step 3: Find the count of tweets tweeted every minute 

In [14]:
from pyspark.sql.functions import countDistinct

#drop all the null values
dates = dates.na.drop()

#get the count of tweets at every minute
distinct_count = dates.groupBy("created_at").count()
distinct_count.show()

+----------+-----+
|created_at|count|
+----------+-----+
|     18:47|   20|
|     17:58|   21|
|     19:03|   28|
|     18:05|   11|
|     17:35|   36|
|     17:23|   35|
|     16:29|   10|
|     12:41|   14|
|     12:26|   26|
|     13:56|   10|
|     13:49|   14|
|     13:13|    6|
|     18:12|   19|
|     18:09|   27|
|     17:14|    9|
|     16:13|   12|
|     19:19|   16|
|     18:32|   19|
|     18:13|   19|
|     14:47|   10|
+----------+-----+
only showing top 20 rows



##Query 2: Get the time at which maximum number of tweets were tweeted/retweeted or the most popular time.

The tweets were collected on July 15th 2022. The most popular time looks to be 17:17 or 5:17PM where in that one minute 88 tweets were collected.

In [15]:
dates.groupBy("created_at").count().orderBy("count", ascending=False).show(1)

+----------+-----+
|created_at|count|
+----------+-----+
|     17:17|   88|
+----------+-----+
only showing top 1 row



##Query 3: Get the time at which minimum number of tweets were tweeted/retweeted or find the least popular time.

The tweets were collected on July 15th 2022. The least popular time looks to be 12:13 PM where in that one minute only 1 tweet was collected.

In [16]:
dates.groupBy("created_at").count().orderBy("count", ascending=True).show(1)

+----------+-----+
|created_at|count|
+----------+-----+
|     12:13|    1|
+----------+-----+
only showing top 1 row



##Query 4: Get the range of time the tweets were collected

The tweets were collected from 12:02 to 20:42 (8:42PM)

In [17]:
from pyspark.sql.functions import col, max as max_, min as min_

(dates.withColumn("created_at", col("created_at"))
    .agg(min_("created_at"))).show()

+---------------+
|min(created_at)|
+---------------+
|          12:02|
+---------------+



In [18]:
(dates.withColumn("created_at", col("created_at"))
    .agg(max_("created_at"))).show()

+---------------+
|max(created_at)|
+---------------+
|          20:42|
+---------------+



##Query 5: Number of retweets to Joe Biden every hour

As seen below, 17:00 to 18:00 or 5PM-6PM is the time frame when most people retweeted to 'Joe Biden'. It is also interesting to note that 5:17PM which is in the time frame 5-6PM was the minute with maximum tweets in our collected data.

2-3PM is the time frame when there least number of retweets to Joe Biden.

In [24]:
tweet_and_time = df.select("created_at", "tweet")
joe_data = tweet_and_time.filter(col("tweet").startswith("RT @JoeBiden"))
joe_data=joe_data.withColumn('created_at', date_format('created_at', 'HH')).groupBy("created_at").count().orderBy("count", ascending=False).show()

+----------+-----+
|created_at|count|
+----------+-----+
|        17| 1050|
|        18|  503|
|        19|  402|
|        20|  274|
+----------+-----+



##Query 6: Sentiment analysis of tweets
Polarity is float which lies in the range of [-1,1] where 1 means positive statement and -1 means a negative statement. Subjective sentences generally refer to personal opinion, emotion or judgment whereas objective refers to factual information. Subjectivity is also a float which lies in the range of [0,1].

In [20]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as F
from textblob import TextBlob

def preprocessing(lines):
    words = lines.select(explode(split(lines.tweet, "t_end")).alias("word"))
    words = words.na.replace('', None)
    words = words.na.drop()
    words = words.withColumn('word', F.regexp_replace('word', r'http\S+', ''))
    words = words.withColumn('word', F.regexp_replace('word', '@\w+', ''))
    words = words.withColumn('word', F.regexp_replace('word', '#', ''))
    words = words.withColumn('word', F.regexp_replace('word', 'RT', ''))
    words = words.withColumn('word', F.regexp_replace('word', ':', ''))
    return words

    # text classification
def polarity_detection(text):
    return TextBlob(text).sentiment.polarity
def subjectivity_detection(text):
    return TextBlob(text).sentiment.subjectivity
def text_classification(words):
    # polarity detection
    polarity_detection_udf = udf(polarity_detection, StringType())
    words = words.withColumn("polarity", polarity_detection_udf("word"))
    # subjectivity detection
    subjectivity_detection_udf = udf(subjectivity_detection, StringType())
    words = words.withColumn("subjectivity", subjectivity_detection_udf("word"))
    return words

words = preprocessing(df.select('tweet'))
    # text classification to define polarity and subjectivity
words = text_classification(words)
words = words.repartition(1)
words.show()

+--------------------+--------------------+-------------------+
|                word|            polarity|       subjectivity|
+--------------------+--------------------+-------------------+
|  The price of oi...|-0.00444444444444...| 0.5311111111111111|
| I believe the pr...|0.044387755102040814| 0.3086734693877551|
|   knows gas &amp...|                 0.0|                0.0|
|  The price of oi...|-0.00444444444444...| 0.5311111111111111|
|           EU is ...|-0.13888888888888887| 0.2222222222222222|
|  That is an 8.5%...|               -0.03|0.29666666666666663|
|  The price of oi...|-0.00444444444444...| 0.5311111111111111|
|The price of oil ...|-0.01203703703703...| 0.4509259259259259|
|  Check our anime...|                 0.0|                0.0|
|  ⚡️Brain 207 LIS...|                 0.0|                0.0|
|  The price of oi...|-0.00444444444444...| 0.5311111111111111|
|  To solve all pr...|                 0.0|                0.0|
|  Today is day 31...|                 0

###Let's consider tweets with polarity >=0.5 to be positive and tweets with polarity value <=-0.5 to be negative

Positive tweets

In [21]:
words.select("word").filter(col("polarity") >= 0.5).show()

+--------------------+
|                word|
+--------------------+
| Republican strat...|
| Gas price July 1...|
|Welcome to the Wo...|
| Oh no....if they...|
|   I had a gas st...|
|"Biden ""We'll se...|
|   If that NYC pu...|
|" If that NYC pub...|
| He should sell l...|
|  Sure, and Biden...|
|US Presidents are...|
|I don't see many ...|
|Oh  , done good n...|
| 😂 sana si leni ...|
|   The price of g...|
|  ❌Extracting mor...|
| Just don't tell ...|
|  Good to see gas...|
|Rep. Emanuel Clea...|
|  Good to see gas...|
+--------------------+
only showing top 20 rows



Negative tweets

In [22]:
words.select("word").filter(col("polarity") <= -0.5).show()

+--------------------+
|                word|
+--------------------+
| Election Day 202...|
|  Gas buddy price...|
| Was this played ...|
|"BIDENS WORRIED A...|
|I will be holding...|
| You made the inf...|
|  We could have t...|
| Price controls f...|
|     And even tho...|
| Unhappy with Bid...|
|  While oil + gas...|
| Videos policies ...|
| THE BROWN STAIN ...|
|  We could have t...|
|"""It takes time ...|
| Then have those ...|
| Gas is too expen...|
| Time to come hom...|
| You are just afr...|
|It’s actually dis...|
+--------------------+
only showing top 20 rows



###Let's export the positve and negative tweets to text files

Some tweets considered positive have sarcastic tone and machine is not abe to detect sarcasm.

In [23]:
import pandas as pd

words.select("word").filter(col("polarity") >= 0.5).toPandas().to_csv('positive_tweets')
words.select("word").filter(col("polarity") <= -0.5).toPandas().to_csv('negative_tweets')