In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, LongType, FloatType, StringType, TimestampType
import pyspark.sql.functions as F 
from collections import Counter
import nltk
import operator
nltk.download("stopwords")
from nltk.corpus import stopwords
from nltk.tokenize import RegexpTokenizer

In [0]:
!pip install nltk

In [0]:

nltk.download('vader_lexicon')
from nltk.sentiment.vader import SentimentIntensityAnalyzer

In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Twitter Sentiment Analysis').getOrCreate()
sc = spark.sparkContext

#### Mount S3 Bucket

In [0]:
def mount_s3_bucket(access_key, secret_key, bucket_name, mount_folder):
  encoded_secret_key = secret_key.replace('/', "%2F")
  
  try:
    #unmount the data if it was already mounted
    dbutils.fs.unmount("/mnt/%s"%mount_folder)
  except:
    # If it fails to unmount it most likely wasn't mounted in the first place
    print('Directory not unmounted: ', mount_folder)
  finally:
    #mount the bucket
    dbutils.fs.mount("s3a://%s:%s@%s" % (access_key, encoded_secret_key, bucket_name), "/mnt/%s" % mount_folder)
    print(f'The bucket {bucket_name} was mounted to {mount_folder}')
    


mount_s3_bucket(ACCESS_KEY, SECRET_ACCESS_KEY, "wcd-bigdata-jojo/twitter", "twitter_project")

In [0]:
twitter_schema = StructType([
  StructField('id', StringType(), False),
  StructField('username', StringType(), True),
  StructField('screen_name', StringType(), True),
  StructField('tweet', StringType(), True),
  StructField('followers_cnt', StringType(), True),
  StructField('location', StringType(), True),
  StructField('geo', StringType(), True),
  StructField('created_at', StringType(), True)
])

twitter_file = '/mnt/twitter_project/2021/*/*/*'

twitter = (spark.read
           .option('header', 'false')
           .option('encoding', 'utf-8')
           .option('charset', 'utf-16')
           .option("delimiter", "\t")
           .schema(twitter_schema)
           .csv(twitter_file))

twitter.show(10)

In [0]:
# twitter_file = '/mnt/twitter_project/2021/08/*/*'
# rdd = sc.textFile(twitter_file)\
#         .map(lambda line: line.split('\t'))\
#         .map(lambda line: (line[0], line[1], line[2], line[3], line[4], line[5], line[6], line[7]))
# df = (rdd.toDF()
#           .withColumnRenamed('_1', "id")
#           .withColumnRenamed('_2', "username")
#           .withColumnRenamed('_3', "screen_name")
#           .withColumnRenamed('_4', "tweet")
#           .withColumnRenamed('_5', "followers_cnt")
#           .withColumnRenamed('_6', "location")
#           .withColumnRenamed('_7', "geo")
#           .withColumnRenamed('_8', "created_at")
#      )  


In [0]:
# df.take(20)

In [0]:
sentiment = SentimentIntensityAnalyzer()
def sentiment_label(text):
  sent = sentiment.polarity_scores(text)
  if sent['compound'] > 0:
    return 1
  elif sent['compound'] <0:
    return -1
  else:
    return 0

label = F.udf(sentiment_label)

In [0]:
twitter = twitter.withColumn('vader_label', label(F.col('tweet')))

### Process Time

In [0]:
from datetime import datetime
from datetime import timedelta
convert_time = F.udf(lambda x: datetime.strptime(x, '%a %b %d %H:%M:%S %z %Y') - timedelta(hours=5), TimestampType())

#filter out the rows where the created_at is null otherwise it would throw an error
twitter = twitter.filter("created_at IS NOT NULL AND tweet IS NOT NULL AND created_at != 'None'") 
twitter = twitter.withColumn('etc_time', convert_time(F.col('created_at')))
twitter = twitter.withColumn('hour', F.hour('etc_time')).withColumn('dayofweek', F.dayofweek('etc_time'))

In [0]:
twitter.filter("created_at = 'None'").count()

