## Environment and Spark Session setup

In [1]:
import sys
sys.path.append("..")  # make the ctdi_treatment package in the parent directory importable
from ctdi_treatment.env_setup_start import *
import os
import time

# License-key JSON passed to set_envvars. Set the JSL_KEYS_PATH environment variable to use
# your own copy instead of editing this hard-coded, user-specific path (kept as the default).
JSL_KEYS_PATH = os.environ.get("JSL_KEYS_PATH", "/home/ubuntu/hasham/jsl_keys.json")
license_keys = set_envvars(JSL_KEYS_PATH)
spark = start_sparknlp(license_keys)

Desired SparkNLP Version: 3.1.3
Desired SparkNLP-JSL Version: 3.1.3
Real Spark NLP Version : 3.1.1
Real Spark NLP_JSL Version : 3.1.3


## Getting the joined `design_group` + `intervention` table

In [2]:
import pandas as pd

# Joined design_group + intervention export: one row per (arm, intervention).
raw_csv_path = "../data/raw_aac.csv"
alldf = pd.read_csv(raw_csv_path)
print(alldf.columns)
alldf.head()

Index(['design_group_id', 'nct_id', 'group_type', 'title', 'arm_desc',
       'intervention_id', 'intervention_type', 'name', 'intervention_desc',
       'arm_title_desc', 'intervention_name_desc',
       'intervention_type_name_desc'],
      dtype='object')


Unnamed: 0,design_group_id,nct_id,group_type,title,arm_desc,intervention_id,intervention_type,name,intervention_desc,arm_title_desc,intervention_name_desc,intervention_type_name_desc
0,13299803,NCT00000435,Placebo Comparator,A,Subjects randomized to arm A received 25mg/day...,12910066,Drug,None-placebo,placebo was taken in pill form at 25mg/day for...,A\n\nSubjects randomized to arm A received 25m...,None-placebo: placebo was taken in pill form a...,Drug\nNone-placebo: placebo was taken in pill ...
1,13299804,NCT00000435,Active Comparator,B,Subjects randomized to Arm B received 25mg/day...,12910065,Drug,dnaJ peptide,dnaJP1 was taken in pill form at 25mg/day for ...,B\n\nSubjects randomized to Arm B received 25m...,dnaJ peptide: dnaJP1 was taken in pill form at...,Drug\ndnaJ peptide: dnaJP1 was taken in pill f...
2,13690845,NCT00001337,Experimental,Arm A,EPOCH + Rituximab every 3 weeks for 6 cycles.,13274570,Drug,EPOCH,Combination chemotherapy given with Rituximab ...,Arm A\n\nEPOCH + Rituximab every 3 weeks for 6...,EPOCH: Combination chemotherapy given with Rit...,Drug\nEPOCH: Combination chemotherapy given wi...
3,13690845,NCT00001337,Experimental,Arm A,EPOCH + Rituximab every 3 weeks for 6 cycles.,13274571,Biological,Rituximab,Rituximab given on Day 1 of combination chemot...,Arm A\n\nEPOCH + Rituximab every 3 weeks for 6...,Rituximab: Rituximab given on Day 1 of combina...,Biological\nRituximab: Rituximab given on Day ...
4,13732309,NCT00001563,Experimental,1,EPOCH-R every 3 weeks for up to 6 cycle,13311178,Biological,Filgrastim,Filgrastim after EPOCH-R from Day 6 for 10 day...,1\n\nEPOCH-R every 3 weeks for up to 6 cycle,Filgrastim: Filgrastim after EPOCH-R from Day ...,Biological\nFilgrastim: Filgrastim after EPOCH...


### Also calculating the fields that represent the actual input to the pipeline
#### Concatenations: arm title + description; intervention name + description; intervention type + name + description

In [3]:
# Separators used to assemble the pipeline input texts.
name_sep = ": "    # "<name>: <description>"
sent_sep = "."     # appended after each intervention description
tit_sep = "\n\n"   # blank line between a header (arm title / intervention type) and its text

