In [0]:
# Check the contents in tables folder
display(dbutils.fs.ls("dbfs:/user/hive/warehouse"))

path,name,size,modificationTime
dbfs:/user/hive/warehouse/reddit_data/,reddit_data/,0,1701248003091
dbfs:/user/hive/warehouse/twitter_data/,twitter_data/,0,1701248003091


In [0]:
from pyspark.sql import SparkSession, SQLContext
from pyspark import SparkContext, SparkConf

# conf = SparkConf().setAppName('data_processing').setMaster("local[1]")
# sc = SparkContext(conf = conf)
# sql_context = SQLContext(sc)

spark = SparkSession.builder.appName("data_processing").getOrCreate()
spark

In [0]:
reddit_cvs_path = "dbfs:/user/hive/warehouse/reddit_data"
twitter_cvs_path = "dbfs:/user/hive/warehouse/twitter_data"

In [0]:
#sparkr.read.format('delta').load

# reddit = spark.read.option("header", "true").option("inferSchema", "true").option("multiline", "true").option("escape", "\"").format('delta').load(reddit_cvs_path)
# twitter = spark.read.option("header", "true").option("inferSchema", "true").option("multiline", "true").option("escape", "\"").format('delta').load(twitter_cvs_path)


#sparkr.read.format('delta').load

reddit = spark.read.format('delta').load(reddit_cvs_path)
twitter = spark.read.format('delta').load(twitter_cvs_path)

In [0]:
reddit = reddit.withColumnRenamed("clean_comment", "text")
twitter = twitter.withColumnRenamed("clean_text", "text")

In [0]:
reddit.show(5)

+--------------------+--------+
|                text|category|
+--------------------+--------+
| family mormon ha...|       1|
|buddhism has very...|       1|
|seriously don say...|      -1|
|what you have lea...|       0|
|for your own bene...|       1|
+--------------------+--------+
only showing top 5 rows



In [0]:
print(reddit.select("text").count())
print(reddit.select("category").count())
# 37249
# 37249

37249
37249


In [0]:
twitter.show(5)

+--------------------+--------+
|                text|category|
+--------------------+--------+
|when modi promise...|      -1|
|talk all the nons...|       0|
|what did just say...|       1|
|asking his suppor...|       1|
|answer who among ...|       1|
+--------------------+--------+
only showing top 5 rows



In [0]:
print(twitter.select("text").count())
print(twitter.select("category").count())
# 162973
# 162973

162973
162973


In [0]:
reddit.printSchema()
twitter.printSchema()

root
 |-- text: string (nullable = true)
 |-- category: integer (nullable = true)

root
 |-- text: string (nullable = true)
 |-- category: integer (nullable = true)



In [0]:
# Data Processing
# Remove HTLM tags /n
# Remove Any non-english character
# Remove extra white space
# Remove any character outside the ASCII range
# Remove any null value



In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.ml.feature import StopWordsRemover, Tokenizer, Word2Vec
import re
from pyspark.sql.functions import regexp_replace, col, trim, concat_ws

In [0]:
def lowercased(text):
    return text.lower()

lowercased_udf = spark.udf.register("lowercased_udf", lowercased)
reddit = reddit.withColumn("text", reddit["text"].cast("string"))
twitter = twitter.withColumn("text", twitter["text"].cast("string"))

