# 1. Import Libraries

In [7]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.ml.feature import Tokenizer, HashingTF, IDF, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf, col, concat_ws
from pyspark.sql.types import FloatType, StringType
import pandas as pd
import numpy as np
import re
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score

# 2. Initialize Spark Session and Context

In [8]:
# Initialize Spark Session
# The memory settings are supplied through the builder so they are part of the
# configuration the session is created with.
spark = SparkSession.builder \
    .appName("Customer Reviews Analysis") \
    .config("spark.driver.memory", "16g") \
    .config("spark.executor.memory", "16g") \
    .getOrCreate()


# Initialize Spark Context
# Reuse the context owned by the session. Calling SparkContext.getOrCreate()
# with a fresh SparkConf at this point would hand back this same context and
# ignore the new conf (including any different app name).
sc = spark.sparkContext

In [9]:
# Session settings for the review-analysis job: 8g of heap for the driver and
# the executors, plus 8g of off-heap memory.
# NOTE(review): a session was already created in the previous cell, so
# getOrCreate() presumably returns that existing session; launch-time settings
# such as spark.driver.memory may not change without restarting the kernel —
# confirm, and consider keeping a single session cell.
session_settings = {
    "spark.driver.memory": "8g",
    "spark.executor.memory": "8g",
    "spark.memory.offHeap.enabled": "true",
    "spark.memory.offHeap.size": "8g",
}

session_builder = SparkSession.builder.appName("Customer Reviews Analysis")
for setting_name, setting_value in session_settings.items():
    session_builder = session_builder.config(setting_name, setting_value)
spark = session_builder.getOrCreate()

# 3. Read Data

In [10]:
# Load the test and train splits from HDFS. The first line of each file is
# taken as the column names and the column types are inferred from the data.
# NOTE(review): the column names handled in the rename cell look like ordinary
# review rows, which suggests these files have no header line (and escape
# quotes by doubling them) — confirm before relying on header=True.
TEST_CSV_URI = "hdfs://localhost:9000/user/hduser/CustomerReviews/test/test.csv"
TRAIN_CSV_URI = "hdfs://localhost:9000/user/hduser/CustomerReviews/train/train.csv"
csv_reader_options = {"header": True, "inferSchema": True}

test_data = spark.read.csv(TEST_CSV_URI, **csv_reader_options)
train_data = spark.read.csv(TRAIN_CSV_URI, **csv_reader_options)

                                                                                

# 4. Preprocess Data

In [11]:
# Give the columns of both splits meaningful names. Each mapping goes from the
# name Spark assigned on load to the name used by the rest of the notebook;
# the renames are applied in the order listed.
# NOTE(review): the current names look like the first data row of each file
# (a rating, a title and a review body), so that row is presumably being
# consumed as a header — confirm.
test_column_names = {
    "1": "rating",
    "mens ultrasheer": "title",
    "This model may be ok for sedentary types, but I'm active and get around alot in my job - consistently found these stockings rolled up down by my ankles! Not Good!! Solution: go with the standard compression stocking, 20-30, stock #114622. Excellent support, stays up and gives me what I need. Both pair of these also tore as I struggled to pull them up all the time. Good riddance/bad investment!": "review_text",
}
for current_name, new_name in test_column_names.items():
    test_data = test_data.withColumnRenamed(current_name, new_name)

# In the training file the review body arrived as two columns, so it is
# renamed to two parts here.
train_column_names = {
    '3': 'rating',
    'more like funchuck': 'title',
    '"Gave this to my dad for a gag gift after directing ""Nunsense': 'review_part1',
    '"" he got a reall kick out of it!"': 'review_part2',
}
for current_name, new_name in train_column_names.items():
    train_data = train_data.withColumnRenamed(current_name, new_name)

In [12]:
# Handle missing values for essential columns
# review_part2 is intentionally not required for the training split: it only
# holds the tail of a review whose text was split across two columns on load,
# so it is presumably null for every review that was not split. Requiring it
# would discard those rows. The concat_ws call that rebuilds review_text skips
# null inputs, so a missing second part is harmless.
train_data = train_data.na.drop(subset=["rating", "title", "review_part1"])
test_data = test_data.na.drop(subset=["rating", "title", "review_text"])


In [13]:
# Concatenate review_part1 and review_part2 into a single column
# concat_ws and col come from the notebook's import cell, so no import is
# repeated here. concat_ws joins the non-null inputs with a single space, so a
# row that only has review_part1 keeps that text unchanged.
train_data = train_data.withColumn(
    "review_text",
    concat_ws(" ", col("review_part1"), col("review_part2")),
)


In [14]:
# Tokenizer that reads the "review_text" column and writes its tokens to a new
# "words" column.
tokenizer = Tokenizer(outputCol="words", inputCol="review_text")

# Run the same tokenizer over the train split and the test split.
train_data, test_data = (
    tokenizer.transform(split) for split in (train_data, test_data)
)


