In [1]:
import pandas as pd
from functools import reduce
import numpy as np
import gc

### This notebook tries to

(1) reduce pandas frames' sizes

(2) merge different frames

(3) finally output a single Pandas DataFrame

The pandas DataFrames we are working with are very large. To make them easier to work with, we apply a variety of tricks to shrink their memory footprint.

### The mem_usage function checks the memory usage of a pandas object.

In [2]:
def mem_usage(pandas_obj):
    """Report the deep memory footprint of a pandas object as a string.

    Parameters
    ----------
    pandas_obj : pd.DataFrame or other pandas object exposing ``memory_usage``
        For a DataFrame the per-column figures are summed; anything else is
        treated as a Series, whose ``memory_usage`` is already a single number.

    Returns
    -------
    str
        The footprint in megabytes, formatted with two decimals, e.g. ``"1.00 MB"``.
    """
    footprint = pandas_obj.memory_usage(deep=True)
    if isinstance(pandas_obj, pd.DataFrame):
        # DataFrame.memory_usage yields one value per column, so total them.
        footprint = footprint.sum()
    bytes_per_mb = 1024 ** 2
    return "{:03.2f} MB".format(footprint / bytes_per_mb)

### The reduce_mem_usage function tries to reduce a pandas object's size by

(1) dropping empty columns 

(2) converting integer columns to a smaller integer data type 

(3) converting float64 to float32. The final memory usage is printed.

In [3]:
def _smallest_int_dtype(mn, mx):
    """Return the narrowest NumPy integer dtype whose range contains [mn, mx].

    Unsigned types are tried when ``mn`` is non-negative, signed types
    otherwise. Bounds are inclusive, so a maximum of exactly 255 still fits
    in ``uint8``. Returns ``None`` if no candidate fits.
    """
    if mn >= 0:
        candidates = (np.uint8, np.uint16, np.uint32, np.uint64)
    else:
        candidates = (np.int8, np.int16, np.int32, np.int64)
    for candidate in candidates:
        limits = np.iinfo(candidate)
        if limits.min <= mn and mx <= limits.max:
            return candidate
    return None


def reduce_mem_usage(df, make_sparse=False):
    """Shrink a DataFrame's memory footprint by tightening column dtypes.

    Steps, applied column by column:
      * columns that are entirely NaN are dropped;
      * object/string columns whose unique-value ratio is below 0.5 become
        ``category``;
      * ``int64``/``Int64`` columns are downcast to the narrowest integer
        type that holds their min and max;
      * ``float64`` columns become ``float32``;
      * if ``make_sparse`` is set, columns with fewer than 25% non-null
        values are wrapped in a ``SparseArray``.

    Progress and the before/after memory usage are printed.

    Parameters
    ----------
    df : pd.DataFrame
        Frame to shrink. It is modified in place (columns dropped and
        replaced), so callers should use the returned frame.
    make_sparse : bool, default False
        Whether to store mostly-null columns as sparse arrays.

    Returns
    -------
    pd.DataFrame
        The same ``df`` object, after modification.
    """
    # NOTE: memory_usage() is called without deep=True, so the payload of
    # object columns is not included in the reported figures.
    start_mem_usg = df.memory_usage().sum() / 1024**2
    print("Memory usage of dataframe is :",start_mem_usg," MB")
    # First drop empty columns
    df.dropna(axis=1, how='all', inplace=True)
    n_rows = len(df)
    for col in df.columns:
        # Print current column type
        print("******************************")
        print("Column: ",col)
        print("dtype before: ",df[col].dtype)
        dtype_name = str(df[col].dtype)
        # "str" is included because newer pandas reports its default string
        # dtype under that name.
        if dtype_name in ("object", "string", "str"):
            # nunique() is expensive on large columns: compute it once.
            n_unique = df[col].nunique()
            unique_ratio = n_unique / n_rows
            print(n_unique, n_rows, unique_ratio)
            if unique_ratio < 0.5:
                # Assign with df[col] = ... so the column is *replaced*.
                # df.loc[:, col] = ... writes into the existing object column
                # on pandas >= 2 and silently keeps the old dtype.
                df[col] = df[col].astype('category')
        elif dtype_name.lower() == "int64":
            # Make Integer/unsigned Integer datatypes
            target = _smallest_int_dtype(df[col].min(), df[col].max())
            try:
                if target is not None:
                    df[col] = df[col].astype(target)
            except ValueError:
                # Nullable Int64 columns containing NA cannot be cast to a
                # NumPy integer dtype; keep them as nullable Int64.
                df[col] = df[col].astype("Int64")
        elif dtype_name == "float64":
            # Make float datatypes 32 bit
            df[col] = df[col].astype(np.float32)

        # Check the flag first so the null count is only computed when needed.
        if make_sparse and (df[col].count() / n_rows < 0.25):
            df[col] = pd.arrays.SparseArray(df[col], dtype = df[col].dtype)
        # Print new column type
        print("dtype after: ",df[col].dtype)
        print("******************************")

    # Print final result
    print("___MEMORY USAGE AFTER COMPLETION:___")
    mem_usg = df.memory_usage().sum() / 1024**2
    print("Memory usage is: ",mem_usg," MB")
    print("This is ",100 * mem_usg / start_mem_usg,"% of the initial size")
    return df

