In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 310.8/310.8 MB 2.4 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285398 sha256=c042abf7ef24948188c9716b8121a1acfbc1aee4ef7f4b971c1b3f640a8d16a2
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [2]:
# Importing required libraries
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF, StringIndexer
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import udf
import re
import string

In [3]:
# Create (or reuse) the Spark session, requesting 16g of memory for both the
# driver and the executors.
spark = (
    SparkSession.builder
    .appName("TextClassifierwithPySpark")
    .config("spark.driver.memory", "16g")
    .config("spark.executor.memory", "16g")
    .getOrCreate()
)


In [4]:
# Load a single parquet shard of the Apparel reviews training split
# (file 00000 of 00005); no other shards are read or merged here.
df = spark.read.format('parquet').load('Apparel_v1_00/amazon_us_reviews-train-00000-of-00005.parquet')

In [5]:
# Re-type the 'star_rating' column as a string
df = df.withColumn('star_rating', df.star_rating.cast('string'))


In [6]:
# Re-type the 'review_body' column as a string
df = df.withColumn('review_body', df.review_body.cast('string'))

In [7]:
# Binary label: 1 when the review has more than 3 stars, otherwise 0.
# The rating is cast to an integer before comparing, because comparing it as a
# string against '3' is lexicographic (e.g. '10' < '3' and '3.0' > '3').
# Rows whose rating is null (or not numeric) get a null label.
df = df.withColumn('label', (df['star_rating'].cast('int') > 3).cast('integer'))

In [8]:
# Define the preprocess_text function
def preprocess_text(text):
    """Normalise a review string for tokenisation.

    Removes every ASCII punctuation character listed in ``string.punctuation``,
    lower-cases the text and collapses any run of whitespace to a single space
    (leading/trailing whitespace is dropped).

    Args:
        text: The raw review text, or ``None`` for a missing review.

    Returns:
        The cleaned string. ``None`` input yields ``""`` so that a null review
        does not raise inside the UDF.
    """
    if text is None:
        return ''

    # Delete punctuation with a translation table instead of a hand-built regex
    # character class: in '[' + string.punctuation + ']' the sequence '\]' is
    # parsed as an escaped ']', so backslashes were silently left in the text.
    text = text.translate(str.maketrans('', '', string.punctuation))

    # Convert to lowercase
    text = text.lower()

    # Collapse runs of whitespace and strip the ends
    return ' '.join(text.split())

In [9]:
# Wrap preprocess_text as a Spark UDF; the return type is spelled out here and
# is the same string type that udf() uses by default.
preprocess_udf = udf(preprocess_text, returnType='string')


In [10]:
# Overwrite 'review_body' with its cleaned version produced by the UDF
df = df.withColumn('review_body', preprocess_udf(df['review_body']))

In [11]:

# Keep only the text and label columns for modelling
df = df.select(['review_body', 'label'])


In [12]:
# 70/30 random train/test split; the fixed seed makes the split repeatable
trainDF, testDF = df.randomSplit(weights=[0.7, 0.3], seed=42)


In [13]:
# Feature-extraction stages, chained through their column names:
# review_body -> mytokens -> filtered_tokens -> rawFeatures -> vectorizedFeatures
tokenizer = Tokenizer(
    inputCol='review_body',
    outputCol='mytokens',
)
stopwords_remover = StopWordsRemover(
    inputCol='mytokens',
    outputCol='filtered_tokens',
)
vectorizer = CountVectorizer(
    inputCol='filtered_tokens',
    outputCol='rawFeatures',
)
idf = IDF(
    inputCol='rawFeatures',
    outputCol='vectorizedFeatures',
)


In [14]:
# Assemble the feature-extraction stages into a single pipeline, in order
feature_stages = [tokenizer, stopwords_remover, vectorizer, idf]
pipeline = Pipeline(stages=feature_stages)


In [15]:
# Fit the pipeline stages using the training split only
pipeline_model = pipeline.fit(dataset=trainDF)

In [16]:
# Run the fitted pipeline over the training split to add the feature columns
trainDF_transformed = pipeline_model.transform(dataset=trainDF)

In [17]:
# Run the same fitted pipeline over the testing split
testDF_transformed = pipeline_model.transform(dataset=testDF)

In [24]:
# Define the mini-batch processing function
def process_mini_batch(iterator):
    for batch_df in iterator:
        # Fit the CountVectorizer on the mini-batch to obtain the vocabulary
        cv_model = vectorizer.fit(batch_df)
        vocab_size = len(cv_model.vocabulary)

        # Update the vectorizer input column to use the transformed tokens
        vectorizer.setInputCol('filtered_tokens')

        # Create the Multilayer Perceptron Classifier
        layers = [vocab_size, 64, 2]  # Input layer, hidden layer, output layer
        mpc = MultilayerPerceptronClassifier(layers=layers, featuresCol='vectorizedFeatures', labelCol='label')

        # Build the pipeline
        pipeline = Pipeline(stages=[vectorizer, idf, mpc])

        # Fit the pipeline on the mini-batch
        nn_model = pipeline.fit(batch_df)

        # Make predictions on the transformed testing data
        predictions = nn_model.transform(testDF_transformed)

        # Evaluate the model
        evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')
        accuracy = evaluator.evaluate(predictions)

        # Print the accuracy
        print("Accuracy: {:.2f}%".format(accuracy * 100))

In [25]:
# Apply mini-batch processing using foreach
trainDF_transformed.foreachPartition(process_mini_batch)

# Start the Spark session
spark.stop()

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/pyspark/serializers.py", line 459, in dumps
    return cloudpickle.dumps(obj, pickle_protocol)
  File "/usr/local/lib/python3.10/dist-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/usr/local/lib/python3.10/dist-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 632, in dump
    return Pickler.dump(self, obj)
TypeError: cannot pickle '_thread.RLock' object


PicklingError: ignored