In [None]:
import os 
import pandas as pd
import json
import nltk
import re
from sklearn.feature_extraction.text import TfidfVectorizer
from nltk import word_tokenize, sent_tokenize
import numpy as np
import pyspark.pandas as ps
from pyspark.sql import SparkSession

In [2]:
# Fetch the NLTK resources this notebook relies on, then import the helpers
# that preprocess() uses for stop-word lookup and lemmatisation.
for resource in ('punkt_tab', 'stopwords', 'wordnet'):
    nltk.download(resource)
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer

[nltk_data] Downloading package punkt_tab to
[nltk_data]     /Users/lam.nguyen/nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/lam.nguyen/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to
[nltk_data]     /Users/lam.nguyen/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


In [82]:
# LDA hyper-parameters shared by both LDA fits in this notebook
num_topics = 5  # passed as `k` to pyspark.ml.clustering.LDA
max_iter = 10  # passed as `maxIter` to pyspark.ml.clustering.LDA

# Preprocessing

In [3]:
# def merge_files() -> None:
#     """Merge all JSON files of a folder

#     Sample folder structure
#     ```
#     └── 📁data
#         └── 📁merged
#             └── test.json
#         └── 📁processed
#         └── 📁raw
#             └── test01.json
#             └── test02.json
#     ```
#     Args:
#         None
#     Return:
#         returns None"""
#     files = os.listdir(os.path.join("..", "data", "raw"))
#     text_todf = []
#     for i in range(len(files)):
#         with open(os.path.join("..", "data", "raw", files[i]), 'r') as file:
#             data = json.load(file)
#             df_metadata = pd.json_normalize(data, max_level=1)
#             text = ""
#             for element in df_metadata['body_text'][0]:
#                 text += element['text']
#             text_todf.append(text)
#     df = pd.DataFrame({"text": text_todf})
#     df.to_json(os.path.join("..", "data", "merged", "corpus.json"))

In [38]:
# Load the merged JSON file into a pandas DataFrame
merged_path = os.path.join("..", "data", "merged", "test.json")
data = pd.read_json(merged_path)

In [5]:
def custom_word_tokenize(sent_tokens: list[str]) -> list[str]:
    """Flatten a list of sentences into one list of word tokens

    Args:
        sent_tokens: sentences to split, one string per sentence
    Returns:
        The word tokens of every sentence, concatenated in input order"""
    return [token for sentence in sent_tokens for token in word_tokenize(sentence)]

