In [1]:
from inspect import signature
from transformers import pipeline
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession, Row
import pandas as pd
from pyspark.sql.functions import col, struct, pandas_udf, size, count
from pyspark.ml.functions import predict_batch_udf
from pyspark.sql.types import FloatType, StringType, StructField, StructType, ArrayType
from pyspark.sql.functions import lower, regexp_replace, split, explode, array_contains, udf
import spacy

2024-06-25 09:28:08.946279: I external/local_tsl/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2024-06-25 09:28:09.128456: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
spark = SparkSession.builder \
    .appName("CommentBrandRecognition") \
    .getOrCreate()
spark


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/25 09:28:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Assuming your comments are in a CSV file named 'comments.csv'
df = spark.read.csv('comments.csv', header=True)  # Assuming header row exists

# Replace 'comment_text' with the actual column name containing your comments
df = df.select('comment')  # Select only the comment column
df.show(20)

+--------------------+
|             comment|
+--------------------+
|                Help|
|Top 5&#39;s are A...|
|Nokia was the pri...|
|how do you make t...|
|🇺🇸🤞🇰🇷:1🇯🇵:...|
|Starbucks only +12😂|
|I still don&#39;t...|
|Samsung!! made by...|
|Google: Growing<b...|
|Apple has got roc...|
|2018 tk tesla ka ...|
|I am preparing su...|
|             Nokia❤️|
|       Apple💣💣💣❤️|
|Is bad samsung is...|
|I am statistic lo...|
|  Name of this song?|
|when Apple appear...|
|"<a href=""https:...|
|If it&#39;s upto ...|
+--------------------+
only showing top 20 rows



In [4]:
df.count()

8565

In [5]:
nlp = spacy.load("en_core_web_sm") 

In [6]:

# Define UDF for spaCy NER
def spacy_brand_recognition(text):
  """
  UDF that uses spaCy to identify brands (ORG entities) in text.

  Args:
      text: A string containing the comment text.

  Returns:
      A list of brand names identified in the text (empty list if none found).
  """
  doc = nlp(text)
  brands = [ent.text for ent in doc.ents if ent.label_ == "ORG"]
  return brands

In [7]:
# Register UDF (optional, but improves readability)
spark.udf.register("spacy_brand_recognition", spacy_brand_recognition, ArrayType(StringType()))


<function __main__.spacy_brand_recognition(text)>

In [8]:
# Preprocess text data
preprocessed_df = df.withColumn("comment", lower(col("comment"))) \
  .withColumn("comment", regexp_replace(col("comment"), r"[^\w\s]", "")) \
  .withColumn("words", split(col("comment"), " "))


In [9]:
brands_df = preprocessed_df.withColumn(
    "brands",
    udf(spacy_brand_recognition, ArrayType(StringType()))(col("comment"))
)


In [10]:
# Show the first 5 rows of the DataFrame with identified brands (adjust the number as needed)
print("Preprocessed DataFrame with Identified Brands:")
brands_df.show(100)

Preprocessed DataFrame with Identified Brands:


[Stage 5:>                                                          (0 + 1) / 1]

