# 0) Initial Preparation
- Installing packages
- Importing packages
- Spark session initialization
- Database connection

**Installing Packages**


---



In [None]:
# Install Spark NLP
!wget http://setup.johnsnowlabs.com/colab.sh -O - | bash
# Install contraction module
!pip install contractions
# Install the transformers module from HuggingFace
!pip install -q transformers
# Install the explode library
!pip install explode

**Importing Packages**


---



In [None]:
import os
import googleapiclient.discovery
import re
import sparknlp
import contractions
import explode

from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark import keyword_only
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.feature import *
from pyspark.ml import Pipeline
from pyspark.ml import Transformer
from pyspark.ml.classification import *
from pyspark.ml.evaluation import *
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.ml.util import DefaultParamsWritable, DefaultParamsReadable

from sparknlp.pretrained import PretrainedPipeline
from sparknlp.base import *
from sparknlp.annotator import *

from datetime import datetime

from transformers import pipeline

**Spark Session Instantiation (GPU accelerated session)**


---



In [None]:
# MongoDB connection string. Read it from the environment instead of hard-coding
# credentials in the notebook (e.g. set MONGODB_URI through Colab secrets or %env);
# the placeholder is only a fallback. The same URI is used for reads and writes.
MONGODB_URI = os.environ.get("MONGODB_URI", "Your mongodb cluster URL")

# Local Spark session with the MongoDB connector and the GPU build of Spark NLP
spark = (
    SparkSession.builder
    .appName("Spark NLP")
    .master("local[*]")
    .config("spark.driver.memory", "12G")
    .config("spark.mongodb.read.connection.uri", MONGODB_URI)
    .config("spark.mongodb.write.connection.uri", MONGODB_URI)
    .config("spark.mongodb.write.maxBatchSize", 4096)
    .config("spark.mongodb.write.ordered", False)
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.1.1,com.johnsnowlabs.nlp:spark-nlp-gpu_2.12:4.4.3")
    .getOrCreate()
)

spark.version

'3.2.3'

In [None]:
# Mount Google Drive under /content/drive; the trained model and the CSV export
# later in the notebook are written below /content/drive/MyDrive.
from google.colab import drive

drive.mount(mountpoint='/content/drive')

Mounted at /content/drive


# 1) Data Extraction: YouTube Comment Scraping



**YouTube Comment Scraping Function**


---



In [None]:
#------------- YouTube comments scraping function -------------#
def _parse_comment_threads(response, video_id, counter):
    """Convert one ``commentThreads.list`` response page into comment records.

    Args:
        response: decoded API response (dict); its ``items`` list may be absent.
        video_id: id of the video the page belongs to (copied into each record).
        counter: one-element list; ``counter[0]`` is used as the ``_id`` of the
            next record and is incremented once per record.

    Returns:
        list of dicts with keys ``_id``, ``videoId``, ``userId``, ``text``, ``date``.
    """
    records = []
    for item in response.get("items", []):
        snippet = item["snippet"]["topLevelComment"]["snippet"]
        records.append({
            "_id": counter[0],
            "videoId": video_id,
            # authorChannelId is not present on every comment; store None
            # instead of aborting the whole scrape with a KeyError.
            "userId": snippet.get("authorChannelId", {}).get("value"),
            "text": snippet["textOriginal"],
            "date": snippet["publishedAt"],
        })
        counter[0] += 1
    return records


