In [1]:
import pyspark
import os
import warcio
import re
import nltk
nltk.download('words')
from nltk.corpus import words
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum as sql_sum
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, IDF
from pyspark.ml.clustering import LDA
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf, explode, lit,array
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, ArrayType
import gensim
from gensim import corpora
from gensim.models import LdaModel
from gensim.corpora import Dictionary
import pyLDAvis.gensim_models as gensimvis
import pyLDAvis
from scipy.sparse import csr_matrix
spark = SparkSession.builder \
    .appName("myApp") \
    .config("spark.driver.memory", "32g") \
    .getOrCreate()
spark

[nltk_data] Downloading package words to /Users/rohan/nltk_data...
[nltk_data]   Package words is already up-to-date!


23/07/27 17:33:08 WARN Utils: Your hostname, Rohans-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.1.3 instead (on interface en0)
23/07/27 17:33:08 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/07/27 17:33:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


#### Defining a schema and the corresponding data types to store the web crawl data.
### Attributions :
##### https://groups.google.com/g/common-crawl/c/yEP1Lt1_B4E

In [2]:
schema = StructType([
    StructField("WARC-Type", StringType(), True),
    StructField("WARC-Target-URI", StringType(), True),
    StructField("WARC-Date", StringType(), True),
    StructField("WARC-Record-ID", StringType(), True),
    StructField("WARC-Refers-To", StringType(), True),
    StructField("WARC-Block-Digest", StringType(), True),
    StructField("WARC-Identified-Content-Language", StringType(), True),
    StructField("Content-Type", StringType(), True),
    StructField("Content-Length", IntegerType(), True),
    StructField("Content", StringType(), True)
])

#### Reading the WARC data from the file .wet file, and storing the headers and content in a list 'records'.
### Attributions :
##### https://www.programcreek.com/python/example/127575/warcio.archiveiterator.ArchiveIterator

In [3]:
records = []
with open('crawl.wet.gz', 'rb') as data:
    for i in warcio.archiveiterator.ArchiveIterator(data):
        headers = dict(i.rec_headers.headers)
        headers['Content-Length'] = int(headers['Content-Length'])
        headers['Content'] = i.raw_stream.read().decode('utf-8')
        records.append(headers)

#### Creating a spark dataframe out of the list records in the format of the schema built above

In [4]:
spark_df = spark.createDataFrame(records, schema=schema)

#### Tokenizing the 'Content' column in the DataFrame using a regex pattern that matches non-word characters, and storing the tokens in a new column 'words'.
#### Attributions :
##### https://www.sparkitecture.io/natural-language-processing/data-preparation

In [5]:
tokenizer = RegexTokenizer(inputCol="Content", outputCol="words", pattern="\\W")
tokenized_df = tokenizer.transform(spark_df)

#### Removing stopwords from the 'words' column, and storing the filtered words in a new column 'filtered'.
#### Attributions :
##### https://ashokpalivela.medium.com/multi-class-text-classification-using-spark-ml-in-python-b8d2a6545cb

In [6]:
stopwords_remover = StopWordsRemover(inputCol="words", outputCol="filtered")
filtered_df = stopwords_remover.transform(tokenized_df)

#### Instantiated a CountVectorizer with the specified parameters to vectorize the 'english_words' column, and stored the results in a new column 'rawFeatures'.  Additionally, calculated the TF-IDF values of the words in the 'rawFeatures' column, and stored the results in a new column 'features'.

In [7]:
english_words = set(words.words())
def english(words):
    return [word for word in words if word.lower() in english_words]
english_udf = udf(english, ArrayType(StringType()))
english_df = filtered_df.withColumn("english_words", english_udf("filtered"))

In [8]:
vectorizer = CountVectorizer(inputCol="english_words", outputCol="rawFeatures", vocabSize=5000, minDF=5.0)
vectorized_df = vectorizer.fit(english_df).transform(english_df)
idf = IDF(inputCol="rawFeatures", outputCol="features")
idf_model = idf.fit(vectorized_df)
tfidf_vectors = idf_model.transform(vectorized_df)