## Fact Data
Some previous data exploration, checking for nans, num unique values, etc. has been omitted for brevity.

### The following code blocks try to reduce the DataFrame size of the data and save the reduced data to disk. Note that the code blocks are quite similar: they do almost the same things, except that the data frames are different.

In [4]:
# Load all the raw fact tables queried from the db (one feather file per table).
# pd.read_feather needs the optional pyarrow dependency; without it this cell
# raises ImportError.
fact_blood_product = pd.read_feather("../data/raw/fact_blood_product.feather")
fact_donation = pd.read_feather("../data/raw/fact_donation.feather")
fact_exception = pd.read_feather("../data/raw/fact_exception.feather")
fact_run = pd.read_feather("../data/raw/fact_run.feather")

ImportError: Missing optional dependency 'pyarrow'.  Use pip or conda to install pyarrow.

As of right now we don't know whether columns that share a name represent the same information.
So for now we are going to rename those columns to keep the information separate.

In [None]:
# Give every shared measure column a table-specific name so the values stay
# distinguishable after the fact tables are merged.
fact_blood_product.rename(columns={
    "number_of_units_processed":"number_of_blood_product_units_processed"}, inplace=True)
fact_donation.rename(columns={
    "number_of_units_processed":"number_of_donation_units_processed",
    "number_of_duplicated_units":"number_of_duplicated_donation_units",
    "number_of_skipped_barcodes":"number_of_skipped_donation_barcodes",
    "number_of_alarms":"number_of_donation_alarms",}, inplace=True)
# fact_exception previously reused the *donation* name for its units column,
# which made it collide with fact_donation's column in the final merge.
fact_exception.rename(columns={
    "number_of_units_processed":"number_of_exception_units_processed",
    "number_of_duplicated_units":"number_of_duplicated_exception_units",
    "number_of_skipped_barcodes":"number_of_skipped_exception_barcodes",
    "number_of_alarms":"number_of_exception_alarms",}, inplace=True)
fact_run.rename(columns={
    "number_of_barcodes_skipped":"number_of_skipped_run_barcodes"}, inplace=True)

### fact_blood_product & dim_blood_product

### Convert columns to best possible dtypes in the DataFrame

In [None]:
# Let pandas pick the best nullable dtype for every column, then list the
# columns with their non-null counts.
fact_blood_product = fact_blood_product.convert_dtypes()
# show_counts replaces null_counts, which newer pandas no longer accepts.
fact_blood_product.info(verbose=True, show_counts=True)

In [None]:
# Remove every fact_blood_product column that holds nothing but NaN.
fact_blood_product = fact_blood_product.dropna(axis=1, how='all')

Now let's replace the dim_blood_product_id with the actual blood products; this is essentially just a string replacement.

In [None]:
# Build a lookup from dim_blood_product_id to blood_product_type and use it to
# add a readable blood_product column in place of the id column.
dim_blood_product = pd.read_feather("../data/raw/dim_blood_product.feather")
blood_products = dim_blood_product.set_index('dim_blood_product_id')['blood_product_type'].to_dict()
# Series.replace leaves ids that are missing from the lookup unchanged.
fact_blood_product["blood_product"] = fact_blood_product["dim_blood_product_id"].replace(blood_products)
fact_blood_product.drop(columns=['dim_blood_product_id'], inplace = True)
# The dimension table is not used again; release it.
del dim_blood_product
gc.collect()

### Print the first several rows in the fact_blood_product frame

In [None]:
# Preview the first rows of fact_blood_product.
fact_blood_product.head()

