### Packages

In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import (
    RegexTokenizer, StopWordsRemover,
    CountVectorizer, IDF,
    StringIndexer, ChiSqSelector,
    Normalizer, StandardScaler
)
from pyspark.ml.classification import LinearSVC, OneVsRest
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from itertools import product

# Install an "ignore" filter so Python warnings are not shown in later cells.
import warnings
warnings.filterwarnings("ignore")

In [2]:
# Create (or attach to) the Spark session that every later cell relies on.
session_builder = SparkSession.builder.appName("Task2_Task3")
spark = session_builder.getOrCreate()

# Set Spark's log level to ERROR and report where the Spark UI is served.
spark_context = spark.sparkContext
spark_context.setLogLevel("ERROR")
print("Spark UI available at:", spark_context.uiWebUrl)

# Load the review development set from HDFS and display its first record.
reviews = spark.read.json("hdfs:///user/e01652446/input/reviews_devset.json")
reviews.head()

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/spark/jars/log4j-slf4j-impl-2.17.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


25/05/13 01:09:46 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/05/13 01:09:46 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
25/05/13 01:09:49 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
Spark UI available at: http://lbdlg01.datalab.novalocal:4042


                                                                                

Row(asin='0981850006', category='Patio_Lawn_and_Garde', helpful=[6, 7], overall=5.0, reviewText="This was a gift for my other husband.  He's making us things from it all the time and we love the food.  Directions are simple, easy to read and interpret, and fun to make.  We all love different kinds of cuisine and Raichlen provides recipes from everywhere along the barbecue trail as he calls it. Get it and just open a page.  Have at it.  You'll love the food and it has provided us with an insight into the culture that produced it. It's all about broadening horizons.  Yum!!", reviewTime='12 3, 2009', reviewerID='A2VNYWOPJ13AFP', reviewerName='Amazon Customer "carringt0n"', summary='Delish', unixReviewTime=1259798400)

## Part 2: Feature Pipeline

In [3]:
# --- Tokenisation ---
# The pattern matches runs of whitespace, digits and the listed punctuation
# characters in the raw review text.
tokenizer = RegexTokenizer(
    pattern=r"[\s\d()\[\]{}\.\!\?,;:+=\-_\"'`~#@&\*\%€\$§\\/]+",
    inputCol="reviewText",
    outputCol="wordTokens",
)

# --- Stop-word removal ---
# The stop-word list lives on HDFS, one entry per line; pull it to the driver.
stopwords_rdd = spark.sparkContext.textFile(
    "hdfs:///user/e01652446/input/stopwords.txt"
)
stopwords = stopwords_rdd.collect()
remover = StopWordsRemover(
    stopWords=stopwords,
    inputCol="wordTokens",
    outputCol="filteredWords",
)

# --- TF-IDF ---
# Term counts per review, then IDF weighting of those counts.
count_vectorizer = CountVectorizer(inputCol="filteredWords", outputCol="rawTermCounts")
idf_transformer = IDF(inputCol="rawTermCounts", outputCol="tfidfFeatures")

# --- Label encoding ---
# Map the category string to a numeric "label" column; rows with invalid
# categories are skipped (handleInvalid="skip").
label_indexer = StringIndexer(
    handleInvalid="skip",
    inputCol="category",
    outputCol="label",
)

# --- Feature selection ---
# Chi-square selection of the top 2000 TF-IDF features with respect to "label".
chi_selector = ChiSqSelector(
    labelCol="label",
    featuresCol="tfidfFeatures",
    outputCol="selectedFeatures",
    numTopFeatures=2000,
)



In [4]:
# Feature pipeline: tokenise -> drop stop words -> count -> IDF -> index
# labels -> chi-square selection. The stage order matters because later cells
# look stages up by position.
feature_stages = [
    tokenizer,
    remover,
    count_vectorizer,
    idf_transformer,
    label_indexer,
    chi_selector,
]
pipeline = Pipeline(stages=feature_stages)

# Fit every stage on the full review set.
model = pipeline.fit(reviews)

                                                                                

In [5]:
# Stage positions follow the order in which the feature pipeline was built:
# index 2 is the fitted CountVectorizer, the last stage the fitted ChiSqSelector.
count_vectorizer_model = model.stages[2]
chi_selector_model = model.stages[-1]

vocab = count_vectorizer_model.vocabulary
selected_indices = chi_selector_model.selectedFeatures

# Translate each selected feature index back into its vocabulary term.
selected_terms = [vocab[term_index] for term_index in selected_indices]

