# Merging everything with Dask

In [18]:
!pip install dask
!pip install pandas
!pip install dask[dataframe]
!pip install "dask[distributed]" --upgrade

Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable
Collecting distributed==2024.12.1 (from dask[distributed])
  Downloading distributed-2024.12.1-py3-none-any.whl.metadata (3.3 kB)
Collecting msgpack>=1.0.2 (from distributed==2024.12.1->dask[distributed])
  Downloading msgpack-1.1.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (8.4 kB)
Collecting sortedcontainers>=2.0.5 (from distributed==2024.12.1->dask[distributed])
  Downloading sortedcontainers-2.4.0-py2.py3-none-any.whl.metadata (10 kB)
Collecting tblib>=1.6.0 (from distributed==2024.12.1->dask[distributed])
  Downloading tblib-3.0.0-py3-none-any.whl.metadata (25 kB)
Collecting zict>=3.0.0 (from distributed==2024.12.1->dask[distributed])
  Downloading 

In [1]:
import dask.dataframe as dd
import pandas as pd

def rename_columns(df, prefix, exceptions=None, dataset_type=None):
    """
    Renames the columns of a DataFrame by adding a prefix, replacing spaces with underscores, and converting to lowercase.
    Special handling for renaming 'caseID' based on dataset type (inpatient or out'patient).

    Parameters:
        df (DataFrame): The input DataFrame.
        prefix (str): Prefix to add to column names.
        exceptions (list, optional): List of column names to exclude from renaming.
        dataset_type (str, optional): Type of dataset ('inpatient' or 'outpatient') for special 'caseID' handling.

    Returns:
        DataFrame: The updated DataFrame with renamed columns.
    """
    if exceptions is None:
        exceptions = []

    def rename_column(col_name):
        if col_name in exceptions:
            return col_name
        if col_name == "caseID":
            if dataset_type == "inpatient":
                return "inpatient_caseID"
            elif dataset_type == "outpatient":
                return "outpatient_caseID"
        return f"{prefix}_{col_name.strip().replace(' ', '_').lower()}"

    df = df.rename(columns=rename_column)
    return df

# Define base path for the input CSV files
base_path = 'testdata'

# Define file paths for the input CSV files
file_paths = {
    'insurance_data': f'{base_path}/test.insurance_data.csv',
    'insurants': f'{base_path}/test.insurants.csv',
    'inpatient_cases': f'{base_path}/test.inpatient_cases.csv',
    'inpatient_diagnosis': f'{base_path}/test.inpatient_diagnosis.csv',
    'inpatient_procedures': f'{base_path}/test.inpatient_procedures.csv',
    'inpatient_fees': f'{base_path}/test.inpatient_fees.csv',
    'outpatient_cases': f'{base_path}/test.outpatient_cases.csv',
    'outpatient_diagnosis': f'{base_path}/test.outpatient_diagnosis.csv',
    'outpatient_fees': f'{base_path}/test.outpatient_fees.csv',
    'outpatient_procedures': f'{base_path}/test.outpatient_procedures.csv',
    'drugs': f'{base_path}/test.drugs.csv'
}

# Define dtypes for each file
dtypes = {
    'insurance_data': {
        'pid': int,
        'death': 'Int64',
        'regional_code': 'Int64'
    },
    'insurants': {
        'pid': int,
        'year_of_birth': int,
        'gender': int,
    },
    'inpatient_cases': {
        'pid': int,
        'caseID': 'Int64',
        'cause of admission': 'str',
        'cause of discharge': 'str',
        'outpatient treatment': 'Int64',
        'department admission': str,
        'department discharge': str
    },
    'inpatient_diagnosis': {
        'pid': int,
        'caseID': 'Int64',
        'diagnosis': str,
        'type of diagnosis': str,
        'is main diagnosis': 'Int64',
        'localisation': 'Int64'
    },
    'inpatient_fees': {
        'pid': int,
        'caseID': 'Int64',
        'billing code': str,
        'amount due': float,
        'quantity': 'Int64'
    },
    'inpatient_procedures': {
        'pid': int,
        'caseID': 'Int64',
        'procedure code': str,
        'localisation': 'Int64',
    },
    'outpatient_cases': {
        'pid': int,
        'caseID': 'Int64',
        'practice code': str,
        'amount due': float,
        'year': 'Int64',
        'quarter': 'Int64'
    },
    'outpatient_diagnosis': {
        'pid': int,
        'caseID': 'Int64',
        'diagnosis': str,
        'qualification': str,
        'localisation': 'Int64'
    },
    'outpatient_fees': {
        'pid': int,
        'caseID': 'Int64',
        'physican code': str,
        'specialty code': str,
        'billing code': str,
        'quantity': 'Int64',
    },
    'outpatient_procedures': {
        'pid': int,
        'caseID': 'Int64',
        'procedure code': str,
        'localisation': 'Int64',
    },
    'drugs': {
        'pid': int,
        'pharma central number': str,
        'specialty of prescriber': str,
        'physican code': str,
        'practice code': str,
        'outpatient_diagnosis': str,
        'outpatient_diagnosis_qualification': str,
        'outpatient_diagnosis_localisation': 'Int64',
    }
}

