## Import Library 

In [None]:
import pandas as pd
import avro.schema
from avro.datafile import DataFileWriter
from avro.io import DatumWriter
import json

## Read Data

In [2]:
admission = pd.read_csv('ADMISSIONS.csv')
patient = pd.read_csv('PATIENTS.csv')
services = pd.read_csv('SERVICES.csv')
transferes = pd.read_csv('TRANSFERS.csv')
labeventes = pd.read_csv('LABEVENTS.csv')
icustay = pd.read_csv('ICUSTAYS.csv')
dignoses_icd = pd.read_csv('DIAGNOSES_ICD.csv')
d_labitems = pd.read_csv('D_LABITEMS.csv')
d_icd_dignoses = pd.read_csv('D_ICD_DIAGNOSES.csv')


## Data Preprocessing

### Table --> admissions

In [20]:
admission.head()

Unnamed: 0,row_id,subject_id,hadm_id,admittime,dischtime,deathtime,admission_type,admission_location,discharge_location,insurance,language,religion,marital_status,ethnicity,edregtime,edouttime,diagnosis,hospital_expire_flag,has_chartevents_data
0,12258,10006,142345,2164-10-23 21:09:00,2164-11-01 17:15:00,,EMERGENCY,EMERGENCY ROOM ADMIT,HOME HEALTH CARE,Medicare,,CATHOLIC,SEPARATED,BLACK/AFRICAN AMERICAN,2164-10-23 16:43:00,2164-10-23 23:00:00,SEPSIS,0,1
1,12263,10011,105331,2126-08-14 22:32:00,2126-08-28 18:59:00,2126-08-28 18:59:00,EMERGENCY,TRANSFER FROM HOSP/EXTRAM,DEAD/EXPIRED,Private,,CATHOLIC,SINGLE,UNKNOWN/NOT SPECIFIED,,,HEPATITIS B,1,1
2,12265,10013,165520,2125-10-04 23:36:00,2125-10-07 15:13:00,2125-10-07 15:13:00,EMERGENCY,TRANSFER FROM HOSP/EXTRAM,DEAD/EXPIRED,Medicare,,CATHOLIC,,UNKNOWN/NOT SPECIFIED,,,SEPSIS,1,1
3,12269,10017,199207,2149-05-26 17:19:00,2149-06-03 18:42:00,,EMERGENCY,EMERGENCY ROOM ADMIT,SNF,Medicare,,CATHOLIC,DIVORCED,WHITE,2149-05-26 12:08:00,2149-05-26 19:45:00,HUMERAL FRACTURE,0,1
4,12270,10019,177759,2163-05-14 20:43:00,2163-05-15 12:00:00,2163-05-15 12:00:00,EMERGENCY,TRANSFER FROM HOSP/EXTRAM,DEAD/EXPIRED,Medicare,,CATHOLIC,DIVORCED,WHITE,,,ALCOHOLIC HEPATITIS,1,1


In [11]:
admission.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 129 entries, 0 to 128
Data columns (total 19 columns):
 #   Column                Non-Null Count  Dtype 
---  ------                --------------  ----- 
 0   row_id                129 non-null    int64 
 1   subject_id            129 non-null    int64 
 2   hadm_id               129 non-null    int64 
 3   admittime             129 non-null    object
 4   dischtime             129 non-null    object
 5   deathtime             40 non-null     object
 6   admission_type        129 non-null    object
 7   admission_location    129 non-null    object
 8   discharge_location    129 non-null    object
 9   insurance             129 non-null    object
 10  language              81 non-null     object
 11  religion              128 non-null    object
 12  marital_status        113 non-null    object
 13  ethnicity             129 non-null    object
 14  edregtime             92 non-null     object
 15  edouttime             92 non-null     ob

In [None]:
import pandas as pd
import json
from fastavro import writer, parse_schema
import os

