In [1]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier, RandomForestClassificationModel
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer, IDF
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql import SparkSession
import re

In [2]:
from nltk import download

# NLTK resources used later in this notebook: the stop-word list, the WordNet
# lemmatizer data, and the perceptron POS tagger.
NLTK_PACKAGES = ("stopwords", "wordnet", "averaged_perceptron_tagger")
nltk_download_results = [download(package) for package in NLTK_PACKAGES]
# Show the status of the final download, as the cell did before.
nltk_download_results[-1]

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /root/nltk_data...
[nltk_data]   Package averaged_perceptron_tagger is already up-to-
[nltk_data]       date!


True

In [3]:
# Build (or reuse) the Spark session. The settings request one GPU per
# executor and load the NVIDIA RAPIDS SQL plugin; the external shuffle
# service and dynamic allocation are both switched off.
spark = (
    SparkSession.builder
    .appName("Crossvalidation RF")
    .master('spark://0.0.0.0:7077')
    .config("spark.executor.resource.gpu.amount", "1")
    .config("spark.rapids.sql.enabled", "true")
    .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
    .config("spark.executor.memory", "25g")
    .config("spark.shuffle.service.enabled", "false")
    .config("spark.dynamicAllocation.enabled", "false")
    .getOrCreate()
)

In [4]:
# Load the review data, discard the "_c0" column, and drop every row that
# contains a null value.
df = (
    spark.read
    .csv("datasets/sentiment_data_trim.csv", header=True, inferSchema=True)
    .drop("_c0")
    .na.drop()
)

In [5]:
from pyspark.sql.functions import concat_ws

# Merge the review body and its summary into a single space-separated text
# column, then discard the two source columns.
combined_text = concat_ws(" ", df["Review"], df["Summary"])
df = df.withColumn("Reviews", combined_text).drop("Review", "Summary")

In [6]:
from pyspark.sql.functions import udf
def removeSpecChar(raw_text):
    """Replace every character that is not an ASCII letter with a space.

    Args:
        raw_text: The text to clean, or None.

    Returns:
        The cleaned string (same length as the input), or None when
        ``raw_text`` is None so that null rows do not crash the UDF.
    """
    # No per-row debug printing here: this runs once per record on the
    # executors and would only flood their logs.
    if raw_text is None:
        return None
    return re.sub("[^a-zA-Z]", " ", raw_text)
# Wrap the cleaner as a Spark UDF; "string" is the default return type, spelled out.
removeSpecialChar = udf(removeSpecChar, returnType="string")

In [7]:
from nltk.stem import WordNetLemmatizer
def get_wordnet_pos(tag):
    """Translate a POS tag into the matching WordNet part-of-speech constant.

    Tags starting with J, V, N or R map to adjective, verb, noun and adverb
    respectively; any other tag falls back to noun.
    """
    # Imported lazily, as in the original, so the lookup happens where the
    # function actually runs.
    from nltk.corpus import wordnet

    prefix_to_pos = (
        ('J', wordnet.ADJ),
        ('V', wordnet.VERB),
        ('N', wordnet.NOUN),
        ('R', wordnet.ADV),
    )
    for prefix, wordnet_pos in prefix_to_pos:
        if tag.startswith(prefix):
            return wordnet_pos
    return wordnet.NOUN

In [8]:
lemmatizer = WordNetLemmatizer()
def lemmatizeScalar(sentance):
    """Lemmatize a list of tokens using their part-of-speech tags.

    Args:
        sentance: Iterable of token strings (empty tokens are skipped), or
            None.

    Returns:
        A list with the lemma of every non-empty token, in order. An empty
        list is returned for None input instead of raising.
    """
    from nltk.tag import pos_tag
    # No per-row debug printing: this runs for every record on the executors.
    if sentance is None:
        return []
    tokens = [token for token in sentance if token]
    return [
        lemmatizer.lemmatize(word, pos=get_wordnet_pos(tag))
        for word, tag in pos_tag(tokens)
    ]
# NOTE(review): no return type is given, so the UDF uses Spark's default
# (string) and the returned list reaches the DataFrame as one string. The
# later regexp_extract_all cell re-parses that string, so the type is left
# as-is here; switching to an array type requires removing that cell too.
lemmatize = udf(lemmatizeScalar)

In [9]:
from pyspark.sql.functions import lower, regexp_replace

# Lower-case the text and blank out every non-letter with Spark's built-in
# regexp_replace rather than a Python UDF: same result for non-null text, but
# it avoids shipping each row to a Python worker and null values simply stay
# null instead of raising.
df = df.withColumn("Reviews", regexp_replace(lower(df["Reviews"]), "[^a-zA-Z]", " "))

In [10]:
# Split the cleaned review text into a token array column.
tokenizer = Tokenizer(
    inputCol="Reviews",
    outputCol="reviewtoken",
)
df = tokenizer.transform(df)

In [11]:
from pyspark.ml.feature import StopWordsRemover
from nltk.corpus import stopwords

