In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import RegexTokenizer, CountVectorizer, StopWordsRemover, HashingTF, IDF, ChiSqSelector, CountVectorizer
from pyspark.ml.feature import Tokenizer, Normalizer, StringIndexer
from pyspark.sql.functions import col, lower

from pyspark.ml import Pipeline
from pyspark.ml.classification import LinearSVC, OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

from pyspark.mllib.util import MLUtils

import time

In [2]:
# Locations of the review JSON inputs.
# `full_dataset` is the file loaded by the session-setup cell below.
full_dataset = "/user/dic24_shared/amazon-reviews/full/reviewscombined.json"
# Alternative input; presumably a smaller development subset - TODO confirm.
partial_dataset = "/user/dic24_shared/amazon-reviews/full/reviews_devset.json"


In [3]:
# Initialize the Spark session (getOrCreate returns an existing session if one is active).
spark = (
    SparkSession.builder
    .appName("Text_Classification_all")
    .getOrCreate()
)

# Load the review data into `df`.
try:
    df = spark.read.json(full_dataset)
except Exception as e:
    # Every later cell needs `df`. Report the cause and stop here instead of
    # continuing with `df` undefined and failing later with an unrelated NameError.
    print(f"Error: {e}")
    raise

print("File read successfully.")

# Show the schema and some data
df.printSchema()
df.show(5)

SLF4J: Class path contains multiple SLF4J bindings.

File read successfully.
root
 |-- asin: string (nullable = true)
 |-- category: string (nullable = true)
 |-- helpful: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)

+----------+--------------------+-------+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------+
|      asin|            category|helpful|overall|          reviewText| reviewTime|    reviewerID|        reviewerName|             summary|unixReviewTime|
+----------+--------------------+-------+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------+
|0981850006|Patio_Lawn_and_Garde| [6, 7]|    5.0|This

# Part 1: RDDs

# Part 2: Feature extraction pipeline (Spark ML)

In [4]:
# Casefolding: replace the `reviewText` column with its lower-cased form.
review_text_lowered = lower(col("reviewText"))
df = df.withColumn("reviewText", review_text_lowered)

In [5]:
# Converting category to numeric
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")

# Creating the pipeline stages:

# 1. Tokenization. With gaps=True the pattern describes the separators the text
#    is split on: whitespace, digits and runs of the listed punctuation.
#    The hyphen is escaped so that it is a literal delimiter. Written unescaped
#    as `=-_` it formed a character range from '=' to '_', which does not
#    contain '-' itself, so hyphenated words were never split. That range also
#    happened to cover '[', ']', '>' and '^'; they are listed explicitly so the
#    only tokenization change is the hyphen.
tokenizer = RegexTokenizer(
    inputCol="reviewText",
    outputCol="words",
    pattern=r'\s+|\t+|\d+|[(){}.!?,;:+=\-_"\`~#@&*%€$§\\/\[\]>^]+',
    gaps=True,
)

# 2. Case folding is not a pipeline stage here: `reviewText` is lower-cased in
#    the preceding cell.

# 3. Stopwords removal
stopwords_remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol="filtered_words")

# 4. TF-IDF calculation with CountVectorizer
count_vectorizer = CountVectorizer(inputCol=stopwords_remover.getOutputCol(), outputCol="rawFeatures")
# Historical name kept as an alias because later cells build their pipelines
# with `hashingTF`; it refers to the CountVectorizer above, not to a HashingTF.
hashingTF = count_vectorizer
idf = IDF(inputCol=count_vectorizer.getOutputCol(), outputCol="features")

# # 4. TF-IDF calculation with HashingTF
# hashingTF = HashingTF(inputCol=stopwords_remover.getOutputCol(), outputCol="rawFeatures", numFeatures=20)
# idf = IDF(inputCol="rawFeatures", outputCol="features")

# 5. Chi-square selection of the 2000 top-ranked features with respect to the
#    indexed category label.
selector = ChiSqSelector(
    numTopFeatures=2000,
    featuresCol=idf.getOutputCol(),
    outputCol="selectedFeatures",
    labelCol=indexer.getOutputCol(),
)


