# Merge
In this notebook, we merge the MIMIC-III and MIMIC-IV data for the following tables.
```
0     PROCEDUREEVENTS_MV
1                D_ITEMS
2     MICROBIOLOGYEVENTS
3              LABEVENTS
4         INPUTEVENTS_CV
5             ADMISSIONS
6             D_LABITEMS
7         DATETIMEEVENTS
8          PRESCRIPTIONS
9         PROCEDURES_ICD
10           CHARTEVENTS
11             TRANSFERS
12         DIAGNOSES_ICD
13              SERVICES
14              DRGCODES
15          OUTPUTEVENTS
16              PATIENTS
17       D_ICD_DIAGNOSES
18              ICUSTAYS
19        INPUTEVENTS_MV
20      D_ICD_PROCEDURES
```
The helper functions we need are:
1. a function to read the two CSV files (MIMIC-III and MIMIC-IV) with the selected columns
2. a function to create a Spark schema from the schema definition

In [22]:
import pandas as pd
import os


from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark import SparkConf


from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType, DoubleType

from IPython.display import display, HTML

In [23]:
display(HTML("<style>.container { width:100% !important; }</style>"))

conf = SparkConf()  # create the configuration
conf.setMaster("local")
# The JDBC write at the end of this notebook uses 'org.postgresql.Driver', whose jar
# must be on the classpath. Supply its location through POSTGRES_JDBC_JAR instead of
# a hard-coded, user-specific absolute path. spark.jars only takes effect when the
# session is first created, not when getOrCreate() returns an already-running session.
jdbc_jar = os.environ.get("POSTGRES_JDBC_JAR")
if jdbc_jar:
    conf.set("spark.jars", jdbc_jar)

spark = (
    SparkSession.builder
    .config(conf=conf)
    .appName('test')
    .getOrCreate()
)
# Use the context owned by the session rather than looking one up separately.
sc = spark.sparkContext

23/05/24 20:31:37 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [24]:
# Bare last expression: renders the active SparkSession in the notebook.
spark

# Populate `Transfers` table

In [25]:
# Column-mapping table for TRANSFERS: one row per mapped column, giving the MIMIC-III
# and MIMIC-IV column names, the MIMIC-III dtype label, and both source file paths.
df = pd.read_parquet(path="./silver_data/TRANSFERS.parquet")
df

Unnamed: 0,mimic3,mimic4,column_mimic3,column_mimic4_candidate_1,candidate_1_scores,dtype_mimic3,mimic3_filepath,mimic4_filepath
124,TRANSFERS,transfers.csv,subject_id,subject_id,100,int,../basic_filtered_data/mimic-iii-demo/TRANSFER...,../basic_filtered_data/mimic-iv-demo/2.2/hosp/...
125,TRANSFERS,transfers.csv,hadm_id,hadm_id,100,int,../basic_filtered_data/mimic-iii-demo/TRANSFER...,../basic_filtered_data/mimic-iv-demo/2.2/hosp/...
126,TRANSFERS,transfers.csv,eventtype,eventtype,100,string,../basic_filtered_data/mimic-iii-demo/TRANSFER...,../basic_filtered_data/mimic-iv-demo/2.2/hosp/...
127,TRANSFERS,transfers.csv,curr_careunit,careunit,90,string,../basic_filtered_data/mimic-iii-demo/TRANSFER...,../basic_filtered_data/mimic-iv-demo/2.2/hosp/...
128,TRANSFERS,transfers.csv,intime,intime,100,datetime,../basic_filtered_data/mimic-iii-demo/TRANSFER...,../basic_filtered_data/mimic-iv-demo/2.2/hosp/...
129,TRANSFERS,transfers.csv,outtime,outtime,100,datetime,../basic_filtered_data/mimic-iii-demo/TRANSFER...,../basic_filtered_data/mimic-iv-demo/2.2/hosp/...


In [26]:
# Only the first row's file paths are used below, which is valid only if every row of
# the mapping table points at the same pair of source files. Fail loudly otherwise.
assert df["mimic3_filepath"].nunique() == 1, "expected a single MIMIC-III source file"
assert df["mimic4_filepath"].nunique() == 1, "expected a single MIMIC-IV source file"

