In [None]:
# File location and type
file_location = "/FileStore/tables/data_f.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

df.show(5)

+--------------------+--------------------+--------------------+
|             overall|          reviewText|             summary|
+--------------------+--------------------+--------------------+
|                   5|The stained glass...|           Nice book|
|                   5|My 11 y.o. loved ...|                null|
|Dragons and Wizar...| that make it ""s...| or anything else...|
|Even the perfecti...| so it tends to b...|      Great pictures|
|                   5|The pictures are ...|The pictures are ...|
+--------------------+--------------------+--------------------+
only showing top 5 rows



In [None]:
from pyspark.sql.functions import col

df = df.filter(~col("overall").rlike('\D'))
df.show(5)

+-------+--------------------+--------------------+
|overall|          reviewText|             summary|
+-------+--------------------+--------------------+
|      5|The stained glass...|           Nice book|
|      5|My 11 y.o. loved ...|                null|
|      5|The pictures are ...|The pictures are ...|
|      5|I absolutely love...|       So beautiful!|
|      5|          I love it!|          Five Stars|
+-------+--------------------+--------------------+
only showing top 5 rows



In [None]:
df.count()

Out[116]: 1827525

In [None]:
# Load the data
from pyspark.sql.functions import col

data = df.select(col("reviewText"), col("overall"))

In [None]:
from pyspark.sql.functions import col, sum

# Count the number of null values in each column
null_counts = data.select([sum(col(c).isNull().cast("int")).alias(c) for c in data.columns])

# Display the result
null_counts.show()


+----------+-------+
|reviewText|overall|
+----------+-------+
|         9|      0|
+----------+-------+



In [None]:
# drop any rows with null values
data = data.na.drop()

In [None]:
from pyspark.sql.functions import col, sum

# Count the number of null values in each column
null_counts = data.select([sum(col(c).isNull().cast("int")).alias(c) for c in data.columns])

# Display the result
null_counts.show()


+----------+-------+
|reviewText|overall|
+----------+-------+
|         0|      0|
+----------+-------+



In [None]:
data.count()

Out[121]: 1827516

In [None]:
df = df.limit(100000)
df = df.sample(False, 0.1, seed=42).limit(100000)


In [None]:
df.printSchema()
df.show(5)