# Avro schema definition
avro_schema = {
    "type": "record",
    "name": "Admission",
    "fields": [
        {"name": "row_id", "type": ["int", "null"]},
        {"name": "subject_id", "type": ["int", "null"]},
        {"name": "hadm_id", "type": ["int", "null"]},
        {"name": "admittime", "type": [{"type": "long", "logicalType": "timestamp-millis"}, "null"]},
        {"name": "dischtime", "type": [{"type": "long", "logicalType": "timestamp-millis"}, "null"]},
        {"name": "deathtime", "type": [{"type": "long", "logicalType": "timestamp-millis"}, "null"]},
        {"name": "admission_type", "type": ["string", "null"]},
        {"name": "admission_location", "type": ["string", "null"]},
        {"name": "discharge_location", "type": ["string", "null"]},
        {"name": "insurance", "type": ["string", "null"]},
        {"name": "language", "type": ["string", "null"]},
        {"name": "religion", "type": ["string", "null"]},
        {"name": "marital_status", "type": ["string", "null"]},
        {"name": "ethnicity", "type": ["string", "null"]},
        {"name": "edregtime", "type": [{"type": "long", "logicalType": "timestamp-millis"}, "null"]},
        {"name": "edouttime", "type": [{"type": "long", "logicalType": "timestamp-millis"}, "null"]},
        {"name": "diagnosis", "type": ["string", "null"]},
        {"name": "hospital_expire_flag", "type": ["int", "null"]},
        {"name": "has_chartevents_data", "type": ["int", "null"]}
    ]
}

# Load and clean the data
df = pd.read_csv('ADMISSIONS.csv')  # Replace with actual file path
df = df.drop_duplicates()

# Convert timestamp columns to datetime64[ns]
timestamp_columns = ["admittime", "dischtime", "deathtime", "edregtime", "edouttime"]
for col in timestamp_columns:
    if col in df.columns:
        # Assuming string format like 'YYYY-MM-DD HH:MM:SS'; adjust format if different
        df[col] = pd.to_datetime(df[col], errors='coerce', format='%Y-%m-%d %H:%M:%S')

# Convert integer columns and handle missing values
for col in ['row_id', 'subject_id', 'hadm_id', 'hospital_expire_flag', 'has_chartevents_data']:
    if col in df.columns:
        df[col] = df[col].fillna(0).astype('int32')  # Fill NA with 0; adjust as needed
    else:
        raise ValueError(f"Column {col} not found in DataFrame")

# Convert string columns and handle missing values
string_columns = [
    'admission_type', 'admission_location', 'discharge_location', 
    'insurance', 'language', 'religion', 'marital_status', 'ethnicity', 'diagnosis'
]
for col in string_columns:
    if col in df.columns:
        df[col] = df[col].fillna('').astype('string')  # Fill NA with empty string
    else:
        raise ValueError(f"Column {col} not found in DataFrame")

