<a href="https://colab.research.google.com/github/MizanMustakim/big_data_processing_final_project/blob/Mizan/DataAcquisition.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
!pip install pyspark
!pip install sparknlp

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 28 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 59.9 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=42e6f37dfbe88b6af5f952889472312b3316e22556d2b92dea033b7804cfe60b
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [5]:
import sparknlp
from pyspark.sql import SparkSession
from sparknlp.base import *
from sparknlp.annotator import *
from pyspark.sql.functions import json_tuple
from pyspark.sql.types import MapType,StringType,ArrayType,StructType,StructField
from pyspark.sql.functions import *

In [6]:
# Checking the versions of pyspark and sparknlp

spark = SparkSession.builder \
    .master("local") \
    .appName("Proj") \
    .config("spark.sql.warehouse.dir")\
    .getOrCreate()
print("Spark NLP version: {}".format(sparknlp.version()))
print("Apache Spark version: {}".format(spark.version))

Spark NLP version: 3.4.4
Apache Spark version: 3.2.1


In [7]:
# load the files to a data frame
# select only the fearures needed:
# paper_id, metadata
df = spark.read.option("multiline","true")\
          .json("/content/drive/MyDrive/pdf_json")\
          .select('paper_id','metadata')

In [8]:
# the data frame's schema
df.printSchema()

root
 |-- paper_id: string (nullable = true)
 |-- metadata: struct (nullable = true)
 |    |-- authors: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- affiliation: struct (nullable = true)
 |    |    |    |    |-- institution: string (nullable = true)
 |    |    |    |    |-- laboratory: string (nullable = true)
 |    |    |    |    |-- location: struct (nullable = true)
 |    |    |    |    |    |-- addrLine: string (nullable = true)
 |    |    |    |    |    |-- country: string (nullable = true)
 |    |    |    |    |    |-- postBox: string (nullable = true)
 |    |    |    |    |    |-- postCode: string (nullable = true)
 |    |    |    |    |    |-- region: string (nullable = true)
 |    |    |    |    |    |-- settlement: string (nullable = true)
 |    |    |    |-- email: string (nullable = true)
 |    |    |    |-- first: string (nullable = true)
 |    |    |    |-- last: string (nullable = true)
 |    |    |    |-- middle: array

In [9]:
# brief preview of the original data frame data before flattening
df.select('paper_id','metadata.*').show()

