# Federico Ariton
# Master of Science in Data Analytics
# Semester 2 - CA2 Integreated
# Student Number: sba22090


In [1]:
# Check inside stockprice
!hdfs dfs -ls /user/hduser/ca2-data/stockprice

# Check inside stocktweet
!hdfs dfs -ls /user/hduser/ca2-data/stocktweet



Found 41 items
-rw-r--r--   1 hduser supergroup      27248 2025-05-04 12:25 /user/hduser/ca2-data/stockprice/^GSPC.csv
-rw-r--r--   1 hduser supergroup      27598 2025-05-04 12:22 /user/hduser/ca2-data/stockprice/AAPL.csv
-rw-r--r--   1 hduser supergroup       1475 2025-05-04 12:22 /user/hduser/ca2-data/stockprice/ABNB.csv
-rw-r--r--   1 hduser supergroup      27440 2025-05-04 12:22 /user/hduser/ca2-data/stockprice/AMT.csv
-rw-r--r--   1 hduser supergroup      27756 2025-05-04 12:22 /user/hduser/ca2-data/stockprice/AMZN.csv
-rw-r--r--   1 hduser supergroup      27146 2025-05-04 12:22 /user/hduser/ca2-data/stockprice/BA.csv
-rw-r--r--   1 hduser supergroup      27341 2025-05-04 12:22 /user/hduser/ca2-data/stockprice/BABA.csv
-rw-r--r--   1 hduser supergroup      28475 2025-05-04 12:22 /user/hduser/ca2-data/stockprice/BAC.csv
-rw-r--r--   1 hduser supergroup      25665 2025-05-04 12:22 /user/hduser/ca2-data/stockprice/BKNG.csv
-rw-r--r--   1 hduser supergroup      15550 2025-05-04 12:22 

In [6]:
pip install textblob

Defaulting to user installation because normal site-packages is not writeable
Collecting textblob
  Downloading textblob-0.19.0-py3-none-any.whl (624 kB)