+--------------------+--------------------+--------------------+
|             comment|               words|              brands|
+--------------------+--------------------+--------------------+
|                help|              [help]|                  []|
|top 539s are amer...|[top, 539s, are, ...|                  []|
|nokia was the pri...|[nokia, was, the,...|             [nokia]|
|how do you make t...|[how, do, you, ma...|                  []|
|                 112|               [112]|                  []|
|   starbucks only 12|[starbucks, only,...|                  []|
|i still don39t un...|[i, still, don39t...|                  []|
|samsung made by s...|[samsung, made, b...|           [samsung]|
|google growingbra...|[google, growingb...|            [google]|
|apple has got roc...|[apple, has, got,...|             [apple]|
|2018 tk tesla ka ...|[2018, tk, tesla,...|                  []|
|i am preparing su...|[i, am, preparing...|                  []|
|               nokia|   

                                                                                

In [11]:
# Filter rows where the size of the brands array is greater than 0
filtered_df = brands_df.filter(size(col("brands")) > 0)

In [12]:
filtered_df.count()

24/06/25 09:28:32 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
                                                                                

2296

In [13]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from textblob import TextBlob


In [14]:
def analyze_sentiment_udf(comment):
  """
  Analyzes the sentiment of a comment using TextBlob.

  Args:
      comment (str): The comment text to analyze.

  Returns:
      int:
          1: Positive sentiment
          0: Neutral sentiment
          -1: Negative sentiment
  """
  analysis = TextBlob(comment)
  if analysis.sentiment.polarity > 0:
    return 1
  elif analysis.sentiment.polarity == 0:
    return 0
  else:
    return -1

# Create a User Defined Function (UDF)
analyze_sentiment = udf(analyze_sentiment_udf, IntegerType())


In [15]:
# Apply the sentiment analysis UDF to the 'comment_text' column
sentiment_df = filtered_df.withColumn('category', analyze_sentiment(col('comment')))

In [16]:
# Print the first few rows to see the sentiment categories
sentiment_df.show(100)  # Show first 20 rows


[Stage 9:>                                                          (0 + 1) / 1]

+--------------------+--------------------+--------------------+--------+
|             comment|               words|              brands|category|
+--------------------+--------------------+--------------------+--------+
|nokia was the pri...|[nokia, was, the,...|             [nokia]|       0|
|samsung made by s...|[samsung, made, b...|           [samsung]|       0|
|google growingbra...|[google, growingb...|            [google]|       0|
|apple has got roc...|[apple, has, got,...|             [apple]|       0|
|               nokia|             [nokia]|             [nokia]|       0|
|               apple|             [apple]|             [apple]|       0|
|when apple appear...|[when, apple, app...|             [apple]|      -1|
|a hrefhttpswwwyou...|[a, hrefhttpswwwy...|[the john cena, a...|       0|
|         apple sucks|      [apple, sucks]|             [apple]|      -1|
|google  cocomelon...|[google, , cocome...|            [google]|       0|
|        samsung good|     [samsung, g

                                                                                

In [17]:
from pyspark.sql import SparkSession, functions as F

In [18]:
# Explode the 'brands' array to create separate rows for each brand
df_exploded = sentiment_df.withColumn("brand", F.explode_outer("brands"))

df_exploded.show()

[Stage 10:>                                                         (0 + 1) / 1]

+--------------------+--------------------+--------------------+--------+--------------------+
|             comment|               words|              brands|category|               brand|
+--------------------+--------------------+--------------------+--------+--------------------+
|nokia was the pri...|[nokia, was, the,...|             [nokia]|       0|               nokia|
|samsung made by s...|[samsung, made, b...|           [samsung]|       0|             samsung|
|google growingbra...|[google, growingb...|            [google]|       0|              google|
|apple has got roc...|[apple, has, got,...|             [apple]|       0|               apple|
|               nokia|             [nokia]|             [nokia]|       0|               nokia|
|               apple|             [apple]|             [apple]|       0|               apple|
|when apple appear...|[when, apple, app...|             [apple]|      -1|               apple|
|a hrefhttpswwwyou...|[a, hrefhttpswwwy...|[the jo

                                                                                

In [23]:

# Calculate total category sum for each brand
total_category_sum_by_brand = df_exploded.select("brand", "category").groupBy("brand").agg(
    F.sum("category").alias("total_category_sum")
)

# Calculate positive category sum for each brand (filter and sum)
positive_category_sum_by_brand = df_exploded.select("brand", "category").where(F.col("category") > 0).groupBy("brand").agg(
    F.sum("category").alias("positive_category_sum")
)

# Handle brands with no positive categories by filling with 0
positive_category_sum_by_brand = positive_category_sum_by_brand.withColumn("positive_category_sum", F.coalesce(F.col("positive_category_sum"), F.lit(0)))

# Calculate negative category sum for each brand (filter and sum)
negative_category_sum_by_brand = df_exploded.select("brand", "category").where(F.col("category") < 0).groupBy("brand").agg(
    F.sum("category").alias("negative_category_sum")
)

# Handle brands with no negative categories by filling with 0
negative_category_sum_by_brand = negative_category_sum_by_brand.withColumn("negative_category_sum", F.coalesce(F.col("negative_category_sum"), F.lit(0)))

# Join the three DataFrames to get all columns for each brand
brand_category_sum = total_category_sum_by_brand.join(positive_category_sum_by_brand, on="brand", how="outer").join(negative_category_sum_by_brand, on="brand", how="outer")

# Display the result
brand_category_sum.show()

[Stage 11:>                                                         (0 + 1) / 1]

+--------------------+------------------+---------------------+---------------------+
|               brand|total_category_sum|positive_category_sum|negative_category_sum|
+--------------------+------------------+---------------------+---------------------+
|                175x|                 1|                    1|                 NULL|
|            310apple|                 0|                 NULL|                 NULL|
|airlines airlines...|                 0|                 NULL|                 NULL|
|amazing brands ra...|                -1|                 NULL|                   -1|
|              amazon|                11|                   18|                   -7|
|amazona hrefhttps...|                 0|                 NULL|                 NULL|
|     american brands|                 1|                    1|                 NULL|
|american girlbr5 ...|                 1|                    1|                 NULL|
|                 anb|                 1|             

                                                                                

In [61]:
brand_category_sum.count()

                                                                                

369

In [25]:
brand_category_sum.write.option("header", True).mode("overwrite").csv("brand_category_sum.csv")


                                                                                

In [63]:
spark.stop()