## Import Libraries

In [None]:
# Pyspark SQL
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType
from pyspark.sql.functions import col, sum, udf
import pyspark.sql.functions as F

# Sentiment Analyzer
# !pip install vaderSentiment
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

# Pyspark Machine Learning
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, HashingTF, IDF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator


Collecting vaderSentiment
  Using cached vaderSentiment-3.3.2-py2.py3-none-any.whl (125 kB)
Installing collected packages: vaderSentiment
Successfully installed vaderSentiment-3.3.2
You should consider upgrading via the '/local_disk0/.ephemeral_nfs/envs/pythonEnv-00d252a4-0060-4ba9-9749-c3bfd3dffd13/bin/python -m pip install --upgrade pip' command.[0m


## Load Data

#### Mount to weclouddata public dataset bucket

In [None]:
# Define a function to mount

def mount_s3_bucket(access_key, secret_key, bucket_name, mount_folder):
  ACCESS_KEY_ID = access_key
  SECRET_ACCESS_KEY = secret_key
  ENCODED_SECRET_KEY = SECRET_ACCESS_KEY.replace("/", "%2F")

  print ("Mounting", bucket_name)

  try:
    # Unmount the data in case it was already mounted.
    dbutils.fs.unmount("/mnt/%s" % mount_folder)
    
  except:
    # If it fails to unmount it most likely wasn't mounted in the first place
    print ("Directory not unmounted: ", mount_folder)
    
  finally:
    # Lastly, mount our bucket.
    dbutils.fs.mount("s3a://%s:%s@%s" % (ACCESS_KEY_ID, ENCODED_SECRET_KEY, bucket_name), "/mnt/%s" % mount_folder)
    #dbutils.fs.mount("s3a://"+ ACCESS_KEY_ID + ":" + ENCODED_SECRET_KEY + "@" + bucket_name, mount_folder)
    print ("The bucket", bucket_name, "was mounted to", mount_folder, "\n")

In [None]:
# Set my access key and secret access key
ACCESS_KEY = my access key
SECRET_ACCESS_KEY = my secret access key

In [None]:
# Mount the dataset
mount_s3_bucket(ACCESS_KEY, SECRET_ACCESS_KEY, "weclouddata/twitter/", "twitter_folder")

In [None]:
# Explore the mounted folder
%fs ls /mnt/twitter_folder/

path,name,size,modificationTime
dbfs:/mnt/twitter_folder/AI/,AI/,0,0
dbfs:/mnt/twitter_folder/BankofCanada/,BankofCanada/,0,0
dbfs:/mnt/twitter_folder/BlackFriday/,BlackFriday/,0,0
dbfs:/mnt/twitter_folder/CERB/,CERB/,0,0
dbfs:/mnt/twitter_folder/CSIS/,CSIS/,0,0
dbfs:/mnt/twitter_folder/CanadaHousing/,CanadaHousing/,0,0
dbfs:/mnt/twitter_folder/ElonMusk/,ElonMusk/,0,0
dbfs:/mnt/twitter_folder/Flames/,Flames/,0,0
dbfs:/mnt/twitter_folder/Inflation/,Inflation/,0,0
dbfs:/mnt/twitter_folder/Interest_rate/,Interest_rate/,0,0


In [None]:
# file path for tweets about Black Friday on November 24th, 2022 
# Due to limited computing power, we will choose tweets from only the day before Black Friday (11/24).
filePath = '/mnt/twitter_folder/BlackFriday/2022/11/24/*/*'

#### Create Spark Session

In [None]:
spark = (SparkSession
        .builder
        .appName('df')
        .getOrCreate())

print('Session created')

Session created


In [None]:
sc = spark.sparkContext

In [None]:
# Define schema
schema = StructType([
    StructField('id', StringType(), True),
    StructField('name', StringType(), True),
    StructField('username', StringType(), True),
    StructField('tweet', StringType(), True),
    StructField('followers_count', StringType(), True),
    StructField('location', StringType(), True),
    StructField('geo', StringType(), True),
    StructField('created_at', StringType(), True)
])

In [None]:
# read data from the selected file path
df = (spark.read.schema(schema).option('delimiter','\t').csv(filePath))

In [None]:
# cache the dataframe for faster iteration
df.cache()

# run the count action to materialize the cache
df.count()

Out[7]: 113814

In [None]:
display(df.head(10))

id,name,username,tweet,followers_count,location,geo,created_at
1595930122003320832,Alan L. Stewart - unetomaterouge,unetomaterouge,"RT @ProgIntl: BREAKING: Tomorrow, Amazon workers at 18 warehouses are going on strike in France and Germany to #MakeAmazonPay, with many ot…",2394,,,Thu Nov 24 23:59:26 +0000 2022
1595930122124550144,Bismillah BigWin,CityxWin0,RT @ChiefElrond: $50 | 24 Hours 🥏 RT & Follow: @CryptoCoinCoach + @NeblioTeam (BE ACTIVE ON PROFILE) Tweet #NEBL NEXT GEM ON BLA…,262,,,Thu Nov 24 23:59:26 +0000 2022
1595930122682478592,Fuck You I Quit,fuckyouiquit,This is the only Black Friday ad you need to see https://t.co/7Hj3CR4YlS,272510,Corporate Accounts Payable,,Thu Nov 24 23:59:26 +0000 2022
1595930123337105417,kumiii🐻 • freetag,chunvrwin,RT @jihanicorn: $150 | 2.250.000 IDR • 24 Hours 💜 - RT & Follow @CryptoCoinCoach + @NeblioTeam ____________________________ (BE ACTIVE ON…,53,🍀,,Thu Nov 24 23:59:27 +0000 2022
1595930124075294722,D_Adrian,DexaWinWin,RT @HeiraCrypto: $100 ~ Ends in 24 hrs. 🔶 RT - Follow: @CryptoCoinCoach + @NeblioTeam (BE ACTIVE ON PROFILE) Tweet #NEBL NEXT GEM…,45,,,Thu Nov 24 23:59:27 +0000 2022
1595930124305649664,$NEST 🎄$TTC freetag sepuasnya🍀,desnumber1,RT @May7ven: $50 — 24 Hours — ➖RT & Follow: @CryptoCoinCoach + @NeblioTeam (BE ACTIVE ON PROFILE) Tweet #NEBL NEXT GEM ON BLACK…,189,kwangya,,Thu Nov 24 23:59:27 +0000 2022
1595930125413285890,bigwin 🔥 $COOKIES 🍪,0xbigwinasn,RT @Beast_Cryptox: 💰 $70 ~ 24 HOURS 🐄🦥 ➖ RT & Follow @CryptoCoinCoach @NeblioTeam ----------------- (BE ACTIVE ON PROFILE) Tweet - #NEBL…,42,"Jawa Tengah, Indonesia",,Thu Nov 24 23:59:27 +0000 2022
1595930125903990784,Rex00o,Rex00o2,"RT @GFuelEnergy: 💛 𝗟𝗜𝗞𝗘 + 𝗥𝗧 to win a #BanjoKazooie x #GFUEL ""HONEY BERRY"" Tub!!! Picking 2 winners tomorrow bc we just RESTOCKED these bab…",23,,,Thu Nov 24 23:59:27 +0000 2022
1595930126499332099,CJ Tocco,DMAlCo241,"@ThisIsKyleR Working for doubletime and a half is MY tradition. Of course, the sweet paycheck gets gobbled up on Bl… https://t.co/kaDhOvmW5n",23,,,Thu Nov 24 23:59:27 +0000 2022
1595930127816609792,Martin Jones,BackPackJones,If y’all don’t get y’all grandma a tv for Black Friday,616,"Dallas, TX",,Thu Nov 24 23:59:28 +0000 2022


In [None]:
# Get the shape of the DataFrame
num_rows = df.count()
num_cols = len(df.columns)

print("Number of rows:", num_rows)
print("Number of columns:", num_cols)

Number of rows: 113814
Number of columns: 8


In [None]:
# check for null values for column 'tweet'

# Count the number of rows with null values in the "tweet" column
null_count = df.select(sum(col("tweet").isNull().cast("int"))).collect()[0][0]

print("Number of rows with null values in the 'tweet' column:", null_count)


Number of rows with null values in the 'tweet' column: 268


In [None]:
# drop those columns
df = df.dropna(subset=["tweet"])
display(df.head(10))

id,name,username,tweet,followers_count,location,geo,created_at
1595930122003320832,Alan L. Stewart - unetomaterouge,unetomaterouge,"RT @ProgIntl: BREAKING: Tomorrow, Amazon workers at 18 warehouses are going on strike in France and Germany to #MakeAmazonPay, with many ot…",2394,,,Thu Nov 24 23:59:26 +0000 2022
1595930122124550144,Bismillah BigWin,CityxWin0,RT @ChiefElrond: $50 | 24 Hours 🥏 RT & Follow: @CryptoCoinCoach + @NeblioTeam (BE ACTIVE ON PROFILE) Tweet #NEBL NEXT GEM ON BLA…,262,,,Thu Nov 24 23:59:26 +0000 2022
1595930122682478592,Fuck You I Quit,fuckyouiquit,This is the only Black Friday ad you need to see https://t.co/7Hj3CR4YlS,272510,Corporate Accounts Payable,,Thu Nov 24 23:59:26 +0000 2022
1595930123337105417,kumiii🐻 • freetag,chunvrwin,RT @jihanicorn: $150 | 2.250.000 IDR • 24 Hours 💜 - RT & Follow @CryptoCoinCoach + @NeblioTeam ____________________________ (BE ACTIVE ON…,53,🍀,,Thu Nov 24 23:59:27 +0000 2022
1595930124075294722,D_Adrian,DexaWinWin,RT @HeiraCrypto: $100 ~ Ends in 24 hrs. 🔶 RT - Follow: @CryptoCoinCoach + @NeblioTeam (BE ACTIVE ON PROFILE) Tweet #NEBL NEXT GEM…,45,,,Thu Nov 24 23:59:27 +0000 2022
1595930124305649664,$NEST 🎄$TTC freetag sepuasnya🍀,desnumber1,RT @May7ven: $50 — 24 Hours — ➖RT & Follow: @CryptoCoinCoach + @NeblioTeam (BE ACTIVE ON PROFILE) Tweet #NEBL NEXT GEM ON BLACK…,189,kwangya,,Thu Nov 24 23:59:27 +0000 2022
1595930125413285890,bigwin 🔥 $COOKIES 🍪,0xbigwinasn,RT @Beast_Cryptox: 💰 $70 ~ 24 HOURS 🐄🦥 ➖ RT & Follow @CryptoCoinCoach @NeblioTeam ----------------- (BE ACTIVE ON PROFILE) Tweet - #NEBL…,42,"Jawa Tengah, Indonesia",,Thu Nov 24 23:59:27 +0000 2022
1595930125903990784,Rex00o,Rex00o2,"RT @GFuelEnergy: 💛 𝗟𝗜𝗞𝗘 + 𝗥𝗧 to win a #BanjoKazooie x #GFUEL ""HONEY BERRY"" Tub!!! Picking 2 winners tomorrow bc we just RESTOCKED these bab…",23,,,Thu Nov 24 23:59:27 +0000 2022
1595930126499332099,CJ Tocco,DMAlCo241,"@ThisIsKyleR Working for doubletime and a half is MY tradition. Of course, the sweet paycheck gets gobbled up on Bl… https://t.co/kaDhOvmW5n",23,,,Thu Nov 24 23:59:27 +0000 2022
1595930127816609792,Martin Jones,BackPackJones,If y’all don’t get y’all grandma a tv for Black Friday,616,"Dallas, TX",,Thu Nov 24 23:59:28 +0000 2022


#### Create Sentiment column using VADER

In [None]:
# define a function to get sentiment score using VADER
def getSentimentScore(tweetText):
    sia = SentimentIntensityAnalyzer()
    ss = sia.polarity_scores(tweetText)
    return float(ss['compound'])

# define a function to get sentiment
def getSentiment(score):
    return 1 if score >= 0 else 0

In [None]:
# create sentiment score column
udfss=udf(getSentimentScore, FloatType())
df = df.withColumn('sentiment score',udfss('tweet'))

In [None]:
# create sentiment column - positive:1  negative:0
udfSentiment = udf(getSentiment, IntegerType())
df1 = df.withColumn('sentiment', udfSentiment('sentiment score'))

In [None]:
display(df1.head(5))
# df1 is the dataframe with the original data + sentiment + sentiment score

id,name,username,tweet,followers_count,location,geo,created_at,sentiment score,sentiment
1595930122003320832,Alan L. Stewart - unetomaterouge,unetomaterouge,"RT @ProgIntl: BREAKING: Tomorrow, Amazon workers at 18 warehouses are going on strike in France and Germany to #MakeAmazonPay, with many ot…",2394,,,Thu Nov 24 23:59:26 +0000 2022,0.0516000017523765,1
1595930122124550144,Bismillah BigWin,CityxWin0,RT @ChiefElrond: $50 | 24 Hours 🥏 RT & Follow: @CryptoCoinCoach + @NeblioTeam (BE ACTIVE ON PROFILE) Tweet #NEBL NEXT GEM ON BLA…,262,,,Thu Nov 24 23:59:26 +0000 2022,0.5318999886512756,1
1595930122682478592,Fuck You I Quit,fuckyouiquit,This is the only Black Friday ad you need to see https://t.co/7Hj3CR4YlS,272510,Corporate Accounts Payable,,Thu Nov 24 23:59:26 +0000 2022,0.0,1
1595930123337105417,kumiii🐻 • freetag,chunvrwin,RT @jihanicorn: $150 | 2.250.000 IDR • 24 Hours 💜 - RT & Follow @CryptoCoinCoach + @NeblioTeam ____________________________ (BE ACTIVE ON…,53,🍀,,Thu Nov 24 23:59:27 +0000 2022,0.8240000009536743,1
1595930124075294722,D_Adrian,DexaWinWin,RT @HeiraCrypto: $100 ~ Ends in 24 hrs. 🔶 RT - Follow: @CryptoCoinCoach + @NeblioTeam (BE ACTIVE ON PROFILE) Tweet #NEBL NEXT GEM…,45,,,Thu Nov 24 23:59:27 +0000 2022,0.7034000158309937,1


In [None]:
# select the sentiment and tweet column for the purpose of this sentiment analysis
tweets=df1.select('sentiment','tweet')
display(tweets.head(10))

sentiment,tweet
1,"RT @ProgIntl: BREAKING: Tomorrow, Amazon workers at 18 warehouses are going on strike in France and Germany to #MakeAmazonPay, with many ot…"
1,RT @ChiefElrond: $50 | 24 Hours 🥏 RT & Follow: @CryptoCoinCoach + @NeblioTeam (BE ACTIVE ON PROFILE) Tweet #NEBL NEXT GEM ON BLA…
1,This is the only Black Friday ad you need to see https://t.co/7Hj3CR4YlS
1,RT @jihanicorn: $150 | 2.250.000 IDR • 24 Hours 💜 - RT & Follow @CryptoCoinCoach + @NeblioTeam ____________________________ (BE ACTIVE ON…
1,RT @HeiraCrypto: $100 ~ Ends in 24 hrs. 🔶 RT - Follow: @CryptoCoinCoach + @NeblioTeam (BE ACTIVE ON PROFILE) Tweet #NEBL NEXT GEM…
1,RT @May7ven: $50 — 24 Hours — ➖RT & Follow: @CryptoCoinCoach + @NeblioTeam (BE ACTIVE ON PROFILE) Tweet #NEBL NEXT GEM ON BLACK…
1,RT @Beast_Cryptox: 💰 $70 ~ 24 HOURS 🐄🦥 ➖ RT & Follow @CryptoCoinCoach @NeblioTeam ----------------- (BE ACTIVE ON PROFILE) Tweet - #NEBL…
1,"RT @GFuelEnergy: 💛 𝗟𝗜𝗞𝗘 + 𝗥𝗧 to win a #BanjoKazooie x #GFUEL ""HONEY BERRY"" Tub!!! Picking 2 winners tomorrow bc we just RESTOCKED these bab…"
1,"@ThisIsKyleR Working for doubletime and a half is MY tradition. Of course, the sweet paycheck gets gobbled up on Bl… https://t.co/kaDhOvmW5n"
1,If y’all don’t get y’all grandma a tv for Black Friday


#### Text Cleaning Preprocessing

`pyspark.sql.functions.regexp_replace` is used to process the text

1. Remove URLs such as `http://cnn.com`
2. Remove special characters
3. Substituting multiple spaces with single space
4. Lowercase all text
5. Trim the leading/trailing whitespaces

In [None]:
# clean the tweets as mentioned above
tweets_clean = tweets.withColumn('tweet', F.regexp_replace('tweet', r"http\S+", "")) \
                    .withColumn('tweet', F.regexp_replace('tweet', r"[^a-zA-z]", " ")) \
                    .withColumn('tweet', F.regexp_replace('tweet', r"\s+", " ")) \
                    .withColumn('tweet', F.lower('tweet')) \
                    .withColumn('tweet', F.trim('tweet')) 
display(tweets_clean.head(5))

sentiment,tweet
1,rt progintl breaking tomorrow amazon workers at warehouses are going on strike in france and germany to makeamazonpay with many ot
1,rt chiefelrond hours rt amp follow cryptocoincoach neblioteam be active on profile tweet nebl next gem on bla
1,this is the only black friday ad you need to see
1,rt jihanicorn idr hours rt amp follow cryptocoincoach neblioteam ____________________________ be active on
1,rt heiracrypto ends in hrs rt follow cryptocoincoach neblioteam be active on profile tweet nebl next gem


#### Feature Transformer: Tokenizer

In [None]:
# Tokenize the tweets
tokenizer = Tokenizer(inputCol="tweet", outputCol="tokens") 
tweets_tokenized = tokenizer.transform(tweets_clean)

display(tweets_tokenized.head(5))

sentiment,tweet,tokens
1,rt progintl breaking tomorrow amazon workers at warehouses are going on strike in france and germany to makeamazonpay with many ot,"List(rt, progintl, breaking, tomorrow, amazon, workers, at, warehouses, are, going, on, strike, in, france, and, germany, to, makeamazonpay, with, many, ot)"
1,rt chiefelrond hours rt amp follow cryptocoincoach neblioteam be active on profile tweet nebl next gem on bla,"List(rt, chiefelrond, hours, rt, amp, follow, cryptocoincoach, neblioteam, be, active, on, profile, tweet, nebl, next, gem, on, bla)"
1,this is the only black friday ad you need to see,"List(this, is, the, only, black, friday, ad, you, need, to, see)"
1,rt jihanicorn idr hours rt amp follow cryptocoincoach neblioteam ____________________________ be active on,"List(rt, jihanicorn, idr, hours, rt, amp, follow, cryptocoincoach, neblioteam, ____________________________, be, active, on)"
1,rt heiracrypto ends in hrs rt follow cryptocoincoach neblioteam be active on profile tweet nebl next gem,"List(rt, heiracrypto, ends, in, hrs, rt, follow, cryptocoincoach, neblioteam, be, active, on, profile, tweet, nebl, next, gem)"


#### Feature Transformer: Stopword Removal

In [None]:
# Remove stopword
stopword_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered")
tweets_stopword = stopword_remover.transform(tweets_tokenized)

display(tweets_stopword.head(5))

sentiment,tweet,tokens,filtered
1,rt progintl breaking tomorrow amazon workers at warehouses are going on strike in france and germany to makeamazonpay with many ot,"List(rt, progintl, breaking, tomorrow, amazon, workers, at, warehouses, are, going, on, strike, in, france, and, germany, to, makeamazonpay, with, many, ot)","List(rt, progintl, breaking, tomorrow, amazon, workers, warehouses, going, strike, france, germany, makeamazonpay, many, ot)"
1,rt chiefelrond hours rt amp follow cryptocoincoach neblioteam be active on profile tweet nebl next gem on bla,"List(rt, chiefelrond, hours, rt, amp, follow, cryptocoincoach, neblioteam, be, active, on, profile, tweet, nebl, next, gem, on, bla)","List(rt, chiefelrond, hours, rt, amp, follow, cryptocoincoach, neblioteam, active, profile, tweet, nebl, next, gem, bla)"
1,this is the only black friday ad you need to see,"List(this, is, the, only, black, friday, ad, you, need, to, see)","List(black, friday, ad, need, see)"
1,rt jihanicorn idr hours rt amp follow cryptocoincoach neblioteam ____________________________ be active on,"List(rt, jihanicorn, idr, hours, rt, amp, follow, cryptocoincoach, neblioteam, ____________________________, be, active, on)","List(rt, jihanicorn, idr, hours, rt, amp, follow, cryptocoincoach, neblioteam, ____________________________, active)"
1,rt heiracrypto ends in hrs rt follow cryptocoincoach neblioteam be active on profile tweet nebl next gem,"List(rt, heiracrypto, ends, in, hrs, rt, follow, cryptocoincoach, neblioteam, be, active, on, profile, tweet, nebl, next, gem)","List(rt, heiracrypto, ends, hrs, rt, follow, cryptocoincoach, neblioteam, active, profile, tweet, nebl, next, gem)"


#### Feature Transformer: CountVectorizer (TF - Term Frequency)

In [None]:
# Apply count vectorizer
cv = CountVectorizer(vocabSize=2**16, inputCol="filtered", outputCol='cv')
cv_model = cv.fit(tweets_stopword)
tweets_cv = cv_model.transform(tweets_stopword)

display(tweets_cv.head(5))

sentiment,tweet,tokens,filtered,cv
1,rt progintl breaking tomorrow amazon workers at warehouses are going on strike in france and germany to makeamazonpay with many ot,"List(rt, progintl, breaking, tomorrow, amazon, workers, at, warehouses, are, going, on, strike, in, france, and, germany, to, makeamazonpay, with, many, ot)","List(rt, progintl, breaking, tomorrow, amazon, workers, warehouses, going, strike, france, germany, makeamazonpay, many, ot)","Map(vectorType -> sparse, length -> 37185, indices -> List(0, 20, 41, 57, 252, 271, 372, 552, 589, 790, 797, 815, 823, 847), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"
1,rt chiefelrond hours rt amp follow cryptocoincoach neblioteam be active on profile tweet nebl next gem on bla,"List(rt, chiefelrond, hours, rt, amp, follow, cryptocoincoach, neblioteam, be, active, on, profile, tweet, nebl, next, gem, on, bla)","List(rt, chiefelrond, hours, rt, amp, follow, cryptocoincoach, neblioteam, active, profile, tweet, nebl, next, gem, bla)","Map(vectorType -> sparse, length -> 37185, indices -> List(0, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 85, 88), values -> List(2.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"
1,this is the only black friday ad you need to see,"List(this, is, the, only, black, friday, ad, you, need, to, see)","List(black, friday, ad, need, see)","Map(vectorType -> sparse, length -> 37185, indices -> List(1, 2, 129, 150, 312), values -> List(1.0, 1.0, 1.0, 1.0, 1.0))"
1,rt jihanicorn idr hours rt amp follow cryptocoincoach neblioteam ____________________________ be active on,"List(rt, jihanicorn, idr, hours, rt, amp, follow, cryptocoincoach, neblioteam, ____________________________, be, active, on)","List(rt, jihanicorn, idr, hours, rt, amp, follow, cryptocoincoach, neblioteam, ____________________________, active)","Map(vectorType -> sparse, length -> 37185, indices -> List(0, 3, 5, 6, 7, 9, 10, 71, 72, 73), values -> List(2.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"
1,rt heiracrypto ends in hrs rt follow cryptocoincoach neblioteam be active on profile tweet nebl next gem,"List(rt, heiracrypto, ends, in, hrs, rt, follow, cryptocoincoach, neblioteam, be, active, on, profile, tweet, nebl, next, gem)","List(rt, heiracrypto, ends, hrs, rt, follow, cryptocoincoach, neblioteam, active, profile, tweet, nebl, next, gem)","Map(vectorType -> sparse, length -> 37185, indices -> List(0, 3, 4, 5, 6, 7, 8, 11, 12, 13, 36, 45, 48), values -> List(2.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"


#### Feature Transformer: TF-IDF Vectorization

In [None]:
# TF-IDF Vectorization
idf = IDF(inputCol='cv', outputCol="features", minDocFreq=5) 
idf_model = idf.fit(tweets_cv)
tweets_idf = idf_model.transform(tweets_cv)

display(tweets_idf.head(5)) # The dataframe is now ready for the following machine learning stage.

sentiment,tweet,tokens,filtered,cv,features
1,rt progintl breaking tomorrow amazon workers at warehouses are going on strike in france and germany to makeamazonpay with many ot,"List(rt, progintl, breaking, tomorrow, amazon, workers, at, warehouses, are, going, on, strike, in, france, and, germany, to, makeamazonpay, with, many, ot)","List(rt, progintl, breaking, tomorrow, amazon, workers, warehouses, going, strike, france, germany, makeamazonpay, many, ot)","Map(vectorType -> sparse, length -> 37185, indices -> List(0, 20, 41, 57, 252, 271, 372, 552, 589, 790, 797, 815, 823, 847), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 37185, indices -> List(0, 20, 41, 57, 252, 271, 372, 552, 589, 790, 797, 815, 823, 847), values -> List(0.3676543272010515, 3.1175924090018903, 3.5883125702634744, 3.9732819270253423, 5.352113566943643, 5.411461123514245, 5.751094168772547, 6.269334098977765, 6.3317044297042235, 6.705498193974736, 6.727317241369375, 6.749622998883674, 6.7647748039042765, 6.780159722743756))"
1,rt chiefelrond hours rt amp follow cryptocoincoach neblioteam be active on profile tweet nebl next gem on bla,"List(rt, chiefelrond, hours, rt, amp, follow, cryptocoincoach, neblioteam, be, active, on, profile, tweet, nebl, next, gem, on, bla)","List(rt, chiefelrond, hours, rt, amp, follow, cryptocoincoach, neblioteam, active, profile, tweet, nebl, next, gem, bla)","Map(vectorType -> sparse, length -> 37185, indices -> List(0, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 85, 88), values -> List(2.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 37185, indices -> List(0, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 85, 88), values -> List(0.735308654402103, 1.3188603556373257, 1.3732660863895567, 1.5056109771931794, 1.5058888597300504, 1.5203253570295348, 1.540835557564483, 1.5713939547037072, 1.5846222252600537, 1.6124985947883086, 1.6561496019170256, 1.6958706016840448, 4.103608188700917, 4.185831048958749))"
1,this is the only black friday ad you need to see,"List(this, is, the, only, black, friday, ad, you, need, to, see)","List(black, friday, ad, need, see)","Map(vectorType -> sparse, length -> 37185, indices -> List(1, 2, 129, 150, 312), values -> List(1.0, 1.0, 1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 37185, indices -> List(1, 2, 129, 150, 312), values -> List(0.4434163164512347, 0.46447935985153693, 4.599435736889472, 4.7272293066122515, 5.578515208177411))"
1,rt jihanicorn idr hours rt amp follow cryptocoincoach neblioteam ____________________________ be active on,"List(rt, jihanicorn, idr, hours, rt, amp, follow, cryptocoincoach, neblioteam, ____________________________, be, active, on)","List(rt, jihanicorn, idr, hours, rt, amp, follow, cryptocoincoach, neblioteam, ____________________________, active)","Map(vectorType -> sparse, length -> 37185, indices -> List(0, 3, 5, 6, 7, 9, 10, 71, 72, 73), values -> List(2.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 37185, indices -> List(0, 3, 5, 6, 7, 9, 10, 71, 72, 73), values -> List(0.735308654402103, 1.3188603556373257, 1.5056109771931794, 1.5058888597300504, 1.5203253570295348, 1.5713939547037072, 1.5846222252600537, 3.987426434411507, 3.9979277242321705, 3.9984076858444557))"
1,rt heiracrypto ends in hrs rt follow cryptocoincoach neblioteam be active on profile tweet nebl next gem,"List(rt, heiracrypto, ends, in, hrs, rt, follow, cryptocoincoach, neblioteam, be, active, on, profile, tweet, nebl, next, gem)","List(rt, heiracrypto, ends, hrs, rt, follow, cryptocoincoach, neblioteam, active, profile, tweet, nebl, next, gem)","Map(vectorType -> sparse, length -> 37185, indices -> List(0, 3, 4, 5, 6, 7, 8, 11, 12, 13, 36, 45, 48), values -> List(2.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 37185, indices -> List(0, 3, 4, 5, 6, 7, 8, 11, 12, 13, 36, 45, 48), values -> List(0.735308654402103, 1.3188603556373257, 1.3732660863895567, 1.5056109771931794, 1.5058888597300504, 1.5203253570295348, 1.540835557564483, 1.6124985947883086, 1.6561496019170256, 1.6958706016840448, 3.449340446201889, 3.61609213437055, 3.6466897990038367))"


### Model Training: Logistic Regression

In [None]:
# rename column 'sentiment' to 'label'
tweets_idf = tweets_idf.withColumnRenamed("sentiment", "label")

In [None]:
# split the data into traning and test sets
train_data, test_data = tweets_idf.randomSplit([0.7, 0.3], seed=1234)

lr = LogisticRegression(maxIter=100)

lr_model = lr.fit(train_data)

predictions = lr_model.transform(test_data)

display(predictions.head(5))

label,tweet,tokens,filtered,cv,features,rawPrediction,probability,prediction
0,a distraction from shopping in the black friday sales an epic thread combines gynaecological anatomy with vague,"List(a, distraction, from, shopping, in, the, black, friday, sales, an, epic, thread, combines, gynaecological, anatomy, with, vague)","List(distraction, shopping, black, friday, sales, epic, thread, combines, gynaecological, anatomy, vague)","Map(vectorType -> sparse, length -> 37185, indices -> List(1, 2, 32, 64, 97, 246, 15471, 16074, 29796, 30875, 31219), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 37185, indices -> List(1, 2, 32, 64, 97, 246, 15471, 16074, 29796, 30875, 31219), values -> List(0.4434163164512347, 0.46447935985153693, 3.4413326718080586, 3.8492760959306898, 4.286249796705796, 5.313822653950329, 0.0, 0.0, 0.0, 0.0, 0.0))","Map(vectorType -> dense, length -> 2, values -> List(-5.241514243446955, 5.241514243446955))","Map(vectorType -> dense, length -> 2, values -> List(0.005264376700873776, 0.9947356232991262))",1.0
0,a dizzy nendroid man,"List(a, dizzy, nendroid, man)","List(dizzy, nendroid, man)","Map(vectorType -> sparse, length -> 37185, indices -> List(683, 13757, 36833), values -> List(1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 37185, indices -> List(683, 13757, 36833), values -> List(6.527984338748885, 0.0, 0.0))","Map(vectorType -> dense, length -> 2, values -> List(-2.7838985014207074, 2.7838985014207074))","Map(vectorType -> dense, length -> 2, values -> List(0.058200497792373386, 0.9417995022076266))",1.0
0,adorepixsxoxo give me my black friday shopping allowance im going crazy this year findom findomme fins,"List(adorepixsxoxo, give, me, my, black, friday, shopping, allowance, im, going, crazy, this, year, findom, findomme, fins)","List(adorepixsxoxo, give, black, friday, shopping, allowance, im, going, crazy, year, findom, findomme, fins)","Map(vectorType -> sparse, length -> 37185, indices -> List(1, 2, 41, 64, 78, 108, 114, 398, 548, 3915, 5535, 6957, 29373), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 37185, indices -> List(1, 2, 41, 64, 78, 108, 114, 398, 548, 3915, 5535, 6957, 29373), values -> List(0.4434163164512347, 0.46447935985153693, 3.5883125702634744, 3.8492760959306898, 4.052661621082812, 4.381559976510121, 4.4924128559159735, 5.884229913518515, 6.283385852433415, 8.749600369209263, 9.242076854307058, 9.560530585425592, 0.0))","Map(vectorType -> dense, length -> 2, values -> List(2.0453207302256597, -2.0453207302256597))","Map(vectorType -> dense, length -> 2, values -> List(0.88547394998761, 0.11452605001239002))",0.0
0,amazon workers across the world plan black friday strike to demand the tech giant pays fairly and ceases awful un,"List(amazon, workers, across, the, world, plan, black, friday, strike, to, demand, the, tech, giant, pays, fairly, and, ceases, awful, un)","List(amazon, workers, across, world, plan, black, friday, strike, demand, tech, giant, pays, fairly, ceases, awful, un)","Map(vectorType -> sparse, length -> 37185, indices -> List(1, 2, 57, 271, 369, 372, 389, 419, 809, 2013, 2115, 2202, 2995, 5508, 6377, 6576), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 37185, indices -> List(1, 2, 57, 271, 369, 372, 389, 419, 809, 2013, 2115, 2202, 2995, 5508, 6377, 6576), values -> List(0.4434163164512347, 0.46447935985153693, 3.9732819270253423, 5.411461123514245, 5.753868095655272, 5.751094168772547, 5.881070353228147, 5.963218324837146, 6.742132327154517, 7.90230250882206, 7.951092672991492, 8.029054214461203, 8.381875589083945, 9.242076854307058, 9.442747549769209, 9.442747549769209))","Map(vectorType -> dense, length -> 2, values -> List(47.24168322895689, -47.24168322895689))","Map(vectorType -> dense, length -> 2, values -> List(1.0, 0.0))",0.0
0,another bullshit from wbd firedavidzaslav,"List(another, bullshit, from, wbd, firedavidzaslav)","List(another, bullshit, wbd, firedavidzaslav)","Map(vectorType -> sparse, length -> 37185, indices -> List(365, 3116, 21514, 26094), values -> List(1.0, 1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 37185, indices -> List(365, 3116, 21514, 26094), values -> List(5.756649738617149, 8.461918296757482, 0.0, 0.0))","Map(vectorType -> dense, length -> 2, values -> List(27.301697931623202, -27.301697931623202))","Map(vectorType -> dense, length -> 2, values -> List(0.99999999999861, 1.389999226830696E-12))",0.0


#### Model Evaluation

In [None]:
# Evaluate the model using binary classification evaluation 
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction") 
roc_auc = evaluator.evaluate(predictions)
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(predictions.count())

print("Accuracy Score: {0:.4f}".format(accuracy))
print("ROC-AUC: {0:.4f}".format(roc_auc))

Accuracy Score: 0.9588
ROC-AUC: 0.9290


#### Save the data and the predictions into my bucket

In [None]:
# mount my s3 bucket
mount_s3_bucket(ACCESS_KEY, SECRET_ACCESS_KEY, 'ptb2-effy', 'my_bucket')

Mounting ptb2-effy
/mnt/my_bucket has been unmounted.
The bucket ptb2-effy was mounted to my_bucket 



In [None]:
# save the data
df.write.option('header','false').csv('/mnt/my_bucket/demo/data.csv') # remove header for athena

In [None]:
# save the predictions as a Parquet file
predictions.write.parquet('/mnt/my_bucket/demo/predictions.parquet')

## Conclusion

This project is focused on analyzing Twitter data during Black Friday using machine learning techniques, specifically sentiment analysis. The aim is to gain insights into consumer behavior and preferences during this significant shopping event. 

The project follows a step-by-step approach, starting with mounting the data on tweets from the WeCloudData public dataset bucket, creating a Spark session and Spark DataFrame, creating a sentiment column, cleaning the text, performing feature transformation, and model training and evaluation. Finally, the data and predictions are saved to the user's bucket. 

This project demonstrates the power of machine learning in analyzing unstructured data from social media platforms like Twitter to provide valuable insights into consumer behavior, which can help businesses and policymakers make informed decisions.