In [0]:
abbreviations_mapping = {
    "$" : " dollar ",
    "€" : " euro ",
    "4ao" : "for adults only",
    "a.m" : "before midday",
    "a3" : "anytime anywhere anyplace",
    "aamof" : "as a matter of fact",
    "acct" : "account",
    "adih" : "another day in hell",
    "afaic" : "as far as i am concerned",
    "afaict" : "as far as i can tell",
    "afaik" : "as far as i know",
    "afair" : "as far as i remember",
    "afk" : "away from keyboard",
    "app" : "application",
    "approx" : "approximately",
    "apps" : "applications",
    "asap" : "as soon as possible",
    "asl" : "age, sex, location",
    "atk" : "at the keyboard",
    "ave." : "avenue",
    "aymm" : "are you my mother",
    "ayor" : "at your own risk",
    "b&b" : "bed and breakfast",
    "b+b" : "bed and breakfast",
    "b.c" : "before christ",
    "b2b" : "business to business",
    "b2c" : "business to customer",
    "b4" : "before",
    "b4n" : "bye for now",
    "b@u" : "back at you",
    "bae" : "before anyone else",
    "bak" : "back at keyboard",
    "bbbg" : "bye bye be good",
    "bbc" : "british broadcasting corporation",
    "bbias" : "be back in a second",
    "bbl" : "be back later",
    "bbs" : "be back soon",
    "be4" : "before",
    "bfn" : "bye for now",
    "blvd" : "boulevard",
    "bout" : "about",
    "brb" : "be right back",
    "bros" : "brothers",
    "brt" : "be right there",
    "bsaaw" : "big smile and a wink",
    "btw" : "by the way",
    "bwl" : "bursting with laughter",
    "c/o" : "care of",
    "cet" : "central european time",
    "cf" : "compare",
    "cia" : "central intelligence agency",
    "csl" : "can not stop laughing",
    "cu" : "see you",
    "cul8r" : "see you later",
    "cv" : "curriculum vitae",
    "cwot" : "complete waste of time",
    "cya" : "see you",
    "cyt" : "see you tomorrow",
    "dae" : "does anyone else",
    "dbmib" : "do not bother me i am busy",
    "diy" : "do it yourself",
    "dm" : "direct message",
    "dwh" : "during work hours",
    "e123" : "easy as one two three",
    "eet" : "eastern european time",
    "eg" : "example",
    "embm" : "early morning business meeting",
    "encl" : "enclosed",
    "encl." : "enclosed",
    "etc" : "and so on",
    "faq" : "frequently asked questions",
    "fawc" : "for anyone who cares",
    "fb" : "facebook",
    "fc" : "fingers crossed",
    "fig" : "figure",
    "fimh" : "forever in my heart",
    "ft." : "feet",
    "ft" : "featuring",
    "ftl" : "for the loss",
    "ftw" : "for the win",
    "fwiw" : "for what it is worth",
    "fyi" : "for your information",
    "g9" : "genius",
    "gahoy" : "get a hold of yourself",
    "gal" : "get a life",
    "gcse" : "general certificate of secondary education",
    "gfn" : "gone for now",
    "gg" : "good game",
    "gl" : "good luck",
    "glhf" : "good luck have fun",
    "gmt" : "greenwich mean time",
    "gmta" : "great minds think alike",
    "gn" : "good night",
    "g.o.a.t" : "greatest of all time",
    "goat" : "greatest of all time",
    "goi" : "get over it",
    "gps" : "global positioning system",
    "gr8" : "great",
    "gratz" : "congratulations",
    "gyal" : "girl",
    "h&c" : "hot and cold",
    "hp" : "horsepower",
    "hr" : "hour",
    "hrh" : "his royal highness",
    "ht" : "height",
    "ibrb" : "i will be right back",
    "ic" : "i see",
    "icq" : "i seek you",
    "icymi" : "in case you missed it",
    "idc" : "i do not care",
    "idgadf" : "i do not give a damn fuck",
    "idgaf" : "i do not give a fuck",
    "idk" : "i do not know",
    "ie" : "that is",
    "i.e" : "that is",
    "ifyp" : "i feel your pain",
    "IG" : "instagram",
    "iirc" : "if i remember correctly",
    "ilu" : "i love you",
    "ily" : "i love you",
    "imho" : "in my humble opinion",
    "imo" : "in my opinion",
    "imu" : "i miss you",
    "iow" : "in other words",
    "irl" : "in real life",
    "j4f" : "just for fun",
    "jic" : "just in case",
    "jk" : "just kidding",
    "jsyk" : "just so you know",
    "l8r" : "later",
    "lb" : "pound",
    "lbs" : "pounds",
    "ldr" : "long distance relationship",
    "lmao" : "laugh my ass off",
    "lmfao" : "laugh my fucking ass off",
    "lol" : "laughing out loud",
    "ltd" : "limited",
    "ltns" : "long time no see",
    "m8" : "mate",
    "mf" : "motherfucker",
    "mfs" : "motherfuckers",
    "mfw" : "my face when",
    "mofo" : "motherfucker",
    "mph" : "miles per hour",
    "mr" : "mister",
    "mrw" : "my reaction when",
    "ms" : "miss",
    "mte" : "my thoughts exactly",
    "nagi" : "not a good idea",
    "nbc" : "national broadcasting company",
    "nbd" : "not big deal",
    "nfs" : "not for sale",
    "ngl" : "not going to lie",
    "nhs" : "national health service",
    "nrn" : "no reply necessary",
    "nsfl" : "not safe for life",
    "nsfw" : "not safe for work",
    "nth" : "nice to have",
    "nvr" : "never",
    "nyc" : "new york city",
    "oc" : "original content",
    "og" : "original",
    "ohp" : "overhead projector",
    "oic" : "oh i see",
    "omdb" : "over my dead body",
    "omg" : "oh my god",
    "omw" : "on my way",
    "p.a" : "per annum",
    "p.m" : "after midday",
    "pm" : "prime minister",
    "poc" : "people of color",
    "pov" : "point of view",
    "pp" : "pages",
    "ppl" : "people",
    "prw" : "parents are watching",
    "ps" : "postscript",
    "pt" : "point",
    "ptb" : "please text back",
    "pto" : "please turn over",
    "qpsa" : "what happens", #"que pasa",
    "ratchet" : "rude",
    "rbtl" : "read between the lines",
    "rlrt" : "real life retweet",
    "rofl" : "rolling on the floor laughing",
    "roflol" : "rolling on the floor laughing out loud",
    "lol" : "laughing out loud",
    "rotflmao" : "rolling on the floor laughing my ass off",
    "rt" : "retweet",
    "ruok" : "are you ok",
    "sfw" : "safe for work",
    "sk8" : "skate",
    "smh" : "shake my head",
    "sq" : "square",
    "srsly" : "seriously",
    "ssdd" : "same stuff different day",
    "tbh" : "to be honest",
    "tbs" : "tablespooful",
    "tbsp" : "tablespooful",
    "tfw" : "that feeling when",
    "thks" : "thank you",
    "tho" : "though",
    "thx" : "thank you",
    "tia" : "thanks in advance",
    "til" : "today i learned",
    "tl;dr" : "too long i did not read",
    "tldr" : "too long i did not read",
    "tmb" : "tweet me back",
    "tntl" : "trying not to laugh",
    "ttyl" : "talk to you later",
    "u" : "you",
    "u2" : "you too",
    "u4e" : "yours for ever",
    "utc" : "coordinated universal time",
    "w/" : "with",
    "w/o" : "without",
    "w8" : "wait",
    "wassup" : "what is up",
    "wb" : "welcome back",
    "wtf" : "what the fuck",
    "wtg" : "way to go",
    "wtpa" : "where the party at",
    "wuf" : "where are you from",
    "wuzup" : "what is up",
    "wywh" : "wish you were here",
    "yd" : "yard",
    "ygtr" : "you got that right",
    "ynk" : "you never know",
    "zzz" : "sleeping bored and tired"
}

