# Examinations Graph

## Import Moduli

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col, arrays_zip, monotonically_increasing_id, lit, split, to_date, upper, when
from pyspark.ml import Pipeline

# Bind `spark` explicitly instead of relying on the runtime to inject it:
# getOrCreate() returns the already-active session when one exists and
# creates one otherwise.
spark = SparkSession.builder.getOrCreate()

print("Apache Spark version: ", spark.version)

# Select the LEGACY time parser policy before any date column is parsed.
spark.conf.set("spark.sql.legacy.timeParserPolicy","LEGACY")

## Neo4j Config

In [None]:
# Connection settings for the Neo4j instance used by every write below.
# Each value can be overridden through an environment variable so that real
# credentials do not have to be stored in the notebook; when the variable is
# absent the original placeholder value is used.
# Replace "myip" with the IP address of the host where the database runs.
import os

url = os.environ.get("NEO4J_URL", "bolt://myip:7687")
username = os.environ.get("NEO4J_USERNAME", "neo4j")
password = os.environ.get("NEO4J_PASSWORD", "neo4j")

## Lettura Dataset

### Visite

In [None]:
# Reader settings for the examinations CSV.
file_type = "csv"
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","
path = "/FileStore/tables/DF_VISITA_COMPLETO.csv"

# Configure the reader in one call, then load the file.
exams_reader = spark.read.format(file_type).options(
    inferSchema=infer_schema,
    header=first_row_is_header,
    multiLine="true",
    sep=delimiter,
)
exams = exams_reader.load(path)

# Clean-up steps, applied in this order:
#   1. parse DATA_EVENTO as a date with the "yyyy-MM-dd" pattern,
#   2. turn the literal string "null" into a real null,
#   3. upper-case the examiner name.
exams = exams.withColumn("DATA_EVENTO", to_date(col("DATA_EVENTO"), "yyyy-MM-dd"))
exams = exams.replace("null", None)
exams = exams.withColumn("ESAMINATORE", upper(col("ESAMINATORE")))

### Anamnesi

In [None]:
# Load the anamnesis JSON file into a DataFrame.
path = "/FileStore/tables/anamnesi_completa.json"

res_anamnesi = spark.read.load(path, format="json", inferSchema="true")

In [None]:
# Report how many anamnesis records were loaded.
anamnesi_total = res_anamnesi.count()
print("Anamnesi analizzate dal NER:", anamnesi_total)

In [None]:
# Flatten the anamnesis records into one row per (visit, chunk, entity).
# chunk.result and chunk.metadata are zipped element-wise and exploded so
# that each element becomes its own row.
anamnesi_chunks = res_anamnesi.select("ID_VISITA", "chunk.result", "chunk.metadata")
anamnesi_zipped = anamnesi_chunks.withColumn("tmp", arrays_zip("result", "metadata"))
anamnesi_rows = anamnesi_zipped.withColumn("tmp", explode("tmp"))

# Keep the chunk text (renamed to `chunk`, upper-cased) and the `entity`
# field taken from the metadata.
entities_anamnesi = (
    anamnesi_rows
    .select("ID_VISITA", "tmp.result", "tmp.metadata.entity")
    .withColumnRenamed("result", "chunk")
    .withColumn("chunk", upper(col("chunk")))
)

In [None]:
# Compare the raw number of extracted rows with the number of distinct rows.
for label, frame in (
    ("Coppie (visita, entity) di anamnesi:", entities_anamnesi),
    ("Coppie (visita, entity) di anamnesi distinte:", entities_anamnesi.distinct()),
):
    print(label, frame.count())

In [None]:
# Remove duplicate rows and report the remaining count.
entities_anamnesi = entities_anamnesi.dropDuplicates()

anamnesi_pairs = entities_anamnesi.count()
print("Coppie (visita, entity) di anamnesi:", anamnesi_pairs)

### Diagnosi

In [None]:
# Load the diagnosis JSON file into a DataFrame.
path = "/FileStore/tables/diagnosi_completa.json"

res_diagnosi = spark.read.load(path, format="json", inferSchema="true")

In [None]:
# Report how many diagnosis records were loaded.
diagnosi_total = res_diagnosi.count()
print("Diagnosi analizzate dal NER:", diagnosi_total)