### Process Political Party

In [0]:
def define_party(tweet):
  liberal = ['trudeau', "justin", "justintrudeau", "liberal", "liberals","red", 
              'trudeaumustgo', "teamtrudeau",'liberalmajority','kinsellagate','voteliberal','freeland', 'nevervoteliberal']
  conservative = ["conservative", 'conservatives', "blue",'securethefuture','davenportto',"erin o'toole", "o'toole", 'toole', 'voteconservative','cpc', 'nevervoteconservative']
  ndp = ["ndp","orange","quebec", 'jagmeetsingh', 'jagmeet', 'singh', 'thejagmeetsingh', 'uprisingh', 'initforyou', 'votendp', 'nevervotendp']
  
  lst = []
  
  # check liberal words
  for word in liberal:
    if word in tweet:
      lst.append('Liberal')
  
  # check conservative words
  for word in conservative:
    if word in tweet:
      lst.append('Conservative')
      
  # check ndp words
  for word in ndp:
    if word in tweet:
      lst.append('NDP')
      
  if len(lst)==0:
    return 'No Party'
  elif len(lst)==1:
    return lst[0]
  elif len(lst)==2:
    return lst[1]
  else:
    return sorted(Counter(lst).items(), key=operator.itemgetter(1))[-1][0]
  
party_label = F.udf(define_party, StringType())

twitter = twitter.withColumn('tweet', F.lower('tweet')).withColumn('Party', party_label('tweet'))

In [0]:
display(twitter)

id,username,screen_name,tweet,followers_cnt,location,geo,created_at,vader_label,etc_time,hour,dayofweek,Party,clean_tweet,location_clean
1430659884073246734,Nick Gottlieb,ngottliebphoto,rt @avilewis: this is obscene. https://t.co/c5lj6bu87l,482,Sḵwx̱wú7mesh Úxwumixw land,,Wed Aug 25 22:34:31 +0000 2021,-1,2021-08-25T18:34:31.000+0000,18,4,No Party,obscene,Other
1430659886459891715,patti doyle-bedwell,Pattidbedwell,i agree with you 100%. erin o’toole wants private health care and refuses to answer a direct question about it.,2419,"Halifax, Nova Scotia",,Wed Aug 25 22:34:31 +0000 2021,1,2021-08-25T18:34:31.000+0000,18,4,Conservative,agree erin toole wants private health care refuses answer direct question,"NS, Canada"
1430659888267550721,Jenny,jenny_hol,rt @thejagmeetsingh: justin trudeau promised not to call a snap pandemic election. calling a selfish election where on-campus voting is c…,100,,,Wed Aug 25 22:34:32 +0000 2021,-1,2021-08-25T18:34:32.000+0000,18,4,NDP,justin trudeau promised call snap pandemic calling selfish campus voting c,
1430659888401768456,Ryan Ternapolski,RT1925,"rt @gtlem: ndp mp don davies calls his female liberal opponent candidacy (a filipino canadian) @virginiabremner ""exploitative"" & ""element…",129,"Pembroke, Ontario",,Wed Aug 25 22:34:32 +0000 2021,0,2021-08-25T18:34:32.000+0000,18,4,NDP,ndp mp davies calls female liberal opponent candidacy filipino canadian exploitative amp element,"ON, Canada"
1430659890331099137,Jesse Chilton,jchilton666,"rt @emmmacfarlane: what does the ""minority government situation"" have to do with whether you can offer on campus voting? https://t.co/pxaq…",247,"British Columbia, Canada",,Wed Aug 25 22:34:32 +0000 2021,0,2021-08-25T18:34:32.000+0000,18,4,No Party,minority government situation whether offer campus voting,"BC, Canada"
1430659898937954310,lornai1952,lornai1952,rt @lpcpressbox: fact check: this is a frequent & categorically false claim made by erin o'toole & the conservatives. see what @honahmedhu…,693,,,Wed Aug 25 22:34:34 +0000 2021,0,2021-08-25T18:34:34.000+0000,18,4,Conservative,fact check frequent amp categorically false claim made erin toole amp conservatives see,
1430659899898449923,The Teacher Down the Hall,toptopp,rt @lexharvs: important clarification on some of the messaging from the @ndp this week: trudeau didn't cut health care transfers to provinc…,206,,,Wed Aug 25 22:34:35 +0000 2021,-1,2021-08-25T18:34:35.000+0000,18,4,NDP,important clarification messaging week trudeau cut health care transfers provinc,
1430659901693571073,Dr Bertha Mispireta Garcia,berthagarcia5,"rt @docs4ltcjustice: fact: revera, a for-profit ltc company, is owned by the federal government. fact: revera had 866 #covid19 deaths, the…",1528,"London, Ontario",,Wed Aug 25 22:34:35 +0000 2021,0,2021-08-25T18:34:35.000+0000,18,4,No Party,fact revera profit ltc company owned federal government fact revera covid deaths,"ON, Canada"
1430659903178297344,˗ˏˋ🧞‍♀️♡🅂🄴🄽🅂🄸☆🧜🏽‍♀️ˎˊ˗,SENSIMILLIEA,rt @mini_bubbly: 116 conservative mp's voted against undrip - united nations declaration on the rights of indigenous peoples - bill c-15 -…,4909,God's Green 🌎🌍🌏,,Wed Aug 25 22:34:35 +0000 2021,1,2021-08-25T18:34:35.000+0000,18,4,Conservative,conservative mp voted undrip united nations declaration rights indigenous peoples bill c,Other
1430659908903522305,please Don't vote Liberal vote 4 Canada,Canad_IANism,"rt @anthonyfurey: erin o’toole joins me for the latest episode of my postmedia podcast full comment — discussing afghanistan, the economy a…",1087,Canada,,Wed Aug 25 22:34:37 +0000 2021,0,2021-08-25T18:34:37.000+0000,18,4,Conservative,erin toole joins latest episode postmedia podcast full comment discussing afghanistan economy,Canada