In [0]:
#Check if expantion process work

contains_lol = reddit.filter(col("text").contains("lol"))    #contain lol
contains_lol.show(20, truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
for key, value in abbreviations_mapping.items():
    reddit = reddit.withColumn("text", regexp_replace(reddit.text, rf'\b{key}\b', value))
    twitter = twitter.withColumn("text", regexp_replace(twitter.text, rf'\b{key}\b', value))

reddit.show(5)

+--------------------+--------+
|                text|category|
+--------------------+--------+
| family mormon ha...|       1|
|buddhism has very...|       1|
|seriously don say...|      -1|
|what you have lea...|       0|
|for your own bene...|       1|
+--------------------+--------+
only showing top 5 rows



In [0]:
#Check if expantion process work

contains_lol2 = reddit.filter(col("text").contains("lol")) #without lol
contains_lol2.show(20)

+--------------------+--------+
|                text|category|
+--------------------+--------+
|اللعنة عليك bismi...|       1|
|modi very divisiv...|       1|
| cnn ibn she had ...|      -1|
| team becomes tea...|       0|
|proof that you ar...|       0|
|you forgot put za...|      -1|
| alt lolguard app...|       0|
| sab andar lolgua...|       0|
|holy shit have mi...|      -1|
|wow someone the s...|       1|
|next lolguard kah...|       0|
| months back when...|      -1|
|       lolwa dollar |       0|
|our gdp numbers a...|       1|
|nice read little ...|      -1|
|onestamente non s...|       0|
|the current situa...|       1|
|google trends sho...|       0|
| thankful this st...|       1|
|the tail number t...|       0|
+--------------------+--------+
only showing top 20 rows



In [0]:
#Check if expantion process work

contains_laughing = reddit.filter(col("text").contains("laughing out loud")) #contain laughing out loud
contains_laughing.show(20)

+--------------------+--------+
|                text|category|
+--------------------+--------+
|laughing out loud...|       1|
|laughing out loud...|       1|
|well live brookly...|       1|
|laughing out loud...|       1|
|what missing jpg\...|       1|
| tell people from...|       1|
| congress are bas...|       1|
|kapil sibal zero ...|       1|
|critical issue la...|       1|
|laughing out loud...|       1|
|laughing out loud...|       1|
|let see 400 milli...|       1|
|dat self righteou...|       1|
|this infographic ...|       1|
|laughing out loud...|       1|
|true sense fear m...|       1|
|laughing out loud...|       1|
|laughing out loud...|       1|
| hey think the sa...|       1|
|laughing out loud...|       1|
+--------------------+--------+
only showing top 20 rows



In [0]:
#Check if expantion process work

contains_laughingss = reddit.filter(col("text").contains("the vietnam goalscorer didn even mean kick the ball that way")) #check if lol being expanded
contains_laughingss.show(20,truncate=False)

+--------------------------------------------------------------------------------------+--------+
|text                                                                                  |category|
+--------------------------------------------------------------------------------------+--------+
|laughing out loud the vietnam goalscorer didn even mean kick the ball that way dollar |1       |
+--------------------------------------------------------------------------------------+--------+



In [0]:
contraction_mapping ={"ain't": "is not", "aren't": "are not","can't": "cannot", "'cause": "because", "could've": "could have", "couldn't": "could not",

            "didn't": "did not", "doesn't": "does not", "don't": "do not", "hadn't": "had not", "hasn't": "has not", "haven't": "have not",

            "he'd": "he would","he'll": "he will", "he's": "he is", "how'd": "how did", "how'd'y": "how do you", "how'll": "how will", "how's": "how is",

            "I'd": "I would", "I'd've": "I would have", "I'll": "I will", "I'll've": "I will have","I'm": "I am", "I've": "I have", "i'd": "i would",

            "i'd've": "i would have", "i'll": "i will",  "i'll've": "i will have","i'm": "i am", "i've": "i have", "isn't": "is not", "it'd": "it would",

            "it'd've": "it would have", "it'll": "it will", "it'll've": "it will have","it's": "it is", "let's": "let us", "ma'am": "madam",

            "mayn't": "may not", "might've": "might have","mightn't": "might not","mightn't've": "might not have", "must've": "must have",

            "mustn't": "must not", "mustn't've": "must not have", "needn't": "need not", "needn't've": "need not have","o'clock": "of the clock",

            "oughtn't": "ought not", "oughtn't've": "ought not have", "shan't": "shall not", "sha'n't": "shall not", "shan't've": "shall not have",

            "she'd": "she would", "she'd've": "she would have", "she'll": "she will", "she'll've": "she will have", "she's": "she is",

            "should've": "should have", "shouldn't": "should not", "shouldn't've": "should not have", "so've": "so have","so's": "so as",

            "this's": "this is","that'd": "that would", "that'd've": "that would have", "that's": "that is", "there'd": "there would",

            "there'd've": "there would have", "there's": "there is", "here's": "here is","they'd": "they would", "they'd've": "they would have",

            "they'll": "they will", "they'll've": "they will have", "they're": "they are", "they've": "they have", "to've": "to have",

            "wasn't": "was not", "we'd": "we would", "we'd've": "we would have", "we'll": "we will", "we'll've": "we will have", "we're": "we are",

            "we've": "we have", "weren't": "were not", "what'll": "what will", "what'll've": "what will have", "what're": "what are",

            "what's": "what is", "what've": "what have", "when's": "when is", "when've": "when have", "where'd": "where did", "where's": "where is",

            "where've": "where have", "who'll": "who will", "who'll've": "who will have", "who's": "who is", "who've": "who have",

            "why's": "why is", "why've": "why have", "will've": "will have", "won't": "will not", "won't've": "will not have",

            "would've": "would have", "wouldn't": "would not", "wouldn't've": "would not have", "y'all": "you all",

            "y'all'd": "you all would","y'all'd've": "you all would have","y'all're": "you all are","y'all've": "you all have",

            "you'd": "you would", "you'd've": "you would have", "you'll": "you will", "you'll've": "you will have",

            "you're": "you are", "you've": "you have"}

In [0]:
for key, value in contraction_mapping.items():
    reddit = reddit.withColumn("text", regexp_replace(reddit.text, key, value))
    twitter = twitter.withColumn("text", regexp_replace(twitter.text, key, value))

reddit.show(5)

+--------------------+--------+
|                text|category|
+--------------------+--------+
| family mormon ha...|       1|
|buddhism has very...|       1|
|seriously don say...|      -1|
|what you have lea...|       0|
|for your own bene...|       1|
+--------------------+--------+
only showing top 5 rows



In [0]:
# Remove non chararcter
reddit = reddit.withColumn("text", regexp_replace(reddit.text, '[^a-zA-Z0-9\\s]', ''))
twitter = twitter.withColumn("text", regexp_replace(twitter.text, '[^a-zA-Z0-9\\s]', ''))

In [0]:
# Remove parentheses and anything in it
reddit = reddit.withColumn("text", regexp_replace(reddit.text, r'\([^)]*\)', ''))
twitter = twitter.withColumn("text", regexp_replace(twitter.text, r'\([^)]*\)', ''))

In [0]:
# Remove any extra withe spaces
reddit = reddit.withColumn("text", regexp_replace(reddit.text, ' +', ' '))
twitter = twitter.withColumn("text", regexp_replace(twitter.text, ' +', ' '))

In [0]:
# Remvoe number
reddit = reddit.withColumn("text", regexp_replace(reddit.text, '[0-9]', ''))
twitter = twitter.withColumn("text", regexp_replace(twitter.text, '[0-9]', ''))

In [0]:
# Replace newline characters with a space
reddit = reddit.withColumn("text", regexp_replace(reddit.text, '\n', ' '))
twitter = twitter.withColumn("text", regexp_replace(twitter.text, '\n', ' '))

In [0]:
# Remove leading spaces

reddit = reddit.withColumn("text", trim(reddit.text))
twitter = twitter.withColumn("text", trim(twitter.text))

In [0]:
# Remove HTML tags from the given text

def remove_html_tags(text):

    if text is None:
        return ""
    clean_text = re.sub(r'<.*?>', '', text)
    return clean_text

remove_html_tags_udf = spark.udf.register("remove_html_tags_udf", remove_html_tags)

# Convert "text" column to string type
reddit = reddit.withColumn("text", reddit["text"].cast("string"))
twitter = twitter.withColumn("text", twitter["text"].cast("string"))

def lowercased(text):
    return text.lower()

lowercased_udf = spark.udf.register("lowercased_udf", lowercased)

reddit = reddit.withColumn("text", reddit["text"].cast("string"))
twitter = twitter.withColumn("text", twitter["text"].cast("string"))


In [0]:
def remove_non_english_chars(text):
    """Remove any non-English characters from the given text."""
    if text is not None:
        clean_text = re.sub(r'[^\x00-\x7F]+', '', text)
        return clean_text
    else:
        return None

remove_non_english_chars_udf = spark.udf.register("remove_non_english_chars_udf", remove_non_english_chars)

reddit = reddit.withColumn("text", reddit["text"].cast("string"))
reddit = reddit.withColumn("text", remove_non_english_chars_udf(reddit.text))

twitter = twitter.withColumn("text", twitter["text"].cast("string"))
twitter = twitter.withColumn("text", remove_non_english_chars_udf(twitter.text))

In [0]:
# For reddit Null check
for column in reddit.columns:
  null_count = reddit.where(col(column).isNull()).count()
  print(f"reddit Column '{column}' has {null_count} NULL values.")

# For twitter Null check
for column in twitter.columns:
  null_count = twitter.where(col(column).isNull()).count()
  print(f"twitter Column '{column}' has {null_count} NULL values.")


reddit Column 'text' has 99 NULL values.
reddit Column 'category' has 0 NULL values.
twitter Column 'text' has 1 NULL values.
twitter Column 'category' has 0 NULL values.


In [0]:
# Fill null value with empty_value for text column and -99 for category column
reddit = reddit.na.fill({'text': 'empty_value'})
reddit = reddit.na.fill({'category': '-99'})

twitter = twitter.na.fill({'text': 'empty_value'})
twitter = twitter.na.fill({'category': '-99'})

In [0]:
# Filtering out empty_value from text column and -99 from category column
reddit = reddit.filter(col("text") != "")
reddit = reddit.filter(col("category") != "-99")

twitter = twitter.filter(col("text") != "")
twitter = twitter.filter(col("category") != "-99")

In [0]:
# For reddit Null check again
for column in reddit.columns:
  null_count = reddit.where(col(column).isNull()).count()
  print(f"reddit Column '{column}' has {null_count} NULL values.")

# For twitter Null check again
for column in twitter.columns:
  null_count = twitter.where(col(column).isNull()).count()
  print(f"twitter Column '{column}' has {null_count} NULL values.")

reddit Column 'text' has 0 NULL values.
reddit Column 'category' has 0 NULL values.
twitter Column 'text' has 0 NULL values.
twitter Column 'category' has 0 NULL values.


In [0]:
category_value_check = reddit.filter(col("category").contains("-99")) #should be empty
category_value_check.show(20)

+----+--------+
|text|category|
+----+--------+
+----+--------+



In [0]:
# check the data size

print(reddit.select("text").count())
print(reddit.select("category").count())
# 36910

print(twitter.select("text").count())
print(twitter.select("category").count())

37024
37024
162967
162967


In [0]:
# split the text into token

tokenizer = Tokenizer(inputCol="text", outputCol="tokens")

reddit = tokenizer.transform(reddit)
twitter = tokenizer.transform(twitter)

# get a new dataframe xxx_token

reddit_token = reddit.select("tokens", "category")
twitter_token = twitter.select("tokens", "category")
# # Show the result

reddit_token.show(10, truncate=False)
twitter_token.show(10, truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
reddit.select("tokens", "category").show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
# Remove Stopwords

from pyspark.ml.feature import StopWordsRemover

# Download the stop words list
stop_words = StopWordsRemover.loadDefaultStopWords("english")

# Optionally, you can add custom stop words to the list
# stop_words.extend(["custom_stopword1", "custom_stopword2"])

# Create a StopWordsRemover instance
stop_words_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered_tokens", stopWords=stop_words)

# Create a Pipeline to perform tokenization and stop words removal
#pipeline = Pipeline(stages=[tokenizer, stop_words_remover])

# Fit and transform the DataFrame
#reddit_data = pipeline.fit(reddit_data).transform(reddit_data)

#----------------------------------------------------------------------------------------
#reddit = stop_words_remover.transform(reddit)   This one works!!!!!!


reddit_token = stop_words_remover.transform(reddit_token)
twitter_token = stop_words_remover.transform(twitter_token)

reddit_token_filtered = reddit_token.select("filtered_tokens","category")
twitter_token_filtered = twitter_token.select("filtered_tokens","category")

reddit_token_filtered.show(20, truncate=False)
twitter_token_filtered.show(20, truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
# check null value for xxx_token_filtered column and category colum

for column in reddit_token_filtered.columns:
  null_count = reddit_token_filtered.where(col(column).isNull()).count()
  print(f"Column '{column}' has {null_count} NULL values.")

for column in twitter_token_filtered.columns:
  null_count = twitter_token_filtered.where(col(column).isNull()).count()
  print(f"Column '{column}' has {null_count} NULL values.")

Column 'filtered_tokens' has 0 NULL values.
Column 'category' has 0 NULL values.
Column 'filtered_tokens' has 0 NULL values.
Column 'category' has 0 NULL values.


In [0]:
# check empty vector for xxx_token_filtered column and category colum

for column in reddit_token_filtered.columns:
    if column == "filtered_tokens":  # Assuming "filtered_tokens" is the column containing lists
        empty_vector_count = reddit_token_filtered.filter(F.size(F.col(column)) == 0).count()
        print(f"Column '{column}' has {empty_vector_count} empty vectors.")
    else:
        null_count = reddit_token_filtered.where(F.col(column).isNull()).count()
        print(f"Column '{column}' has {null_count} NULL values.")

for column in twitter_token_filtered.columns:
    if column == "filtered_tokens":  # Assuming "filtered_tokens" is the column containing lists
        empty_vector_count = twitter_token_filtered.filter(F.size(F.col(column)) == 0).count()
        print(f"Column '{column}' has {empty_vector_count} empty vectors.")
    else:
        null_count = twitter_token_filtered.where(F.col(column).isNull()).count()
        print(f"Column '{column}' has {null_count} NULL values.")

Column 'filtered_tokens' has 50 empty vectors.
Column 'category' has 0 NULL values.
Column 'filtered_tokens' has 57 empty vectors.
Column 'category' has 0 NULL values.


In [0]:
from pyspark.sql.functions import col, when, size, array, lit

# For reddit
# Replace empty vectors in 'filtered_tokens' column with ["empty_value"]
reddit_token_filtered = reddit_token_filtered.withColumn(
    "filtered_tokens",
    when(size("filtered_tokens") == 0, array(lit("empty_value"))).otherwise(col("filtered_tokens"))
)

# Fill '-99' for 'category' column
reddit_token_filtered = reddit_token_filtered.na.fill({'category': '-99'})

# For twitter
# Replace empty vectors in 'filtered_tokens' column with ["empty_value"]
twitter_token_filtered = twitter_token_filtered.withColumn(
    "filtered_tokens",
    when(size("filtered_tokens") == 0, array(lit("empty_value"))).otherwise(col("filtered_tokens"))
)

# Fill '-99' for 'category' column
twitter_token_filtered = twitter_token_filtered.na.fill({'category': '-99'})

Drop empty vector

In [0]:
from pyspark.sql.functions import size, col

# Filter out rows with empty vectors in 'filtered_tokens' and '-99' in 'category'
reddit_token_filtered = reddit_token_filtered.filter((size(col("filtered_tokens")) > 0) & (col("category") != "-99"))
twitter_token_filtered = twitter_token_filtered.filter((size(col("filtered_tokens")) > 0) & (col("category") != "-99"))

In [0]:
print(reddit_token_filtered.select("filtered_tokens").count())
print(twitter_token_filtered.select("category").count())

37024
162967


In [0]:
weights = [0.8, 0.2]  # You can adjust these weights based on your requirement

# Perform the random split
reddit_train, reddit_test = reddit_token_filtered.randomSplit(weights, seed=42)  # Use a seed for reproducibility
twitter_train, twitter_test = twitter_token_filtered.randomSplit(weights, seed=42)  # Use a seed for reproducibility

# Show the results
reddit_train.show(10)
reddit_test.show(10)
twitter_train.show(10)
twitter_test.show(10)
#test.show(5,truncate=False)

+--------------------+--------+
|     filtered_tokens|category|
+--------------------+--------+
|[, , india, lifte...|      -1|
|[, , pages, origi...|       1|
|[, accurate, lear...|       1|
|[, amendments, co...|       1|
|[, bollywood, mov...|       1|
|[, congress, bad,...|      -1|
|[, elections, mod...|       1|
|[, everytime, gov...|      -1|
|  [, inr, nri, lose]|       0|
|[, karma, channel...|      -1|
+--------------------+--------+
only showing top 10 rows

+--------------------+--------+
|     filtered_tokens|category|
+--------------------+--------+
|[, , tiny, prince...|       1|
|[, budget, latest...|       1|
|[, correct, would...|       0|
|[, kazari, uiharu...|       1|
|[, notes, deposit...|       1|
|[, quality, liabi...|       0|
|[, times, richer,...|       1|
|[aaah, family, wh...|       0|
|            [aadhar]|       0|
|            [aadhar]|       0|
+--------------------+--------+
only showing top 10 rows

+--------------------+--------+
|     filtered_token

In [0]:
# print(reddit_train.select("filtered_tokens").count())
# print(reddit_test.select("category").count())

print(f"Number of filtered tokens are {reddit_train.select('filtered_tokens').count()}")
print(f"Number of category are {reddit_train.select('category').count()}")
print("-------------------------------------------------------------------------------------------")
print(f"Number of filtered tokens are {reddit_test.select('filtered_tokens').count()}")
print(f"Number of category are {reddit_test.select('category').count()}")

print("===========================================================================================")

print(f"Number of filtered tokens are {twitter_train.select('filtered_tokens').count()}")
print(f"Number of category are {twitter_train.select('category').count()}")
print("-------------------------------------------------------------------------------------------")
print(f"Number of filtered tokens are {twitter_test.select('filtered_tokens').count()}")
print(f"Number of category are {twitter_test.select('category').count()}")


Number of filtered tokens are 29629
Number of category are 29629
-------------------------------------------------------------------------------------------
Number of filtered tokens are 7395
Number of category are 7395
Number of filtered tokens are 130230
Number of category are 130230
-------------------------------------------------------------------------------------------
Number of filtered tokens are 32737
Number of category are 32737


In [0]:
reddit_token_filtered = reddit_token_filtered.withColumn("clean_text",concat_ws(" ", col("filtered_tokens")))
twitter_token_filtered = twitter_token_filtered.withColumn("clean_text",concat_ws(" ", col("filtered_tokens")))

reddit_train = reddit_train.withColumn("clean_text",concat_ws(" ", col("filtered_tokens")))
reddit_test = reddit_test.withColumn("clean_text",concat_ws(" ", col("filtered_tokens")))
twitter_train = twitter_train.withColumn("clean_text",concat_ws(" ", col("filtered_tokens")))
twitter_test = twitter_test.withColumn("clean_text",concat_ws(" ", col("filtered_tokens")))


In [0]:
reddit_test.show(5)

+--------------------+--------+--------------------+
|     filtered_tokens|category|          clean_text|
+--------------------+--------+--------------------+
|[, , tiny, prince...|       1|  tiny princely s...|
|[, budget, latest...|       1| budget latest pi...|
|[, correct, would...|       0| correct wouldn w...|
|[, kazari, uiharu...|       1| kazari uiharu ra...|
|[, notes, deposit...|       1| notes deposit ba...|
+--------------------+--------+--------------------+
only showing top 5 rows



In [0]:
reddit_token_filtered.select("clean_text","category").coalesce(1).write.format("com.databricks.spark.csv").option("header","true").save("dbfs:/FileStore/tables/MSML651/reddit_token_filtered.csv")
twitter_token_filtered.select("clean_text","category").coalesce(1).write.format("com.databricks.spark.csv").option("header","true").save("dbfs:/FileStore/tables/MSML651/twitter_token_filtered.csv")

reddit_train.coalesce(1).select("clean_text","category").write.format("com.databricks.spark.csv").option("header","true").save("dbfs:/FileStore/tables/MSML651/reddit_train.csv")
reddit_test.coalesce(1).select("clean_text","category").write.format("com.databricks.spark.csv").option("header","true").save("dbfs:/FileStore/tables/MSML651/reddit_test.csv")
twitter_train.coalesce(1).select("clean_text","category").write.format("com.databricks.spark.csv").option("header","true").save("dbfs:/FileStore/tables/MSML651/twitter_train.csv")
twitter_test.coalesce(1).select("clean_text","category").write.format("com.databricks.spark.csv").option("header","true").save("dbfs:/FileStore/tables/MSML651/twitter_test.csv")

In [0]:
input_path1 = "dbfs:/FileStore/tables/MSML651/reddit_test.csv"
input_path2 = "dbfs:/FileStore/tables/MSML651/reddit_train.csv"
input_path3 = "dbfs:/FileStore/tables/MSML651/twitter_test.csv"
input_path4 = "dbfs:/FileStore/tables/MSML651/twitter_train.csv"
input_path5 = "dbfs:/FileStore/tables/MSML651/reddit_token_filtered.csv"
input_path6 = "dbfs:/FileStore/tables/MSML651/twitter_token_filtered.csv"

In [0]:
in_data = spark.read.csv(input_path6, header=True)

In [0]:
display(in_data)

clean_text,category
modi promised minimum government maximum governance expected begin difficult job reforming state take years get justice state business exit psus temples dollar,-1
talk nonsense continue drama vote modi,0
say vote modi welcome bjp told rahul main campaigner modi think modi relax dollar,1
asking supporters prefix chowkidar names modi great service confusion read crustal clear crass filthy nonsensical see abuses coming chowkidars dollar,1
answer among powerful world leader today trump putin modi may,1
kiya though refresh maarkefir comment karo,0
surat women perform yagna seeks divine grace narendra modi become dollar,0
comes cabinet scholars like modi smriti hema time introspect dollar,0
upcoming election india saga going important pair look current modi leads govt elected deal brexit combination weekly looks juicy bears humble opinion,1
gandhi gay modi,1