In [None]:
# Flatten the diagnosis records into one row per (visit, chunk, entity).
# chunk.result and chunk.metadata are zipped element-wise and exploded so
# that each element becomes its own row.
diagnosi_chunks = res_diagnosi.select("ID_VISITA", "chunk.result", "chunk.metadata")
diagnosi_zipped = diagnosi_chunks.withColumn("tmp", arrays_zip("result", "metadata"))
diagnosi_rows = diagnosi_zipped.withColumn("tmp", explode("tmp"))

# Keep the chunk text (renamed to `chunk`, upper-cased) and the `entity`
# field taken from the metadata.
entities_diagnosi = (
    diagnosi_rows
    .select("ID_VISITA", "tmp.result", "tmp.metadata.entity")
    .withColumnRenamed("result", "chunk")
    .withColumn("chunk", upper(col("chunk")))
)

In [None]:
# Compare the raw number of extracted rows with the number of distinct rows.
for label, frame in (
    ("Coppie (visita, entity) di diagnosi:", entities_diagnosi),
    ("Coppie (visita, entity) di diagnosi distinte:", entities_diagnosi.distinct()),
):
    print(label, frame.count())

In [None]:
# Remove duplicate rows from the diagnosis entities and report what is left.
entities_diagnosi = entities_diagnosi.drop_duplicates()

# The frame holds ID_VISITA, chunk and entity columns, so the label describes
# (visit, entity) pairs, matching the message used for the anamnesis entities.
print("Coppie (visita, entity) di diagnosi:", entities_diagnosi.count())

### Farmaci

In [None]:
# Reader settings for the prescriptions CSV.
file_type = "csv"
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","
path = "/FileStore/tables/DF_FARMACI_COMPLETO.csv"

# Load the file, passing the format and all reader options in one call.
drugs = spark.read.load(
    path,
    format=file_type,
    inferSchema=infer_schema,
    header=first_row_is_header,
    multiLine="true",
    sep=delimiter,
)

# Turn the literal string "null" into a real null, then discard the rows
# that have no FARMACO_FINALE value.
drugs = drugs.replace("null", None)
drugs = drugs.na.drop(subset=["FARMACO_FINALE"])

In [None]:
# Preview the first five prescription rows.
drugs_preview = drugs.limit(5)
display(drugs_preview)

FARMACO_FINALE,ID_VISITA,CONFEZIONE,DOSE
SIRDALUD,34,SIRDALUD,un/una al mattino e sera
NORVASC,143,NORVASC 5,1 cp al pomeriggio
ENAPREN,207,ENAPREN 20,un/una al mattino
TOTALIP,242,TOTALIP 20,un/una alla sera
ISOPTIN,324,ISOPTIN 240,un/una al mattino


## Entità

### Pazienti

In [None]:
# Distinct patient codes, exposed as column `codice` and sorted by it.
patients = (
    exams
    .select(col("CODPAZ").alias("codice"))
    .distinct()
    .orderBy("codice")
)

patient_total = patients.count()
print("Pazienti distinti:", patient_total)

In [None]:
# Write the patients to Neo4j as :Paziente nodes keyed by `codice`
# (basic authentication, Overwrite save mode).
paziente_options = {
    "url": url,
    "authentication.type": "basic",
    "authentication.basic.username": username,
    "authentication.basic.password": password,
    "labels": ":Paziente",
    "node.keys": "codice",
}
(
    patients.write
    .format("org.neo4j.spark.DataSource")
    .options(**paziente_options)
    .mode("Overwrite")
    .save()
)

### Dottori

In [None]:
# Distinct (doctor code, examiner name) pairs, exposed as `codice` and `nome`,
# sorted by code; rows without a code are dropped.
doctors = (
    exams
    .select(
        col("COD_MEDICO_FIRMANTE").alias("codice"),
        col("ESAMINATORE").alias("nome"),
    )
    .distinct()
    .orderBy("codice")
    .na.drop(subset=["codice"])
)

doctor_total = doctors.count()
print("Medici distinti:", doctor_total)

In [None]:
# Show the distinct doctor rows.
unique_doctors = doctors.distinct()
display(unique_doctors)

codice,nome
1_11,CARMINE MORISCO
1_13,M.A.E. RAO
1_15,
1_16_1,RAFFAELE IZZO
1_16_3,N. DE LUCA
1_19,
1_20,
1_21,ORESTE ARCUCCI
1_22,
1_22_1,LETIZIA SPINELLI


