# Exploratory Analysis -- Metadata


#### Imports


In [174]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkContext

import time 
from time import sleep

sc = SparkContext()
sc

In [175]:
spark = SparkSession \
 .builder \
 .appName("Python Spark SQL basic example") \
 .config("spark.some.config.option", "some-value") \
 .getOrCreate()

In [212]:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType,ArrayType

### Read in metadata 

In [177]:
data = spark.read.csv('s3://ai2-semanticscholar-cord-19/2021-04-19/metadata.csv/',header = True)

In [178]:
data.cache()

DataFrame[cord_uid: string, sha: string, source_x: string, title: string, doi: string, pmcid: string, pubmed_id: string, license: string, abstract: string, publish_time: string, authors: string, journal: string, mag_id: string, who_covidence_id: string, arxiv_id: string, pdf_json_files: string, pmc_json_files: string, url: string, s2_id: string]

In [179]:
data.columns

['cord_uid',
 'sha',
 'source_x',
 'title',
 'doi',
 'pmcid',
 'pubmed_id',
 'license',
 'abstract',
 'publish_time',
 'authors',
 'journal',
 'mag_id',
 'who_covidence_id',
 'arxiv_id',
 'pdf_json_files',
 'pmc_json_files',
 'url',
 's2_id']

In [180]:
data[['cord_uid','title','abstract','authors','journal']].show()