def get_comment(counter, video_id):
    """Fetch every top-level comment of one YouTube video (YouTube Data API v3).

    Requests ``commentThreads.list`` pages of up to 100 threads ordered by
    relevance and follows ``nextPageToken`` until the API stops returning one.

    Args:
        counter: one-element list shared across calls so that ``_id`` values stay
            unique over several videos; ``counter[0]`` is advanced by the number
            of comments collected.
        video_id: YouTube video id, e.g. ``"SdLShOCvVeM"``.

    Returns:
        list of dicts with keys ``_id``, ``videoId``, ``userId``, ``text``, ``date``.
    """
    # Tells oauthlib to allow insecure (non-HTTPS) transport.
    # *DO NOT* leave this option enabled in production.
    # NOTE(review): only an API key is used below, so this is presumably unnecessary — confirm.
    os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"

    # API name and version; together they form https://www.googleapis.com/youtube/v3
    api_service_name = "youtube"
    api_version = "v3"
    # Read the API key from the environment instead of hard-coding it in the notebook.
    DEVELOPER_KEY = os.environ.get("YOUTUBE_API_KEY", "Insert Your Developer Key")

    youtube = googleapiclient.discovery.build(
        api_service_name, api_version, developerKey=DEVELOPER_KEY)

    # All top-level comments of this video, accumulated across pages
    response_all = []
    page_token = None
    while True:
        params = {
            "videoId": video_id,
            "part": "snippet",
            "maxResults": 100,
            "order": "relevance",
        }
        if page_token is not None:
            params["pageToken"] = page_token
        response = youtube.commentThreads().list(**params).execute()
        response_all.extend(_parse_comment_threads(response, video_id, counter))

        page_token = response.get("nextPageToken")
        if page_token is None:
            break

    return response_all

**Scraping YouTube Comments**


---



In [None]:
# Comments retrieval Iphone-14
# One shared counter so every comment gets a unique, sequential _id across videos.
counter = [0]
comment = [
    record
    for video_id in (
        "SdLShOCvVeM",
        "KLPZzf-wwlE",
        "qDMY_n5b348",
        "pTCgWVjB6UE",
        "-E0iNG6uTxk",
        "3NjpX5TBajY",
        "P3Q5XYacz5E",
        "NCe4WpoA03Q",
        "oTtbIf1TfL8",
        "VcgE32zVPLw",
    )
    for record in get_comment(counter, video_id)
]

# Printing how many comments are successfully retrieved
print(counter[0])
# NOTE(review): `df` is also assigned by the iPhone 15 scraping cell; write this
# frame to MongoDB before running that cell.
df = spark.createDataFrame(comment)

In [None]:
# Comments retrieval Iphone-15
# The counter restarts at 0, so _id values overlap with the iPhone 14 data;
# the two datasets are stored in separate collections.
counter = [0]
comment = [
    record
    for video_id in (
        "tzHLhlBMp6o",
        "O3y-MdGwhrk",
        "Sfs1uX5coyI",
        "VIe7MPPwCaA",
        "vchqdJdLB3w",
        "yUWZsuFQwiE",
        "BTJR-tZ_hmM",
    )
    for record in get_comment(counter, video_id)
]

# Printing how many comments are successfully retrieved
print(counter[0])
df = spark.createDataFrame(comment)

**⬆️ Writing The Dataframe To a MongoDB Collection [WRITE]**


---



In [None]:
# Iphone 14 Pro dataset
# Appends `df` to db_analisis_sentimen.comment_iphone14_raw.
# NOTE(review): `df` must still hold the iPhone 14 comments (the iPhone 15
# scraping cell reassigns it).
df.write.save(
    format="mongodb",
    mode="append",
    database="db_analisis_sentimen",
    collection="comment_iphone14_raw",
)

In [None]:
# Iphone 15 dataset
# Appends `df` (the iPhone 15 comments) to db_analisis_sentimen.comment_iphone15_raw.
df.write.save(
    format="mongodb",
    mode="append",
    database="db_analisis_sentimen",
    collection="comment_iphone15_raw",
)

# 2) Data Preprocessing
All operations performed:
- Case folding (lowercase)
- Links removal
- Duplicate comments removal
- Contraction handling
- Special character & number removal
- Document assembler (For spark NLP)
- Language detection & filtering (only english)
- Tokenizing
- Spell checking
- Stop words removal
- Lemmatization




**⬇️ Loading the collection into a dataframe [READ]**


---



In [None]:
# Iphone 14 dataset
# Run only one of the two read cells: the preprocessing pipeline consumes whichever `df` was loaded last.
df = spark.read.load(
    format="mongodb",
    database="db_analisis_sentimen",
    collection="comment_iphone14_raw",
)