mimic3_path = df["mimic3_filepath"].iloc[0]
mimic3_selected = df["column_mimic3"].to_list()
mimic4_path = df["mimic4_filepath"].iloc[0]
mimic4_selected = df["column_mimic4_candidate_1"].to_list()
# MIMIC-III dtype labels; the resulting schema is applied to both sources.
dtypes = df["dtype_mimic3"].to_list()

In [27]:
# A session already exists, so rebuilding it via SparkSession.builder...getOrCreate()
# only applies runtime SQL configs (see the warning that call logged) and ignores the
# app name. Set the option directly on the existing session instead.
# This option compresses cached in-memory columnar data; it is not what reads
# compressed CSV files.
spark.conf.set("spark.sql.inMemoryColumnarStorage.compressed", "true")

23/05/24 20:31:38 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [28]:
# Read the raw MIMIC-III CSV without schema inference (columns come in as strings);
# types are applied later by cast_schema so both sources are cast the same way.
df_m3 = (
    spark.read
    .options(header=True, inferSchema=False)
    .csv(mimic3_path)
)


## Rename MIMIC-III columns to their MIMIC-IV names




In [29]:
def rename_columns(data, from_cols, to_cols):
    """Rename columns of a PySpark DataFrame pairwise.

    ``from_cols[i]`` is renamed to ``to_cols[i]``; pairs whose names already
    match are skipped. Renames are applied one after another in list order.

    :param pyspark.sql.DataFrame data: dataframe whose columns are renamed
    :param list from_cols: original column names
    :param list to_cols: destination column names, aligned with ``from_cols``
    :return pyspark.sql.DataFrame: the renamed dataframe
    :raises ValueError: if ``from_cols`` and ``to_cols`` differ in length
    """
    if len(from_cols) != len(to_cols):
        # Misaligned lists would otherwise silently skip some columns (or crash
        # with an IndexError), leaving a half-renamed dataframe.
        raise ValueError(
            f"from_cols and to_cols must have the same length "
            f"({len(from_cols)} != {len(to_cols)})"
        )

    for from_col, to_col in zip(from_cols, to_cols):
        if from_col != to_col:
            data = data.withColumnRenamed(from_col, to_col)

    return data
    

# Align MIMIC-III column names with their MIMIC-IV counterparts (e.g. curr_careunit -> careunit).
df_m3 = rename_columns(df_m3, mimic3_selected, mimic4_selected)

In [30]:
# Keep only the mapped columns, using MIMIC-IV names and order.
df_m3_selected = df_m3.select(*mimic4_selected)

In [31]:
# Preview the first 20 rows (long values truncated).
df_m3_selected.show(n=20, truncate=True)

+----------+-------+---------+--------+-------------------+-------------------+
|subject_id|hadm_id|eventtype|careunit|             intime|            outtime|
+----------+-------+---------+--------+-------------------+-------------------+
|     10006| 142345|    admit|    MICU|2164-10-23 21:10:15|2164-10-25 12:21:07|
|     10006| 142345| transfer|    null|2164-10-25 12:21:07|2164-11-01 17:14:27|
|     10006| 142345|discharge|    null|2164-11-01 17:14:27|               null|
|     10011| 105331|    admit|    MICU|2126-08-14 22:34:00|2126-08-28 18:59:00|
|     10011| 105331|discharge|    null|2126-08-28 18:59:00|               null|
|     10013| 165520|    admit|    MICU|2125-10-04 23:38:00|2125-10-07 15:13:52|
|     10013| 165520|discharge|    null|2125-10-07 15:13:52|               null|
|     10017| 199207|    admit|    null|2149-05-26 17:21:09|2149-05-26 19:15:46|
|     10017| 199207| transfer|    null|2149-05-26 19:15:46|2149-05-26 19:22:45|
|     10017| 199207| transfer|    null|2

In [32]:
# Maps the dtype labels found in the mapping table's `dtype_mimic3` column
# to the Spark SQL types used when casting the raw string columns.
mappers_pandas_types_to_spark_types = dict(
    int=IntegerType(),
    string=StringType(),
    datetime=TimestampType(),
    float=DoubleType(),
)