root
 |-- overall: string (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- summary: string (nullable = true)

+-------+--------------------+--------------------+
|overall|          reviewText|             summary|
+-------+--------------------+--------------------+
|      4|                cool|          Four Stars|
|      3|This is pretty mu...|This is pretty mu...|
|      4|its a cute little...|                Tiny|
|      5|Perfect for that ...|Entertains A 5 Ye...|
|      5|Great product. To...|             Awesome|
+-------+--------------------+--------------------+
only showing top 5 rows



In [None]:
from pyspark.sql.functions import col
df = df.withColumn("overall", col("overall").cast("float"))


In [None]:
df.printSchema()
df.show(5)

root
 |-- overall: float (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- summary: string (nullable = true)

+-------+--------------------+--------------------+
|overall|          reviewText|             summary|
+-------+--------------------+--------------------+
|    4.0|                cool|          Four Stars|
|    3.0|This is pretty mu...|This is pretty mu...|
|    4.0|its a cute little...|                Tiny|
|    5.0|Perfect for that ...|Entertains A 5 Ye...|
|    5.0|Great product. To...|             Awesome|
+-------+--------------------+--------------------+
only showing top 5 rows



In [None]:
import nltk
nltk.download('stopwords')
nltk.download('wordnet')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to /root/nltk_data...


Out[45]: True

In [None]:
import re
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
# Define regex pattern to remove unwanted characters
pattern = r"[^a-zA-Z0-9]"

# Define stopword removal function
def stopword_removal(text):
    # Lowercase text
    text = text.lower()
    # Remove unwanted characters
    text = re.sub(pattern, " ", text)
    # Tokenize text
    tokens = text.split()
    # Remove stopwords
    stop_words = set(stopwords.words("english"))
    tokens = [w for w in tokens if not w in stop_words]
    # Lemmatize words
    lemmatizer = WordNetLemmatizer()
    tokens = [lemmatizer.lemmatize(w) for w in tokens]
    # Join tokens back into text
    text = " ".join(tokens)
    return text

# Register stopword_removal function as a UDF
stopword_removal_udf = udf(stopword_removal, StringType())

# Apply preprocessing steps to sparkDF
preprocessedDF = df.select(
    "overall",
    stopword_removal_udf("reviewText").alias("reviewText"),
    stopword_removal_udf("summary").alias("summary")
)

# Show first 5 rows of preprocessedDF
preprocessedDF.show(5)


+-------+--------------------+--------------------+
|overall|          reviewText|             summary|
+-------+--------------------+--------------------+
|    4.0|                cool|           four star|
|    3.0|pretty much avera...|pretty much avera...|
|    4.0|cute little book ...|                tiny|
|    5.0|perfect long airp...|entertains 5 year...|
|    5.0|great product too...|             awesome|
+-------+--------------------+--------------------+
only showing top 5 rows



In [None]:
import re
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
# Define regex pattern to remove unwanted characters
pattern = r"[^a-zA-Z0-9]"

# Define stopword removal function
def stopword_removal(text):
    # Lowercase text
    text = text.lower()
    # Remove unwanted characters
    text = re.sub(pattern, " ", text)
    # Tokenize text
    tokens = text.split()
    # Remove stopwords
    stop_words = set(stopwords.words("english"))
    tokens = [w for w in tokens if not w in stop_words]
    # Lemmatize words
    lemmatizer = WordNetLemmatizer()
    tokens = [lemmatizer.lemmatize(w) for w in tokens]
    # Join tokens back into text
    text = " ".join(tokens)
    return text

# Register stopword_removal function as a UDF
stopword_removal_udf = udf(stopword_removal, StringType())

# Apply preprocessing steps to sparkDF
preprocessedDF = df.select(
    "overall",
    stopword_removal_udf("reviewText").alias("reviewText"),
    stopword_removal_udf("summary").alias("summary")
)

# Show first 5 rows of preprocessedDF
preprocessedDF.show(5)


+-------+--------------------+--------------------+
|overall|          reviewText|             summary|
+-------+--------------------+--------------------+
|    4.0|                cool|           four star|
|    3.0|pretty much avera...|pretty much avera...|
|    4.0|cute little book ...|                tiny|
|    5.0|perfect long airp...|entertains 5 year...|
|    5.0|great product too...|             awesome|
+-------+--------------------+--------------------+
only showing top 5 rows



In [None]:
# Import necessary libraries
from pyspark.sql.functions import avg
from pyspark.sql.functions import explode
from pyspark.sql.functions import lower

# Show the schema of the DataFrame
preprocessedDF.printSchema()

# Count the number of rows in the DataFrame
print("Total number of rows: ", preprocessedDF.count())

# Calculate the average rating
avg_rating = preprocessedDF.select(avg("overall")).first()[0]
print("Average rating: ", avg_rating)

# Count the number of reviews for each rating
rating_count = preprocessedDF.groupBy("overall").count().orderBy("overall")
rating_count.show()

# Show the top 10 most frequent words in the reviewText column
#words = preprocessedDF.select(explode(split(lower("reviewText"), "\W+")).alias("word"))
#top_words = words.filter(length("word") > 2).groupBy("word").count().orderBy(desc("count")).limit(10)
#top_words.show()

from pyspark.sql.functions import length, desc, lower, explode, split

# Create a column with all words in reviewText
words = preprocessedDF.select(explode(split(lower(preprocessedDF.reviewText), "\W+")).alias("word"))

# Filter words that have less than 3 characters
words_filtered = words.filter(length("word") > 2)

# Get the top 10 most frequent words
top_words = words_filtered.groupBy("word").count().orderBy(desc("count")).limit(10)

# Show the results
top_words.show()










root
 |-- overall: double (nullable = true)
 |-- reviewText: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- summary: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- sentiment: string (nullable = false)

Total number of rows:  9944
Average rating:  4.473049074818986
+-------+-----+
|overall|count|
+-------+-----+
|    1.0|  336|
|    2.0|  303|
|    3.0|  739|
|    4.0| 1511|
|    5.0| 7054|
|    7.0|    1|
+-------+-----+

Unexpected exception formatting exception. Falling back to standard exception


Traceback (most recent call last):
  File "/databricks/python/lib/python3.9/site-packages/IPython/core/interactiveshell.py", line 3378, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<command-681034656033422>", line 28, in <module>
    words = preprocessedDF.select(explode(split(lower(preprocessedDF.reviewText), "\W+")).alias("word"))
  File "/databricks/spark/python/pyspark/instrumentation_utils.py", line 48, in wrapper
    res = func(*args, **kwargs)
  File "/databricks/spark/python/pyspark/sql/dataframe.py", line 3023, in select
    jdf = self._jdf.select(self._jcols(*cols))
  File "/databricks/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/databricks/spark/python/pyspark/errors/exceptions.py", line 234, in deco
    raise converted from None
pyspark.errors.exceptions.AnalysisException: [DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE] Cannot resolve "lower(reviewText)" due to data 



In [None]:
%Word2Vec

In [None]:
from pyspark.ml.feature import Word2Vec, RegexTokenizer

# Define a regular expression tokenizer to split the text into words
tokenizer = RegexTokenizer(inputCol="reviewText", outputCol="words", pattern="\\W")

# Apply the tokenizer to the data
words_df = tokenizer.transform(df)

# Learn a Word2Vec model on the text data
word2vec = Word2Vec(vectorSize=100, minCount=5, inputCol="words", outputCol="features")
word2vec_model = word2vec.fit(words_df)

# Transform the data using the Word2Vec model
word2vec_df = word2vec_model.transform(words_df)

# Split the data into training and test sets
(training_data, test_data) = word2vec_df.randomSplit([0.7, 0.3])

# Train a Logistic Regression model on the data
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(labelCol="overall", maxIter=10, regParam=0.01, elasticNetParam=0)
lr_model = lr.fit(training_data)

# Make predictions on the test data
predictions = lr_model.transform(test_data)

# Evaluate the model using accuracy metric
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="overall", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)


Accuracy: 0.7034207904350714


In [None]:
from pyspark.sql.functions import col

# Calculate precision and recall
tp = predictions.filter((col("prediction") == col("overall")) & (col("prediction") == 1)).count()
tn = predictions.filter((col("prediction") == col("overall")) & (col("prediction") == 0)).count()
fp = predictions.filter((col("prediction") != col("overall")) & (col("prediction") == 1)).count()
fn = predictions.filter((col("prediction") != col("overall")) & (col("prediction") == 0)).count()

precision = tp / (tp + fp)
recall = tp / (tp + fn)

# Display precision and recall
print("Precision: ", precision)
print("Recall: ", recall)

# Create confusion matrix
confusion_matrix = predictions.groupBy("prediction", "overall").count().orderBy("prediction", "overall")
confusion_matrix.show()

Precision:  0.375
Recall:  1.0
+----------+-------+-----+
|prediction|overall|count|
+----------+-------+-----+
|       1.0|    1.0|    3|
|       1.0|    2.0|    1|
|       1.0|    4.0|    1|
|       1.0|    5.0|    3|
|       3.0|    1.0|    5|
|       3.0|    2.0|   18|
|       3.0|    3.0|   14|
|       3.0|    4.0|    5|
|       3.0|    5.0|   11|
|       4.0|    2.0|    1|
|       4.0|    3.0|    8|
|       4.0|    4.0|    5|
|       4.0|    5.0|    5|
|       5.0|    1.0|   87|
|       5.0|    2.0|  100|
|       5.0|    3.0|  209|
|       5.0|    4.0|  439|
|       5.0|    5.0| 2096|
+----------+-------+-----+



In [None]:
from pyspark.mllib.evaluation import MulticlassMetrics

# Select prediction and label columns from predictions dataframe
predictionAndLabels = predictions.select("prediction", "overall").rdd

# Instantiate MulticlassMetrics object with prediction and label RDDs
metrics = MulticlassMetrics(predictionAndLabels)

# Calculate precision for each label
labels = [1.0, 2.0, 3.0, 4.0, 5.0] # your 5 labels
precisions = {}
for label in labels:
    precisions[label] = metrics.precision(label)

# Print precision for each label
for label in labels:
    print(f"Precision for label {label}: {precisions[label]}")

# Calculate confusion matrix
confusion_matrix = metrics.confusionMatrix().toArray()

# Print confusion matrix
print("Confusion Matrix:")
for i in range(len(confusion_matrix)):
    row = ""
    for j in range(len(confusion_matrix)):
        row += str(confusion_matrix[i][j]) + "\t"
    print(row)



Precision for label 1.0: 0.375
Precision for label 2.0: 0.0
Precision for label 3.0: 0.2641509433962264
Precision for label 4.0: 0.2631578947368421
Precision for label 5.0: 0.7151142954622995
Confusion Matrix:
3.0	0.0	5.0	0.0	87.0	
1.0	0.0	18.0	1.0	100.0	
0.0	0.0	14.0	8.0	209.0	
1.0	0.0	5.0	5.0	439.0	
3.0	0.0	11.0	5.0	2096.0	


In [None]:
%pip install gensim


Python interpreter will be restarted.
Collecting gensim
  Downloading gensim-4.3.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (26.5 MB)
Collecting smart-open>=1.8.1
  Downloading smart_open-6.3.0-py3-none-any.whl (56 kB)
Installing collected packages: smart-open, gensim
Successfully installed gensim-4.3.1 smart-open-6.3.0
Python interpreter will be restarted.


In [None]:
!pip install --upgrade numpy

Collecting numpy
  Downloading numpy-1.24.3-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (17.3 MB)
[?25l[K     |                                | 10 kB 15.9 MB/s eta 0:00:02[K     |                                | 20 kB 5.9 MB/s eta 0:00:03[K     |                                | 30 kB 8.5 MB/s eta 0:00:03[K     |                                | 40 kB 4.6 MB/s eta 0:00:04[K     |                                | 51 kB 5.2 MB/s eta 0:00:04[K     |▏                               | 61 kB 6.1 MB/s eta 0:00:03[K     |▏                               | 71 kB 6.4 MB/s eta 0:00:03[K     |▏                               | 81 kB 7.2 MB/s eta 0:00:03[K     |▏                               | 92 kB 6.4 MB/s eta 0:00:03[K     |▏                               | 102 kB 5.8 MB/s eta 0:00:03[K     |▏                               | 112 kB 5.8 MB/s eta 0:00:03[K     |▎                               | 122 kB 5.8 MB/s eta 0:00:03[K     |▎                          

In [None]:
%Doc2Vec


In [None]:
from pyspark.ml.feature import Word2Vec, RegexTokenizer
from pyspark.sql.functions import udf
from pyspark.ml.linalg import Vectors, VectorUDT

# Define a regular expression tokenizer to split the text into words
tokenizer = RegexTokenizer(inputCol="reviewText", outputCol="words", pattern="\\W")

# Apply the tokenizer to the data
words_df = tokenizer.transform(df)

# Learn a Doc2Vec model on the text data
from gensim.models.doc2vec import Doc2Vec, TaggedDocument

# Define a function to convert a row into a TaggedDocument
def to_tagged_document(row):
    return TaggedDocument(row.words, [str(row.overall)])

tagged_data = words_df.rdd.map(to_tagged_document)
doc2vec_model = Doc2Vec(vector_size=100, min_count=5, epochs=10)
doc2vec_model.build_vocab(tagged_data.collect())
doc2vec_model.train(tagged_data.collect(), total_examples=doc2vec_model.corpus_count, epochs=doc2vec_model.epochs)

# Define a UDF to apply the infer_vector method on each row
infer_vector_udf = udf(lambda words: Vectors.dense(doc2vec_model.infer_vector(words)), VectorUDT())

# Add the Doc2Vec features to the data
doc2vec_df = words_df.withColumn("features", infer_vector_udf(words_df["words"]))

# Split the data into training and test sets
(training_data, test_data) = doc2vec_df.randomSplit([0.7, 0.3])

# Train a Logistic Regression model on the data
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(labelCol="overall", maxIter=10, regParam=0.01, elasticNetParam=0)
lr_model = lr.fit(training_data)

# Make predictions on the test data
predictions = lr_model.transform(test_data)

# Evaluate the model using accuracy metric
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="overall", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)


Accuracy: 0.7441938741164591


In [None]:
from pyspark.sql.functions import col

# Calculate precision and recall
tp = predictions.filter((col("prediction") == col("overall")) & (col("prediction") == 1)).count()
tn = predictions.filter((col("prediction") == col("overall")) & (col("prediction") == 0)).count()
fp = predictions.filter((col("prediction") != col("overall")) & (col("prediction") == 1)).count()
fn = predictions.filter((col("prediction") != col("overall")) & (col("prediction") == 0)).count()

precision = tp / (tp + fp)
recall = tp / (tp + fn)

# Display precision and recall
print("Precision: ", precision)
print("Recall: ", recall)

# Create confusion matrix
confusion_matrix = predictions.groupBy("prediction", "overall").count().orderBy("prediction", "overall")
confusion_matrix.show()

Precision:  0.5178571428571429
Recall:  1.0
+----------+-------+-----+
|prediction|overall|count|
+----------+-------+-----+
|       1.0|    1.0|   29|
|       1.0|    2.0|   11|
|       1.0|    3.0|    7|
|       1.0|    4.0|    4|
|       1.0|    5.0|    5|
|       2.0|    1.0|    6|
|       2.0|    2.0|    6|
|       2.0|    3.0|    5|
|       2.0|    4.0|    1|
|       2.0|    5.0|    1|
|       3.0|    1.0|   10|
|       3.0|    2.0|   15|
|       3.0|    3.0|   61|
|       3.0|    4.0|   20|
|       3.0|    5.0|   10|
|       4.0|    1.0|    2|
|       4.0|    2.0|    7|
|       4.0|    3.0|   25|
|       4.0|    4.0|   96|
|       4.0|    5.0|   46|
+----------+-------+-----+
only showing top 20 rows



In [None]:
from pyspark.mllib.evaluation import MulticlassMetrics

# Select prediction and label columns from predictions dataframe
predictionAndLabels = predictions.select("prediction", "overall").rdd

# Instantiate MulticlassMetrics object with prediction and label RDDs
metrics = MulticlassMetrics(predictionAndLabels)

# Calculate precision for each label
labels = [1.0, 2.0, 3.0, 4.0, 5.0] # your 5 labels
precisions = {}
for label in labels:
    precisions[label] = metrics.precision(label)

# Print precision for each label
for label in labels:
    print(f"Precision for label {label}: {precisions[label]}")

# Calculate confusion matrix
confusion_matrix = metrics.confusionMatrix().toArray()

# Print confusion matrix
print("Confusion Matrix:")
for i in range(len(confusion_matrix)):
    row = ""
    for j in range(len(confusion_matrix)):
        row += str(confusion_matrix[i][j]) + "\t"
    print(row)



Precision for label 1.0: 0.5178571428571429
Precision for label 2.0: 0.3157894736842105
Precision for label 3.0: 0.5258620689655172
Precision for label 4.0: 0.5454545454545454
Precision for label 5.0: 0.7753456221198156
Confusion Matrix:
29.0	6.0	10.0	2.0	51.0	0.0	
11.0	6.0	15.0	7.0	46.0	0.0	
7.0	5.0	61.0	25.0	136.0	0.0	
4.0	1.0	20.0	96.0	351.0	0.0	
5.0	1.0	10.0	46.0	2019.0	0.0	
0.0	0.0	0.0	0.0	1.0	0.0	
