In [80]:
%env PYSPARK_DRIVER_PYTHON=jupyter
%env PYSPARK_DRIVER_PYTHON_OPTS=notebook
%env PYSPARK_PYTHON=python
%env OBJC_DISABLE_INITIALIZE_FORK_SAFETY = YES

# HADOOP_HOME = C:\Hadoop
# JAVA_HOME = C:\Java\jdk-11.0.6

env: PYSPARK_DRIVER_PYTHON=jupyter
env: PYSPARK_DRIVER_PYTHON_OPTS=notebook
env: PYSPARK_PYTHON=python
env: OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES


In [81]:
import numpy as np
import pandas as pd
from nltk import PorterStemmer

from pyspark import keyword_only
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, IDF, IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params, TypeConverters
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql import DataFrame
from pyspark.sql.functions import to_timestamp, from_unixtime, when, col, regexp_replace, lower, udf, concat, lit
from pyspark.sql.session import SparkSession
from pyspark.sql.types import IntegerType, DoubleType, ArrayType, StringType


import matplotlib.pyplot as plt
%matplotlib inline


class Stemmer(Transformer, HasInputCol, HasOutputCol):

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super(Stemmer, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setInputCol(self, value):
        """
        Sets the value of :py:attr:`inputCol`.
        """
        return self._set(inputCol=value)

    def setOutputCol(self, value):
        """
        Sets the value of :py:attr:`outputCol`.
        """
        return self._set(outputCol=value)
        
    def _transform(self, df: DataFrame) -> DataFrame:
        
        def stem_words(words):
            return [PorterStemmer().stem(w) for w in words]
        
        t = ArrayType(StringType())
        out_col = self.getOutputCol()
        in_col = df[self.getInputCol()]
        return df.withColumn(out_col, udf(f=stem_words, returnType=t)(in_col))
    
    
def preprocessing(spark_session, path: str, is_test: bool):
    # read file
    df = spark_session.read.json(path=path)
    
    #change columns
    df = df.withColumn("label", df["overall"].cast(IntegerType()))
    # df = df.withColumn("asin", df["asin"].cast(DoubleType()))
    # df = df.withColumn("vote", when(col("vote") == "none", None).otherwise(col("vote")).cast("double"))
    # df = df.withColumn('style', df["style.Format:"]) 
    # df = df.withColumn('unixReviewTime', from_unixtime(df['unixReviewTime']))
    # df = df.withColumn("reviewTime", to_timestamp(df.reviewTime, 'MM d, yyyy'))

    df = df.drop('asin', 'image', 'reviewTime', 'reviewerID', 'reviewerName', 'reviewTime', 'unixReviewTime', 'overall'\
                , 'style', 'summary', 'verified', 'vote') 
    
    df = df.withColumn('reviewText', regexp_replace('reviewText', "\S+@\S+\s", ' '))\
                .withColumn('reviewText', regexp_replace('reviewText', "\n|\t", ' '))\
                .withColumn('reviewText', regexp_replace('reviewText', "\S*\d+\S*", ' '))\
                .withColumn('reviewText', regexp_replace('reviewText', "\s\W*\w\W*\s", ' '))\
                .withColumn('reviewText', regexp_replace('reviewText', "\W+", ' '))
    
    df = df.withColumn('reviewText', lower('reviewText'))
    # fill null values
    df = df.na.fill("null",["reviewText"]) 
    
    if is_test:
        df = df.limit(100)
    
    return df

In [82]:
WITH_INDEXERS = True
IS_TEST = True

spark = SparkSession \
    .builder \
    .appName("ML") \
    .config("spark.driver.memory", "2g") \
    .config("spark.executor.memory", "6g")\
    .config('spark.executor.cores','1')\
    .getOrCreate()


path = r'C:\Users\dominik.brys\OneDrive - Accenture\DB\PW-Big_Data\Projekt\Books_5\Books_5_limited_v2.json'

df = preprocessing(spark, path, is_test=IS_TEST)

# df.show()

preprocessing_pipeline = Pipeline(
    stages=[
        RegexTokenizer(inputCol="reviewText", outputCol="words", pattern="\\W"),
        StopWordsRemover(inputCol='words', outputCol='words_cleaned'),
        Stemmer(inputCol='words_cleaned', outputCol='words_stemmed'),
        CountVectorizer(inputCol="words_stemmed", outputCol="term_freq", minDF=10.0), #, maxDF=0.5),
        IDF(inputCol="term_freq", outputCol="tfidf")
    ]
)

preprocessing_model = preprocessing_pipeline.fit(df)
df = preprocessing_model.transform(df)

# df.show(2)

df = df.select(['tfidf', 'label'])

In [78]:
train_set, test_set = df.randomSplit([0.75, 0.25], seed=123)


param_grid = ParamGridBuilder() \
    .addGrid(r_forest.maxDepth, [10, 20, 30]) \
    .addGrid(r_forest.maxBins, [5, 10, 25])\
    .addGrid(r_forest.numTrees, [50, 100, 200])\
    .addGrid(r_forest.impurity, ['gini','entropy'])\
    .build()

if WITH_INDEXERS is True:
    labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(df)
    featureIndexer = VectorIndexer(inputCol="tfidf", outputCol="indexedFeatures", maxCategories=4).fit(df)

    r_forest = RandomForestClassifier(bootstrap=True, labelCol="indexedLabel", featuresCol="indexedFeatures")
    evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")

    cv = CrossValidator(estimator=r_forest,
                        estimatorParamMaps=param_grid,
                        evaluator=evaluator,
                        numFolds=5)

    labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=labelIndexer.labels)


    pipeline = Pipeline(stages=[labelIndexer, 
                                featureIndexer, 
                                cv,
                                labelConverter
                               ])
    cv_stage = 2
    
