# 0. Libraries and spark session

In [1]:
pip install spark-nlp==3.3.4

Note: you may need to restart the kernel to use updated packages.


In [2]:
pip install numpy pandas nltk

Note: you may need to restart the kernel to use updated packages.


In [1]:
spark.stop()
spark = SparkSession.builder \
    .appName("Spark NLP")\
    .master("local[*]")\
    .config("spark.driver.memory","4G")\
    .config("spark.driver.maxResultSize", "0") \
    .config("spark.kryoserializer.buffer.max", "2000M")\
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:3.3.4")\
    .getOrCreate()
spark

In [2]:
from sparknlp.annotator import LemmatizerModel
import pyspark.sql.functions as f
from pyspark.ml.feature import Tokenizer as pysparkTokenizer, HashingTF, StopWordsRemover, CountVectorizer
from pyspark.ml import Pipeline
from sparknlp.base import DocumentAssembler, Finisher
from sparknlp.annotator import Tokenizer, Normalizer, LemmatizerModel, StopWordsCleaner, PerceptronModel, Chunker
from pyspark.ml.clustering import LDA
from nltk.corpus import stopwords
import nltk
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

# 1. Loading the data

In [3]:
df = spark.read.json("/home/ubuntu/temp-data")
df.repartition(50).write.mode("overwrite").parquet("/home/ubuntu/temp-data2")
df = spark.read.parquet("/home/ubuntu/temp-data2")
# df = spark.read.option("tableName", "tweets").format("dynamodb").load()
df.show()
f"Lenght: {df.count()}"

+--------------+----+-------------------+----+--------------------+---------------+
|extractionTime| geo|                 id|lang|                text|           user|
+--------------+----+-------------------+----+--------------------+---------------+
|    1640622119|null|1475502230484697094|  en|@POTUS ⓘ 𝗢𝗳𝗳𝗶...|      rodriQuez|
|    1639923135|null|1472570476295860228| und|@VP @RepLowenthal...|  bandarfize1ap|
|    1639923254|null|1472570974570795010|  en|@VP @allysonfelix...|   joshycharm01|
|    1639927142|null|1472587285225156609|  en|@Sen_JoeManchin w...|        Caz_U_L|
|    1640622154|null|1475502374982610946|  en|RT @HouseGOP: Pre...|     CallMeBake|
|    1639925917|null|1472582147638796293|  en|RT @TheDemocrats:...|  TheRedPill333|
|    1639926225|null|1472583437433323525|  en|RT @TheDemocrats:...|    Sailfish157|
|    1639927086|null|1472587048003461122|  en|RT @SenWhitehouse...|      margpie55|
|    1639926965|null|1472586539939078151|  en|@Sen_JoeManchin G...|        vdoog

'Lenght: 17686'

Removing special chacaters and changing to lowercase

In [4]:
df = df.withColumn("text", f.lower(f.regexp_replace(f.col("text"), "[^A-Za-z0-9@ ]", "")))
df.show()

+--------------+----+-------------------+----+--------------------+---------------+
|extractionTime| geo|                 id|lang|                text|           user|
+--------------+----+-------------------+----+--------------------+---------------+
|    1640622119|null|1475502230484697094|  en|     @potus         |      rodriQuez|
|    1639923135|null|1472570476295860228| und|@vp @replowenthal...|  bandarfize1ap|
|    1639923254|null|1472570974570795010|  en|@vp @allysonfelix...|   joshycharm01|
|    1639927142|null|1472587285225156609|  en|@senjoemanchin wa...|        Caz_U_L|
|    1640622154|null|1475502374982610946|  en|rt @housegop pres...|     CallMeBake|
|    1639925917|null|1472582147638796293|  en|rt @thedemocrats ...|  TheRedPill333|
|    1639926225|null|1472583437433323525|  en|rt @thedemocrats ...|    Sailfish157|
|    1639927086|null|1472587048003461122|  en|rt @senwhitehouse...|      margpie55|
|    1639926965|null|1472586539939078151|  en|@senjoemanchin ge...|        v

# 2. LDA - topics analysis - Pyspark only

In [7]:
tokenizer = pysparkTokenizer(inputCol="text", outputCol="tokens")
stopwords_cleaner = StopWordsRemover(inputCol="tokens", outputCol="no stop words")
nlp_pipeline = Pipeline(
    stages=[tokenizer,
            stopwords_cleaner])
nlp_model = nlp_pipeline.fit(df)
processed_df  = nlp_model.transform(df)
processed_df.show(5)
processed_df.limit(3).toPandas().to_dict("records")