In [None]:
# Write the doctors to Neo4j as :Medico nodes keyed by `codice`
# (basic authentication, Overwrite save mode).
medico_options = {
    "url": url,
    "authentication.type": "basic",
    "authentication.basic.username": username,
    "authentication.basic.password": password,
    "labels": ":Medico",
    "node.keys": "codice",
    "node.properties": "nome",
}
medico_writer = doctors.write.format("org.neo4j.spark.DataSource")
for option_name, option_value in medico_options.items():
    medico_writer = medico_writer.option(option_name, option_value)
medico_writer.mode("Overwrite").save()

### Visita

In [None]:
# Distinct (visit id, event date) pairs, exposed as `id` and `data`,
# sorted by id.
exams_ = (
    exams
    .select(
        col("ID_VISITA").alias("id"),
        col("DATA_EVENTO").alias("data"),
    )
    .distinct()
    .orderBy("id")
)

visit_total = exams_.count()
print("Visite distinte:", visit_total)

In [None]:
# Preview the first five visit rows.
visits_preview = exams_.limit(5)
display(visits_preview)

id,data
0,2008-03-27
1,2008-06-10
2,2019-03-29
3,2019-05-03
4,2019-11-25


In [None]:
# Write the visits to Neo4j as :Visita nodes (basic authentication,
# Overwrite save mode).
#
# The node key is `id` alone: every relationship write in this notebook
# matches :Visita nodes on `id` only, so the node identity should use the
# same key. `data` is no longer part of the key.
# NOTE(review): per the Neo4j Spark connector documentation, non-key columns
# are written as node properties in Overwrite mode, so `data` should still be
# stored on the node; a key on `data` would additionally fail for rows whose
# date is null - confirm against the connector version in use.
exams_.write.format("org.neo4j.spark.DataSource") \
      .option("url", url) \
      .option("authentication.type", "basic") \
      .option("authentication.basic.username", username) \
      .option("authentication.basic.password", password) \
      .option("labels", ":Visita") \
      .option("node.keys", "id") \
      .mode("Overwrite") \
      .save()

### Malattia

In [None]:
# Distinct disease names (entity 'Disease') found in the anamnesis and in
# the diagnosis entities, each exposed as a single `nome` column.
anamnesi_disease_rows = entities_anamnesi.filter("entity == 'Disease'")
disease_anamnesi = anamnesi_disease_rows.select(col("chunk").alias("nome")).distinct()

diagnosi_disease_rows = entities_diagnosi.filter("entity == 'Disease'")
disease_diagnosi = diagnosi_disease_rows.select(col("chunk").alias("nome")).distinct()

for label, frame in (
    ("Malattie di anamnesi distinte:", disease_anamnesi),
    ("Malattie di diagnosi distinte:", disease_diagnosi),
):
    print(label, frame.count())

In [None]:
# Merge the two disease sets and remove names that appear in both.
all_disease_names = disease_anamnesi.union(disease_diagnosi)
diseases = all_disease_names.distinct()

disease_total = diseases.count()
print("Malattie distinte:", disease_total)

In [None]:
# Write the diseases to Neo4j as :Malattia nodes keyed by `nome`
# (basic authentication, Overwrite save mode).
malattia_options = {
    "url": url,
    "authentication.type": "basic",
    "authentication.basic.username": username,
    "authentication.basic.password": password,
    "labels": ":Malattia",
    "node.keys": "nome",
}
(
    diseases.write
    .format("org.neo4j.spark.DataSource")
    .options(**malattia_options)
    .mode("Overwrite")
    .save()
)

### Sintomo

In [None]:
# Distinct symptom names (entity 'Symptom') found in the anamnesis and in
# the diagnosis entities, each exposed as a single `nome` column.
anamnesi_symptom_rows = entities_anamnesi.filter("entity == 'Symptom'")
symptom_anamnesi = anamnesi_symptom_rows.select(col("chunk").alias("nome")).distinct()

diagnosi_symptom_rows = entities_diagnosi.filter("entity == 'Symptom'")
symptom_diagnosi = diagnosi_symptom_rows.select(col("chunk").alias("nome")).distinct()

