In [None]:
!pip install pyspark #For google collab environment

In [None]:
# Load the reviews CSV into Spark and add a 'Helpful %' column:
#   HelpfulnessNumerator / HelpfulnessDenominator when the denominator is > 0,
#   otherwise the sentinel -1 (no helpfulness votes), which the binning step labels 'Empty'.
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col
import pandas as pd

# Location of the reviews CSV on the mounted Google Drive; change this to run elsewhere.
REVIEWS_CSV_PATH = '/content/drive/MyDrive/Portfolio/Reviews.csv'

# Create (or reuse) a local SparkSession using all available cores
spark = SparkSession.builder.master("local[*]").getOrCreate()

# Read the CSV file using pandas
df_pandas = pd.read_csv(REVIEWS_CSV_PATH)

# pandas stores missing text as float NaN. When Spark infers a schema from the
# pandas rows it can fail to merge str with that float ("Can not merge type ..."),
# so turn missing values in text (object) columns into None, which Spark reads as null.
text_cols = df_pandas.select_dtypes(include='object').columns
df_pandas[text_cols] = (
    df_pandas[text_cols].astype(object).where(df_pandas[text_cols].notna(), None)
)

# Convert pandas DataFrame to PySpark DataFrame
df = spark.createDataFrame(df_pandas)

# Fraction of voters who found the review helpful; -1 marks "no votes"
df = df.withColumn(
    'Helpful %',
    when(
        col('HelpfulnessDenominator') > 0,
        col('HelpfulnessNumerator') / col('HelpfulnessDenominator'),
    ).otherwise(-1)
)

In [16]:
# Bucket 'Helpful %' into labelled ranges and store the label in '%upvote'.
# labels[i] covers the half-open range [bins[i], bins[i+1]). Nulls map to 'Empty',
# and any value >= 1.0 falls through to '80-100%'.
from pyspark.sql.functions import when, col
bins = [-1, 0, 0.2, 0.4, 0.6, 0.8, 1.0]
labels = ['Empty', '0-20%', '20-40%', '40-60%', '60-80%', '80-100%']

helpful_pct = col('Helpful %')
upvote_bucket = when(helpful_pct.isNull(), 'Empty')
for lower_edge, upper_edge, bucket_label in zip(bins, bins[1:], labels):
    in_range = (helpful_pct >= lower_edge) & (helpful_pct < upper_edge)
    upvote_bucket = upvote_bucket.when(in_range, bucket_label)
df = df.withColumn('%upvote', upvote_bucket.otherwise('80-100%'))

# count() is only needed for the optional bucket-size check below
from pyspark.sql.functions import count

#df.groupBy("%upvote").agg(count("*").alias("count")).show(10)

In [18]:
# Drop reviews whose %upvote bucket is 'Empty' (Helpful % was the -1 no-vote sentinel or null).
# '%upvote' itself is never null, because the binning's .otherwise(...) always assigns a
# label. An isNotNull() filter would therefore keep every row, so compare against the
# label instead. The != comparison also drops any null rows.
df_note = df.filter(col("%upvote") != "Empty")

In [None]:
# Heatmap of review counts per (%upvote bucket, Score), to eyeball how helpfulness
# relates to the star rating.
# Explicit imports instead of `import *`: the wildcard shadowed Python builtins such as
# sum/max/abs for the rest of the notebook. `mean` is kept because the modeling cells
# further down call mean().
from pyspark.sql.functions import count, mean, sum as spark_sum
import seaborn as sns
import matplotlib.pyplot as plt

# Number of reviews ('Id' values) for every (Score, %upvote) combination
df_s = (
    df_note.groupby(['Score', '%upvote'])
    .agg(count('Id').alias('count_id'))
    .orderBy(['Score', '%upvote'])
)

# One row per %upvote bucket, one column per Score; combinations with no reviews
# come back null from the pivot, so fill them with 0.
df_p = df_s.groupBy('%upvote').pivot('Score').agg(spark_sum('count_id')).fillna(0)

# The pivot table is small, so bring it to pandas. The bucket labels sort correctly
# as plain strings ('0-20%' < ... < '80-100%' < 'Empty').
df_p_sorted = df_p.toPandas().sort_values('%upvote')

fig, ax = plt.subplots()
# fmt='d' prints whole counts instead of the default '.2g' (e.g. 2.9e+04)
sns.heatmap(df_p_sorted.set_index('%upvote'), annot=True, fmt='d', cmap='YlGnBu', ax=ax)
ax.set_title('How helpful users find among user scores (sorted by %upvote)')
ax.set_xlabel('Score')
plt.show()

