In [3]:
# Import other modules not related to PySpark
import os
import sys
import pandas as pd
from pandas import DataFrame
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.ticker as mtick
import matplotlib
from mpl_toolkits.mplot3d import Axes3D
import math
from IPython.core.interactiveshell import InteractiveShell
from datetime import *
import statistics as stats
# This helps auto print out the items without explixitly using 'print'
InteractiveShell.ast_node_interactivity = "all" 
%matplotlib inline


In [11]:
# Import PySpark related modules
import pyspark
from pyspark.rdd import RDD
from pyspark.sql import Row
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql import functions
from pyspark.sql.functions import lit, desc, col, size, array_contains, udf\
, isnan, udf, hour, array_min, array_max, countDistinct
from pyspark.sql.types import *

MAX_MEMORY = '15G'
# Initialize a spark session.
conf = pyspark.SparkConf().setMaster("local[*]") \
        .set('spark.executor.heartbeatInterval', 10000) \
        .set('spark.network.timeout', 10000) \
        .set("spark.core.connection.ack.wait.timeout", "3600") \
        .set("spark.executor.memory", MAX_MEMORY) \
        .set("spark.driver.memory", MAX_MEMORY)
def init_spark():
    spark = SparkSession \
        .builder \
        .appName("Pyspark guide") \
        .config(conf=conf) \
        .getOrCreate()
    return spark

spark = init_spark()
filename_data = 'take_500K.csv'
# Load the main data set into pyspark data frame 
df = spark.read.options(delimiter=",", header=True).csv(filename_data)
# df = spark.read.csv(filename_data)
# df = pd.read_csv(filename_data)
# print(df)
# df.drop(col('permalink'))



In [12]:
df = df.drop('_c0')
df = df.withColumn('sentiment', df['sentiment'].cast('float'))
df = df.withColumn('score', df['score'].cast('int'))
df = df.na.drop()

df.show()

+--------------------+-----------+--------------------+---------+-----+
|             subName|created_utc|                body|sentiment|score|
+--------------------+-----------+--------------------+---------+-----+
|                nova| 2021-10-25|When you schedule...|      0.0|    2|
|           vancouver| 2021-10-25|Didn't stop price...|   0.1946|   11|
|            pregnant| 2021-10-25|I’m just waiting ...|   0.1946|   11|
|            startrek| 2021-10-25|*The first duty o...|   0.1946|   11|
|       entertainment| 2021-10-25|"""Sheeran didn't...|   0.1946|   11|
|          conspiracy| 2021-10-25|I see a lot of po...|   0.1946|   11|
|                nova| 2021-10-25|Unfortunately the...|   0.1946|   11|
|      torontoraptors| 2021-10-25|Idk if they have ...|  -0.0516|    3|
|               rvvtf| 2021-10-25|In terms of enrol...|   0.1946|   11|
|         redscarepod| 2021-10-25|Lol it’s kinda fu...|   0.4417|    6|
|          ukpolitics| 2021-10-25|It's the deadlies...|   0.1946

In [13]:
from pyspark.ml.feature import VectorAssembler

# Создание вектора признаков
feature_assembler  = VectorAssembler(inputCols=['sentiment', 'score'], outputCol='features')
X = feature_assembler.transform(df)
y = X.select("score")

In [14]:
# Разделение данных на тренировочный и тестовый наборы
train_data, test_data = X.randomSplit([0.7, 0.3], seed=42)
train_y, test_y = y.randomSplit([0.7, 0.3], seed=42)

In [15]:
train_data.count()

341932

In [16]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline

# Создание и обучение модели линейной регрессии
lr = LinearRegression(featuresCol="features", labelCol="score", maxIter=10, regParam=0.3, elasticNetParam=0.8)
pipeline = Pipeline(stages=[feature_assembler, lr])
model = lr.fit(train_data)

print("Coefficients: %s" % model.coefficients)
print("Intercept: %s" % model.intercept)

Coefficients: [0.0,0.9204109254423848]
Intercept: 0.7259820461275553


In [17]:
predictions = model.transform(test_data)
predictions.show()

+-----------------+-----------+--------------------+---------+-----+--------------------+------------------+
|          subName|created_utc|                body|sentiment|score|            features|        prediction|
+-----------------+-----------+--------------------+---------+-----+--------------------+------------------+
|              196| 2021-10-24|In China if you h...|   0.1946|   11|[0.19460000097751...|10.850502225993788|
|              196| 2021-10-25|White’s b1 knight...|   0.1946|   11|[0.19460000097751...|10.850502225993788|
|        2007scape| 2021-10-24|Why March 2021? O...|      0.0|    2|           [0.0,2.0]| 2.566803897012325|
|        2007scape| 2021-10-25|I got my first 99...|      0.0|    2|           [0.0,2.0]| 2.566803897012325|
|        2007scape| 2021-10-25|he prob started p...|   0.2023|    5|[0.20229999721050...| 5.328036673339479|
|    2american4you| 2021-10-25|&gt;Obviously I h...|   0.1946|   11|[0.19460000097751...|10.850502225993788|
|      2balkan4you|

In [18]:
from pyspark.ml.evaluation import RegressionEvaluator
for metric in ["mse", "rmse", "mae", "var", "r2"]:
    regression_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="score", metricName=metric)
    calculated_metric = regression_evaluator.evaluate(predictions)
    print(f"{metric}: {calculated_metric}") # метрики

mse: 0.0871770930524805
rmse: 0.2952576723007897
mae: 0.23694068094600254
var: 11.658915784868238
r2: 0.9936655668878099


Random Forest

In [19]:
df = spark.read.options(delimiter=",", header=True).csv(filename_data)
df = df.drop('_c0')
df = df.withColumn('sentiment', df['sentiment'].cast('float'))
df = df.withColumn('score', df['score'].cast('int'))
df = df.na.drop()

df.show()

+--------------------+-----------+--------------------+---------+-----+
|             subName|created_utc|                body|sentiment|score|
+--------------------+-----------+--------------------+---------+-----+
|                nova| 2021-10-25|When you schedule...|      0.0|    2|
|           vancouver| 2021-10-25|Didn't stop price...|   0.1946|   11|
|            pregnant| 2021-10-25|I’m just waiting ...|   0.1946|   11|
|            startrek| 2021-10-25|*The first duty o...|   0.1946|   11|
|       entertainment| 2021-10-25|"""Sheeran didn't...|   0.1946|   11|
|          conspiracy| 2021-10-25|I see a lot of po...|   0.1946|   11|
|                nova| 2021-10-25|Unfortunately the...|   0.1946|   11|
|      torontoraptors| 2021-10-25|Idk if they have ...|  -0.0516|    3|
|               rvvtf| 2021-10-25|In terms of enrol...|   0.1946|   11|
|         redscarepod| 2021-10-25|Lol it’s kinda fu...|   0.4417|    6|
|          ukpolitics| 2021-10-25|It's the deadlies...|   0.1946