In [15]:
# TF-IDF stages: HashingTF maps "words" to "rawFeatures", then IDF rescales
# "rawFeatures" into the final "features" column.
hashingTF = HashingTF(outputCol="rawFeatures", inputCol="words")
idf = IDF(outputCol="features", inputCol="rawFeatures")

# Chain both stages so they are fitted and applied as one unit.
tf_idf_stages = [hashingTF, idf]
tf_idf_pipeline = Pipeline(stages=tf_idf_stages)

# The pipeline is fitted on the training split only; the fitted model is then
# applied to both splits.
tf_idf_model = tf_idf_pipeline.fit(train_data)
train_data, test_data = (
    tf_idf_model.transform(split) for split in (train_data, test_data)
)


                                                                                

In [16]:
# Redistribute both splits over the same fixed number of partitions.
NUM_PARTITIONS = 100
train_data = train_data.repartition(NUM_PARTITIONS)
test_data = test_data.repartition(NUM_PARTITIONS)

In [17]:
# Logistic regression that learns "rating" from the TF-IDF "features" column.
# NOTE(review): the output recorded below this cell shows fit() aborting with
# java.lang.OutOfMemoryError: Java heap space — the feature size and/or the
# amount of training data probably need reducing before this can complete.
lr = LogisticRegression(featuresCol="features", labelCol="rating")

# Fit on the training split.
lr_model = lr.fit(train_data)

# Score the test split with the fitted model.
predictions = lr_model.transform(test_data)


2023-09-22 13:19:46,685 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 4.1 MiB
2023-09-22 13:20:22,469 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 4.0 MiB
2023-09-22 13:20:42,388 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 4.1 MiB
2023-09-22 13:20:44,786 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 4.1 MiB
2023-09-22 13:20:46,488 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 4.1 MiB
2023-09-22 13:21:06,486 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 4.1 MiB
2023-09-22 13:21:11,085 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
2023-09-22 13:21:11,087 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
2023-09-22 13:21:11,456 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 4.1 MiB
2023-09-22 13:21:41,723 WARN scheduler.DAGSchedul

