In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, avg, when
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

spark = SparkSession.builder.appName("StockSentimentAnalysis").getOrCreate()


In [2]:
# Load sentiment data
sentiment_df = spark.read.csv("classified_news_sentiment.csv", header=True, inferSchema=True)

# Convert date to standard format
sentiment_df = sentiment_df.withColumn("Date", to_date(col("Date"), "dd-MM-yyyy"))

# Load stock data for AAPL as an example
aapl_df = spark.read.csv("stockdata/AAPL.csv", header=True, inferSchema=True)
aapl_df = aapl_df.withColumn("date", to_date(col("date"), "dd-MM-yyyy"))

acn_df = spark.read.csv("stockdata/ACN.csv", header=True, inferSchema=True)
acn_df = acn_df.withColumn("date", to_date(col("date"), "dd-MM-yyyy"))

amd_df = spark.read.csv("stockdata/AMD.csv", header=True, inferSchema=True)
amd_df = amd_df.withColumn("date", to_date(col("date"), "dd-MM-yyyy"))

amzn_df = spark.read.csv("stockdata/AMZN.csv", header=True, inferSchema=True)
amzn_df = amzn_df.withColumn("date", to_date(col("date"), "dd-MM-yyyy"))

googl_df = spark.read.csv("stockdata/GOOGL.csv", header=True, inferSchema=True)
googl_df = googl_df.withColumn("date", to_date(col("date"), "dd-MM-yyyy"))

ibm_df = spark.read.csv("stockdata/IBM.csv", header=True, inferSchema=True)
ibm_df = ibm_df.withColumn("date", to_date(col("date"), "dd-MM-yyyy"))

infy_df = spark.read.csv("stockdata/INFY.csv", header=True, inferSchema=True)
infy_df = infy_df.withColumn("date", to_date(col("date"), "dd-MM-yyyy"))

msft_df = spark.read.csv("stockdata/MSFT.csv", header=True, inferSchema=True)
msft_df = msft_df.withColumn("date", to_date(col("date"), "dd-MM-yyyy"))

orcl_df = spark.read.csv("stockdata/ORCL.csv", header=True, inferSchema=True)
orcl_df = orcl_df.withColumn("date", to_date(col("date"), "dd-MM-yyyy"))

sap_df = spark.read.csv("stockdata/SAP.csv", header=True, inferSchema=True)
sap_df = sap_df.withColumn("date", to_date(col("date"), "dd-MM-yyyy"))

In [3]:
# Join sentiment and stock price data on ticker and date
joined_df = sentiment_df.join(aapl_df, (sentiment_df.Ticker == 'AAPL') & (sentiment_df.Date == aapl_df.date), "inner")
joined_df = sentiment_df.join(acn_df, (sentiment_df.Ticker == 'ACN') & (sentiment_df.Date == acn_df.date), "inner")
joined_df = sentiment_df.join(amd_df, (sentiment_df.Ticker == 'AMD') & (sentiment_df.Date == amd_df.date), "inner")
joined_df = sentiment_df.join(amzn_df, (sentiment_df.Ticker == 'AMZN') & (sentiment_df.Date == amzn_df.date), "inner")
joined_df = sentiment_df.join(googl_df, (sentiment_df.Ticker == 'GOOGL') & (sentiment_df.Date == googl_df.date), "inner")
joined_df = sentiment_df.join(ibm_df, (sentiment_df.Ticker == 'IBM') & (sentiment_df.Date == ibm_df.date), "inner")
joined_df = sentiment_df.join(infy_df, (sentiment_df.Ticker == 'INFY') & (sentiment_df.Date == infy_df.date), "inner")
joined_df = sentiment_df.join(msft_df, (sentiment_df.Ticker == 'MSFT') & (sentiment_df.Date == msft_df.date), "inner")
joined_df = sentiment_df.join(orcl_df, (sentiment_df.Ticker == 'ORCL') & (sentiment_df.Date == orcl_df.date), "inner")
joined_df = sentiment_df.join(sap_df, (sentiment_df.Ticker == 'SAP') & (sentiment_df.Date == sap_df.date), "inner")