Now let's clean up the datatypes

In [None]:
# Downcast fact_blood_product's columns; reduce_mem_usage prints per-column details.
fact_blood_product = reduce_mem_usage(fact_blood_product)

In [None]:
# Inspect the resulting dtypes and non-null counts.
# show_counts replaces null_counts, which newer pandas no longer accepts.
fact_blood_product.info(verbose=True, show_counts=True)

In [None]:
# Persist the reduced frame to the interim data directory.
fact_blood_product.to_feather("../data/interim/fact_blood_product.feather")

## fact_donation

In [None]:
# Let pandas pick the best nullable dtype for every column, then list the
# columns with their non-null counts.
fact_donation = fact_donation.convert_dtypes()
# show_counts replaces null_counts, which newer pandas no longer accepts.
fact_donation.info(verbose=True, show_counts=True)

In [None]:
# Downcast fact_donation's columns; reduce_mem_usage prints per-column details.
fact_donation = reduce_mem_usage(fact_donation)

In [None]:
# Inspect the resulting dtypes and non-null counts.
# show_counts replaces null_counts, which newer pandas no longer accepts.
fact_donation.info(verbose=True, show_counts=True)

In [None]:
# Persist the reduced frame to the interim data directory.
fact_donation.to_feather("../data/interim/fact_donation.feather")

## fact_exception

In [None]:
# Let pandas pick the best nullable dtype for every column, then list the
# columns with their non-null counts.
fact_exception = fact_exception.convert_dtypes()
# show_counts replaces null_counts, which newer pandas no longer accepts.
fact_exception.info(verbose=True, show_counts=True)

In [None]:
# Downcast fact_exception's columns; reduce_mem_usage prints per-column details.
fact_exception = reduce_mem_usage(fact_exception)

In [None]:
# Inspect the resulting dtypes and non-null counts.
# show_counts replaces null_counts, which newer pandas no longer accepts.
fact_exception.info(verbose=True, show_counts=True)

In [None]:
# Persist the reduced frame to the interim data directory.
fact_exception.to_feather("../data/interim/fact_exception.feather")

## fact_run

In [None]:
# Let pandas pick the best nullable dtype for every column, then list the
# columns with their non-null counts.
fact_run = fact_run.convert_dtypes()
# show_counts replaces null_counts, which newer pandas no longer accepts.
fact_run.info(verbose=True, show_counts=True)

In [None]:
# Downcast fact_run's columns; reduce_mem_usage prints per-column details.
fact_run = reduce_mem_usage(fact_run)

In [None]:
# Inspect the resulting dtypes and non-null counts.
# show_counts replaces null_counts, which newer pandas no longer accepts.
fact_run.info(verbose=True, show_counts=True)

In [None]:
# Persist the reduced frame to the interim data directory.
fact_run.to_feather("../data/interim/fact_run.feather")

## Merging fact data

### This section merges all available data into a single data frame by outer joining, and final output is a single Pandas DataFrame.

In [None]:
# Outer-join blood-product facts with donation facts on their shared dimension
# keys, keeping rows that appear in only one of the two tables.
interim_fact_data_1 = fact_blood_product.merge(
    fact_donation,
    how='outer',
    on=[
        'dim_run_date',
        'dim_device_id',
        'dim_donation_id',
        'dim_run_id',
        'dim_facility_id',
        'dim_configuration_id',
        'dim_operator_id',
    ],
)

# The inputs are no longer needed; free them before the next merge.
del fact_blood_product, fact_donation
gc.collect()

In [None]:
# Outer-join exception facts with run facts on their shared dimension keys,
# keeping rows that appear in only one of the two tables.
interim_fact_data_2 = fact_exception.merge(
    fact_run,
    how='outer',
    on=[
        'dim_run_date',
        'dim_device_id',
        'dim_run_id',
        'dim_facility_id',
        'dim_configuration_id',
        'dim_operator_id',
    ],
)

# The inputs are no longer needed; free them before the next merge.
del fact_exception, fact_run
gc.collect()

In [None]:
# Combine the two interim frames into the single fact table with one more
# outer join on the dimension keys they share.
fact_data = interim_fact_data_1.merge(
    interim_fact_data_2,
    how='outer',
    on=[
        'dim_run_date',
        'dim_device_id',
        'dim_run_id',
        'dim_facility_id',
        'dim_configuration_id',
        'dim_operator_id',
    ],
)

