In [None]:
!pip install apache_beam

In [2]:
import string
import typing
import re

import apache_beam as beam
import apache_beam.runners.interactive.interactive_beam as ib
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner

# Single pipeline shared by all the cells below, executed by Beam's InteractiveRunner.
pipeline = beam.Pipeline(runner=InteractiveRunner())

In [4]:
# Single root for every input/output file, so relocating the dataset is a
# one-line change instead of five.
DATA_DIR = "/content/Test-Servier"

# Input datasets.
clinical_trials_dataset_path = f"{DATA_DIR}/clinical_trials.csv"
drugs_dataset_path = f"{DATA_DIR}/drugs.csv"
pubmed_dataset_path = f"{DATA_DIR}/pubmed.csv"
# NOTE(review): this path is defined but no visible cell reads it, so the JSON
# PubMed records never reach the pipeline -- confirm whether that is intended.
pubmed_json_dataset_path = f"{DATA_DIR}/pubmed.json"

# Output path prefix handed to the JSON writer in the last code cell.
drug_mention_output_path = f"{DATA_DIR}/drug_mention"

# Data Structure :

In [None]:
class ClinicalTrial(typing.NamedTuple):
    """One clinical-trial record; fields mirror the column names given to the clinical trials CSV reader."""
    id: str
    title: str
    # NOTE(review): the CSV reader is configured with parse_dates=["date"], so the
    # value is presumably a parsed date rather than an int -- confirm the intended type.
    date: int
    journal: str

class Drug(typing.NamedTuple):
    """One drug record; fields mirror the column names given to the drugs CSV reader."""
    id: str
    # Drug name; used as the join key by `name_key`.
    name: str

class PubMed(typing.NamedTuple):
    """One PubMed article record; fields mirror the column names given to the PubMed CSV reader."""
    id: str
    title: str
    # NOTE(review): the CSV reader is configured with parse_dates=["date"], so the
    # value is presumably a parsed date rather than an int -- confirm the intended type.
    date: int
    journal: str

class Mention(typing.NamedTuple):
    """A drug paired with one publication, as built by the mention helper functions."""
    drug_id: str
    drug_name: str
    # Set to "CLINICAL_TRIAL" or "PUBMED" by the mention helper functions.
    publication_type: str
    publication_id: str
    publication_title: str
    # Copied from the publication record's `date` field.
    # NOTE(review): that field is read with parse_dates, so `int` looks
    # inconsistent with the actual values -- confirm the intended type.
    publication_date: int
    publication_journal: str


# Data Load :

In [30]:
# Display the installed Apache Beam version.
beam.__version__

'2.54.0'

In [38]:

# Describe the CSV source first, then attach it to the pipeline under a stable label.
# header=0 combined with `names` swaps the file's own header row for our column names.
clinical_trials_source = beam.dataframe.io.read_csv(
    clinical_trials_dataset_path,
    header=0,
    names=["id", "title", "date", "journal"],
    dtype={"id": str, "title": str, "date": str, "journal": str},
    # Parse the date column, reading ambiguous dates as day-first.
    parse_dates=["date"],
    dayfirst=True,
).with_output_types(ClinicalTrial)

clinical_trials = pipeline | "ReadClinicalTrials" >> clinical_trials_source

# Materialise the records so they can be inspected in the notebook.
ib.collect(clinical_trials)