# pipeline = Pipeline(stages=[indexer, tokenizer, stopwords_remover, hashingTF, idf, selector])


# start_time = time.time()
# model = pipeline.fit(df)
# fit_time = time.time() - start_time
# print(f'fit_time={fit_time}')

# # Transform the test data
# result = model.transform(df)
# transform_time = time.time() - fit_time - start_time
# print(f'transform_time={transform_time}')

# # Extract the CountVectorizer model from the pipeline
# count_vectorizer_model = model.stages[3]
# vocab = count_vectorizer_model.vocabulary

# # Get the selected feature indices
# selected_indices = result.select("selectedFeatures").rdd.flatMap(lambda x: x).collect()

# # Convert the indices to terms
# selected_terms = [vocab[index] for indices in selected_indices for index in indices.indices]

    
# with open("output_ds.txt", "w") as f:
#     for term in selected_terms:
#         # print(term)
#         f.write(f"{term} ")


# Part 3: Text classification (LinearSVC with One-vs-Rest)

In [6]:
# 6. Normalizing: scale the chi-square-selected vectors (Normalizer's default p-norm).
normalizer = Normalizer(
    inputCol=selector.getOutputCol(),
    outputCol="normFeatures",
)

# 7. Binary linear SVM trained on the normalized features
svm = LinearSVC(
    featuresCol=normalizer.getOutputCol(),
    labelCol="categoryIndex",
)

# 8. One-vs-Rest wrapper so the binary SVM can handle the multiclass label
ovr = OneVsRest(
    classifier=svm,
    labelCol="categoryIndex",
)


# pipeline = Pipeline(stages=[indexer, tokenizer, stopwords_remover, hashingTF, idf, selector, normalizer, ovr])


In [None]:
# Split into 70% training, 15% validation and 15% test; the fixed seed keeps
# the random assignment repeatable.
split_weights = [0.7, 0.15, 0.15]
train_df, validation_df, test_df = df.randomSplit(split_weights, seed=42)

# train_df = train_df.sample(0.01)
# validation_df = train_df.sample(0.05)
# test_df = train_df.sample(0.05)

# Report the size of each split (every count() is a separate Spark action).
for count_label, split_frame in (
    ("Training set count:", train_df),
    ("Test set count:", test_df),
    ("Validation set count:", validation_df),
):
    print(count_label, split_frame.count())

In [None]:
# # Creating a grid of parameters
# paramGrid = (ParamGridBuilder()
#              .addGrid(selector.numTopFeatures, [2000, 500])  # number of features selected
#              .addGrid(svm.regParam, [0.01, 0.1, 1.0])  # Regularization parameter
#              .addGrid(svm.maxIter, [10, 50])  # Maximum number of iterations
#              .addGrid(svm.standardization, [True, False]) # Standardization
#              .build())


# # Creating a Cross Validator for Grid search
# crossval = CrossValidator(estimator=pipeline,
#                           estimatorParamMaps=paramGrid,
#                           evaluator=MulticlassClassificationEvaluator(labelCol="categoryIndex", metricName="f1"),
#                           numFolds=3)

# # Model training
# try:
#     print("Starting model training...")
#     model = crossval.fit(train_df)
#     print("Model training completed.")
# except Exception as e:
#     print(f"Error during model training: {e}")


# # Predictions on the test data
# try:
#     print("Making predictions...")
#     predictions = model.transform(test_df)
#     print("Predictions made.")
# except Exception as e:
#     print(f"Error during predictions: {e}")


# # Evaluating the model
# try:
#     print("Evaluating the model...")
#     evaluator = MulticlassClassificationEvaluator(labelCol="categoryIndex", predictionCol="prediction", metricName="f1")
#     f1_score = evaluator.evaluate(predictions)
#     print(f"F1 Score: {f1_score}")
# except Exception as e:
#     print(f"Error during evaluation: {e}")


# # Stopping Spark
# print("Stopping Spark...")
# spark.stop()
# print("Spark stopped.")

In [None]:
# # # Create pipeline
# # pipeline = Pipeline(stages=[indexer, tokenizer, stopwords_remover, hashingTF, idf, selector, normalizer, ovr])