# Release both interim frames now that fact_data exists.
del interim_fact_data_1, interim_fact_data_2
gc.collect()

In [None]:
# Re-infer the best nullable dtypes after the merges, then list the columns
# with their non-null counts.
fact_data = fact_data.convert_dtypes()
# show_counts replaces null_counts, which newer pandas no longer accepts.
fact_data.info(verbose=True, show_counts=True)

In [None]:
# Downcast the merged frame's columns, then write it to the interim directory.
fact_data = reduce_mem_usage(fact_data)
fact_data.to_feather("../data/interim/fact_data.feather")

In [None]:
# Inspect the resulting dtypes and non-null counts.
# show_counts replaces null_counts, which newer pandas no longer accepts.
fact_data.info(verbose=True, show_counts=True)

## Restart Kernel
Do this to free up memory

In [None]:
# Reload the merged fact table from the interim feather file.
fact_data = pd.read_feather("../data/interim/fact_data.feather")

## dim_configuration

In [None]:
# Load the configuration dimension and store its status column as a category.
dim_configuration = pd.read_feather("../data/raw/dim_configuration.feather")
dim_configuration["configuration_status"] = dim_configuration["configuration_status"].astype("category")
dim_configuration.info()

In [None]:
# Attach the configuration attributes to the facts. The outer join also keeps
# rows present on only one side. The join key is dropped afterwards.
fact_data = (
    fact_data
    .merge(dim_configuration, how='outer', on=['dim_configuration_id'])
    .drop(columns=['dim_configuration_id'])
)

In [None]:
# Summarise fact_data's columns and dtypes after the configuration merge.
fact_data.info()

## dim_device

In [None]:
# Load the device dimension. device_serial_number, device_name and
# device_type_name (which is only 'REVEOS') are dropped because they are not
# expected to contain anything useful. The two remaining descriptive columns
# are stored as categories.
dim_device = pd.read_feather("../data/raw/dim_device.feather")
dim_device = (
    dim_device
    .drop(columns=['device_serial_number', 'device_name', 'device_type_name'])
    .astype({
        "device_software_version": "category",
        "device_language_name": "category",
    })
)
dim_device.info()

In [None]:
# Attach the device attributes to the facts. The outer join also keeps rows
# present on only one side. The join key is dropped afterwards.
fact_data = (
    fact_data
    .merge(dim_device, how='outer', on=['dim_device_id'])
    .drop(columns=['dim_device_id'])
)
fact_data.info()

## dim_donation

In [None]:
# Load the donation dimension and store its status-like columns as categories.
dim_donation = pd.read_feather("../data/raw/dim_donation.feather")
dim_donation["donation_status"] = dim_donation["donation_status"].astype("category")
# bucket_number is left as-is here; fact_data's bucket_number is cast to category after all merges.
#dim_donation["bucket_number"] = dim_donation["bucket_number"].astype("category")
dim_donation["unit_number_lifetime_status"] = dim_donation["unit_number_lifetime_status"].astype("category")
dim_donation["welding_status"] = dim_donation["welding_status"].astype("category")
dim_donation.info()

In [None]:
# Attach the donation attributes to the facts. The outer join also keeps rows
# present on only one side. The join key is dropped afterwards.
fact_data = (
    fact_data
    .merge(dim_donation, how='outer', on=['dim_donation_id'])
    .drop(columns=['dim_donation_id'])
)
fact_data.info()

## dim_exception

In [None]:
# Load the exception dimension, store its type/state columns as categories and
# drop run_data_message_entry_id.
dim_exception = pd.read_feather("../data/raw/dim_exception.feather")
# bucket_number is left as-is here because it is used as a join key in the merge below.
#dim_exception["bucket_number"] = dim_exception["bucket_number"].astype("category")
dim_exception["exception_type"] = dim_exception["exception_type"].astype("category")
dim_exception["exception_state"] = dim_exception["exception_state"].astype("category")
dim_exception.drop(columns=['run_data_message_entry_id'], inplace = True)
dim_exception.info()

In [None]:
# Attach the exception attributes to the facts, matching on the exception id
# together with bucket_number and run_datetime. The outer join also keeps rows
# present on only one side. Only the surrogate id is dropped afterwards.
fact_data = (
    fact_data
    .merge(
        dim_exception,
        how='outer',
        on=['dim_exception_id', 'bucket_number', 'run_datetime'],
    )
    .drop(columns=['dim_exception_id'])
)
fact_data.info()

