# **Machine Learning Implentation:**

**Installing Spark Libraries**

In [3]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=8db88fa5b129e8ed9676776d7d6d2748e8707e81f3f97285c688081d57902f53
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [4]:
pip install bloom-filter2

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting bloom-filter2
  Downloading bloom-filter2-2.0.0-1.tar.gz (6.6 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Discarding [4;34mhttps://files.pythonhosted.org/packages/2b/5d/8de4a849ebe212217e6d8f4798a6918d4035e741c44730da81272f170b47/bloom-filter2-2.0.0-1.tar.gz (from https://pypi.org/simple/bloom-filter2/)[0m: [33mRequested bloom-filter2 from https://files.pythonhosted.org/packages/2b/5d/8de4a849ebe212217e6d8f4798a6918d4035e741c44730da81272f170b47/bloom-filter2-2.0.0-1.tar.gz has inconsistent version: expected '2.0.0.post1', but metadata has '2.0.0'[0m
  Downloading bloom_filter2-2.0.0-py3-none-any.whl (6.8 kB)
Installing collected packages: bloom-filter2
Successfully installed bloom-filter2-2.0.0


** Import required libraries for the project**

In [46]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, udf, concat_ws, concat, to_date, collect_list, translate, regexp_replace, when
from pyspark.sql.types import BooleanType, StringType
from bloom_filter2 import BloomFilter
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import NaiveBayes, LogisticRegression, RandomForestClassifier, DecisionTreeClassifier, LinearSVC
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
import matplotlib.pyplot as plt
import numpy as np

**Instantiate a Spark Session**

In [47]:
spark = SparkSession.builder.appName('SentimentAnalyzer').getOrCreate()

**Loading Reddit Data**

In [48]:
# Load data and rename column
df = spark.read.format("csv")\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .option("multiLine", "true")\
    .option("delimiter", "¥")\
    .load("/content/drive/MyDrive/BigDataProject/reddit_data1.csv")\
    .coalesce(5)

In [45]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [49]:
df.head()

Row(_c0='0', id='rtbwo0', title='Happy New Years you retards❤️', comment=None, timestamp='2021-12-31 23:57:11', time_key=datetime.date(2021, 12, 31), SP500=-0.0026261799136575448, TESLA=-0.012668840823859995)

**Preprocessing data**

In [50]:
df = df.withColumn('comment', lower(col('comment')))

In [51]:
# filter to see if title column contains any keyword from keywords
keywords = ["SP500" , "S&P500"]
def my_filter(col):
    for keyword in keywords:
        if keyword.lower() in col.lower():
            return True
    return False

filterUDF = udf(my_filter, BooleanType())
ids = df.filter(col("title").isNotNull()).filter(filterUDF('title')).select("ID")

In [52]:
# create and populate bloom filter
bloomFilterIDS = BloomFilter(ids.count(), 0.000000001)
collected_ids = ids.collect()
for row in collected_ids:
    bloomFilterIDS.add(row["ID"])

In [53]:
broadcastFilterIds = spark.sparkContext.broadcast(bloomFilterIDS)

In [54]:
def my_filter_by_ids(col):
    return col in broadcastFilterIds.value
        
filterIdUDF = udf(my_filter_by_ids, BooleanType())
bloomedFilteredData = df.filter(col("SP500").isNotNull()).filter(filterIdUDF('ID'))

In [55]:
bloomedFilteredData = bloomedFilteredData.withColumn("date_stock",to_date("timestamp"))

In [56]:
bloomedFilteredData = bloomedFilteredData.na.drop(subset=["comment"])


In [57]:
bloomedFilteredData= bloomedFilteredData.drop("_c0","id","title", "timestamp", "time_key", "TESLA")


In [58]:
df1 = bloomedFilteredData.groupby('date_stock', 'SP500').agg(collect_list('comment').alias("comment"))


In [59]:
df2 = df1.withColumn("comment",
   concat_ws(",",col("comment")))



In [60]:
df2 = df2.withColumn('comment', translate('comment', '!"#$%&\'()*+,-./:;<=>?@[\\]^_{|}~', '" '))
df2 = df2.withColumn('comment', regexp_replace('comment', '"', ' '))
df2 = df2.withColumn('comment', regexp_replace('comment', "'", ' '))


In [61]:
df2.filter(df2.date_stock == "2022-05-04") \
    .show(truncate=False)

+----------+-------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|date_stock|SP500              |comment                                                                                                                                                                                                                                                                                             |
+----------+-------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|2022-05-04|0.02986242

In [62]:
df2.head()

Row(date_stock=datetime.date(2022, 5, 4), SP500=0.02986242108440229, comment='hey dangmemesub please submit your post as a text post and add some additional context make sure to include the link \n\ni am a bot and this action was performed automatically please contact the moderators of this subredditmessagecomposetorwallstreetbets if you have any questions or concerns')

In [63]:
df2= df2.withColumn("SP500", when(df2["SP500"]>0,1).otherwise(0))

In [64]:
df2= df2.withColumnRenamed("SP500","label")

**Spark ML pipeline setup**

In [65]:
stages = []

regexTokenizer = RegexTokenizer(inputCol="comment", outputCol="tokens", pattern="\\W+")
stages += [regexTokenizer]

swr = StopWordsRemover(inputCol="tokens", outputCol="Comments")
stages += [swr]

cv = CountVectorizer(inputCol="Comments", outputCol="token_features", minDF=2.0)#, vocabSize=3, minDF=2.0
stages += [cv]


vecAssembler = VectorAssembler(inputCols=['token_features'], outputCol="features")
stages += [vecAssembler]

[print('\n', stage) for stage in stages]




 RegexTokenizer_93917d7946bd

 StopWordsRemover_1e0879e44c61

 CountVectorizer_2518b940c172

 VectorAssembler_d7e18903be77


[None, None, None, None]

**Training and testing models**

##Pipeline Fitting:

In [66]:
pipeline = Pipeline(stages=stages)
data = pipeline.fit(df2).transform(df2)

In [67]:
train, test = data.randomSplit([0.7, 0.3])

## Naive Bayes Implementation

In [68]:
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")
model = nb.fit(train)

In [69]:
predictions = model.transform(test)
# Select results to view
predictions.limit(20).select("label", "prediction", "probability").show(truncate=False)

+-----+----------+-------------------------------------------+
|label|prediction|probability                                |
+-----+----------+-------------------------------------------+
|1    |1.0       |[2.0421381807012315E-12,0.9999999999979579]|
|0    |1.0       |[2.046222685002206E-11,0.9999999999795377] |
|0    |0.0       |[0.9917582109998717,0.00824178900012843]   |
|0    |1.0       |[0.2525753840628213,0.7474246159371786]    |
|0    |1.0       |[6.6625734958572395E-21,1.0]               |
|0    |1.0       |[2.972663531775293E-13,0.9999999999997027] |
|0    |0.0       |[1.0,6.7886360035995E-105]                 |
|0    |0.0       |[0.9999530717089651,4.692829103496594E-5]  |
|0    |0.0       |[0.9938682673666384,0.006131732633361661]  |
|0    |0.0       |[0.9999994773183344,5.226816657378221E-7]  |
|0    |1.0       |[5.015124412461461E-13,0.9999999999994984] |
+-----+----------+-------------------------------------------+



In [70]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction")
nbaccuracy = evaluator.evaluate(predictions)
print ("Test Area Under ROC: ", nbaccuracy)

Test Area Under ROC:  0.75


**Cross Validation Evaluation for Naive Bayes Model:**

In [71]:


# Create ParamGrid and Evaluator for Cross Validation
paramGrid = ParamGridBuilder().addGrid(nb.smoothing, [0.0, 0.2, 0.4, 0.6, 0.8, 1.0, 1.5, 2.0]).build()
cvEvaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction")
# Run Cross-validation
cv = CrossValidator(estimator=nb, estimatorParamMaps=paramGrid, evaluator=cvEvaluator)
cvModel = cv.fit(train)
# Make predictions on testData. cvModel uses the bestModel.
cvPredictions = cvModel.transform(test)
# Evaluate bestModel found from Cross Validation
evaluator.evaluate(cvPredictions)


0.4

In [None]:

# Make predictions on testData. cvModel uses the bestModel.
cvPredictions = cvModel.transform(test)
# Evaluate bestModel found from Cross Validation
print ("Test Area Under ROC: ", evaluator.evaluate(cvPredictions))


[Stage 492:>                                                        (0 + 1) / 1]

Test Area Under ROC:  0.8333333333333334



                                                                                

## Logistic regression Model:

In [None]:
log_reg = LogisticRegression().setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8)
model2 = log_reg.fit(train)
predictions = model2.transform(test)

evaluator = BinaryClassificationEvaluator().setLabelCol('label').setRawPredictionCol('prediction').setMetricName('areaUnderROC')
lgaccuracy = evaluator.evaluate(predictions)
print(lgaccuracy)


                                                                                

0.5


**Cross Validation Evaluation for logistic Rergression Model**

In [None]:
# Create ParamGrid and Evaluator for Cross Validation
paramGrid = ParamGridBuilder().addGrid(nb.smoothing, [0.0, 0.2, 0.4, 0.6, 0.8, 1.0, 1.5, 2.0]).build()
cvEvaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction")
# Run Cross-validation
cv = CrossValidator(estimator=log_reg, estimatorParamMaps=paramGrid, evaluator=cvEvaluator)
cvModel = cv.fit(train)
# Make predictions on testData. cvModel uses the bestModel.
cvPredictions = cvModel.transform(test)
# Evaluate bestModel found from Cross Validation
evaluator.evaluate(cvPredictions)

                                                                                

0.5

## Random Forest Classifier Model:

In [None]:
rf = RandomForestClassifier().setLabelCol('label').setFeaturesCol('features').setNumTrees(10)
model = rf.fit(train)
predictions = model.transform(test)

evaluator = BinaryClassificationEvaluator().setLabelCol('label').setRawPredictionCol('prediction').setMetricName("areaUnderROC")
rfaccuracy = evaluator.evaluate(predictions)
print(rfaccuracy)

22/12/22 17:25:03 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 22 (= number of training instances)
[Stage 1972:>                                                       (0 + 1) / 1]

0.5833333333333333



                                                                                

**Cross Validation Evaluation for Randon Forest Classifier Model**

In [None]:
# Create ParamGrid and Evaluator for Cross Validation
paramGrid = ParamGridBuilder().addGrid(nb.smoothing, [0.0, 0.2, 0.4, 0.6, 0.8, 1.0, 1.5, 2.0]).build()
cvEvaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction")
# Run Cross-validation
cv = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=cvEvaluator)
cvModel = cv.fit(train)
# Make predictions on testData. cvModel uses the bestModel.
cvPredictions = cvModel.transform(test)
# Evaluate bestModel found from Cross Validation
evaluator.evaluate(cvPredictions)

