# 0. Data Retrieval and Environment Setup

## Download data from Kaggle

In [None]:
!pip install -q kaggle

Upload your Kaggle API token (`kaggle.json`) to the working directory before proceeding.

In [None]:
! mkdir ~/.kaggle
! cp kaggle.json ~/.kaggle/
! chmod 600 ~/.kaggle/kaggle.json
! kaggle datasets list

ref                                                             title                                             size  lastUpdated          downloadCount  voteCount  usabilityRating  
--------------------------------------------------------------  -----------------------------------------------  -----  -------------------  -------------  ---------  ---------------  
thedrcat/daigt-v2-train-dataset                                 DAIGT V2 Train Dataset                            29MB  2023-11-16 01:38:36           1706        174  1.0              
derrekdevon/real-estate-sales-2001-2020                         Real Estate Sales 2001-2020                       28MB  2023-12-07 15:36:26           1055         27  1.0              
muhammadbinimran/housing-price-prediction-data                  Housing Price Prediction Data                    763KB  2023-11-21 17:56:32           7543        137  1.0              
jocelyndumlao/cardiovascular-disease-dataset                    Cardiovascu

In [None]:
# Download the Sentiment140 archive (sentiment140.zip) into the working directory.
!kaggle datasets download -d kazanova/sentiment140

Downloading sentiment140.zip to /content
 95% 77.0M/80.9M [00:01<00:00, 70.0MB/s]
100% 80.9M/80.9M [00:01<00:00, 61.5MB/s]


In [None]:
! mkdir train
! unzip sentiment140.zip -d train

Archive:  sentiment140.zip
  inflating: train/training.1600000.processed.noemoticon.csv  


## Set up PySpark

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

# 1. EDA

## Read the csv file

In [None]:
import os

# Tell findspark / PySpark where the JDK and the extracted Spark build live.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.1.1-bin-hadoop3.2",
})

In [None]:
import findspark

# Must run before importing pyspark: it locates Spark via SPARK_HOME.
findspark.init()

from pyspark.sql import SparkSession

# Local session using all available cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)
# Render DataFrames eagerly when they are the last expression of a cell.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

In [None]:
# PySpark column functions (udf, regexp_replace, to_timestamp, year, hour, ...) and types.
# NOTE(review): this star import shadows Python builtins such as sum, min, max,
# round and abs with their pyspark.sql.functions counterparts for the rest of the
# notebook; explicit imports or `import pyspark.sql.functions as F` would avoid that.
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [None]:
# The CSV has no header row. Declaring the schema up front (the same column
# types inferSchema reports for this file) avoids an extra full pass over the
# data just to infer types.
SENTIMENT140_SCHEMA = "_c0 INT, _c1 BIGINT, _c2 STRING, _c3 STRING, _c4 STRING, _c5 STRING"
df = spark.read.csv(
    "train/training.1600000.processed.noemoticon.csv",
    schema=SENTIMENT140_SCHEMA,
    encoding='ISO-8859-1',
)
df.printSchema()
df.show(5)

root
 |-- _c0: integer (nullable = true)
 |-- _c1: long (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)

+---+----------+--------------------+--------+---------------+--------------------+
|_c0|       _c1|                 _c2|     _c3|            _c4|                 _c5|
+---+----------+--------------------+--------+---------------+--------------------+
|  0|1467810369|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|  0|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|  0|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|
|  0|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|
|  0|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|
+---+----------+--------------------+--------+---------------+--------------------+
only showing top 5 rows



## Data Preprocessing

In [None]:
# Look for any row whose _c3 is not "NO_QUERY". `!=` evaluates to NULL for
# missing values (and filter drops them), so nulls are matched explicitly;
# otherwise rows with a missing _c3 would silently escape this check.
df.filter((df['_c3'] != "NO_QUERY") | df['_c3'].isNull()).show()

+---+---+---+---+---+---+
|_c0|_c1|_c2|_c3|_c4|_c5|
+---+---+---+---+---+---+
+---+---+---+---+---+---+



The `_c3` column contains only the constant value `NO_QUERY`, so it carries no information; drop it.