In [33]:
# Build a Spark schema from per-column dtype labels.
def create_schema(pandas_types, mappers, column_names):
    """Build a PySpark ``StructType`` from dtype labels and column names.

    Each label in ``pandas_types`` (e.g. ``"int"``, ``"datetime"``) is looked
    up in ``mappers`` to obtain a PySpark data type and paired positionally
    with the name at the same index of ``column_names``. Fields use the
    ``StructField`` default of ``nullable=True``.

    :param list pandas_types: dtype labels, one per column
    :param dict mappers: maps each dtype label to a PySpark ``DataType`` instance
    :param list column_names: column names, aligned with ``pandas_types``
    :return pyspark.sql.types.StructType: schema with one field per column, in order
    :raises ValueError: if ``pandas_types`` and ``column_names`` differ in length
    :raises KeyError: if a dtype label has no entry in ``mappers``
    """
    if len(pandas_types) != len(column_names):
        # zip() would otherwise silently drop the unmatched trailing columns.
        raise ValueError(
            f"pandas_types and column_names must have the same length "
            f"({len(pandas_types)} != {len(column_names)})"
        )

    # Report every unmapped label at once, in first-seen order.
    unknown = list(dict.fromkeys(t for t in pandas_types if t not in mappers))
    if unknown:
        raise KeyError(f"no Spark type mapping for dtype(s): {unknown}")

    fields = [
        StructField(name, mappers[dtype])
        for name, dtype in zip(column_names, pandas_types)
    ]
    return StructType(fields)

# Reuse the MIMIC-III dtype labels extracted earlier; this one schema is applied to both sources.
transfers_schema = create_schema(pandas_types=dtypes,
                                 mappers=mappers_pandas_types_to_spark_types,
                                 column_names=mimic4_selected)

transfers_schema

StructType([StructField('subject_id', IntegerType(), True), StructField('hadm_id', IntegerType(), True), StructField('eventtype', StringType(), True), StructField('careunit', StringType(), True), StructField('intime', TimestampType(), True), StructField('outtime', TimestampType(), True)])

In [34]:
# Inspect the column types as read from the CSV (inferSchema=False), before casting.
df_m3_selected.printSchema()

root
 |-- subject_id: string (nullable = true)
 |-- hadm_id: string (nullable = true)
 |-- eventtype: string (nullable = true)
 |-- careunit: string (nullable = true)
 |-- intime: string (nullable = true)
 |-- outtime: string (nullable = true)



In [35]:
def cast_schema(df, schema):
    """Project ``df`` onto the fields of ``schema``, casting each to its declared type.

    Only the columns named in ``schema`` are kept, in schema order; each one is
    cast with ``Column.cast`` to the field's ``dataType``.

    :param pyspark.sql.DataFrame df: dataframe containing every column named in ``schema``
    :param pyspark.sql.types.StructType schema: target column names and types
    :return pyspark.sql.DataFrame: the projected, cast dataframe
    """
    casted_columns = []
    for target_field in schema.fields:
        source_column = df[target_field.name]
        casted_columns.append(source_column.cast(target_field.dataType))
    return df.select(casted_columns)

# Apply the shared TRANSFERS schema to the MIMIC-III columns.
df_m3_casted = cast_schema(df=df_m3_selected,
                           schema=transfers_schema)


In [36]:
# Confirm the column types now follow transfers_schema.
df_m3_casted.printSchema()

root
 |-- subject_id: integer (nullable = true)
 |-- hadm_id: integer (nullable = true)
 |-- eventtype: string (nullable = true)
 |-- careunit: string (nullable = true)
 |-- intime: timestamp (nullable = true)
 |-- outtime: timestamp (nullable = true)



## Repeat for MIMIC-IV (read, select and cast; no renaming needed)

In [37]:
# The mapped column names are the MIMIC-IV names, so MIMIC-IV needs no renaming:
# read it without schema inference and keep only the mapped columns.
df_m4 = (
    spark.read
    .options(header=True, inferSchema=False)
    .csv(mimic4_path)
)

df_m4_selected = df_m4.select(*mimic4_selected)

In [38]:
# Inspect the column types as read from the MIMIC-IV CSV (inferSchema=False), before casting.
df_m4_selected.printSchema()