+--------------------+--------------------+--------------------+
|            paper_id|             authors|               title|
+--------------------+--------------------+--------------------+
|dc0795b37b5378b1c...|[{{, Hematopoieti...|ABSTRACTS FROM TH...|
|de6018b3043cdccd4...|[{{Stanford Unive...|Development/Novel...|
|dc2f210539245c7a0...|[{{null, null, nu...|OR-01. Distinct R...|
|2c4cd65b373b8524d...|                  []|2 Lexikalischer H...|
|2b1cbb43a4f06e232...|[{{null, null, nu...|Level 3 guideline...|
|1faae7ce711b362a2...|                  []|                    |
|dc91dd2b0be757c23...|                  []|                    |
|2bfe70ed52c64f411...|[{{University of ...|PA03 -Pattern Rec...|
|dc6e18ee4dbe5ac14...|[{{null, null, nu...|SCIENTIFIC REPORT...|
|2a37e35929251d28d...|                  []|Protocol Protocol...|
|2bf80b20f4f497112...|[{{null, null, nu...|Oral Presentation...|
|dcedc8ca7b10557df...|                  []|                    |
|2cf6a96f9b5336b13...|[{{

In [10]:
# extract the elements of the authors from array to a single element entries
df2=df.select('paper_id',col('metadata.title').alias('title'),\
              explode('metadata.authors').alias('author'))

In [11]:
# the new data frame schema does not have an array field
df2.printSchema()

root
 |-- paper_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- author: struct (nullable = true)
 |    |-- affiliation: struct (nullable = true)
 |    |    |-- institution: string (nullable = true)
 |    |    |-- laboratory: string (nullable = true)
 |    |    |-- location: struct (nullable = true)
 |    |    |    |-- addrLine: string (nullable = true)
 |    |    |    |-- country: string (nullable = true)
 |    |    |    |-- postBox: string (nullable = true)
 |    |    |    |-- postCode: string (nullable = true)
 |    |    |    |-- region: string (nullable = true)
 |    |    |    |-- settlement: string (nullable = true)
 |    |-- email: string (nullable = true)
 |    |-- first: string (nullable = true)
 |    |-- last: string (nullable = true)
 |    |-- middle: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- suffix: string (nullable = true)



In [12]:
# flatten the data frame
# drop the original rows and a few useless features 
final_df=df2.select('*','author.*')\
.select('*','affiliation.*')\
.select('*','location.*')\
.drop('affiliation').drop('location')\
.drop('postBox').drop('author')\
.drop('middle').drop('suffix')\

In [13]:
# concat the first and last name to a single feature
final_df=final_df.select('*',concat(col('first'),lit(' '),col('last')).alias('name'))\
       .drop('first').drop('last')

In [14]:
# replace the empty fields with null
final_df=final_df.na.replace('',None)

In [15]:
# remove the dublicate rows
final_df=final_df.distinct()

In [16]:
#sort by paper_id
final_df=final_df.sort('paper_id')

In [17]:
# combine rows, which are not identical, but has the same
# paper_id and name and are missing data
final_df = final_df.groupBy("paper_id", "name")\
        .agg(last('email',True).alias('email'),
            last('title',True).alias( 'title'),
            last('institution',True).alias( 'institution'),
            last('laboratory',True).alias( 'laboratory'),
            last('addrLine',True).alias( 'addrLine'),
            last('country',True).alias( 'country'),
            last('postCode',True).alias( 'postCode'),
            last('region',True).alias( 'region'),
            last('settlement',True).alias( 'settlement'),
            )

In [18]:
# the final data frame schema
final_df.printSchema()

root
 |-- paper_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- title: string (nullable = true)
 |-- institution: string (nullable = true)
 |-- laboratory: string (nullable = true)
 |-- addrLine: string (nullable = true)
 |-- country: string (nullable = true)
 |-- postCode: string (nullable = true)
 |-- region: string (nullable = true)
 |-- settlement: string (nullable = true)



In [19]:
# data frame data preview
final_df.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------+--------+--------+--------------------+
|            paper_id|                name|               email|               title|         institution|          laboratory|            addrLine|    country|postCode|  region|          settlement|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------+--------+--------+--------------------+
|0000b6da665726420...|             Alex Ba|                null|The cell phone vi...|Uniformed Service...|                null|                null|        USA|    null|Maryland|            Bethesda|
|0000b6da665726420...|    Anthony Tolisano|anthony.m.tolisan...|The cell phone vi...|Walter Reed Natio...|                null|                null|        USA|    null|Maryland|            Bethesda|


In [20]:
# number of people researching a paper by institution 
institutions = final_df.groupBy(['paper_id','institution']).count()\
                  .na.drop(how='any')\
                  .sort(desc('count'))

In [21]:
institutions.show()

+--------------------+--------------------+-----+
|            paper_id|         institution|count|
+--------------------+--------------------+-----+
|2b06dda25000e6085...|I, Calvo AO, Gall...|  129|
|db0c0308a7405e6ba...|Orange County Hea...|  104|
|db6db211b8abddafe...|Northern Illinois...|   77|
|002b2e094126e78ea...|Harvard Medical S...|   64|
|db1d31b294ab30c2e...|Zentrum für Rhino...|   64|
|de41dc462ed4d3576...|Sechenov First Mo...|   51|
|2a155491b04182a59...|Imperial College ...|   48|
|dd863cabb9e0edd30...|Hospital Sírio Li...|   46|
|2cb5497c065e3c1ef...|Leiden University...|   46|
|2d09cf04f003ed93e...|Aristotle Univers...|   41|
|2bfe70ed52c64f411...|University of Not...|   41|
|db3dbfeec25904ea1...|Duke University S...|   39|
|2cf60380bb05becd1...|Virginia Polytech...|   37|
|2ec268221c21259c0...|OpenSAFELY Collab...|   37|
|1e04601589c695728...|University of Pen...|   36|
|dc6a1bf7e383925a8...|    Emory University|   36|
|2da484bab5fea1c1e...|Azienda Ospedalie...|   32|


In [22]:
# The number of total rows of this dataframe
institutions.count()

21302

In [23]:
# put the df into pandas and export it into a single file for our purpoces
institutions.toPandas().to_csv('/content/drive/MyDrive/processed_data/institutions.csv',index=False)

In [24]:
# map all the researchers participated  in covid research
# and the amount of papers they took part of
researchers =final_df.groupBy(['name']).count()\
                  .na.drop(how='any')\
                  .sort(desc('count'))

In [25]:
# Showing the preview of the researchers dataframe
researchers.show()

+------+-----+
|  name|count|
+------+-----+
|    † |  305|
|    M |  155|
|    J |   86|
| Wang |   74|
|    A |   72|
|    Y |   68|
|Zhang |   61|
|    D |   58|
| Chen |   51|
|    W |   47|
|    S |   45|
|    C |   43|
|    B |   43|
|    ; |   42|
|    E |   40|
|    ✉ |   37|
|    R |   35|
|  Liu |   34|
| Zhou |   33|
|    ‡ |   33|
+------+-----+
only showing top 20 rows



In [26]:
# The number of total rows of this researchers dataframe
researchers.count()

98841

In [27]:
# put the df into pandas and export it into a single file for our purpoces
researchers.toPandas().to_csv('/content/drive/MyDrive/processed_data/researchers.csv',index=False)

In [28]:
# researchers by country
countries=final_df.groupBy(['country']).count()\
                  .na.drop(how='any')\
                  .sort(desc('count'))

In [29]:
# Showing the preview of the countries dataframe
countries.show()

+--------------------+-----+
|             country|count|
+--------------------+-----+
|                 USA| 8506|
|               China| 5622|
|               Italy| 3663|
|                  UK| 2910|
|             Germany| 1989|
|               Spain| 1515|
|               Japan| 1449|
|              France| 1392|
|               India| 1376|
|              Canada| 1292|
|              Brazil| 1208|
|           Australia| 1072|
|       United States| 1027|
|                Iran|  961|
|      United Kingdom|  893|
|         Switzerland|  645|
|              Taiwan|  566|
|United States of ...|  551|
|     The Netherlands|  545|
|             Belgium|  525|
+--------------------+-----+
only showing top 20 rows



In [30]:
# The number of total rows of this countries dataframe
countries.count()

823

In [31]:
# put the df into pandas and export it into a single file for our purpoces
countries.toPandas().to_csv('/content/drive/MyDrive/processed_data/countries.csv',index=False)