22/12/22 17:25:25 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 13 (= number of training instances)
22/12/22 17:25:37 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 13 (= number of training instances)
22/12/22 17:25:42 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 13 (= number of training instances)
22/12/22 17:25:46 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 13 (= number of training instances)
22/12/22 17:25:51 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 13 (= number of training instances)
22/12/22 17:25:55 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 13 (= number of training instances)
22/12/22 17:25:59 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 13 (= number of training instances)
22/12/22 17:26:03 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 13 (= number of training instances)
22/12/22 17:26:15 WARN D

0.5833333333333333

## Decision Tree Classifier Model:

In [None]:
dt = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'label', maxDepth = 3)
dtModel = dt.fit(train)
predictions = dtModel.transform(test)

evaluator = BinaryClassificationEvaluator().setRawPredictionCol('prediction')
#evaluator = BinaryClassificationEvaluator(labelCol="label", featuresCol="features", maxDepth=2)
dtAccuracy = evaluator.evaluate(predictions)
print(dtAccuracy) 

22/12/22 17:28:39 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 22 (= number of training instances)
                                                                                

1.0


**Cross Validation Evaluation for Decision Tree Clasifier**

In [None]:
# Create ParamGrid and Evaluator for Cross Validation
paramGrid = ParamGridBuilder().addGrid(nb.smoothing, [0.0, 0.2, 0.4, 0.6, 0.8, 1.0, 1.5, 2.0]).build()
cvEvaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction")
# Run Cross-validation
cv = CrossValidator(estimator=dt, estimatorParamMaps=paramGrid, evaluator=cvEvaluator)
cvModel = cv.fit(train)
# Make predictions on testData. cvModel uses the bestModel.
cvPredictions = cvModel.transform(test)
# Evaluate bestModel found from Cross Validation
evaluator.evaluate(cvPredictions)