## dim_facility
I am omitting this for now: facility names are almost unique, and I don't see them being more helpful than the dim_facility_id. I am also not sure what the facility time_zone would add.

## dim_operator

In [None]:
# Load the operator dimension and summarise its columns.
dim_operator = pd.read_feather("../data/raw/dim_operator.feather")
dim_operator.info()

In [None]:
# Attach the operator attributes to the facts, matching on both
# dim_operator_id and operator_id. The outer join also keeps rows present on
# only one side. Only the surrogate id is dropped afterwards.
fact_data = (
    fact_data
    .merge(dim_operator, how='outer', on=['dim_operator_id', 'operator_id'])
    .drop(columns=['dim_operator_id'])
)
fact_data.info()

## dim_run

In [None]:
# Load the run dimension, then remove its all-NaN column(s) and file_name.
dim_run = pd.read_feather("../data/raw/dim_run.feather")
# dim_run has a column with all NaNs get rid of it
dim_run.dropna(axis=1,how='all', inplace=True)
dim_run.drop(columns=['file_name'], inplace = True)
dim_run.info()

In [None]:
# Attach the run attributes to the facts, matching on both dim_run_id and
# operator_id. The outer join also keeps rows present on only one side. Only
# the surrogate id is dropped afterwards.
fact_data = (
    fact_data
    .merge(dim_run, how='outer', on=['dim_run_id', 'operator_id'])
    .drop(columns=['dim_run_id'])
)
fact_data.info()

Fix some remaining datatypes and general cleanup of the dataframe

In [None]:
# bucket_number was kept un-categorised while it served as a merge key; convert it now.
fact_data["bucket_number"] = fact_data["bucket_number"].astype('category')

In [None]:
# Downcast the fully merged frame's columns; reduce_mem_usage prints per-column details.
fact_data = reduce_mem_usage(fact_data)

In [None]:
# Write the enriched fact table to the interim directory (same path as the earlier fact_data save).
fact_data.to_feather("../data/interim/fact_data.feather")

## Restart Kernel

In [None]:
# Load the four custom-data tables and the custom-flag table from the raw directory.
dim_custom_data_01 = pd.read_feather("../data/raw/dim_custom_data_01.feather")
dim_custom_data_02 = pd.read_feather("../data/raw/dim_custom_data_02.feather")
dim_custom_data_03 = pd.read_feather("../data/raw/dim_custom_data_03.feather")
dim_custom_data_04 = pd.read_feather("../data/raw/dim_custom_data_04.feather")
dim_custom_flag = pd.read_feather("../data/raw/dim_custom_flag.feather")

In [None]:
# Parse every column from the third onward as numeric (errors='raise' aborts on
# any value that cannot be parsed), re-infer dtypes, then downcast.
# NOTE(review): this cell is repeated almost verbatim for each dim_custom_* frame;
# a small helper or loop would remove the duplication.
dim_custom_data_01.iloc[:,2:] = dim_custom_data_01.iloc[:,2:].apply(pd.to_numeric, errors='raise')
dim_custom_data_01 = dim_custom_data_01.convert_dtypes()
dim_custom_data_01 = reduce_mem_usage(dim_custom_data_01)

In [None]:
# Parse every column from the third onward as numeric (errors='raise' aborts on
# any value that cannot be parsed), re-infer dtypes, then downcast.
dim_custom_data_02.iloc[:,2:] = dim_custom_data_02.iloc[:,2:].apply(pd.to_numeric, errors='raise')
dim_custom_data_02 = dim_custom_data_02.convert_dtypes()
dim_custom_data_02 = reduce_mem_usage(dim_custom_data_02)

In [None]:
# Parse every column from the third onward as numeric (errors='raise' aborts on
# any value that cannot be parsed), re-infer dtypes, then downcast.
dim_custom_data_03.iloc[:,2:] = dim_custom_data_03.iloc[:,2:].apply(pd.to_numeric, errors='raise')
dim_custom_data_03 = dim_custom_data_03.convert_dtypes()
dim_custom_data_03 = reduce_mem_usage(dim_custom_data_03)

In [None]:
# Parse every column from the third onward as numeric (errors='raise' aborts on
# any value that cannot be parsed), re-infer dtypes, then downcast.
dim_custom_data_04.iloc[:,2:] = dim_custom_data_04.iloc[:,2:].apply(pd.to_numeric, errors='raise')
dim_custom_data_04 = dim_custom_data_04.convert_dtypes()
dim_custom_data_04 = reduce_mem_usage(dim_custom_data_04)