for label, frame in (
    ("Sintomi di anamnesi distinti:", symptom_anamnesi),
    ("Sintomi di diagnosi distinti:", symptom_diagnosi),
):
    print(label, frame.count())

In [None]:
# Merge the two symptom sets and remove names that appear in both.
all_symptom_names = symptom_anamnesi.union(symptom_diagnosi)
symptoms = all_symptom_names.distinct()

symptom_total = symptoms.count()
print("Sintomi distinti:", symptom_total)

In [None]:
# Write the symptoms to Neo4j as :Sintomo nodes keyed by `nome`
# (basic authentication, Overwrite save mode).
sintomo_options = {
    "url": url,
    "authentication.type": "basic",
    "authentication.basic.username": username,
    "authentication.basic.password": password,
    "labels": ":Sintomo",
    "node.keys": "nome",
}
sintomo_writer = symptoms.write.format("org.neo4j.spark.DataSource")
for option_name, option_value in sintomo_options.items():
    sintomo_writer = sintomo_writer.option(option_name, option_value)
sintomo_writer.mode("Overwrite").save()

### Farmaco

In [None]:
# Distinct drug names, exposed as column `nome` and sorted by it.
drugs_ = (
    drugs
    .select(col("FARMACO_FINALE").alias("nome"))
    .distinct()
    .orderBy("nome")
)

drug_total = drugs_.count()
print("Farmaci distinti:", drug_total)

In [None]:
# Show the distinct drug names.
drug_names = drugs_
display(drug_names)

nome
A
A PARTIRE DA GG PRIMA
A PARTIRE DA SABATO
A TEM AEREOSOL DOSATO
ABASAGLAR KWIKPEN
ABATACEPT
ABBA
ABIETTA
ABILIFY
ABOLIRE FUMO DI SIGARETT


In [None]:
# Write the drugs to Neo4j as :Farmaco nodes keyed by `nome`
# (basic authentication, Overwrite save mode).
farmaco_options = {
    "url": url,
    "authentication.type": "basic",
    "authentication.basic.username": username,
    "authentication.basic.password": password,
    "labels": ":Farmaco",
    "node.keys": "nome",
}
(
    drugs_.write
    .format("org.neo4j.spark.DataSource")
    .options(**farmaco_options)
    .mode("Overwrite")
    .save()
)

## Relazioni

### Paziente - Visita

In [None]:
# One row per distinct (visit, patient) combination together with the
# SESSO / ETA / PESO / ALTEZZA columns; visit and patient are exposed as
# `id` and `codice`, and rows are sorted by `id`.
patient_exam = (
    exams
    .select(
        col("ID_VISITA").alias("id"),
        col("CODPAZ").alias("codice"),
        "SESSO",
        "ETA",
        "PESO",
        "ALTEZZA",
    )
    .distinct()
    .orderBy("id")
)

In [None]:
# Number of patient-visit rows about to be written.
patient_exam_total = patient_exam.count()
print(patient_exam_total)

In [None]:
# Write (:Paziente)-[:SI_SOTTOPONE]->(:Visita) relationships.
# The source node is matched on `codice`, the target on `id`; SESSO, ETA,
# PESO and ALTEZZA are mapped to the relationship properties sesso, eta,
# peso and altezza.
si_sottopone_options = {
    "url": url,
    "authentication.type": "basic",
    "authentication.basic.username": username,
    "authentication.basic.password": password,
    "relationship": "SI_SOTTOPONE",
    "relationship.save.strategy": "keys",
    "relationship.source.labels": ":Paziente",
    "relationship.source.save.mode": "overwrite",
    "relationship.source.node.keys": "codice",
    "relationship.target.labels": ":Visita",
    "relationship.target.node.keys": "id",
    "relationship.target.save.mode": "overwrite",
    "relationship.properties": "SESSO:sesso, ETA:eta, PESO:peso, ALTEZZA:altezza",
}
(
    patient_exam.write
    .format("org.neo4j.spark.DataSource")
    .options(**si_sottopone_options)
    .mode("Overwrite")
    .save()
)

### Dottore - Visita

In [None]:
# Distinct (visit, signing doctor) pairs, exposed as `id` and `codice`,
# sorted by `id`; rows containing any null are dropped.
doctor_exam = (
    exams
    .select(
        col("ID_VISITA").alias("id"),
        col("COD_MEDICO_FIRMANTE").alias("codice"),
    )
    .distinct()
    .orderBy("id")
    .na.drop()
)

