Create an FHIR PostGre SQL data base with 6 tables:
1. Patient → demographics (name, age, gender, address).
2. Condition → diagnoses (e.g., “Type 2 Diabetes Mellitus”).
3. Observation → lab results (e.g., “HbA1c = 8.2%”).
4. MedicationRequest → prescribed meds, dosage, refills.
5. Encounter → visits/admissions (when, where, why).
6. Procedure → surgeries, imaging, interventions.

1) The patient FHIR files are located on google drive in:

PATIENT_FHIR_PATH = My Drive\210_Capstone\210_Data\210_DataSets\210_DataSets-EHR\synthea_sample_data_fhir_latest\synthea_sample_data_fhir_latest

2) My working directory where

DEV_PATH = My Drive\210_Capstone\210_Factory\210_dev

The PostGre SQL data bas and its dump should be created in DEV_PATH




# 0. Mount drive and define paths

In [1]:
# Mount Drive
from google.colab import drive
drive.mount('/content/drive')

# Paths (for dumps/output)
DEV_PATH = "/content/drive/MyDrive/210_Capstone/210_Factory/210_dev"
PATIENT_FHIR_PATH = "/content/drive/MyDrive/210_Capstone/210_Data/210_DataSets/210_DataSets-EHR/synthea_sample_data_fhir_latest"


PATIENT_FHIR_PATH, DEV_PATH


Mounted at /content/drive


('/content/drive/MyDrive/210_Capstone/210_Data/210_DataSets/210_DataSets-EHR/synthea_sample_data_fhir_latest/synthea_sample_data_fhir_latest',
 '/content/drive/MyDrive/210_Capstone/210_Factory/210_dev')

# 1. Install & start PostgreSQL (PGDATA under DEV_PATH)

In [4]:
!apt-get -y update
!apt-get -y install postgresql postgresql-contrib

!service postgresql start
!sudo -u postgres psql -c "ALTER USER postgres PASSWORD 'postgres';"
!sudo -u postgres createdb synthea_ehr

!echo "✅ PostgreSQL installed, service started, user password set to 'postgres', and DB 'synthea_ehr' created."


0% [Working]            Get:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
            Get:2 https://cli.github.com/packages stable InRelease [3,917 B]