# # # Create parameter grid
# # paramGrid = (ParamGridBuilder()
# #              .addGrid(selector.numTopFeatures, [50, 2000])
# #              .addGrid(svm.regParam, [0.01, 0.1, 1.0])
# #              .addGrid(svm.maxIter, [10, 50])
# #              .addGrid(svm.standardization, [True, False])
# #              .build())

# # Define evaluator

# # Precompute stages up to IDF
# pre_pipeline = Pipeline(stages=[indexer, tokenizer, stopwords_remover, count_vectorizer, idf])

# # Fit pre_pipeline
# pre_model = pre_pipeline.fit(train_df)

# # Cache the precomputed results
# train_df = pre_model.transform(train_df).cache()
# test_df = pre_model.transform(test_df).cache()
# validation_df = pre_model.transform(validation_df).cache()



# evaluator = MulticlassClassificationEvaluator(labelCol="categoryIndex", predictionCol="prediction", metricName="f1")

# best_model = None
# best_f1_score = float('-inf')
# best_params = None


# numTopFeatures = [500, 2000]
# regParams =  [0.01, 0.1, 1.0]
# maxIters = [10, 50]
# standardizations = [True, False]

# # Iterate over the parameter grid
# for numTopFeature in numTopFeatures:
#     for regParam in regParams:
#         for maxIter in maxIters:
#             for standardization in standardizations:
#                 print(f'numTopFeature={numTopFeature}, regParam={regParam}, maxIter={maxIter}, standardization={standardization}')

#                 selector = ChiSqSelector(numTopFeatures=numTopFeature, featuresCol=idf.getOutputCol(),
#                                      outputCol="selectedFeatures", labelCol=indexer.getOutputCol())
#                 svm = LinearSVC(featuresCol=normalizer.getOutputCol(), labelCol="categoryIndex", regParam=regParam, maxIter=maxIter, standardization=standardization)

#                 pipeline = Pipeline(stages=[indexer, tokenizer, stopwords_remover, hashingTF, idf, selector, normalizer, ovr])

#                 # Create a new pipeline model with the current parameter combination
#                 start_time = time.time()
#                 print('fit')
#                 pipeline_model = pipeline.fit(train_df)
#                 fit_time = time.time() - start_time
#                 print(f'fit_time={fit_time}')

#                 print('transform')
#                 # Make predictions on the validation set
#                 predictions = pipeline_model.transform(validation_df)
#                 transform_time = time.time() - start_time  - fit_time
#                 print(f'transform_time={transform_time}')

#                 print('evaluate')
#                 # Evaluate the model
#                 f1_score = evaluator.evaluate(predictions)
#                 evaluate_time = time.time() - start_time  - fit_time - transform_time
#                 print(f'fit_time={evaluate_time}')

#                 print(f'f1_score={f1_score}')
                
#                 if f1_score > best_f1_score:
#                     best_f1_score = f1_score
#                     best_model = pipeline_model
#                     best_params = f'numTopFeature={numTopFeature}, regParam={regParam}, maxIter={maxIter}, standardization={standardization}'

# print(f"Best F1 Score: {best_f1_score}")
# print(f"Best Parameters: {best_params}")

# # Predictions on the test data
# try:
#     print("Making predictions...")
#     predictions = best_model.transform(test_df)
#     print("Predictions made.")
# except Exception as e:
#     print(f"Error during predictions: {e}")

# # Evaluating the model
# try:
#     print("Evaluating the model...")
#     f1_score = evaluator.evaluate(predictions)
#     print(f"F1 Score: {f1_score}")
# except Exception as e:
#     print(f"Error during evaluation: {e}")

# # Stopping Spark
# print("Stopping Spark...")
# spark.stop()
# print("Spark stopped.")

In [None]:
# Pre-processing variant that keeps the 500 top chi-square features.
selector = ChiSqSelector(
    numTopFeatures=500,
    featuresCol=idf.getOutputCol(),
    outputCol="selectedFeatures",
    labelCol=indexer.getOutputCol(),
)