Unnamed: 0,id,title,date,journal
0,NCT01967433,Use of Diphenhydramine as an Adjunctive Sedati...,2020-01-01,Journal of emergency nursing
1,NCT04189588,Phase 2 Study IV QUZYTTIR™ (Cetirizine Hydroch...,2020-01-01,Journal of emergency nursing
2,NCT04237090,,2020-01-01,Journal of emergency nursing
3,NCT04237091,Feasibility of a Randomized Controlled Clinica...,2020-01-01,Journal of emergency nursing
4,NCT04153396,Preemptive Infiltration With Betamethasone and...,2020-01-01,Hôpitaux Universitaires de Genève
5,NCT03490942,Glucagon Infusion in T1D Patients With Recurre...,2020-05-25,
6,,Glucagon Infusion in T1D Patients With Recurre...,2020-05-25,Journal of emergency nursing
7,NCT04188184,Tranexamic Acid Versus Epinephrine During Expl...,2020-04-27,Journal of emergency nursing\xc3\x28


In [33]:
# Describe the drugs CSV source, then attach it to the pipeline under a stable label.
# header=0 combined with `names` swaps the file's own header row for our column names.
drugs_source = beam.dataframe.io.read_csv(
    drugs_dataset_path,
    header=0,
    names=["id", "name"],
    # Keep ids as text; the recorded output mixes codes such as "A04AD" with "6302001".
    dtype={"id": str, "name": str},
).with_output_types(Drug)

drugs = pipeline | "ReadDrugs" >> drugs_source

# Materialise the records so they can be inspected in the notebook.
ib.collect(drugs)

Unnamed: 0,id,name
0,A04AD,DIPHENHYDRAMINE
1,S03AA,TETRACYCLINE
2,V03AB,ETHANOL
3,A03BA,ATROPINE
4,A01AD,EPINEPHRINE
5,6302001,ISOPRENALINE
6,R01AD,BETAMETHASONE


In [41]:
# Describe the PubMed CSV source, then attach it to the pipeline under a stable label.
# header=0 combined with `names` swaps the file's own header row for our column names.
pubmed_source = beam.dataframe.io.read_csv(
    pubmed_dataset_path,
    header=0,
    names=["id", "title", "date", "journal"],
    dtype={"id": str, "title": str, "date": str, "journal": str},
    # Parse the date column, reading ambiguous dates as day-first.
    parse_dates=["date"],
    dayfirst=True,
).with_output_types(PubMed)

pubmed = pipeline | "ReadPubmed" >> pubmed_source

# Materialise the records so they can be inspected in the notebook.
ib.collect(pubmed)

Unnamed: 0,id,title,date,journal
0,1,A 44-year-old man with erythema of the face di...,2019-01-01,Journal of emergency nursing
1,2,"An evaluation of benadryl, pyribenzamine, and ...",2019-01-01,Journal of emergency nursing
2,3,Diphenhydramine hydrochloride helps symptoms o...,2019-01-02,The Journal of pediatrics
3,4,Tetracycline Resistance Patterns of Lactobacil...,2020-01-01,Journal of food protection
4,5,Appositional Tetracycline bone formation rates...,2020-01-02,American journal of veterinary research
5,6,Rapid reacquisition of contextual fear followi...,2020-01-01,Psychopharmacology
6,7,The High Cost of Epinephrine Autoinjectors and...,2020-02-01,The journal of allergy and clinical immunology...
7,8,Time to epinephrine treatment is associated wi...,2020-03-01,The journal of allergy and clinical immunology...


# Utilitaire :

In [10]:
def title_words_key(element):
    r"""Yield one ``(WORD, element)`` pair per word found in ``element.title``.

    Words are the matches of the regex ``\b\w+\b`` and are upper-cased so they
    can be used as case-insensitive join keys.

    Args:
        element: any record exposing a ``title`` attribute.

    Yields:
        ``(word.upper(), element)`` for every word of the title, in order of
        appearance (a word that occurs twice is yielded twice).

    A record whose title is not a ``str`` (for example ``None`` or the float
    NaN left by an empty CSV cell) yields nothing instead of raising
    ``TypeError`` inside ``re.findall``.
    """
    title = element.title
    if not isinstance(title, str):
        # Missing title: there is nothing to key this record on.
        return
    for word in re.findall(r'\b\w+\b', title):
        yield (word.upper(), element)

def name_key(element):
    """Return ``(key, element)`` where ``key`` is the record's name used as join key.

    A string name is upper-cased so that it matches the upper-cased words
    produced by ``title_words_key`` whatever the casing in the source file.
    A non-string name (e.g. a missing value) is passed through unchanged.

    Args:
        element: any record exposing a ``name`` attribute.

    Returns:
        A 2-tuple ``(key, element)``.
    """
    name = element.name
    key = name.upper() if isinstance(name, str) else name
    return (key, element)

def clinical_trials_mentions(element):
    """Expand one grouped entry into clinical-trial ``Mention`` records.

    ``element[1]`` must map ``"drugs"`` and ``"clinical_trials"`` to iterables
    of records. One ``Mention`` with ``publication_type="CLINICAL_TRIAL"`` is
    yielded for every (drug, clinical trial) combination.

    NOTE(review): a later cell rebinds the name ``clinical_trials_mentions``
    to the pipeline result, so this definition has to be re-run before that
    cell can be re-run.
    """
    grouped = element[1]
    yield from (
        Mention(
            drug_id=drug.id,
            drug_name=drug.name,
            publication_type="CLINICAL_TRIAL",
            publication_id=trial.id,
            publication_title=trial.title,
            publication_date=trial.date,
            publication_journal=trial.journal,
        )
        for drug in grouped["drugs"]
        for trial in grouped["clinical_trials"]
    )
def pubmed_mentions(element):
    """Expand one grouped entry into PubMed ``Mention`` records.

    ``element[1]`` must map ``"drugs"`` and ``"pubmed"`` to iterables of
    records. One ``Mention`` with ``publication_type="PUBMED"`` is yielded for
    every (drug, article) combination.
    """
    grouped = element[1]
    yield from (
        Mention(
            drug_id=drug.id,
            drug_name=drug.name,
            publication_type="PUBMED",
            publication_id=article.id,
            publication_title=article.title,
            publication_date=article.date,
            publication_journal=article.journal,
        )
        for drug in grouped["drugs"]
        for article in grouped["pubmed"]
    )

In [None]:
# Explode every clinical trial into one (UPPER-CASED WORD, trial) pair per title word.
# The explicit label matters: the PubMed cell applies the same function, and Beam
# rejects two transforms with the same label in one pipeline -- unlabelled, both
# would default to "FlatMap(title_words_key)".
# NOTE(review): beam.dataframe.io.read_csv is documented as producing a deferred
# DataFrame; confirm `clinical_trials` can be piped into FlatMap directly or needs
# beam.dataframe.convert.to_pcollection first.
clinical_trials_by_word_keys = (
    clinical_trials
    | "KeyClinicalTrialsByTitleWord"
    >> beam.FlatMap(title_words_key).with_output_types(
        typing.Tuple[str, ClinicalTrial]
    )
)

ib.collect(clinical_trials_by_word_keys)

In [None]:
# Explode every PubMed article into one (UPPER-CASED WORD, article) pair per title word.
# The explicit label matters: the clinical-trials cell applies the same function, and
# Beam rejects two transforms with the same label in one pipeline -- unlabelled, both
# would default to "FlatMap(title_words_key)".
# NOTE(review): beam.dataframe.io.read_csv is documented as producing a deferred
# DataFrame; confirm `pubmed` can be piped into FlatMap directly or needs
# beam.dataframe.convert.to_pcollection first.
pubmed_words = (
    pubmed
    | "KeyPubmedByTitleWord"
    >> beam.FlatMap(title_words_key).with_output_types(typing.Tuple[str, PubMed])
)

ib.collect(pubmed_words)

In [None]:
# Key every drug by its name so it can be joined with the title-word keys.
# NOTE(review): beam.dataframe.io.read_csv is documented as producing a deferred
# DataFrame; confirm `drugs` can be piped into Map directly or needs
# beam.dataframe.convert.to_pcollection first.
key_drugs_by_name = beam.Map(name_key).with_output_types(typing.Tuple[str, Drug])
drugs_by_word_keys = drugs | key_drugs_by_name

ib.collect(drugs_by_word_keys)

# Get Mentions :

In [None]:
# Join clinical trials and drugs on their shared word key, then turn every
# (drug, trial) pair of a group into a Mention.
# Both steps carry explicit labels: the PubMed join also applies CoGroupByKey, and
# Beam rejects two transforms with the same label in one pipeline -- unlabelled,
# both joins would default to "CoGroupByKey".
# NOTE(review): this assignment rebinds `clinical_trials_mentions`, which up to
# here names the helper function passed to FlatMap below. Re-running this cell
# without first re-running the helper definitions would hand FlatMap the pipeline
# result instead of the function.
clinical_trials_mentions = (
    {
        "clinical_trials": clinical_trials_by_word_keys,
        "drugs": drugs_by_word_keys,
    }
    | "JoinClinicalTrialsWithDrugs" >> beam.CoGroupByKey()
    | "ToClinicalTrialMentions"
    >> beam.FlatMap(clinical_trials_mentions).with_output_types(Mention)
)

ib.collect(clinical_trials_mentions)

In [None]:
# Join PubMed articles and drugs on their shared word key, then turn every
# (drug, article) pair of a group into a Mention.
# Both steps carry explicit labels: the clinical-trials join also applies
# CoGroupByKey, and Beam rejects two transforms with the same label in one
# pipeline -- unlabelled, both joins would default to "CoGroupByKey".
drugs_pubmed_mentions = (
    {
        "pubmed": pubmed_words,
        "drugs": drugs_by_word_keys,
    }
    | "JoinPubmedWithDrugs" >> beam.CoGroupByKey()
    | "ToPubmedMentions" >> beam.FlatMap(pubmed_mentions).with_output_types(Mention)
)

ib.collect(drugs_pubmed_mentions)

# Result :

In [None]:
# Merge the clinical-trial and PubMed mentions into a single collection.
mention_sources = (clinical_trials_mentions, drugs_pubmed_mentions)
merge_mentions = beam.Flatten().with_output_types(Mention)
drug_mentions = mention_sources | merge_mentions

ib.collect(drug_mentions)

In [23]:
# Configure the JSON sink separately from its application to the merged mentions:
# records orientation, ISO date format, lines=True.
write_mentions_as_json = beam.io.WriteToJson(
    drug_mention_output_path,
    orient="records",
    date_format="iso",
    lines=True,
)
drug_mentions_write = drug_mentions | write_mentions_as_json

ib.show(drug_mentions_write)

# 6. Pour aller plus loin
Par retour de mail (ou directement sur le repo git si vous le souhaitez), vous pouvez répondre aux questions suivantes (ne nécessite pas d’implémentation dans votre projet) :


- Quels sont les éléments à considérer pour faire évoluer votre code afin qu’il puisse gérer de grosses volumétries de données (fichiers de plusieurs To ou millions de fichiers par exemple) ?

- Réponse : le découpage en mots séparés des titres des publications médicales et des essais cliniques fait que la volumétrie va drastiquement augmenter ; l'utilisation de `CoGroupByKey` va alors entraîner des goulets d'étranglement.

----

- Pourriez-vous décrire les modifications qu’il faudrait apporter, s’il y en a, pour prendre en considération de telles volumétries ?

- Réponse : le moyen de remédier à ceci est de partitionner les données et d'utiliser le système des `window`.

# 7. Structure du code :
Pour améliorer la maintenabilité et la réutilisabilité du code, j'aurais dû mieux organiser mon code en module Python en respectant les consignes suivantes :

- Ainsi, nous allons séparer toutes les classes de schémas de données telles que class ClinicalTrial, class Drug, class PubMed, class Mention, dans un sous-module data.schema.

- Toutes les fonctions permettant de charger et de sauvegarder les données seront regroupées dans data.io.

- Les fonctions traitant les titres ainsi que les petites fonctions utilitaires seront placées dans data.utils.

- Toutes les transformations seront regroupées dans un sous-module transform.

- Le point d'entrée de notre module sera le fichier main.py à la racine, qui permettra de parser les arguments avec `argparse` et d'exécuter le pipeline.