In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, when, count, col
from pyspark.sql.functions import lower, regexp_replace
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.ml.feature import Word2Vec
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder


In [2]:
spark = SparkSession \
    .builder \
    .appName('Project4') \
    .getOrCreate()

In [3]:
meta_data = spark.read\
            .format("csv")\
            .option("header", "true")\
            .load("/home/bitnami/Projects/Project4/metadata.csv")

In [4]:
covid_papers = spark.read\
            .format("json")\
            .option("multiLine", "true")\
            .load("/home/bitnami/Projects/Project4/pdf_json/")

In [5]:
meta_data.show()

+--------+--------------------+--------+--------------------+--------------------+---------+---------+---------+--------------------+------------+--------------------+--------------------+------+----------------+--------+--------------------+--------------------+--------------------+-----+
|cord_uid|                 sha|source_x|               title|                 doi|    pmcid|pubmed_id|  license|            abstract|publish_time|             authors|             journal|mag_id|who_covidence_id|arxiv_id|      pdf_json_files|      pmc_json_files|                 url|s2_id|
+--------+--------------------+--------+--------------------+--------------------+---------+---------+---------+--------------------+------------+--------------------+--------------------+------+----------------+--------+--------------------+--------------------+--------------------+-----+
|ug7v899j|d1aafb70c066a2068...|     PMC|Clinical features...|10.1186/1471-2334...| PMC35282| 11472636|    no-cc|OBJECTIVE: This

In [6]:
meta_data.printSchema()

root
 |-- cord_uid: string (nullable = true)
 |-- sha: string (nullable = true)
 |-- source_x: string (nullable = true)
 |-- title: string (nullable = true)
 |-- doi: string (nullable = true)
 |-- pmcid: string (nullable = true)
 |-- pubmed_id: string (nullable = true)
 |-- license: string (nullable = true)
 |-- abstract: string (nullable = true)
 |-- publish_time: string (nullable = true)
 |-- authors: string (nullable = true)
 |-- journal: string (nullable = true)
 |-- mag_id: string (nullable = true)
 |-- who_covidence_id: string (nullable = true)
 |-- arxiv_id: string (nullable = true)
 |-- pdf_json_files: string (nullable = true)
 |-- pmc_json_files: string (nullable = true)
 |-- url: string (nullable = true)
 |-- s2_id: string (nullable = true)



In [7]:
# counting nans 
meta_data.select([count(when(isnan(column) | col(column).isNull(), column)).alias(column) for column  in meta_data.columns]).show()

+--------+------+--------+-----+------+------+---------+-------+--------+------------+-------+-------+------+----------------+--------+--------------+--------------+------+-----+
|cord_uid|   sha|source_x|title|   doi| pmcid|pubmed_id|license|abstract|publish_time|authors|journal|mag_id|who_covidence_id|arxiv_id|pdf_json_files|pmc_json_files|   url|s2_id|
+--------+------+--------+-----+------+------+---------+-------+--------+------------+-------+-------+------+----------------+--------+--------------+--------------+------+-----+
|       0|399672|       0|  302|277692|389860|   323028|    661|  160110|         621|  15698|  37692|583915|          337996|  578827|        388907|        427246|252432|59359|
+--------+------+--------+-----+------+------+---------+-------+--------+------------+-------+-------+------+----------------+--------+--------------+--------------+------+-----+