In [None]:
# Drop the constant _c3 column, then give the remaining positional columns
# descriptive names.
COLUMN_NAMES = {
    '_c0': 'target',
    '_c1': 'tweet_id',
    '_c2': 'timestamp',
    '_c4': 'user',
    '_c5': 'text',
}

df = df.drop('_c3')
for raw_name, new_name in COLUMN_NAMES.items():
    df = df.withColumnRenamed(raw_name, new_name)

df.show(5, truncate=False)

+------+----------+----------------------------+---------------+-------------------------------------------------------------------------------------------------------------------+
|target|tweet_id  |timestamp                   |user           |text                                                                                                               |
+------+----------+----------------------------+---------------+-------------------------------------------------------------------------------------------------------------------+
|0     |1467810369|Mon Apr 06 22:19:45 PDT 2009|_TheSpecialOne_|@switchfoot http://twitpic.com/2y1zl - Awww, that's a bummer.  You shoulda got David Carr of Third Day to do it. ;D|
|0     |1467810672|Mon Apr 06 22:19:49 PDT 2009|scotthamilton  |is upset that he can't update his Facebook by texting it... and might cry as a result  School today also. Blah!    |
|0     |1467810917|Mon Apr 06 22:19:53 PDT 2009|mattycus       |@Kenichan I dived many times fo

In [None]:
# The timezone name cannot be handled directly when parsing the timestamp,
# so first explore the distribution of timezones.
# Timestamps look like "Mon Apr 06 22:19:45 PDT 2009": the zone is the
# second-to-last whitespace-separated token.

@udf
def time_zone(s):
  """Return the timezone token of a tweet timestamp string.

  Returns None for a null value or a string with fewer than two tokens,
  instead of raising inside the Python worker and failing the whole job.
  """
  if s is None:
    return None
  lst = s.split()
  return lst[-2] if len(lst) >= 2 else None

df.groupby(time_zone('timestamp')).count().show()

+--------------------+-------+
|time_zone(timestamp)|  count|
+--------------------+-------+
|                 PDT|1600000|
+--------------------+-------+



All tweets were recorded in the PDT time zone, so we can simply extract the local date and time without adjusting for time-zone offsets.

In [None]:
# convert string to date
from time import strftime, strptime

@udf
def udf_to_date(s):
  """Reformat "Mon Apr 06 22:19:45 PDT 2009" as "2009-04-06 22:19:45".

  The weekday and timezone tokens are dropped, so the result is the local
  wall-clock time as written in the string. Returns None for a null value
  so a single missing timestamp does not fail the whole job.
  """
  if s is None:
    return None
  lst = s.split()
  # Keep month, day and time ("Apr 06 22:19:45") plus the trailing year.
  s = " ".join(lst[1:4] + [lst[-1]])
  # NOTE: %b is locale-dependent; English month abbreviations are expected.
  return strftime("%Y-%m-%d %H:%M:%S", strptime(s, "%b %d %H:%M:%S %Y"))

# NOTE: no PDT offset is applied; to_timestamp interprets the string in the
# Spark session time zone.
df = df.withColumn('datetime', to_timestamp(udf_to_date('timestamp')))
df.show(5)

+------+----------+--------------------+---------------+--------------------+-------------------+
|target|  tweet_id|           timestamp|           user|                text|           datetime|
+------+----------+--------------------+---------------+--------------------+-------------------+
|     0|1467810369|Mon Apr 06 22:19:...|_TheSpecialOne_|@switchfoot http:...|2009-04-06 22:19:45|
|     0|1467810672|Mon Apr 06 22:19:...|  scotthamilton|is upset that he ...|2009-04-06 22:19:49|
|     0|1467810917|Mon Apr 06 22:19:...|       mattycus|@Kenichan I dived...|2009-04-06 22:19:53|
|     0|1467811184|Mon Apr 06 22:19:...|        ElleCTF|my whole body fee...|2009-04-06 22:19:57|
|     0|1467811193|Mon Apr 06 22:19:...|         Karoli|@nationwideclass ...|2009-04-06 22:19:57|
+------+----------+--------------------+---------------+--------------------+-------------------+
only showing top 5 rows



## Data Exploration

In [None]:
# Confirm the renamed columns and the new timestamp-typed `datetime` column.
df.printSchema()