22/12/22 17:29:00 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 13 (= number of training instances)
22/12/22 17:29:13 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 13 (= number of training instances)
22/12/22 17:29:18 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 13 (= number of training instances)
22/12/22 17:29:22 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 13 (= number of training instances)
22/12/22 17:29:26 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 13 (= number of training instances)
22/12/22 17:29:31 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 13 (= number of training instances)
22/12/22 17:29:37 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 13 (= number of training instances)
22/12/22 17:29:39 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 13 (= number of training instances)
22/12/22 17:29:50 WARN D

1.0

## Support Vector Classifier Model:

In [None]:
# Define your classifier
lsvc = LinearSVC(maxIter=10, regParam=0.1)

# Fit the model
lsvcModel = lsvc.fit(train)

# Compute predictions for test data
predictions = lsvcModel.transform(test)

# Define the evaluator method with the corresponding metric and compute the classification error on test data
evaluator = BinaryClassificationEvaluator().setRawPredictionCol('prediction')
svmaccuracy = evaluator.evaluate(predictions) 

# Show the accuracy
print("Test accuracy = %g" % (svmaccuracy))

[Stage 4252:>                                                       (0 + 1) / 1]

Test accuracy = 0.75



                                                                                

**Cross Validation Evaluation of Support Vector Classifier**

In [None]:
# Create ParamGrid and Evaluator for Cross Validation
paramGrid = ParamGridBuilder().addGrid(nb.smoothing, [0.0, 0.2, 0.4, 0.6, 0.8, 1.0, 1.5, 2.0]).build()
cvEvaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction")
# Run Cross-validation
cv = CrossValidator(estimator=lsvc, estimatorParamMaps=paramGrid, evaluator=cvEvaluator)
cvModel = cv.fit(train)
# Make predictions on testData. cvModel uses the bestModel.
cvPredictions = cvModel.transform(test)
# Evaluate bestModel found from Cross Validation
evaluator.evaluate(cvPredictions)

                                                                                

0.75