In [33]:
# path = r"/home/mb/college/bda/BDA/datasets/StockMarket.csv"

# import pandas as pd

# df = pd.read_csv(path)

# print(df.shape)

# df.fillna(df.mean(numeric_only=True), inplace=True)

# print(df.shape)

# df.dropna(inplace=True)

# print(df.shape)

# for col in df.columns:
#     print(col, end=", ")

# df.to_csv("StockMarket_filled.csv", index=False)


from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg as spark_avg
from pyspark.sql.window import Window
from pyspark.ml import Pipeline
from pyspark.ml.feature import (
    StringIndexer, VectorAssembler, StandardScaler
)
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Start SparkSession
spark = SparkSession.builder.appName("StockMarketPrediction").getOrCreate()

# Load and clean data
df = spark.read.csv("/home/mb/college/bda/BDA/yes-daddy/StockMarket_filled.csv", header=True, inferSchema=True)
df = df.dropna()

# Moving Averages
windowSpec = Window.partitionBy("ticker").orderBy("date").rowsBetween(-4, 0)
ma_columns = ["open", "close", "volume", "INCREMENTO", "diff"]
for col_name in ma_columns:
    df = df.withColumn(f"MovingAvg{col_name}", spark_avg(col_name).over(windowSpec))

# Feature list
features = [
    'open', 'high', 'low', 'close', 'adjclose', 'volume',
    'RSIadjclose15', 'RSIvolume15', 'RSIadjclose25', 'RSIvolume25',
    'RSIadjclose50', 'RSIvolume50', 'MACDadjclose15', 'MACDvolume15',
    'MACDadjclose25', 'MACDvolume25', 'MACDadjclose50', 'MACDvolume50',
    'MovingAvgopen', 'MovingAvgclose', 'MovingAvgvolume',
    'MovingAvgINCREMENTO', 'MovingAvgdiff'
]

# Index categorical column
indexer = StringIndexer(inputCol="ticker", outputCol="tickerIndex", handleInvalid="skip")

# Add to features
features.append("tickerIndex")

# Assemble features into vector
assembler = VectorAssembler(inputCols=features, outputCol="features")

# Scale features
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=True)

# Model
lr = LogisticRegression(featuresCol="scaledFeatures", labelCol="TARGET", maxIter=10)

# Pipeline
pipeline = Pipeline(stages=[indexer, assembler, scaler, lr])

# Split data
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

# Train model
print("Training Logistic Regression in pipeline...")
model = pipeline.fit(train_data)

# Predict
predictions = model.transform(test_data)

# Evaluation
evaluator = MulticlassClassificationEvaluator(labelCol="TARGET", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Logistic Regression Accuracy: {accuracy:.4f}")

# F1 Score
evaluator.setMetricName("f1")
f1_score = evaluator.evaluate(predictions)
print(f"Logistic Regression F1 Score: {f1_score:.4f}")

predictions.select("ticker", "date", "TARGET", "prediction", "probability").show(10, truncate=False)


# Stop Spark
spark.stop()


25/04/22 00:40:18 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


Training Logistic Regression in pipeline...


                                                                                

Logistic Regression Accuracy: 0.8582
Logistic Regression F1 Score: 0.8512
+------+----------+------+----------+-----------------------------------------+
|ticker|date      |TARGET|prediction|probability                              |
+------+----------+------+----------+-----------------------------------------+
|ASLE  |2022-02-03|1     |1.0       |[0.05317952389027442,0.9468204761097255] |
|ASLE  |2022-02-09|1     |1.0       |[0.33327364580496965,0.6667263541950303] |
|ASLE  |2022-02-11|0     |0.0       |[0.7317741957386764,0.2682258042613236]  |
|ASLE  |2022-02-18|0     |0.0       |[0.7918584667981663,0.2081415332018337]  |
|ASLE  |2022-03-01|0     |0.0       |[0.8615387022954036,0.13846129770459636] |
|ASLE  |2022-03-07|0     |0.0       |[0.9484394281259682,0.051560571874031846]|
|ASLE  |2022-03-15|1     |1.0       |[0.35170814674072587,0.6482918532592741] |
|ASLE  |2022-03-23|0     |0.0       |[0.9065418199495142,0.09345818005048578] |
|ASLE  |2022-04-06|0     |0.0       |[0.879063