In [None]:
#Installing Pyspark
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285388 sha256=795fc63bec75c68100e924c4fee0405419e8fc332dae758f244b5bcf2b57c451
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [None]:
#begin session
from pyspark.sql import SparkSession, SQLContext
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [None]:
sc=spark.sparkContext
sqlContext = SQLContext(sc)



In [None]:
spark

In [None]:
spark = SparkSession.builder.appName('Topic Modelling2').getOrCreate()

In [None]:
#import dataset
data = spark.read.csv('tokyo_2020_tweets.csv', header = True, inferSchema = True)
data.show()

+--------------------+--------------------+-------------------+--------------------+-------------------+--------------+------------+---------------+-------------+-------------------+--------------------+--------------------+-------------------+--------+---------+----------+
|                  id|           user_name|      user_location|    user_description|       user_created|user_followers|user_friends|user_favourites|user_verified|               date|                text|            hashtags|             source|retweets|favorites|is_retweet|
+--------------------+--------------------+-------------------+--------------------+-------------------+--------------+------------+---------------+-------------+-------------------+--------------------+--------------------+-------------------+--------+---------+----------+
| 1418888645105356803|  Abhishek Srivastav|       Udupi, India|Trying to be medi...|2021-02-01 06:33:51|            45|          39|            293|        False|2021-07-24 10

In [None]:
#View columns
data.columns

['id',
 'user_name',
 'user_location',
 'user_description',
 'user_created',
 'user_followers',
 'user_friends',
 'user_favourites',
 'user_verified',
 'date',
 'text',
 'hashtags',
 'source',
 'retweets',
 'favorites',
 'is_retweet']

In [None]:
#Create a corpus from the text column
dt = data.select('text')
dt.show()

+--------------------+
|                text|
+--------------------+
| Let the party begin|
|                null|
|Congratulations #...|
|   Big Breaking Now |
|                null|
|                null|
|     Q4: 🇬🇧3-1🇿🇦|
|                null|
|All I can think o...|
|#Tokyo2020 #Olympics|
|                null|
|                null|
|                null|
|                null|
|Can't help but ch...|
|                null|
|                null|
|@inquirerdotnet @...|
|    Q3 🇨🇦 1-4 🇩🇪|
|                null|
+--------------------+
only showing top 20 rows



In [None]:
#Check total null values
dt.toPandas()['text'].isnull().sum()

199272

In [None]:
#drop null values
dt = dt.dropna()
dt.show()

+--------------------+
|                text|
+--------------------+
| Let the party begin|
|Congratulations #...|
|   Big Breaking Now |
|     Q4: 🇬🇧3-1🇿🇦|
|All I can think o...|
|#Tokyo2020 #Olympics|
|Can't help but ch...|
|@inquirerdotnet @...|
|    Q3 🇨🇦 1-4 🇩🇪|
|Hearty Congratula...|
|                 0.0|
|Gymnastics ❤️ #To...|
|Morning everyone!...|
| #Tokyo2020 #Tennis |
|Up next for Carlo...|
|Congrates @miraba...|
|The wait for a we...|
|#Tokyo2020   #Mir...|
|#Tokyo2020 #Olymp...|
|Well done to #Tea...|
+--------------------+
only showing top 20 rows



In [None]:
#check again
dt.toPandas()['text'].isnull().sum()

0

In [None]:
#cleaning the corpus
from pyspark.sql.functions import udf, col, lower, regexp_replace
# Clean text
dt_clean = dt.select((lower(regexp_replace('text', "[^a-zA-Z\\s]", "")).alias('cleaned_text')))
dt_clean.show()

+--------------------+
|        cleaned_text|
+--------------------+
| let the party begin|
|congratulations t...|
|   big breaking now |
|                  q |
|all i can think o...|
|      tokyo olympics|
|cant help but che...|
|inquirerdotnet ft...|
|                q   |
|hearty congratula...|
|                    |
|   gymnastics  tokyo|
|morning everyone ...|
|       tokyo tennis |
|up next for carlo...|
|congrates mirabai...|
|the wait for a we...|
|tokyo   mirabaich...|
|tokyo olympics a ...|
|well done to team...|
+--------------------+
only showing top 20 rows



In [None]:
#import proprocessing and model libraries
import pyspark.ml.feature
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml.clustering import LDA

In [None]:
#initializer
tokenizer = Tokenizer(inputCol='cleaned_text', outputCol='tokened')
SW = StopWordsRemover(inputCol='tokened', outputCol='sw_removed')
CV = CountVectorizer(inputCol='sw_removed', outputCol='vectors',minDF= 10, maxDF = 97)
lda = LDA(featuresCol= 'vectors', maxIter=10, k=5)

In [None]:
#tokenize corpus
token= tokenizer.transform(dt_clean.select('cleaned_text'))
token.show()