In [0]:
twitter.filter("location IS NOT NULL AND location != 'None'").count()

### Process Text

In [0]:
import re
stop_words = stopwords.words('english') + ['retweeted', 'b', 'elxn', 'cdnpoli', 'canada', 'election']
def clean_tweet(text):
  text = text.lower()
  text = re.sub(r'<[^>]+>', '', text) #remove all the html tags
  text = re.sub(r'http\S+', '', text) # remove links
  text = re.sub(r'(rt)? @[^\s]+:?','', text)# remove at
  text = re.sub(r'[^a-zA-Z]', ' ', text) 
  tokenizer = RegexpTokenizer(r'\w+') 
  tokens = tokenizer.tokenize(text)
  
  text = ' '.join(list(filter(lambda x: x.strip() not in stop_words, tokens)))
  return text

clean_text = F.udf(clean_tweet)
twitter = twitter.withColumn('clean_tweet', clean_text(F.col('tweet')))

### Process Location

In [0]:
def define_loc(location):
  location = location.lower()
  loc_dict = {'ab': ['alberta', 'ab'],
              'bc': ['british columbia', 'bc', 'vancouver', 'victoria'],
              'mb': ['manitoba', 'mb'],
              'nb': ['new brunswick', 'nb'],
              'nl': ['newfoundland and labrador', 'nl'],
              'nt': ['northwest territories', 'nt'],
              'ns': ['nova scotia', 'ns'],
              'nu': ['nunavut', 'nu'],
              'on': ['ontario', 'on', 'ottawa', 'toronto', 'hamilton'],
              'pe': ['prince edward island', 'pe'],
              'qc': ['quebec', 'québec', 'montréal', 'qc', 'montreal'],
              'sk': ['saskatchewan', 'sk'], 
              'yt': ['yukon', 'yt']}

  if str(location) == 'none':
    return 'None'
  
  else:
    #check if in province
    for key, values in loc_dict.items():
      for v in values:
        if re.findall(fr'\b{v}\b', location):
          return f'{key.upper()}, Canada'
        else: continue
    #check if in canada
    if 'canada' in location:
      return 'Canada'
    elif location != 'none':
      return 'Other'

