# Data Processing

## Data Aggregation

In [2]:
import pandas as pd

In [25]:
# read the two csv files
tweet_df = pd.read_csv("../data/tweet_data.csv")
stock_df = pd.read_csv("../data/stock_data.csv")

In [26]:
# transform the date column in the stock data to datetime and keep the date 
stock_df['Date'] = pd.to_datetime(stock_df['Date'])
stock_df['Date'] = stock_df['Date'].dt.date

In [27]:
# Transform the date column in the tweet data to datetime and keep the date
tweet_df['Date'] = pd.to_datetime(tweet_df['Date'])
tweet_df['Date'] = tweet_df['Date'].dt.date

In [28]:
# drop the full company name column from the tweet data
tweet_df.drop('Company Name', axis=1, inplace=True)

In [29]:
# print the columns of the two dataframes
print("Tweet Data Columns: ")
for col in tweet_df.columns:
    print(col)
print()
print("Stock Data Columns: ")
for col in stock_df.columns:
    print(col)

Tweet Data Columns: 
Date
Tweet
Stock Name

Stock Data Columns: 
Date
Open
High
Low
Close
Adj Close
Volume
Stock Name


In [30]:
# check how many dates figure in both dataframes
print("Number of dates in both data: ", len(set(tweet_df['Date'].unique()) & set(stock_df['Date'].unique())))

Number of dates in both data:  252


In [36]:
# join the two dataframes on the date column
joined_df = pd.merge(tweet_df, stock_df, 
                     on=['Date', 'Stock Name'], 
                     how='left')

In [40]:
# print the columns of the joined dataframe
print("Joined Data Columns: ")
for col in joined_df.columns:
    print(col)

# print the shape of the joined dataframe
print("Joined Data Shape: ", joined_df.shape)

Joined Data Columns: 
Date
Tweet
Stock Name
Open
High
Low
Close
Adj Close
Volume
Joined Data Shape:  (80793, 9)


In [41]:
# save the new dataframe to a csv file
joined_df.to_csv("../data/joined_data.csv", index=False)

## Data Cleaning

In [3]:
# read the new dataframe from the csv file
df = pd.read_csv("../data/joined_data.csv")

### Missing Values

In [4]:
# get a rundown of the missing values per column
print("Missing Values: ")
for col in df.columns:
    print(col, df[col].isnull().sum())

Missing Values: 
Date 0
Tweet 0
Stock Name 0
Open 17117
High 17117
Low 17117
Close 17117
Adj Close 17117
Volume 17117


In [5]:
# drop the rows with missing values
df.dropna(inplace=True)

In [6]:
# print the shape of the dataframe
print("New shape of the dataframe: ", df.shape)

New shape of the dataframe:  (63676, 9)


In [12]:
# save the cleaned dataframe to a csv file
df.to_csv("../data/cleaned_data.csv", index=False)

## Data Preprocessing

### Tweet Cleaning

In [6]:
import emoji
import re
import pandas as pd

In [2]:
# get a list of the stop words
import nltk
nltk.download('stopwords')
from nltk.corpus import stopwords
stop_words = set(stopwords.words('english')) 

[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\elaty\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [3]:
def split_hashtag(hashtag):
    # Splitting based on the presence of capital letters (camel case)
    parts = re.findall('[A-Z][^A-Z]*', hashtag)
    # If the hashtag is all lowercase, just return it as is.
    if not parts:
        return hashtag
    # Re-join the parts with a space
    return ' '.join(part for part in parts)

# Process hashtags
def replace_with_processed(match):
    hashtag = match.group(1)
    return split_hashtag(hashtag)

# create a function to clean the text of a tweet
def clean_tweet(tweet):
    # Convert emojis to words
    tweet = emoji.demojize(tweet)

    # Replace hashtags in the tweet with their processed forms
    tweet = re.sub(r'#(\w+)', replace_with_processed, tweet)
    
    # Lowercase the tweet
    tweet = tweet.lower()

    # Remove stopwords (optional, based on your preference)
    tweet = ' '.join([word for word in tweet.split() if word not in stop_words])
    
    # Remove URLs
    tweet = re.sub(r'http\S+|www\S+|https\S+', '', tweet, flags=re.MULTILINE)
    
    # Remove user @ references
    tweet = re.sub(r'\@\w+','', tweet)
    
    # Remove punctuations and special characters (except $ and %)
    tweet = re.sub(r'[^\w\s\$%]', '', tweet)
    
    return tweet

### Preprocessing

In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import MinMaxScaler, StandardScaler, VectorAssembler
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import log1p, col, udf
from pyspark.sql.types import DoubleType
import joblib
import pyspark

In [2]:
print(pyspark.__version__)

3.5.0


In [2]:
# Initialize SparkSession
spark = SparkSession.builder \
    .appName("MySparkApp") \
    .master("spark://localhost:7077") \
    .getOrCreate()

In [3]:
# Load your data into a DataFrame (adjust path and format as needed)
df = spark.read.csv("../data/cleaned_data.csv", header=True, inferSchema=True)

In [4]:
# Log transform the Volume column
df = df.withColumn("Volume", log1p(df["Volume"] + 1))

In [5]:
# UDF to extract the first element of a vector column
first_element = udf(lambda v:float(v[0]), DoubleType())

def scale_column(df, inputCol, scaler):
    # Vectorize the column
    assembler = VectorAssembler(inputCols=[inputCol], outputCol=inputCol + "_Vec")
    df = assembler.transform(df)
    
    # Fit and transform the data
    scalerModel = scaler.setInputCol(inputCol + "_Vec").setOutputCol(inputCol + "_Scaled")
    df = scalerModel.fit(df).transform(df)
    
    # Extract the scaled value and update the original column
    df = df.withColumn(inputCol, first_element(col(inputCol + "_Scaled")))
    
    # Drop the intermediate vector columns
    df = df.drop(inputCol + "_Vec", inputCol + "_Scaled")
    
    return df

In [6]:
# Columns to scale
columns_to_scale = ['Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume']

# Apply the scalers
for col_name in columns_to_scale:
    # Min-Max Scaling
    min_max_scaler = MinMaxScaler()
    df = scale_column(df, col_name, min_max_scaler)
    
    # Standard Scaling (Z-score Normalization)
    standard_scaler = StandardScaler(withStd=True, withMean=True)
    df = scale_column(df, col_name, standard_scaler)

# Show the result
df.show()

Py4JJavaError: An error occurred while calling o110.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 5.0 failed 1 times, most recent failure: Lost task 0.0 in stage 5.0 (TID 10) (host.docker.internal executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:54)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:131)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:855)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source)
	at java.net.AbstractPlainSocketImpl.accept(Unknown Source)
	at java.net.PlainSocketImpl.accept(Unknown Source)
	at java.net.ServerSocket.implAccept(Unknown Source)
	at java.net.ServerSocket.accept(Unknown Source)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:190)
	... 32 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:54)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:131)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:855)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source)
	at java.net.AbstractPlainSocketImpl.accept(Unknown Source)
	at java.net.PlainSocketImpl.accept(Unknown Source)
	at java.net.ServerSocket.implAccept(Unknown Source)
	at java.net.ServerSocket.accept(Unknown Source)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:190)
	... 32 more
