In [None]:
!pip install pyspark
!pip install sparkxgb
!pip install wordcloud
!pip install textblob




In [None]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
# Import necessary libraries
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:

from pyspark import SparkConf, SparkContext, SQLContext
import re

from pyspark.mllib.evaluation import MulticlassMetrics
from sklearn.metrics import classification_report
import numpy as np

import nltk
from nltk.corpus import stopwords
from wordcloud import WordCloud,STOPWORDS
import matplotlib.pyplot as plt
import numpy as np

from sklearn.cluster import KMeans
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from matplotlib import pyplot as plt
%matplotlib inline

from sklearn.preprocessing import StandardScaler
import plotly.express as px

from pyspark.ml.feature import Tokenizer, Word2Vec
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType, IntegerType
from textblob import TextBlob


In [None]:
# Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# sc.stop()

In [None]:
conf = SparkConf()
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

In [None]:
train_rdd = sc.textFile("/content/drive/MyDrive/yelp_review_polarity_csv/train.csv")
test_rdd = sc.textFile("/content/drive/MyDrive/yelp_review_polarity_csv/test.csv")

# Convert to rdd
def split(line):
  label = int(line[1])
  sentence = line[5:-1]
  return label, sentence

train_rdd = train_rdd.map(split)
test_rdd = test_rdd.map(split)

# Get the first 10 rows
for row in train_rdd.take(10):
  print(row)      # label, text