+--------------+----+-------------------+----+--------------------+-------------+--------------------+--------------------+
|extractionTime| geo|                 id|lang|                text|         user|              tokens|       no stop words|
+--------------+----+-------------------+----+--------------------+-------------+--------------------+--------------------+
|    1640621115|null|1475498016417239041|  zh|             @potus |     xiga8806|            [@potus]|            [@potus]|
|    1639926073|null|1472582800884879361|  en|@senjoemanchin th...|sabine_durden|[@senjoemanchin, ...|[@senjoemanchin, ...|
|    1639924782|null|1472577386281463808|  en|@pattymurray @pat...|   ClydeKlotz|[@pattymurray, @p...|[@pattymurray, @p...|
|    1640621860|null|1475501140762628103|  en|@barackobama the ...|      RACENKI|[@barackobama, th...|[@barackobama, ki...|
|    1640622154|null|1475502374982610946|  en|rt @housegop pres...|   CallMeBake|[rt, @housegop, p...|[rt, @housegop, p...|
+-------

[{'extractionTime': 1640621115,
  'geo': None,
  'id': 1475498016417239041,
  'lang': 'zh',
  'text': '@potus ',
  'user': 'xiga8806',
  'tokens': ['@potus'],
  'no stop words': ['@potus']},
 {'extractionTime': 1639926073,
  'geo': None,
  'id': 1472582800884879361,
  'lang': 'en',
  'text': '@senjoemanchin thank you for standing against all the pressure and for doing whats right for americans ',
  'user': 'sabine_durden',
  'tokens': ['@senjoemanchin',
   'thank',
   'you',
   'for',
   'standing',
   'against',
   'all',
   'the',
   'pressure',
   'and',
   'for',
   'doing',
   'whats',
   'right',
   'for',
   'americans'],
  'no stop words': ['@senjoemanchin',
   'thank',
   'standing',
   'pressure',
   'whats',
   'right',
   'americans']},
 {'extractionTime': 1639924782,
  'geo': None,
  'id': 1472577386281463808,
  'lang': 'en',
  'text': '@pattymurray @pattymurray i dont see you riding horseback to washington dc oh wait thats right you httpstcojdvxtne7mk',
  'user': 'ClydeKl

In [8]:
cv = CountVectorizer(inputCol="no stop words", outputCol="features", vocabSize=500, minDF=3.0)
cv_model = cv.fit(processed_df)
lda = LDA(k=5, maxIter=100)
model = lda.fit(cv_model.transform(processed_df))
for indices in model.describeTopics(15).select("termIndices").rdd.flatMap(lambda x: x).collect():
    print([cv_model.vocabulary[i] for i in indices], '\n')

['@potus', 'rt', 'money', 'americans', 'office', 'economy', 'took', 'pockets', 'brink', 'collapse', 'act', 'president', 'must', 'rights', '@thedemocrats'] 

['', '@senjoemanchin', '@potus', 'manchin', 'joe', '@senschumer', 'vote', 'rt', 'state', '@vp', '@kamalaharris', 'million', 'bbb', 'plan', '@joemanchinwv'] 

['@potus', '@joebiden', '@tedcruz', 'rt', 'get', 'back', '@lindseygrahamsc', 'us', '@kamalaharris', 'dont', 'like', 'going', 'better', 'people', 'build'] 

['', 'rt', '@tedcruz', 'party', 'democrat', 'mask', 'must', 'one', '@amyklobuchar', 'wear', 'want', 'forever', 'official', 'positioneverybody', 'better'] 

['@senjoemanchin', 'people', 'rt', '@senwarren', 'american', 'youre', 'know', 'thats', 'bill', 'thank', 'republican', 'democrats', 'tax', 'care', 'bbb'] 



# 3. LDA - topics analysis - SparkNLP

In [5]:
documentAssembler = DocumentAssembler().setInputCol("text").setOutputCol('document')
tokenizer = Tokenizer().setInputCols(['document']).setOutputCol('tokenized')
normalizer = Normalizer().setInputCols(['tokenized']).setOutputCol('normalized')
lemmatizer = LemmatizerModel.pretrained().setInputCols(['normalized']).setOutputCol('lemmatized')
stopwords_cleaner = StopWordsCleaner().setInputCols(['lemmatized'])\
.setOutputCol('unigrams').setStopWords(stopwords.words('english'))
pos_tagger = PerceptronModel.pretrained('pos_anc').setInputCols(['document', 'unigrams']).setOutputCol('pos')
chunker = Chunker().setInputCols(['document', 'pos']).setOutputCol('ngrams').setRegexParsers(['<JJ>+<NN>', '<NN>+<NN>'])
finisher = Finisher().setInputCols(['unigrams', 'ngrams'])
pipeline = Pipeline() \
     .setStages([documentAssembler,
                 tokenizer,
                 normalizer,
                 lemmatizer,
                 stopwords_cleaner,
                 pos_tagger,
                 chunker,
                 finisher])
processed_df = pipeline.fit(df).transform(df).withColumn("final", f.concat("finished_unigrams", "finished_ngrams"))
processed_df.show()

lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[OK!]
pos_anc download started this may take some time.
Approximate size to download 3.9 MB
[OK!]


ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1207, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1033, in send_command
    response = connection.send_command(command)
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1212, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:39585)
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/IPython/core/interactiveshell.py", line 3343, in 

Py4JError: An error occurred while calling o105.transform

In [None]:
cv = CountVectorizer(inputCol="final", outputCol="features", vocabSize=500, minDF=3.0)
cv_model = cv.fit(processed_df)
lda = LDA(k=5, maxIter=100)
model = lda.fit(cv_model.transform(processed_df))
for indices in model.describeTopics(15).select("termIndices").rdd.flatMap(lambda x: x).collect():
    print([cv_model.vocabulary[i] for i in indices], '\n')