In [1]:
import sparknlp
from sparknlp.base import *
from sparknlp.annotator import *

In [29]:
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType, DoubleType
from pyspark.ml.linalg import DenseVector, SparseVector
import spacy
import nltk

import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql.functions import col
c = col
from pyspark.ml.feature import CountVectorizer,StringIndexer, RegexTokenizer,StopWordsRemover, HashingTF, IDF
from pyspark.ml.feature import Tokenizer, CountVectorizer, IDF, Word2Vec
from sparkxgb import XGBoostClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.classification import LinearSVC, LogisticRegression, GBTClassifier 
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from sklearn.metrics import classification_report, auc

In [3]:
import shutup 
shutup.please()

In [55]:
spark = SparkSession.builder.getOrCreate()

In [5]:
df = spark.read.options(header=True).csv("d.csv")

                                                                                

----

In [6]:
# nlp = spacy.load("en_core_web_sm")

# def lemmatize_text(tokens):
#     doc = nlp(" ".join(tokens))
#     lemmatized_tokens = [token.lemma_ for token in doc]
#     return lemmatized_tokens


# lemmatize_udf = udf(lemmatize_text, ArrayType(StringType()))

regex = r'[!\"#$%&\'()*+,\-./:;<=>?@\[\\\]^_`{|}~]|(?<!\w)("|/)(?!\w)'

df = (spark.read
           .option("header", "true")
           .csv("./d.csv").drop("_c0")
           .where(c("label").isNotNull())
           .where(c("review_text").isNotNull())
           .where(c("label").isin([0, 1]))
           .withColumn("review_text", F.lower(c('review_text')))
           .withColumn("review_text", F.regexp_replace(c('review_text'), '\d+', ''))
           .withColumn("review_text", F.regexp_replace("review_text", regex, ""))
           .where(c("review_text") != '')
     )

train, test = df.select("review_text", "label").randomSplit([0.8, 0.2], seed = 20)

In [7]:
train.show()

+--------------------+-----+
|         review_text|label|
+--------------------+-----+
|  not enough peko...|    1|
| and at no point ...|    0|
| and is a good ex...|    1|
| and planetside h...|    1|
| because right no...|    0|
|                 btf|    0|
| but i guess dim ...|    1|
| but it still fee...|    1|
| but its fun and ...|    1|
| but out of luck ...|    0|
| but theres enoug...|    1|
| but wait until y...|    1|
| buy this game it...|    1|
| charming relaxin...|    1|
| dollar game at b...|    0|
| for temple run a...|    0|
| hour and im alre...|    1|
| hours in and it ...|    1|
| hours in still h...|    1|
| hours of my life...|    1|
+--------------------+-----+
only showing top 20 rows



In [56]:
tokenizer = RegexTokenizer(inputCol="review_text", outputCol="review_text_tokens", pattern="\\W")
add_tokens = tokenizer.transform(train)
stop_word_remover = StopWordsRemover(inputCol="review_text_tokens", outputCol="review_text_tokens_removed")
remove_stop_words = stop_word_remover.transform(add_tokens)

# Tokenize the text
# Apply TF-IDF
cv = CountVectorizer(inputCol="review_text_tokens_removed", outputCol="rawFeatures")
cvModel = cv.fit(remove_stop_words)
featurizedData = cvModel.transform(remove_stop_words)

### Number of docs

In [9]:
featurizedData.count()

877

### Voncalubalry size

In [10]:
featurizedData.select(F.explode(c("review_text_tokens_removed"))).distinct().count()

                                                                                

3418

## top tokens

In [11]:
(featurizedData
.select(F.explode(c("review_text_tokens_removed")).alias("tokens"))
.groupBy("tokens").count().orderBy(c("count").desc())).show()

                                                                                

+------+-----+
|tokens|count|
+------+-----+
|   sht|  667|
|   fck|  666|
|  game|  624|
|  like|  188|
|   fun|  184|
|  good|  151|
| great|  100|
|really|   91|
| games|   82|
|   get|   76|
|   one|   75|
|  play|   73|
|  dont|   72|
|  even|   68|
|  time|   62|
|   far|   61|
| still|   58|
| first|   58|
|  love|   57|
|  much|   56|
+------+-----+
only showing top 20 rows



In [12]:
token_counts = (featurizedData
    .select("label", F.explode(F.col("review_text_tokens_removed")).alias("tokens"))
    .groupBy("tokens", "label")
    .count()
    .orderBy(F.col("count").desc())
)

window = Window.partitionBy("label").orderBy(F.col("count").desc())
token_counts_with_rownumber = token_counts.withColumn("row_number", F.row_number().over(window))

top_tokens = token_counts_with_rownumber.filter(F.col("row_number") <= 10)

In [13]:
top_tokens.drop("row_number").show()

                                                                                

