<a href="https://colab.research.google.com/github/i-robles/ElipseJar/blob/main/Amazon_Reviews.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<table>
<tr>
    <td>
        <img src="https://www.wordstream.com/wp-content/uploads/2021/07/how-to-get-amazon-reviews.png" width="200"/>
    </td>
    <td style="text-align: left; vertical-align: top;">
        <h1><strong>Amazon Reviews</strong><br></h1>
        <h4>Engineering Large Scale Data Analytics Systems<br>
        ENSF 612 - Fall 2023</h4>
    </td>
</tr>
</table>


*** Note: run all the code the first time. For subsecuent runs, you can set the individual process flags below to False. This will avoid resetting spark, reloading the datasets or repeat computing intensive tasks that were previously computed and stored.


In [81]:
set_spark = True
load_datasets = True
inspect_data = True
pre_process = True

**Setting Up Spark, Spark NLP and Required Modules**

In [82]:
# the capture magic command captures the output of the block to avoid clutter
%%capture

if set_spark:
  !apt-get install openjdk-8-jdk-headless -qq > /dev/null
  !wget https://dlcdn.apache.org/spark/spark-3.3.3/spark-3.3.3-bin-hadoop3.tgz
  !tar -xvf spark-3.3.3-bin-hadoop3.tgz
  !pip install findspark
  !pip install -q spark-nlp
  !pip install contractions

  import os
  os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
  os.environ["SPARK_HOME"] = "/content/spark-3.3.3-bin-hadoop3"

  import findspark
  findspark.init()
  findspark.find()
  from pyspark.sql import SparkSession
  import sparknlp

  # Setting up 4 threads, potentially allowing a 4-core processor execute 4 tasks in parallel
  # And adding the Spark NLP package to the Spark session
  spark = SparkSession.builder\
      .appName("Colab")\
      .master("local[4]")\
      .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:5.1.4")\
      .getOrCreate()

  sc = spark.sparkContext

**Cloning Github Repository and Loading Datasets**

In [83]:
if load_datasets:
  !git clone https://github.com/MENG2023-TP/ENSF612-Project.git

fatal: destination path 'ENSF612-Project' already exists and is not an empty directory.


In [84]:
!ls ENSF612-Project/datasets

All_Beauty_5.json  Cell_Phones_and_Accessories_5_subsample.json  Software_5.json
Appliances_5.json  Musical_Instruments_5_subsample.json		 Video_Games_5_subsample.json


In [85]:
dataset_directory = 'ENSF612-Project/datasets'

# Gets the list of files in the dataset directory that end in ".json"
json_files = [file for file in os.listdir(dataset_directory) if file.endswith('.json')]

# Creates a list of full file paths
file_paths = [os.path.join(dataset_directory, file) for file in json_files]

In [86]:
import json

# Function to parse NDJSON (new line-delimited JSON) files and extract specific fields
def parse_ndjson(line):
    try:
        # Parse the JSON line and return only reviewText asin and reviewerID
        json_line = json.loads(line)
        return (
            json_line.get('overall', ''),
            json_line.get('reviewText', '')
        )
    except json.JSONDecodeError:
        # In case of error, skip this record and return None
        return None

In [87]:
if load_datasets:
  # Initialize an empty RDD
  X_rdd = spark.sparkContext.emptyRDD()
  y_rdd = spark.sparkContext.emptyRDD()

  # Read each file into an RDD, parse its ndjson objects if not None, and union with the existing RDD
  for file_path in file_paths:
      file_rdd = sc.textFile(file_path, 4) # Reads one of the files using its file_path
      parsed_rdd = file_rdd.map(parse_ndjson).filter(lambda x: x is not None) # For each line in the file calls parse_ndjson, it also filters out None records
      X_rdd = X_rdd.union(parsed_rdd.map(lambda x: (x[1],)))  # Extract review text (X)
      y_rdd = y_rdd.union(parsed_rdd.map(lambda x: (x[0],)))  # Extract scores (y)

  # convert the data_rdd to a distributed Spark DataFrame
X_df = spark.createDataFrame(X_rdd, schema=['review']).cache()
y_df = spark.createDataFrame(y_rdd, schema=['score']).cache()