# Filter the tokens against NLTK's English stop-word list.
english_stop_words = stopwords.words("english")
remover = StopWordsRemover(
    inputCol="reviewtoken",
    outputCol="reviewtokenfiltered",
    stopWords=english_stop_words,
)
df = remover.transform(df)

In [12]:
# Lemmatize the stop-word-filtered tokens into a new column.
filtered_tokens = df["reviewtokenfiltered"]
df = df.withColumn("reviewtokenfiltered2", lemmatize(filtered_tokens))

In [13]:
from pyspark.sql.functions import expr

# Pull every run of word characters out of the lemmatized column, replacing
# the column with the resulting array of matches.
word_extraction_sql = r"regexp_extract_all(reviewtokenfiltered2, '(\\w+)', 1)"
df = df.withColumn('reviewtokenfiltered2', expr(word_extraction_sql))

In [14]:
# Keep only the final token column (under its original name) and rename the
# sentiment column to "label".
df = (
    df.drop("reviewtoken", "Reviews", "reviewtokenfiltered")
    .withColumnRenamed("reviewtokenfiltered2", "reviewtokenfiltered")
    .withColumnRenamed("Sentiment", "label")
)

In [15]:
# Feature stages: hashed term frequencies followed by IDF weighting.
# numFeatures here is only a starting value; the parameter grid defined later
# in this notebook overrides it during cross-validation.
hashingTF = HashingTF(inputCol="reviewtokenfiltered", outputCol="termFrequency", numFeatures=37120)
idf = IDF(inputCol="termFrequency", outputCol="features")
# Fix the forest's seed so repeated runs grow the same trees; it matches the
# seed used for the train/test split.
rf = RandomForestClassifier(seed=123)


In [16]:
# Chain the stages: term hashing -> IDF weighting -> random forest.
pipeline_stages = [hashingTF, idf, rf]
pipeline = Pipeline(stages=pipeline_stages)

In [17]:
# Hyper-parameter search space: 3 x 3 x 3 x 2 = 54 combinations.
param_grid = (
    ParamGridBuilder()
    .addGrid(hashingTF.numFeatures, [20000, 30000, 40000])
    .addGrid(rf.numTrees, [10, 20, 30])
    .addGrid(rf.minInfoGain, [0.0, 0.1, 0.01])
    .addGrid(rf.impurity, ["gini", "entropy"])
    .build()
)

In [18]:
# 5-fold cross-validation over the parameter grid, scored with the default
# BinaryClassificationEvaluator metric. The seed pins the fold assignment so
# that repeated runs compare the candidates on the same folds.
crossVal = CrossValidator(estimator=pipeline,
                        estimatorParamMaps=param_grid,
                        evaluator=BinaryClassificationEvaluator(),
                        numFolds=5,
                        seed=123)

In [19]:
# Hold out the test set BEFORE model selection so the cross-validation never
# sees the rows that are later used for the final evaluation.
train_data, test_data = df.randomSplit([0.8, 0.2], seed=123)
cvModel = crossVal.fit(train_data)

In [None]:
# bestModel is an already-fitted model (a transformer) and has no fit(); to
# train on train_data, refit the cross-validated estimator with the winning
# parameter combination.
bestModel = cvModel.bestModel
train_data, test_data = df.randomSplit([0.8, 0.2], seed=123)

# Locate the best parameter map in the same way the cross-validator does:
# highest average metric when larger is better, lowest otherwise.
cv_avg_metrics = cvModel.avgMetrics
pick_best = max if cvModel.getEvaluator().isLargerBetter() else min
best_index = pick_best(range(len(cv_avg_metrics)), key=lambda i: cv_avg_metrics[i])
best_params = cvModel.getEstimatorParamMaps()[best_index]

# NOTE(review): if cvModel was selected on data that includes test_data, the
# test metrics computed afterwards will be optimistic.
trained_model = cvModel.getEstimator().fit(train_data, best_params)

In [None]:
# Score the held-out rows, then evaluate them with the default
# BinaryClassificationEvaluator settings.
predictions = trained_model.transform(test_data)
roc_evaluator = BinaryClassificationEvaluator()
area_under_curve = roc_evaluator.evaluate(predictions)

In [None]:

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# BinaryClassificationEvaluator has no predictionCol parameter and reports a
# ranking metric, not accuracy. Accuracy on the hard predictions comes from
# MulticlassClassificationEvaluator with metricName="accuracy".
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %s" % (accuracy))
print("Test Error = %s" % (1.0 - accuracy))

In [None]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics, MulticlassMetrics
from pyspark.sql.types import FloatType
import pyspark.sql.functions as F

# confusionMatrix() is provided by MulticlassMetrics, not by
# BinaryClassificationMetrics. It consumes (prediction, label) pairs of
# doubles, so both columns are cast explicitly. Sorting is unnecessary: the
# confusion matrix does not depend on row order.
preds_and_labels = predictions.select(
    F.col('prediction').cast('double'),
    F.col('label').cast('double'),
)
metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))
print(metrics.confusionMatrix().toArray())