# Recompute the concatenated input columns; they replace the same-named columns read from the CSV.
alldf = alldf.assign(
    arm_title_desc=lambda d: d["title"] + tit_sep + d["arm_desc"],
    intervention_name_desc=lambda d: d["name"] + name_sep + d["intervention_desc"] + sent_sep,
    # assign evaluates its arguments in order, so this sees the intervention_name_desc built above.
    intervention_type_name_desc=lambda d: d["intervention_type"] + tit_sep + d["intervention_name_desc"],
)
alldf.shape

(45, 12)

In [4]:
# Columns identifying an arm; shared by the per-arm grouping and the experiment inputs.
arm_keys = ["nct_id", "design_group_id", "title", "arm_desc"]

# One row per arm: all of its "type\n\nname: desc" intervention texts joined with tit_sep.
# NOTE(review): groupby drops groups whose key (e.g. a NaN arm_desc) is missing by default,
# so such arms would be absent here but present in the other experiments — confirm intended.
annotation_df = (
    alldf.groupby(arm_keys)
    .agg(intervention_type_name_desc=("intervention_type_name_desc", tit_sep.join))
    .reset_index()
)

# Experiment triplets: (input DataFrame, pipeline input field, AACT fallback field for drugs)
#   1. arm level          — arm title + description, deduplicated per arm
#   2. intervention level — intervention name + description, one row per (arm, intervention)
#   3. annotation level   — aggregated per-arm intervention text (annotation_df)
exps = [
    (alldf[arm_keys + ["arm_title_desc"]].drop_duplicates(),
     "arm_title_desc", "title"),
    (alldf[arm_keys + ["arm_title_desc", "name", "intervention_type", "intervention_name_desc"]].drop_duplicates(),
     "intervention_name_desc", "name"),
    (annotation_df, "intervention_type_name_desc", "name"),
]

In [5]:
# Preview the per-arm aggregated intervention text used as the third experiment's input.
annotation_df