root
 |-- subject_id: string (nullable = true)
 |-- hadm_id: string (nullable = true)
 |-- eventtype: string (nullable = true)
 |-- careunit: string (nullable = true)
 |-- intime: string (nullable = true)
 |-- outtime: string (nullable = true)



In [39]:
# Cast MIMIC-IV with the same schema as MIMIC-III so both frames line up for the union.
df_m4_casted = cast_schema(df_m4_selected, transfers_schema)
df_m4_casted.printSchema()

root
 |-- subject_id: integer (nullable = true)
 |-- hadm_id: integer (nullable = true)
 |-- eventtype: string (nullable = true)
 |-- careunit: string (nullable = true)
 |-- intime: timestamp (nullable = true)
 |-- outtime: timestamp (nullable = true)



# Merge these two dataframes

In [40]:
# unionByName matches columns by name rather than by position, so a change in
# column order on either side cannot silently shift values into the wrong column.
# NOTE(review): rows are not tagged with their source dataset; confirm that
# subject_id/hadm_id values cannot collide between MIMIC-III and MIMIC-IV.
df_transfer_merged = df_m4_casted.coalesce(1).unionByName(df_m3_casted.coalesce(1))
df_transfer_merged.show()

+----------+--------+---------+--------+-------------------+-------+
|subject_id| hadm_id|eventtype|careunit|             intime|outtime|
+----------+--------+---------+--------+-------------------+-------+
|  10009049|22995465|discharge|    null|2174-05-31 14:21:47|   null|
|  10025612|23403708|discharge|    null|2125-10-03 12:25:27|   null|
|  10020786|23488445|discharge|    null|2189-06-13 17:25:44|   null|
|  10014078|25809882|discharge|    null|2166-08-26 14:49:42|   null|
|  10039831|26924951|discharge|    null|2116-01-02 14:35:02|   null|
|  10029484|20764029|discharge|    null|2160-11-11 11:40:33|   null|
|  10019568|28710730|discharge|    null|2120-02-02 15:40:32|   null|
|  10020640|27984218|discharge|    null|2153-02-20 13:53:45|   null|
|  10002495|24982426|discharge|    null|2141-05-29 17:42:52|   null|
|  10007058|22954658|discharge|    null|2167-11-11 14:26:31|   null|
|  10004422|21255400|discharge|    null|2111-01-25 15:34:47|   null|
|  10037975|27617929|discharge|   

# Write to database

In [41]:
def write_to_db(data_frame, table_name, db_name, db_usrname, db_pssword, port,
                mode='overwrite', host='localhost'):
    """Write a Spark DataFrame to a PostgreSQL table over JDBC.

    :param pyspark.sql.DataFrame data_frame: data to write
    :param str table_name: destination table
    :param str db_name: database name
    :param str db_usrname: database user
    :param str db_pssword: database password
    :param int port: PostgreSQL port
    :param str mode: Spark save mode passed to ``DataFrameWriter.mode``;
        defaults to ``'overwrite'`` (replace any existing table)
    :param str host: database host, ``'localhost'`` by default
    :return str: a message naming the table and the save mode used
    """
    data_frame.write.format('jdbc').options(
        url=f'jdbc:postgresql://{host}:{port}/{db_name}',
        driver='org.postgresql.Driver',
        dbtable=table_name,
        user=db_usrname,
        password=db_pssword,
    ).mode(mode).save()

    # Report the actual save mode rather than always claiming an append.
    return (f"Data frame {data_frame} has been written to table {table_name} "
            f"(mode={mode!r}) in the PostgreSQL database.")

In [42]:
# Connection settings are read from the environment; the fallbacks are the values
# this notebook previously hard-coded, so local runs behave as before.
write_to_db(data_frame=df_transfer_merged,
            table_name="transfers",
            db_name=os.environ.get("MIMIC_DB_NAME", "mimic"),
            db_usrname=os.environ.get("MIMIC_DB_USER", "mimic"),
            db_pssword=os.environ.get("MIMIC_DB_PASSWORD", "mimic"),
            port=int(os.environ.get("MIMIC_DB_PORT", "6432")))

'Data frame DataFrame[subject_id: int, hadm_id: int, eventtype: string, careunit: string, intime: timestamp, outtime: timestamp] has been appended to table transfers in the PostgreSQL database.'