+--------------------+--------------------+
|        cleaned_text|             tokened|
+--------------------+--------------------+
| let the party begin|[let, the, party,...|
|congratulations t...|[congratulations,...|
|   big breaking now |[big, breaking, now]|
|                  q |                 [q]|
|all i can think o...|[all, i, can, thi...|
|      tokyo olympics|   [tokyo, olympics]|
|cant help but che...|[cant, help, but,...|
|inquirerdotnet ft...|[inquirerdotnet, ...|
|                q   |                 [q]|
|hearty congratula...|[hearty, congratu...|
|                    |                  []|
|   gymnastics  tokyo|[gymnastics, , to...|
|morning everyone ...|[morning, everyon...|
|       tokyo tennis |     [tokyo, tennis]|
|up next for carlo...|[up, next, for, c...|
|congrates mirabai...|[congrates, mirab...|
|the wait for a we...|[the, wait, for, ...|
|tokyo   mirabaich...|[tokyo, , , mirab...|
|tokyo olympics a ...|[tokyo, olympics,...|
|well done to team...|[well, don

In [None]:
#remove stopwords
filt_words = SW.transform(token)
filt_words.show()

+--------------------+--------------------+--------------------+
|        cleaned_text|             tokened|          sw_removed|
+--------------------+--------------------+--------------------+
| let the party begin|[let, the, party,...| [let, party, begin]|
|congratulations t...|[congratulations,...|[congratulations,...|
|   big breaking now |[big, breaking, now]|     [big, breaking]|
|                  q |                 [q]|                 [q]|
|all i can think o...|[all, i, can, thi...|[think, every, ti...|
|      tokyo olympics|   [tokyo, olympics]|   [tokyo, olympics]|
|cant help but che...|[cant, help, but,...|[cant, help, chee...|
|inquirerdotnet ft...|[inquirerdotnet, ...|[inquirerdotnet, ...|
|                q   |                 [q]|                 [q]|
|hearty congratula...|[hearty, congratu...|[hearty, congratu...|
|                    |                  []|                  []|
|   gymnastics  tokyo|[gymnastics, , to...|[gymnastics, , to...|
|morning everyone ...|[mo

In [None]:
#vectorize the model using countvectorizer
cv_model = CV.fit(filt_words)
CVM = cv_model.transform(filt_words)
CVM.show()

+--------------------+--------------------+--------------------+--------------------+
|        cleaned_text|             tokened|          sw_removed|             vectors|
+--------------------+--------------------+--------------------+--------------------+
| let the party begin|[let, the, party,...| [let, party, begin]|   (6260,[11],[1.0])|
|congratulations t...|[congratulations,...|[congratulations,...|        (6260,[],[])|
|   big breaking now |[big, breaking, now]|     [big, breaking]|        (6260,[],[])|
|                  q |                 [q]|                 [q]|        (6260,[],[])|
|all i can think o...|[all, i, can, thi...|[think, every, ti...|        (6260,[],[])|
|      tokyo olympics|   [tokyo, olympics]|   [tokyo, olympics]|        (6260,[],[])|
|cant help but che...|[cant, help, but,...|[cant, help, chee...|        (6260,[],[])|
|inquirerdotnet ft...|[inquirerdotnet, ...|[inquirerdotnet, ...|(6260,[97,185,513...|
|                q   |                 [q]|           

In [None]:
#Fit the processed corpus into the lda modle for training
lda_mod = lda.fit(CVM)
lda_mod

LocalLDAModel: uid=LDA_38bc8af52d23, k=5, numFeatures=6260

In [None]:
#Initialize variables to extract the trained topic index
ext_vocab = cv_model.vocabulary
ext_topics = lda_mod.describeTopics()
ext_topics.rdd.map(lambda row: row['termIndices']).collect()

[[81, 3, 96, 0, 82, 202, 357, 349, 325, 252],
 [258, 42, 128, 186, 106, 129, 120, 216, 68, 75],
 [16, 78, 116, 41, 23, 180, 152, 166, 114, 136],
 [7, 24, 190, 400, 239, 178, 69, 140, 124, 111],
 [4, 100, 18, 15, 217, 54, 151, 540, 211, 308]]

In [None]:
#code to link and retrieve the the topics as words in a list
Mod_topics = ext_topics.rdd.map(lambda row: row['termIndices']).map(lambda idx_list: [ext_vocab[idx] for idx in idx_list]).collect()

In [None]:
#view topics
Mod_topics[4]

['strikes',
 'leave',
 'sweet',
 'congratulation',
 'serbia',
 'pro',
 'austria',
 'spoonflower',
 'control',
 'divers']

In [None]:
#view all topics
from os import truncate
from pyspark.sql.types import StringType
LDA_Topics = spark.createDataFrame(Mod_topics, StringType()).show(truncate = 200)

+-------------------------------------------------------------------------------------------------------------+
|                                                                                                        value|
+-------------------------------------------------------------------------------------------------------------+
|[tkdbradly, funnearnapp, juegosolimpicos, blozadainq, killing, goldmedal, kim, daniil, beyond, ianuragthakur]|
|                                         [sir, bar, simply, chan, indeed, ying, rush, sister, nadine, nearly]|
|             [drop, congratulates, bhavanidevi, indiaattokyo, sign, carapaz, divyansh, rather, greece, tight]|
|                               [rain, cheung, catching, heads, defense, million, sitting, blow, lucky, wants]|
|                  [strikes, leave, sweet, congratulation, serbia, pro, austria, spoonflower, control, divers]|
+-------------------------------------------------------------------------------------------------------