In [8]:
covid_papers.show(10)

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|            abstract|         back_matter|         bib_entries|           body_text|            metadata|            paper_id|         ref_entries|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|                  []|                  []|[[[[D, Antonson, ...|[[[], [], INTRODU...|              [[], ]|ff78ee43150a59344...|[[, Decrease or e...|
|[[[], [], Abstrac...|[[[[23, BIBREF267...|[[[[F, Abad, [X],...|[[[], [], SUMMARY...|[[[[European Food...|ff60baae582612fbf...|[[, Number of not...|
|[[[], [], Abstrac...|[[[], [], Acknowl...|[[[], , [,], , b0...|[[[[250, BIBREF0,...|[[[[Boston Univer...|ffec12aa4a9fd44c1...|[[, Development o...|
|[[[], [], Abstrac...|                  []|[[[[N, Hooper, [M...|[[[], [], Introdu...|[[[[Wrocław Unive...|

In [9]:
from pyspark.sql.functions import udf

def text_contact(col):
    text = ""
    for row in col:
        text = text + " " + row['text']
    return text

text_contact = udf(text_contact)

In [10]:
text_contact

<function __main__.text_contact(col)>

In [11]:
covid_papers = covid_papers.select(covid_papers['Paper_ID'], covid_papers['metadata']['title'], text_contact(covid_papers['abstract']))


In [12]:
covid_papers.show()

+--------------------+--------------------+----------------------+
|            Paper_ID|      metadata.title|text_contact(abstract)|
+--------------------+--------------------+----------------------+
|ff78ee43150a59344...|                    |                      |
|ff60baae582612fbf...|Scientific Opinio...|   A review of the ...|
|ffec12aa4a9fd44c1...|Humanized Mice fo...|   Live-attenuated ...|
|ffdf3927a517dbdb2...|Metallo-aminopept...|   Aminopeptidases ...|
|ffae196fecc4da7f1...|Emerging and Reem...|                      |
|ff6e7c70571d00234...|Grand Challenges ...|   Synopsis Billion...|
|fff4688e7755c8ec7...|Prophylaxis and E...|                      |
|ffc93dea5a5f4e6e6...|From Touchdown to...|                      |
|ff6c0b549e3f20ec7...|Increased Neurite...|   Extracellular β-...|
|ff4e235c4c231feb3...|POTENTIAL DETRIME...|   Lussier, G. 1988...|
|ffa07201a988202e9...|CUMULATIVE BIOLOG...|                      |
|ffd185ed9dce81cf6...|Health Benefi ts ...|   The Mediterranea

In [13]:
covid_papers = covid_papers.withColumnRenamed('metadata.title', 'Title')\
        .withColumnRenamed('text_contact(abstract)', 'Abstract')

In [14]:
covid_papers.show(10)

+--------------------+--------------------+--------------------+
|            Paper_ID|               Title|            Abstract|
+--------------------+--------------------+--------------------+
|ff78ee43150a59344...|                    |                    |
|ff60baae582612fbf...|Scientific Opinio...| A review of the ...|
|ffec12aa4a9fd44c1...|Humanized Mice fo...| Live-attenuated ...|
|ffdf3927a517dbdb2...|Metallo-aminopept...| Aminopeptidases ...|
|ffae196fecc4da7f1...|Emerging and Reem...|                    |
|ff6e7c70571d00234...|Grand Challenges ...| Synopsis Billion...|
|fff4688e7755c8ec7...|Prophylaxis and E...|                    |
|ffc93dea5a5f4e6e6...|From Touchdown to...|                    |
|ff6c0b549e3f20ec7...|Increased Neurite...| Extracellular β-...|
|ff4e235c4c231feb3...|POTENTIAL DETRIME...| Lussier, G. 1988...|
+--------------------+--------------------+--------------------+
only showing top 10 rows



In [15]:
from pyspark.sql.functions import isnan, when, count, col

covid_papers.select([count(when(col(column) == "", column)).alias(column) for column in covid_papers.columns]).show()

+--------+-----+--------+
|Paper_ID|Title|Abstract|
+--------+-----+--------+
|       0|   27|      62|
+--------+-----+--------+



In [16]:
covid_papers.count()

189

In [17]:
covid_papers.filter("Title != '' and Abstract != ''"  ).count()


124

In [18]:
covid_papers_meta = covid_papers.join(meta_data, covid_papers['Paper_ID'] == meta_data['sha'], how='left_outer')\
            .select(covid_papers['Paper_ID'], covid_papers['title'], meta_data['publish_time'],\
                   meta_data['authors'], meta_data['journal'])

In [19]:
from pyspark.sql.functions import year, month, to_date
covid_papers_meta = covid_papers_meta.withColumn("publish_Year", year(to_date("publish_time")))

In [20]:
covid_papers_meta.show(5)

+--------------------+--------------------+------------+--------------------+--------------------+------------+
|            Paper_ID|               title|publish_time|             authors|             journal|publish_Year|
+--------------------+--------------------+------------+--------------------+--------------------+------------+
|ffdd84a06e470242c...|SEQUENCE O F MURI...|  2008-05-12|KOGA, M.; WEGE, H...|Neuropathol Appl ...|        2008|
|ffef8194e52de95fe...|Virology Journal ...|  2009-10-10|Ngandu, Nobubelo ...|             Virol J|        2009|
|ff6c61a7a6a4fd518...|Optimal COVID-19 ...|        null|                null|                null|        null|
|ff7d49ac4008f60ef...|                    |  2010-10-31|Cook, Alex R.; Ch...|    Emerg Infect Dis|        2010|
|fff69e4894df7b413...|INTERACTION BETWE...|        2006|Pöhlmann, Stefan;...|     The Nidoviruses|        2006|
+--------------------+--------------------+------------+--------------------+--------------------+------

In [21]:
#!pip3 install langdetect 


In [22]:
from langdetect import detect
def detect_lang(txt):
    try:
        return detect(txt)
    except:
        return None
udf_detect_lang = udf(detect_lang)

In [23]:
covid_papers_meta = covid_papers_meta.withColumn('Language', udf_detect_lang(covid_papers_meta['title']))

In [24]:
# we think there is a prob with this result due to choosing title instead of text itself from abstract or body

# if we have time we will get back to this 
covid_papers_meta.select("*").where("Language<>'en'").count()

9

In [25]:
covid_papers_meta.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+------------+--------+
|            Paper_ID|               title|        publish_time|             authors|             journal|publish_Year|Language|
+--------------------+--------------------+--------------------+--------------------+--------------------+------------+--------+
|ffdd84a06e470242c...|SEQUENCE O F MURI...|          2008-05-12|KOGA, M.; WEGE, H...|Neuropathol Appl ...|        2008|      en|
|ffef8194e52de95fe...|Virology Journal ...|          2009-10-10|Ngandu, Nobubelo ...|             Virol J|        2009|      en|
|ff6c61a7a6a4fd518...|Optimal COVID-19 ...|                null|                null|                null|        null|      es|
|ff7d49ac4008f60ef...|                    |          2010-10-31|Cook, Alex R.; Ch...|    Emerg Infect Dis|        2010|    null|
|fff69e4894df7b413...|INTERACTION BETWE...|                2006|Pöhlmann, Stefan;...|     The Nid

In [26]:
#Drop the unneeded columns
covid_papers_meta = covid_papers_meta.drop('publish_time', 'Language')
covid_papers_meta.show(2)

+--------------------+--------------------+--------------------+--------------------+------------+
|            Paper_ID|               title|             authors|             journal|publish_Year|
+--------------------+--------------------+--------------------+--------------------+------------+
|ffdd84a06e470242c...|SEQUENCE O F MURI...|KOGA, M.; WEGE, H...|Neuropathol Appl ...|        2008|
|ffef8194e52de95fe...|Virology Journal ...|Ngandu, Nobubelo ...|             Virol J|        2009|
+--------------------+--------------------+--------------------+--------------------+------------+
only showing top 2 rows



In [27]:
print("Count of Null")
covid_papers_meta.select([count(when(isnan(column) | col(column).isNull(), column)).alias(column) for column in covid_papers_meta.columns]).show()
print("Count of Empty values")
covid_papers_meta.select([count(when(col(column) == "", column)).alias(column) for column in covid_papers_meta.columns]).show()

Count of Null
+--------+-----+-------+-------+------------+
|Paper_ID|title|authors|journal|publish_Year|
+--------+-----+-------+-------+------------+
|       0|    0|     47|     45|          43|
+--------+-----+-------+-------+------------+

Count of Empty values
+--------+-----+-------+-------+------------+
|Paper_ID|title|authors|journal|publish_Year|
+--------+-----+-------+-------+------------+
|       0|   27|      0|      0|           0|
+--------+-----+-------+-------+------------+



In [28]:
category_cols = [item[0] for item in covid_papers_meta.dtypes if item[1].startswith('string')] 
category_cols

['Paper_ID', 'title', 'authors', 'journal']

In [29]:

category_null_cols = [column for column in category_cols if covid_papers_meta.where(col(column).isNull()| col(column).isin('')).count() > 0]
category_null_cols

['title', 'authors', 'journal']

In [30]:
# filling nan values with empty string
for column in category_null_cols:
    covid_papers_meta = covid_papers_meta.na.fill(" ")

In [31]:
numeric_cols = [item[0] for item in covid_papers_meta.dtypes if item[1].startswith('int') | item[1].startswith('double')] 
numeric_cols

['publish_Year']

In [32]:
### Now let's find numerical columns with null values
num_null_cols = [column for column in numeric_cols if covid_papers_meta.filter(col(column).isNull() | col(column).eqNullSafe(0)).count() > 0]
num_null_cols

['publish_Year']

In [33]:
### Now let's fill with 0
for column in num_null_cols:
    covid_papers_meta = covid_papers_meta.na.fill(0)

In [36]:
covid_papers_meta.show(10)

+--------------------+--------------------+--------------------+--------------------+------------+
|            Paper_ID|               title|             authors|             journal|publish_Year|
+--------------------+--------------------+--------------------+--------------------+------------+
|ffdd84a06e470242c...|SEQUENCE O F MURI...|KOGA, M.; WEGE, H...|Neuropathol Appl ...|        2008|
|ffef8194e52de95fe...|Virology Journal ...|Ngandu, Nobubelo ...|             Virol J|        2009|
|ff6c61a7a6a4fd518...|Optimal COVID-19 ...|                    |                    |           0|
|ff7d49ac4008f60ef...|                    |Cook, Alex R.; Ch...|    Emerg Infect Dis|        2010|
|fff69e4894df7b413...|INTERACTION BETWE...|Pöhlmann, Stefan;...|     The Nidoviruses|        2006|
|ff5939c10252c289d...|Adenovirus recept...|                    |                    |           0|
|ffbd7555a33770623...|Title: Estimation...| and far higher t...| it is likely tha...|           0|
|ff7f8110e

In [34]:
covid_papers_meta.dropDuplicates()

DataFrame[Paper_ID: string, title: string, authors: string, journal: string, publish_Year: int]

In [38]:
covid_papers_meta.count()

189

In [35]:
# Remove any special characters

covid_papers_meta = covid_papers_meta.withColumn("title", lower(col('title')))
covid_papers_meta = covid_papers_meta.withColumn("title", regexp_replace("title", "[^a-zA-Z\\s]" , " "))
covid_papers_meta = covid_papers_meta.withColumn("title", regexp_replace("title", " +" , " "))
covid_papers_meta = covid_papers_meta.withColumn("title", regexp_replace("title", "^ +" , ""))

In [40]:
covid_papers_meta.show(5)

+--------------------+--------------------+--------------------+--------------------+------------+
|            Paper_ID|               title|             authors|             journal|publish_Year|
+--------------------+--------------------+--------------------+--------------------+------------+
|ffdd84a06e470242c...|sequence o f muri...|KOGA, M.; WEGE, H...|Neuropathol Appl ...|        2008|
|ffef8194e52de95fe...|virology journal ...|Ngandu, Nobubelo ...|             Virol J|        2009|
|ff6c61a7a6a4fd518...|optimal covid epi...|                    |                    |           0|
|ff7d49ac4008f60ef...|                    |Cook, Alex R.; Ch...|    Emerg Infect Dis|        2010|
|fff69e4894df7b413...|interaction betwe...|Pöhlmann, Stefan;...|     The Nidoviruses|        2006|
+--------------------+--------------------+--------------------+--------------------+------------+
only showing top 5 rows



In [36]:
tokenizer = Tokenizer(inputCol='title', outputCol='title_token')
covid_papers_meta = tokenizer.transform(covid_papers_meta).select('*')

In [42]:
covid_papers_meta.select('title_token').show(5)

+--------------------+
|         title_token|
+--------------------+
|[sequence, o, f, ...|
|[virology, journa...|
|[optimal, covid, ...|
|                  []|
|[interaction, bet...|
+--------------------+
only showing top 5 rows



In [37]:
remove = StopWordsRemover(inputCol='title_token', outputCol='title_clean')
covid_papers_meta = remove.transform(covid_papers_meta).select('*')

In [44]:
covid_papers_meta.select('title_clean').show(5)

+--------------------+
|         title_clean|
+--------------------+
|[sequence, o, f, ...|
|[virology, journa...|
|[optimal, covid, ...|
|                  []|
|[interaction, spi...|
+--------------------+
only showing top 5 rows



In [38]:
remove2 = StopWordsRemover(inputCol='title_clean', outputCol='title_clean_', stopWords = [ 'doi', 'preprint', 'copyright', 'peer', 'reviewed', 'org',
 'https', 'et', 'al', 'author', 'figure','rights', 'reserved', 'permission', 'used', 'using',
 'biorxiv', 'medrxiv', 'license', 'fig', 'fig.', 'al.', 'Elsevier', 'PMC', 'CZI', 'www'])
covid_papers_meta = remove2.transform(covid_papers_meta).select('*')

In [46]:
covid_papers_meta.select( 'title_clean_').show(5)

+--------------------+
|        title_clean_|
+--------------------+
|[sequence, o, f, ...|
|[virology, journa...|
|[optimal, covid, ...|
|                  []|
|[interaction, spi...|
+--------------------+
only showing top 5 rows



In [47]:
# from pyspark.sql.functions import size
# covid_papers_meta = covid_papers_meta.withColumn("wordcount", size("words_clean_"))

In [39]:
covid_papers_meta.printSchema()

root
 |-- Paper_ID: string (nullable = false)
 |-- title: string (nullable = false)
 |-- authors: string (nullable = false)
 |-- journal: string (nullable = false)
 |-- publish_Year: integer (nullable = true)
 |-- title_token: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- title_clean: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- title_clean_: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [40]:
covid_papers_meta = covid_papers_meta.drop('title_token')

In [50]:
covid_papers_meta.show(5)

+--------------------+--------------------+--------------------+--------------------+------------+--------------------+--------------------+
|            Paper_ID|               title|             authors|             journal|publish_Year|         title_clean|        title_clean_|
+--------------------+--------------------+--------------------+--------------------+------------+--------------------+--------------------+
|ffdd84a06e470242c...|sequence o f muri...|KOGA, M.; WEGE, H...|Neuropathol Appl ...|        2008|[sequence, o, f, ...|[sequence, o, f, ...|
|ffef8194e52de95fe...|virology journal ...|Ngandu, Nobubelo ...|             Virol J|        2009|[virology, journa...|[virology, journa...|
|ff6c61a7a6a4fd518...|optimal covid epi...|                    |                    |           0|[optimal, covid, ...|[optimal, covid, ...|
|ff7d49ac4008f60ef...|                    |Cook, Alex R.; Ch...|    Emerg Infect Dis|        2010|                  []|                  []|
|fff69e4894df

In [41]:
covid_papers_meta.repartition(12).write.parquet("./covid_papers_meta_processed")


In [4]:
covid_papers_meta = spark.read\
           .format("parquet")\
           .option("header", "true")\
           .load("./covid_papers_meta_processed")

In [52]:

# # Learn a mapping from words to Vectors.
# word2Vec = Word2Vec(vectorSize=100, minCount=0, inputCol="words_clean_custom", outputCol="word2vec_body")
# model = word2Vec.fit(papers_meta)

# papers_meta = model.transform(papers_meta)

In [5]:
from pyspark.ml.feature import Word2Vec

word2Vec = Word2Vec(vectorSize=100, minCount=0, inputCol="title_clean_", outputCol="titl_eword2vec")
model = word2Vec.fit(covid_papers_meta)

covid_papers_meta = model.transform(covid_papers_meta)

In [54]:
covid_papers_meta.show(5)

+--------------------+--------------------+--------------------+--------------------+------------+--------------------+--------------------+--------------------+
|            Paper_ID|               title|             authors|             journal|publish_Year|         title_clean|        title_clean_|      titl_eword2vec|
+--------------------+--------------------+--------------------+--------------------+------------+--------------------+--------------------+--------------------+
|ffdd84a06e470242c...|sequence o f muri...|KOGA, M.; WEGE, H...|Neuropathol Appl ...|        2008|[sequence, o, f, ...|[sequence, o, f, ...|[-8.6179045452312...|
|ffef8194e52de95fe...|virology journal ...|Ngandu, Nobubelo ...|             Virol J|        2009|[virology, journa...|[virology, journa...|[-0.0014203501850...|
|ff6c61a7a6a4fd518...|optimal covid epi...|                    |                    |           0|[optimal, covid, ...|[optimal, covid, ...|[-4.2925175512209...|
|ff7d49ac4008f60ef...|      

In [6]:
covid_papers_meta.repartition(12).write.parquet("./Dataset/covid_papers_meta_word2vec")


In [8]:
covid_papers_meta = spark.read\
            .format("parquet")\
            .option("header", "true")\
            .load("./Dataset/covid_papers_meta_word2vec")

In [56]:
covid_papers_meta.printSchema()

root
 |-- Paper_ID: string (nullable = false)
 |-- title: string (nullable = false)
 |-- authors: string (nullable = false)
 |-- journal: string (nullable = false)
 |-- publish_Year: integer (nullable = true)
 |-- title_clean: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- title_clean_: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- titl_eword2vec: vector (nullable = true)



In [9]:
#Select only the needed columns
covid_papers_meta = covid_papers_meta.select('Paper_ID', 'authors', 'journal', 'publish_Year', 'titl_eword2vec')


In [59]:
covid_papers_meta.show(5)


+--------------------+--------------------+--------------------+------------+--------------------+
|            Paper_ID|             authors|             journal|publish_Year|      titl_eword2vec|
+--------------------+--------------------+--------------------+------------+--------------------+
|ffdd84a06e470242c...|KOGA, M.; WEGE, H...|Neuropathol Appl ...|        2008|[-8.6179045452312...|
|ffef8194e52de95fe...|Ngandu, Nobubelo ...|             Virol J|        2009|[-0.0014203501850...|
|ff6c61a7a6a4fd518...|                    |                    |           0|[-4.2925175512209...|
|ff7d49ac4008f60ef...|Cook, Alex R.; Ch...|    Emerg Infect Dis|        2010|[-0.0038492346648...|
|fff69e4894df7b413...|Pöhlmann, Stefan;...|     The Nidoviruses|        2006|[1.02162403183885...|
+--------------------+--------------------+--------------------+------------+--------------------+
only showing top 5 rows



In [10]:
categorical_cols = [item[0] for item in covid_papers_meta.dtypes if item[1].startswith('string')][1:]
categorical_cols

['authors', 'journal']

In [11]:
indexers = [StringIndexer(
    inputCol=column, 
    outputCol=column + '_index', 
    handleInvalid='keep') for column in categorical_cols]

In [12]:
encoders = [OneHotEncoder(
    inputCol=column + '_index', 
    outputCol= column + '_encoded') for column in categorical_cols]

In [13]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=indexers + encoders)

In [14]:
covid_papers_meta_transformed = pipeline.fit(covid_papers_meta).transform(covid_papers_meta)


In [64]:
covid_papers_meta_transformed.show(5)

+--------------------+--------------------+--------------------+------------+--------------------+-------------+-------------+-----------------+----------------+
|            Paper_ID|             authors|             journal|publish_Year|      titl_eword2vec|authors_index|journal_index|  authors_encoded| journal_encoded|
+--------------------+--------------------+--------------------+------------+--------------------+-------------+-------------+-----------------+----------------+
|ffdd84a06e470242c...|KOGA, M.; WEGE, H...|Neuropathol Appl ...|        2008|[-8.6179045452312...|          4.0|         15.0|  (143,[4],[1.0])|(123,[15],[1.0])|
|ffef8194e52de95fe...|Ngandu, Nobubelo ...|             Virol J|        2009|[-0.0014203501850...|         29.0|          8.0| (143,[29],[1.0])| (123,[8],[1.0])|
|ff6c61a7a6a4fd518...|                    |                    |           0|[-4.2925175512209...|          0.0|          0.0|  (143,[0],[1.0])| (123,[0],[1.0])|
|ff7d49ac4008f60ef...|Cook, 

In [15]:
requiredFeatures = ['publish_Year','titl_eword2vec','authors_encoded','journal_encoded']

In [16]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=requiredFeatures, outputCol='features')

In [17]:
covid_papers_meta_transformed = assembler.transform(covid_papers_meta_transformed)


In [67]:
covid_papers_meta_transformed.show(5)

+--------------------+--------------------+--------------------+------------+--------------------+-------------+-------------+-----------------+----------------+--------------------+
|            Paper_ID|             authors|             journal|publish_Year|      titl_eword2vec|authors_index|journal_index|  authors_encoded| journal_encoded|            features|
+--------------------+--------------------+--------------------+------------+--------------------+-------------+-------------+-----------------+----------------+--------------------+
|ffdd84a06e470242c...|KOGA, M.; WEGE, H...|Neuropathol Appl ...|        2008|[-8.6179045452312...|          4.0|         15.0|  (143,[4],[1.0])|(123,[15],[1.0])|(367,[0,1,2,3,4,5...|
|ffef8194e52de95fe...|Ngandu, Nobubelo ...|             Virol J|        2009|[-0.0014203501850...|         29.0|          8.0| (143,[29],[1.0])| (123,[8],[1.0])|(367,[0,1,2,3,4,5...|
|ff6c61a7a6a4fd518...|                    |                    |           0|[-4.2925

In [18]:
covid_papers_meta_transformed.select('features').take(1)

[Row(features=SparseVector(367, {0: 2016.0, 1: 0.0002, 2: -0.0004, 3: 0.0002, 4: 0.0008, 5: 0.0011, 6: 0.0006, 7: -0.0007, 8: 0.0002, 9: 0.0006, 10: -0.0009, 11: 0.0005, 12: -0.0001, 13: 0.0012, 14: 0.0025, 15: 0.0012, 16: -0.0003, 17: -0.0001, 18: -0.0001, 19: 0.0006, 20: 0.0009, 21: -0.0008, 22: -0.0006, 23: -0.0019, 24: 0.0002, 25: -0.0, 26: 0.0011, 27: 0.002, 28: -0.0017, 29: 0.0002, 30: -0.001, 31: -0.0001, 32: 0.0017, 33: 0.0013, 34: 0.0002, 35: -0.0005, 36: -0.0001, 37: 0.0003, 38: 0.0003, 39: 0.0004, 40: -0.0001, 41: 0.0002, 42: -0.0007, 43: -0.0004, 44: 0.0004, 45: 0.0002, 46: 0.0001, 47: 0.0005, 48: 0.0001, 49: -0.0012, 50: -0.001, 51: -0.0009, 52: -0.0002, 53: -0.0007, 54: 0.0013, 55: -0.0003, 56: 0.0007, 57: 0.0001, 58: 0.0011, 59: 0.0006, 60: 0.0004, 61: 0.0001, 62: -0.0009, 63: 0.0, 64: 0.0008, 65: 0.0012, 66: 0.0, 67: -0.0003, 68: -0.0012, 69: 0.001, 70: 0.0, 71: -0.0027, 72: 0.0007, 73: 0.0015, 74: 0.0003, 75: -0.0003, 76: -0.0006, 77: 0.0013, 78: -0.0012, 79: 0.0004, 8

In [70]:
covid_papers_meta_transformed.show(5)

+--------------------+--------------------+--------------------+------------+--------------------+-------------+-------------+-----------------+----------------+--------------------+
|            Paper_ID|             authors|             journal|publish_Year|      titl_eword2vec|authors_index|journal_index|  authors_encoded| journal_encoded|            features|
+--------------------+--------------------+--------------------+------------+--------------------+-------------+-------------+-----------------+----------------+--------------------+
|ffdd84a06e470242c...|KOGA, M.; WEGE, H...|Neuropathol Appl ...|        2008|[-8.6179045452312...|          4.0|         15.0|  (143,[4],[1.0])|(123,[15],[1.0])|(367,[0,1,2,3,4,5...|
|ffef8194e52de95fe...|Ngandu, Nobubelo ...|             Virol J|        2009|[-0.0014203501850...|         29.0|          8.0| (143,[29],[1.0])| (123,[8],[1.0])|(367,[0,1,2,3,4,5...|
|ff6c61a7a6a4fd518...|                    |                    |           0|[-4.2925

In [19]:
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors

pca = PCA(k=2, inputCol="features", outputCol="features_pca")
model = pca.fit(covid_papers_meta_transformed)
covid_papers_meta_transformed = model.transform(covid_papers_meta_transformed)

In [20]:
model.explainedVariance

DenseVector([1.0, 0.0])

In [74]:
covid_papers_meta_transformed.show(5)

+--------------------+--------------------+--------------------+------------+--------------------+-------------+-------------+-----------------+----------------+--------------------+--------------------+
|            Paper_ID|             authors|             journal|publish_Year|      titl_eword2vec|authors_index|journal_index|  authors_encoded| journal_encoded|            features|        features_pca|
+--------------------+--------------------+--------------------+------------+--------------------+-------------+-------------+-----------------+----------------+--------------------+--------------------+
|ffdd84a06e470242c...|KOGA, M.; WEGE, H...|Neuropathol Appl ...|        2008|[-8.6179045452312...|          4.0|         15.0|  (143,[4],[1.0])|(123,[15],[1.0])|(367,[0,1,2,3,4,5...|[-2007.9995828662...|
|ffef8194e52de95fe...|Ngandu, Nobubelo ...|             Virol J|        2009|[-0.0014203501850...|         29.0|          8.0| (143,[29],[1.0])| (123,[8],[1.0])|(367,[0,1,2,3,4,5...|[-

In [21]:
covid_papers_meta_transformed.repartition(12).write.parquet("./Dataset/covid_papers_meta_transformed")

In [None]:
# papers_meta_transformed.repartition(12,'features').write.parquet("./Dataset/papersmeta_transformed")

In [22]:
covid_papers_meta_transformed = spark.read\
            .format("parquet")\
            .option("header", "true")\
            .load("./Dataset/covid_papers_meta_transformed")

In [23]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Calculate cost and plot
import numpy as np
import pandas as pd 

cost = np.zeros(15)
silhouette = np.zeros(15)

for k in range(2,15):
    kmeans = KMeans().setK(k).setSeed(1).setFeaturesCol('features_pca')
    model = kmeans.fit(covid_papers_meta_transformed)
    cost[k] = model.computeCost(covid_papers_meta_transformed)
    clusterdData = model.transform(covid_papers_meta_transformed)
    evaluator = ClusteringEvaluator()
    silhouette[k] = evaluator.evaluate(clusterdData)

# Plot the cost
df_eval = pd.DataFrame(np.array([cost[2:].tolist(),silhouette[2:].tolist()])).transpose()
df_eval.columns = ["cost", "silhouette_score"]
new_col = [2,3,4,5,6,7,8,9,10,11,12,13,14]
df_eval.insert(0, 'cluster', new_col)

In [24]:
import seaborn as sns
import matplotlib.pyplot as plt
sns.set_style("whitegrid")
fig = plt.figure(figsize=(40,10))
ax1 = fig.add_subplot(1, 2, 1)
ax1.set(xticks=range(2,15))
ax2 = fig.add_subplot(1, 2, 2)
ax2.set(xticks=range(2,15))
sns.lineplot(x='cluster', y='cost', data=df_eval, ax=ax1)
sns.lineplot(x='cluster', y='silhouette_score', data=df_eval, ax=ax2)

<matplotlib.axes._subplots.AxesSubplot at 0x7f370d0a52e8>

In [25]:
df_eval

Unnamed: 0,cluster,cost,silhouette_score
0,2,8224.246654,0.999977
1,3,3294.11478,0.810664
2,4,1387.607632,0.788133
3,5,709.31228,0.776285
4,6,461.45852,0.72428
5,7,394.344251,0.716439
6,8,379.412789,0.709574
7,9,216.786511,0.688536
8,10,201.855049,0.682481
9,11,154.502926,0.634406


In [8]:
kmeans = KMeans().setK(4).setFeaturesCol('features_pca')
model = kmeans.fit(covid_papers_meta_transformed)
clusterdData = model.transform(covid_papers_meta_transformed)

In [9]:
paper_titles_df = spark.read\
            .format("parquet")\
            .option("header", "true")\
            .load("./Dataset/covid_papers_meta_word2vec").select('Paper_ID', 'title', 'title_clean_')
clusterdData = paper_titles_df.join(clusterdData, on=['Paper_ID'], how='inner')
clusterdData.show(5)

+--------------------+--------------------+--------------------+--------------------+--------------------+---------+------------+-------------+--------------------+--------------------+-------------+-------------+----------------+----------------+--------------------+--------------------+----------+
|            Paper_ID|               title|         title_clean|             authors|             journal|wordcount|publish_Year|publish_Month|       word2vec_body|      word2vec_title|authors_index|journal_index| authors_encoded| journal_encoded|            features|        features_pca|prediction|
+--------------------+--------------------+--------------------+--------------------+--------------------+---------+------------+-------------+--------------------+--------------------+-------------+-------------+----------------+----------------+--------------------+--------------------+----------+
|ff56926b9a45fe14f...|intracellular syn...|[intracellular, s...|Dea, S.; Garzon, ...|          Ar

In [10]:
from pyspark.sql.functions import udf, col
df_recommender = clusterdData.select('title', 'title_clean', col('prediction').alias('cluster'))
from pyspark.ml.feature import HashingTF, IDF
hashingTF = HashingTF(inputCol="title_clean", outputCol="tf")
tf = hashingTF.transform(df_recommender)

idf = IDF(inputCol="tf", outputCol="idf_feature").fit(tf)
tfidf = idf.transform(tf)

In [11]:
from pyspark.ml.feature import Normalizer
normalizer = Normalizer(inputCol="idf_feature", outputCol="norm")
data = normalizer.transform(tfidf)

In [12]:
data.take(1)

[Row(title='effect of porcine circovirus type a or b on infection kinetics and pathogenicity of two genetically divergent strains of porcine reproductive and respiratory syndrome virus in the conventional pig model', title_clean=['effect', 'porcine', 'circovirus', 'type', 'b', 'infection', 'kinetics', 'pathogenicity', 'two', 'genetically', 'divergent', 'strains', 'porcine', 'reproductive', 'respiratory', 'syndrome', 'virus', 'conventional', 'pig', 'model'], cluster=2, tf=SparseVector(262144, {3435: 1.0, 15664: 1.0, 22808: 1.0, 30913: 1.0, 60319: 1.0, 72317: 1.0, 86845: 1.0, 90586: 1.0, 92225: 1.0, 129645: 1.0, 135838: 1.0, 167455: 1.0, 177959: 1.0, 180249: 1.0, 185155: 1.0, 215686: 1.0, 218844: 1.0, 248011: 2.0, 260027: 1.0}), idf_feature=SparseVector(262144, {3435: 4.5539, 15664: 4.1484, 22808: 4.5539, 30913: 3.4553, 60319: 2.539, 72317: 4.1484, 86845: 4.1484, 90586: 2.3026, 92225: 3.3011, 129645: 3.0498, 135838: 4.5539, 167455: 3.6376, 177959: 4.1484, 180249: 3.6376, 185155: 2.069, 2

In [13]:
from pyspark.sql.types import DoubleType

dot_udf = udf(lambda x,y: float(x.dot(y)), DoubleType())

def recommendPaper(paper_title,N, data=data):
    target_paper = data.filter(data['title'] == paper_title)
    input_cluster = target_paper.select('cluster').collect()[0].cluster
    data = data.filter(data['cluster'] == input_cluster)
    recommendations = target_paper.alias("tearget_paper").crossJoin(data.alias("right"))\
        .select(col("tearget_paper.title").alias("target_title"), 
            col("right.title").alias("recommended_title"), 
            dot_udf("tearget_paper.norm", "right.norm").alias("dot"))\
        .sort(col("dot").desc())\
        .limit(N+1)
    return {reccomendation.recommended_title:reccomendation.dot for reccomendation in recommendations.collect()[1:]}

In [14]:
recommendPaper('sequence o f murine coronavirus jhm induced neuropathological changes i n rats',2)


{'interleukin induction in vitro in mouse brain endothelial cells and astrocytes by exposure to mouse hepatitis virus mhv jhm ': 0.07681728881364465,
 'novel coronavirus epidemic': 0.09311463265696489}