loc_func = F.udf(define_loc, StringType())
twitter = twitter.withColumn('location_clean', loc_func(F.col('location')))

In [0]:
 display(twitter)

id,username,screen_name,tweet,followers_cnt,location,geo,created_at,vader_label,etc_time,hour,dayofweek,Party,clean_tweet,location_clean
1430659884073246734,Nick Gottlieb,ngottliebphoto,rt @avilewis: this is obscene. https://t.co/c5lj6bu87l,482,Sḵwx̱wú7mesh Úxwumixw land,,Wed Aug 25 22:34:31 +0000 2021,-1,2021-08-25T18:34:31.000+0000,18,4,No Party,obscene,Other
1430659886459891715,patti doyle-bedwell,Pattidbedwell,i agree with you 100%. erin o’toole wants private health care and refuses to answer a direct question about it.,2419,"Halifax, Nova Scotia",,Wed Aug 25 22:34:31 +0000 2021,1,2021-08-25T18:34:31.000+0000,18,4,Conservative,agree erin toole wants private health care refuses answer direct question,"NS, Canada"
1430659888267550721,Jenny,jenny_hol,rt @thejagmeetsingh: justin trudeau promised not to call a snap pandemic election. calling a selfish election where on-campus voting is c…,100,,,Wed Aug 25 22:34:32 +0000 2021,-1,2021-08-25T18:34:32.000+0000,18,4,NDP,justin trudeau promised call snap pandemic calling selfish campus voting c,
1430659888401768456,Ryan Ternapolski,RT1925,"rt @gtlem: ndp mp don davies calls his female liberal opponent candidacy (a filipino canadian) @virginiabremner ""exploitative"" & ""element…",129,"Pembroke, Ontario",,Wed Aug 25 22:34:32 +0000 2021,0,2021-08-25T18:34:32.000+0000,18,4,NDP,ndp mp davies calls female liberal opponent candidacy filipino canadian exploitative amp element,"ON, Canada"
1430659890331099137,Jesse Chilton,jchilton666,"rt @emmmacfarlane: what does the ""minority government situation"" have to do with whether you can offer on campus voting? https://t.co/pxaq…",247,"British Columbia, Canada",,Wed Aug 25 22:34:32 +0000 2021,0,2021-08-25T18:34:32.000+0000,18,4,No Party,minority government situation whether offer campus voting,"BC, Canada"
1430659898937954310,lornai1952,lornai1952,rt @lpcpressbox: fact check: this is a frequent & categorically false claim made by erin o'toole & the conservatives. see what @honahmedhu…,693,,,Wed Aug 25 22:34:34 +0000 2021,0,2021-08-25T18:34:34.000+0000,18,4,Conservative,fact check frequent amp categorically false claim made erin toole amp conservatives see,
1430659899898449923,The Teacher Down the Hall,toptopp,rt @lexharvs: important clarification on some of the messaging from the @ndp this week: trudeau didn't cut health care transfers to provinc…,206,,,Wed Aug 25 22:34:35 +0000 2021,-1,2021-08-25T18:34:35.000+0000,18,4,NDP,important clarification messaging week trudeau cut health care transfers provinc,
1430659901693571073,Dr Bertha Mispireta Garcia,berthagarcia5,"rt @docs4ltcjustice: fact: revera, a for-profit ltc company, is owned by the federal government. fact: revera had 866 #covid19 deaths, the…",1528,"London, Ontario",,Wed Aug 25 22:34:35 +0000 2021,0,2021-08-25T18:34:35.000+0000,18,4,No Party,fact revera profit ltc company owned federal government fact revera covid deaths,"ON, Canada"
1430659903178297344,˗ˏˋ🧞‍♀️♡🅂🄴🄽🅂🄸☆🧜🏽‍♀️ˎˊ˗,SENSIMILLIEA,rt @mini_bubbly: 116 conservative mp's voted against undrip - united nations declaration on the rights of indigenous peoples - bill c-15 -…,4909,God's Green 🌎🌍🌏,,Wed Aug 25 22:34:35 +0000 2021,1,2021-08-25T18:34:35.000+0000,18,4,Conservative,conservative mp voted undrip united nations declaration rights indigenous peoples bill c,Other
1430659908903522305,please Don't vote Liberal vote 4 Canada,Canad_IANism,"rt @anthonyfurey: erin o’toole joins me for the latest episode of my postmedia podcast full comment — discussing afghanistan, the economy a…",1087,Canada,,Wed Aug 25 22:34:37 +0000 2021,0,2021-08-25T18:34:37.000+0000,18,4,Conservative,erin toole joins latest episode postmedia podcast full comment discussing afghanistan economy,Canada