In [None]:
# Iphone 15 dataset
# Run only one of the two read cells: the preprocessing pipeline consumes whichever `df` was loaded last.
df = spark.read.load(
    format="mongodb",
    database="db_analisis_sentimen",
    collection="comment_iphone15_raw",
)

**Custom Transformers Definition**


---



In [None]:
class TextLower(Transformer, HasInputCol, HasOutputCol, DefaultParamsReadable, DefaultParamsWritable):
    """Case-folding stage: writes ``lower(inputCol)`` into ``outputCol``.

    Raises ValueError at transform time if ``inputCol`` has not been set.
    """

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super().__init__()
        # keyword_only stashes the caller's keyword arguments in self._input_kwargs
        self.setParams(**self._input_kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        return self._set(**self._input_kwargs)

    def setInputCol(self, new_inputCol):
        return self.setParams(inputCol=new_inputCol)

    def setOutputCol(self, new_outputCol):
        return self.setParams(outputCol=new_outputCol)

    def _transform(self, dataset):
        if not self.isSet("inputCol"):
            raise ValueError("No input col is defined!")

        source = dataset[self.getInputCol()]
        return dataset.withColumn(self.getOutputCol(), lower(source))

In [None]:
class RemoveLinks(Transformer, HasInputCol, HasOutputCol, DefaultParamsReadable, DefaultParamsWritable):
    """Drop every row whose ``inputCol`` text contains an http(s) URL.

    Whole rows are removed, not just the link text; ``outputCol`` is not written.
    Rows with a null text are dropped as well, because the filter predicate evaluates to null.
    """

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super().__init__()
        self.setParams(**self._input_kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        return self._set(**self._input_kwargs)

    def setInputCol(self, new_inputCol):
        return self.setParams(inputCol=new_inputCol)

    def setOutputCol(self, new_outputCol):
        return self.setParams(outputCol=new_outputCol)

    def _transform(self, dataset):
        if not self.isSet("inputCol"):
            raise ValueError("No input col is defined!")

        url_regex = r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
        # The regex can only match a non-empty string (it starts with "http"), so
        # "contains a match" is the same test as "regexp_extract(...) != ''".
        has_link = dataset[self.getInputCol()].rlike(url_regex)
        return dataset.filter(~has_link)

In [None]:
class RemoveDuplicates(Transformer, HasInputCol, HasOutputCol, DefaultParamsReadable, DefaultParamsWritable):
    """Keep one row per distinct value of ``inputCol`` (``outputCol`` is not used)."""

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super().__init__()
        self.setParams(**self._input_kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        return self._set(**self._input_kwargs)

    def setInputCol(self, new_inputCol):
        return self.setParams(inputCol=new_inputCol)

    def setOutputCol(self, new_outputCol):
        return self.setParams(outputCol=new_outputCol)

    def _transform(self, dataset):
        if not self.isSet("inputCol"):
            raise ValueError("No input col is defined!")

        key_column = self.getInputCol()
        return dataset.dropDuplicates(subset=[key_column])

In [None]:
class ContractionHandling(Transformer, HasInputCol, HasOutputCol, DefaultParamsReadable, DefaultParamsWritable):
    """Expand English contractions in ``inputCol`` (e.g. "don't" -> "do not") into ``outputCol``.

    Uses ``contractions.fix`` through a Python UDF; null texts stay null.
    Raises ValueError at transform time if ``inputCol`` has not been set.

    NOTE(review): this cell previously held a second, verbatim copy of
    ``RemoveDuplicates`` (already defined in the preceding cell). Meanwhile
    ``ContractionHandling``, which is instantiated and used as a pipeline stage,
    was never defined, so the transformer instantiation cell failed with a NameError.
    """

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super().__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setInputCol(self, new_inputCol):
        return self.setParams(inputCol=new_inputCol)

    def setOutputCol(self, new_outputCol):
        return self.setParams(outputCol=new_outputCol)

    def _transform(self, df):
        # Local imports keep the stage self-contained, independent of the star imports.
        import contractions
        from pyspark.sql.functions import udf
        from pyspark.sql.types import StringType

        if not self.isSet("inputCol"):
            raise ValueError("No input col is defined!")

        input_col = self.getInputCol()
        output_col = self.getOutputCol()

        expand = udf(lambda text: None if text is None else contractions.fix(text), StringType())
        return df.withColumn(output_col, expand(df[input_col]))

In [None]:
class SymbolRemoval(Transformer, HasInputCol, HasOutputCol, DefaultParamsReadable, DefaultParamsWritable):
    """Keep only lowercase letters and whitespace, collapse whitespace, and drop empty rows.

    Steps applied to ``inputCol`` (the result goes to ``outputCol``):
      1. remove every character outside ``[a-z\\s]``. Uppercase letters are removed
         too, so the text is expected to be lower-cased already (TextLower runs
         earlier in the pipeline);
      2. collapse whitespace runs to a single space and trim both ends.
    Rows whose cleaned text is empty (or null) are then filtered out.

    NOTE(review): an emoticon regex used to run after step 1. By then every
    character it could match (``:;=<`` and digits) had already been removed, so it
    never changed anything and has been dropped.
    """

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super().__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setInputCol(self, new_inputCol):
        return self.setParams(inputCol=new_inputCol)

    def setOutputCol(self, new_outputCol):
        return self.setParams(outputCol=new_outputCol)

    def _transform(self, df):
        if not self.isSet("inputCol"):
            raise ValueError("No input col is defined!")

        input_col = self.getInputCol()
        output_col = self.getOutputCol()

        # Build one expression so that each step consumes the previous step's result.
        # The old version ran withColumn(output_col, regexp_replace(input_col, ...)) three
        # times, which only chained when inputCol == outputCol; otherwise only the last
        # replacement survived.
        letters_only = regexp_replace(input_col, r'[^a-z\s]', '')
        cleaned = trim(regexp_replace(letters_only, r'\s+', ' '))

        df = df.withColumn(output_col, cleaned)
        return df.filter(df[output_col] != '')

In [None]:
class FilterEN(Transformer, HasInputCol, HasOutputCol, DefaultParamsReadable, DefaultParamsWritable):
    """Keep only rows detected as English, then drop the language column.

    ``inputCol`` is expected to be the Spark NLP annotation column written by the
    language detector. The ``result`` of its first annotation holds the language code.
    Rows whose annotation array is empty or null are dropped.
    Raises ValueError at transform time if ``inputCol`` has not been set.
    """

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super().__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setInputCol(self, new_inputCol):
        return self.setParams(inputCol=new_inputCol)

    def setOutputCol(self, new_outputCol):
        return self.setParams(outputCol=new_outputCol)

    def _transform(self, df):
        if not self.isSet("inputCol"):
            raise ValueError("No input col is defined!")

        input_col = self.getInputCol()

        # Exact match: the previous .contains('en') also accepted any language
        # code that merely contains the substring "en".
        detected = df[input_col]["result"][0]
        return df.filter(detected == 'en').drop(input_col)

In [None]:
class FinalCleansing(Transformer, HasInputCol, HasOutputCol, DefaultParamsReadable, DefaultParamsWritable):
    """Drop the intermediate Spark NLP columns and flatten ``lemma`` to its strings.

    Removes ``document``, ``token``, ``stop_word_clean`` and ``spell_checked``, and
    replaces the ``lemma`` annotation column with its ``result`` field (moved to the
    last position). The column names are fixed; ``outputCol`` is not used, and
    ``inputCol`` is only checked for presence.
    """

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super().__init__()
        self.setParams(**self._input_kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        return self._set(**self._input_kwargs)

    def setInputCol(self, new_inputCol):
        return self.setParams(inputCol=new_inputCol)

    def setOutputCol(self, new_outputCol):
        return self.setParams(outputCol=new_outputCol)

    def _transform(self, df):
        if not self.isSet("inputCol"):
            raise ValueError("No input col is defined!")

        result = df
        for annotation_col in (df.document, df.token, df.stop_word_clean, df.spell_checked):
            result = result.drop(annotation_col)

        # Swap the annotation struct array for its plain array of lemma strings
        result = result.withColumn("lemma_new", df.lemma.result).drop(df.lemma)
        return result.withColumnRenamed("lemma_new", "lemma")

**Transformers Instantiation**


---



In [None]:
# Custom transformers; every text-cleaning stage reads and overwrites the "text" column
textLower = TextLower(inputCol="text", outputCol="text")
removeLink = RemoveLinks(inputCol="text", outputCol="text")
removeDuplicates = RemoveDuplicates(inputCol="text", outputCol="text")
contractionHandling = ContractionHandling(inputCol="text", outputCol="text")
symbolRemoval = SymbolRemoval(inputCol="text", outputCol="text")
filterEN = FilterEN(inputCol="language", outputCol="language")
finalCleansing = FinalCleansing(inputCol="text", outputCol="text")

# Spark NLP annotators (the .pretrained() calls load pretrained models)
documentAssembler = (
    DocumentAssembler()
    .setInputCol("text")
    .setOutputCol("document")
)

tokenizer = (
    Tokenizer()
    .setInputCols("document")
    .setOutputCol("token")
)

spellChecker = (
    NorvigSweetingModel.pretrained()
    .setInputCols("token")
    .setOutputCol("spell_checked")
)

stopWordsCleaner = (
    StopWordsCleaner.pretrained()
    .setInputCols("spell_checked")
    .setOutputCol("stop_word_clean")
    .setCaseSensitive(False)
)

lemmatizer = (
    LemmatizerModel.pretrained()
    .setInputCols("stop_word_clean")
    .setOutputCol("lemma")
)

languageDetection = (
    LanguageDetectorDL.pretrained("ld_wiki_tatoeba_cnn_375", "xx")
    .setInputCols("document")
    .setOutputCol("language")
)

**Pipeline Instantiation & Execution**


---



In [None]:
# Pipeline object instantiation; stages run in list order.
# NOTE: binding the global name `pipeline` shadows `transformers.pipeline`
# imported at the top of the notebook.
pipeline = Pipeline(stages=[
    textLower,
    removeLink,
    removeDuplicates,
    contractionHandling,
    symbolRemoval,
    documentAssembler,
    languageDetection,
    filterEN,
    tokenizer,
    spellChecker,
    stopWordsCleaner,
    lemmatizer,
    finalCleansing,
])

In [None]:
# Fit the preprocessing pipeline on the loaded comments, then apply it to the same frame
df_lemma = pipeline.fit(dataset=df).transform(dataset=df)

**⬆️ Writing the dataframe to a MongoDB collection [WRITE]**


---



In [None]:
# Iphone 14 dataset
# Target the iPhone 14 collection. This cell previously wrote to
# comment_iphone15_cleaned, which mixed the two datasets and left
# comment_iphone14_cleaned (read back in section 3) unpopulated by this notebook.
df_lemma.write.format("mongodb")\
        .mode("append")\
        .option("database","db_analisis_sentimen")\
        .option("collection", "comment_iphone14_cleaned")\
        .save()

In [None]:
# Iphone 15 dataset
# Appends the preprocessed frame to db_analisis_sentimen.comment_iphone15_cleaned.
df_lemma.write.save(
    format="mongodb",
    mode="append",
    database="db_analisis_sentimen",
    collection="comment_iphone15_cleaned",
)

# 3) Initial Data Labeling With HuggingFace (Only for iPhone 14 dataset)

**⬇️ Loading a collection into a dataframe [READ]**


---



In [None]:
# Iphone 14 dataset
df = spark.read.format("mongodb")\
        .option("database","db_analisis_sentimen")\
        .option("collection", "comment_iphone14_cleaned")\
        .load()
# The labeling and join cells below use `df_lemma`. Point it at the iPhone 14
# cleaned collection just loaded; otherwise they would use whatever the
# preprocessing pipeline produced last (on a top-to-bottom run, the iPhone 15 data).
df_lemma = df

**Labeling Process**

---



In [None]:
# Downloading a pretrained sentiment analysis model from huggingface
# Hugging Face Hub id of the pretrained sentiment model. Nothing is downloaded here;
# the model and tokenizer are fetched when the sentiment pipeline is instantiated.
model_path = "cardiffnlp/twitter-roberta-base-sentiment-latest"

In [None]:
# Instantiating the HuggingFace sentiment-analysis pipeline.
# The Spark ML pipeline cell rebinds the global name `pipeline` to a Spark
# `Pipeline` object, which shadows `transformers.pipeline`; calling `pipeline(...)`
# here would raise "TypeError: 'Pipeline' object is not callable". Import the
# factory under an alias so this cell works regardless of execution order.
from transformers import pipeline as hf_pipeline

sentiment_pipeline = hf_pipeline("sentiment-analysis", model=model_path, tokenizer=model_path, max_length=512, truncation=True)

In [None]:
# Labeling the data with the pretrained pipeline; only the top label is kept:
# "negative" -> 0.0, "positive" -> 1.0, anything else (neutral) -> 2.0
labels = []
for row in df_lemma.collect():
  top_label = sentiment_pipeline(row.text)[0]["label"]
  labels.append([row._id, {"negative": 0.0, "positive": 1.0}.get(top_label, 2.0)])

**Appending The Labels To The Existing Dataframe**


---



In [None]:
# Attach the labels to the comments by _id (the inner join keeps only labeled comments)
df_label = spark.createDataFrame(data=labels, schema=["_id", "label"])
df_labeled = df_lemma.join(other=df_label, on="_id", how="inner")

In [None]:
# Keep only negative (0.0) and positive (1.0) comments; neutral (2.0) rows are discarded
df_labeled = df_labeled.filter(col("label").isin(0.0, 1.0))

**⬆️ Writing the dataframe to a MongoDB collection [WRITE]**


---



In [None]:
# Iphone 14 dataset
# Appends the labeled comments to db_analisis_sentimen.comment_iphone14_labeled.
df_labeled.write.save(
    format="mongodb",
    mode="append",
    database="db_analisis_sentimen",
    collection="comment_iphone14_labeled",
)

# 4) Exploratory Data Analysis & Data Experiment



In [None]:
# Count how many distinct words (lemmas) occur across all labeled comments
print(df_labeled.select(explode('lemma').alias('word')).distinct().count())

In [None]:
# Counting amounts of comments of each label
# (the neutral count is 0 if the neutral-filter cell above has already run)
for caption, label_value in (
    ("Negative comments : ", 0.0),
    ("Positive comments : ", 1.0),
    ("Neutral comments : ", 2.0),
):
  print(caption, df_labeled.filter(col('label') == label_value).count())

# 5) Data Preparation (Feature Extraction & Data Splitting)


**⬇️ Loading the collection into a dataframe [READ]**


---



In [None]:
# Iphone 14 dataset (labeled; used for training and evaluation)
df_prepare = spark.read.load(
    format="mongodb",
    database="db_analisis_sentimen",
    collection="comment_iphone14_labeled",
)

In [None]:
# Iphone 15 dataset (cleaned but unlabeled; used only for prediction in section 8)
df_prepare = spark.read.load(
    format="mongodb",
    database="db_analisis_sentimen",
    collection="comment_iphone15_cleaned",
)

**Feature Extraction : HashingTF**

---



In [None]:
# Term frequencies: every lemma is hashed into one of 8192 feature buckets
hashingTF = HashingTF(numFeatures=8192, inputCol="lemma", outputCol="rawFeatures")
df_tf = hashingTF.transform(dataset=df_prepare)

**Feature Extraction : IDF**

---



In [None]:
# Re-weight the term frequencies by inverse document frequency (TF-IDF).
# NOTE(review): the IDF is fit on the whole dataset before the train/test split,
# so document frequencies of the test comments leak into the training features.
idf = IDF(outputCol="features", inputCol="rawFeatures")
idf_model = idf.fit(dataset=df_tf)
df_idf = idf_model.transform(dataset=df_tf)

**Splitting The Dataset Into Training and Testing Dataset (80/20 Split)**

---



In [None]:
# 80/20 train/test split; the fixed seed keeps the split reproducible
df_training, df_testing = df_idf.randomSplit(weights=[0.8, 0.2], seed=2023)

# 6) Model Training

**Multinomial Naive Bayes Model**


---



In [None]:
# Multinomial Naive Bayes estimator from Spark ML, trained on the TF-IDF features
mnb_estimator = NaiveBayes(featuresCol="features", labelCol="label", modelType="multinomial")
mnb_model = mnb_estimator.fit(dataset=df_training)

In [None]:
# Score the held-out test split (adds prediction, probability and rawPrediction columns)
df_prediction = mnb_model.transform(dataset=df_testing)

# 7) Model Evaluation (Automatic)


**Multi-Class Evaluator**


---



In [None]:
# Fraction of test comments whose predicted label equals the true label
evaluator = MulticlassClassificationEvaluator(metricName="accuracy", labelCol="label", predictionCol="prediction")
accuracy = evaluator.evaluate(dataset=df_prediction)
print("Model accuracy : ", accuracy)

Model accuracy :  0.8415545590433483


**Binary Evaluator**


---



In [None]:
# Area under the ROC curve, computed from a continuous score.
# With rawPredictionCol="prediction" (hard 0/1 labels) the ROC curve has a single
# operating point, and the "AUC" degenerates to (TPR + TNR) / 2, i.e. balanced
# accuracy. The "probability" vector column lets the evaluator rank test rows by
# the class-1 probability. (The recorded output below came from the hard-label version.)
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="probability", metricName="areaUnderROC")
auc_score = evaluator.evaluate(df_prediction)
print("AUC score : ", auc_score)

AUC score :  0.8400790221014941


**Saving the model**


---



In [None]:
# Persist the trained Naive Bayes model to Google Drive, replacing any earlier save
model_writer = mnb_model.write().overwrite()
model_writer.save("/content/drive/MyDrive/Model")

# 8) Predicting iPhone 15 dataset labels with the trained multinomial Naive Bayes model


**Loading the model**


---



In [None]:
# Load the model from the directory the save cell writes to
# ("/content/drive/MyDrive/Model"); the previous ".../Model/Model" path did not match it.
mnb_model = NaiveBayesModel.load("/content/drive/MyDrive/Model")

**Label Prediction**


---







In [None]:
# Using the model to predict the sentiment of the iPhone 15 dataset.
# NOTE(review): `df_idf` must hold the TF-IDF features of the iPhone 15 comments
# (section 5 run on comment_iphone15_cleaned). The IDF model is refit on that data
# rather than reusing the training one, so feature weights may differ from those the
# classifier saw during training; consider persisting the training IDF model.
df_prediction = mnb_model.transform(dataset=df_idf)

**Tidying Up**


---



In [None]:
# Remove the feature vectors and raw model outputs, which are not needed downstream
df_prediction = df_prediction.drop("features", "probability", "rawPrediction", "rawFeatures")

**Saving the labeled dataset**


---



In [None]:
# Saving to a MongoDB collection (appends to db_analisis_sentimen.comment_iphone15_labeled)
df_prediction.write.save(
    format="mongodb",
    mode="append",
    database="db_analisis_sentimen",
    collection="comment_iphone15_labeled",
)

In [None]:
# Saving to a CSV file (saved at a drive directory)
# Only saving the id, date, and prediction columns.
# Spark writes the path below as a directory of part files. coalesce(1) after the
# sort merges the sorted partitions, without a shuffle, into one part file in _id
# order. mode("overwrite") lets the cell be re-run instead of failing because the
# directory already exists.
(
    df_prediction.select('_id', 'date', 'prediction')
    .sort(col('_id'), ascending=True)
    .coalesce(1)
    .write.format("csv")
    .mode("overwrite")
    .option("header", "true")
    .save("/content/drive/MyDrive/Dataset/comment_iphone15_labeled.csv")
)