# Label indexing, tokenization, stop-word removal, TF-IDF and feature selection.
pre_pipeline = Pipeline(
    stages=[indexer, tokenizer, stopwords_remover, hashingTF, idf, selector]
)

# Fit every stage on the training split only.
pre_model = pre_pipeline.fit(train_df)

# Apply the fitted stages to each split and mark the results for caching so
# the grid search does not recompute them.
(
    preprocessed_train_df_500,
    preprocessed_test_df_500,
    preprocessed_validation_df_500,
) = (
    pre_model.transform(split_frame).cache()
    for split_frame in (train_df, test_df, validation_df)
)


In [None]:
# Pre-processing variant that keeps the 2000 top chi-square features.
selector = ChiSqSelector(
    numTopFeatures=2000,
    featuresCol=idf.getOutputCol(),
    outputCol="selectedFeatures",
    labelCol=indexer.getOutputCol(),
)

# Label indexing, tokenization, stop-word removal, TF-IDF and feature selection.
pre_pipeline = Pipeline(
    stages=[indexer, tokenizer, stopwords_remover, hashingTF, idf, selector]
)

# Fit every stage on the training split only.
pre_model = pre_pipeline.fit(train_df)

# Apply the fitted stages to each split and mark the results for caching so
# the grid search does not recompute them.
(
    preprocessed_train_df_2000,
    preprocessed_test_df_2000,
    preprocessed_validation_df_2000,
) = (
    pre_model.transform(split_frame).cache()
    for split_frame in (train_df, test_df, validation_df)
)


In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, IDF, ChiSqSelector, StringIndexer, Normalizer
from pyspark.ml.classification import LinearSVC, OneVsRest
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
import time
from pyspark.sql.functions import col

In [None]:
# Create parameter grid
paramGrid = ParamGridBuilder() \
    .addGrid(LinearSVC.regParam, [0.01, 0.1, 1.0]) \
    .addGrid(LinearSVC.maxIter, [10, 50]) \
    .addGrid(LinearSVC.standardization, [True, False]) \
    .build()



evaluator = MulticlassClassificationEvaluator(labelCol="categoryIndex", predictionCol="prediction", metricName="f1")

best_model = None
best_f1_score = float('-inf')
best_params = None

In [None]:
# Grid search over the LinearSVC settings on the 500-feature splits.
# The split names are rebound to the pre-processed frames; `test_df` is rebound
# as well because the final evaluation cell reads it.
train_df = preprocessed_train_df_500
validation_df = preprocessed_validation_df_500
test_df = preprocessed_test_df_500

# The normalizer has no tuned parameter, so one instance serves every combination.
normalizer = Normalizer(inputCol="selectedFeatures", outputCol="normFeatures")

# Iterate over the parameter grid
for params in paramGrid:
    regParam = params[LinearSVC.regParam]
    maxIter = params[LinearSVC.maxIter]
    standardization = params[LinearSVC.standardization]

    # The feature count is part of the summary so that results from the 500-
    # and 2000-feature runs stay distinguishable in the log and in `best_params`.
    param_summary = (
        f'numTopFeatures=500, regParam={regParam}, maxIter={maxIter}, '
        f'standardization={standardization}'
    )
    print(param_summary)

    svm = LinearSVC(featuresCol="normFeatures", labelCol="categoryIndex", regParam=regParam, maxIter=maxIter, standardization=standardization)
    ovr = OneVsRest(classifier=svm, labelCol="categoryIndex")

    pipeline = Pipeline(stages=[normalizer, ovr])

    # Train a pipeline model with the current parameter combination
    start_time = time.time()
    pipeline_model = pipeline.fit(train_df)
    fit_time = time.time() - start_time
    print(f'fit_time={fit_time}')

    # Make predictions on the validation set. DataFrame transformations are
    # lazy, so most of the scoring cost shows up in evaluate_time below.
    predictions = pipeline_model.transform(validation_df)
    transform_time = time.time() - start_time - fit_time
    print(f'transform_time={transform_time}')

    # Evaluate the model
    f1_score = evaluator.evaluate(predictions)
    evaluate_time = time.time() - start_time - fit_time - transform_time
    print(f'evaluate_time={evaluate_time}')

    print(f'f1_score={f1_score}')

    # Keep the best model seen so far (strictly better F1 only).
    if f1_score > best_f1_score:
        best_f1_score = f1_score
        best_model = pipeline_model
        best_params = param_summary