(1, "Unfortunately, the frustration of being Dr. Goldberg's patient is a repeat of the experience I've had with so many other doctors in NYC -- good doctor, terrible staff.  It seems that his staff simply never answers the phone.  It usually takes 2 hours of repeated calling to get an answer.  Who has time for that or wants to deal with it?  I have run into this problem with many other doctors and I just don't get it.  You have office workers, you have patients with medical needs, why isn't anyone answering the phone?  It's incomprehensible and not work the aggravation.  It's with regret that I feel that I have to give Dr. Goldberg 2 stars.")
(2, "Been going to Dr. Goldberg for over 10 years. I think I was one of his 1st patients when he started at MHMG. He's been great over the years and is really all about the big picture. It is because of him, not my now former gyn Dr. Markoff, that I found out I have fibroids. He explores all options with you and is very patient and understanding. 

In [None]:
# Preprocess
nltk.download('stopwords')

# Remove special characters
pattern = r'[^a-zA-Z0-9\s]'     # Define a pattern that only includes whitespaces and alphanumeric characters
def remove_special_characters(text):
    text = re.sub(pattern, ' ', text)     # Replace characters not belonging to the pattern with whitespace
    return text.replace('\n', ' ')      # Replace '\n' with whitespace

# Remove indepedent numbers and stopwords
def is_number(s):
    if s.isdigit():
        return True
    try:
        float(s)
        return True
    except:
        return False
stop_words = set(stopwords.words('english'))
stop_words.add('')
def remove_numbers_and_stopwords(text):
    return " ".join([x for x in text.split(' ') if not is_number(x) and x not in stop_words])

# Remove abundant spaces
def remove_extra_spaces(text):
    return " ".join(text.split(' '))

# Lemmatization
from nltk.stem import WordNetLemmatizer
nltk.download('wordnet')
def lemmatize(text):
    lemmatizer = WordNetLemmatizer()
    return " ".join([lemmatizer.lemmatize(word) for word in text.split(" ")])

def preprocess(text):
    text = remove_special_characters(text)
    text = text.lower()
    text = remove_numbers_and_stopwords(text)
    text = remove_extra_spaces(text)
    text = lemmatize(text)
    return text

# Remove rows with empty word lists
def filter_empty_and_none(row):
    return row[1] is not None and len(row[1]) > 0

train_preprocessed_rdd = (
    train_rdd
    .map(lambda x : (x[0], preprocess(x[1])))
    .filter(filter_empty_and_none)
)
test_preprocessed_rdd = (
    test_rdd
    .map(lambda x : (x[0], preprocess(x[1])))
    .filter(filter_empty_and_none)
)

for row in train_preprocessed_rdd.take(10):
  print(row)

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


(1, 'unfortunately frustration dr goldberg patient repeat experience many doctor nyc good doctor terrible staff seems staff simply never answer phone usually take hour repeated calling get answer time want deal run problem many doctor get office worker patient medical need anyone answering phone incomprehensible work aggravation regret feel give dr goldberg star')
(2, 'going dr goldberg year think one 1st patient started mhmg great year really big picture former gyn dr markoff found fibroid explores option patient understanding judge asks right question thorough want kept loop every aspect medical health life')
(1, 'know dr goldberg like moving arizona let tell stay away doctor office going dr johnson left goldberg took johnson left caring doctor interested co pay come medication refill every month give refill could le patient financial situation trying get day mail away pharmacy prescription guy joke make matter even worse office staff incompetent time call office put voice mail one e

In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Define schema for DataFrame
schema = StructType([
    StructField("label", IntegerType(), True),
    StructField("text", StringType(), True)
])

# Convert RDDs to DataFrames with specified schema
train_df = train_preprocessed_rdd.toDF(schema=schema)
test_df = test_preprocessed_rdd.toDF(schema=schema)

# Now you can use train_df and test_df with column names
train_df.show(10)

+-----+--------------------+
|label|                text|
+-----+--------------------+
|    1|unfortunately fru...|
|    2|going dr goldberg...|
|    1|know dr goldberg ...|
|    1|writing review gi...|
|    2|food great best t...|
|    1|wing sauce like w...|
|    1|owning driving ra...|
|    1|place absolute ga...|
|    2|finally made rang...|
|    2|drove yesterday g...|
+-----+--------------------+
only showing top 10 rows



logistic regression - tfidf


In [None]:
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
from textblob import TextBlob

# Step 1: Prepare the Data
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashing_tf = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="rawFeatures", numFeatures=12)
idf = IDF(inputCol=hashing_tf.getOutputCol(), outputCol="features")

# Additional Features: Sentiment Score
def analyze_sentiment(text):
    # Perform sentiment analysis using TextBlob
    blob = TextBlob(text)
    sentiment_score = blob.sentiment.polarity
    return sentiment_score

analyze_sentiment_udf = udf(analyze_sentiment, FloatType())
sentiment_data = train_df.withColumn("sentiment_score", analyze_sentiment_udf(train_df["text"]))

# Additional Feature: Word Length
word_length_udf = udf(lambda text: len(text.split()), IntegerType())
sentiment_data = sentiment_data.withColumn("word_length", word_length_udf(sentiment_data["text"]))

# Transformations
tokenized_data = tokenizer.transform(sentiment_data)
hashed_data = hashing_tf.transform(tokenized_data)
idf_data = idf.fit(hashed_data).transform(hashed_data)

# Selecting the columns to keep
combined_data = idf_data.select("label", "text", "sentiment_score", "word_length", "features")
combined_data.show(truncate=False)


+-----+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.functions import when

# Define feature columns used for assembling features
feature_columns = ["sentiment_score", "word_length", "features"]

# Create a VectorAssembler to assemble features into a vector
assembler = VectorAssembler(inputCols=feature_columns, outputCol="assembled_features")

# Transform the combined_data DataFrame using the VectorAssembler
assembled_data = assembler.transform(combined_data)

# Adjust labels for binary classification (e.g., map label 2.0 to 1.0)
adjusted_data = assembled_data.withColumn("label_adjusted", when(assembled_data["label"] == 2.0, 1.0).otherwise(0.0))

# Select the assembled features and adjusted label
final_data = adjusted_data.select("assembled_features", "label_adjusted")

# Define LogisticRegression
lr = LogisticRegression(
    labelCol="label_adjusted",
    featuresCol="assembled_features",
    maxIter=100,  # Number of iterations
    regParam=0.0,  # Regularization parameter (default is 0.0 for no regularization)
    elasticNetParam=0.0  # ElasticNet mixing parameter (default is 0.0 for L2 regularization)
)

# Train the LogisticRegression model
model = lr.fit(final_data)


In [None]:
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
from pyspark.sql.functions import udf, length
from pyspark.sql.types import FloatType
from textblob import TextBlob

# Additional Features: Sentiment Score for Test Data
def analyze_sentiment(text):
    # Perform sentiment analysis using TextBlob
    blob = TextBlob(text)
    sentiment_score = blob.sentiment.polarity
    return sentiment_score

analyze_sentiment_udf = udf(analyze_sentiment, FloatType())
sentiment_test_data = test_df.withColumn("sentiment_score", analyze_sentiment_udf(test_df["text"]))

# Additional Features: Word Length for Test Data
word_length_udf = udf(lambda text: len(text.split()), IntegerType())
sentiment_test_data = sentiment_test_data.withColumn("word_length", word_length_udf(sentiment_test_data["text"]))

# Transformations for Test Data
tokenized_test_data = tokenizer.transform(sentiment_test_data)
hashed_test_data = hashing_tf.transform(tokenized_test_data)
idf_test_data = idf.fit(hashed_test_data).transform(hashed_test_data)

# Display the transformed test data
idf_test_data.show()
# Selecting the columns to keep
combined_data_test = idf_test_data.select("label", "text", "sentiment_score", "word_length", "features")
combined_data_test.show(truncate=True)

+-----+--------------------+---------------+-----------+--------------------+--------------------+--------------------+
|label|                text|sentiment_score|word_length|               words|         rawFeatures|            features|
+-----+--------------------+---------------+-----------+--------------------+--------------------+--------------------+
|    2|contrary review z...|     0.08611111|         54|[contrary, review...|(12,[0,1,2,3,4,5,...|(12,[0,1,2,3,4,5,...|
|    1|last summer appoi...|    0.016969698|         34|[last, summer, ap...|(12,[0,1,2,3,4,5,...|(12,[0,1,2,3,4,5,...|
|    2|friendly staff st...|     0.34166667|         11|[friendly, staff,...|(12,[0,2,5,6,7,8,...|(12,[0,2,5,6,7,8,...|
|    1|food good unfortu...|    0.033333335|         26|[food, good, unfo...|(12,[0,1,3,4,5,6,...|(12,[0,1,3,4,5,6,...|
|    2|even car filene b...|     0.18349282|         77|[even, car, filen...|(12,[0,1,2,3,4,5,...|(12,[0,1,2,3,4,5,...|
|    2|picture billy joe...|     0.30869

In [None]:
from pyspark.sql.functions import when
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

# Define feature columns used for assembling features
feature_columns = ["sentiment_score", "word_length", "features"]

# Assemble features into a vector for test data
assembler = VectorAssembler(inputCols=feature_columns, outputCol="assembled_features")
assembled_test_data = assembler.transform(combined_data_test)

# Adjust labels for binary classification (e.g., map label 2.0 to 1.0)
adjusted_data_test = assembled_test_data.withColumn("label_adjusted", when(assembled_test_data["label"] == 2.0, 1.0).otherwise(0.0))

# Select the assembled features and adjusted label
final_test_data = adjusted_data_test.select("assembled_features", "label_adjusted")

# Define LogisticRegression
lr = LogisticRegression(
    labelCol="label_adjusted",
    featuresCol="assembled_features",
    maxIter=100,  # Number of iterations
    regParam=0.0,  # Regularization parameter (default is 0.0 for no regularization)
    elasticNetParam=0.0  # ElasticNet mixing parameter (default is 0.0 for L2 regularization)
)

# Train the LogisticRegression on the test data
model = lr.fit(final_test_data)

# Optionally, you can use the trained model to make predictions on the test data
predictions = model.transform(final_test_data)

# Show predictions
predictions.show()


+--------------------+--------------+--------------------+--------------------+----------+
|  assembled_features|label_adjusted|       rawPrediction|         probability|prediction|
+--------------------+--------------+--------------------+--------------------+----------+
|[0.08611111342906...|           1.0|[0.98097627388103...|[0.72730188746248...|       0.0|
|[0.01696969754993...|           0.0|[1.45448806418241...|[0.81068819060867...|       0.0|
|[0.34166666865348...|           1.0|[-1.5920648967615...|[0.16909357872960...|       1.0|
|[0.03333333507180...|           0.0|[1.23050965497400...|[0.77390776336137...|       0.0|
|[0.18349282443523...|           1.0|[0.12548479481737...|[0.53133009804688...|       0.0|
|[0.30869317054748...|           1.0|[-1.6791288932344...|[0.15721085223772...|       1.0|
|[-0.1809523850679...|           0.0|[2.92921669121875...|[0.94927196834221...|       0.0|
|[0.12257539480924...|           0.0|[0.85548291917601...|[0.70171604358158...|       0.0|

In [None]:
# Step 6: Evaluate the Model
evaluator = MulticlassClassificationEvaluator(labelCol="label_adjusted", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

print("Accuracy = ", accuracy)

Accuracy =  0.7912784883414916


In [None]:
# F1 score evaluation
evaluator_f1 = MulticlassClassificationEvaluator(labelCol="label_adjusted", predictionCol="prediction", metricName="f1")
f1_score = evaluator_f1.evaluate(predictions)
print("F1 Score = ", f1_score)

F1 Score =  0.79127612000083


In [None]:
from pyspark.mllib.evaluation import MulticlassMetrics

# Convert predictions to RDD of (prediction, label) pairs
prediction_and_label = predictions.select("prediction", "label_adjusted").rdd.map(lambda row: (float(row.prediction), float(row.label_adjusted)))

# Instantiate MulticlassMetrics
metrics = MulticlassMetrics(prediction_and_label)

# Calculate precision, recall, and F1-score for each class
precision = {}
recall = {}
f1Score = {}

# Define the classes for which you want to calculate metrics
classes = [0.0, 1.0]  # Assuming binary classification with adjusted labels

for label in classes:
    precision[label] = metrics.precision(label)
    recall[label] = metrics.recall(label)
    f1Score[label] = metrics.fMeasure(label)

# Print the classification report
print("Precision by class:")
for label in classes:
    print("Class %s: %s" % (label, precision[label]))

print("Recall by class:")
for label in classes:
    print("Class %s: %s" % (label, recall[label]))

print("F1 Score by class:")
for label in classes:
    print("Class %s: %s" % (label, f1Score[label]))


Precision by class:
Class 0.0: 0.7932277038842669
Class 1.0: 0.7893553615308203
Recall by class:
Class 0.0: 0.7879250447415518
Class 1.0: 0.7946315789473685
F1 Score by class:
Class 0.0: 0.7905674826375136
Class 1.0: 0.79198468277074


In [None]:

# # Optionally, you can save the model for future use
model.save("/content/drive/MyDrive/big data models/logreg_tfidf_addfeat_model")


In [None]:
from pyspark.ml.classification import LogisticRegressionModel
from pyspark.ml.feature import VectorAssembler

# Assuming 'model' is your trained LogisticRegressionModel
assert isinstance(model, LogisticRegressionModel)

# Define feature columns used for assembling features
feature_columns = ["sentiment_score", "word_length", "features"]

# Create a VectorAssembler to assemble features into a vector
assembler = VectorAssembler(inputCols=feature_columns, outputCol="assembled_features")
assembled_data = assembler.transform(combined_data)

# Get coefficients (weights) from the Logistic Regression model
coefficients = model.coefficients

# Get the names of input features
input_features = assembled_data.schema["assembled_features"].metadata["ml_attr"]["attrs"]["numeric"]

# Create a mapping of feature index to feature name
feature_index_to_name = {int(feature["idx"]): feature["name"] for feature in input_features}

# Create a list of (feature_name, coefficient) tuples
feature_coefficient_list = [(feature_index_to_name[idx], coefficient) for idx, coefficient in enumerate(coefficients)]

# Sort feature coefficient list by absolute coefficient value (descending order)
feature_coefficient_list_sorted = sorted(feature_coefficient_list, key=lambda x: -abs(x[1]))

# Print feature coefficient information
print("Feature Coefficients:")
for feature_name, coefficient in feature_coefficient_list_sorted:
    print(f"{feature_name}: {coefficient:.4f}")

# Feature with Highest Positive Coefficient
highest_positive_feature = feature_coefficient_list_sorted[0][0]
highest_positive_coefficient = feature_coefficient_list_sorted[0][1]
print("\nFeature with Highest Positive Coefficient:", highest_positive_feature)
print("Coefficient Value:", highest_positive_coefficient)

# Feature with Highest Negative Coefficient
highest_negative_feature = feature_coefficient_list_sorted[-1][0]
highest_negative_coefficient = feature_coefficient_list_sorted[-1][1]
print("\nFeature with Highest Negative Coefficient:", highest_negative_feature)
print("Coefficient Value:", highest_negative_coefficient)


Feature Coefficients:
sentiment_score: 8.6019
features_6: 0.5821
features_3: -0.4861
features_5: 0.4144
features_7: -0.4129
features_1: -0.3058
features_2: -0.2853
features_11: 0.2284
features_10: -0.1544
features_8: 0.1401
features_4: 0.1180
features_9: 0.0998
features_0: 0.0438
word_length: -0.0002

Feature with Highest Positive Coefficient: sentiment_score
Coefficient Value: 8.601862357083315

Feature with Highest Negative Coefficient: word_length
Coefficient Value: -0.00015253374904812298


log reg - word 2 vec


In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType, IntegerType
from pyspark.ml.feature import Tokenizer, Word2Vec, VectorAssembler
from pyspark.ml.classification import LogisticRegression

# Step 1: Prepare the Data
tokenizer = Tokenizer(inputCol="text", outputCol="words")

# Additional Features: Sentiment Score
def analyze_sentiment(text):
    # Perform sentiment analysis using TextBlob
    blob = TextBlob(text)
    sentiment_score = blob.sentiment.polarity
    return sentiment_score

analyze_sentiment_udf = udf(analyze_sentiment, FloatType())
sentiment_data = train_df.withColumn("sentiment_score", analyze_sentiment_udf(train_df["text"]))

# Additional Feature: Word Length
word_length_udf = udf(lambda text: len(text.split()), IntegerType())
sentiment_data = sentiment_data.withColumn("word_length", word_length_udf(sentiment_data["text"]))

# Tokenize Text
tokenized_data = tokenizer.transform(sentiment_data)

# Step 2: Apply Word2Vec Model
word2vec = Word2Vec(vectorSize=100, seed=42, inputCol="words", outputCol="word2vec_features")
word2vec_model = word2vec.fit(tokenized_data)
word2vec_data = word2vec_model.transform(tokenized_data)

# Step 3: Select Relevant Columns
selected_columns = word2vec_data.select("label", "text", "sentiment_score", "word_length", "word2vec_features")

selected_columns.show(truncate=False)



+-----+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.functions import when

# Define feature columns used for assembling features
feature_columns = ["sentiment_score", "word_length", "word2vec_features"]

# Create a VectorAssembler to assemble features into a vector
assembler = VectorAssembler(inputCols=feature_columns, outputCol="assembled_features")

# Transform the combined_data DataFrame using the VectorAssembler
assembled_data = assembler.transform(selected_columns)

# Adjust labels for binary classification (e.g., map label 2.0 to 1.0)
adjusted_data = assembled_data.withColumn("label_adjusted", when(assembled_data["label"] == 2.0, 1.0).otherwise(0.0))

# Select the assembled features and adjusted label
final_data = adjusted_data.select("assembled_features", "label_adjusted")

# Define LogisticRegression
lr = LogisticRegression(
    labelCol="label_adjusted",
    featuresCol="assembled_features",
    maxIter=100,  # Number of iterations
    regParam=0.0,  # Regularization parameter (default is 0.0 for no regularization)
    elasticNetParam=0.0  # ElasticNet mixing parameter (default is 0.0 for L2 regularization)
)

# Train the LogisticRegression model
model = lr.fit(final_data)


In [None]:
# Tokenizer
tokenizer = Tokenizer(inputCol="text", outputCol="words")

# Sentiment Score Function
def analyze_sentiment(text):
    blob = TextBlob(text)
    sentiment_score = blob.sentiment.polarity
    return sentiment_score

# UDF for Sentiment Analysis
analyze_sentiment_udf = udf(analyze_sentiment, FloatType())

# Apply Sentiment Analysis to Test Data
test_data_with_sentiment = test_df.withColumn("sentiment_score", analyze_sentiment_udf(test_df["text"]))

# Word Length UDF
word_length_udf = udf(lambda text: len(text.split()), IntegerType())

# Apply Word Length UDF to Test Data
test_data_with_word_length = test_data_with_sentiment.withColumn("word_length", word_length_udf(test_data_with_sentiment["text"]))

# Tokenize Test Data
tokenized_test_data = tokenizer.transform(test_data_with_word_length)

# Word2Vec Model
word2vec = Word2Vec(vectorSize=200, seed=42, inputCol="words", outputCol="word2vec")

# Fit Word2Vec Model on Training Data
word2vec_model = word2vec.fit(tokenized_test_data)

# Transform Test Data using Word2Vec Model
word2vec_test_data = word2vec_model.transform(tokenized_test_data)

# Select Columns to Keep
selected_columns_test = word2vec_test_data.select("label", "text", "sentiment_score", "word_length", "word2vec")

# Show the processed test data
selected_columns_test.show(truncate=False)

+-----+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import when

# Define Logistic Regression classifier
lr = LogisticRegression(labelCol="label_adjusted", featuresCol="assembled_features", maxIter=10)

# Select feature columns for VectorAssembler (excluding 'label' and other non-feature columns)
feature_columns = ["sentiment_score", "word_length", "word2vec"]

# Assemble features into a vector for test data
assembler = VectorAssembler(inputCols=feature_columns, outputCol="assembled_features")
assembled_test_data = assembler.transform(selected_columns_test)

# Adjust labels for binary classification (e.g., map label 2.0 to 1.0)
adjusted_data_test = assembled_test_data.withColumn("label_adjusted", when(assembled_test_data["label"] == 2.0, 1.0).otherwise(0.0))

# Select the assembled features and adjusted label
final_test_data = adjusted_data_test.select("assembled_features", "label_adjusted")

# Train the Logistic Regression classifier on the test data
model = lr.fit(final_test_data)

# Optionally, use the trained model to make predictions on the test data
predictions = model.transform(final_test_data)

# Show predictions
predictions.show()


+--------------------+--------------+--------------------+--------------------+----------+
|  assembled_features|label_adjusted|       rawPrediction|         probability|prediction|
+--------------------+--------------+--------------------+--------------------+----------+
|[0.08611111342906...|           1.0|[4.49591592776259...|[0.98896859015573...|       0.0|
|[0.01696969754993...|           0.0|[2.75773438124127...|[0.94034867528662...|       0.0|
|[0.34166666865348...|           1.0|[-3.6610430606701...|[0.02506146401921...|       1.0|
|[0.03333333507180...|           0.0|[3.07036576085079...|[0.95565367583973...|       0.0|
|[0.18349282443523...|           1.0|[-0.0459357603145...|[0.48851807884485...|       1.0|
|[0.30869317054748...|           1.0|[-2.3911018748869...|[0.08385374451691...|       1.0|
|[-0.1809523850679...|           0.0|[9.38279257713060...|[0.99991584723109...|       0.0|
|[0.12257539480924...|           0.0|[2.53874084412094...|[0.92681346358522...|       0.0|

In [None]:
# Step 6: Evaluate the Model
evaluator = MulticlassClassificationEvaluator(labelCol="label_adjusted", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

print("Accuracy = ", accuracy)

In [None]:
# F1 score evaluation
evaluator_f1 = MulticlassClassificationEvaluator(labelCol="label_adjusted", predictionCol="prediction", metricName="f1")
f1_score = evaluator_f1.evaluate(predictions)
print("F1 Score = ", f1_score)

F1 Score =  0.8851494694826213


In [None]:

# # Optionally, you can save the model for future use
model.save("/content/drive/MyDrive/big data models/logreg_word2vec_addfeat_model2")

In [None]:
from pyspark.ml.classification import LogisticRegressionModel

# Assuming 'model' is your trained LogisticRegressionModel
if isinstance(model, LogisticRegressionModel):
    # Get feature columns used in training
    feature_columns = [col_name for col_name in final_data.columns if col_name != 'label']

    # Get coefficients (weights) from the Logistic Regression model
    coefficients = model.coefficients.toArray()

    # Combine feature names with coefficients
    feature_coefficient_list = list(zip(feature_columns, coefficients))

    # Sort feature coefficient list by absolute coefficient value (descending order)
    feature_coefficient_list_sorted = sorted(feature_coefficient_list, key=lambda x: -abs(x[1]))

    # Print feature coefficient information
    print("Total Number of Features:", len(feature_columns))
    print("\nFeature Coefficients:")
    for feature, coefficient in feature_coefficient_list_sorted:
        print(f"{feature}: {coefficient:.4f}")

    # Feature with Highest Positive Coefficient
    highest_positive_feature = feature_coefficient_list_sorted[0][0]
    highest_positive_coefficient = feature_coefficient_list_sorted[0][1]
    print("\nFeature with Highest Positive Coefficient:", highest_positive_feature)
    print("Coefficient Value:", highest_positive_coefficient)

    # Feature with Highest Negative Coefficient
    highest_negative_feature = feature_coefficient_list_sorted[-1][0]
    highest_negative_coefficient = feature_coefficient_list_sorted[-1][1]
    print("\nFeature with Highest Negative Coefficient:", highest_negative_feature)
    print("Coefficient Value:", highest_negative_coefficient)

else:
    # If the model is not an instance of LogisticRegressionModel, handle appropriately
    raise ValueError("The provided model is not an instance of LogisticRegressionModel.")


Total Number of Features: 2

Feature Coefficients:
assembled_features: 2.4787
label_adjusted: 0.0003

Feature with Highest Positive Coefficient: assembled_features
Coefficient Value: 2.478655080017059

Feature with Highest Negative Coefficient: label_adjusted
Coefficient Value: 0.000270953347186719
Total Number of Features: 2

Feature Coefficients:
assembled_features: 2.4787
label_adjusted: 0.0003

Feature with Highest Positive Coefficient: assembled_features
Coefficient Value: 2.478655080017059

Feature with Highest Negative Coefficient: label_adjusted
Coefficient Value: 0.000270953347186719