23/07/27 17:33:22 WARN TaskSetManager: Stage 0 contains a task of very large size (41255 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

23/07/27 17:34:16 WARN TaskSetManager: Stage 4 contains a task of very large size (41255 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

#### Training an LDA model on the 'features' column, with the specified number of topics and iterations and extracting the top terms of each topicfrom the LDA model, and store them in a new DataFrame 'topics_with_terms'.
### Attributions :
##### https://medium.com/analytics-vidhya/distributed-topic-modelling-using-spark-nlp-and-spark-mllib-lda-6db3f06a4da3

In [9]:
num_topics = 20
lda = LDA(k=num_topics, maxIter=10)
lda_model = lda.fit(tfidf_vectors)

23/07/27 17:35:09 WARN TaskSetManager: Stage 6 contains a task of very large size (41255 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

23/07/27 17:36:00 WARN TaskSetManager: Stage 7 contains a task of very large size (41255 KiB). The maximum recommended task size is 1000 KiB.
23/07/27 17:36:00 WARN TaskSetManager: Stage 8 contains a task of very large size (41255 KiB). The maximum recommended task size is 1000 KiB.
23/07/27 17:36:01 WARN TaskSetManager: Stage 9 contains a task of very large size (41255 KiB). The maximum recommended task size is 1000 KiB.


[Stage 9:>                                                          (0 + 8) / 8]

23/07/27 17:36:02 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
23/07/27 17:36:02 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS


[Stage 11:>                                                         (0 + 0) / 1]

23/07/27 17:36:27 WARN TaskSetManager: Stage 11 contains a task of very large size (41255 KiB). The maximum recommended task size is 1000 KiB.


[Stage 12:>                                                         (0 + 0) / 8]

23/07/27 17:36:28 WARN TaskSetManager: Stage 12 contains a task of very large size (41255 KiB). The maximum recommended task size is 1000 KiB.


[Stage 14:>                                                         (0 + 0) / 1]

23/07/27 17:36:40 WARN TaskSetManager: Stage 14 contains a task of very large size (41255 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

23/07/27 17:36:41 WARN TaskSetManager: Stage 15 contains a task of very large size (41255 KiB). The maximum recommended task size is 1000 KiB.


[Stage 17:>                                                         (0 + 1) / 1]

23/07/27 17:36:51 WARN TaskSetManager: Stage 17 contains a task of very large size (41255 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

23/07/27 17:36:52 WARN TaskSetManager: Stage 18 contains a task of very large size (41255 KiB). The maximum recommended task size is 1000 KiB.


[Stage 20:>                                                         (0 + 0) / 1]

23/07/27 17:37:01 WARN TaskSetManager: Stage 20 contains a task of very large size (41255 KiB). The maximum recommended task size is 1000 KiB.


[Stage 21:>                                                         (0 + 0) / 8]

23/07/27 17:37:04 WARN TaskSetManager: Stage 21 contains a task of very large size (41255 KiB). The maximum recommended task size is 1000 KiB.


[Stage 23:>                                                         (0 + 0) / 1]

23/07/27 17:37:13 WARN TaskSetManager: Stage 23 contains a task of very large size (41255 KiB). The maximum recommended task size is 1000 KiB.


[Stage 24:>                                                         (0 + 0) / 8]

23/07/27 17:37:15 WARN TaskSetManager: Stage 24 contains a task of very large size (41255 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

23/07/27 17:37:23 WARN TaskSetManager: Stage 26 contains a task of very large size (41255 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

23/07/27 17:37:24 WARN TaskSetManager: Stage 27 contains a task of very large size (41255 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

23/07/27 17:37:33 WARN TaskSetManager: Stage 29 contains a task of very large size (41255 KiB). The maximum recommended task size is 1000 KiB.


[Stage 30:>                                                         (0 + 0) / 8]

23/07/27 17:37:34 WARN TaskSetManager: Stage 30 contains a task of very large size (41255 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

23/07/27 17:37:41 WARN TaskSetManager: Stage 32 contains a task of very large size (41255 KiB). The maximum recommended task size is 1000 KiB.
23/07/27 17:37:42 WARN TaskSetManager: Stage 33 contains a task of very large size (41255 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

23/07/27 17:37:46 WARN TaskSetManager: Stage 35 contains a task of very large size (41255 KiB). The maximum recommended task size is 1000 KiB.
23/07/27 17:37:46 WARN TaskSetManager: Stage 36 contains a task of very large size (41255 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

#### extracting the vocabulary that the model has learned

In [10]:
vectorizer_model = vectorizer.fit(english_df)
vocab = vectorizer_model.vocabulary

23/07/27 17:37:54 WARN TaskSetManager: Stage 38 contains a task of very large size (41255 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

In [11]:
topics = lda_model.describeTopics(maxTermsPerTopic=10)
topic_terms_udf = udf(lambda indices: [vocab[i] for i in indices], ArrayType(StringType()))
topics_with_terms = topics.withColumn("topic_terms", topic_terms_udf("termIndices"))
topics_with_terms.select("topic", "topic_terms").show(truncate=False)

+-----+-------------------------------------------------------------------------------------+
|topic|topic_terms                                                                          |
+-----+-------------------------------------------------------------------------------------+
|0    |[add, price, march, cart, share, august, product, june, sign, new]                   |
|1    |[n, c, h, v, l, th, m, b, p, k]                                                      |
|2    |[maximum, file, translate, chapter, reply, ago, data, interested, one, post]         |
|3    |[tax, request, quite, letter, estate, ruling, bulletin, bell, section, court]        |
|4    |[array, depth, maximum, null, employer, name, vacancy, id, notification, title]      |
|5    |[function, e, o, return, b, n, c, r, h, f]                                           |
|6    |[l, ban, p, k, quite, saj, term, elm, web, letter]                                   |
|7    |[every, r, f, til, fa, board, till, administrator, us

In [12]:
topics_with_terms.toPandas()

  if LooseVersion(pandas.__version__) < LooseVersion(minimum_pandas_version):
  if LooseVersion(pandas.__version__) < LooseVersion(minimum_pandas_version):


Unnamed: 0,topic,termIndices,termWeights,topic_terms
0,0,"[78, 77, 65, 101, 104, 117, 93, 137, 112, 21]","[0.0076941325046242575, 0.007693776934229889, ...","[add, price, march, cart, share, august, produ..."
1,1,"[1, 4, 12, 14, 3, 39, 5, 11, 13, 16]","[0.06723138205299167, 0.043122568105190474, 0....","[n, c, h, v, l, th, m, b, p, k]"
2,2,"[173, 356, 547, 395, 221, 248, 26, 1582, 49, 64]","[0.006573568314335474, 0.004896075790143036, 0...","[maximum, file, translate, chapter, reply, ago..."
3,3,"[605, 380, 1871, 544, 589, 2013, 2002, 2296, 2...","[0.01011432131329287, 0.00936863577780147, 0.0...","[tax, request, quite, letter, estate, ruling, ..."
4,4,"[119, 181, 173, 54, 264, 32, 592, 38, 396, 76]","[0.08952572453929379, 0.07060915439696092, 0.0...","[array, depth, maximum, null, employer, name, ..."
5,5,"[17, 2, 8, 31, 11, 1, 4, 6, 12, 18]","[0.04961851585104877, 0.043057672677134784, 0....","[function, e, o, return, b, n, c, r, h, f]"
6,6,"[3, 259, 13, 16, 1871, 890, 327, 895, 71, 544]","[0.05028156081725143, 0.030018590333076304, 0....","[l, ban, p, k, quite, saj, term, elm, web, let..."
7,7,"[410, 6, 18, 385, 545, 278, 988, 971, 168, 366]","[0.02969765575496639, 0.023195168972654497, 0....","[every, r, f, til, fa, board, till, administra..."
8,8,"[2377, 1336, 1760, 681, 497, 654, 478, 2855, 1...","[0.029710120563884515, 0.016261543288942824, 0...","[contest, song, tagged, cloud, tour, server, d..."
9,9,"[116, 43, 150, 127, 82, 597, 145, 44, 87, 149]","[0.03071867080424468, 0.03040812990634948, 0.0...","[margin, color, padding, border, block, marker..."


In [13]:
n = 3
top_topics = topics_with_terms.select("topic", "topic_terms").limit(n).collect()
top_topics_list = [(row.topic, row.topic_terms) for row in top_topics]

In [14]:
print("The top", n, "topics are:", top_topics_list)

The top 3 topics are: [(0, ['add', 'price', 'march', 'cart', 'share', 'august', 'product', 'june', 'sign', 'new']), (1, ['n', 'c', 'h', 'v', 'l', 'th', 'm', 'b', 'p', 'k']), (2, ['maximum', 'file', 'translate', 'chapter', 'reply', 'ago', 'data', 'interested', 'one', 'post'])]


#### Defining a function to convert a sparse vector from the 'rawFeatures' column to a SciPy sparse matrix. I have converted each vector in the 'rawFeatures' column to a SciPy sparse matrix, and stored them in a list.
### Attributions :
##### https://docs.scipy.org/doc/scipy/reference/generated/scipy.sparse.csr_matrix.html

In [15]:
def scipy_sparse(vector):
    return csr_matrix((vector.values, vector.indices, [0, len(vector.values)]), shape=(1, len(vector)))
sparse_matrices = [scipy_sparse(vector) for vector in vectorized_df.select("rawFeatures").rdd.map(lambda x: x[0]).collect()]

23/07/27 17:38:58 WARN TaskSetManager: Stage 46 contains a task of very large size (41255 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

#### Converting scipy sparse matrices to a gensim corpus

In [16]:
def gensim_corpus(sparse_matrices):
    for matrix in sparse_matrices:
        yield [(i, float(matrix[0, i])) for i in matrix.indices]
corpus = list(gensim_corpus(sparse_matrices))

#### Creating a Gensim Dictionary from the vocabulary of the CountVectorizer and filtering out the outliers.
### Attributions:
##### https://www.tutorialspoint.com/gensim/gensim_creating_a_dictionary.htm
##### https://tedboy.github.io/nlps/generated/generated/gensim.corpora.Dictionary.filter_extremes.html

In [17]:
gensim_vocab = gensim.corpora.Dictionary([vectorizer_model.vocabulary])
gensim_vocab.filter_extremes(no_below=1, no_above=1.0, keep_n=None)

#### Training a Gensim LDA model on the Gensim corpus with the specified number of topics and preparing the Gensim LDA model's data for visualization using the pyLDAvis library.
### Attributions :
##### https://www.machinelearningplus.com/nlp/topic-modeling-gensim-python/

In [18]:
gensim_lda_model = gensim.models.LdaModel(corpus, num_topics=num_topics, id2word=gensim_vocab)

In [19]:
vis = gensimvis.prepare(gensim_lda_model, corpus, gensim_vocab)

  default_term_info = default_term_info.sort_values(


In [20]:
pyLDAvis.display(vis)