**Data Inspection**

In [88]:
X_df # Shows the schema of the X_df dataframe, column name(s) and type(s)

DataFrame[review: string]

In [89]:
y_df # Shows the schema of the y_df dataframe, column name(s) and type(s)

DataFrame[score: double]

In [90]:
if inspect_data:
  X_count = X_df.count()
  y_count = y_df.count()
print(f"Records in X_df: {X_count}")
print(f"Records in y_df: {y_count}")

Records in X_df: 118242
Records in y_df: 118242


In [91]:
X_df.take(1) # Preview a single record

[Row(review='I like this as a vent as well as something that will keep house warmer in winter.  I sanded it and then painted it the same color as the house.  Looks great.')]

*Note about checking null values: since parse_ndjson returns None for records it could not return both their score and text and later the None records are filtered out. X_rdd and y_rdd effectively do not contain any null values. The following code demonstrates that:

In [92]:
if inspect_data:
  from pyspark.sql.functions import col, sum
  null_X_count = X_df.select([sum(col(c).isNull().cast("int")).alias(c) for c in X_df.columns]).cache()  # Count null values
  null_y_count = y_df.select([sum(col(c).isNull().cast("int")).alias(c) for c in y_df.columns]).cache()  # Count null values

In [93]:
null_X_count.show()
null_y_count.show()

+------+
|review|
+------+
|     0|
+------+

+-----+
|score|
+-----+
|    0|
+-----+



**Text Pre-Processing**

Expanding contractions

Although this step in not estrictly neccesary. Expanding contractions can make the text clearer and more consistent for the model, which can improve its ability to interpret and analyze the words.

In [94]:
if pre_process:
  from pyspark.sql.functions import udf
  from pyspark.sql.types import StringType
  import contractions

  # Define the UDF for expanding contractions
  def expand_contractions_text(text):
      return contractions.fix(text)

  expand_contractions_udf = udf(expand_contractions_text, StringType())

  # Apply the UDF to the DataFrame to create a new column with expanded contractions
  expanded_X_df = X_df.withColumn("expanded_review", expand_contractions_udf("review"))

Defining DocumentAssembler and Spark NLP components

The DocumentAssembler is the initial step in a Spark NLP pipeline. It converts raw text into a structured Annotation format that subsequent Spark NLP annotators can utilize for processing.

In [95]:
from sparknlp.base import DocumentAssembler
from sparknlp.annotator import Tokenizer, Normalizer, LemmatizerModel, StopWordsCleaner

document_assembler = DocumentAssembler() \
    .setInputCol("expanded_review") \
    .setOutputCol("document")

1. Tokenization

In [96]:
tokenizer = Tokenizer() \
    .setInputCols(["document"]) \
    .setOutputCol("token")

2. Text Cleaning

In [97]:
normalizer = Normalizer() \
    .setInputCols(["token"]) \
    .setOutputCol("normalized") \
    .setLowercase(True) \
    .setCleanupPatterns(["[^A-Za-z'\\s]"])  # remove punctuations and numbers

3. Stopword Removal

In [98]:
stop_words_cleaner = StopWordsCleaner() \
    .setInputCols(["normalized"]) \
    .setOutputCol("cleanTokens")

4. Stemming/Lemmatization.

Stemming and lemmatization are both text normalization techniques that reduce words to their base or root form. Applying both can at times be redundant. For this application we decide to use Lemmatization.


In [99]:
# Use the pretrained LemmatizerModel from Spark NLP
lemmatizer = LemmatizerModel.pretrained() \
    .setInputCols(["cleanTokens"]) \
    .setOutputCol("lemmatized")

lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[OK!]


In [100]:
if pre_process:
  # Define the Spark NLP pipeline
  from pyspark.ml import Pipeline

  preprocessing = Pipeline(stages=[
      document_assembler,
      tokenizer,
      normalizer,
      stop_words_cleaner,
      lemmatizer
  ])

  processed = preprocessing.fit(expanded_X_df).transform(expanded_X_df).cache()

