## Data Engineering Project

#### Project Summary

The objective of this project is to build a data pipeline in Python that extracts information from the following sources,
transforms it and loads it into our final data model:

- **drugs.csv** contains drug information:
 * `atccode` = drug id
 * `drug` = drug name

- **pubmed.csv** comes from PubMed, a free search engine that primarily accesses the MEDLINE database of references and abstracts on life sciences and biomedical topics. It contains:
 * `id` = publication id
 * `title` = publication title
 * `date` = publication date
 * `journal` = journal name
 
- **clinical_trials.csv** comes from ClinicalTrials.gov, the largest clinical trials database. It contains:
 * `id` = publication id
 * `scientific_title` = publication title
 * `date` = publication date
 * `journal` = journal name

We use **PySpark** so that the pipeline can scale to larger data volumes in the future.
 
The project follows these steps:
* Step 1: Explore and Clean the Data
* Step 2: Define the Data Model
* Step 3: Run ETL to Model the Data
* Step 4: Ad-hoc Processing
* Step 5: Going Further

```python
# Do all imports and installs here
import json
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, collect_list, countDistinct, lit, lower, to_date, when
```

```python
# Create (or reuse) the Spark session; the application name is arbitrary
spark = SparkSession.builder \
    .appName("drug-mentions") \
    .getOrCreate()
```

### Step 1: Explore and Clean the Data
#### Explore the Data 
Data quality issues:
###### drug
- Drug names are in uppercase; lowercase names are easier to match against article titles

###### pubmed
- Titles are in mixed case; they should be lowercased to match drug names
- Dates are not in a uniform format (e.g. `01/01/2019` and `2020-01-01`)

###### clinical trials
- Missing values (`id`, `scientific_title`, `journal`)
- Duplicated records
- Dates are not in a uniform format (e.g. `1 January 2020` and `25/05/2020`)

#### Cleaning Steps

###### drug
- Change drug name to lowercase

###### pubmed
- Lowercase the titles
- Convert dates to the `yyyy-MM-dd` format

###### clinical trials
- Lowercase the titles
- Convert dates to the `yyyy-MM-dd` format
- Fill missing `id` and `journal` values from the duplicate record with the same title and date, then remove duplicated records
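The date rule above can be sketched in plain Python to make the intended behaviour explicit. This is an illustration only, not the PySpark code used in the cells below; the helper name `normalize_date` and the list of formats are assumptions based on the raw values seen in the sample files (`01/01/2019`, `2020-01-01`, `1 January 2020`, `25/05/2020`). It also assumes that slash-separated dates are day-first, as the PySpark cleaning code does.

```python
from datetime import datetime
from typing import Optional

# Formats observed in the sample files (assumption: dd/mm/yyyy is day-first).
# "%B" matches full month names in the current locale, assumed to be English.
KNOWN_FORMATS = ("%d/%m/%Y", "%Y-%m-%d", "%d %B %Y")


def normalize_date(raw: str) -> Optional[str]:
    """Return the date as 'yyyy-mm-dd', or None if no known format matches."""
    value = raw.strip()
    for fmt in KNOWN_FORMATS:
        try:
            return datetime.strptime(value, fmt).strftime("%Y-%m-%d")
        except ValueError:
            continue  # not this format, try the next one
    return None
```

Returning `None` for unparseable values mirrors how Spark's `to_date` yields null when a pattern does not match, so bad dates surface as missing values instead of silently passing through.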

```python
# Load drugs.csv
df_drug = spark.read.options(header='True', inferSchema='True') \
    .csv("drugs.csv")
df_drug.toPandas()
```

| | atccode | drug |
|---|---|---|
| 0 | A04AD | DIPHENHYDRAMINE |
| 1 | S03AA | TETRACYCLINE |
| 2 | V03AB | ETHANOL |
| 3 | A03BA | ATROPINE |
| 4 | A01AD | EPINEPHRINE |
| 5 | 6302001 | ISOPRENALINE |
| 6 | R01AD | BETAMETHASONE |


```python
# Clean drugs: lowercase the drug names
df_drug = df_drug.withColumn('drug', lower(col('drug')))
df_drug.toPandas()
```

| | atccode | drug |
|---|---|---|
| 0 | A04AD | diphenhydramine |
| 1 | S03AA | tetracycline |
| 2 | V03AB | ethanol |
| 3 | A03BA | atropine |
| 4 | A01AD | epinephrine |
| 5 | 6302001 | isoprenaline |
| 6 | R01AD | betamethasone |


```python
# Load pubmed.csv
df_pubmed = spark.read.options(header='True', inferSchema='True') \
    .csv("pubmed.csv")
df_pubmed.toPandas()
```

| | id | title | date | journal |
|---|---|---|---|---|
| 0 | 1 | A 44-year-old man with erythema of the face di... | 01/01/2019 | Journal of emergency nursing |
| 1 | 2 | An evaluation of benadryl, pyribenzamine, and ... | 01/01/2019 | Journal of emergency nursing |
| 2 | 3 | Diphenhydramine hydrochloride helps symptoms o... | 02/01/2019 | The Journal of pediatrics |
| 3 | 4 | Tetracycline Resistance Patterns of Lactobacil... | 01/01/2020 | Journal of food protection |
| 4 | 5 | Appositional Tetracycline bone formation rates... | 02/01/2020 | American journal of veterinary research |
| 5 | 6 | Rapid reacquisition of contextual fear followi... | 2020-01-01 | Psychopharmacology |
| 6 | 7 | The High Cost of Epinephrine Autoinjectors and... | 01/02/2020 | The journal of allergy and clinical immunology... |
| 7 | 8 | Time to epinephrine treatment is associated wi... | 01/03/2020 | The journal of allergy and clinical immunology... |


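```python
# Clean pubmed: lowercase the titles and convert dd/MM/yyyy dates to yyyy-MM-dd;
# values that do not match dd/MM/yyyy (already yyyy-MM-dd here) are kept as they are
df_pubmed = df_pubmed.withColumn('title', lower(col('title'))) \
    .withColumn('date', when(to_date('date', 'dd/MM/yyyy').isNotNull(),
                             to_date('date', 'dd/MM/yyyy')).otherwise(col('date')))
df_pubmed.toPandas()
```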

| | id | title | date | journal |
|---|---|---|---|---|
| 0 | 1 | a 44-year-old man with erythema of the face di... | 2019-01-01 | Journal of emergency nursing |
| 1 | 2 | an evaluation of benadryl, pyribenzamine, and ... | 2019-01-01 | Journal of emergency nursing |
| 2 | 3 | diphenhydramine hydrochloride helps symptoms o... | 2019-01-02 | The Journal of pediatrics |
| 3 | 4 | tetracycline resistance patterns of lactobacil... | 2020-01-01 | Journal of food protection |
| 4 | 5 | appositional tetracycline bone formation rates... | 2020-01-02 | American journal of veterinary research |
| 5 | 6 | rapid reacquisition of contextual fear followi... | 2020-01-01 | Psychopharmacology |
| 6 | 7 | the high cost of epinephrine autoinjectors and... | 2020-02-01 | The journal of allergy and clinical immunology... |
| 7 | 8 | time to epinephrine treatment is associated wi... | 2020-03-01 | The journal of allergy and clinical immunology... |


```python
# Load clinical_trials.csv
df_clinical = spark.read.options(header='True', inferSchema='True') \
    .csv("clinical_trials.csv")
df_clinical.toPandas()
```

| | id | scientific_title | date | journal |
|---|---|---|---|---|
| 0 | NCT01967433 | Use of Diphenhydramine as an Adjunctive Sedati... | 1 January 2020 | Journal of emergency nursing |
| 1 | NCT04189588 | Phase 2 Study IV QUZYTTIR™ (Cetirizine Hydroch... | 1 January 2020 | Journal of emergency nursing |
| 2 | NCT04237090 | | 1 January 2020 | Journal of emergency nursing |
| 3 | NCT04237091 | Feasibility of a Randomized Controlled Clinica... | 1 January 2020 | Journal of emergency nursing |
| 4 | NCT04153396 | Preemptive Infiltration With Betamethasone and... | 1 January 2020 | Hôpitaux Universitaires de Genève |
| 5 | NCT03490942 | Glucagon Infusion in T1D Patients With Recurre... | 25/05/2020 | |
| 6 | | Glucagon Infusion in T1D Patients With Recurre... | 25/05/2020 | Journal of emergency nursing |
| 7 | NCT04188184 | Tranexamic Acid Versus Epinephrine During Expl... | 27 April 2020 | Journal of emergency nursing\xc3\x28 |


```python
# Fill missing id/journal from the record sharing the same title and date,
# then remove duplicated records. collect_list skips nulls, so [0] is a
# non-null value whenever the group has one.
w = Window.partitionBy('scientific_title', 'date')
df_clinical = df_clinical.na.drop(subset=['scientific_title']) \
    .withColumn('journal', collect_list('journal').over(w)[0]) \
    .withColumn('id', collect_list('id').over(w)[0]) \
    .dropDuplicates()

# Lowercase the title, rename it to match pubmed, and normalise the dates:
# 'd MMMM yyyy' parses "1 January 2020" and "27 April 2020", otherwise 'dd/MM/yyyy'
df_clinical = df_clinical.withColumn('scientific_title', lower(col('scientific_title'))) \
    .withColumnRenamed('scientific_title', 'title') \
    .withColumn('date', when(to_date('date', 'd MMMM yyyy').isNotNull(),
                             to_date('date', 'd MMMM yyyy')).otherwise(to_date('date', 'dd/MM/yyyy')))
df_clinical.toPandas()
```

| | id | title | date | journal |
|---|---|---|---|---|
| 0 | NCT01967433 | use of diphenhydramine as an adjunctive sedati... | 2020-01-01 | Journal of emergency nursing |
| 1 | NCT03490942 | glucagon infusion in t1d patients with recurre... | 2020-05-25 | Journal of emergency nursing |
| 2 | NCT04189588 | phase 2 study iv quzyttir™ (cetirizine hydroch... | 2020-01-01 | Journal of emergency nursing |
| 3 | NCT04153396 | preemptive infiltration with betamethasone and... | 2020-01-01 | Hôpitaux Universitaires de Genève |
| 4 | NCT04237090 | | 2020-01-01 | Journal of emergency nursing |
| 5 | NCT04237091 | feasibility of a randomized controlled clinica... | 2020-01-01 | Journal of emergency nursing |
| 6 | NCT04188184 | tranexamic acid versus epinephrine during expl... | 2020-04-27 | Journal of emergency nursing\xc3\x28 |
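To make the window-based fill explicit, here is a plain-Python sketch of the same idea: group rows by `(title, date)`, take the first non-null `id` and `journal` in each group, overwrite those columns with the group value, then keep one copy of each identical row. The function name `fill_and_dedupe` is hypothetical, and the earlier `na.drop` on the title is left out. One difference worth knowing: Spark documents `collect_list` as non-deterministic in order after a shuffle, so `[0]` is not guaranteed to be the first row in file order, whereas this sketch uses input order.

```python
from typing import Dict, List, Optional, Tuple

Row = Dict[str, Optional[str]]
FILL_COLUMNS = ("id", "journal")


def fill_and_dedupe(rows: List[Row]) -> List[Row]:
    """Fill id/journal from rows sharing (title, date), then drop exact duplicates."""
    # Pass 1: first non-null value of each fill column per (title, date) group,
    # similar to collect_list(...)[0], which skips nulls.
    group_values: Dict[Tuple[Optional[str], Optional[str]], Row] = {}
    for row in rows:
        values = group_values.setdefault((row["title"], row["date"]),
                                         {c: None for c in FILL_COLUMNS})
        for column in FILL_COLUMNS:
            if values[column] is None and row[column] is not None:
                values[column] = row[column]

    # Pass 2: overwrite the fill columns with the group value and keep unique rows
    # (the equivalent of dropDuplicates() on all columns).
    result: List[Row] = []
    seen = set()
    for row in rows:
        filled = {**row, **group_values[(row["title"], row["date"])]}
        signature = tuple(sorted(filled.items()))  # keys are unique, so values are never compared
        if signature not in seen:
            seen.add(signature)
            result.append(filled)
    return result
```

Applied to the two "glucagon infusion" rows from the raw table, the record missing a journal and the record missing an id collapse into a single complete record.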


### Step 2: Define the Data Model
![Data model schema](schema.PNG)
#### Conceptual Data Model
Our final schema represents the relationship between each drug and the published articles whose titles mention the drug name.
We name the final model **mention_records**; it contains the following attributes:
 * `atccode` = drug id
 * `drug` = drug name
 * `id` = article id
 * `title` = article title
 * `date` = publishing date
 * `journal` = journal name
 * `publication` = publication platform: 'PubMed' or 'clinical trials'
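For reference, one `mention_records` row can be described as a typed Python record. The `MentionRecord` dataclass below is a hypothetical illustration of the attributes listed above, not part of the pipeline. `id` is typed as a string because PubMed ids are integers while clinical trial ids look like `NCT01967433`, and the `publication` values are taken from the literals used in the ETL step.

```python
import datetime
from dataclasses import asdict, dataclass
from typing import Literal

# Checked only by static type checkers, not at runtime
Publication = Literal["PubMed", "clinical trials"]


@dataclass(frozen=True)
class MentionRecord:
    """One drug mentioned in one article (hypothetical typed view of mention_records)."""

    atccode: str               # drug id
    drug: str                  # lowercase drug name
    id: str                    # article id (PubMed number or ClinicalTrials.gov NCT id)
    title: str                 # lowercase article title
    date: datetime.date        # publishing date
    journal: str               # journal name
    publication: Publication   # source platform

    def to_json_dict(self) -> dict:
        """Return the fields as a dict, with the date as an ISO 'yyyy-mm-dd' string."""
        record = asdict(self)
        record["date"] = self.date.isoformat()
        return record
```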

### Step 3: Run ETL to Model the Data

```python
# Records where a drug name is mentioned in a PubMed title (substring match)
df_drug_pubmed = df_drug.join(df_pubmed,
                              df_pubmed.title.contains(df_drug.drug),
                              how='inner') \
    .withColumn('publication', lit('PubMed'))
df_drug_pubmed.toPandas()
```

| | atccode | drug | id | title | date | journal | publication |
|---|---|---|---|---|---|---|---|
| 0 | A04AD | diphenhydramine | 1 | a 44-year-old man with erythema of the face di... | 2019-01-01 | Journal of emergency nursing | PubMed |
| 1 | A04AD | diphenhydramine | 2 | an evaluation of benadryl, pyribenzamine, and ... | 2019-01-01 | Journal of emergency nursing | PubMed |
| 2 | A04AD | diphenhydramine | 3 | diphenhydramine hydrochloride helps symptoms o... | 2019-01-02 | The Journal of pediatrics | PubMed |
| 3 | S03AA | tetracycline | 4 | tetracycline resistance patterns of lactobacil... | 2020-01-01 | Journal of food protection | PubMed |
| 4 | S03AA | tetracycline | 5 | appositional tetracycline bone formation rates... | 2020-01-02 | American journal of veterinary research | PubMed |
| 5 | S03AA | tetracycline | 6 | rapid reacquisition of contextual fear followi... | 2020-01-01 | Psychopharmacology | PubMed |
| 6 | V03AB | ethanol | 6 | rapid reacquisition of contextual fear followi... | 2020-01-01 | Psychopharmacology | PubMed |
| 7 | A01AD | epinephrine | 7 | the high cost of epinephrine autoinjectors and... | 2020-02-01 | The journal of allergy and clinical immunology... | PubMed |
| 8 | A01AD | epinephrine | 8 | time to epinephrine treatment is associated wi... | 2020-03-01 | The journal of allergy and clinical immunology... | PubMed |
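`Column.contains` performs a plain substring match, so a drug name also matches when it sits inside a longer word (a hypothetical example: `ethanol` inside `methanol`). The plain-Python sketch below contrasts that behaviour with a word-boundary regular expression; the helper names are hypothetical, and whether whole-word matching is preferable depends on the data. In PySpark, a similar condition could be expressed with the SQL `RLIKE` operator, for instance through `pyspark.sql.functions.expr`.

```python
import re
from typing import List


def mentions_substring(title: str, drug: str) -> bool:
    """Same semantics as Column.contains: any substring occurrence counts."""
    return drug in title


def mentions_word(title: str, drug: str) -> bool:
    """Match the drug name only as a whole word."""
    return re.search(r"\b" + re.escape(drug) + r"\b", title) is not None


def drugs_mentioned(title: str, drugs: List[str], whole_word: bool = True) -> List[str]:
    """Return the (lowercased) drugs whose name appears in a lowercased title."""
    matcher = mentions_word if whole_word else mentions_substring
    return [drug for drug in drugs if matcher(title, drug)]
```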


```python
# Records where a drug name is mentioned in a clinical trial title (substring match)
df_drug_clinical = df_drug.join(df_clinical,
                                df_clinical.title.contains(df_drug.drug),
                                how='inner') \
    .withColumn('publication', lit('clinical trials'))
df_drug_clinical.toPandas()
```

| | atccode | drug | id | title | date | journal | publication |
|---|---|---|---|---|---|---|---|
| 0 | A04AD | diphenhydramine | NCT01967433 | use of diphenhydramine as an adjunctive sedati... | 2020-01-01 | Journal of emergency nursing | clinical trials |
| 1 | A04AD | diphenhydramine | NCT04189588 | phase 2 study iv quzyttir™ (cetirizine hydroch... | 2020-01-01 | Journal of emergency nursing | clinical trials |
| 2 | R01AD | betamethasone | NCT04153396 | preemptive infiltration with betamethasone and... | 2020-01-01 | Hôpitaux Universitaires de Genève | clinical trials |
| 3 | A04AD | diphenhydramine | NCT04237091 | feasibility of a randomized controlled clinica... | 2020-01-01 | Journal of emergency nursing | clinical trials |
| 4 | A01AD | epinephrine | NCT04188184 | tranexamic acid versus epinephrine during expl... | 2020-04-27 | Journal of emergency nursing\xc3\x28 | clinical trials |


```python
# Union all the records; both DataFrames have the same column order,
# so a positional union is safe here
mention_records = df_drug_pubmed.union(df_drug_clinical)
mention_records.toPandas()
```

| | atccode | drug | id | title | date | journal | publication |
|---|---|---|---|---|---|---|---|
| 0 | A04AD | diphenhydramine | 1 | a 44-year-old man with erythema of the face di... | 2019-01-01 | Journal of emergency nursing | PubMed |
| 1 | A04AD | diphenhydramine | 2 | an evaluation of benadryl, pyribenzamine, and ... | 2019-01-01 | Journal of emergency nursing | PubMed |
| 2 | A04AD | diphenhydramine | 3 | diphenhydramine hydrochloride helps symptoms o... | 2019-01-02 | The Journal of pediatrics | PubMed |
| 3 | S03AA | tetracycline | 4 | tetracycline resistance patterns of lactobacil... | 2020-01-01 | Journal of food protection | PubMed |
| 4 | S03AA | tetracycline | 5 | appositional tetracycline bone formation rates... | 2020-01-02 | American journal of veterinary research | PubMed |
| 5 | S03AA | tetracycline | 6 | rapid reacquisition of contextual fear followi... | 2020-01-01 | Psychopharmacology | PubMed |
| 6 | V03AB | ethanol | 6 | rapid reacquisition of contextual fear followi... | 2020-01-01 | Psychopharmacology | PubMed |
| 7 | A01AD | epinephrine | 7 | the high cost of epinephrine autoinjectors and... | 2020-02-01 | The journal of allergy and clinical immunology... | PubMed |
| 8 | A01AD | epinephrine | 8 | time to epinephrine treatment is associated wi... | 2020-03-01 | The journal of allergy and clinical immunology... | PubMed |
| 9 | A04AD | diphenhydramine | NCT01967433 | use of diphenhydramine as an adjunctive sedati... | 2020-01-01 | Journal of emergency nursing | clinical trials |


```python
# Save the final DataFrame into a single, indented JSON array
# (toPandas() collects all rows on the driver)
with open("mention_records.json", "w") as f:
    f.write(json.dumps(json.loads(mention_records.toPandas().to_json(orient='records')), indent=2))
```
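The cell above writes the whole result as one JSON array, which is why the file is read back later with the `multiLine` option. A common alternative is JSON Lines (one object per line): it is the layout `spark.read.json` expects by default, and Spark's own `DataFrame.write.json` produces it as a directory of part files. The standard-library sketch below, with the hypothetical helpers `write_json_lines` and `read_json_lines`, only illustrates the format; it is not a replacement for the Spark writer.

```python
import json
from typing import Dict, Iterable, List


def write_json_lines(records: Iterable[Dict], path: str) -> None:
    """Write one JSON object per line (JSON Lines)."""
    with open(path, "w", encoding="utf-8") as f:
        for record in records:
            # default=str turns non-JSON types such as datetime.date into strings
            f.write(json.dumps(record, ensure_ascii=False, default=str) + "\n")


def read_json_lines(path: str) -> List[Dict]:
    """Read a JSON Lines file back into a list of dicts, skipping blank lines."""
    with open(path, encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]
```

Because each line is an independent record, such files can be split and read in parallel, whereas a single multi-line array has to be parsed as a whole.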

### Step 4: Ad-hoc Processing
 - Question: Which journal has mentioned the most different drugs?
 - Answer: "Psychopharmacology", which mentions two distinct drugs (tetracycline and ethanol)

```python
# Which journal has mentioned the most different drugs?
# The JSON file is a single array spanning many lines, hence multiLine
mention_records = spark.read.option("multiLine", "true").json("mention_records.json")
mention_records.groupBy("journal") \
    .agg(countDistinct("drug").alias("countDistinct")) \
    .sort("countDistinct", ascending=False) \
    .show()
```

```
+--------------------+-------------+
|             journal|countDistinct|
+--------------------+-------------+
|  Psychopharmacology|            2|
|The journal of al...|            1|
|Hôpitaux Universi...|            1|
|Journal of emerge...|            1|
|The Journal of pe...|            1|
|Journal of food p...|            1|
|American journal ...|            1|
|Journal of emerge...|            1|
+--------------------+-------------+
```
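The aggregation above (`groupBy("journal")` with `countDistinct("drug")`) amounts to collecting the set of drugs per journal and taking its size. This plain-Python sketch, with the hypothetical helpers `distinct_drugs_per_journal` and `top_journals`, reproduces that logic on a few `(journal, drug)` pairs taken from the tables above. Because `top_journals` returns every journal that reaches the maximum count, it also makes ties visible, which reading only the first line of a sorted `.show()` does not.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Set, Tuple


def distinct_drugs_per_journal(pairs: Iterable[Tuple[str, str]]) -> Dict[str, int]:
    """Count distinct drugs per journal (like groupBy + countDistinct)."""
    drugs: Dict[str, Set[str]] = defaultdict(set)
    for journal, drug in pairs:
        drugs[journal].add(drug)
    return {journal: len(names) for journal, names in drugs.items()}


def top_journals(pairs: Iterable[Tuple[str, str]]) -> List[str]:
    """Return every journal that reaches the maximum distinct-drug count."""
    counts = distinct_drugs_per_journal(pairs)
    if not counts:
        return []
    best = max(counts.values())
    return sorted(journal for journal, n in counts.items() if n == best)
```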



### Step 5: Going Further
* What are the elements to consider in order to develop your code so that it can handle large volumes of data (files of several TB or millions of files for example)?
    - Run the pipeline on a Spark cluster to parallelise computation across multiple machines.
    - Use a distributed storage system (e.g. Hadoop HDFS or AWS S3) so that data can be read in parallel by a large number of nodes.
    
* Could you describe the changes that would need to be made, if any, to take into account such volumes? 
![Data lake architecture](datalake.png)
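One point worth making explicit when the work is split across many files or nodes: a distinct count cannot be obtained by adding per-partition counts, because the same drug may appear in several partitions. Partial results have to be merged as sets (or approximated, as Spark's `approx_count_distinct` does). The plain-Python sketch below, with hypothetical function names and two small partitions built from pairs in the tables above, shows that map/merge pattern; in a cluster, each `partial_drug_sets` call would conceptually run on a different worker.

```python
from functools import reduce
from typing import Dict, Iterable, List, Set, Tuple

Pair = Tuple[str, str]  # (journal, drug)


def partial_drug_sets(pairs: Iterable[Pair]) -> Dict[str, Set[str]]:
    """Map step: distinct drugs per journal within one partition."""
    result: Dict[str, Set[str]] = {}
    for journal, drug in pairs:
        result.setdefault(journal, set()).add(drug)
    return result


def merge(a: Dict[str, Set[str]], b: Dict[str, Set[str]]) -> Dict[str, Set[str]]:
    """Merge step: union the drug sets journal by journal."""
    merged = {journal: set(drugs) for journal, drugs in a.items()}
    for journal, drugs in b.items():
        merged.setdefault(journal, set()).update(drugs)
    return merged


def distinct_counts(partitions: List[List[Pair]]) -> Dict[str, int]:
    """Combine per-partition sets first, then count (summing counts would overcount)."""
    combined = reduce(merge, (partial_drug_sets(p) for p in partitions), {})
    return {journal: len(drugs) for journal, drugs in combined.items()}
```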