root
 |-- target: integer (nullable = true)
 |-- tweet_id: long (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- user: string (nullable = true)
 |-- text: string (nullable = true)
 |-- datetime: timestamp (nullable = true)



In [None]:
# Total tweets, shown on one line together with its label
# (the previous `sep=''` had no effect on a single-argument print).
print(f"Total Tweets: {df.count()}")

Total Tweets: 


1600000

In [None]:
# Tweet distribution across date: number of tweets per calendar day, in date order
daily_counts = (
    df.groupBy(
        year('datetime').alias('year'),
        month('datetime').alias('month'),
        dayofmonth('datetime').alias('day'),
    )
    .count()
    .orderBy('year', 'month', 'day')
)
daily_counts.show(50)

+----+-----+---+------+
|year|month|day| count|
+----+-----+---+------+
|2009|    4|  6|  3360|
|2009|    4|  7| 17311|
|2009|    4| 17|  3084|
|2009|    4| 18| 21754|
|2009|    4| 19| 27469|
|2009|    4| 20| 18460|
|2009|    4| 21|  8587|
|2009|    5|  1|  7716|
|2009|    5|  2| 27434|
|2009|    5|  3| 35333|
|2009|    5|  4| 15481|
|2009|    5|  9| 11739|
|2009|    5| 10| 26029|
|2009|    5| 11|  4186|
|2009|    5| 13|  4066|
|2009|    5| 14| 17460|
|2009|    5| 16|  9146|
|2009|    5| 17| 40154|
|2009|    5| 18| 36469|
|2009|    5| 21|  2132|
|2009|    5| 22| 39074|
|2009|    5| 23|   169|
|2009|    5| 25|   169|
|2009|    5| 26| 10778|
|2009|    5| 27|   841|
|2009|    5| 28| 15903|
|2009|    5| 29| 73827|
|2009|    5| 30|103673|
|2009|    5| 31| 94588|
|2009|    6|  1|110290|
|2009|    6|  2| 64192|
|2009|    6|  3| 41588|
|2009|    6|  4|  7842|
|2009|    6|  5| 58757|
|2009|    6|  6|111676|
|2009|    6|  7| 96350|
|2009|    6| 14|  8272|
|2009|    6| 15|109781|
|2009|    6| 16|

In [None]:
# Total users: number of distinct values in the `user` column
print("The number of users:")
df.select('user').distinct().count()

The number of users:


659775

In [None]:
# Top 10 active users overview: tweets per user, most active first
user_tweet_counts = df.groupBy('user').count()
user_tweet_counts.orderBy(desc('count')).show(10)

+---------------+-----+
|           user|count|
+---------------+-----+
|       lost_dog|  549|
|        webwoke|  345|
|       tweetpet|  310|
|SallytheShizzle|  281|
|    VioletsCRUK|  279|
|    mcraddictal|  276|
|       tsarnick|  248|
|    what_bugs_u|  246|
|    Karen230683|  238|
|      DarkPiano|  236|
+---------------+-----+
only showing top 10 rows



# 2. NLP Pipeline

In [None]:
import nltk
from nltk.sentiment import SentimentIntensityAnalyzer
# The VADER analyzer needs the `vader_lexicon` resource available locally.
nltk.download('vader_lexicon')

[nltk_data] Downloading package vader_lexicon to /root/nltk_data...


True

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.pipeline import Transformer
from pyspark.sql.functions import regexp_replace, udf
from pyspark.sql.types import DoubleType
from nltk.sentiment import SentimentIntensityAnalyzer


class PreprocessTransformer(Transformer):
    """Strip URLs and @mentions from the `text` column.

    Adds `text_no_url` (text with `http...` tokens removed) and `clean_text`
    (text_no_url with `@handle` mentions removed as well).
    """

    def __init__(self):
        super(PreprocessTransformer, self).__init__()

    def _transform(self, df):
        # Remove URLs: "http" followed by a run of non-whitespace characters
        df = df.withColumn('text_no_url', regexp_replace('text', r'http\S+', ''))
        # Remove user mentions
        df = df.withColumn('clean_text', regexp_replace('text_no_url', r'@\w+', ''))
        return df


class VaderSentimentTransformer(Transformer):
    """Score `clean_text` with NLTK VADER into a double `sentiment_score` column.

    The score is VADER's `compound` polarity; a null text yields a null score.
    """

    def __init__(self):
        super(VaderSentimentTransformer, self).__init__()

    def _transform(self, df):
        # Build the analyzer lazily, once per deserialized copy of the UDF
        # (roughly once per Spark task) instead of once per row: constructing
        # a SentimentIntensityAnalyzer reads and parses the whole lexicon.
        analyzer_cache = {}

        def sentiment_vader(text):
            if text is None:
                return None
            sia = analyzer_cache.get('sia')
            if sia is None:
                sia = analyzer_cache['sia'] = SentimentIntensityAnalyzer()
            return float(sia.polarity_scores(text)['compound'])

        # Declare the return type explicitly: a bare @udf defaults to
        # StringType, which stored the score as text.
        udf_sentiment_vader = udf(sentiment_vader, DoubleType())
        return df.withColumn("sentiment_score", udf_sentiment_vader(df["clean_text"]))

In [None]:
pipeline = Pipeline(stages=[PreprocessTransformer(), VaderSentimentTransformer()])

# Every stage is a Transformer, so fit() just wraps them in a PipelineModel.
model = pipeline.fit(df)
# Cache the scored DataFrame: several later queries read it, and without
# caching each action would re-run the Python VADER UDF over every row.
result = model.transform(df).cache()

result.show(10, truncate=False)

+------+----------+----------------------------+---------------+-------------------------------------------------------------------------------------------------------------------+-------------------+---------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------+---------------+
|target|tweet_id  |timestamp                   |user           |text                                                                                                               |datetime           |text_no_url                                                                                                    |clean_text                                                                                                     |sentiment_score|
+------+----------+----------------------------+---------------+------------------------------------------------------

# 3. Sentiment Analysis

## Public Sentiment Trend

In [None]:
# Expose only the columns the SQL analyses need as the "result" temp view.
result.select('datetime', 'user', 'sentiment_score').createOrReplaceTempView("result")

In [None]:
# Public sentiment score over hours of day.
# Triple-quoted like the other queries in this notebook; backslash
# continuations break silently if a trailing space follows the backslash.
spark.sql("""
  SELECT
    HOUR(datetime) AS hour
    , ROUND(AVG(sentiment_score), 3) AS avg_sentiment_score
  FROM result
  GROUP BY 1
  ORDER BY 1
""").show(24)

+----+-------------------+
|hour|avg_sentiment_score|
+----+-------------------+
|   0|              0.165|
|   1|              0.172|
|   2|              0.171|
|   3|              0.162|
|   4|              0.154|
|   5|              0.145|
|   6|              0.149|
|   7|              0.145|
|   8|              0.134|
|   9|               0.13|
|  10|              0.133|
|  11|              0.138|
|  12|              0.123|
|  13|              0.121|
|  14|              0.128|
|  15|              0.117|
|  16|              0.113|
|  17|              0.116|
|  18|              0.121|
|  19|              0.126|
|  20|              0.132|
|  21|              0.134|
|  22|              0.145|
|  23|              0.153|
+----+-------------------+



According to the average sentiment score by hour, public sentiment is lowest for tweets posted in the afternoon (minimum 0.113 at 16:00) and highest around midnight (0.172 at 01:00).  <br><br>One possible explanation is that afternoons are usually work or school hours, when posting tweets or checking social media is inconvenient or not allowed. Tweets that are posted in this slot despite the inconvenience may therefore be more emotionally charged, and people seem more likely to make the effort when they have a complaint.

In [None]:
# Tweet distribution across hour: number of tweets per hour of day
(
    df.groupBy(hour('datetime').alias('hour'))
    .count()
    .orderBy('hour')
    .show(24)
)

+----+-----+
|hour|count|
+----+-----+
|   0|80865|
|   1|75268|
|   2|73991|
|   3|74253|
|   4|76995|
|   5|78623|
|   6|80852|
|   7|83654|
|   8|76287|
|   9|67278|
|  10|60689|
|  11|61009|
|  12|51653|
|  13|49689|
|  14|50380|
|  15|50643|
|  16|55720|
|  17|51843|
|  18|53485|
|  19|57722|
|  20|57059|
|  21|68964|
|  22|78328|
|  23|84750|
+----+-----+



The tweet volume by hour is consistent with this inference: it is lowest between roughly 12:00 and 18:00. However, the pattern could also be an artifact of non-random sampling in the dataset, which cannot be verified within this project.

## User Sentiment by User Activity

In [None]:
# user active days count: number of distinct calendar dates each user tweeted on
df.createOrReplaceTempView("df")
# `user` breaks ties so users with equal counts are listed deterministically.
spark.sql("""
  SELECT
    user
    , COUNT(DISTINCT DATE(datetime)) AS active_day_cnt
  FROM df
  GROUP BY user
  ORDER BY active_day_cnt DESC, user
""").show(10)

+---------------+--------------+
|           user|active_day_cnt|
+---------------+--------------+
|       lost_dog|            38|
|          StDAY|            37|
|        adlyman|            34|
|       judez_xo|            34|
|   MyAppleStuff|            33|
|torilovesbradie|            33|
|        bigenya|            33|
|    Karen230683|            32|
|   prateekgupta|            32|
|    _magic8ball|            31|
+---------------+--------------+
only showing top 10 rows



In [None]:
# All dates included: distinct calendar dates found in `datetime`
print("The number of dates covered:")
(
    df.select(to_date('datetime').alias('date'))
    .distinct()
    .count()
)

The number of dates covered:


48

In [None]:
# Calculate the percentages of users active on no more than 10, 5, and 3 days.
# UNION ALL: the three rows differ by category anyway, so UNION's
# de-duplication was wasted work, and it also returned rows in arbitrary order.
# Because the thresholds are nested, ordering by perc DESC yields 10, 5, 3.
spark.sql("""
  WITH active_days AS (
    SELECT
      user
      , COUNT(DISTINCT DATE(datetime)) AS active_day_cnt
    FROM df
    GROUP BY user
  )
  SELECT
    "<= 10" AS category
    , ROUND(SUM(INT(active_day_cnt <= 10)) / COUNT(user) * 100, 2) AS perc
  FROM active_days
  UNION ALL
  SELECT
    "<= 5" AS category
    , ROUND(SUM(INT(active_day_cnt <= 5)) / COUNT(user) * 100, 2) AS perc
  FROM active_days
  UNION ALL
  SELECT
    "<= 3" AS category
    , ROUND(SUM(INT(active_day_cnt <= 3)) / COUNT(user) * 100, 2) AS perc
  FROM active_days
  ORDER BY perc DESC
""").show()

+--------+-----+
|category| perc|
+--------+-----+
|    <= 5|94.84|
|   <= 10|98.87|
|    <= 3|88.65|
+--------+-----+



As we presumed, most users are active on only a few days: about 95% of users tweeted on at most 5 distinct days, and about 89% on at most 3. However, there is a long tail of highly active users, with up to 38, 37, and 34 active days. <br><br>So I group active days into a new variable, tweet_freq, and explore the relationship between tweet_freq and the average sentiment score of each user segment.<br><br>
Users who posted on more than 5 distinct days are labelled "high", on 4 to 5 days "moderate", and on 3 days or fewer "normal".

In [None]:
# Average sentiment per activity segment: normal (<= 3 active days),
# moderate (4-5 days), high (> 5 days). Each user is first reduced to their
# mean score, so every user weighs equally within a segment regardless of
# how many tweets they posted.
tweet_freq_query = """
  WITH active_days AS (
    SELECT
      user
      , AVG(sentiment_score) AS user_sentiment_score
      , COUNT(DISTINCT DATE(datetime)) AS active_day_cnt
    FROM result
    GROUP BY user
  )
  SELECT
    CASE WHEN active_day_cnt <= 3 THEN 'normal'
         WHEN active_day_cnt <= 5 THEN 'moderate'
         ELSE 'high' END AS tweet_freq
    , ROUND(AVG(user_sentiment_score), 3) AS avg_sentiment_score
  FROM active_days
  GROUP BY 1
  ORDER BY 2 DESC
    """
spark.sql(tweet_freq_query).show()

+----------+-------------------+
|tweet_freq|avg_sentiment_score|
+----------+-------------------+
|      high|              0.155|
|  moderate|              0.137|
|    normal|              0.115|
+----------+-------------------+