In [None]:
# Modeling
# List every Score value present, in ascending order, before mapping scores to labels.
df_note.select("Score").dropDuplicates().sort("Score").show()

In [None]:
# Sentiment model: CountVectorizer bag-of-words features + logistic regression.
# Scores 1-2 become label 0.0, scores 4-5 become label 1.0, and neutral score 3 is dropped.
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.sql.functions import col, mean, when
from pyspark.ml import Pipeline

# Filter out neutral reviews (Score == 3). Use col() so the condition refers to
# df_note's own column rather than a column taken from a different DataFrame (df).
df2 = df_note.filter(col('Score') != 3)

# Map Score to a binary double label according to y_dict
y_dict = {1: 0, 2: 0, 4: 1, 5: 1}
positive_scores = [score for score, label in y_dict.items() if label == 1]
df2 = df2.withColumn('label', when(col('Score').isin(positive_scores), 1.0).otherwise(0.0))

# Split the data into training and testing sets (fixed seed for reproducibility)
train, test = df2.randomSplit([0.8, 0.2], seed=2)

# Feature extraction: tokenize 'Text', drop stop words, count term occurrences.
# The pipeline is fitted on the training split only, then applied to both splits.
tokenizer = Tokenizer(inputCol="Text", outputCol="words")
stopwords_remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol="filtered_words")
count_vectorizer = CountVectorizer(inputCol=stopwords_remover.getOutputCol(), outputCol="features")
pipeline = Pipeline(stages=[tokenizer, stopwords_remover, count_vectorizer])
pipeline_fit = pipeline.fit(train)
train_features = pipeline_fit.transform(train).select(col('features'), col('label'))
test_features = pipeline_fit.transform(test).select(col('features'), col('label'))

# Train a Logistic Regression model
log_reg = LogisticRegression()
log_reg_fit = log_reg.fit(train_features)

# Evaluate the model on the test set: area under the ROC curve
predictions = log_reg_fit.transform(test_features)
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', metricName='areaUnderROC')
auc = evaluator.evaluate(predictions)
print('Model AUC:', auc)

# Share of test reviews predicted positive (mean of the 0/1 'prediction' column)
avg_prediction = predictions.select(mean('prediction')).collect()[0][0]
print('Prediction Result:', avg_prediction)

# Compute the accuracy of the model
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)
print('Model Accuracy:', accuracy)

In [None]:
# Sentiment model: TF-IDF features (HashingTF + IDF) + logistic regression.
# Scores 1-2 become label 0.0, scores 4-5 become label 1.0, and neutral score 3 is dropped.
from pyspark.sql.functions import col, mean, when
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# Filter out neutral reviews (Score == 3). Use col() so the condition refers to
# df_note's own column rather than a column taken from a different DataFrame (df).
df2 = df_note.filter(col('Score') != 3)

# Map Score to a binary double label according to y_dict
y_dict = {1: 0, 2: 0, 4: 1, 5: 1}
positive_scores = [score for score, label in y_dict.items() if label == 1]
df2 = df2.withColumn('label', when(col('Score').isin(positive_scores), 1.0).otherwise(0.0))

# Split the data into training and testing sets (fixed seed for reproducibility)
train, test = df2.randomSplit([0.8, 0.2], seed=2)

# Feature extraction: tokenize 'Text', drop stop words, hash terms into 10,000
# buckets, then reweight by inverse document frequency. The pipeline (including
# the IDF weights) is fitted on the training split only.
tokenizer = Tokenizer(inputCol="Text", outputCol="words")
stopwords_remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol="filtered_words")
hashing_tf = HashingTF(inputCol=stopwords_remover.getOutputCol(), outputCol="raw_features", numFeatures=10000)
idf = IDF(inputCol=hashing_tf.getOutputCol(), outputCol="features")
pipeline = Pipeline(stages=[tokenizer, stopwords_remover, hashing_tf, idf])
pipeline_fit = pipeline.fit(train)
train_features = pipeline_fit.transform(train).select(col('features'), col('label'))
test_features = pipeline_fit.transform(test).select(col('features'), col('label'))

# Train a Logistic Regression model
log_reg = LogisticRegression()
log_reg_fit = log_reg.fit(train_features)

# Evaluate the model on the test set: area under the ROC curve
predictions = log_reg_fit.transform(test_features)
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', metricName='areaUnderROC')
auc = evaluator.evaluate(predictions)
print('Model AUC:', auc)

# Share of test reviews predicted positive (mean of the 0/1 'prediction' column)
avg_prediction = predictions.select(mean('prediction')).collect()[0][0]
print('Prediction Result:', avg_prediction)

# Compute the accuracy of the model
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)
print('Model Accuracy:', accuracy)