In [41]:
def preprocess(
    data: pd.DataFrame,
    min_df: float = 0.3,
    max_df: float = 0.85,
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Performs the preprocessing with NLTK and scikit-learn

    The input frame is not modified; all derived columns are added to a copy.

    Args:
        data: input corpus as pandas.DataFrame with a ``text`` column
        min_df: lower document-frequency cut-off forwarded to TfidfVectorizer
        max_df: upper document-frequency cut-off forwarded to TfidfVectorizer
    Returns:
        A tuple ``(processed, tfidf)`` where ``processed`` is a copy of
        ``data`` with the added columns ``sent_tokens``, ``word_tokens`` and
        ``to_tfidf``, and ``tfidf`` is a pandas.DataFrame holding the TF-IDF
        vector form of the input (one row per document, one column per
        retained term)
    """
    stop_words = stopwords.words('english')
    lemmatiser = WordNetLemmatizer()

    # Work on a copy so the caller's DataFrame is left untouched
    processed = data.copy()

    # Sentence tokenisation
    processed['sent_tokens'] = processed['text'].apply(sent_tokenize)

    # Text cleaning and normalisation: keep only ASCII letters and whitespace, then lower-case
    processed['sent_tokens'] = processed['sent_tokens'].apply(
        lambda sentences: [re.sub(r"[^a-zA-Z\s]", "", sent).lower() for sent in sentences]
    )

    # Word tokenisation
    processed['word_tokens'] = processed['sent_tokens'].apply(custom_word_tokenize)

    # Lemmatisation
    processed['word_tokens'] = processed['word_tokens'].apply(
        lambda word_tokens: [lemmatiser.lemmatize(word) for word in word_tokens]
    )

    # TF-IDF vectorisation: the vectoriser expects one string per document
    processed['to_tfidf'] = processed['word_tokens'].apply(' '.join)
    vectorizer = TfidfVectorizer(min_df=min_df, max_df=max_df, stop_words=stop_words)
    tfidf_matrix = vectorizer.fit_transform(processed['to_tfidf'])

    # Convert to DataFrame
    tfidf = pd.DataFrame(tfidf_matrix.toarray(), columns=vectorizer.get_feature_names_out())
    return processed, tfidf

In [42]:
# Run the preprocessing; `data` is rebound to the frame returned by preprocess()
data, tfidf  = preprocess(data)

In [46]:
# Display the TF-IDF frame returned by preprocess()
tfidf

Unnamed: 0,ability,able,academy,acceleration,access,accordance,according,account,accuracy,accurate,...,widely,window,work,working,world,worldwide,would,writing,wtft,wtfts
0,0.058747,0.029374,0.029374,0.029374,0.058747,0.029374,0.0,0.0,0.117495,0.029374,...,0.029374,0.029374,0.0,0.0,0.0,0.0,0.117495,0.0,0.234989,0.029374
1,0.0,0.0,0.0,0.0,0.0,0.0,0.041885,0.041885,0.0,0.0,...,0.0,0.0,0.041885,0.041885,0.041885,0.083771,0.0,0.041885,0.0,0.0


# Pyspark LDA

In [43]:
from pyspark.ml.linalg import Vectors, SparseVector
from pyspark.ml.clustering import LDA
import pandas as pd
import numpy as np
import pyspark.pandas as ps
from pyspark.sql import SparkSession

In [10]:
# Create (or reuse) a local Spark session
spark = (
    SparkSession.builder
    .appName("Nhom09_PySparkLDA")
    .master("local[*]")
    .config("spark.driver.bindAddress", "localhost")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/28 08:49:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [None]:
# Move the TF-IDF pandas.DataFrame into Spark, then pack each row into a single
# dense vector column named "features"
tfidf_rows = spark.createDataFrame(tfidf)
tfidf_ps = (
    tfidf_rows.rdd
    .map(lambda row: (Vectors.dense(row), ))
    .toDF(["features"])
)

                                                                                

In [45]:
from pyspark.ml.clustering import LDA

# Create an LDA model: k is the number of topics, maxIter the number of iterations.
# A fixed seed makes the fitted topics reproducible across runs.
lda = LDA(k=num_topics, maxIter=max_iter, seed=42)
# Fit the LDA model on the dense TF-IDF vectors
lda_model = lda.fit(tfidf_ps)
# Describe topics
topics = lda_model.describeTopics(3)  # Get the top 3 terms for each topic
topics.show(truncate=False)

                                                                                

+-----+---------------+---------------------------------------------------------------------+
|topic|termIndices    |termWeights                                                          |
+-----+---------------+---------------------------------------------------------------------+
|0    |[404, 41, 105] |[0.002398466433135447, 0.002334772230784771, 0.0023063777294193063]  |
|1    |[343, 164, 35] |[0.0023859780701145855, 0.0023294478680108786, 0.0023203198160067742]|
|2    |[108, 8, 232]  |[0.0024689895728190795, 0.002377024731400353, 0.0023564226010567954] |
|3    |[176, 220, 273]|[0.0023427878938911107, 0.0023394471897149522, 0.0023343600894544132]|
|4    |[26, 196, 278] |[0.0024366442857787266, 0.002359836168195985, 0.002356088139912443]  |
+-----+---------------+---------------------------------------------------------------------+



# SparkNLP

In [83]:
import sparknlp
import os
import pandas as pd
import string
from sparknlp.base import *
from sparknlp.annotator import *
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import IDF
from pyspark.ml.clustering import LDA

In [2]:
# Spark settings handed to sparknlp.start()
spark_params = {
    "spark.driver.host": "localhost",
    "spark.driver.port": "9999",
    "spark.driver.bindAddress": "127.0.0.1",
}
sparknlp_session = sparknlp.start(params=spark_params)

print("Spark NLP version: ", sparknlp.version())
print("Apache Spark version: ", sparknlp_session.version)

:: loading settings :: url = jar:file:/Users/lam.nguyen/Desktop/GithubClone/BigData_Final/.venv/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/lam.nguyen/.ivy2/cache
The jars for the packages stored in: /Users/lam.nguyen/.ivy2/jars
com.johnsnowlabs.nlp#spark-nlp_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-5a92852d-f11f-45bd-89e7-379d8bc12258;1.0
	confs: [default]
	found com.johnsnowlabs.nlp#spark-nlp_2.12;6.0.0 in central
	found com.typesafe#config;1.4.2 in central
	found org.rocksdb#rocksdbjni;6.29.5 in central
	found com.amazonaws#aws-java-sdk-s3;1.12.500 in central
	found com.amazonaws#aws-java-sdk-kms;1.12.500 in central
	found com.amazonaws#aws-java-sdk-core;1.12.500 in central
	found commons-logging#commons-logging;1.1.3 in central
	found commons-codec#commons-codec;1.15 in central
	found org.apache.httpcomponents#httpclient;4.5.13 in central
	found org.apache.httpcomponents#httpcore;4.4.13 in central
	found software.amazon.ion#ion-java;1.0.2 in central
	found joda-time#joda-time;2.8.1 in central
	found com.amazonaws#jmespath-java;1.12.500

Spark NLP version:  6.0.0
Apache Spark version:  3.5.5


In [3]:
# Load the merged JSON file into a pandas DataFrame and display it
corpus_path = os.path.join("..", "data", "merged", "test.json")
corpus = pd.read_json(corpus_path)
corpus

Unnamed: 0,text
0,As a consequence of the global COVID-19 pandem...
1,According to current live statistics at the ti...


In [4]:
# Change from pandas.DataFrame to Spark DataFrame
sparknlp_df = sparknlp_session.createDataFrame(corpus)
# Preview only: cap the row count and cell width so whole documents are not dumped into the output
sparknlp_df.show(5, truncate=80)

                                                                                

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [8]:
# Document Assembler: wraps the raw "text" column into a "document" annotation column
documentAssembler = (
    DocumentAssembler()
    .setInputCol("text")
    .setOutputCol("document")
    .setCleanupMode("shrink")
)

doc_df = documentAssembler.transform(sparknlp_df)

doc_df.show(truncate=True)
        

+--------------------+--------------------+
|                text|            document|
+--------------------+--------------------+
|As a consequence ...|[{document, 0, 68...|
|According to curr...|[{document, 0, 42...|
+--------------------+--------------------+



                                                                                

In [None]:
# Sentence tokenisation: reads "document", writes "sentences"
sentenceDetector = (
    SentenceDetector()
    .setInputCols(['document'])
    .setOutputCol('sentences')
)

sent_df = sentenceDetector.transform(doc_df)
sent_df.show(truncate=True)

+--------------------+--------------------+--------------------+
|                text|            document|           sentences|
+--------------------+--------------------+--------------------+
|As a consequence ...|[{document, 0, 68...|[{document, 0, 21...|
|According to curr...|[{document, 0, 42...|[{document, 0, 18...|
+--------------------+--------------------+--------------------+



                                                                                

In [31]:
# Word tokenisation: reads "sentences", writes "token"
tokeniser = (
    Tokenizer()
    .setInputCols(["sentences"])
    .setOutputCol("token")
)

token_df = tokeniser.fit(sent_df).transform(sent_df)
# Peek at the tokens of the first document
token_df.select('token.result').take(1)

                                                                                

[Row(result=['As', 'a', 'consequence', 'of', 'the', 'global', 'COVID-19', 'pandemic', ',', 'an', 'acceleration', 'in', 'the', 'implementation', 'of', 'telemedicine', 'occurred', 'in', 'order', 'to', 'better', 'triage', 'patients', 'while', 'maintaining', 'medical', 'resources', 'and', 'promoting', 'social', 'distancing', '.', 'Concurrent', 'improvements', 'in', 'video', 'conferencing', 'technology', 'and', 'the', 'convenience', 'afforded', 'by', 'telemedicine', 'have', 'allowed', 'virtual', 'health', 'practices', 'to', 'persist', 'even', 'after', 'many', 'clinics', 'have', 'begun', 'returning', 'to', 'in-person', 'practice', '.', 'Nevertheless', ',', 'a', 'primary', 'disadvantage', 'of', 'telemedicine', 'remains', 'its', 'relative', 'inability', 'to', 'perform', 'a', 'complete', 'physical', 'examination', ',', 'at', 'least', 'not', 'in', 'the', 'same', 'manner', 'one', 'would', 'in', 'clinic', '.', 'In', 'particular', ',', 'an', 'effective', 'otologic', 'exam', 'remains', 'elusive.The'

In [None]:
# Text cleaning: lower-case every token and strip characters matching the cleanup pattern
# (anything that is not an ASCII letter or whitespace).
# The pattern is a raw string so that `\s` is not parsed as an (invalid) Python string escape.
normaliser = Normalizer()\
     .setInputCols(["token"])\
     .setOutputCol("normalised")\
     .setLowercase(True)\
     .setCleanupPatterns([r"[^a-zA-Z\s]"])



+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

In [None]:
# Stop-word removal: reads "normalised", writes "stopwords_removed" (case-insensitive matching)
stopwords_cleaner = (
    StopWordsCleaner()
    .setInputCols("normalised")
    .setOutputCol("stopwords_removed")
    .setCaseSensitive(False)
)

In [None]:
# Lemmatisation with the default pretrained model: reads "stopwords_removed", writes "lemma"
lemmatizer = (
    LemmatizerModel.pretrained()
    .setInputCols(["stopwords_removed"])
    .setOutputCol("lemma")
)

lemma_antbnc download started this may take some time.


25/05/01 21:25:43 WARN S3AbortableInputStream: Not all bytes were read from the S3ObjectInputStream, aborting HTTP connection. This is likely an error and may result in sub-optimal behavior. Request only the bytes you need via a ranged GET or drain the input stream after use.


Approximate size to download 907.6 KB
[OK!]


In [70]:
# Finisher: turns the "lemma" annotations into a plain column named "finish"
finisher = Finisher() \
    .setInputCols(["lemma"]) \
    .setOutputCols(["finish"]) \
    .setIncludeMetadata(False) # set to False to remove metadata

# Chain every stage defined in the cells above into one pipeline
pipeline = Pipeline().setStages([
     documentAssembler,
     sentenceDetector,
     tokeniser,
     normaliser,
     stopwords_cleaner,
     lemmatizer,
     finisher
])
result = pipeline.fit(sparknlp_df).transform(sparknlp_df)
# Preview only: cap the row count and cell width so every token of every document is not dumped
result.select("finish").show(5, truncate=80)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [74]:
# Term counts: fit a CountVectorizer on the "finish" tokens and add "tf_features"
tfizer = CountVectorizer(outputCol='tf_features', inputCol='finish')
tf_model = tfizer.fit(result)
tf_result = tf_model.transform(result)
tf_result.show(n=5)

                                                                                

+--------------------+--------------------+--------------------+
|                text|              finish|         tf_features|
+--------------------+--------------------+--------------------+
|As a consequence ...|[consequence, glo...|(576,[0,1,3,4,5,6...|
|According to curr...|[accord, current,...|(576,[0,1,2,3,4,5...|
+--------------------+--------------------+--------------------+



                                                                                

In [81]:
# IDF weighting: fit on "tf_features" and add the weighted vectors as "tf_idf_features"
idfizer = IDF(outputCol='tf_idf_features', inputCol='tf_features')
idf_model = idfizer.fit(tf_result)
tfidf_result = idf_model.transform(tf_result)
tfidf_result.show(n=5)

                                                                                

+--------------------+--------------------+--------------------+--------------------+
|                text|              finish|         tf_features|     tf_idf_features|
+--------------------+--------------------+--------------------+--------------------+
|As a consequence ...|[consequence, glo...|(576,[0,1,3,4,5,6...|(576,[0,1,3,4,5,6...|
|According to curr...|[accord, current,...|(576,[0,1,2,3,4,5...|(576,[0,1,2,3,4,5...|
+--------------------+--------------------+--------------------+--------------------+



In [84]:
# Fit LDA on the Spark NLP TF-IDF vectors.
# A fixed seed makes the fitted topics reproducible across runs.
lda = LDA(k=num_topics, maxIter=max_iter, featuresCol='tf_idf_features', seed=42)
lda_model = lda.fit(tfidf_result)
topics = lda_model.describeTopics(3)  # Get the top 3 terms for each topic
topics.show(truncate=False)

                                                                                

+-----+---------------+---------------------------------------------------------------------+
|topic|termIndices    |termWeights                                                          |
+-----+---------------+---------------------------------------------------------------------+
|0    |[403, 3, 395]  |[0.00229576121278365, 0.0022629753171704396, 0.0021983315604854497]  |
|1    |[145, 206, 243]|[0.002363763783625437, 0.002311296300993184, 0.0022137760633377515]  |
|2    |[62, 151, 546] |[0.0022826458617294563, 0.0022737865312271468, 0.0022717844376154207]|
|3    |[490, 349, 76] |[0.002365765042149912, 0.0022072434298510374, 0.0022055900246234204] |
|4    |[448, 548, 317]|[0.002326554154138089, 0.0023005053558257226, 0.0022530961892851076] |
+-----+---------------+---------------------------------------------------------------------+



In [None]:
def preprocess(data: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Performs the preprocessing using SparkNLP

    NOTE: this definition reuses the name of the NLTK-based ``preprocess``
    defined earlier in this notebook and replaces it once this cell has run.

    Relies on the module-level ``sparknlp_session`` created by
    ``sparknlp.start``.

    Args:
        data: input corpus as pandas.DataFrame with a ``text`` column
    Returns:
        A tuple ``(data, tfidf)``: the input frame, unchanged, and a
        pandas.DataFrame holding the TF-IDF vector form of the input
        (one row per document, one column per vocabulary term)
    """
    spark_df = sparknlp_session.createDataFrame(data)

    # Sentence tokenisation
    document_stage = DocumentAssembler()\
        .setInputCol("text")\
        .setOutputCol("document")\
        .setCleanupMode("shrink")
    sentence_stage = SentenceDetector()\
        .setInputCols(["document"])\
        .setOutputCol("sentences")

    # Word tokenisation and Normalisation
    token_stage = Tokenizer()\
        .setInputCols(["sentences"])\
        .setOutputCol("token")

    # Text cleaning
    normalise_stage = Normalizer()\
        .setInputCols(["token"])\
        .setOutputCol("normalised")\
        .setLowercase(True)\
        .setCleanupPatterns([r"[^a-zA-Z\s]"])
    stopword_stage = StopWordsCleaner()\
        .setInputCols(["normalised"])\
        .setOutputCol("stopwords_removed")\
        .setCaseSensitive(False)

    # Lemmatisation
    lemma_stage = LemmatizerModel.pretrained()\
        .setInputCols(["stopwords_removed"])\
        .setOutputCol("lemma")
    finish_stage = Finisher()\
        .setInputCols(["lemma"])\
        .setOutputCols(["finish"])\
        .setIncludeMetadata(False)

    nlp_pipeline = Pipeline().setStages([
        document_stage,
        sentence_stage,
        token_stage,
        normalise_stage,
        stopword_stage,
        lemma_stage,
        finish_stage,
    ])
    tokens_df = nlp_pipeline.fit(spark_df).transform(spark_df)

    # Word Embeddings (turns into vector)
    tf_model = CountVectorizer(inputCol="finish", outputCol="tf_features").fit(tokens_df)
    tf_df = tf_model.transform(tokens_df)
    tfidf_df = IDF(inputCol="tf_features", outputCol="tf_idf_features").fit(tf_df).transform(tf_df)

    # Convert to DataFrame
    # NOTE(review): assumes collect() returns rows in the same order as `data` — confirm
    rows = tfidf_df.select("tf_idf_features").collect()
    tfidf = pd.DataFrame(
        [row["tf_idf_features"].toArray() for row in rows],
        columns=tf_model.vocabulary,
    )
    return data, tfidf