In [None]:
# Parse every column from the third onward as numeric (errors='raise' aborts on
# any value that cannot be parsed), re-infer dtypes, then downcast.
dim_custom_flag.iloc[:,2:] = dim_custom_flag.iloc[:,2:].apply(pd.to_numeric, errors='raise')
dim_custom_flag = dim_custom_flag.convert_dtypes()
dim_custom_flag = reduce_mem_usage(dim_custom_flag)

In [None]:
# Fold the five custom tables into one frame, outer-joining each onto the
# running result on the (run_data_donation_id, dim_custom_data_id) pair.
custom_data = reduce(
    lambda merged, frame: merged.merge(
        frame,
        how='outer',
        on=['run_data_donation_id', 'dim_custom_data_id'],
    ),
    [dim_custom_data_01, dim_custom_data_02, dim_custom_data_03, dim_custom_data_04, dim_custom_flag],
)
# run_data_donation_id was only needed as a join key. Drop it, downcast the
# result and persist it.
custom_data = reduce_mem_usage(custom_data.drop(columns=['run_data_donation_id']))
custom_data.to_feather("../data/interim/custom_data.feather")

### Restart Kernel

Next let's merge *all* the data. We need to make the dataframes as small as possible because they will be huge in memory when pandas tries to merge them. To that end, we're going to make everything a sparse category data type. We will go back and fix them after the merge.

In [None]:
# Reload both interim frames written by the earlier sections.
custom_data = pd.read_feather("../data/interim/custom_data.feather")
fact_data = pd.read_feather("../data/interim/fact_data.feather")

In [None]:
# Cast every column except the first to category, one column at a time.
# presumably the first column is the dim_custom_data_id join key — TODO confirm
for col in custom_data.columns[1:]:
    custom_data[col] = custom_data[col].astype('category')

In [None]:
# Cast every column to category, one column at a time, except the
# dim_custom_data_id key that the next merge joins on.
for col in fact_data.columns:
    if col != "dim_custom_data_id":
        fact_data[col] = fact_data[col].astype('category')

In [None]:
# Join the fact table with the custom data on dim_custom_data_id. The outer
# join also keeps rows present on only one side. The key is dropped afterwards.
final_data = pd.merge(fact_data, custom_data, how='outer',
                    on = ['dim_custom_data_id'])
final_data.drop(columns=['dim_custom_data_id'], inplace = True)
# Release both inputs. The previous version assigned None to a misspelled name
# ("cusomtom_data"), so custom_data was never freed.
del fact_data, custom_data
gc.collect()

In [None]:
# Persist the combined frame to the interim directory.
final_data.to_feather("../data/interim/fact_and_custom_data.feather")

In [None]:
# List every column of the combined frame with its dtype.
final_data.info(verbose=True)

In [None]:
# Summary statistics for the product_volume column.
final_data["product_volume"].describe()

In [None]:
# Distinct values present in product_volume.
final_data["product_volume"].unique()

In [None]:
# Compare product_volume cast to float with the same column cast to nullable
# Int64; True means both casts hold the same values.
# NOTE(review): presumably a check that the column is integer-valued — confirm intent.
np.array_equal(final_data["product_volume"].astype(float), final_data["product_volume"].astype("Int64"))

In [None]:
# Ratio of a hard-coded count to the number of rows in final_data.
# NOTE(review): 63314271 is unexplained — presumably copied from an earlier
# output; confirm what it counts and derive it from the data instead.
63314271 / final_data.shape[0]

In [None]:
# Parse dim_run_date into a datetime column named run_date, then drop the original.
# NOTE(review): infer_datetime_format is deprecated in recent pandas releases —
# confirm against the installed version.
final_data["run_date"] = pd.to_datetime(final_data["dim_run_date"], infer_datetime_format=True)
final_data.drop(columns=['dim_run_date'], inplace = True)

In [None]:
# Re-store run_date as a SparseArray that keeps the column's current dtype.
final_data["run_date"] = pd.arrays.SparseArray(final_data["run_date"], dtype = final_data["run_date"].dtype)

In [None]:
# convert_dtypes returns a new frame instead of modifying final_data, so the
# result must be assigned back (as every other call in this notebook does).
final_data = final_data.convert_dtypes()

In [None]:
# Preview the first rows of the final combined frame.
final_data.head()