# Convert DataFrame to records for Avro
records = df.to_dict('records')
for record in records:
    for col in timestamp_columns:
        if col in record and not pd.isna(record[col]):
            record[col] = int(record[col].value // 10**6)  # Convert to milliseconds
        else:
            record[col] = None

# Write to Avro file
parsed_schema = parse_schema(avro_schema)
output_path = os.path.join("C:", "Users", "ahmed", "OneDrive","Desktop","avrotest", "admissions.avro")
os.makedirs(os.path.dirname(output_path), exist_ok=True)  # Create directory if it doesn't exist
with open(output_path, "wb") as f:
    writer(f, parsed_schema, records)

# Verify the Avro file
from fastavro import reader
with open(output_path, 'rb') as f:
    for record in reader(f):
        print(record)  # Print first record to check
        break

### Table --> patients

In [None]:
import pandas as pd
import json
from fastavro import writer, parse_schema
import os

# Avro schema definition
avro_schema = {
    "type": "record",
    "name": "Patient",
    "fields": [
        {"name": "row_id", "type": ["int", "null"]},
        {"name": "subject_id", "type": ["int", "null"]},
        {"name": "gender", "type": ["string", "null"]},
        {"name": "dob", "type": [{"type": "long", "logicalType": "timestamp-millis"}, "null"]},
        {"name": "dod", "type": [{"type": "long", "logicalType": "timestamp-millis"}, "null"]},
        {"name": "dod_hosp", "type": [{"type": "long", "logicalType": "timestamp-millis"}, "null"]},
        {"name": "dod_ssn", "type": [{"type": "long", "logicalType": "timestamp-millis"}, "null"]},
        {"name": "expire_flag", "type": ["int", "null"]}
    ]
}

# Load and clean the data
df = patient
df = df.drop_duplicates()

# Convert date columns to Timestamp
date_columns = ["dob", "dod", "dod_hosp", "dod_ssn"]
for col in date_columns:
    if col in df.columns:
        # Adjust format or unit based on your data (e.g., 'ms' for milliseconds, 's' for seconds)
        df[col] = pd.to_datetime(df[col], errors='coerce', unit='ms' if df[col].dtype == 'int64' else None)

# Convert integer columns and handle missing values
for col in ['row_id', 'subject_id', 'expire_flag']:
    if col in df.columns:
        df[col] = df[col].fillna(0).astype('int32')  # Fill NA with 0 or adjust as needed
    else:
        raise ValueError(f"Column {col} not found in DataFrame")
if 'gender' in df.columns:
    df['gender'] = df['gender'].fillna('')  # Fill NA with empty string
else:
    raise ValueError("Column gender not found in DataFrame")

# Convert DataFrame to records for Avro
records = df.to_dict('records')
for record in records:
    for col in date_columns:
        if col in record and not pd.isna(record[col]):
            record[col] = int(record[col].value // 10**6)  # Convert to milliseconds
        else:
            record[col] = None

# Write to Avro file
parsed_schema = parse_schema(avro_schema)
#C:\Users\ahmed\OneDrive\Desktop\w\avrotest
output_path = os.path.join("C:", "Users", "ahmed", "OneDrive","Desktop","w","avrotest","patients.avro")
with open(output_path, "wb") as f:
    writer(f, parsed_schema, records)

### Table --> icustay

In [10]:
icustay.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 136 entries, 0 to 135
Data columns (total 12 columns):
 #   Column          Non-Null Count  Dtype  
---  ------          --------------  -----  
 0   row_id          136 non-null    int64  
 1   subject_id      136 non-null    int64  
 2   hadm_id         136 non-null    int64  
 3   icustay_id      136 non-null    int64  
 4   dbsource        136 non-null    object 
 5   first_careunit  136 non-null    object 
 6   last_careunit   136 non-null    object 
 7   first_wardid    136 non-null    int64  
 8   last_wardid     136 non-null    int64  
 9   intime          136 non-null    object 
 10  outtime         136 non-null    object 
 11  los             136 non-null    float64
dtypes: float64(1), int64(6), object(5)
memory usage: 12.9+ KB


In [11]:
icustay.head(5)

Unnamed: 0,row_id,subject_id,hadm_id,icustay_id,dbsource,first_careunit,last_careunit,first_wardid,last_wardid,intime,outtime,los
0,12742,10006,142345,206504,carevue,MICU,MICU,52,52,2164-10-23 21:10:15,2164-10-25 12:21:07,1.6325
1,12747,10011,105331,232110,carevue,MICU,MICU,15,15,2126-08-14 22:34:00,2126-08-28 18:59:00,13.8507
2,12749,10013,165520,264446,carevue,MICU,MICU,15,15,2125-10-04 23:38:00,2125-10-07 15:13:52,2.6499
3,12754,10017,199207,204881,carevue,CCU,CCU,7,7,2149-05-29 18:52:29,2149-05-31 22:19:17,2.1436
4,12755,10019,177759,228977,carevue,MICU,MICU,15,15,2163-05-14 20:43:56,2163-05-16 03:47:04,1.2938


In [None]:
import pandas as pd
import json
from fastavro import writer, parse_schema
import os

# Avro schema definition
avro_schema = {
    "type": "record",
    "name": "ICUStay",
    "fields": [
        {"name": "row_id", "type": ["int", "null"]},
        {"name": "subject_id", "type": ["int", "null"]},
        {"name": "hadm_id", "type": ["int", "null"]},
        {"name": "icustay_id", "type": ["int", "null"]},
        {"name": "dbsource", "type": ["string", "null"]},
        {"name": "first_careunit", "type": ["string", "null"]},
        {"name": "last_careunit", "type": ["string", "null"]},
        {"name": "first_wardid", "type": ["int", "null"]},
        {"name": "last_wardid", "type": ["int", "null"]},
        {"name": "intime", "type": [{"type": "long", "logicalType": "timestamp-millis"}, "null"]},
        {"name": "outtime", "type": [{"type": "long", "logicalType": "timestamp-millis"}, "null"]},
        {"name": "los", "type": ["double", "null"]}
    ]
}

# Load and clean the data
df = icustay #pd.read_csv('your_input_file.csv')  # Replace with actual file path
df = df.drop_duplicates()

# Convert timestamp columns to datetime64[ns]
timestamp_columns = ["intime", "outtime"]
for col in timestamp_columns:
    if col in df.columns:
        # Assuming string format like 'YYYY-MM-DD HH:MM:SS'; adjust format if different
        df[col] = pd.to_datetime(df[col], errors='coerce', format='%Y-%m-%d %H:%M:%S')

# Convert integer columns and handle missing values
int_columns = ["row_id", "subject_id", "hadm_id", "icustay_id", "first_wardid", "last_wardid"]
for col in int_columns:
    if col in df.columns:
        df[col] = df[col].fillna(0).astype('int32')  # Fill NA with 0; adjust as needed
    else:
        raise ValueError(f"Column {col} not found in DataFrame")

# Convert string columns and handle missing values
string_columns = ["dbsource", "first_careunit", "last_careunit"]
for col in string_columns:
    if col in df.columns:
        df[col] = df[col].fillna('').astype('string')  # Fill NA with empty string
    else:
        raise ValueError(f"Column {col} not found in DataFrame")

# Convert los to float64 and handle missing values
if 'los' in df.columns:
    df['los'] = df['los'].fillna(0.0).astype('float64')  # Fill NA with 0.0; adjust as needed
else:
    raise ValueError("Column los not found in DataFrame")

# Convert DataFrame to records for Avro
records = df.to_dict('records')
for record in records:
    for col in timestamp_columns:
        if col in record and not pd.isna(record[col]):
            record[col] = int(record[col].value // 10**6)  # Convert to milliseconds
        else:
            record[col] = None

# Write to Avro file
parsed_schema = parse_schema(avro_schema)
output_path = os.path.join("C:", "Users", "ahmed", "OneDrive","Desktop","avrotest", "icustay_id.avro")
os.makedirs(os.path.dirname(output_path), exist_ok=True)  # Create directory if it doesn't exist
with open(output_path, "wb") as f:
    writer(f, parsed_schema, records)

# Verify the Avro file
from fastavro import reader
with open(output_path, 'rb') as f:
    for record in reader(f):
        print(record)  # Print first record to check
        break

{'row_id': 12742, 'subject_id': 10006, 'hadm_id': 142345, 'icustay_id': 206504, 'dbsource': 'carevue', 'first_careunit': 'MICU', 'last_careunit': 'MICU', 'first_wardid': 52, 'last_wardid': 52, 'intime': datetime.datetime(2164, 10, 23, 21, 10, 15, tzinfo=datetime.timezone.utc), 'outtime': datetime.datetime(2164, 10, 25, 12, 21, 7, tzinfo=datetime.timezone.utc), 'los': 1.6325}


### Table --> dignoses_icd

In [14]:
dignoses_icd.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1761 entries, 0 to 1760
Data columns (total 5 columns):
 #   Column      Non-Null Count  Dtype 
---  ------      --------------  ----- 
 0   row_id      1761 non-null   int64 
 1   subject_id  1761 non-null   int64 
 2   hadm_id     1761 non-null   int64 
 3   seq_num     1761 non-null   int64 
 4   icd9_code   1761 non-null   object
dtypes: int64(4), object(1)
memory usage: 68.9+ KB


In [None]:
import pandas as pd
import json
from fastavro import writer, parse_schema
import os

# Avro schema definition
avro_schema = {
    "type": "record",
    "name": "Diagnosis",
    "fields": [
        {"name": "row_id", "type": ["int", "null"]},
        {"name": "subject_id", "type": ["int", "null"]},
        {"name": "hadm_id", "type": ["int", "null"]},
        {"name": "seq_num", "type": ["int", "null"]},
        {"name": "icd9_code", "type": ["string", "null"]}
    ]
}

# Load and clean the data
df = dignoses_icd #pd.read_csv('your_input_file.csv')  # Replace with actual file path
df = df.drop_duplicates()

# Convert integer columns and handle missing values
int_columns = ["row_id", "subject_id", "hadm_id", "seq_num"]
for col in int_columns:
    if col in df.columns:
        df[col] = df[col].fillna(0).astype('int32')  # Fill NA with 0; adjust as needed
    else:
        raise ValueError(f"Column {col} not found in DataFrame")

# Convert string column and handle missing values
if 'icd9_code' in df.columns:
    df['icd9_code'] = df['icd9_code'].fillna('').astype('string')  # Fill NA with empty string
else:
    raise ValueError("Column icd9_code not found in DataFrame")

# Convert DataFrame to records for Avro
records = df.to_dict('records')

# Write to Avro file
parsed_schema = parse_schema(avro_schema)
output_path = os.path.join("C:", "Users", "ahmed", "OneDrive","Desktop","avrotest", "diagnoses.avro")
os.makedirs(os.path.dirname(output_path), exist_ok=True)  # Create directory if it doesn't exist
with open(output_path, "wb") as f:
    writer(f, parsed_schema, records)

# Verify the Avro file
from fastavro import reader
with open(output_path, 'rb') as f:
    for record in reader(f):
        print(record)  # Print first record to check
        break

{'row_id': 112344, 'subject_id': 10006, 'hadm_id': 142345, 'seq_num': 1, 'icd9_code': '99591'}


### Table --> services

In [16]:
services.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 163 entries, 0 to 162
Data columns (total 6 columns):
 #   Column        Non-Null Count  Dtype 
---  ------        --------------  ----- 
 0   row_id        163 non-null    int64 
 1   subject_id    163 non-null    int64 
 2   hadm_id       163 non-null    int64 
 3   transfertime  163 non-null    object
 4   prev_service  34 non-null     object
 5   curr_service  163 non-null    object
dtypes: int64(3), object(3)
memory usage: 7.8+ KB


In [26]:
services.head(5)

Unnamed: 0,row_id,subject_id,hadm_id,transfertime,prev_service,curr_service
0,14974,10006,142345,2164-10-23 21:10:15,,MED
1,14979,10011,105331,2126-08-14 22:34:00,,MED
2,14981,10013,165520,2125-10-04 23:38:00,,MED
3,14985,10017,199207,2149-05-26 17:21:09,,MED
4,14986,10019,177759,2163-05-14 20:43:56,,MED


In [None]:
import pandas as pd
import json
from fastavro import writer, parse_schema
import os

# Avro schema definition
avro_schema = {
    "type": "record",
    "name": "Transfer",
    "fields": [
        {"name": "row_id", "type": ["int", "null"]},
        {"name": "subject_id", "type": ["int", "null"]},
        {"name": "hadm_id", "type": ["int", "null"]},
        {"name": "transfertime", "type": [{"type": "long", "logicalType": "timestamp-millis"}, "null"]},
        {"name": "prev_service", "type": ["string", "null"]},
        {"name": "curr_service", "type": ["string", "null"]}
    ]
}

# Load and clean the data
df = services  # Replace with actual file path
df = df.drop_duplicates()

# Convert timestamp column to datetime64[ns]
timestamp_columns = ["transfertime"]
for col in timestamp_columns:
    if col in df.columns:
        # Assuming string format like 'YYYY-MM-DD HH:MM:SS'; adjust format if different
        df[col] = pd.to_datetime(df[col], errors='coerce', format='%Y-%m-%d %H:%M:%S')

# Convert integer columns and handle missing values
int_columns = ["row_id", "subject_id", "hadm_id"]
for col in int_columns:
    if col in df.columns:
        df[col] = df[col].fillna(0).astype('int32')  # Fill NA with 0; adjust as needed
    else:
        raise ValueError(f"Column {col} not found in DataFrame")

# Convert string columns and handle missing values
string_columns = ["prev_service", "curr_service"]
for col in string_columns:
    if col in df.columns:
        df[col] = df[col].fillna('').astype('string')  # Fill NA with empty string
    else:
        raise ValueError(f"Column {col} not found in DataFrame")

# Convert DataFrame to records for Avro
records = df.to_dict('records')
for record in records:
    for col in timestamp_columns:
        if col in record and not pd.isna(record[col]):
            record[col] = int(record[col].value // 10**6)  # Convert to milliseconds
        else:
            record[col] = None

# Write to Avro file
parsed_schema = parse_schema(avro_schema)
output_path = os.path.join("C:", "Users", "ahmed", "OneDrive","Desktop","avrotest", "transfers.avro")
os.makedirs(os.path.dirname(output_path), exist_ok=True)  # Create directory if it doesn't exist
with open(output_path, "wb") as f:
    writer(f, parsed_schema, records)

# Verify the Avro file
from fastavro import reader
with open(output_path, 'rb') as f:
    for record in reader(f):
        print(record)  # Print first record to check
        break

{'row_id': 14974, 'subject_id': 10006, 'hadm_id': 142345, 'transfertime': datetime.datetime(2164, 10, 23, 21, 10, 15, tzinfo=datetime.timezone.utc), 'prev_service': '', 'curr_service': 'MED'}


### Table --> transferes

In [18]:
transferes.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 524 entries, 0 to 523
Data columns (total 13 columns):
 #   Column         Non-Null Count  Dtype  
---  ------         --------------  -----  
 0   row_id         524 non-null    int64  
 1   subject_id     524 non-null    int64  
 2   hadm_id        524 non-null    int64  
 3   icustay_id     167 non-null    float64
 4   dbsource       524 non-null    object 
 5   eventtype      524 non-null    object 
 6   prev_careunit  167 non-null    object 
 7   curr_careunit  167 non-null    object 
 8   prev_wardid    395 non-null    float64
 9   curr_wardid    395 non-null    float64
 10  intime         524 non-null    object 
 11  outtime        395 non-null    object 
 12  los            395 non-null    float64
dtypes: float64(4), int64(3), object(6)
memory usage: 53.3+ KB


In [None]:
import pandas as pd
import json
from fastavro import writer, parse_schema
import os

# Avro schema definition
avro_schema = {
    "type": "record",
    "name": "CareEvent",
    "fields": [
        {"name": "row_id", "type": ["int", "null"]},
        {"name": "subject_id", "type": ["int", "null"]},
        {"name": "hadm_id", "type": ["int", "null"]},
        {"name": "icustay_id", "type": ["int", "null"]},
        {"name": "dbsource", "type": ["string", "null"]},
        {"name": "eventtype", "type": ["string", "null"]},
        {"name": "prev_careunit", "type": ["string", "null"]},
        {"name": "curr_careunit", "type": ["string", "null"]},
        {"name": "prev_wardid", "type": ["int", "null"]},
        {"name": "curr_wardid", "type": ["int", "null"]},
        {"name": "intime", "type": [{"type": "long", "logicalType": "timestamp-millis"}, "null"]},
        {"name": "outtime", "type": [{"type": "long", "logicalType": "timestamp-millis"}, "null"]},
        {"name": "los", "type": ["double", "null"]}
    ]
}

# Load and clean the data
df = transferes #pd.read_csv('your_input_file.csv')  # Replace with actual file path
df = df.drop_duplicates()

# Convert timestamp columns to datetime64[ns]
timestamp_columns = ["intime", "outtime"]
for col in timestamp_columns:
    if col in df.columns:
        # Assuming string format like 'YYYY-MM-DD HH:MM:SS'; adjust format if different
        df[col] = pd.to_datetime(df[col], errors='coerce', format='%Y-%m-%d %H:%M:%S')

# Convert integer columns and handle missing values
int_columns = ["row_id", "subject_id", "hadm_id", "icustay_id", "prev_wardid", "curr_wardid"]
for col in int_columns:
    if col in df.columns:
        df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0).astype('int32')  # Convert float to int, fill NA with 0
    else:
        raise ValueError(f"Column {col} not found in DataFrame")

# Convert string columns and handle missing values
string_columns = ["dbsource", "eventtype", "prev_careunit", "curr_careunit"]
for col in string_columns:
    if col in df.columns:
        df[col] = df[col].fillna('').astype('string')  # Fill NA with empty string
    else:
        raise ValueError(f"Column {col} not found in DataFrame")

# Convert los to float64 and handle missing values
if 'los' in df.columns:
    df['los'] = df['los'].fillna(0.0).astype('float64')  # Fill NA with 0.0
else:
    raise ValueError("Column los not found in DataFrame")

# Convert DataFrame to records for Avro
records = df.to_dict('records')
for record in records:
    for col in timestamp_columns:
        if col in record and not pd.isna(record[col]):
            record[col] = int(record[col].value // 10**6)  # Convert to milliseconds
        else:
            record[col] = None

# Write to Avro file
parsed_schema = parse_schema(avro_schema)
output_path = os.path.join("C:", "Users", "ahmed", "OneDrive","Desktop","avrotest", "transferes.avro")
os.makedirs(os.path.dirname(output_path), exist_ok=True)  # Create directory if it doesn't exist
with open(output_path, "wb") as f:
    writer(f, parsed_schema, records)

# Verify the Avro file
from fastavro import reader
with open(output_path, 'rb') as f:
    for record in reader(f):
        print(record)  # Print first record to check
        break

{'row_id': 54440, 'subject_id': 10006, 'hadm_id': 142345, 'icustay_id': 206504, 'dbsource': 'carevue', 'eventtype': 'admit', 'prev_careunit': '', 'curr_careunit': 'MICU', 'prev_wardid': 0, 'curr_wardid': 52, 'intime': datetime.datetime(2164, 10, 23, 21, 10, 15, tzinfo=datetime.timezone.utc), 'outtime': datetime.datetime(2164, 10, 25, 12, 21, 7, tzinfo=datetime.timezone.utc), 'los': 39.18}


### Table --> labeventes

In [19]:
labeventes.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 76074 entries, 0 to 76073
Data columns (total 9 columns):
 #   Column      Non-Null Count  Dtype  
---  ------      --------------  -----  
 0   row_id      76074 non-null  int64  
 1   subject_id  76074 non-null  int64  
 2   hadm_id     61812 non-null  float64
 3   itemid      76074 non-null  int64  
 4   charttime   76074 non-null  object 
 5   value       76070 non-null  object 
 6   valuenum    67030 non-null  float64
 7   valueuom    66669 non-null  object 
 8   flag        29737 non-null  object 
dtypes: float64(2), int64(3), object(4)
memory usage: 5.2+ MB


In [None]:
import pandas as pd
import json
from fastavro import writer, parse_schema
import os

# Avro schema definition
avro_schema = {
    "type": "record",
    "name": "ChartEvent",
    "fields": [
        {"name": "row_id", "type": ["int", "null"]},
        {"name": "subject_id", "type": ["int", "null"]},
        {"name": "hadm_id", "type": ["int", "null"]},
        {"name": "itemid", "type": ["int", "null"]},
        {"name": "charttime", "type": [{"type": "long", "logicalType": "timestamp-millis"}, "null"]},
        {"name": "value", "type": ["string", "null"]},
        {"name": "valuenum", "type": ["double", "null"]},
        {"name": "valueuom", "type": ["string", "null"]},
        {"name": "flag", "type": ["string", "null"]}
    ]
}

# Load and clean the data
df = labeventes #pd.read_csv('your_input_file.csv')  # Replace with actual file path
df = df.drop_duplicates()

# Convert timestamp column to datetime64[ns]
timestamp_columns = ["charttime"]
for col in timestamp_columns:
    if col in df.columns:
        # Assuming string format like 'YYYY-MM-DD HH:MM:SS'; adjust format if different
        df[col] = pd.to_datetime(df[col], errors='coerce', format='%Y-%m-%d %H:%M:%S')

# Convert integer columns and handle missing values
int_columns = ["row_id", "subject_id", "hadm_id", "itemid"]
for col in int_columns:
    if col in df.columns:
        df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0).astype('int32')  # Convert float to int for hadm_id
    else:
        raise ValueError(f"Column {col} not found in DataFrame")

# Convert string columns and handle missing values
string_columns = ["value", "valueuom", "flag"]
for col in string_columns:
    if col in df.columns:
        df[col] = df[col].fillna('').astype('string')  # Fill NA with empty string
    else:
        raise ValueError(f"Column {col} not found in DataFrame")

# Convert valuenum to float64 and handle missing values
if 'valuenum' in df.columns:
    df['valuenum'] = df['valuenum'].fillna(0.0).astype('float64')  # Fill NA with 0.0
else:
    raise ValueError("Column valuenum not found in DataFrame")

# Convert DataFrame to records for Avro
records = df.to_dict('records')
for record in records:
    for col in timestamp_columns:
        if col in record and not pd.isna(record[col]):
            record[col] = int(record[col].value // 10**6)  # Convert to milliseconds
        else:
            record[col] = None

# Write to Avro file
parsed_schema = parse_schema(avro_schema)
output_path = os.path.join("C:", "Users", "ahmed", "OneDrive","Desktop","avrotest", "labeventes.avro")
os.makedirs(os.path.dirname(output_path), exist_ok=True)  # Create directory if it doesn't exist
with open(output_path, "wb") as f:
    writer(f, parsed_schema, records)

# Verify the Avro file
from fastavro import reader
with open(output_path, 'rb') as f:
    for record in reader(f):
        print(record)  # Print first record to check
        break

{'row_id': 6244563, 'subject_id': 10006, 'hadm_id': 0, 'itemid': 50868, 'charttime': datetime.datetime(2164, 9, 24, 20, 21, tzinfo=datetime.timezone.utc), 'value': '19', 'valuenum': 19.0, 'valueuom': 'mEq/L', 'flag': ''}


### Table --> d_icd_dignoses

In [22]:
d_icd_dignoses.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 14567 entries, 0 to 14566
Data columns (total 4 columns):
 #   Column       Non-Null Count  Dtype 
---  ------       --------------  ----- 
 0   row_id       14567 non-null  int64 
 1   icd9_code    14567 non-null  object
 2   short_title  14567 non-null  object
 3   long_title   14567 non-null  object
dtypes: int64(1), object(3)
memory usage: 455.3+ KB


In [None]:
import pandas as pd
import json
from fastavro import writer, parse_schema
import os

# Avro schema definition
avro_schema = {
    "type": "record",
    "name": "ICD9Dictionary",
    "fields": [
        {"name": "row_id", "type": ["int", "null"]},
        {"name": "icd9_code", "type": ["string", "null"]},
        {"name": "short_title", "type": ["string", "null"]},
        {"name": "long_title", "type": ["string", "null"]}
    ]
}

# Load and clean the data
df = d_icd_dignoses #pd.read_csv('your_input_file.csv')  # Replace with actual file path
df = df.drop_duplicates()

# Convert integer column and handle missing values
int_columns = ["row_id"]
for col in int_columns:
    if col in df.columns:
        df[col] = df[col].fillna(0).astype('int32')  # Fill NA with 0; adjust as needed
    else:
        raise ValueError(f"Column {col} not found in DataFrame")

# Convert string columns and handle missing values
string_columns = ["icd9_code", "short_title", "long_title"]
for col in string_columns:
    if col in df.columns:
        df[col] = df[col].fillna('').astype('string')  # Fill NA with empty string
    else:
        raise ValueError(f"Column {col} not found in DataFrame")

# Convert DataFrame to records for Avro
records = df.to_dict('records')

# Write to Avro file
parsed_schema = parse_schema(avro_schema)
output_path = os.path.join("C:", "Users", "ahmed", "OneDrive","Desktop","avrotest","d_icd_dignoses.avro")
os.makedirs(os.path.dirname(output_path), exist_ok=True)  # Create directory if it doesn't exist
with open(output_path, "wb") as f:
    writer(f, parsed_schema, records)

# Verify the Avro file
from fastavro import reader
with open(output_path, 'rb') as f:
    for record in reader(f):
        print(record)  # Print first record to check
        break

{'row_id': 1, 'icd9_code': '01716', 'short_title': 'Erythem nod tb-oth test', 'long_title': 'Erythema nodosum with hypersensitivity reaction in tuberculosis, tubercle bacilli not found by bacteriological or histological examination, but tuberculosis confirmed by other methods [inoculation of animals]'}


### Table --> d_labitems

In [23]:
d_labitems.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 753 entries, 0 to 752
Data columns (total 6 columns):
 #   Column      Non-Null Count  Dtype 
---  ------      --------------  ----- 
 0   row_id      753 non-null    int64 
 1   itemid      753 non-null    int64 
 2   label       753 non-null    object
 3   fluid       753 non-null    object
 4   category    753 non-null    object
 5   loinc_code  585 non-null    object
dtypes: int64(2), object(4)
memory usage: 35.4+ KB


In [None]:
import pandas as pd
import json
from fastavro import writer, schema
import os

# Avro schema definition
avro_schema = {
    "type": "record",
    "name": "ItemDictionary",
    "fields": [
        {"name": "row_id", "type": ["int", "null"]},
        {"name": "itemid", "type": ["int", "null"]},
        {"name": "label", "type": ["string", "null"]},
        {"name": "fluid", "type": ["string", "null"]},
        {"name": "category", "type": ["string", "null"]},
        {"name": "loinc_code", "type": ["string", "null"]}
    ]
}

# Load and clean the data
df = d_labitems #pd.read_csv('your_input_file.csv')  # Replace with actual file path
df = df.drop_duplicates()

# Convert integer columns and handle missing values
int_columns = ["row_id", "itemid"]
for col in int_columns:
    if col in df.columns:
        df[col] = df[col].fillna(0).astype('int32')  # Fill NA with 0; adjust as needed
    else:
        raise ValueError(f"Column {col} not found in DataFrame")

# Convert string columns and handle missing values
string_columns = ["label", "fluid", "category", "loinc_code"]
for col in string_columns:
    if col in df.columns:
        df[col] = df[col].fillna('').astype('string')  # Fill NA with empty string
    else:
        raise ValueError(f"Column {col} not found in DataFrame")

# Convert DataFrame to records for Avro
records = df.to_dict('records')

# Write to Avro file
parsed_schema = schema.parse_schema(avro_schema)
output_path = os.path.join("C:", "Users", "ahmed", "OneDrive","Desktop","avrotest", "d_labitems.avro")
os.makedirs(os.path.dirname(output_path), exist_ok=True)  # Create directory if it doesn't exist
with open(output_path, "wb") as f:
    writer(f, parsed_schema, records)

# Verify the Avro file
from fastavro import reader
with open(output_path, 'rb') as f:
    for record in reader(f):
        print(record)  # Print first record to check
        break

{'row_id': 1, 'itemid': 50800, 'label': 'SPECIMEN TYPE', 'fluid': 'BLOOD', 'category': 'BLOOD GAS', 'loinc_code': ''}