In [6]:
# Disabled export step: when uncommented, the lines below distribute
# `selected_terms` as a single-slice RDD and save it as text under the
# name "output_ds.txt".
#sc = spark.sparkContext
#sc.parallelize(selected_terms, 1) \
#  .saveAsTextFile("output_ds.txt")

## Part 3: Classification

In [7]:
# Rescale the chi-square-selected feature vectors (Normalizer defaults) into
# the column consumed by the classifier.
normalizer = Normalizer(outputCol="normalizedFeatures", inputCol="selectedFeatures")

# Alternative rescaling stage without mean-centring.
# NOTE: `scaler` is not one of the `full_pipeline` stages listed below.
scaler = StandardScaler(
    withMean=False,
    inputCol="selectedFeatures",
    outputCol="scaledFeatures",
)

# Binary linear SVM, wrapped in one-vs-rest to handle the multi-class labels.
binary_svm = LinearSVC(labelCol="label", featuresCol="normalizedFeatures")
multi_class_svm = OneVsRest(
    featuresCol="normalizedFeatures",
    labelCol="label",
    classifier=binary_svm,
)

# End-to-end pipeline: the feature stages followed by normalisation and the
# one-vs-rest SVM.
classification_stages = [
    tokenizer,
    remover,
    count_vectorizer,
    idf_transformer,
    label_indexer,
    chi_selector,
    normalizer,
    multi_class_svm,
]
full_pipeline = Pipeline(stages=classification_stages)

In [8]:
# Scores the "prediction" column against "label" using the F1 metric; used for
# both validation and test evaluation.
evaluator = MulticlassClassificationEvaluator(metricName="f1", labelCol="label", predictionCol="prediction")

In [11]:


# Hold out 30% of the reviews for the final test, then split the remaining 70%
# into an inner training set (80%) and a validation set (20%).
train_data, test_data = reviews.randomSplit([0.7, 0.3], seed=42)
train_inner, val_inner = train_data.randomSplit([0.8, 0.2], seed=42)

# Both inner splits are scanned once per configuration; cache them so they are
# not recomputed from the JSON source on every fit/transform.
train_inner.cache()
val_inner.cache()

# Hyper-parameter grid.
feature_counts   = [2000, 1000]
reg_params       = [0.01, 0.1, 1.0]
max_iters        = [20,   60]
standardizations = [True, False]

# Best configuration seen so far (by validation F1).
best_f1    = -1.0
best_model = None
best_conf  = None

grid = list(product(feature_counts,
                    reg_params,
                    max_iters,
                    standardizations))
total = len(grid)

for count, (k, reg, iters, std) in enumerate(grid, start=1):
    print(f"\n⏳ Training {count}/{total} "
          f"(numTopFeatures={k}, regParam={reg}, "
          f"maxIter={iters}, standardization={std})")

    # Pass the configuration as a param map instead of mutating the shared
    # estimators: Spark fits a copy of the pipeline with these overrides, so
    # `chi_selector` and `binary_svm` keep the values set in their own cells
    # and earlier cells stay re-runnable. The previous `scaler.setWithStd`
    # call was dropped because `scaler` is not a stage of `full_pipeline`.
    param_map = {
        chi_selector.numTopFeatures: k,
        binary_svm.regParam: reg,
        binary_svm.maxIter: iters,
        binary_svm.standardization: std,
    }

    # Use a dedicated name so the feature-pipeline `model` fitted earlier in
    # the notebook is not overwritten.
    candidate_model = full_pipeline.fit(train_inner, params=param_map)

    val_pred = candidate_model.transform(val_inner)
    f1 = evaluator.evaluate(val_pred)
    print(f"→ Validation F1 = {f1:.4f}")

    # Strict ">" keeps the first configuration in case of ties.
    if f1 > best_f1:
        best_f1    = f1
        best_model = candidate_model
        best_conf  = (k, reg, iters, std)


print("\nBest validation config:",
      f"numTopFeatures={best_conf[0]}, regParam={best_conf[1]},",
      f"maxIter={best_conf[2]}, standardization={best_conf[3]}")
print(f"Best validation F1 = {best_f1:.4f}")

# Final, one-off evaluation of the selected model on the held-out test split.
test_pred = best_model.transform(test_data)
test_f1   = evaluator.evaluate(test_pred)
print(f"Test F1 = {test_f1:.4f}")

# The inner splits are no longer needed; release the cached data.
train_inner.unpersist()
val_inner.unpersist()


⏳ Training 1/24 (numTopFeatures=2000, regParam=0.01, maxIter=20, standardization=True)


                                                                                