2023-09-22 13:23:09,341 WARN scheduler.TaskSetManager: Lost task 8.0 in stage 26.0 (TID 621) (fabio-poli-vm executor driver): TaskKilled (Stage cancelled)
2023-09-22 13:23:09,351 ERROR util.Instrumentation: org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 26.0 failed 1 times, most recent failure: Lost task 6.0 in stage 26.0 (TID 619) (fabio-poli-vm executor driver): java.lang.OutOfMemoryError: Java heap space
	at java.lang.reflect.Array.newInstance(Array.java:75)
	at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2083)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1657)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
	at java.io.ObjectInputStream.readObject(ObjectInputStream

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/lib/python3/dist-packages/IPython/core/interactiveshell.py", line 3457, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_7965/3753968853.py", line 5, in <module>
    lr_model = lr.fit(train_data)
  File "/usr/local/spark/python/pyspark/ml/base.py", line 161, in fit
    return self._fit(dataset)
  File "/usr/local/spark/python/pyspark/ml/wrapper.py", line 339, in _fit
    java_model = self._fit_java(dataset)
  File "/usr/local/spark/python/pyspark/ml/wrapper.py", line 336, in _fit_java
    return self._java_obj.fit(dataset._jdf)
  File "/home/hduser/.local/lib/python3.10/site-packages/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/usr/local/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/home/hduser/.local/lib/python3.10/site-packages/py4j/protocol.py", line 326, in get_retur

ConnectionRefusedError: [Errno 111] Connection refused

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/hduser/.local/lib/python3.10/site-packages/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/hduser/.local/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/hduser/.local/lib/python3.10/site-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving


In [None]:
# Convert Spark DataFrame to Pandas DataFrame for PyTorch
# Only a small, seeded random sample is collected: toPandas() pulls every
# selected row onto the driver, and each row's feature vector is later expanded
# to a dense array.
# NOTE(review): the fraction and row cap below are conservative placeholders —
# tune them to the memory actually available.
SAMPLE_FRACTION = 0.001   # share of rows drawn from each split
MAX_SAMPLE_ROWS = 1000    # hard upper bound on rows collected per split
SAMPLE_SEED = 42          # fixed so the same rows are drawn on every run

sampled_train_data = train_data.sample(
    withReplacement=False, fraction=SAMPLE_FRACTION, seed=SAMPLE_SEED
).limit(MAX_SAMPLE_ROWS)
sampled_test_data = test_data.sample(
    withReplacement=False, fraction=SAMPLE_FRACTION, seed=SAMPLE_SEED
).limit(MAX_SAMPLE_ROWS)

train_pdf = sampled_train_data.select("features", "rating").toPandas()
test_pdf = sampled_test_data.select("features", "rating").toPandas()


#### Prepare Data for LSTM

In [None]:
# Convert to NumPy arrays and then to PyTorch tensors
# (numpy and torch are already imported in the notebook's import cell.)

# The model ends in a sigmoid and is trained with binary cross-entropy and
# scored with binary metrics, so the targets must be 0/1 rather than raw
# ratings. Ratings at or above this value become the positive class.
# NOTE(review): the threshold is an assumption about the rating scale — confirm
# it matches the intended definition of a "positive" review.
POSITIVE_RATING_THRESHOLD = 4


def _features_to_matrix(feature_column):
    """Stack a pandas column of Spark ML vectors into a dense float32 matrix.

    Each vector is expanded explicitly with ``toArray()`` rather than relying
    on NumPy to iterate over the vector objects element by element.
    """
    return np.stack(
        [np.asarray(vector.toArray(), dtype=np.float32) for vector in feature_column]
    )


def _ratings_to_binary(rating_column):
    """Map ratings to 1.0 (>= POSITIVE_RATING_THRESHOLD) or 0.0 as float32."""
    return (rating_column.to_numpy() >= POSITIVE_RATING_THRESHOLD).astype(np.float32)


X_train = _features_to_matrix(train_pdf['features'])
y_train = _ratings_to_binary(train_pdf['rating'])

X_test = _features_to_matrix(test_pdf['features'])
y_test = _ratings_to_binary(test_pdf['rating'])

# The arrays are already float32, so these are float tensors as before.
X_train_tensor = torch.from_numpy(X_train)
y_train_tensor = torch.from_numpy(y_train)
X_test_tensor = torch.from_numpy(X_test)
y_test_tensor = torch.from_numpy(y_test)

#### Create LSTM model in PyTorch

In [None]:
# Define the LSTM Model
class LSTMModel(nn.Module):
    """Single-layer LSTM followed by a linear layer and a sigmoid.

    Every input row is treated as its own sequence of length 1, so the
    prediction for one sample never depends on the other samples in the batch.

    Args:
        input_dim: Number of features per sample.
        hidden_dim: Size of the LSTM hidden state.
        output_dim: Number of outputs per sample (default 1).
    """

    def __init__(self, input_dim, hidden_dim, output_dim=1):
        super().__init__()
        # batch_first=True makes the LSTM read its input as
        # (batch, seq, feature). With the default layout the reshape in
        # forward() would be interpreted as one long sequence with the samples
        # as time steps, carrying hidden state from one sample to the next.
        self.lstm = nn.LSTM(input_dim, hidden_dim, batch_first=True)
        self.linear = nn.Linear(hidden_dim, output_dim)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Return per-sample probabilities of shape (len(x), output_dim)."""
        batch_size = len(x)
        # (batch, features) -> (batch, seq=1, features)
        lstm_out, _ = self.lstm(x.reshape(batch_size, 1, -1))
        # (batch, 1, hidden) -> (batch, hidden)
        linear_out = self.linear(lstm_out.reshape(batch_size, -1))
        return self.sigmoid(linear_out)

# Initialize model, loss, optimizer
# Seed PyTorch first so the random weight initialisation (and therefore the
# training run) is repeatable across kernel restarts.
torch.manual_seed(42)

# One input feature per column of the training matrix; 50 hidden units.
model = LSTMModel(input_dim=X_train_tensor.shape[1], hidden_dim=50)
# Binary cross-entropy on the model's sigmoid output.
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)

# Train the Model

In [None]:
# Training the Model
epochs = 10

# The model returns one column per sample, shape (N, 1), and BCELoss requires
# the target to have exactly the same shape, so the targets are reshaped once
# up front. This is a no-op if they are already a column.
train_targets = y_train_tensor.reshape(-1, 1)

# Training mode only needs to be set once, not on every epoch.
model.train()
for epoch in range(epochs):
    # Full-batch update: clear old gradients, forward, backward, step.
    optimizer.zero_grad()
    y_pred = model(X_train_tensor)
    loss = criterion(y_pred, train_targets)
    loss.backward()
    optimizer.step()
    print(f"Epoch {epoch+1}, Loss: {loss.item()}")

# Evaluate the model

In [None]:
# Evaluate the Model
model.eval()
# Inference only: skip gradient tracking to save memory and time.
with torch.no_grad():
    # Flatten the (N, 1) model output to one probability per test sample.
    y_test_prob = model(X_test_tensor).reshape(-1).numpy()

# Hard 0/1 predictions at the 0.5 decision threshold (NumPy integer array).
y_test_pred = (y_test_prob >= 0.5).astype(int)

# Assumes y_test holds binary 0/1 labels — f1_score and roc_auc_score are used
# here in their binary form.
print("Accuracy:", accuracy_score(y_test, y_test_pred))
print("F1 Score:", f1_score(y_test, y_test_pred))
# ROC AUC is a ranking metric, so it is computed from the predicted
# probabilities rather than from the thresholded labels.
print("ROC AUC Score:", roc_auc_score(y_test, y_test_prob))


ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/hduser/.local/lib/python3.10/site-packages/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/hduser/.local/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/hduser/.local/lib/python3.10/site-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