Unnamed: 0,nct_id,design_group_id,title,arm_desc,intervention_type_name_desc
0,NCT00000435,13299803,A,Subjects randomized to arm A received 25mg/day...,Drug\n\nNone-placebo: placebo was taken in pil...
1,NCT00000435,13299804,B,Subjects randomized to Arm B received 25mg/day...,Drug\n\ndnaJ peptide: dnaJP1 was taken in pill...
2,NCT00001337,13690845,Arm A,EPOCH + Rituximab every 3 weeks for 6 cycles.,Drug\n\nEPOCH: Combination chemotherapy given ...
3,NCT00001563,13732309,1,EPOCH-R every 3 weeks for up to 6 cycle,Biological\n\nFilgrastim: Filgrastim after EPO...
4,NCT00001575,13299883,Anti-Tac yttrium 90-labeled humanized anti-Tac...,10 mCi (if a bone marrow transplant was part o...,Biological\n\nY-90 Humanized Anti-Tac: 10 mCi ...
5,NCT00001586,13299880,Low-Intermediate Risk B-Cell Pts,Previously untreated low or intermediate risk ...,Other\n\nLeukemic or stroma cells: Patients ar...
6,NCT00001586,13299881,Intermediate-high Risk B-Cell Pts,Previously untreated intermediate or high risk...,Biological\n\nRituximab: Rituxan.\n\nDrug\n\nF...
7,NCT00001984,13299801,Alemtuzumab and DSG,The recipients of live donor kidneys were trea...,Drug\n\nAlemtuzumab and DSG: Alemtuzumab was a...
8,NCT00002548,13296605,HDCTX and PBSC,High dose chemotherapy with peripheral blood s...,Drug\n\ndoxorubicin hydrochloride: 10 mg/m2/da...
9,NCT00002548,13296606,HDCTX with PBSC and Autologous BMT,High dose chemotherapy with peripheral blood s...,Drug\n\ncarmustine: 20 mg/m2 I.V. day 1 q 35 d...


## Pipeline Execution

### Building or Loading the Spark Pipeline

In [6]:
from ctdi_treatment.resolver_pipeline import ResolverPipeline
# Standalone RxNorm resolver on the current Spark session. In the recorded run, constructing it
# downloaded the sbiobert_base_cased_mli embeddings and the sbiobertresolve_rxnorm_dispo model.
resolver_pipeline = ResolverPipeline(spark)

sbiobert_base_cased_mli download started this may take some time.
Approximate size to download 384.3 MB
[OK!]
sbiobertresolve_rxnorm_dispo download started this may take some time.
Approximate size to download 804.3 MB
[OK!]


In [14]:
# Quick sanity check: resolve one term and show the RxNorm resolution annotations of the first result.
resolver_pipeline.resolve(['acetaminophen'])[0]['resolution_rxnorm']

[Annotation(entity, 0, 12, 1125315, {'all_k_results': '1125315:::19052416:::40220874:::19122864:::19052418:::36216998:::40005655:::36244345:::40220875:::40005806:::36216996:::19072617:::40005733:::19018485:::40005663:::36216997:::1361205:::36244163:::19056032:::40005682:::36216999:::36217000:::19087812:::40229063:::40005662', 'all_k_distances': '0.0000:::4.8733:::5.5359:::5.9080:::5.9561:::6.8234:::7.1581:::7.1857:::7.3107:::7.3581:::7.4884:::7.5802:::7.6557:::7.6765:::7.7845:::7.9702:::8.0238:::8.0340:::8.0736:::8.3391:::8.4086:::8.4157:::8.6243:::8.6471:::8.7823', 'confidence': '0.9749', 'all_k_cosine_distances': '0.0000:::0.0393:::0.0503:::0.0578:::0.0595:::0.0776:::0.0872:::0.0870:::0.0900:::0.0907:::0.0929:::0.0959:::0.0979:::0.0975:::0.1007:::0.1063:::0.1080:::0.1103:::0.1070:::0.1165:::0.1175:::0.1194:::0.1220:::0.1264:::0.1306', 'all_k_resolutions': 'acetaminophen:::acetaminophen jr:::acetaminophen injection:::acetaminophen child:::acetaminophen pm:::acetaminophen oral product:

In [10]:
# RxNorm reference table (lterm, concept_code, rxnorm_term, sentence_embeddings, ...); presumably
# the data behind the sbiobertresolve_rxnorm_dispo resolver — confirm.
dispo_df = spark.read.parquet("../../rxnorm_dispo.parquet")

In [11]:
# Peek at the first three rows (Spark's show() truncates long cell values by default).
dispo_df.show(3)

+--------------------+----+------------+--------------------+--------------------+--------------------+-----------+
|               lterm|  id|concept_code|         rxnorm_term| sentence_embeddings|            sentence|destination|
+--------------------+----+------------+--------------------+--------------------+--------------------+-----------+
|acetaminophen / d...|8192|    40151278|acetaminophen / d...|[[sentence_embedd...|[[document, 0, 88...|       null|
|cytarabine liposo...|8193|    40175466|cytarabine liposo...|[[sentence_embedd...|[[document, 0, 50...|       null|
|neisseria meningi...|8194|    40173200|neisseria meningi...|[[sentence_embedd...|[[document, 0, 25...|       null|
+--------------------+----+------------+--------------------+--------------------+--------------------+-----------+
only showing top 3 rows



In [15]:
from ctdi_treatment.treatment_pipeline import (
    LightPipeline,
    PipelineModel,
    build_df,
    build_treatment_pipeline,
)

pl_name = "20210722_ner_pl_from_arms"
build_or_load = "build"  # set to "load" to reuse the model saved under models/{pl_name}
t_start = time.time()
if build_or_load != "build":
    plm = PipelineModel.load(f"models/{pl_name}")
else:
    pl = build_treatment_pipeline()
    # Fitted on a single empty "text" row; presumably all stages are pretrained, so fitting
    # only assembles the PipelineModel — confirm in build_treatment_pipeline.
    plm = pl.fit(spark.createDataFrame([("",)]).toDF("text"))
print(time.time() - t_start)

embeddings_clinical download started this may take some time.
Approximate size to download 1.6 GB
[OK!]
pos_clinical download started this may take some time.
Approximate size to download 1.5 MB
[OK!]
dependency_conllu download started this may take some time.
Approximate size to download 16.7 MB
[OK!]
ner_posology download started this may take some time.
Approximate size to download 13.8 MB
[OK!]
ner_clinical download started this may take some time.
Approximate size to download 13.9 MB
[OK!]
ner_clinical_large download started this may take some time.
Approximate size to download 13.9 MB
[OK!]
ner_jsl download started this may take some time.
Approximate size to download 14.5 MB
[OK!]
assertion_dl download started this may take some time.
Approximate size to download 1.3 MB
[OK!]
sbiobert_base_cased_mli download started this may take some time.
Approximate size to download 384.3 MB
[OK!]
sbiobertresolve_rxnorm_dispo download started this may take some time.
Approximate size to downl

### Saving the Pipeline

In [7]:
# Persisting the fitted model is opt-in; set to True to (over)write models/{pl_name}.
save_pipeline = False
if save_pipeline:
    t_start = time.time()
    target_dir = f"models/{pl_name}"
    plm.write().overwrite().save(target_dir)
    print(time.time() - t_start)

### Creating a Spark NLP LightPipeline to avoid Spark overhead

In [18]:
# Wrap the fitted PipelineModel in a LightPipeline to avoid Spark job overhead on this small dataset.
lpl = LightPipeline(plm)

In [19]:
use_light_pipeline = True
t_start = time.time()
dfs = []  # (fallback_field, result DataFrame) per experiment, in exps order
for exp_df, input_field, fallback_field in exps:
    if not use_light_pipeline:
        # Cluster path: point the first stage at this experiment's input column and run a full
        # Spark transform. Collecting back with toPandas() makes little sense in practice; it is
        # kept only as a reference for running on a cluster.
        plm.stages[0].setInputCol(input_field)
        result_df = plm.transform(spark.createDataFrame(exp_df)).toPandas()
    else:
        result_df = build_df(exp_df, input_field, lpl)
    dfs.append((fallback_field, result_df))
print(time.time() - t_start)

410.96175360679626


In [20]:
# (rows, columns) of each experiment's pipeline output, in exps order.
for _fallback, out_df in dfs:
    print(out_df.shape)

(13, 32)
(45, 35)
(13, 32)


## Running postprocessing for all experiments

In [21]:
import pandas as pd
from scipy.stats import mode
# NOTE(review): the recorded run failed here with "No module named 'pylcs'", presumably raised
# while importing ctdi_treatment.postprocessing — install pylcs before running.
from ctdi_treatment.postprocessing import build_dict_acc, prepare_output_acc, aggregate_entity_dict, dict_diff_acc, dict_join_append_acc

# Post-process each experiment's pipeline output. `out_df` is the very DataFrame stored in `dfs`,
# so the columns added below land on the frames held there.
for fallback_field, out_df in dfs:
    out_df["entity_dict"] = out_df["full_chunk"].apply(build_dict_acc(by_sentence=False))
    out_df["entity_dict_sent"] = out_df["full_chunk"].apply(build_dict_acc())
    out_df["num_sents"] = out_df["sentence"].apply(len)
    out_df["num_drugs"] = out_df["entity_dict"].apply(lambda x: len(x.get("Drug", [])))
    out_df["output"] = out_df.apply(
        prepare_output_acc(fallback_field=fallback_field, name_sep=name_sep, sent_sep=sent_sep, tit_sep=tit_sep),
        axis=1,
    )
    # Each row's output is a 3-item sequence; split it into three columns (overwriting "output").
    out_df[["output_class", "missing_entities", "output"]] = pd.DataFrame(out_df["output"].tolist(), index=out_df.index)

ModuleNotFoundError: No module named 'pylcs'

In [None]:
# Per-experiment outputs in exps order: arm level (dfx), intervention level (dfz),
# annotation level (dfa); the fallback-field half of each pair is not needed here.
dfx, dfz, dfa = [out_df for _, out_df in dfs]

### Using arm- and intervention-level information together to calculate the `combined` final output

### Aggregation of the Intervention level output

In [15]:
# Aggregate intervention-level results to one row per arm (nct_id, design_group_id): names and
# descriptions are joined, entity dicts merged with aggregate_entity_dict, and the remaining
# per-intervention columns reduced with sum.
# NOTE(review): dfz_ is not used by the merge that builds arm_int below, which uses `dfz`.
dfz_ = dfz.groupby(["nct_id","design_group_id"])\
    .agg({"name":name_sep.join,"title":max,"arm_desc":max,
           "intervention_name_desc":tit_sep.join,"entity_dict":aggregate_entity_dict,
           # Most frequent class per arm. pandas' Series.mode replaces scipy.stats.mode, whose
           # non-numeric support is deprecated/removed in recent SciPy. Ties still resolve to the
           # smallest value; the result is now a scalar rather than a length-1 array.
           "output_class":lambda x: x.mode(dropna=False).iloc[0],
           "missing_entities":sum,"output":sum,"num_drugs":sum}).reset_index()
dfz_.shape

(13, 11)

In [16]:
# Join arm-level results (dfx) with intervention-level results (dfz) on design_group_id.
# Overlapping pipeline columns get pandas' default "_x" (arm) / "_y" (intervention) suffixes,
# which the cells below rely on (entity_dict_x / entity_dict_y, output_x / output_y, ...).
# NOTE(review): this merges the per-intervention frame `dfz`, not the per-arm aggregate `dfz_`
# computed above, so arm_int has one row per (arm, intervention) — confirm which is intended.
arm_int = pd.merge(dfx,dfz.drop(["nct_id","title","arm_desc","arm_title_desc"],axis=1),on="design_group_id")

In [17]:
# Entity labels passed to the dict_diff_acc comparisons below.
required = ["Drug","Strength","Administration"]
# Compare arm-level (x) and intervention-level (y) entity dicts; the boolean flag presumably
# switches dict_diff_acc between an entity-level and a label-level diff — confirm in postprocessing.
arm_int["entity_diff_xy"] = arm_int[["entity_dict_x","entity_dict_y"]].apply(dict_diff_acc(False,required), axis=1)
arm_int["label_diff_xy"] = arm_int[["entity_dict_x","entity_dict_y"]].apply(dict_diff_acc(True,required), axis=1)
# Entity labels (the dict keys) found at each level.
arm_int["labels_x"] = arm_int["entity_dict_x"].apply(lambda entities: list(entities))
arm_int["labels_y"] = arm_int["entity_dict_y"].apply(lambda entities: list(entities))

In [18]:
# Human-readable versions of the arm-level (x) and intervention-level (y) outputs: one item per line.
for suffix in ("x", "y"):
    arm_int[f"output_read_{suffix}"] = arm_int[f"output_{suffix}"].apply(
        lambda items: "\n".join(map(str, items))
    )

In [19]:
# Row-wise combination of arm- and intervention-level results, plus a one-item-per-line text view.
combine_row = dict_join_append_acc()
arm_int["combined"] = arm_int.apply(combine_row, axis=1)
arm_int["combined_read"] = arm_int["combined"].apply(lambda items: "\n".join(map(str, items)))

In [20]:
# Export a side-by-side review table of arm-level (x), intervention-level (y) and combined outputs.
review_cols = [
    "nct_id", "title", "arm_title_desc", "name", "intervention_name_desc",
    "output_class_x", "output_class_y", "output_x", "output_y", "entity_diff_xy",
    "combined", "output_read_x", "output_read_y", "combined_read",
]
arm_int[review_cols].to_csv("../data/arm_title_desc_int_name_desc_combined.csv", index=False)

## Creating pre-annotated data for the Annotation Lab

In [21]:
# Join arm-level results (dfx) with annotation-level results (dfa) per arm; overlapping columns
# get pandas' default "_x" (arm) / "_y" (annotation) suffixes, e.g. full_chunk_x / full_chunk_y.
ann_df = pd.merge(dfx,dfa.drop(["nct_id","title","arm_desc"],axis=1),on="design_group_id")

In [22]:
from ctdi_treatment.annotation_lab import write_json_files

# One entry per annotated source; presumably (prefix, text column, chunk column, relations
# column) — confirm against write_json_files.
annotation_sources = [
    ("arm", "arm_title_desc", "full_chunk_x", "relations_x"),
    ("int", "intervention_type_name_desc", "full_chunk_y", "relations_y"),
]
write_json_files(ann_df, ["nct_id", "title"], annotation_sources, out_path="../data")