<a href="https://colab.research.google.com/github/danb-neo4j/patient_journey/blob/main/patientJourney_dataLoad.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Neo4j GDS Patient Journey Demo: Data Loading
This notebook walks through loading [Synthea](https://synthea.mitre.org/) data into Neo4j. The code and data are based on those referenced in [Graph Data Processing with Cypher](https://github.com/PacktPublishing/Cypher-Querying).

*Last updated: 23 March 2023*

# Import Libraries

In [1]:
import pandas as pd
# Show every column, every row and full cell contents when a frame is displayed.
for display_option in ('display.max_columns', 'display.max_rows', 'display.max_colwidth'):
    pd.set_option(display_option, None)

import matplotlib.pyplot as plt 
import seaborn as sns
# Configure the plot theme in a single call. seaborn's `set()` is an alias of
# `set_theme()`, which resets style *and* palette to its defaults. Calling it
# after `set_palette("colorblind")` therefore silently reverted the palette.
sns.set_theme(style='darkgrid', palette='colorblind', rc={'figure.figsize': (12, 7)})

import os
import configparser
from IPython.display import Image

In [2]:
# install or import Neo4j GraphDataScience library
try: 
  from graphdatascience import GraphDataScience
  print('Successfully imported GraphDataScience')
except ModuleNotFoundError:
  !pip install graphdatascience
  from graphdatascience import GraphDataScience
  print('installed and imported GraphDataScience')

Successfully imported GraphDataScience


# Mount Google Drive

In [3]:
# Mount Google Drive (Colab) so the project folder under MyDrive is reachable.
from google.colab import drive
DRIVE_MOUNTPOINT = '/content/drive'
drive.mount(DRIVE_MOUNTPOINT)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# Connect to Neo4j Database

In [4]:
# Work from the project folder on the mounted Drive. The original relative
# path ('drive/MyDrive/...') only resolved while the working directory was the
# parent of the /content/drive mount point, so re-running this cell after the
# first chdir raised FileNotFoundError. An absolute path makes the cell idempotent.
PROJECT_DIRECTORY = '/content/drive/MyDrive/Colab Notebooks/gds_demos/patient_journey/'
os.chdir(PROJECT_DIRECTORY)

In [5]:
# Folder (relative to the project directory) holding the Synthea CSV exports;
# keep the trailing slash because file names are appended directly.
DATA_DIRECTORY = "data/"

In [6]:
# import utility function to connect to Neo4j DB
from neoUtils import read_neo4j_properties

In [7]:
# Neo4j host and credentials live in a local .ini file, keeping them out of the notebook.
NEO4J_PROPERTIES_FILE = 'auth/patientJourney_auth.ini'
(HOST, USERNAME, PASSWORD) = read_neo4j_properties(
    NEO4J_PROPERTIES_FILE=NEO4J_PROPERTIES_FILE,
)

Using HOST, USERNAME, PASSWORD from .ini file


In [8]:
# Open a GDS client against the AuraDS instance, then print the server's GDS
# version as a quick connectivity check.
gds = GraphDataScience(
    HOST,
    auth=(USERNAME, PASSWORD),
    aura_ds=True,
)
gds_version = gds.version()
print('Neo4j GDS Version:', gds_version)

Neo4j GDS Version: 2.3.2+14


In [None]:
# Optionally wipe the database before a fresh load. Several loads below use
# CREATE for "end" Encounter nodes, so running them against an already-populated
# database would duplicate those nodes. Set RESET_DATABASE = False to keep the
# existing data (for example, when re-running only part of the notebook).
RESET_DATABASE = True

# Deleting in batches keeps each transaction small on large graphs.
reset_query = '''
MATCH (n) 
CALL { WITH n 
DETACH DELETE n 
} IN TRANSACTIONS OF 50000 ROWS
'''
if RESET_DATABASE:
    gds.run_cypher(reset_query)

# Load Patient Data

In [None]:
# Raw Synthea patients export (one row per patient).
patient_df = pd.read_csv(f'{DATA_DIRECTORY}patients.csv')
patient_df.shape

## Patient Constraints

In [None]:
# Uniqueness constraints on the keys MERGEd by the patient load
# (source file patient_indexes.cql). IF NOT EXISTS makes re-runs safe.
for constraint_query in (
    '''CREATE CONSTRAINT patient_id IF NOT EXISTS FOR (n:Patient) REQUIRE n.id IS UNIQUE''',
    '''CREATE CONSTRAINT zipcode_id IF NOT EXISTS FOR (n:ZipCode) REQUIRE n.zip IS UNIQUE''',
    '''CREATE CONSTRAINT race_id IF NOT EXISTS FOR (n:Race) REQUIRE n.type IS UNIQUE''',
    '''CREATE CONSTRAINT ethnicity_id IF NOT EXISTS FOR (n:Ethnicity) REQUIRE n.type IS UNIQUE''',
):
    gds.run_cypher(constraint_query)

## Patient Data and Relationships

In [None]:
# Missing CSV values come out of pandas as float NaN, which Cypher treats as an
# ordinary non-null value. As a result, `row.ZIP IS NOT NULL` never filtered
# anything, and optional fields such as SUFFIX/PREFIX were stored as NaN.
# Convert NaN to None so those values reach Neo4j as null (SET to null leaves
# the property unset).
patient_records = patient_df.astype(object).where(patient_df.notna(), None).to_dict('records')

# One Patient node per row, linked to its Race, Ethnicity and (when known) ZipCode.
gds.run_cypher('''
CYPHER runtime=slotted
UNWIND $patient_data AS row

CALL {
WITH row
MERGE (p:Patient {id: row.Id})
SET
    p.marital = row.MARITAL,
    p.ssn = row.SSN,
    p.firstName = row.FIRST,
    p.lastName = row.LAST,
    p.suffix = row.SUFFIX,
    p.prefix = row.PREFIX,
    p.city = row.CITY,
    p.county = row.COUNTY,
    p.location = POINT({latitude:toFloat(row.LAT), longitude:toFloat(row.LON)}),
    p.drivers = row.DRIVERS,
    p.birthDate = DATE(row.BIRTHDATE),
    p.expenses = row.HEALTHCARE_EXPENSES,
    p.coverage = row.HEALTHCARE_COVERAGE

// MERGE cannot use a null key, so only link race/ethnicity when present
FOREACH (ignore IN CASE WHEN row.RACE IS NOT NULL THEN [1] ELSE [] END |
    MERGE (r:Race {type: row.RACE})
    MERGE (p)-[:HAS_RACE]->(r))
FOREACH (ignore IN CASE WHEN row.ETHNICITY IS NOT NULL THEN [1] ELSE [] END |
    MERGE (e:Ethnicity {type: row.ETHNICITY})
    MERGE (p)-[:HAS_ETHNICITY]->(e))

WITH row, p
WHERE row.ZIP IS NOT NULL
MERGE (z:ZipCode {zip: row.ZIP})
MERGE (p)-[:HAS_ZIPCODE]->(z)
} IN TRANSACTIONS OF 1000 ROWS
''', {'patient_data': patient_records})

# Load Encounters Data

In [None]:
# Raw Synthea encounters export (one row per encounter).
encounters_df = pd.read_csv(f'{DATA_DIRECTORY}encounters.csv')
encounters_df.shape

In [None]:
# Inspect column dtypes and non-null counts before loading encounters into Neo4j.
encounters_df.info()

## Create Encounter Index

In [None]:
# A plain (non-unique) index: start and end Encounter nodes later share the same id.
encounter_index_query = '''CREATE INDEX encounter_id IF NOT EXISTS FOR (n:Encounter) ON n.id'''
gds.run_cypher(encounter_index_query)

## Create Encounter Constraints

In [None]:
# Uniqueness constraints on the keys MERGEd while linking encounters.
for constraint_query in (
    '''CREATE CONSTRAINT snomed_id IF NOT EXISTS FOR (n:SNOMED_CT) REQUIRE n.code IS UNIQUE''',
    '''CREATE CONSTRAINT provider_id IF NOT EXISTS FOR (n:Provider) REQUIRE n.id IS UNIQUE''',
    '''CREATE CONSTRAINT organization_id IF NOT EXISTS FOR (n:Organization) REQUIRE n.id IS UNIQUE''',
):
    gds.run_cypher(constraint_query)

## Encounter Data and Relationships

In [None]:
# Each encounter row becomes an Encounter node, relabelled via APOC with its
# capitalised ENCOUNTERCLASS and linked to its patient, provider, SNOMED-CT
# code and organization.
encounters_query = '''
CYPHER runtime=slotted

UNWIND $encounters_data AS row
CALL {
WITH row
MERGE(e:Encounter {id: row.Id})
SET
    e.date=datetime(row.START),
    e.description=row.DESCRIPTION,
    e.isEnd = false,
    e.totalCost = row.TOTAL_CLAIM_COST

FOREACH (ignore in CASE WHEN row.STOP IS NOT NULL AND row.STOP <> '' THEN [1] ELSE [] END |
      SET e.end=datetime(row.STOP)
    )
FOREACH (ignore in CASE WHEN row.CODE IS NOT NULL AND row.CODE <> '' THEN [1] ELSE [] END |
      MERGE(s:SNOMED_CT {code:row.CODE})
      MERGE(e)-[:OF_TYPE]->(s)
    )
WITH row,e
// CALL apoc.create.setLabels( e, [ 'Encounter', row.ENCOUNTERCLASS ] ) YIELD node
CALL apoc.create.setLabels( e, [ 'Encounter', toUpper(left(row.ENCOUNTERCLASS, 1)) + right(row.ENCOUNTERCLASS, size(row.ENCOUNTERCLASS) - 1) ] ) 
YIELD node
    
WITH row,e
MERGE(p:Patient {id: row.PATIENT})
MERGE (p)-[:HAS_ENCOUNTER]->(e)

WITH row,e
MERGE (provider:Provider {id:row.PROVIDER})
MERGE(e)-[:HAS_PROVIDER]->(provider)
FOREACH (ignore in CASE WHEN row.ORGANIZATION IS NOT
    NULL AND row.ORGANIZATION <> '' THEN [1] ELSE [] END |
      MERGE (o:Organization {id: row.ORGANIZATION})
      MERGE (e)-[:HAS_ORGANIZATION]->(o))
} IN TRANSACTIONS OF 1000 ROWS
'''
encounters_params = {'encounters_data': encounters_df.to_dict('records')}
gds.run_cypher(encounters_query, encounters_params)

# Load Provider Data

In [None]:
# Raw Synthea providers export (one row per provider).
provider_df = pd.read_csv(f'{DATA_DIRECTORY}providers.csv')
provider_df.shape

## Create Provider Constraint

In [None]:
# Specialty nodes are MERGEd by name during the provider load.
specialty_constraint_query = '''CREATE CONSTRAINT specialty_id IF NOT EXISTS FOR (n:Specialty) REQUIRE n.name IS UNIQUE'''
gds.run_cypher(specialty_constraint_query)

## Provider Data and Relationships

In [None]:
# pandas NaN is a regular float in Cypher, so the `row.ZIP IS NOT NULL` guard
# below could never skip a missing ZIP. Convert NaN to None so that missing
# values arrive as null.
provider_records = provider_df.astype(object).where(provider_df.notna(), None).to_dict('records')

# One Provider node per row, linked to its Organization, Specialty and (when
# known) ZipCode. MERGE cannot use a null key, so the organization and
# specialty links are only made when those values are present.
gds.run_cypher('''
CYPHER runtime=slotted

UNWIND $provider_data AS row
CALL {
WITH row
MERGE (p:Provider {id: row.Id})
SET
    p.name=row.NAME,
    p.gender=row.GENDER,
    p.address = row.ADDRESS,
    p.state = row.STATE,
    p.location = point({latitude:toFloat(row.LAT),
                        longitude:toFloat(row.LON)})

FOREACH (ignore IN CASE WHEN row.ORGANIZATION IS NOT NULL THEN [1] ELSE [] END |
    MERGE (o:Organization {id: row.ORGANIZATION})
    MERGE (p)-[:BELONGS_TO]->(o))

FOREACH (ignore IN CASE WHEN row.SPECIALITY IS NOT NULL THEN [1] ELSE [] END |
    MERGE (s:Specialty {name: row.SPECIALITY})
    MERGE (p)-[:HAS_SPECIALTY]->(s))

WITH row, p
WHERE row.ZIP IS NOT NULL
MERGE (z:ZipCode {zip: row.ZIP})
MERGE (p)-[:HAS_ZIPCODE]->(z)

} IN TRANSACTIONS OF 1000 ROWS
''', {'provider_data': provider_records})

# Load Organization Data

In [None]:
# Raw Synthea organizations export (one row per organization).
organization_df = pd.read_csv(f'{DATA_DIRECTORY}organizations.csv')

## Organization Data and Relationships

In [None]:
# Convert pandas NaN to None. Otherwise a missing ZIP arrives as a non-null
# float and slips past the `row.ZIP IS NOT NULL` guard.
organization_records = organization_df.astype(object).where(organization_df.notna(), None).to_dict('records')

# Fill in the Organization nodes (possibly already created by the encounter and
# provider loads) and link each one to its ZipCode when known.
gds.run_cypher('''
CYPHER runtime=slotted

UNWIND $organization_data AS row
CALL {
WITH row
MERGE (o:Organization {id: row.Id})
SET
    o.name = row.NAME,
    o.address = row.ADDRESS,
    o.state = row.STATE,
    o.location = point({latitude:toFloat(row.LAT),
                        longitude:toFloat(row.LON)})

WITH row, o
WHERE row.ZIP IS NOT NULL
MERGE (z:ZipCode {zip: row.ZIP})
MERGE (o)-[:HAS_ZIPCODE]->(z)
} IN TRANSACTIONS OF 1000 ROWS
''', {'organization_data': organization_records})

# Load Medications Data

In [None]:
# Missing values become '' because the Cypher load compares against '' (e.g. `row.STOP <> ''`).
medications_df = pd.read_csv(f'{DATA_DIRECTORY}medications.csv').fillna('')
medications_df.shape

In [None]:
# Each medication row links a patient, a start Encounter (isEnd=false) and a
# Drug. When a reason code is present, the encounter is also linked to that
# diagnosis. When STOP is set, an end Encounter is created and chained via HAS_END.
# Fixes:
#  * the diagnosis SNOMED_CT node is keyed on REASONCODE (it used the drug's CODE);
#  * `d` is carried through the WITH. Without it, `CREATE (pe)-[:HAS_DRUG]->(d)`
#    created a new, unlabeled node for every stopped medication.
gds.run_cypher('''
CYPHER runtime=slotted

UNWIND $medications_data AS row
CALL {
WITH row
MERGE (p:Patient {id: row.PATIENT})
MERGE (d:Drug {code: row.CODE})
    SET d.description = row.DESCRIPTION
MERGE (ps:Encounter {id: row.ENCOUNTER, isEnd: false})
MERGE (ps)-[:HAS_DRUG]->(d)
MERGE (p)-[:HAS_ENCOUNTER]->(ps)

FOREACH (ignore in CASE WHEN
                        row.REASONCODE IS NOT NULL AND
                        row.REASONCODE <> '' THEN [1] ELSE [] END |
        MERGE (s:SNOMED_CT {code: row.REASONCODE})
        SET s:Diagnosis, s.description = row.REASONDESCRIPTION
        MERGE (ps)-[:HAS_DIAGNOSIS]->(s)
)

WITH row, ps, p, d
  WHERE row.STOP IS NOT NULL and row.STOP <> ''
CREATE (pe:Encounter {id:row.ENCOUNTER, date:datetime(row.STOP)})
SET pe.isEnd=true
CREATE (p)-[:HAS_ENCOUNTER]->(pe)
CREATE (pe)-[:HAS_DRUG]->(d)
CREATE (ps)-[:HAS_END]->(pe)
} IN TRANSACTIONS OF 1000 ROWS
''', {'medications_data': medications_df.to_dict('records')})

# Load Conditions Data

In [None]:
# Missing values become '' because the Cypher load compares against '' (e.g. `row.STOP <> ''`).
conditions_df = pd.read_csv(f'{DATA_DIRECTORY}conditions.csv').fillna('')
conditions_df.shape

In [None]:
# Each condition row links the patient to a start Encounter (isEnd=false) and,
# when STOP is set, to an end Encounter chained via HAS_END.
# The end node is now keyed on `isEnd: true` as well. Previously, when a
# same-day condition had START == STOP, the MERGE on {id, date} matched the
# start encounter, flipped it to isEnd=true and created a HAS_END self-loop.
gds.run_cypher('''
CYPHER runtime=slotted

UNWIND $conditions_data AS row
CALL {
WITH row
MATCH (p:Patient {id:row.PATIENT})
MERGE (c:SNOMED_CT {code:row.CODE})
    SET c.description=row.DESCRIPTION, c:Condition

MERGE (cs:Encounter {id:row.ENCOUNTER, isEnd: false})
  ON CREATE
  SET cs.date=datetime(row.START)

MERGE (p)-[:HAS_ENCOUNTER]->(cs)
MERGE (cs)-[:HAS_CONDITION]->(c)

WITH p,c,cs,row
WHERE row.STOP IS NOT NULL and row.STOP <> ''
MERGE (ce:Encounter {id:row.ENCOUNTER,
                     date:datetime(row.STOP),
                     isEnd: true})

MERGE (p)-[:HAS_ENCOUNTER]->(ce)
MERGE (ce)-[:HAS_CONDITION]->(c)
MERGE (cs)-[:HAS_END]->(ce)
} IN TRANSACTIONS OF 1000 ROWS
''', {'conditions_data': conditions_df.to_dict('records')})

# Load Procedures Data

In [None]:
procedures_df = pd.read_csv(DATA_DIRECTORY + 'procedures.csv')
# Replace NaN with '' on the procedures frame itself. This line used to read
# `conditions_df.fillna('')`, which silently replaced the procedures with
# condition rows.
procedures_df = procedures_df.fillna('')
procedures_df.shape

In [None]:
# Each procedure row tags its SNOMED-CT code as a Procedure and links it to the
# patient's encounter. The Encounter is created with its START date only if no
# isEnd=false node with that id exists yet.
procedures_query = '''
CYPHER runtime=slotted

UNWIND $procedures_data AS row
CALL {
WITH row
MATCH (p:Patient {id:row.PATIENT})
MERGE (c:SNOMED_CT {code:row.CODE})
    SET c.description=row.DESCRIPTION, c:Procedure

MERGE (cs:Encounter {id:row.ENCOUNTER, isEnd: false})
  ON CREATE
  SET cs.date=datetime(row.START)

MERGE (p)-[:HAS_ENCOUNTER]->(cs)
MERGE (cs)-[:HAS_PROCEDURE]->(c)
} IN TRANSACTIONS OF 1000 ROWS
'''
procedures_params = {'procedures_data': procedures_df.to_dict('records')}
gds.run_cypher(procedures_query, procedures_params)

# Load Allergies Data

In [None]:
# Missing values become '' because the Cypher load compares against '' (e.g. `row.STOP <> ''`).
allergies_df = pd.read_csv(f'{DATA_DIRECTORY}allergies.csv').fillna('')
allergies_df.shape

In [None]:
# Each allergy row links the patient to a start Encounter (ALLERGY_STARTED) and,
# when STOP is set, to an end Encounter (ALLERGY_ENDED) chained via HAS_END.
# The end node is keyed on `isEnd: true` so that an allergy with START == STOP
# can no longer MERGE onto the start encounter, flip it to isEnd=true and
# create a HAS_END self-loop.
gds.run_cypher('''
CYPHER runtime=slotted

UNWIND $allergies_data AS row
CALL {
WITH row
MATCH (p:Patient {id:row.PATIENT})
MERGE (c:SNOMED_CT {code:row.CODE})
    SET c.description=row.DESCRIPTION, c:Allergy

MERGE (cs:Encounter {id:row.ENCOUNTER, isEnd: false})
  ON CREATE
  SET cs.date=datetime(row.START)

MERGE (p)-[:HAS_ENCOUNTER]->(cs)
MERGE (cs)-[:ALLERGY_STARTED]->(c)

WITH p,c,cs,row
  WHERE row.STOP IS NOT NULL and row.STOP <> ''
MERGE (ce:Encounter {id:row.ENCOUNTER,
                     date:datetime(row.STOP),
                     isEnd: true})

MERGE (p)-[:HAS_ENCOUNTER]->(ce)
MERGE (ce)-[:ALLERGY_ENDED]->(c)
MERGE (cs)-[:HAS_END]->(ce)
} IN TRANSACTIONS OF 1000 ROWS
''', {'allergies_data': allergies_df.to_dict('records')})

# Load Care Plan Data

In [None]:
# Missing values become '' because the Cypher load compares against '' (e.g. `row.STOP <> ''`).
careplans_df = pd.read_csv(f'{DATA_DIRECTORY}careplans.csv').fillna('')
careplans_df.shape

In [None]:
# Each care-plan row produces a CarePlan node (keyed on the row Id) tied to its
# SNOMED-CT care type and a start Encounter. When STOP is set, a newly created
# end Encounter is linked via CARE_PLAN_END and HAS_END.
careplans_query = '''
CYPHER runtime=slotted

UNWIND $careplans_data AS row
CALL {
WITH row
MATCH (p:Patient {id:row.PATIENT})
MERGE (cp:CarePlan {code:row.Id})
MERGE (c:SNOMED_CT {code:row.CODE})
  SET c.description=row.DESCRIPTION, c:Care

MERGE (cp)-[:HAS_CARE_TYPE]->(c)
MERGE (cs:Encounter {id:row.ENCOUNTER, isEnd: false})
  ON CREATE
  SET cs.date=datetime(row.START)

MERGE (cs)-[:HAS_CARE_TYPE]->(c)
MERGE (p)-[:HAS_ENCOUNTER]->(cs)
MERGE (cs)-[:CARE_PLAN_START]->(cp)

WITH p,cp,cs,row
  WHERE row.STOP IS NOT NULL and row.STOP <> ''
CREATE (ce:Encounter {id:row.ENCOUNTER, date:datetime(row.STOP)})
  SET ce.code=row.CODE, ce.isEnd=true

MERGE (p)-[:HAS_ENCOUNTER]->(ce)
MERGE (ce)-[:CARE_PLAN_END]->(cp)
MERGE (cs)-[:HAS_END]->(ce)
} IN TRANSACTIONS OF 1000 ROWS
'''
careplans_params = {'careplans_data': careplans_df.to_dict('records')}
gds.run_cypher(careplans_query, careplans_params)