+------+-----+-----+
|tokens|label|count|
+------+-----+-----+
|  game|    0|  187|
|  like|    0|   55|
|   get|    0|   31|
|  dont|    0|   30|
|  even|    0|   30|
|   far|    0|   24|
|  much|    0|   22|
|  play|    0|   20|
| games|    0|   19|
|   one|    0|   19|
|   sht|    1|  667|
|   fck|    1|  666|
|  game|    1|  437|
|   fun|    1|  167|
|  good|    1|  133|
|  like|    1|  133|
| great|    1|   92|
|really|    1|   76|
| games|    1|   63|
|  love|    1|   57|
+------+-----+-----+



---

### Full pipeline



In [14]:
def predict(pipelineModel, test):
    test_transformed = pipelineModel.transform(test)

    # Make predictions on the test data
    predictions = test_transformed.select("label", "prediction")

    # Evaluate the accuracy of the model
    evaluator = MulticlassClassificationEvaluator(labelCol="label", 
                                                  predictionCol="prediction",
                                                  metricName="accuracy")
    accuracy = evaluator.evaluate(predictions)

    print("Accuracy on test data:", accuracy)
    return predictions

In [15]:
df = (spark.read
           .option("header", "true")
           .csv("./d.csv").drop("_c0")
           .where(c("label").isNotNull())
           .where(c("review_text").isNotNull())
           .where(c("label").isin([0, 1]))
           .withColumn("review_text", F.lower(c('review_text')))
           .withColumn("review_text", F.regexp_replace(c('review_text'), '\d+', ''))
           .withColumn("review_text", F.regexp_replace("review_text", regex, ""))
           .where(c("review_text") != '')
     ).withColumn("label", c("label").cast(DoubleType()))

train, test = df.select("review_text", "label").randomSplit([0.8, 0.2], seed = 20)

In [57]:
# Define the pipeline stages
tokenizer = RegexTokenizer(inputCol="review_text", outputCol="review_text_tokens", pattern="\\W")
stop_word_remover = StopWordsRemover(inputCol="review_text_tokens", outputCol="review_text_tokens_removed")
word2Vec = Word2Vec(vectorSize=100, minCount=5, inputCol="review_text_tokens_removed", outputCol="word2VecFeatures")

In [17]:
classifier_logistic = LogisticRegression(featuresCol="word2VecFeatures", labelCol="label")
classifier_boosting = GBTClassifier(featuresCol="word2VecFeatures", labelCol="label")
classifier_svc = LinearSVC(featuresCol="word2VecFeatures", labelCol="label")

### Logistic regression

In [58]:
# Create the pipeline
pipeline_logistic = Pipeline(stages=[tokenizer, stop_word_remover, word2Vec, classifier_logistic])

# Fit the pipeline to the training data
pipelineModel_logistic = pipeline_logistic.fit(train)

In [19]:
predict_logistic = predict(pipelineModel_logistic, test)

Accuracy on test data: 0.7848101265822784


---

## Boosting

In [54]:
pipeline_boosting = Pipeline(stages=[tokenizer, stop_word_remover, word2Vec, classifier_boosting])

paramGrid = (ParamGridBuilder()
             .addGrid(classifier_boosting.maxDepth, [5, 10])
             .addGrid(classifier_boosting.maxBins,  [20, 30])
             .build())

# Set up the cross-validator
crossval = CrossValidator(estimator=pipeline_boosting,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=3)

crossvalModel = crossval.fit(train)

In [21]:
predict_boost = predict(crossvalModel, test)

Accuracy on test data: 0.759493670886076


----

### Support vector

In [53]:
pipeline_svc = Pipeline(stages=[tokenizer, stop_word_remover, word2Vec, classifier_svc])

# Define the parameter grid
paramGrid_svc = (ParamGridBuilder()
                 .addGrid(classifier_svc.maxIter, [100, 200])
                 .addGrid(classifier_svc.regParam, [0.0001,0.01,0.1, 0.2])
                 .build())


# Set up the cross-validator
crossval_svc = CrossValidator(estimator=pipeline_svc,
                              estimatorParamMaps=paramGrid_svc,
                              evaluator=BinaryClassificationEvaluator(),
                              numFolds=3)

crossvalModel_svc = crossval_svc.fit(train)

In [48]:
predict_svc = predict(crossvalModel_svc, test)

Accuracy on test data: 0.7932489451476793


---

### Summary table

In [49]:
lab_true_svc = predict_svc.select('label').toPandas()['label'].values
lab_pred_svc = predict_svc.select('prediction').toPandas()['prediction'].values
print(classification_report(lab_true_svc, lab_pred_svc))

              precision    recall  f1-score   support

         0.0       0.00      0.00      0.00        49
         1.0       0.79      1.00      0.88       188

    accuracy                           0.79       237
   macro avg       0.40      0.50      0.44       237
weighted avg       0.63      0.79      0.70       237