0% [Connecting to archive.ubuntu.com (185.125.190.36)] [Connecting to security.0% [Waiting for headers] [Connecting to security.ubuntu.com (91.189.91.81)] [Co                                                                               Hit:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Get:4 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Hit:5 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:6 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Get:7 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Get:8 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [127 kB]
Get:9 http://security.ubuntu.com/ubuntu jammy-security/main amd64 Packages [3,327 kB]
Get:10 http://security.ubuntu.com/ubuntu jammy

# DDL

In [5]:
!pip install psycopg2-binary

Collecting psycopg2-binary
  Downloading psycopg2_binary-2.9.10-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.9 kB)
Downloading psycopg2_binary-2.9.10-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.0 MB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/3.0 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━[0m [32m2.5/3.0 MB[0m [31m74.4 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.0/3.0 MB[0m [31m48.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: psycopg2-binary
Successfully installed psycopg2-binary-2.9.10


In [6]:
import psycopg2
import os

# Database connection details
DB_NAME = "synthea_ehr"
DB_USER = "postgres"
DB_PASSWORD = "postgres"
DB_HOST = "localhost"

# The DDL script for creating tables and indexes
sql_commands = """
-- Create the patients table
CREATE TABLE IF NOT EXISTS patients (
    patient_id VARCHAR(255) PRIMARY KEY,
    birth_date DATE,
    sex VARCHAR(50),
    race VARCHAR(50),
    ethnicity VARCHAR(50)
);

-- Create the encounters table
CREATE TABLE IF NOT EXISTS encounters (
    encounter_id VARCHAR(255) PRIMARY KEY,
    patient_id VARCHAR(255) REFERENCES patients(patient_id),
    start_datetime TIMESTAMP WITH TIME ZONE,
    end_datetime TIMESTAMP WITH TIME ZONE,
    class VARCHAR(50),
    reason_text TEXT
);

-- Create the conditions table
CREATE TABLE IF NOT EXISTS conditions (
    condition_id VARCHAR(255) PRIMARY KEY,
    patient_id VARCHAR(255) REFERENCES patients(patient_id),
    encounter_id VARCHAR(255) REFERENCES encounters(encounter_id),
    code VARCHAR(50),
    display TEXT,
    onset_datetime TIMESTAMP WITH TIME ZONE,
    abatement_datetime TIMESTAMP WITH TIME ZONE
);

-- Create the observations table
CREATE TABLE IF NOT EXISTS observations (
    observation_id VARCHAR(255) PRIMARY KEY,
    patient_id VARCHAR(255) REFERENCES patients(patient_id),
    encounter_id VARCHAR(255) REFERENCES encounters(encounter_id),
    loinc_code VARCHAR(50),
    display TEXT,
    value_num DECIMAL,
    value_unit VARCHAR(50),
    effective_datetime TIMESTAMP WITH TIME ZONE
);

-- Create the medication_requests table
CREATE TABLE IF NOT EXISTS medication_requests (
    med_request_id VARCHAR(255) PRIMARY KEY,
    patient_id VARCHAR(255) REFERENCES patients(patient_id),
    encounter_id VARCHAR(255) REFERENCES encounters(encounter_id),
    med_name TEXT,
    dose DECIMAL,
    route TEXT,
    start_datetime TIMESTAMP WITH TIME ZONE,
    end_datetime TIMESTAMP WITH TIME ZONE,
    refills INTEGER
);

-- Create the procedures table
CREATE TABLE IF NOT EXISTS procedures (
    procedure_id VARCHAR(255) PRIMARY KEY,
    patient_id VARCHAR(255) REFERENCES patients(patient_id),
    encounter_id VARCHAR(255) REFERENCES encounters(encounter_id),
    code VARCHAR(50),
    display TEXT,
    performed_datetime TIMESTAMP WITH TIME ZONE
);

-- Create indexes for performance
CREATE INDEX IF NOT EXISTS idx_encounters_patient_id ON encounters(patient_id);
CREATE INDEX IF NOT EXISTS idx_conditions_patient_id ON conditions(patient_id);
CREATE INDEX IF NOT EXISTS idx_observations_patient_id ON observations(patient_id);
CREATE INDEX IF NOT EXISTS idx_meds_patient_id ON medication_requests(patient_id);
CREATE INDEX IF NOT EXISTS idx_procedures_patient_id ON procedures(patient_id);

CREATE INDEX IF NOT EXISTS idx_conditions_datetime ON conditions(onset_datetime);
CREATE INDEX IF NOT EXISTS idx_observations_datetime ON observations(effective_datetime);
CREATE INDEX IF NOT EXISTS idx_meds_start_datetime ON medication_requests(start_datetime);
CREATE INDEX IF NOT EXISTS idx_procedures_datetime ON procedures(performed_datetime);

-- Create indexes on code columns for fuzzy searches
CREATE INDEX IF NOT EXISTS idx_conditions_code ON conditions(code);
CREATE INDEX IF NOT EXISTS idx_observations_loinc_code ON observations(loinc_code);
CREATE INDEX IF NOT EXISTS idx_procedures_code ON procedures(code);
"""

try:
    # Connect to the database
    conn = psycopg2.connect(f"host={DB_HOST} dbname={DB_NAME} user={DB_USER} password={DB_PASSWORD}")

    # Create a cursor object
    cursor = conn.cursor()

    # Execute the DDL commands
    cursor.execute(sql_commands)

    # Commit the transaction
    conn.commit()

    print("Tables and indexes created successfully.")

except psycopg2.OperationalError as e:
    print(f"Connection error: {e}")
    print("Make sure the PostgreSQL server is running and the database exists.")

except Exception as e:
    print(f"An error occurred: {e}")

finally:
    # Close the cursor and connection
    if 'cursor' in locals():
        cursor.close()
    if 'conn' in locals():
        conn.close()

Tables and indexes created successfully.


In [7]:
### Table check
import psycopg2
import os

# Database connection details
DB_NAME = "synthea_ehr"
DB_USER = "postgres"
DB_PASSWORD = "postgres"
DB_HOST = "localhost"

try:
    # Connect to the database
    conn = psycopg2.connect(f"host={DB_HOST} dbname={DB_NAME} user={DB_USER} password={DB_PASSWORD}")
    cursor = conn.cursor()

    # SQL query to get table and column information
    query = """
    SELECT table_name, column_name, data_type
    FROM information_schema.columns
    WHERE table_schema = 'public'
    ORDER BY table_name, ordinal_position;
    """

    cursor.execute(query)
    results = cursor.fetchall()

    if not results:
        print("No tables found in the database.")
    else:
        current_table = ""
        for row in results:
            table_name, column_name, data_type = row
            if table_name != current_table:
                print(f"\n--- {table_name} ---")
                current_table = table_name
            print(f"  - {column_name} ({data_type})")

except psycopg2.OperationalError as e:
    print(f"Connection error: {e}")
    print("Please ensure the PostgreSQL server is running and the database exists.")

except Exception as e:
    print(f"An error occurred: {e}")

finally:
    if 'cursor' in locals():
        cursor.close()
    if 'conn' in locals():
        conn.close()


--- conditions ---
  - condition_id (character varying)
  - patient_id (character varying)
  - encounter_id (character varying)
  - code (character varying)
  - display (text)
  - onset_datetime (timestamp with time zone)
  - abatement_datetime (timestamp with time zone)

--- encounters ---
  - encounter_id (character varying)
  - patient_id (character varying)
  - start_datetime (timestamp with time zone)
  - end_datetime (timestamp with time zone)
  - class (character varying)
  - reason_text (text)

--- medication_requests ---
  - med_request_id (character varying)
  - patient_id (character varying)
  - encounter_id (character varying)
  - med_name (text)
  - dose (numeric)
  - route (text)
  - start_datetime (timestamp with time zone)
  - end_datetime (timestamp with time zone)
  - refills (integer)

--- observations ---
  - observation_id (character varying)
  - patient_id (character varying)
  - encounter_id (character varying)
  - loinc_code (character varying)
  - display (tex

# Load tables from Patient json records loaded on google drive

In [13]:
from google.colab import drive
drive.mount('/content/drive')
!pip install psycopg2-binary

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [20]:
PATIENT_FHIR_PATH = "/content/drive/MyDrive/210_Capstone/210_Data/210_DataSets/210_DataSets-EHR/synthea_sample_data_fhir_latest"

In [21]:
!cd /content/drive/MyDrive/210_Capstone/210_Data/210_DataSets/210_DataSets-EHR/synthea_sample_data_fhir_latest

In [22]:
!ls

drive  sample_data


In [23]:
PATIENT_FHIR_PATH

'/content/drive/MyDrive/210_Capstone/210_Data/210_DataSets/210_DataSets-EHR/synthea_sample_data_fhir_latest'

In [25]:
import psycopg2
import json
import os
import re

# Database connection details from your previous steps
DB_NAME = "synthea_ehr"
DB_USER = "postgres"
DB_PASSWORD = "postgres"
DB_HOST = "localhost"

# The directory containing your patient FHIR JSON files on your mounted Google Drive
# THIS PATH HAS BEEN UPDATED WITH YOUR PROVIDED INFORMATION
PATIENT_FHIR_PATH = "/content/drive/MyDrive/210_Capstone/210_Data/210_DataSets/210_DataSets-EHR/synthea_sample_data_fhir_latest"

def get_db_connection():
    """Establishes and returns a database connection."""
    try:
        conn = psycopg2.connect(f"host={DB_HOST} dbname={DB_NAME} user={DB_USER} password={DB_PASSWORD}")
        return conn
    except psycopg2.OperationalError as e:
        print(f"Connection error: {e}")
        print("Please ensure the PostgreSQL server is running and the database exists.")
        return None

def extract_reference_id(reference_string):
    """Extracts the UUID from a FHIR reference string."""
    match = re.search(r'urn:uuid:([a-f0-9-]+)', reference_string)
    return match.group(1) if match else None

def extract_race_ethnicity(extensions):
    """Parses race and ethnicity from the patient's extensions."""
    race = 'Unknown'
    ethnicity = 'Unknown'
    if extensions:
        for ext in extensions:
            if ext.get('url') == "http://hl7.org/fhir/us/core/StructureDefinition/us-core-race":
                for sub_ext in ext.get('extension', []):
                    if sub_ext.get('url') == "text":
                        race = sub_ext.get('valueString', 'Unknown')
            elif ext.get('url') == "http://hl7.org/fhir/us/core/StructureDefinition/us-core-ethnicity":
                for sub_ext in ext.get('extension', []):
                    if sub_ext.get('url') == "text":
                        ethnicity = sub_ext.get('valueString', 'Unknown')
    return race, ethnicity

def process_fhir_resource(cursor, resource):
    """Processes a single FHIR resource and inserts it into the database."""
    resource_type = resource.get('resourceType')

    if resource_type == "Patient":
        print("    -> Inserting into 'patients' table...")
        patient_id = resource.get('id')
        birth_date = resource.get('birthDate')
        sex = resource.get('gender')
        race, ethnicity = extract_race_ethnicity(resource.get('extension'))
        insert_patient_query = """
        INSERT INTO patients (patient_id, birth_date, sex, race, ethnicity)
        VALUES (%s, %s, %s, %s, %s)
        ON CONFLICT (patient_id) DO NOTHING;
        """
        cursor.execute(insert_patient_query, (patient_id, birth_date, sex, race, ethnicity))

    elif resource_type == "Encounter":
        print("    -> Inserting into 'encounters' table...")
        encounter_id = resource.get('id')
        patient_id = extract_reference_id(resource.get('subject', {}).get('reference'))
        start_datetime = resource.get('period', {}).get('start')
        end_datetime = resource.get('period', {}).get('end')
        class_code = resource.get('class', {}).get('code')
        reason_text = resource.get('reasonCode', [{}])[0].get('display')
        insert_encounter_query = """
        INSERT INTO encounters (encounter_id, patient_id, start_datetime, end_datetime, class, reason_text)
        VALUES (%s, %s, %s, %s, %s, %s)
        ON CONFLICT (encounter_id) DO NOTHING;
        """
        cursor.execute(insert_encounter_query, (encounter_id, patient_id, start_datetime, end_datetime, class_code, reason_text))

    elif resource_type == "Condition":
        print("    -> Inserting into 'conditions' table...")
        condition_id = resource.get('id')
        patient_id = extract_reference_id(resource.get('subject', {}).get('reference'))
        encounter_id = extract_reference_id(resource.get('encounter', {}).get('reference'))
        code = resource.get('code', {}).get('coding', [{}])[0].get('code')
        display = resource.get('code', {}).get('coding', [{}])[0].get('display')
        onset_datetime = resource.get('onsetDateTime')
        abatement_datetime = resource.get('abatementDateTime')
        insert_condition_query = """
        INSERT INTO conditions (condition_id, patient_id, encounter_id, code, display, onset_datetime, abatement_datetime)
        VALUES (%s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (condition_id) DO NOTHING;
        """
        cursor.execute(insert_condition_query, (condition_id, patient_id, encounter_id, code, display, onset_datetime, abatement_datetime))

    elif resource_type == "Observation":
        print("    -> Inserting into 'observations' table...")
        observation_id = resource.get('id')
        patient_id = extract_reference_id(resource.get('subject', {}).get('reference'))
        encounter_id = extract_reference_id(resource.get('encounter', {}).get('reference'))
        loinc_code = resource.get('code', {}).get('coding', [{}])[0].get('code')
        display = resource.get('code', {}).get('coding', [{}])[0].get('display')
        value_num = resource.get('valueQuantity', {}).get('value')
        value_unit = resource.get('valueQuantity', {}).get('unit')
        effective_datetime = resource.get('effectiveDateTime')
        insert_observation_query = """
        INSERT INTO observations (observation_id, patient_id, encounter_id, loinc_code, display, value_num, value_unit, effective_datetime)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (observation_id) DO NOTHING;
        """
        cursor.execute(insert_observation_query, (observation_id, patient_id, encounter_id, loinc_code, display, value_num, value_unit, effective_datetime))

    elif resource_type == "MedicationRequest":
        print("    -> Inserting into 'medication_requests' table...")
        med_request_id = resource.get('id')
        patient_id = extract_reference_id(resource.get('subject', {}).get('reference'))
        encounter_id = extract_reference_id(resource.get('encounter', {}).get('reference'))
        med_name = resource.get('medicationReference', {}).get('display') or resource.get('medicationCodeableConcept', {}).get('text')
        dose = resource.get('dosageInstruction', [{}])[0].get('doseAndRate', [{}])[0].get('doseQuantity', {}).get('value')
        route = resource.get('dosageInstruction', [{}])[0].get('route', {}).get('text')
        start_datetime = resource.get('dispenseRequest', {}).get('validityPeriod', {}).get('start')
        end_datetime = resource.get('dispenseRequest', {}).get('validityPeriod', {}).get('end')
        refills = resource.get('dispenseRequest', {}).get('numberOfRepeatsAllowed')
        insert_med_request_query = """
        INSERT INTO medication_requests (med_request_id, patient_id, encounter_id, med_name, dose, route, start_datetime, end_datetime, refills)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (med_request_id) DO NOTHING;
        """
        cursor.execute(insert_med_request_query, (med_request_id, patient_id, encounter_id, med_name, dose, route, start_datetime, end_datetime, refills))

    elif resource_type == "Procedure":
        print("    -> Inserting into 'procedures' table...")
        procedure_id = resource.get('id')
        patient_id = extract_reference_id(resource.get('subject', {}).get('reference'))
        encounter_id = extract_reference_id(resource.get('encounter', {}).get('reference'))
        code = resource.get('code', {}).get('coding', [{}])[0].get('code')
        display = resource.get('code', {}).get('coding', [{}])[0].get('display')
        performed_datetime = resource.get('performedDateTime') or resource.get('performedPeriod', {}).get('start')
        insert_procedure_query = """
        INSERT INTO procedures (procedure_id, patient_id, encounter_id, code, display, performed_datetime)
        VALUES (%s, %s, %s, %s, %s, %s)
        ON CONFLICT (procedure_id) DO NOTHING;
        """
        cursor.execute(insert_procedure_query, (procedure_id, patient_id, encounter_id, code, display, performed_datetime))

def main():
    """Main function to process all FHIR files in the specified directory."""
    conn = get_db_connection()
    if not conn:
        return

    try:
        cursor = conn.cursor()
        for filename in os.listdir(PATIENT_FHIR_PATH):
            if filename.endswith(".json"):
                file_path = os.path.join(PATIENT_FHIR_PATH, filename)
                print(f"✅ Now processing file: {filename}")
                with open(file_path, 'r') as f:
                    bundle = json.load(f)

                for entry in bundle.get('entry', []):
                    resource = entry.get('resource', {})
                    process_fhir_resource(cursor, resource)

                conn.commit()
                print(f"✅ Successfully ingested all data from {filename}.")

    except FileNotFoundError:
        print(f"❌ Error: The directory '{PATIENT_FHIR_PATH}' was not found.")
        print("Please check the path and make sure your Google Drive is mounted correctly.")
    except json.JSONDecodeError as e:
        print(f"❌ Error decoding JSON file: {e}")
        conn.rollback()
    except Exception as e:
        print(f"❌ An error occurred: {e}")
        conn.rollback()
    finally:
        if 'cursor' in locals():
            cursor.close()
        if 'conn' in locals():
            conn.close()

if __name__ == "__main__":
    main()

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
    -> Inserting into 'observations' table...
    -> Inserting into 'observations' table...
    -> Inserting into 'observations' table...
    -> Inserting into 'observations' table...
    -> Inserting into 'observations' table...
    -> Inserting into 'observations' table...
    -> Inserting into 'observations' table...
    -> Inserting into 'observations' table...
    -> Inserting into 'observations' table...
    -> Inserting into 'observations' table...
    -> Inserting into 'observations' table...
    -> Inserting into 'observations' table...
    -> Inserting into 'observations' table...
    -> Inserting into 'observations' table...
    -> Inserting into 'observations' table...
    -> Inserting into 'observations' table...
    -> Inserting into 'observations' table...
    -> Inserting into 'observations' table...
    -> Inserting into 'observations' table...
    -> Inserting into 'observations' table...
    -> Insertin

In [26]:
import psycopg2
import pandas as pd
import os

def get_db_connection():
    """Establishes and returns an active database connection."""
    # Database connection details
    DB_NAME = "synthea_ehr"
    DB_USER = "postgres"
    DB_PASSWORD = "postgres"
    DB_HOST = "localhost"

    try:
        conn = psycopg2.connect(f"host={DB_HOST} dbname={DB_NAME} user={DB_USER} password={DB_PASSWORD}")
        return conn
    except psycopg2.OperationalError as e:
        print(f"Connection error: {e}")
        print("Please ensure the PostgreSQL server is running and the database exists.")
        return None

def check_inserted_tables():
    """
    Fetches and displays a summary of record counts and unique patient IDs
    for each table in the synthea_ehr database.
    """
    conn = get_db_connection()
    if conn is None:
        return

    try:
        cursor = conn.cursor()

        # SQL query to get record and unique patient counts for all tables
        sql_query = """
        SELECT
            'patients' AS table_name,
            COUNT(*) AS record_count,
            COUNT(DISTINCT patient_id) AS unique_patient_id_count
        FROM patients
        UNION ALL
        SELECT
            'encounters' AS table_name,
            COUNT(*) AS record_count,
            COUNT(DISTINCT patient_id) AS unique_patient_id_count
        FROM encounters
        UNION ALL
        SELECT
            'conditions' AS table_name,
            COUNT(*) AS record_count,
            COUNT(DISTINCT patient_id) AS unique_patient_id_count
        FROM conditions
        UNION ALL
        SELECT
            'observations' AS table_name,
            COUNT(*) AS record_count,
            COUNT(DISTINCT patient_id) AS unique_patient_id_count
        FROM observations
        UNION ALL
        SELECT
            'medication_requests' AS table_name,
            COUNT(*) AS record_count,
            COUNT(DISTINCT patient_id) AS unique_patient_id_count
        FROM medication_requests
        UNION ALL
        SELECT
            'procedures' AS table_name,
            COUNT(*) AS record_count,
            COUNT(DISTINCT patient_id) AS unique_patient_id_count
        FROM procedures;
        """

        cursor.execute(sql_query)
        results = cursor.fetchall()

        # Create a pandas DataFrame for a nice summary table
        df = pd.DataFrame(results, columns=['Table Name', 'Total Records', 'Unique Patient IDs'])

        # Display the DataFrame
        print("Summary of Records and Unique Patient IDs per Table:")
        print(df)

    except psycopg2.ProgrammingError as e:
        print(f"Database error: {e}")
        print("Please ensure all tables have been created and populated correctly.")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
    finally:
        if 'cursor' in locals():
            cursor.close()
        if 'conn' in locals():
            conn.close()



In [27]:
# Call the function directly as requested
check_inserted_tables()

Summary of Records and Unique Patient IDs per Table:
            Table Name  Total Records  Unique Patient IDs
0             patients            111                 111
1           encounters           5924                 111
2           conditions           4140                 111
3         observations          60597                 111
4  medication_requests           4926                 107
5           procedures          17993                 111


# Dump the database to google drive for later use.

persist your PostgreSQL database to Google Drive. The standard way to do this is by creating a database dump—a single file containing all the SQL commands needed to recreate your tables and data. You can then save this file to your Google Drive and use it to restore the database in a future session.



*   Back up saved to: Backup saved to: /content/drive/MyDrive/210_Capstone/210_Factory/210_dev/synthea_ehr_backup.sql
*   List item



In [31]:
DEV_PATH

'/content/drive/MyDrive/210_Capstone/210_Factory/210_dev'

In [32]:
import subprocess
import os

# Database connection details
DB_NAME = "synthea_ehr"
DB_USER = "postgres"
DB_PASSWORD = "postgres"
DB_HOST = "localhost"

# Path on Google Drive to save the backup file
BACKUP_PATH = DEV_PATH + "/synthea_ehr_backup.sql"

def persist_database():
    """Dumps the synthea_ehr database to a file on Google Drive."""
    try:
        print("Starting database backup...")

        # Use subprocess to run the pg_dump command
        # The environment variables are passed to the subprocess for authentication
        env = os.environ.copy()
        env['PGPASSWORD'] = DB_PASSWORD

        command = [
            'pg_dump',
            '--host', DB_HOST,
            '--username', DB_USER,
            '--dbname', DB_NAME,
            '--format', 'plain',  # Creates a single SQL script file
            '--file', BACKUP_PATH  # Output file path
        ]

        process = subprocess.run(command, env=env, check=True, capture_output=True, text=True)
        print("✅ Database backup successful!")
        print(f"Backup saved to: {BACKUP_PATH}")

    except FileNotFoundError:
        print("❌ Error: pg_dump command not found. Please ensure PostgreSQL client tools are installed.")
        print("You can try running: !apt-get update && !apt-get install -y postgresql-client")
    except subprocess.CalledProcessError as e:
        print("❌ Error during backup process.")
        print(f"Subprocess returned non-zero exit code: {e.returncode}")
        print(f"STDOUT:\n{e.stdout}")
        print(f"STDERR:\n{e.stderr}")
    except Exception as e:
        print(f"❌ An unexpected error occurred: {e}")

persist_database()

Starting database backup...
✅ Database backup successful!
Backup saved to: /content/drive/MyDrive/210_Capstone/210_Factory/210_dev/synthea_ehr_backup.sql


# Restore databse

In [33]:
import subprocess
import os

# Database connection details
DB_NAME = "synthea_ehr"
DB_USER = "postgres"
DB_PASSWORD = "postgres"
DB_HOST = "localhost"

# Path on Google Drive to the backup file
BACKUP_PATH = DEV_PATH + "/synthea_ehr_backup.sql"

def restore_database():
    """Restores the synthea_ehr database from a file on Google Drive."""
    try:
        print("Starting database restore...")

        # First, drop and re-create the database to ensure a clean state
        print("Dropping and re-creating the database for a clean restore...")
        env = os.environ.copy()
        env['PGPASSWORD'] = DB_PASSWORD

        # Command to drop the database
        drop_command = [
            'dropdb',
            '--host', DB_HOST,
            '--username', DB_USER,
            DB_NAME
        ]
        # This will fail if the DB doesn't exist, so we don't check for errors
        subprocess.run(drop_command, env=env, check=False, capture_output=True, text=True)

        # Command to create the database
        create_command = [
            'createdb',
            '--host', DB_HOST,
            '--username', DB_USER,
            DB_NAME
        ]
        subprocess.run(create_command, env=env, check=True, capture_output=True, text=True)
        print("Database re-created successfully.")

        # Use subprocess to run the psql command to restore the backup
        command = [
            'psql',
            '--host', DB_HOST,
            '--username', DB_USER,
            '--dbname', DB_NAME,
            '--file', BACKUP_PATH
        ]

        process = subprocess.run(command, env=env, check=True, capture_output=True, text=True)
        print("✅ Database restore successful!")

    except FileNotFoundError:
        print("❌ Error: psql or dropdb/createdb commands not found. Please ensure PostgreSQL client tools are installed.")
        print("You can try running: !apt-get update && !apt-get install -y postgresql-client")
    except subprocess.CalledProcessError as e:
        print("❌ Error during restore process.")
        print(f"Subprocess returned non-zero exit code: {e.returncode}")
        print(f"STDOUT:\n{e.stdout}")
        print(f"STDERR:\n{e.stderr}")
    except Exception as e:
        print(f"❌ An unexpected error occurred: {e}")

restore_database()

Starting database restore...
Dropping and re-creating the database for a clean restore...
Database re-created successfully.
✅ Database restore successful!