[2K     [38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m624.3/624.3 KB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m MB/s[0m eta [36m0:00:01[0m
[?25hCollecting nltk>=3.9
  Downloading nltk-3.9.1-py3-none-any.whl (1.5 MB)
[2K     [38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.5/1.5 MB[0m [31m13.1 MB/s[0m eta [36m0:00:00[0m MB/s[0m eta [36m0:00:01[0m
[?25hCollecting regex>=2021.8.3
  Downloading regex-2024.11.6-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (781 kB)
[2K     [38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m781.7/781.7 KB[0m [31m14.2 MB/s[0m eta [36m0:00:00[0m MB/s[0m eta [36m0:00:01[0m
Collecting tqdm
  Downloading tqdm-4.67.1-py3-none-any.whl (78 kB)
[2K     [38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m78.

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, lower, regexp_replace, avg, count, lit, when, lag, stddev
from pyspark.sql.types import FloatType
from textblob import TextBlob
from pyspark.sql.window import Window




## Spark Session and configuration

In [4]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("SentimentAnalysisWithSparkNLP") \
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:5.2.3") \
    .getOrCreate()

## Load annd Preprocess Tweet Data

In [5]:
# Load tweets CSV
tweets_df = spark.read.option("header", "true") \
    .option("inferSchema", "true") \
    .csv("hdfs:///user/hduser/ca2-data/stocktweet/stocktweet.csv")

# Show schema and sample
tweets_df.printSchema()
tweets_df.show(5)

[Stage 0:>                                                          (0 + 1) / 1]                                                                                

root
 |-- id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- ticker: string (nullable = true)
 |-- tweet: string (nullable = true)

+------+----------+------+--------------------+
|    id|      date|ticker|               tweet|
+------+----------+------+--------------------+
|100001|01/01/2020|  AMZN|$AMZN Dow futures...|
|100002|01/01/2020|  TSLA|$TSLA Daddy's dri...|
|100003|01/01/2020|  AAPL|$AAPL We’ll been ...|
|100004|01/01/2020|  TSLA|$TSLA happy new y...|
|100005|01/01/2020|  TSLA|"$TSLA haha just ...|
+------+----------+------+--------------------+
only showing top 5 rows



In [6]:
from pyspark.sql.functions import col, lower, regexp_replace, to_date

# Remove $ and lowercase ticker
tweets_df = tweets_df.withColumn("ticker", regexp_replace(lower(col("ticker")), "\\$", ""))

# Clean tweet text
tweets_df = tweets_df.withColumn("tweet_clean", regexp_replace(lower(col("tweet")), "[^a-zA-Z\\s]", ""))

# CORRECT date parsing
tweets_df = tweets_df.withColumn("date", to_date(col("date"), "dd/MM/yyyy"))

# Show clean output
tweets_df.select("ticker", "date", "tweet", "tweet_clean").show(5, truncate=False)


+------+----------+-------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------+
|ticker|date      |tweet                                                                                                                                      |tweet_clean                                                                                                               |
+------+----------+-------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------+
|amzn  |2020-01-01|$AMZN Dow futures up by 100 points already 🥳                                                                                        

In [7]:
tweets_df.printSchema()


root
 |-- id: string (nullable = true)
 |-- date: date (nullable = true)
 |-- ticker: string (nullable = true)
 |-- tweet: string (nullable = true)
 |-- tweet_clean: string (nullable = true)



In [8]:
tweets_df.show(5)

+------+----------+------+--------------------+--------------------+
|    id|      date|ticker|               tweet|         tweet_clean|
+------+----------+------+--------------------+--------------------+
|100001|2020-01-01|  amzn|$AMZN Dow futures...|amzn dow futures ...|
|100002|2020-01-01|  tsla|$TSLA Daddy's dri...|tsla daddys drink...|
|100003|2020-01-01|  aapl|$AAPL We’ll been ...|aapl well been ri...|
|100004|2020-01-01|  tsla|$TSLA happy new y...|tsla happy new ye...|
|100005|2020-01-01|  tsla|"$TSLA haha just ...|tsla haha just a ...|
+------+----------+------+--------------------+--------------------+
only showing top 5 rows



In [9]:
pip install spark-nlp


Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


## Sentimental Analysis Using Spark NLP

In [10]:
from sparknlp.base import DocumentAssembler
from sparknlp.annotator import Tokenizer, ViveknSentimentModel
from pyspark.ml import Pipeline

# Step 1: Convert to document
document = DocumentAssembler() \
    .setInputCol("tweet_clean") \
    .setOutputCol("document")

# Step 2: Tokenize
tokenizer = Tokenizer() \
    .setInputCols(["document"]) \
    .setOutputCol("token")

# Step 3: Apply pretrained sentiment model
sentiment = ViveknSentimentModel.pretrained() \
    .setInputCols(["document", "token"]) \
    .setOutputCol("sentiment")

# Pipeline
pipeline = Pipeline(stages=[document, tokenizer, sentiment])

# Fit and apply
model = pipeline.fit(tweets_df)
result = model.transform(tweets_df)

# Show results
result.select("tweet", "sentiment.result").show(5, truncate=False)


sentiment_vivekn download started this may take some time.
Approximate size to download 873.6 KB
[ / ]sentiment_vivekn download started this may take some time.
Approximate size to download 873.6 KB
[ — ]Download done! Loading the resource.
[ \ ]

                                                                                

[ | ]



[OK!]




+-------------------------------------------------------------------------------------------------------------------------------------------+----------+
|tweet                                                                                                                                      |result    |
+-------------------------------------------------------------------------------------------------------------------------------------------+----------+
|$AMZN Dow futures up by 100 points already 🥳                                                                                              |[negative]|
|$TSLA Daddy's drinkin' eArly tonight! Here's to a PT of ohhhhh $1000 in 2020! 🍻                                                           |[negative]|
|$AAPL We’ll been riding since last December from $172.12 what to do. Decisions decisions hmm 🤔. I have 20 mins to decide. Any suggestions?|[positive]|
|$TSLA happy new year, 2020, everyone🍷🎉🙏                                             

## Extract Sentiment Label

In [11]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

# Extract the first element from the result array
extract_sentiment = udf(lambda x: x[0] if isinstance(x, list) and len(x) > 0 else None, StringType())
result = result.withColumn("sentiment_label", extract_sentiment(col("sentiment.result")))

# Show results
result.select("tweet", "sentiment_label").show(5, truncate=False)


[Stage 10:>                                                         (0 + 1) / 1]

+-------------------------------------------------------------------------------------------------------------------------------------------+---------------+
|tweet                                                                                                                                      |sentiment_label|
+-------------------------------------------------------------------------------------------------------------------------------------------+---------------+
|$AMZN Dow futures up by 100 points already 🥳                                                                                              |negative       |
|$TSLA Daddy's drinkin' eArly tonight! Here's to a PT of ohhhhh $1000 in 2020! 🍻                                                           |negative       |
|$AAPL We’ll been riding since last December from $172.12 what to do. Decisions decisions hmm 🤔. I have 20 mins to decide. Any suggestions?|positive       |
|$TSLA happy new year, 2020, everyone🍷🎉🙏               

                                                                                

## Aggregate Sentiment Scores per Ticker-Date

In [12]:
agg_df = result.groupBy("ticker", "date", "sentiment_label").count().orderBy("date")
agg_df.show(10, truncate=False)


[Stage 11:>                                                         (0 + 1) / 1]

+---------------------------------------------------------+----+---------------+-----+
|ticker                                                   |date|sentiment_label|count|
+---------------------------------------------------------+----+---------------+-----+
| 116.50????? 😍🙏🏼"                                     |null|null           |1    |
| and obviously sales hit!                                |null|null           |1    |
| nissan leaf                                             |null|negative       |1    |
| lol ! 🖕"                                               |null|null           |1    |
| will drop down to 2500 📉📉📉 then it will rise to 4000"|null|null           |1    |
|null                                                     |null|null           |1119 |
| still would be fine. it’s tesla. 💥"                    |null|null           |1    |
| overvalued                                              |null|positive       |1    |
| up a lot today🚀🚀 🚀"                              

                                                                                

In [13]:
pivot_df = agg_df.groupBy("ticker", "date") \
    .pivot("sentiment_label", ["positive", "negative", "neutral"]) \
    .sum("count") \
    .fillna(0)

pivot_df.show(10)


[Stage 14:>                                                         (0 + 1) / 1]

+------+----------+--------+--------+-------+
|ticker|      date|positive|negative|neutral|
+------+----------+--------+--------+-------+
|   bac|2020-07-16|       0|       1|      0|
|   ccl|2020-11-13|       1|       1|      0|
|  amzn|2020-08-05|       0|       2|      0|
|    ba|2020-04-15|       4|       5|      0|
|    ba|2020-12-22|       2|       0|      0|
|  tsla|2020-11-11|       1|       2|      0|
|  nvda|2020-12-01|       1|       0|      0|
|  tsla|2020-01-13|       8|       1|      0|
|    ba|2020-03-25|      20|      20|      0|
|  tsla|2020-01-24|       5|       2|      0|
+------+----------+--------+--------+-------+
only showing top 10 rows



                                                                                

In [14]:
pivot_df.show(100)

[Stage 20:>                                                         (0 + 1) / 1]

+------+----------+--------+--------+-------+
|ticker|      date|positive|negative|neutral|
+------+----------+--------+--------+-------+
|   bac|2020-07-16|       0|       1|      0|
|   ccl|2020-11-13|       1|       1|      0|
|  amzn|2020-08-05|       0|       2|      0|
|    ba|2020-04-15|       4|       5|      0|
|    ba|2020-12-22|       2|       0|      0|
|  tsla|2020-11-11|       1|       2|      0|
|  nvda|2020-12-01|       1|       0|      0|
|  tsla|2020-01-13|       8|       1|      0|
|    ba|2020-03-25|      20|      20|      0|
|  tsla|2020-01-24|       5|       2|      0|
|  sbux|2020-10-08|       0|       1|      0|
|  baba|2020-12-03|       1|       0|      0|
|    ma|2020-03-05|       0|       1|      0|
|  tsla|2020-01-29|       7|      10|      0|
|  tsla|2020-07-14|      14|      15|      0|
|    fb|2020-07-07|       0|       2|      0|
|    fb|2020-03-05|       1|       3|      0|
|  baba|2020-02-12|       1|       1|      0|
|     v|2020-03-18|       1|      

                                                                                

## Calculate Average Sentiment Score and Volume

In [15]:
from pyspark.sql.functions import when, avg, count

# Assign +1 to positive, -1 to negative, 0 to neutral
result = result.withColumn(
    "sentiment_score",
    when(col("sentiment_label") == "positive", 1)
    .when(col("sentiment_label") == "negative", -1)
    .otherwise(0)
)

# Aggregate avg sentiment and tweet volume per ticker-date
avg_sentiment_df = result.groupBy("ticker", "date").agg(
    avg("sentiment_score").alias("avg_sentiment"),
    count("tweet").alias("tweet_volume")
)

avg_sentiment_df.show(10)

[Stage 26:>                                                         (0 + 1) / 1]

+------+----------+-------------------+------------+
|ticker|      date|      avg_sentiment|tweet_volume|
+------+----------+-------------------+------------+
|  tsla|2020-01-13| 0.7777777777777778|           9|
|  tsla|2020-01-24|0.42857142857142855|           7|
|    ba|2020-03-25|                0.0|          41|
|    ba|2020-04-15|-0.1111111111111111|           9|
|   bac|2020-07-16|               -1.0|           1|
|  amzn|2020-08-05|               -1.0|           2|
|  tsla|2020-11-11|-0.3333333333333333|           3|
|   ccl|2020-11-13|                0.0|           2|
|  nvda|2020-12-01|                1.0|           1|
|    ba|2020-12-22|                1.0|           2|
+------+----------+-------------------+------------+
only showing top 10 rows



                                                                                

In [16]:
final_sentiment_df = pivot_df.join(avg_sentiment_df, on=["ticker", "date"], how="left")
final_sentiment_df.show(10)

[Stage 30:>                                                         (0 + 1) / 1]

+------+----------+--------+--------+-------+-------------------+------------+
|ticker|      date|positive|negative|neutral|      avg_sentiment|tweet_volume|
+------+----------+--------+--------+-------+-------------------+------------+
|   bac|2020-07-16|       0|       1|      0|               -1.0|           1|
|   ccl|2020-11-13|       1|       1|      0|                0.0|           2|
|  amzn|2020-08-05|       0|       2|      0|               -1.0|           2|
|    ba|2020-04-15|       4|       5|      0|-0.1111111111111111|           9|
|    ba|2020-12-22|       2|       0|      0|                1.0|           2|
|  tsla|2020-11-11|       1|       2|      0|-0.3333333333333333|           3|
|  nvda|2020-12-01|       1|       0|      0|                1.0|           1|
|  tsla|2020-01-13|       8|       1|      0| 0.7777777777777778|           9|
|    ba|2020-03-25|      20|      20|      0|                0.0|          41|
|  tsla|2020-01-24|       5|       2|      0|0.42857

                                                                                

## Load and Merge Stock Price Data

In [18]:
from pyspark.sql.functions import lit

tickers = ["AAPL", "TSLA", "AMZN", "DIS", "BA", "MSFT"]
stock_dfs = []

for ticker in tickers:
    path = f"hdfs:///user/hduser/ca2-data/stockprice/{ticker}.csv"
    df = spark.read.option("header", "true").option("inferSchema", "true").csv(path)
    df = df.withColumn("ticker", lit(ticker))
    stock_dfs.append(df)

# Union all into one DataFrame
from functools import reduce
from pyspark.sql import DataFrame

stock_df = reduce(DataFrame.unionByName, stock_dfs)

# Show schema and sample
stock_df.printSchema()
stock_df.show(5)

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- ticker: string (nullable = false)

+-------------------+-----------------+-----------------+-----------------+-----------------+-----------------+---------+------+
|               Date|             Open|             High|              Low|            Close|        Adj Close|   Volume|ticker|
+-------------------+-----------------+-----------------+-----------------+-----------------+-----------------+---------+------+
|2019-12-31 00:00:00|72.48249816894531|73.41999816894531|72.37999725341797| 73.4124984741211|71.52082061767578|100805600|  AAPL|
|2020-01-02 00:00:00|74.05999755859375| 75.1500015258789|73.79750061035156| 75.0875015258789|73.15264892578125|135480400|  AAPL|
|2020-01-03 00:00:00| 74.2874984741211| 75.1449

In [19]:
from pyspark.sql.functions import upper

final_sentiment_df = final_sentiment_df.withColumn("ticker", upper(col("ticker")))


In [20]:
from pyspark.sql.functions import to_date

# Rename and format date
stock_df = stock_df.withColumnRenamed("Date", "date")
stock_df = stock_df.withColumn("date", to_date(col("date"), "yyyy-MM-dd"))


In [21]:
# Join stock data with sentiment features
merged_df = stock_df.join(final_sentiment_df, on=["ticker", "date"], how="left")

# Fill NA for sentiment columns with defaults
merged_df = merged_df.fillna({
    "positive": 0,
    "negative": 0,
    "neutral": 0,
    "avg_sentiment": 0.0,
    "tweet_volume": 0
})


## Feature Engineering 

In [22]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, avg, stddev

w = Window.partitionBy("ticker").orderBy("date")

merged_df = merged_df \
    .withColumn("lag_Close_1", lag("Close", 1).over(w)) \
    .withColumn("lag_sentiment_1", lag("avg_sentiment", 1).over(w)) \
    .withColumn("avg_Close_5", avg("Close").over(w.rowsBetween(-4, 0))) \
    .withColumn("volatility_5", stddev("Close").over(w.rowsBetween(-4, 0)))


In [23]:
merged_df.select(
    "ticker", "date", "Close", "avg_sentiment", "tweet_volume", 
    "positive", "negative", "neutral", 
    "lag_Close_1", "lag_sentiment_1", "avg_Close_5", "volatility_5"
).show(10)


                                                                                

+------+----------+-----------------+-------------------+------------+--------+--------+-------+-----------------+-------------------+-----------------+------------------+
|ticker|      date|            Close|      avg_sentiment|tweet_volume|positive|negative|neutral|      lag_Close_1|    lag_sentiment_1|      avg_Close_5|      volatility_5|
+------+----------+-----------------+-------------------+------------+--------+--------+-------+-----------------+-------------------+-----------------+------------------+
|  AAPL|2019-12-31| 73.4124984741211|                0.0|           0|       0|       0|      0|             null|               null| 73.4124984741211|              null|
|  AAPL|2020-01-02| 75.0875015258789|                0.0|           8|       4|       4|      0| 73.4124984741211|                0.0|            74.25|1.1844060164061108|
|  AAPL|2020-01-03|74.35749816894531|-0.3333333333333333|           6|       2|       4|      0| 75.0875015258789|                0.0|74.285

In [24]:
from pyspark.sql.functions import when

merged_df = merged_df.withColumn(
    "has_sentiment",
    when(col("tweet_volume") > 0, 1).otherwise(0)
)


In [25]:
merged_df.show(10)

                                                                                

+------+----------+-----------------+-----------------+-----------------+-----------------+-----------------+---------+--------+--------+-------+-------------------+------------+-----------------+-------------------+-----------------+------------------+-------------+
|ticker|      date|             Open|             High|              Low|            Close|        Adj Close|   Volume|positive|negative|neutral|      avg_sentiment|tweet_volume|      lag_Close_1|    lag_sentiment_1|      avg_Close_5|      volatility_5|has_sentiment|
+------+----------+-----------------+-----------------+-----------------+-----------------+-----------------+---------+--------+--------+-------+-------------------+------------+-----------------+-------------------+-----------------+------------------+-------------+
|  AAPL|2019-12-31|72.48249816894531|73.41999816894531|72.37999725341797| 73.4124984741211|71.52082061767578|100805600|       0|       0|      0|                0.0|           0|             null|

In [26]:
merged_df.printSchema()


root
 |-- ticker: string (nullable = false)
 |-- date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- positive: long (nullable = false)
 |-- negative: long (nullable = false)
 |-- neutral: long (nullable = false)
 |-- avg_sentiment: double (nullable = false)
 |-- tweet_volume: long (nullable = false)
 |-- lag_Close_1: double (nullable = true)
 |-- lag_sentiment_1: double (nullable = true)
 |-- avg_Close_5: double (nullable = true)
 |-- volatility_5: double (nullable = true)
 |-- has_sentiment: integer (nullable = false)



## Saving Final Processed Data

## MongoDB

In [27]:
merged_df.write.mode("overwrite").parquet("hdfs:///processed-data/stock_sentiment.parquet")


                                                                                