In [None]:
# Number of doctor-visit rows about to be written.
doctor_exam_total = doctor_exam.count()
print(doctor_exam_total)

In [None]:
# Write (:Medico)-[:EFFETTUA]->(:Visita) relationships.
# The source node is matched on `codice`, the target on `id`.
effettua_options = {
    "url": url,
    "authentication.type": "basic",
    "authentication.basic.username": username,
    "authentication.basic.password": password,
    "relationship": "EFFETTUA",
    "relationship.save.strategy": "keys",
    "relationship.source.labels": ":Medico",
    "relationship.source.save.mode": "overwrite",
    "relationship.source.node.keys": "codice",
    "relationship.target.labels": ":Visita",
    "relationship.target.node.keys": "id",
    "relationship.target.save.mode": "overwrite",
}
effettua_writer = doctor_exam.write.format("org.neo4j.spark.DataSource")
for option_name, option_value in effettua_options.items():
    effettua_writer = effettua_writer.option(option_name, option_value)
effettua_writer.mode("Overwrite").save()

### Malattia - Visita

#### Anamnesi

In [None]:
# (visit id, disease name) rows taken from the anamnesis entities, used as
# input for the ANAMNESI relationship write.
anamnesi_disease_rows = entities_anamnesi.filter("entity == 'Disease'")
disease_anamnesi = anamnesi_disease_rows.select(
    col("ID_VISITA").alias("id"),
    col("chunk").alias("nome"),
)

In [None]:
# Write (:Malattia)-[:ANAMNESI]->(:Visita) relationships.
# The source node is matched on `nome`, the target on `id`.
malattia_anamnesi_options = {
    "url": url,
    "authentication.type": "basic",
    "authentication.basic.username": username,
    "authentication.basic.password": password,
    "relationship": "ANAMNESI",
    "relationship.save.strategy": "keys",
    "relationship.source.labels": ":Malattia",
    "relationship.source.save.mode": "overwrite",
    "relationship.source.node.keys": "nome",
    "relationship.target.labels": ":Visita",
    "relationship.target.node.keys": "id",
    "relationship.target.save.mode": "overwrite",
}
(
    disease_anamnesi.write
    .format("org.neo4j.spark.DataSource")
    .options(**malattia_anamnesi_options)
    .mode("Overwrite")
    .save()
)

#### Diagnosi

In [None]:
# (visit id, disease name) rows taken from the diagnosis entities, used as
# input for the DIAGNOSI relationship write.
diagnosi_disease_rows = entities_diagnosi.filter("entity == 'Disease'")
disease_diagnosi = diagnosi_disease_rows.select(
    col("ID_VISITA").alias("id"),
    col("chunk").alias("nome"),
)

In [None]:
# Row count before and after removing duplicates.
for frame in (disease_diagnosi, disease_diagnosi.distinct()):
    print(frame.count())

In [None]:
# Write (:Visita)-[:DIAGNOSI]->(:Malattia) relationships with a batch size
# of 10000. The source node is matched on `id`, the target on `nome`.
malattia_diagnosi_options = {
    "url": url,
    "authentication.type": "basic",
    "authentication.basic.username": username,
    "authentication.basic.password": password,
    "relationship": "DIAGNOSI",
    "relationship.save.strategy": "keys",
    "relationship.source.labels": ":Visita",
    "relationship.source.save.mode": "overwrite",
    "relationship.source.node.keys": "id",
    "relationship.target.labels": ":Malattia",
    "relationship.target.node.keys": "nome",
    "relationship.target.save.mode": "overwrite",
    "batch.size": "10000",
}
malattia_diagnosi_writer = disease_diagnosi.write.format("org.neo4j.spark.DataSource")
for option_name, option_value in malattia_diagnosi_options.items():
    malattia_diagnosi_writer = malattia_diagnosi_writer.option(option_name, option_value)
malattia_diagnosi_writer.mode("Overwrite").save()

### Sintomo - Visita

#### Anamnesi

In [None]:
# (visit id, symptom name) rows taken from the anamnesis entities, used as
# input for the ANAMNESI relationship write.
anamnesi_symptom_rows = entities_anamnesi.filter("entity == 'Symptom'")
symptom_anamnesi = anamnesi_symptom_rows.select(
    col("ID_VISITA").alias("id"),
    col("chunk").alias("nome"),
)