In [0]:
twitter_df = twitter.select(F.col('id'), \
               F.col('vader_label'), \
               F.col('hour'), \
               F.col('dayofweek'), \
               F.col('followers_cnt'),\
               F.col('Party') , \
               F.col('clean_tweet'), \
               F.col('location_clean'))

In [0]:
display(twitter_df)

id,vader_label,hour,dayofweek,followers_cnt,Party,clean_tweet,location_clean
1430659884073246734,-1,18,4,482,No Party,obscene,Other
1430659886459891715,1,18,4,2419,Conservative,agree erin toole wants private health care refuses answer direct question,"NS, Canada"
1430659888267550721,-1,18,4,100,NDP,justin trudeau promised call snap pandemic calling selfish campus voting c,
1430659888401768456,0,18,4,129,NDP,ndp mp davies calls female liberal opponent candidacy filipino canadian exploitative amp element,"ON, Canada"
1430659890331099137,0,18,4,247,No Party,minority government situation whether offer campus voting,"BC, Canada"
1430659898937954310,0,18,4,693,Conservative,fact check frequent amp categorically false claim made erin toole amp conservatives see,
1430659899898449923,-1,18,4,206,NDP,important clarification messaging week trudeau cut health care transfers provinc,
1430659901693571073,0,18,4,1528,No Party,fact revera profit ltc company owned federal government fact revera covid deaths,"ON, Canada"
1430659903178297344,1,18,4,4909,Conservative,conservative mp voted undrip united nations declaration rights indigenous peoples bill c,Other
1430659908903522305,0,18,4,1087,Conservative,erin toole joins latest episode postmedia podcast full comment discussing afghanistan economy,Canada


In [0]:
mount_s3_bucket(ACCESS_KEY, SECRET_ACCESS_KEY, 'wcd-athena/twitter_df', 'twitter_df')

twitter_df.write \
           .option('header', 'true')  \
           .option('delimiter', '\t')  \
           .mode('overwrite') \
           .csv('/mnt/twitter_df/twitter_df')

In [0]:
twitter_df.groupby('Party').agg({'vader_label':'mean'}).alias('avg sentiment').show()

### Building ML model with twitter dataset

In [0]:
mount_s3_bucket(ACCESS_KEY, SECRET_ACCESS_KEY, "weclouddata/datasets/social/twitter/sentiment_analysis", "twitter_sentiment_dataset")

In [0]:
tweets = spark.read.option('header', True).csv('/mnt/twitter_sentiment_dataset')
tweets.cache()

In [0]:
tweets = tweets.select(F.col('sentiment'), F.col('SentimentText').alias('tweet'))
display(tweets)

sentiment,tweet
0,is so sad for my APL friend.............
0,I missed the New Moon trailer...
1,omg its already 7:30 :O
0,.. Omgaga. Im sooo im gunna CRy. I've been at this dentist since 11.. I was suposed 2 just get a crown put on (30mins)...
0,i think mi bf is cheating on me!!! T_T
0,or i just worry too much?
1,Juuuuuuuuuuuuuuuuussssst Chillin!!
0,Sunny Again Work Tomorrow :-| TV Tonight
1,handed in my uniform today . i miss you already
1,hmmmm.... i wonder how she my number @-)