# Create a new column: price movement (Up/Down)
joined_df = joined_df.withColumn(
    "Price_Change",
    when(col("close") > col("open"), 1).otherwise(0)  # 1 for Up, 0 for Down
)

# Encode sentiment
sentiment_indexer = StringIndexer(inputCol="Sentiment", outputCol="Sentiment_Index")
joined_df = sentiment_indexer.fit(joined_df).transform(joined_df)

# Feature assembly
feature_cols = ["Sentiment_Score", "Sentiment_Index"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
assembled_df = assembler.transform(joined_df)


In [4]:
# Average sentiment score by price movement
joined_df.groupBy("Price_Change").agg(avg("Sentiment_Score")).show()

# Count of sentiments
sentiment_df.groupBy("Sentiment").count().show()


+------------+--------------------+
|Price_Change|avg(Sentiment_Score)|
+------------+--------------------+
|           1| 0.12051860465116278|
|           0| 0.20363714285714282|
+------------+--------------------+

+---------+-----+
|Sentiment|count|
+---------+-----+
|  Neutral|  306|
| Positive|  358|
| Negative|  168|
+---------+-----+



In [5]:
train_df, test_df = assembled_df.randomSplit([0.8, 0.2], seed=42)


In [8]:
# Random Forest Classifier
rf = RandomForestClassifier(labelCol="Price_Change", featuresCol="features", numTrees=50)
model = rf.fit(train_df)

# Predictions
predictions = model.transform(test_df)


In [9]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

# Accuracy
evaluator_accuracy = MulticlassClassificationEvaluator(
    labelCol="Price_Change", predictionCol="prediction", metricName="accuracy"
)
accuracy = evaluator_accuracy.evaluate(predictions)

# Precision (weighted)
evaluator_precision = MulticlassClassificationEvaluator(
    labelCol="Price_Change", predictionCol="prediction", metricName="weightedPrecision"
)
precision = evaluator_precision.evaluate(predictions)

# Recall (weighted)
evaluator_recall = MulticlassClassificationEvaluator(
    labelCol="Price_Change", predictionCol="prediction", metricName="weightedRecall"
)
recall = evaluator_recall.evaluate(predictions)

# F1-Score (weighted)
evaluator_f1 = MulticlassClassificationEvaluator(
    labelCol="Price_Change", predictionCol="prediction", metricName="f1"
)
f1 = evaluator_f1.evaluate(predictions)

# AUC (using BinaryClassificationEvaluator, applicable for multiclass with 1-vs-rest strategy)
evaluator_auc = BinaryClassificationEvaluator(
    labelCol="Price_Change", rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
auc = evaluator_auc.evaluate(predictions)

# Print all metrics
print(f"Random Forest Classifier Performance:")
print(f" - Accuracy : {accuracy:.2f}")
print(f" - Precision: {precision:.2f}")
print(f" - Recall   : {recall:.2f}")
print(f" - F1-Score : {f1:.2f}")
print(f" - AUC (ROC): {auc:.2f}")


Random Forest Classifier Performance:
 - Accuracy : 0.76
 - Precision: 0.83
 - Recall   : 0.76
 - F1-Score : 0.72
 - AUC (ROC): 0.53


In [10]:
from pyspark.ml.classification import GBTClassifier

# Initialize GBTClassifier
gbt = GBTClassifier(
    labelCol="Price_Change",
    featuresCol="features",
    maxIter=50,
    maxDepth=5
)

# Train the model
gbt_model = gbt.fit(train_df)

# Predict
gbt_predictions = gbt_model.transform(test_df)
gbt_predictions.select("Price_Change", "prediction").show(5)


+------------+----------+
|Price_Change|prediction|
+------------+----------+
|           0|       1.0|
|           0|       1.0|
|           1|       1.0|
|           1|       1.0|
|           1|       1.0|
+------------+----------+
only showing top 5 rows



In [11]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Accuracy
evaluator_accuracy = MulticlassClassificationEvaluator(
    labelCol="Price_Change", predictionCol="prediction", metricName="accuracy"
)
accuracy = evaluator_accuracy.evaluate(gbt_predictions)

# Precision
evaluator_precision = MulticlassClassificationEvaluator(
    labelCol="Price_Change", predictionCol="prediction", metricName="weightedPrecision"
)
precision = evaluator_precision.evaluate(gbt_predictions)

# Recall
evaluator_recall = MulticlassClassificationEvaluator(
    labelCol="Price_Change", predictionCol="prediction", metricName="weightedRecall"
)
recall = evaluator_recall.evaluate(gbt_predictions)

# F1-score
evaluator_f1 = MulticlassClassificationEvaluator(
    labelCol="Price_Change", predictionCol="prediction", metricName="f1"
)
f1 = evaluator_f1.evaluate(gbt_predictions)

# Print all metrics
print(f"GBT Classifier Performance:")
print(f" - Accuracy : {accuracy:.2f}")
print(f" - Precision: {precision:.2f}")
print(f" - Recall   : {recall:.2f}")
print(f" - F1-Score : {f1:.2f}")
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# AUC
evaluator_auc = BinaryClassificationEvaluator(
    labelCol="Price_Change", rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
auc = evaluator_auc.evaluate(gbt_predictions)

# Display result
print(f" - AUC (Area Under ROC): {auc:.2f}")


GBT Classifier Performance:
 - Accuracy : 0.76
 - Precision: 0.83
 - Recall   : 0.76
 - F1-Score : 0.72
 - AUC (Area Under ROC): 0.53


In [6]:
from pyspark.ml.classification import MultilayerPerceptronClassifier

# Assuming your features vector size is `input_size`
input_size = len(train_df.select("features").first()[0])  # dynamically get input size

mlp = MultilayerPerceptronClassifier(
    featuresCol="features",
    labelCol="Price_Change",
    predictionCol="prediction",
    layers=[input_size, 10, 5, 2],  # [input, hidden1, hidden2, output]
    blockSize=128,
    maxIter=100
)

mlp_model = mlp.fit(train_df)
mlp_predictions = mlp_model.transform(test_df)
mlp_predictions.select("Price_Change", "prediction").show(5)


+------------+----------+
|Price_Change|prediction|
+------------+----------+
|           0|       1.0|
|           0|       0.0|
|           1|       1.0|
|           1|       1.0|
|           1|       1.0|
+------------+----------+
only showing top 5 rows



In [7]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

# Multiclass Evaluator
evaluator = MulticlassClassificationEvaluator(
    labelCol="Price_Change", predictionCol="prediction"
)

# Accuracy
accuracy = evaluator.setMetricName("accuracy").evaluate(mlp_predictions)

# F1 Score
f1 = evaluator.setMetricName("f1").evaluate(mlp_predictions)

# Precision
precision = evaluator.setMetricName("weightedPrecision").evaluate(mlp_predictions)

# Recall
recall = evaluator.setMetricName("weightedRecall").evaluate(mlp_predictions)

print(f"Multilayer Perceptron Classifier:")
print(f"  Accuracy : {accuracy:.4f}")
print(f"  F1 Score : {f1:.4f}")
print(f"  Precision: {precision:.4f}")
print(f"  Recall   : {recall:.4f}")
# Binary Classification Evaluator for AUC
binary_evaluator = BinaryClassificationEvaluator(
    labelCol="Price_Change",
    rawPredictionCol="rawPrediction",  # required for AUC
    metricName="areaUnderROC"
)

auc_mlp = binary_evaluator.evaluate(mlp_predictions)
print(f"  AUC (ROC) for Multi Layer Perceptron: {auc_mlp:.4f}")



Multilayer Perceptron Classifier:
  Accuracy : 0.8235
  F1 Score : 0.8047
  Precision: 0.8613
  Recall   : 0.8235
  AUC (ROC) for Multi Layer Perceptron: 0.3182