In [None]:
# Row count before and after removing duplicates.
for frame in (symptom_anamnesi, symptom_anamnesi.distinct()):
    print(frame.count())

In [None]:
# Write (:Sintomo)-[:ANAMNESI]->(:Visita) relationships with a batch size
# of 10000. The source node is matched on `nome`, the target on `id`.
sintomo_anamnesi_options = {
    "url": url,
    "authentication.type": "basic",
    "authentication.basic.username": username,
    "authentication.basic.password": password,
    "relationship": "ANAMNESI",
    "relationship.save.strategy": "keys",
    "relationship.source.labels": ":Sintomo",
    "relationship.source.save.mode": "overwrite",
    "relationship.source.node.keys": "nome",
    "relationship.target.labels": ":Visita",
    "relationship.target.node.keys": "id",
    "relationship.target.save.mode": "overwrite",
    "batch.size": "10000",
}
(
    symptom_anamnesi.write
    .format("org.neo4j.spark.DataSource")
    .options(**sintomo_anamnesi_options)
    .mode("Overwrite")
    .save()
)

#### Diagnosi

In [None]:
# (visit id, symptom name) rows taken from the diagnosis entities, used as
# input for the DIAGNOSI relationship write.
diagnosi_symptom_rows = entities_diagnosi.filter("entity == 'Symptom'")
symptom_diagnosi = diagnosi_symptom_rows.select(
    col("ID_VISITA").alias("id"),
    col("chunk").alias("nome"),
)

In [None]:
# Row count before and after removing duplicates.
for frame in (symptom_diagnosi, symptom_diagnosi.distinct()):
    print(frame.count())

In [None]:
# Write (:Visita)-[:DIAGNOSI]->(:Sintomo) relationships with a batch size
# of 10000. The source node is matched on `id`, the target on `nome`.
sintomo_diagnosi_options = {
    "url": url,
    "authentication.type": "basic",
    "authentication.basic.username": username,
    "authentication.basic.password": password,
    "relationship": "DIAGNOSI",
    "relationship.save.strategy": "keys",
    "relationship.source.labels": ":Visita",
    "relationship.source.save.mode": "overwrite",
    "relationship.source.node.keys": "id",
    "relationship.target.labels": ":Sintomo",
    "relationship.target.node.keys": "nome",
    "relationship.target.save.mode": "overwrite",
    "batch.size": "10000",
}
sintomo_diagnosi_writer = symptom_diagnosi.write.format("org.neo4j.spark.DataSource")
for option_name, option_value in sintomo_diagnosi_options.items():
    sintomo_diagnosi_writer = sintomo_diagnosi_writer.option(option_name, option_value)
sintomo_diagnosi_writer.mode("Overwrite").save()

### Farmaco - Visita

In [None]:
# Rows with no null in any column, then rows whose DOSE is null.
complete_rows = drugs.na.drop()
print(complete_rows.count())

missing_dose_rows = drugs.filter(drugs.DOSE.isNull())
print(missing_dose_rows.count())

In [None]:
# Write (:Visita)-[:PRESCRITTO]->(:Farmaco) relationships.
# ID_VISITA is mapped to the source key `id` and FARMACO_FINALE to the target
# key `nome`; CONFEZIONE and DOSE become the relationship properties
# confezione and dose.
# NOTE(review): this is the only write that uses "Append" rather than
# "Overwrite" - confirm that this is intentional.
prescritto_options = {
    "url": url,
    "authentication.type": "basic",
    "authentication.basic.username": username,
    "authentication.basic.password": password,
    "relationship": "PRESCRITTO",
    "relationship.save.strategy": "keys",
    "relationship.source.labels": ":Visita",
    "relationship.source.save.mode": "overwrite",
    "relationship.source.node.keys": "ID_VISITA:id",
    "relationship.target.labels": ":Farmaco",
    "relationship.target.node.keys": "FARMACO_FINALE:nome",
    "relationship.target.save.mode": "overwrite",
    "relationship.properties": "CONFEZIONE:confezione, DOSE:dose",
}
(
    drugs.write
    .format("org.neo4j.spark.DataSource")
    .options(**prescritto_options)
    .mode("Append")
    .save()
)