In [20]:
# Define the categorical feature (hashtags)
categoricalCols = ["sentiment"]

# Define the numerical features
numericalCols = ["score"]

In [35]:
from pyspark.sql.functions import when
df = df.withColumn('sentiment', when(df['sentiment'] > 0.1, lit(1)).otherwise(lit(0)))
df.show()

+--------------------+-----------+--------------------+---------+-----+
|             subName|created_utc|                body|sentiment|score|
+--------------------+-----------+--------------------+---------+-----+
|                nova| 2021-10-25|When you schedule...|        0|    2|
|           vancouver| 2021-10-25|Didn't stop price...|        0|   11|
|            pregnant| 2021-10-25|I’m just waiting ...|        0|   11|
|            startrek| 2021-10-25|*The first duty o...|        0|   11|
|       entertainment| 2021-10-25|"""Sheeran didn't...|        0|   11|
|          conspiracy| 2021-10-25|I see a lot of po...|        0|   11|
|                nova| 2021-10-25|Unfortunately the...|        0|   11|
|      torontoraptors| 2021-10-25|Idk if they have ...|        0|    3|
|               rvvtf| 2021-10-25|In terms of enrol...|        0|   11|
|         redscarepod| 2021-10-25|Lol it’s kinda fu...|        0|    6|
|          ukpolitics| 2021-10-25|It's the deadlies...|        0

In [36]:
from pyspark.ml.feature import StringIndexer

# Convert the categorical feature into a numerical feature using string indexing
indexer = StringIndexer(inputCols=categoricalCols, outputCols=[c + "Index" for c in categoricalCols])
indexed = indexer.fit(df).transform(df)

In [37]:
# Combine the numerical and numerical features into a single vector column
assembler = VectorAssembler(inputCols=numericalCols, outputCol="features")
output = assembler.transform(indexed)

In [38]:
# Split the data into training and test sets (70%/30%)
train, test = output.randomSplit([0.7, 0.3])

In [39]:
major_df = train.filter(col("sentiment") == 0)
minor_df = train.filter(col("sentiment") == 1)
ratio = int(major_df.count()/minor_df.count())
print("ratio: {}".format(ratio))

ratio: 341421


In [40]:
from pyspark.sql.functions import explode
from pyspark.sql.functions import array

sampled_majority_df = major_df.sample(False, 1/ratio)
train = sampled_majority_df.unionAll(minor_df)

a = range(ratio)

oversampled_df = minor_df.withColumn("dummy", explode(array([lit(x) for x in a]))).drop('dummy')# combine both oversampled minority rows and previous majority rows 
train = major_df.unionAll(oversampled_df)
train.show()

+-----------------+-----------+--------------------+---------+-----+--------------+--------+
|          subName|created_utc|                body|sentiment|score|sentimentIndex|features|
+-----------------+-----------+--------------------+---------+-----+--------------+--------+
|     1200isplenty| 2021-10-25|I never thought I...|        0|   11|           0.0|  [11.0]|
|            14ers| 2021-10-25|Good to see your ...|        0|   11|           0.0|  [11.0]|
|              196| 2021-10-25|B I G S C R E E N...|        0|   11|           0.0|  [11.0]|
|              196| 2021-10-25|True but I'd rath...|        0|    9|           0.0|   [9.0]|
|              196| 2021-10-25|Why won't you deb...|        0|   11|           0.0|  [11.0]|
|        2007scape| 2021-10-25|I’m not sad [depr...|        0|   11|           0.0|  [11.0]|
|        2007scape| 2021-10-25|No, it was made p...|        0|   16|           0.0|  [16.0]|
|        2007scape| 2021-10-25|What caused the c...|        0|   11|  

In [41]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(labelCol="sentiment", featuresCol="features", numTrees=100)

In [42]:
# Train the model on the training data
model = rf.fit(train)

In [43]:
# Make predictions on the test data
predictions = model.transform(test) 


In [45]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate the model using binary classification metrics
evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction", labelCol="sentiment")
auc = evaluator.evaluate(predictions)
evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction", labelCol="sentiment", metricName="areaUnderPR")
apr = evaluator.evaluate(predictions)
print("Area Under ROC (test) =", auc)

rfSummary = model.summary
print("Area under ROC (train): ", rfSummary.areaUnderROC)

Area Under ROC (test) = 0.0
Area under ROC (train):  1.0
