In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python

import numpy as np 
import pandas as pd 
import matplotlib.pyplot as plt
from time import time
from nltk.corpus import stopwords
import pyLDAvis
from IPython.display import FileLink
import warnings
import os
import glob
warnings.filterwarnings("ignore")

In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.1.1.tar.gz (212.3 MB)
[K     |████████████████████████████████| 212.3 MB 14 kB/s s eta 0:00:01    |█████████████▉                  | 92.1 MB 48.4 MB/s eta 0:00:03     |██████████████████▍             | 121.9 MB 48.4 MB/s eta 0:00:02
[?25hCollecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 44.8 MB/s eta 0:00:01
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.1.1-py2.py3-none-any.whl size=212767604 sha256=e7778aacf53d04405c96edb2557f7dc9deccbc0d7e7c1810c8984b4e811b251b
  Stored in directory: /root/.cache/pip/wheels/43/47/42/bc413c760cf9d3f7b46ab7cd6590e8c47ebfd19a7386cd4a57
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.1


In [3]:
from pyspark import SparkContext
from pyspark.ml.feature import CountVectorizer, Tokenizer, StopWordsRemover,IDF
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.sql.functions import size,explode
from pyspark.sql import functions as F
from pyspark.ml.clustering import LDA
import glob
from pathlib import Path

In [4]:
# defining stop words
stop_words = set(stopwords.words('english'))

In [5]:
## Initialising spark session - 
def init_spark():
    spark = SparkSession.builder.master('local').config('spark.executor,memory','60g') \
    .config('spark.driver.memory','60g').config('spark.memory.offHeap.enabled',True)\
    .config('spark.memory.offHeap.size','100g').appName('spark_proj').getOrCreate()
    
    return spark

In [6]:
spark = init_spark()

In [7]:
root_path = Path('/kaggle/input/CORD-19-research-challenge/')
metadata_path = root_path / Path('metadata.csv')

metadata = pd.read_csv(metadata_path, dtype={
    'pubmed_id': str,
    'Microsoft Academic Paper ID': str, 
    'doi': str
})

metadata.rename(columns={'source_x': 'source', 'Microsoft Academic Paper ID': 'mic_id', 'WHO #Covidence': 'who_covidence'}, inplace=True)

print("There are ", len(metadata), " sources in the metadata file.")

metadata.head(2)

There are  522159  sources in the metadata file.


Unnamed: 0,cord_uid,sha,source,title,doi,pmcid,pubmed_id,license,abstract,publish_time,authors,journal,mag_id,who_covidence_id,arxiv_id,pdf_json_files,pmc_json_files,url,s2_id
0,ug7v899j,d1aafb70c066a2068b02786f8929fd9c900897fb,PMC,Clinical features of culture-proven Mycoplasma...,10.1186/1471-2334-1-6,PMC35282,11472636,no-cc,OBJECTIVE: This retrospective chart review des...,2001-07-04,"Madani, Tariq A; Al-Ghamdi, Aisha A",BMC Infect Dis,,,,document_parses/pdf_json/d1aafb70c066a2068b027...,document_parses/pmc_json/PMC35282.xml.json,https://www.ncbi.nlm.nih.gov/pmc/articles/PMC3...,
1,02tnwd4m,6b0567729c2143a66d737eb0a2f63f2dce2e5a7d,PMC,Nitric oxide: a pro-inflammatory mediator in l...,10.1186/rr14,PMC59543,11667967,no-cc,Inflammatory diseases of the respiratory tract...,2000-08-15,"Vliet, Albert van der; Eiserich, Jason P; Cros...",Respir Res,,,,document_parses/pdf_json/6b0567729c2143a66d737...,document_parses/pmc_json/PMC59543.xml.json,https://www.ncbi.nlm.nih.gov/pmc/articles/PMC5...,


In [8]:
# Function to read json file in spark dataframe - 
def read_json_files(root_path,spark,n):
    json_dir = str(root_path) + '/document_parses/pdf_json/'
    filenames = os.listdir(json_dir)
    all_json = [json_dir+filename for filename in filenames]
    all_json = all_json[:n]
    json_data = spark.read.json(all_json,multiLine=True)
    json_data.createOrReplaceTempView("json_data")
    return json_data

In [9]:
a = read_json_files(root_path,spark,n=10)

In [10]:
# Function to read article text - 
def get_body_text(json_data,spark):
    
    b = spark.sql(
    """
    Select body_text.text as body_text,paper_id
    from json_data
    """
    )
    return b

In [11]:
# Function to clean and render sentences - 
def clean_sentence(sen):
    clean = [word for word in sen.split(' ') if word.isalnum()]
    clean = [word.lower() for word in clean]
    clean = [word for word in clean if word not in stop_words]
    clean = [word for word in clean if len(word)>=4]
    return clean

def clean_up(document):
    clean = [clean_sentence(a) for a in document]
    joined = [' '.join(a) for a in clean]
    return joined

def topic_render(topic,vocab,wordNum):
    terms = topic[1]
    result = []
    for i in range(wordNum):
        term = vocab[terms[i]]
        result.append(term)
    return result

In [12]:
## main

#n = len(glob.glob(str(root_path)+ '/document_parses/pdf_json/*.json' ,recursive=False))
n=10000
start = time()
json_files = read_json_files(root_path,spark,n)
data = get_body_text(json_files,spark)
print('Done data reading...')

#clean data - 
word_clean_F = F.udf(lambda x: clean_up(x), StringType())
data = data.withColumn('clean_body_text',word_clean_F('body_text'))
data = data.select('clean_body_text')
print('done cleaning data ...')

#tokenzing - 
tokeniszer = Tokenizer(inputCol='clean_body_text',outputCol='tokenized_words')
token_df = tokeniszer.transform(data).select('tokenized_words')
print('Tokenizing done ...')

# Stop word remover -
rem = StopWordsRemover(inputCol='tokenized_words',outputCol='filtered')
clean_df = rem.transform(token_df).select('filtered')
print('Stop words removed ...')

# Count Vectorizer - 
cv = CountVectorizer(inputCol='filtered',outputCol='cv_feat')
cvmodel = cv.fit(clean_df)
count_df = cvmodel.transform(clean_df).select('cv_feat')
print('Done Count Vectorizer ..')

# TF-IDF -
tfidf = IDF(inputCol='cv_feat',outputCol='features')
tfidfmodel = tfidf.fit(count_df)
tfidf_df = tfidfmodel.transform(count_df).select('features')
print('Done TF-IDF ...')
end = time()
print('total preprocessing time: ',end-start,' seconds')

##LDA model -- 
topics = 10
max_iter = 10
start = time()
lda = LDA(optimizer='em',k=topics,maxIter=max_iter)
lda_model = lda.fit(tfidf_df)
lda_trans = lda_model.transform(tfidf_df)
end = time()
print('LDA Ccomplete')
print('total modeling time: ',end-start,' seconds')

Done data reading...
done cleaning data ...
Tokenizing done ...
Stop words removed ...
Done Count Vectorizer ..
Done TF-IDF
total preprocessing time:  532.4119021892548  seconds
LDA Ccomplete
total modeling time:  1363.6192405223846  seconds


In [13]:
# terms per topic - 
topics = lda_model.topicsMatrix()
vocabArray = cvmodel.vocabulary
wordnum = 20
topicIndices = lda_model.describeTopics(maxTermsPerTopic=wordnum).rdd.map(tuple)
topic_final = topicIndices.map(lambda topic: topic_render(topic,vocabArray,wordnum)).collect()
for topic in range(len(topic_final)):
    print('Topic '+str(topic)+':')
    print(topic_final[topic])

Topic 0:
['cells', 'patients', 'cell', 'mice', 'protein', 'expression', 'viral', 'license', 'preprint', 'medrxiv', 'immune', 'virus', 'lung', 'binding', 'tourism', 'activity', 'infection', 'gene', 'model', 'proteins']
Topic 1:
['patients', 'cells', 'eine', 'license', 'patienten', 'cell', 'medrxiv', 'preprint', 'lesions', 'expression', 'protein', 'blood', 'einer', 'lung', 'holder', 'activity', 'treatment', 'immune', 'nach', 'children']
Topic 2:
['eine', 'einer', 'sich', 'auch', 'voor', 'nicht', 'oder', 'durch', 'nach', 'zijn', 'cells', 'sind', 'dass', 'patients', 'werden', 'patienten', 'worden', 'niet', 'wird', 'door']
Topic 3:
['patients', 'cells', 'gvhd', 'cell', 'stem', 'transplantation', 'preprint', 'medrxiv', 'allogeneic', 'license', 'hsct', 'mice', 'viral', 'participants', 'waste', 'treatment', 'patient', 'cancer', 'conditioning', 'health']
Topic 4:
['cells', 'patients', 'license', 'medrxiv', 'preprint', 'protein', 'cell', 'viral', 'health', 'children', 'vaccine', 'vitamin', 'mode

In [14]:
# Data Visualization - 
def format_data_pyldavis(clean_df,cvmodel,lda_trans,lda_model):
    counts = clean_df.select((explode(clean_df.filtered)).alias('tokens')).groupby('tokens').count()
    wc = {i['tokens']:i['count'] for i in counts.collect()}
    wc = [wc[x] for x in cvmodel.vocabulary]
    data = {'topic_term_dists': np.array(lda_model.topicsMatrix().toArray()).T,
           'doc_topic_dists': np.array([x.toArray() for x in lda_trans.select(['topicDistribution']).toPandas()['topicDistribution']]),
           'doc_lengths': [x[0] for x in clean_df.select(size(clean_df.filtered)).collect()],
           'vocab': cvmodel.vocabulary,
           'term_frequency': wc}
    return data

data = format_data_pyldavis(clean_df,cvmodel,lda_trans,lda_model)
py_lda_data = pyLDAvis.prepare(**data)
file_name = '/output/data-viz.html'
pyLDAvis.display(py_lda_data)

In [15]:
pyLDAvis.save_html(py_lda_data,'viz.html')

In [16]:
FileLink(r'viz.html')

In [17]:
spark.stop()