+--------+--------------------+--------------------+--------------------+--------------------+
|cord_uid|               title|            abstract|             authors|             journal|
+--------+--------------------+--------------------+--------------------+--------------------+
|ug7v899j|Clinical features...|OBJECTIVE: This r...|Madani, Tariq A; ...|      BMC Infect Dis|
|02tnwd4m|Nitric oxide: a p...|Inflammatory dise...|Vliet, Albert van...|          Respir Res|
|ejv2xln0|Surfactant protei...|Surfactant protei...|     Crouch, Erika C|          Respir Res|
|2b73a28n|Role of endotheli...|Endothelin-1 (ET-...|Fagan, Karen A; M...|          Respir Res|
|9785vg6d|Gene expression i...|Respiratory syncy...|Domachowske, Jose...|          Respir Res|
|zjufx4fo|Sequence requirem...|Nidovirus subgeno...|Pasternak, Alexan...|    The EMBO Journal|
|5yhe786e|Debate: Transfusi...|Recent evidence s...|Alvarez, Gonzalo;...|           Crit Care|
|8zchiykl|The 21st Internat...|The 21st Internat..

### Explore Metadata

##### Number of documents

In [181]:
data.count()

536817

##### Number of unique cord_uids

In [182]:
data[['cord_uid']].distinct().count()

508313

#### Number of unique Journals 


In [183]:
data[['journal']].distinct().count()

45848

#### Number of Abtracts per Journal

In [184]:
data[['journal']].groupBy("journal").count().orderBy(F.col("count").desc()).show()

+--------------------+-----+
|             journal|count|
+--------------------+-----+
|                null|33977|
|             bioRxiv| 5070|
|            PLoS One| 4854|
|                 BMJ| 4583|
|             Sci Rep| 2457|
|Int J Environ Res...| 2290|
|              Nature| 2205|
|              Lancet| 2187|
| Journal of virology| 1869|
|                JAMA| 1744|
|    Int J Infect Dis| 1611|
|              Cureus| 1590|
|             Viruses| 1518|
|             Science| 1517|
|         J Med Virol| 1295|
|    Emerg Infect Dis| 1286|
|       Front Immunol| 1278|
|     Clin Infect Dis| 1245|
|Int. j. environ. ...| 1239|
|               Chest| 1120|
+--------------------+-----+
only showing top 20 rows



##### Number of unique authors 

In [185]:
## Total rows vs dropping rows with missing authors
data.count(),data.na.drop(subset='authors').count()

(536817, 522012)

In [189]:
counter = (data.na.drop(subset='authors')
           .select("authors")
           .rdd
           # join all strings in the list and then split to get each word
           .map(lambda x: " ".join(x).split(";")) 
           #.map(lambda x: x.split(";")) 
           .flatMap(lambda x: x)
           # make a tuple for each word so later it can be grouped by to get its frequency count
           .map(lambda x: (x, 1))
           .reduceByKey(lambda a,b: a+b)
           
           #collectAsMap()
          )

In [190]:
spark.createDataFrame(counter).toDF("authors", "count").orderBy(F.col('count').desc()).show()

+-------------------+-----+
|            authors|count|
+-------------------+-----+
|         Anonymous,| 1621|
|              O039,| 1232|
|              D039,|  918|
|               2020|  832|
|          Wang, Wei|  458|
|  Mahase, Elisabeth|  415|
|    Yuen, Kwok-Yung|  371|
|         Zhang, Wei|  359|
|            Li, Yan|  343|
|              O039,|  336|
|  Wiwanitkit, Viroj|  314|
|  Iacobucci, Gareth|  309|
| Drosten, Christian|  296|
|            Li, Wei|  275|
|         Wang, Jing|  264|
|        Rimmer, Abi|  258|
|          Liu, Yang|  249|
|           Liu, Wei|  245|
|         Wang, Ying|  239|
|             Li, Li|  237|
+-------------------+-----+
only showing top 20 rows



In [191]:
data.filter(data.authors.contains("Li, Li;"))[['authors']].count()

239

In [192]:
data.filter(data.authors.contains("Wang, Ying;"))[['authors']].count()

258

In [193]:
data.filter(data.authors.contains("2020"))[['authors']].show()

+--------------------+
|             authors|
+--------------------+
|          2020-02-20|
|          2020-05-15|
|          2020-06-10|
|          2020-06-19|
|          2020-06-30|
|          2020-07-21|
|          2020-09-10|
|          2020-09-24|
| 2020: e1-e3. doi...|
|          2020-11-13|
|          2020-04-06|
|          2020-06-09|
|          2020-12-23|
|              2020."|
|          2020-09-25|
|          2020-07-03|
|          2020-10-02|
|          2020-07-01|
|          2020-09-22|
|          2020-07-01|
+--------------------+
only showing top 20 rows



In [None]:
#### create edgelist form authors 
# ['Source','Target','cord_id','year','journal']


In [220]:
def split_authors(df_authors):
    split_authors_lambda = lambda x: x.split(';')
    try:
        if df_authors== None:
            print('here')
            return None
        res = split_authors_lambda(df_authors)
        return res
    except:
        return None
    #df_authors.apply(lambda x: x.split(';'))

In [221]:
split_authors_udf = F.udf(split_authors, ArrayType(StringType(),False))


In [222]:
data.na.drop(subset='authors').withColumn("authors_split", split_authors_udf(data['authors']))[
    ['authors','authors_split']].show()

+--------------------+--------------------+
|             authors|       authors_split|
+--------------------+--------------------+
|Madani, Tariq A; ...|[Madani, Tariq A,...|
|Vliet, Albert van...|[Vliet, Albert va...|
|     Crouch, Erika C|   [Crouch, Erika C]|
|Fagan, Karen A; M...|[Fagan, Karen A, ...|
|Domachowske, Jose...|[Domachowske, Jos...|
|Pasternak, Alexan...|[Pasternak, Alexa...|
|Alvarez, Gonzalo;...|[Alvarez, Gonzalo...|
|Ball, Jonathan; V...|[Ball, Jonathan, ...|
|Slebos, Dirk-Jan;...|[Slebos, Dirk-Jan...|
|Tsui, Fu-Chiang; ...|[Tsui, Fu-Chiang,...|
|Ivanov, Ivaylo P....|[Ivanov, Ivaylo P...|
|Shi, Stephanie T....|[Shi, Stephanie T...|
|Pridgeon, Julia W...|[Pridgeon, Julia ...|
|Ploubidou, Aspasi...|[Ploubidou, Aspas...|
|       Barry, John M|     [Barry, John M]|
|Shieh, Biehuoy; L...|[Shieh, Biehuoy, ...|
|Verheij, Joanne; ...|[Verheij, Joanne,...|
|Porco, Travis C; ...|[Porco, Travis C,...|
|Kremer, Ted M; Ri...|[Kremer, Ted M,  ...|
|Bratlie, Marit S;...|[Bratlie, 

In [232]:
data_dropna_split = data.na.drop(subset='authors').withColumn("authors_split", split_authors_udf(data['authors']))

In [233]:
data_dropna_split[['authors_split']].take(10)

[Row(authors_split=['Madani, Tariq A', ' Al-Ghamdi, Aisha A']),
 Row(authors_split=['Vliet, Albert van der', ' Eiserich, Jason P', ' Cross, Carroll E']),
 Row(authors_split=['Crouch, Erika C']),
 Row(authors_split=['Fagan, Karen A', ' McMurtry, Ivan F', ' Rodman, David M']),
 Row(authors_split=['Domachowske, Joseph B', ' Bonville, Cynthia A', ' Rosenberg, Helene F']),
 Row(authors_split=['Pasternak, Alexander O.', ' van den Born, Erwin', ' Spaan, Willy J.M.', ' Snijder, Eric J.']),
 Row(authors_split=['Alvarez, Gonzalo', ' Hébert, Paul C', ' Szick, Sharyn']),
 Row(authors_split=['Ball, Jonathan', ' Venn, Richard']),
 Row(authors_split=['Slebos, Dirk-Jan', ' Ryter, Stefan W', ' Choi, Augustine MK']),
 Row(authors_split=['Tsui, Fu-Chiang', ' Espino, Jeremy U.', ' Dato, Virginia M.', ' Gesteland, Per H.', ' Hutman, Judith', ' Wagner, Michael M.'])]

In [234]:
data_dropna_split.cache()

DataFrame[cord_uid: string, sha: string, source_x: string, title: string, doi: string, pmcid: string, pubmed_id: string, license: string, abstract: string, publish_time: string, authors: string, journal: string, mag_id: string, who_covidence_id: string, arxiv_id: string, pdf_json_files: string, pmc_json_files: string, url: string, s2_id: string, authors_split: array<string>]

In [261]:
import itertools

combinations_udf = F.udf(
    lambda x: list(itertools.combinations(x, 2)),
    "array<struct<item1:string,item2:string>>"
)

edgelist = data_dropna_split.withColumn("edgelist", 
                                        F.explode(combinations_udf(F.col("authors_split")))) \
    .selectExpr("*") #.selectExpr('edgelist.*') 

edgelist[['edgelist.*','cord_uid','journal']].show()

+--------------------+--------------------+--------+----------------+
|               item1|               item2|cord_uid|         journal|
+--------------------+--------------------+--------+----------------+
|     Madani, Tariq A|  Al-Ghamdi, Aisha A|ug7v899j|  BMC Infect Dis|
|Vliet, Albert van...|   Eiserich, Jason P|02tnwd4m|      Respir Res|
|Vliet, Albert van...|    Cross, Carroll E|02tnwd4m|      Respir Res|
|   Eiserich, Jason P|    Cross, Carroll E|02tnwd4m|      Respir Res|
|      Fagan, Karen A|    McMurtry, Ivan F|2b73a28n|      Respir Res|
|      Fagan, Karen A|     Rodman, David M|2b73a28n|      Respir Res|
|    McMurtry, Ivan F|     Rodman, David M|2b73a28n|      Respir Res|
|Domachowske, Jose...| Bonville, Cynthia A|9785vg6d|      Respir Res|
|Domachowske, Jose...| Rosenberg, Helene F|9785vg6d|      Respir Res|
| Bonville, Cynthia A| Rosenberg, Helene F|9785vg6d|      Respir Res|
|Pasternak, Alexan...| van den Born, Erwin|zjufx4fo|The EMBO Journal|
|Pasternak, Alexan..

In [247]:
edgelist[['authors_split','edgelist','cord_uid']].take(10)

[Row(authors_split=['Madani, Tariq A', ' Al-Ghamdi, Aisha A'], edgelist=Row(item1='Madani, Tariq A', item2=' Al-Ghamdi, Aisha A'), cord_uid='ug7v899j'),
 Row(authors_split=['Vliet, Albert van der', ' Eiserich, Jason P', ' Cross, Carroll E'], edgelist=Row(item1='Vliet, Albert van der', item2=' Eiserich, Jason P'), cord_uid='02tnwd4m'),
 Row(authors_split=['Vliet, Albert van der', ' Eiserich, Jason P', ' Cross, Carroll E'], edgelist=Row(item1='Vliet, Albert van der', item2=' Cross, Carroll E'), cord_uid='02tnwd4m'),
 Row(authors_split=['Vliet, Albert van der', ' Eiserich, Jason P', ' Cross, Carroll E'], edgelist=Row(item1=' Eiserich, Jason P', item2=' Cross, Carroll E'), cord_uid='02tnwd4m'),
 Row(authors_split=['Fagan, Karen A', ' McMurtry, Ivan F', ' Rodman, David M'], edgelist=Row(item1='Fagan, Karen A', item2=' McMurtry, Ivan F'), cord_uid='2b73a28n'),
 Row(authors_split=['Fagan, Karen A', ' McMurtry, Ivan F', ' Rodman, David M'], edgelist=Row(item1='Fagan, Karen A', item2=' Rodman, 

In [172]:
sc.stop()

In [173]:
spark.stop()