In [None]:
# Grid search over the LinearSVC settings on the 2000-feature splits.
# The split names are rebound to the pre-processed frames; `test_df` is rebound
# as well because the final evaluation cell reads it.
train_df = preprocessed_train_df_2000
validation_df = preprocessed_validation_df_2000
test_df = preprocessed_test_df_2000

# The normalizer has no tuned parameter, so one instance serves every combination.
normalizer = Normalizer(inputCol="selectedFeatures", outputCol="normFeatures")

# Iterate over the parameter grid
for params in paramGrid:
    regParam = params[LinearSVC.regParam]
    maxIter = params[LinearSVC.maxIter]
    standardization = params[LinearSVC.standardization]

    # The feature count is part of the summary so that results from the 500-
    # and 2000-feature runs stay distinguishable in the log and in `best_params`.
    param_summary = (
        f'numTopFeatures=2000, regParam={regParam}, maxIter={maxIter}, '
        f'standardization={standardization}'
    )
    print(param_summary)

    svm = LinearSVC(featuresCol="normFeatures", labelCol="categoryIndex", regParam=regParam, maxIter=maxIter, standardization=standardization)
    ovr = OneVsRest(classifier=svm, labelCol="categoryIndex")

    pipeline = Pipeline(stages=[normalizer, ovr])

    # Train a pipeline model with the current parameter combination
    start_time = time.time()
    pipeline_model = pipeline.fit(train_df)
    fit_time = time.time() - start_time
    print(f'fit_time={fit_time}')

    # Make predictions on the validation set. DataFrame transformations are
    # lazy, so most of the scoring cost shows up in evaluate_time below.
    predictions = pipeline_model.transform(validation_df)
    transform_time = time.time() - start_time - fit_time
    print(f'transform_time={transform_time}')

    # Evaluate the model
    f1_score = evaluator.evaluate(predictions)
    evaluate_time = time.time() - start_time - fit_time - transform_time
    print(f'evaluate_time={evaluate_time}')

    print(f'f1_score={f1_score}')

    # Keep the best model seen so far (strictly better F1 only).
    if f1_score > best_f1_score:
        best_f1_score = f1_score
        best_model = pipeline_model
        best_params = param_summary



In [None]:
# Report the winning validation result, score the winning model on the test
# split, and stop Spark afterwards.
print(f"Best F1 Score: {best_f1_score}")
print(f"Best Parameters: {best_params}")

try:
    if best_model is None:
        raise RuntimeError("No model was selected; run the grid-search cells before this one.")

    # `best_model` may come from either the 500- or the 2000-feature grid search,
    # while `test_df` is whatever the last executed grid-search cell assigned.
    # Choose the test split whose feature vectors match the width the model was
    # trained on; the last pipeline stage is the One-vs-Rest model and each of
    # its binary sub-models reports that width.
    # NOTE(review): assumes the selector actually produced 500 / 2000 features
    # (i.e. the vocabulary is at least that large); otherwise `test_df` is used.
    trained_width = best_model.stages[-1].models[0].numFeatures
    test_df_by_width = {
        500: preprocessed_test_df_500,
        2000: preprocessed_test_df_2000,
    }
    best_test_df = test_df_by_width.get(trained_width, test_df)

    # Predictions on the test data
    print("Making predictions...")
    predictions = best_model.transform(best_test_df)
    print("Predictions made.")

    # Evaluating the model. Failures propagate instead of being printed and
    # ignored, so a stale `predictions` frame left over from the grid search can
    # never be scored and reported as the test result.
    print("Evaluating the model...")
    f1_score = evaluator.evaluate(predictions)
    print(f"F1 Score: {f1_score}")
finally:
    # Stopping Spark (also when the evaluation above failed)
    print("Stopping Spark...")
    spark.stop()
    print("Spark stopped.")