In [101]:
# Show the processed data
processed.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|              review|     expanded_review|            document|               token|          normalized|         cleanTokens|          lemmatized|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|I like this as a ...|I like this as a ...|[{document, 0, 15...|[{token, 0, 0, I,...|[{token, 0, 0, i,...|[{token, 2, 5, li...|[{token, 2, 5, li...|
|           good item|           good item|[{document, 0, 8,...|[{token, 0, 3, go...|[{token, 0, 3, go...|[{token, 0, 3, go...|[{token, 0, 3, go...|
|Fit my new LG dry...|Fit my new LG dry...|[{document, 0, 29...|[{token, 0, 2, Fi...|[{token, 0, 2, fi...|[{token, 0, 2, fi...|[{token, 0, 2, fi...|
|Good value for el...|Good value for el...|[{document, 0, 29...|[{token, 0, 3, Go...|[{token, 0, 3, go...|

In [102]:
# Get the first row of the DataFrame
first_row = processed.first()

# Print first row with its content
print("Review:", first_row['review'])
print("Document:", [doc.result for doc in first_row['document']])
print("Token:", [tok.result for tok in first_row['token']])
print("Normalized:", [norm.result for norm in first_row['normalized']])
print("Clean Tokens:", [clean.result for clean in first_row['cleanTokens']])
print("Lemmatized:", [lemma.result for lemma in first_row['lemmatized']])

Review: I like this as a vent as well as something that will keep house warmer in winter.  I sanded it and then painted it the same color as the house.  Looks great.
Document: ['I like this as a vent as well as something that will keep house warmer in winter.  I sanded it and then painted it the same color as the house.  Looks great.']
Token: ['I', 'like', 'this', 'as', 'a', 'vent', 'as', 'well', 'as', 'something', 'that', 'will', 'keep', 'house', 'warmer', 'in', 'winter', '.', 'I', 'sanded', 'it', 'and', 'then', 'painted', 'it', 'the', 'same', 'color', 'as', 'the', 'house', '.', 'Looks', 'great', '.']
Normalized: ['i', 'like', 'this', 'as', 'a', 'vent', 'as', 'well', 'as', 'something', 'that', 'will', 'keep', 'house', 'warmer', 'in', 'winter', 'i', 'sanded', 'it', 'and', 'then', 'painted', 'it', 'the', 'same', 'color', 'as', 'the', 'house', 'looks', 'great']
Clean Tokens: ['like', 'vent', 'well', 'something', 'keep', 'house', 'warmer', 'winter', 'sanded', 'painted', 'color', 'house', 

**Feature engineering**

TF-IDF Vectorization

In [103]:
from pyspark.ml.feature import HashingTF, CountVectorizer, IDF

#hashingTF = HashingTF(numFeatures=20) \
#    .setInputCol("lemmatized") \
#    .setOutputCol("rawFeatures")

countVectorizer = CountVectorizer(vocabSize = 500) \
    .setInputCol("lemmatized") \
    .setOutputCol("rawFeatures")

tfidf_vectorizer = IDF() \
    .setInputCol("rawFeatures") \
    .setOutputCol("tfidf_features")

**Implementing Machine Learning Models**

1. Model Selection

In [105]:
from pyspark.ml.classification import LogisticRegression, NaiveBayes, LinearSVC

# Logistic Regression Pipeline
lr = LogisticRegression(featuresCol="tfidf_features", labelCol="label")
lr_pipeline = Pipeline(stages=[preprocessing, countVectorizer, tfidf_vectorizer, lr])

# Naive Bayes Pipeline
nb = NaiveBayes(featuresCol="tfidf_features", labelCol="label")
nb_pipeline = Pipeline(stages=[preprocessing, countVectorizer, tfidf_vectorizer, nb])

# Linear SVC Pipeline
svc = LinearSVC(featuresCol="tfidf_features", labelCol="label")
svc_pipeline = Pipeline(stages=[preprocessing, countVectorizer, tfidf_vectorizer, svc])


**Validating Models**

Next steps:

Feature extraction (Bag of Words, TF-IDF, word embeddings Word2Vec)

Vectorization (Count Vectorizer, TfidfVectorizer)

Model selection (LogisticRegression, Nayve Bayes, SVM or unsupervised learning)





Additional steps (optional to improve accuracy):

Speech tagging (before stop word removal)

N-grams to use along with TD-IDF