In [0]:
# check predict with vader first to see how accurate
sentiment = SentimentIntensityAnalyzer()
def sentiment_vader(text):
  sent = sentiment.polarity_scores(text)
  if sent['compound'] >= 0:
    return 1
  else:
    return 0

vader = F.udf(sentiment_vader)
tweets = tweets.withColumn('vader_label', vader(F.col('tweet')))

In [0]:
tweets.filter(F.col('sentiment')==F.col('vader_label')).count()

In [0]:
clean_tweet = tweets.select(F.col('sentiment').alias('label'), F.col('tweet')).withColumn('tweet', clean_text(F.col('tweet')))
display(clean_tweet)

label,tweet
0,sad apl friend
0,missed new moon trailer
1,omg already
0,omgaga im sooo im gunna cry dentist since suposed get crown put mins
0,think mi bf cheating
0,worry much
1,juuuuuuuuuuuuuuuuussssst chillin
0,sunny work tomorrow tv tonight
1,handed uniform today miss already
1,hmmmm wonder number


#### Building Pipeline for word embedding

In [0]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, Word2Vec, HashingTF, IDF, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline

from pyspark.ml.feature import NGram, VectorAssembler, StopWordsRemover, HashingTF, IDF, Tokenizer, StringIndexer, NGram, ChiSqSelector, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [0]:
clean_tweet.withColumn('label', F.col('label').cast(IntegerType()))

In [0]:
clean_tweet = clean_tweet.filter(F.col('tweet')!='').withColumn('label', F.col('label').cast(IntegerType()))
train, test = clean_tweet.randomSplit([0.9, 0.1], seed=42)
display(train)

label,tweet
0,aaa amerah aw sucks
0,aaa amerah bk algonac mcdonald kfc taco bell wendy shut sad
0,aaa amerah haha suggested foofie nope
0,aaa amerah seeing red x leis
0,aaaaaand brad lidge blew save
0,aaaairy
0,aaaale happened hunny sad
0,aaaarae oh sucks
0,aaaashleyyyy awwww
0,aaadiscounts know aaa iphone app covers also accessed outside us


In [0]:
train.printSchema()

In [0]:
clean_tweet = clean_tweet.filter(F.col('tweet')!='').withColumn('label', F.col('label').cast(IntegerType()))
train, test = clean_tweet.randomSplit([0.9, 0.1], seed=42)

# Create transformers for the ML pipeline
tokenizer = Tokenizer(inputCol="tweet", outputCol="tokens")
stopword_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered")
cv = CountVectorizer(vocabSize=2**16, inputCol="filtered", outputCol='cv')
idf = IDF(inputCol='cv', outputCol="1gram_idf", minDocFreq=5) #minDocFreq: remove sparse terms
ngram = NGram(n=2, inputCol="filtered", outputCol="2gram")
ngram_hashingtf = HashingTF(inputCol="2gram", outputCol="2gram_tf", numFeatures=20000)
ngram_idf = IDF(inputCol='2gram_tf', outputCol="2gram_idf", minDocFreq=5) 

# Assemble all text features
assembler = VectorAssembler(inputCols=["1gram_idf", "2gram_tf"], outputCol="rawFeatures")

# Chi-square variable selection
selector = ChiSqSelector(numTopFeatures=2**14,featuresCol='rawFeatures', outputCol="features")

# Regression model estimator
lr = LogisticRegression(maxIter=100)

# Build the pipeline
pipeline = Pipeline(stages=[tokenizer, stopword_remover, cv, idf, ngram, ngram_hashingtf, ngram_idf, assembler, selector, lr])

# Pipeline model fitting
pipeline_model = pipeline.fit(train)
predictions = pipeline_model.transform(test)

evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(test.count())
roc_auc = evaluator.evaluate(predictions)

print("Accuracy Score: {0:.4f}".format(accuracy))
print("ROC-AUC: {0:.4f}".format(roc_auc))

In [0]:
twi = twitter.select(F.col('id'), F.col('clean_tweet').alias('tweet'), F.col('location_clean'), F.col('vader_label'), F.col('Party'))

In [0]:
pipeline_model.transform(twi)

In [0]:
twitter_prediction = pipeline_model.transform(twi)

In [0]:
twitter_prediction = twitter_prediction.withColumn('sentiment', F.when(F.col('prediction')==0, -1).otherwise(F.col('prediction')))\
                  .select('id', 'location_clean', 'sentiment', 'vader_label', 'Party')

In [0]:
twitter_prediction.groupby('Party').agg({'sentiment':'mean'}).show()

In [0]:
mount_s3_bucket(ACCESS_KEY, SECRET_ACCESS_KEY, 'wcd-athena/twitter_sent', 'twitter_sent')

twitter_prediction.write \
           .option('header', 'true')  \
           .option('delimiter', '\t')  \
           .mode('overwrite') \
           .csv('/mnt/twitter_sent/twitter_sent')

#### LDA

In [0]:
from pyspark.ml.linalg import Vectors, SparseVector
from pyspark.ml.clustering import LDA

In [0]:
df = twitter.select('id', 'tweet')
df.count()

In [0]:
df_clean = df.withColumn('tweet', F.regexp_replace('tweet', r"http\S+", "")) \
                   .withColumn('tweet', F.regexp_replace('tweet', r"[^a-zA-z]", " ")) \
                   .withColumn('tweet', F.regexp_replace('tweet', r"\s+", " ")) \
                   .withColumn('tweet', F.lower('tweet')) \
                   .withColumn('tweet', F.trim('tweet'))

tokenizer = Tokenizer(inputCol="tweet", outputCol="tokens")
df_tokenized = tokenizer.transform(df_clean)

In [0]:
stopwordList = ['', 'elxn', 'amp', 'retweeted', 'b', 'cdnpoli', 'canada', 'election', 'rt', 'd', 'e']
stopwordList.extend(StopWordsRemover().getStopWords())
remover = StopWordsRemover(inputCol='tokens', outputCol = 'clean_tokens', stopWords=stopwordList)
df_stopword = remover.transform(df_tokenized)
df_stopword.show(5)

In [0]:
vec = CountVectorizer(inputCol="clean_tokens", outputCol="vectors")
model = vec.fit(df_stopword)
result = model.transform(df_stopword)

import numpy as np
vocabArray = np.array(model.vocabulary)
vocabArray

In [0]:
corpus = result.select("id", "vectors")

In [0]:
lda = LDA(k=5, seed=42, optimizer="em", featuresCol='vectors')
lda_model = lda.fit(corpus)

In [0]:
topics = lda_model.describeTopics(maxTermsPerTopic = 100)
topics_pd = topics.toPandas()


In [0]:
common = set(topics_pd['termIndices'][0]).intersection(set(topics_pd['termIndices'][1]))\
                                         .intersection(set(topics_pd['termIndices'][2]))\
                                         .intersection(set(topics_pd['termIndices'][3]))\
                                         .intersection(set(topics_pd['termIndices'][4]))
len(common)

In [0]:
lda_0 = list(set(topics_pd['termIndices'][0]) - common)
lda_1 = list(set(topics_pd['termIndices'][1]) - common)
lda_2 = list(set(topics_pd['termIndices'][2]) - common)
lda_3 = list(set(topics_pd['termIndices'][3]) - common)
lda_4 = list(set(topics_pd['termIndices'][4]) - common)
lda_0, lda_1, lda_2, lda_3,lda_4

In [0]:
vocabArray[lda_0], vocabArray[lda_1],vocabArray[lda_2], vocabArray[lda_3],vocabArray[lda_4]