→ Validation F1 = 0.5984

⏳ Training 2/24 (numTopFeatures=2000, regParam=0.01, maxIter=20, standardization=False)


                                                                                

→ Validation F1 = 0.4712

⏳ Training 3/24 (numTopFeatures=2000, regParam=0.01, maxIter=60, standardization=True)


                                                                                

→ Validation F1 = 0.5981

⏳ Training 4/24 (numTopFeatures=2000, regParam=0.01, maxIter=60, standardization=False)


                                                                                

→ Validation F1 = 0.4941

⏳ Training 5/24 (numTopFeatures=2000, regParam=0.1, maxIter=20, standardization=True)


                                                                                

→ Validation F1 = 0.5920

⏳ Training 6/24 (numTopFeatures=2000, regParam=0.1, maxIter=20, standardization=False)


                                                                                

→ Validation F1 = 0.4293

⏳ Training 7/24 (numTopFeatures=2000, regParam=0.1, maxIter=60, standardization=True)


                                                                                

→ Validation F1 = 0.5891

⏳ Training 8/24 (numTopFeatures=2000, regParam=0.1, maxIter=60, standardization=False)


                                                                                

→ Validation F1 = 0.4951

⏳ Training 9/24 (numTopFeatures=2000, regParam=1.0, maxIter=20, standardization=True)


                                                                                

→ Validation F1 = 0.5549

⏳ Training 10/24 (numTopFeatures=2000, regParam=1.0, maxIter=20, standardization=False)


                                                                                

→ Validation F1 = 0.0045

⏳ Training 11/24 (numTopFeatures=2000, regParam=1.0, maxIter=60, standardization=True)


                                                                                

→ Validation F1 = 0.5518

⏳ Training 12/24 (numTopFeatures=2000, regParam=1.0, maxIter=60, standardization=False)


                                                                                

→ Validation F1 = 0.4956

⏳ Training 13/24 (numTopFeatures=1000, regParam=0.01, maxIter=20, standardization=True)


                                                                                

→ Validation F1 = 0.5534

⏳ Training 14/24 (numTopFeatures=1000, regParam=0.01, maxIter=20, standardization=False)


                                                                                

→ Validation F1 = 0.4678

⏳ Training 15/24 (numTopFeatures=1000, regParam=0.01, maxIter=60, standardization=True)


                                                                                

→ Validation F1 = 0.5526

⏳ Training 16/24 (numTopFeatures=1000, regParam=0.01, maxIter=60, standardization=False)


                                                                                

→ Validation F1 = 0.4592

⏳ Training 17/24 (numTopFeatures=1000, regParam=0.1, maxIter=20, standardization=True)


                                                                                

→ Validation F1 = 0.5393

⏳ Training 18/24 (numTopFeatures=1000, regParam=0.1, maxIter=20, standardization=False)


                                                                                

→ Validation F1 = 0.4676

⏳ Training 19/24 (numTopFeatures=1000, regParam=0.1, maxIter=60, standardization=True)


                                                                                

→ Validation F1 = 0.5298

⏳ Training 20/24 (numTopFeatures=1000, regParam=0.1, maxIter=60, standardization=False)


                                                                                

→ Validation F1 = 0.4586

⏳ Training 21/24 (numTopFeatures=1000, regParam=1.0, maxIter=20, standardization=True)


                                                                                

→ Validation F1 = 0.5014

⏳ Training 22/24 (numTopFeatures=1000, regParam=1.0, maxIter=20, standardization=False)


                                                                                

25/05/13 05:44:06 ERROR OWLQN: Failure! Resetting history: breeze.optimize.NaNHistory: 


                                                                                

→ Validation F1 = 0.0005

⏳ Training 23/24 (numTopFeatures=1000, regParam=1.0, maxIter=60, standardization=True)


                                                                                

→ Validation F1 = 0.4945

⏳ Training 24/24 (numTopFeatures=1000, regParam=1.0, maxIter=60, standardization=False)


                                                                                

25/05/13 06:12:50 ERROR OWLQN: Failure! Resetting history: breeze.optimize.NaNHistory: 


                                                                                

→ Validation F1 = 0.4603

🏆 Best validation config: numTopFeatures=2000, regParam=0.01, maxIter=20, standardization=True
🏆 Best validation F1 = 0.5984


[Stage 55175:>                                                      (0 + 2) / 2]

🚀 Test F1 = 0.5981


                                                                                

In [None]:
# Shut down the Spark session created at the top of the notebook.
spark.stop()