else:
    r_forest = RandomForestClassifier(bootstrap=True, 
                                      labelCol="label", featuresCol="tfidf")

    evaluator = MulticlassClassificationEvaluator(
        labelCol="label", predictionCol="prediction", 
        metricName="accuracy")

    cv = CrossValidator(estimator=r_forest,
                        estimatorParamMaps=param_grid,
                        evaluator=evaluator,
                        numFolds=5)


    pipeline = Pipeline(stages=[cv])
    cv_stage = 0


model = pipeline.fit(train_set)
predictions = model.transform(test_set)


# Select example rows to display.
# predictions.select("prediction", "label", "tfidf").show(5)

# Select (prediction, true label) and compute test error
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

# predictions.show()

hyperparams = model.stages[cv_stage].getEstimatorParamMaps()[np.argmax(model.stages[cv_stage].avgMetrics)]
print(hyperparams)

Test Error = 0.217391
{Param(parent='RandomForestClassifier_8d68091fd606', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 10, Param(parent='RandomForestClassifier_8d68091fd606', name='maxBins', doc='Max number of bins for discretizing continuous features.  Must be >=2 and >= number of categories for any categorical feature.'): 5, Param(parent='RandomForestClassifier_8d68091fd606', name='numTrees', doc='Number of trees to train (>= 1).'): 50, Param(parent='RandomForestClassifier_8d68091fd606', name='impurity', doc='Criterion used for information gain calculation (case-insensitive). Supported options: entropy, gini'): 'gini'}


In [79]:
hyperparams = model.stages[cv_stage].getEstimatorParamMaps()[np.argmax(model.stages[cv_stage].avgMetrics)]

import re

hyper_list = []

for i in range(len(hyperparams.items())):
    hyper_name = re.search("name='(.+?)'", str([x for x in hyperparams.items()][i])).group(1)
    hyper_value = [x for x in hyperparams.items()][i][1]

    hyper_list.append({hyper_name: hyper_value})

print(hyper_list)

# print(model.stages[0].bestModel.

[{'maxDepth': 10}, {'maxBins': 5}, {'numTrees': 50}, {'impurity': 'gini'}]


In [74]:

# print(model.stages[0].getEstimatorParamMaps())

In [75]:
print(hyperparams)

{Param(parent='RandomForestClassifier_e1d7e852a88a', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 10, Param(parent='RandomForestClassifier_e1d7e852a88a', name='maxBins', doc='Max number of bins for discretizing continuous features.  Must be >=2 and >= number of categories for any categorical feature.'): 5, Param(parent='RandomForestClassifier_e1d7e852a88a', name='numTrees', doc='Number of trees to train (>= 1).'): 50}