parse_dates = {
    'insurance_data': ['from', 'to'],
    'inpatient_cases': ['date of admission', 'date of discharge'],
    'inpatient_fees': ['from', 'to'],
    'inpatient_procedures': ['date of procedure'],
    'outpatient_cases': ['from', 'to'],
    'outpatient_fees': ['date'],
    'drugs': ['date of prescription', 'date of dispense']
}

# Read and process each CSV
dataframes = {}
for table_name, file_path in file_paths.items():
    # Determine dataset type
    dataset_type = "inpatient" if "inpatient" in table_name else "outpatient" if "outpatient" in table_name else None

    # Read CSV
    df = dd.read_csv(
        file_path,
        sep='\t',
        dtype=dtypes.get(table_name, None),
        parse_dates=parse_dates.get(table_name, None),
        assume_missing=True
    )

    # Rename columns
    df = rename_columns(df, prefix=table_name, exceptions=['pid'], dataset_type=dataset_type)

    # Store the processed DataFrame
    dataframes[table_name] = df

In [2]:
dataframes

{'insurance_data': Dask DataFrame Structure:
                  pid insurance_data_from insurance_data_to insurance_data_death insurance_data_regional_code
 npartitions=1                                                                                               
                int64      datetime64[ns]    datetime64[ns]                Int64                        Int64
                  ...                 ...               ...                  ...                          ...
 Dask Name: operation, 3 expressions
 Expr=RenameFrame(frame=ArrowStringConversion(frame=FromMapProjectable(8298d3d)), columns=<function rename_columns.<locals>.rename_column at 0x7aad45fc6ac0>),
 'insurants': Dask DataFrame Structure:
                  pid insurants_year_of_birth insurants_gender
 npartitions=1                                                
                int64                 float64            int64
                  ...                     ...              ...
 Dask Name: operation, 3 ex

In [3]:
dataframes['outpatient_cases']

Unnamed: 0_level_0,pid,outpatient_caseID,outpatient_cases_practice_code,outpatient_cases_from,outpatient_cases_to,outpatient_cases_amount_due,outpatient_cases_year,outpatient_cases_quarter
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
,int64,Int64,string,datetime64[ns],datetime64[ns],float64,Int64,Int64
,...,...,...,...,...,...,...,...


In [4]:
print(dataframes['outpatient_fees'].dtypes)

pid                                         int64
outpatient_caseID                           Int64
outpatient_fees_physican_code     string[pyarrow]
outpatient_fees_specialty_code    string[pyarrow]
outpatient_fees_billing_code      string[pyarrow]
outpatient_fees_quantity                    Int64
outpatient_fees_date               datetime64[ns]
dtype: object


In [5]:
df_result = dataframes['inpatient_cases'].compute()

In [6]:
df_result

Unnamed: 0,pid,inpatient_caseID,inpatient_cases_date_of_admission,inpatient_cases_date_of_discharge,inpatient_cases_cause_of_admission,inpatient_cases_cause_of_discharge,inpatient_cases_outpatient_treatment,inpatient_cases_department_admission,inpatient_cases_department_discharge
0,6,5556089,2019-06-14,2019-06-14,,,1,,
1,6,1568766,2019-08-30,2019-08-30,,,1,,
2,6,2714453,2019-03-30,2019-03-30,,,1,,
3,6,364774,2018-10-27,2018-10-27,,,1,,
4,6,11633323,2018-07-06,2018-07-07,0101,06,0,0100,0100
...,...,...,...,...,...,...,...,...,...
721,986,9750153,2019-11-21,2019-11-21,,,1,,
722,986,9117719,2019-10-25,2019-10-26,0107,01,0,0100,0100
723,987,6300871,2019-01-10,2019-01-13,0101,01,0,3400,3400
724,995,370981,2018-12-29,2018-12-29,,,1,,


In [7]:
# Merge datasets step by step
# Example: Merging 'insurance_data' with 'insurants'
df_merged = dd.merge(dataframes['insurance_data'], dataframes['insurants'], on='pid', how='left')
df_merged = dd.merge(df_merged, dataframes['outpatient_cases'], on='pid', how='left')

In [8]:
# Perform computations, filtering, or additional processing as needed
# Example: Compute the result to avoid lazy evaluation
df_result = df_merged.compute()

In [9]:
df_result

Unnamed: 0,pid,insurance_data_from,insurance_data_to,insurance_data_death,insurance_data_regional_code,insurants_year_of_birth,insurants_gender,outpatient_caseID,outpatient_cases_practice_code,outpatient_cases_from,outpatient_cases_to,outpatient_cases_amount_due,outpatient_cases_year,outpatient_cases_quarter
0,724,2019-01-01,2019-12-31,0,9,1985.0,1,113432713,687404132,2018-07-01,2018-09-13,113.259265,2018,3
1,724,2019-01-01,2019-12-31,0,9,1985.0,1,54227018,838242784,2018-07-01,2018-07-01,46.886284,2018,3
2,724,2019-01-01,2019-12-31,0,9,1985.0,1,73873110,838242784,2019-04-05,2019-04-12,58.886200,2019,2
3,183,2018-01-01,2018-12-31,0,5,1987.0,1,,,NaT,NaT,,,
4,831,2020-01-01,2020-12-31,0,16,1977.0,2,,,NaT,NaT,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
54776,986,2019-01-01,2019-12-31,0,5,1989.0,1,133596529,722488739,2019-08-01,2019-08-10,136.975700,2019,3
54777,986,2019-01-01,2019-12-31,0,5,1989.0,1,85527165,922391990,2020-04-03,2020-04-27,49.116323,2020,2
54778,986,2019-01-01,2019-12-31,0,5,1989.0,1,1004537983,922391990,2020-07-01,2020-09-30,95.000000,2020,3
54779,986,2019-01-01,2019-12-31,0,5,1989.0,1,109579845,836390643,2020-04-01,2020-04-01,20.775885,2020,2


In [10]:
# Merge outpatient_diagnosis on both pid and outpatient_caseID
df_merged = dd.merge(df_merged, dataframes['outpatient_diagnosis'], on=['pid', 'outpatient_caseID'], how='left')

# Merge outpatient_procedures on both pid and outpatient_caseID
df_merged = dd.merge(df_merged, dataframes['outpatient_procedures'], on=['pid', 'outpatient_caseID'], how='left')

# Merge outpatient_fees on both pid and outpatient_caseID
df_merged = dd.merge(df_merged, dataframes['outpatient_fees'], on=['pid', 'outpatient_caseID'], how='left')

In [11]:
df_merged_outpatient = df_merged

In [12]:
df_result = df_merged.compute()
df_result

Unnamed: 0,pid,insurance_data_from,insurance_data_to,insurance_data_death,insurance_data_regional_code,insurants_year_of_birth,insurants_gender,outpatient_caseID,outpatient_cases_practice_code,outpatient_cases_from,...,outpatient_diagnosis_qualification,outpatient_diagnosis_localisation,outpatient_procedures_procedure_code,outpatient_procedures_localisation,outpatient_procedures_date_of_procedure,outpatient_fees_physican_code,outpatient_fees_specialty_code,outpatient_fees_billing_code,outpatient_fees_quantity,outpatient_fees_date
0,724,2019-01-01,2019-12-31,0,9,1985.0,1,113432713,687404132,2018-07-01,...,G,,,,,979483231,31,13691R,1,2018-09-13
1,724,2019-01-01,2019-12-31,0,9,1985.0,1,113432713,687404132,2018-07-01,...,G,,,,,979483231,31,33042,1,2018-09-13
2,724,2019-01-01,2019-12-31,0,9,1985.0,1,113432713,687404132,2018-07-01,...,G,,,,,979483231,31,32413,1,2018-09-13
3,724,2019-01-01,2019-12-31,0,9,1985.0,1,113432713,687404132,2018-07-01,...,G,,,,,979483231,31,32030,1,2018-09-13
4,724,2019-01-01,2019-12-31,0,9,1985.0,1,113432713,687404132,2018-07-01,...,G,,,,,979483231,31,32443,1,2018-09-13
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1586140,986,2019-01-01,2019-12-31,0,5,1989.0,1,1004537983,922391990,2020-07-01,...,G,,,,,,,H0000,1,2020-09-03
1586141,986,2019-01-01,2019-12-31,0,5,1989.0,1,1004537983,922391990,2020-07-01,...,G,,,,,,,H0000,1,2020-08-21
1586142,986,2019-01-01,2019-12-31,0,5,1989.0,1,1004537983,922391990,2020-07-01,...,G,,,,,,,H0000,1,2020-08-09
1586143,986,2019-01-01,2019-12-31,0,5,1989.0,1,109579845,836390643,2020-04-01,...,G,,,,,621898700,00,01212,1,2020-04-01


In [13]:
# Merge inpatient_cases
df_merged = dd.merge(df_merged, dataframes['inpatient_cases'], on=['pid'], how='left')

# Merge inpatient_diagnosis on both pid and inpatient_caseID
df_merged = dd.merge(df_merged, dataframes['inpatient_diagnosis'], on=['pid', 'inpatient_caseID'], how='left')

# Merge inpatient_procedures on both pid and inpatient_caseID
df_merged = dd.merge(df_merged, dataframes['inpatient_procedures'], on=['pid', 'inpatient_caseID'], how='left')

# Merge inpatient_fees on both pid and inpatient_caseID
df_merged = dd.merge(df_merged, dataframes['inpatient_fees'], on=['pid', 'inpatient_caseID'], how='left')

# Merge drugs on pid only (no caseID in drugs)
df_merged = dd.merge(df_merged, dataframes['drugs'], on='pid', how='left')

In [14]:
#print(df_merged.memory_usage(deep=True).compute())  # Total memory used

# Partition the dataframe
- Split the dataframe into smaller partitions/chunks otherwise the kernel crashes due to lack of memory
- Testing it with 10 partitions a the moment

In [15]:
df_merged.npartitions

1

In [16]:
df_merged = df_merged.repartition(npartitions=100)
df_merged.npartitions

100

In [None]:
df_merged.map_partitions(len).compute()

In [20]:
df_merged.get_partition(0)
# gleicher command
# df_merged.partitions[0]

Unnamed: 0_level_0,pid,insurance_data_from,insurance_data_to,insurance_data_death,insurance_data_regional_code,insurants_year_of_birth,insurants_gender,outpatient_caseID,outpatient_cases_practice_code,outpatient_cases_from,outpatient_cases_to,outpatient_cases_amount_due,outpatient_cases_year,outpatient_cases_quarter,outpatient_diagnosis_diagnosis,outpatient_diagnosis_qualification,outpatient_diagnosis_localisation,outpatient_procedures_procedure_code,outpatient_procedures_localisation,outpatient_procedures_date_of_procedure,outpatient_fees_physican_code,outpatient_fees_specialty_code,outpatient_fees_billing_code,outpatient_fees_quantity,outpatient_fees_date,inpatient_caseID,inpatient_cases_date_of_admission,inpatient_cases_date_of_discharge,inpatient_cases_cause_of_admission,inpatient_cases_cause_of_discharge,inpatient_cases_outpatient_treatment,inpatient_cases_department_admission,inpatient_cases_department_discharge,inpatient_diagnosis_diagnosis,inpatient_diagnosis_type_of_diagnosis,inpatient_diagnosis_is_main_diagnosis,inpatient_diagnosis_localisation,inpatient_procedures_procedure_code,inpatient_procedures_localisation,inpatient_procedures_date_of_procedure,inpatient_fees_from,inpatient_fees_to,inpatient_fees_billing_code,inpatient_fees_amount_due,inpatient_fees_quantity,drugs_date_of_prescription,drugs_date_of_dispense,drugs_pharma_central_number,drugs_specialty_of_prescriber,drugs_physican_code,drugs_practice_code,drugs_quantity,drugs_amount_due,drugs_atc,drugs_ddd
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1,Unnamed: 23_level_1,Unnamed: 24_level_1,Unnamed: 25_level_1,Unnamed: 26_level_1,Unnamed: 27_level_1,Unnamed: 28_level_1,Unnamed: 29_level_1,Unnamed: 30_level_1,Unnamed: 31_level_1,Unnamed: 32_level_1,Unnamed: 33_level_1,Unnamed: 34_level_1,Unnamed: 35_level_1,Unnamed: 36_level_1,Unnamed: 37_level_1,Unnamed: 38_level_1,Unnamed: 39_level_1,Unnamed: 40_level_1,Unnamed: 41_level_1,Unnamed: 42_level_1,Unnamed: 43_level_1,Unnamed: 44_level_1,Unnamed: 45_level_1,Unnamed: 46_level_1,Unnamed: 47_level_1,Unnamed: 48_level_1,Unnamed: 49_level_1,Unnamed: 50_level_1,Unnamed: 51_level_1,Unnamed: 52_level_1,Unnamed: 53_level_1,Unnamed: 54_level_1,Unnamed: 55_level_1
,int64,datetime64[ns],datetime64[ns],Int64,Int64,float64,int64,Int64,string,datetime64[ns],datetime64[ns],float64,Int64,Int64,string,string,Int64,string,Int64,string,string,string,string,Int64,datetime64[ns],Int64,datetime64[ns],datetime64[ns],string,string,Int64,string,string,string,string,Int64,float64,string,Int64,datetime64[ns],datetime64[ns],datetime64[ns],string,float64,Int64,datetime64[ns],datetime64[ns],string,string,string,string,float64,float64,string,float64
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


In [21]:
df_result_part_1_100 = df_merged.get_partition(0).compute
df_result_part_1_100

<bound method FrameBase.compute of Dask DataFrame Structure:
                 pid insurance_data_from insurance_data_to insurance_data_death insurance_data_regional_code insurants_year_of_birth insurants_gender outpatient_caseID outpatient_cases_practice_code outpatient_cases_from outpatient_cases_to outpatient_cases_amount_due outpatient_cases_year outpatient_cases_quarter outpatient_diagnosis_diagnosis outpatient_diagnosis_qualification outpatient_diagnosis_localisation outpatient_procedures_procedure_code outpatient_procedures_localisation outpatient_procedures_date_of_procedure outpatient_fees_physican_code outpatient_fees_specialty_code outpatient_fees_billing_code outpatient_fees_quantity outpatient_fees_date inpatient_caseID inpatient_cases_date_of_admission inpatient_cases_date_of_discharge inpatient_cases_cause_of_admission inpatient_cases_cause_of_discharge inpatient_cases_outpatient_treatment inpatient_cases_department_admission inpatient_cases_department_discharge inpatient

# Manage dask workers on local cluster

In [29]:
# load/import classes
from dask.distributed import Client, LocalCluster

# set up cluster with 4 workers. Each worker uses 1 thread and has a 64GB memory limit.
cluster = LocalCluster(n_workers=2, 
                       threads_per_worker=10,
                       memory_limit='30GB')
client = Client(cluster)

# have a look at your workers
client

Perhaps you already have a cluster running?
Hosting the HTTP server on port 40593 instead


0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:40593/status,

0,1
Dashboard: http://127.0.0.1:40593/status,Workers: 2
Total threads: 20,Total memory: 55.88 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:41967,Workers: 2
Dashboard: http://127.0.0.1:40593/status,Total threads: 20
Started: Just now,Total memory: 55.88 GiB

0,1
Comm: tcp://127.0.0.1:37775,Total threads: 10
Dashboard: http://127.0.0.1:46591/status,Memory: 27.94 GiB
Nanny: tcp://127.0.0.1:42175,
Local directory: /tmp/dask-scratch-space/worker-_g7o_dd5,Local directory: /tmp/dask-scratch-space/worker-_g7o_dd5

0,1
Comm: tcp://127.0.0.1:46689,Total threads: 10
Dashboard: http://127.0.0.1:41075/status,Memory: 27.94 GiB
Nanny: tcp://127.0.0.1:37705,
Local directory: /tmp/dask-scratch-space/worker-2ck1zbr0,Local directory: /tmp/dask-scratch-space/worker-2ck1zbr0


In [None]:
# Split dask dataframe into 10

In [24]:
# save to parquet
output_dir = './testdata_merged_parquet_output'
name_function = lambda x: f"testdata-merged-{x}.parquet"
df_merged.to_parquet(output_dir, name_function=name_function, engine='pyarrow')

# close workers and cluster
client.close()# close workers and cluster




KeyboardInterrupt: 

In [28]:
# client.close()
cluster.close()
cluster.close()

2025-01-12 16:59:46,331 - distributed.worker - ERROR - Failed to communicate with scheduler during heartbeat.
Traceback (most recent call last):
  File "/home/jovyan/.local/lib/python3.12/site-packages/distributed/comm/tcp.py", line 225, in read
    frames_nosplit_nbytes_bin = await stream.read_bytes(fmt_size)
                                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/jovyan/.local/lib/python3.12/site-packages/distributed/worker.py", line 1269, in heartbeat
    response = await retry_operation(
               ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/jovyan/.local/lib/python3.12/site-packages/distributed/utils_comm.py", line 441, in retry_operation
    return await retry(
           ^^^^^^^^^^^^
  File "/home/jovyan/.local/lib/python3.12/site-packages/distributed/utils_comm.py", line 420, in retry
    return await

In [None]:
df_result = df_merged.compute()
df_result

In [None]:
df_result.loc[df_result['pid'] == 1]