In [9]:
# imports
from pyspark.sql import SparkSession
from datetime import datetime
import pandas as pd
import numpy as np
import sdv
import os

new_dir = os.path.abspath(os.path.join(os.getcwd(), '..'))
os.chdir(new_dir)

print('In Predictive Model Folder:', os.getcwd())




In Predictive Model Folder: c:\Users\ashly\OneDrive\Documents\Education Material\ResearchProject\MaternalHealthResearch


Phase 1 - Data Preprocessing 
1. Create Combination of Data for HeartRate to detect activity imbalances
2. Create ManualInput Dataset 
3. Clean all datasets


Step 1: Create the HeartRate and METS merged dataset, filter null values and reduce the memory usage first

In [2]:
def filterDataset(df):
    print('Filter Dataset')

    # view details of the dataset
    print(df.head())
    print('Column Names:',df.dtypes)

    # print null values in the dataframe
    print('The sum of null values are:', df.isnull().sum())
    
    
    # drop rows with null values
    print('Count of cells BEFORE dropping null:', df.size,'\n')
    df = df.dropna() 
    print('Count of cells AFTER dropping null:', df.size, '\n')
    print('------------------------------------------------------------------')
    
    return df

In [11]:
def parseDateTime(df):
    # datetime split
    split_datetime = df['timestamp'].str.split(' ', expand=True)

    # Assign the date and time components to new columns
    df['date'] = split_datetime[0]
    df['time'] = split_datetime[1]

    df.drop(columns=['timestamp'], inplace=True)
    df[:3]
    print('------------------------------------------------------------------')
    return df

In [4]:
def reduceMemoryUsage(df, verbose=True):
    print('Reduce Memory')
    
    numerics = {
        np.int8: (np.iinfo(np.int8).min,np.iinfo(np.int8).max),
        np.int16: (np.iinfo(np.int16).min,np.iinfo(np.int16).max), 
        np.int32: (np.iinfo(np.int32).min,np.iinfo(np.int32).max), 
        np.int64: (np.iinfo(np.int64).min,np.iinfo(np.int64).max), 
        np.float16: (np.finfo(np.float16).min,np.finfo(np.float16).max), 
        np.float32: (np.finfo(np.float32).min,np.finfo(np.float32).max), 
        np.float64: (np.finfo(np.float64).min,np.finfo(np.float64).max)
        }
    types = ['int16', 'int32', 'int64', 'float16', 'float32', 'float64']
    start_memory_usage = df.memory_usage(deep=True).sum() / (1024 ** 2)
    print('Starting memory usage is {:5.5f}'.format(start_memory_usage))
    
    for col in df.columns:
        col_type = df[col].dtypes
        if col_type in types: 
            c_min = df[col].min()
            c_max = df[col].max()
            for n_key, n_value in numerics.items(): 
                if c_min > n_value[0] and c_max < n_value[1]:
                    df[col] = df[col].astype(n_key)
                    break
    
    end_memory_usage = df.memory_usage(deep=True).sum() / (1024**2)
    if verbose: 
        print('Memory usage decreased to {:5.5f} Mb ({:.5f}% reduction)'.format(end_memory_usage, 100 * (start_memory_usage - end_memory_usage) / start_memory_usage))
    print('------------------------------------------------------------------')
        
    return df

In [16]:
def createHRMetsDataset():
    ## Merge cleaned dataframes
    
    # read the heartrate and mets dataframe
    raw_data_path = f'{os.getcwd()}/data_raw/RAW-Fitabase Data 4.12.16-5.12.16/'
    df_mets = pd.read_csv(raw_data_path+'minuteMETsNarrow_merged.csv')
    df_heartrate = pd.read_csv(raw_data_path+'heartrate_seconds_merged.csv')
    
    # update columns names to be the same
    df_mets.columns = ['id', 'timestamp', 'mets']
    df_heartrate.columns = ['id', 'timestamp', 'bpm']
        
    # clean dataframes and reduce memory usage
    df_mets = filterDataset(df_mets)
    df_mets = reduceMemoryUsage(df_mets)
    df_mets = parseDateTime(df_mets)
    df_heartrate = filterDataset(df_heartrate)
    df_heartrate = reduceMemoryUsage(df_heartrate)
    df_heartrate = parseDateTime(df_heartrate)
    
    # merge dataframes using the column names and with an inner join
    df_merged_inner = pd.merge(df_mets, df_heartrate, on=['id', 'date', 'time'], how='inner')
    df_merged_right = pd.merge(df_mets, df_heartrate, on=['id', 'date', 'time'], how='right')
    interim_data_path = f'{os.getcwd()}/data_interim/'
    df_merged_inner.to_csv(f'{interim_data_path}heartrate_mets_merged_inner.csv', index=False)
    df_merged_right.to_csv(f'{interim_data_path}heartrate_mets_merged_right.csv', index=False)
    
    # create heartrate dataset of cleaned results
    df_heartrate.to_csv(f'{interim_data_path}heartrate_fiveseconds_intervals.csv', index=False)
   
    print('Heartrate and METS dataframes are merged')
    df_merged_inner[:3]
    df_merged_right[:3]

In [17]:
## create the heartrate_mets_merged.csv file with processed data
createHRMetsDataset()

Filter Dataset
           id              timestamp  mets
0  1503960366  4/12/2016 12:00:00 AM    10
1  1503960366  4/12/2016 12:01:00 AM    10
2  1503960366  4/12/2016 12:02:00 AM    10
3  1503960366  4/12/2016 12:03:00 AM    10
4  1503960366  4/12/2016 12:04:00 AM    10
Column Names: id            int64
timestamp    object
mets          int64
dtype: object
The sum of null values are: id           0
timestamp    0
mets         0
dtype: int64
Count of cells BEFORE dropping null: 3976740 

Count of cells AFTER dropping null: 3976740 

------------------------------------------------------------------
Reduce Memory
Starting memory usage is 117.53323
Memory usage decreased to 109.94820 Mb (6.45352% reduction)
------------------------------------------------------------------
------------------------------------------------------------------
Filter Dataset
           id             timestamp  bpm
0  2022484408  4/12/2016 7:21:00 AM   97
1  2022484408  4/12/2016 7:21:05 AM  102
2  202248440

Step 2: Create ManualInput Dataset
Columns = blood pressure, weight, calorie consumption, symptoms, mental health

In [10]:
def generateManualInput():
    # schema = {
    # 'blood_pressure': 'gaussian_kde',
    # 'glucose': 'beta',
    # 'weight': 'bounded',
    # 'calorie_consumption': 'poisson',
    # 'symptoms_code': 'categorical',
    # 'mental_health_code': 'categorical' 
    # }

    # create layout of dataset (this is daily manual input april 12 - may 12 2016)
    # collect all the id numbers display each for each day
    print('Complete Tomorrow')    

In [11]:
generateManualInput()

Complete Tomorrow


Step 3: Sleep & Actvity Tracking Dataset Cleaning (Select the most important attributes and convert each dataset into interim ones)

In [30]:
def processContextualDatasets():
    raw_data_path = f'{os.getcwd()}/data_raw/RAW-Fitabase Data 4.12.16-5.12.16/'
    
    ## Cleaning SleepDay Dataset
    df_sleepDay = pd.read_csv(raw_data_path+'sleepDay_merged.csv')
    df_sleepDay.columns = ['id', 'timestamp', 'total_sleep_records', 'total_minutes_asleep', 'total_time_inbed']
    df_sleepDay.drop(columns=['total_time_inbed'], inplace=True)
    
    df_sleepDay = filterDataset(df_sleepDay)
    df_sleepDay = reduceMemoryUsage(df_sleepDay)
    df_sleepDay = parseDateTime(df_sleepDay)
    df_sleepDay.drop(columns=['time'], inplace=True)
    
    # Write Dataset to csv 
    interim_data_path = f'{os.getcwd()}/data_interim/'
    df_sleepDay.to_csv(f'{interim_data_path}daily_sleep.csv', index=False)
    
    ## Cleaning Activity Dataset
    df_dailyActivity = pd.read_csv(f'{raw_data_path}dailyActivity_merged.csv')
    df_dailyActivity = df_dailyActivity.iloc[:,[0,1,2,3,10,11,12,13,14]]
    df_dailyActivity.columns = ['id', 'date', 'total_steps', 'total_distance_miles', 'very_active_minutes', 'fairly_active_minutes', 'lightly_active_minutes', 'sedentary_minutes', 'calories']

    df_dailyActivity = filterDataset(df_dailyActivity)
    df_dailyActivity = reduceMemoryUsage(df_dailyActivity)
    
    # merge sleep day and activity day
    df_merged = pd.merge(df_sleepDay, df_dailyActivity, on=['id', 'date'], how='inner')
    df_merged.to_csv(f'{interim_data_path}daily_sleep_activity.csv', index=False)
    
    print('Daily Activity and Sleep is Merged')
    df_merged[:3]

In [32]:
processContextualDatasets()

Filter Dataset
           id              timestamp  total_sleep_records  \
0  1503960366  4/12/2016 12:00:00 AM                    1   
1  1503960366  4/13/2016 12:00:00 AM                    2   
2  1503960366  4/15/2016 12:00:00 AM                    1   
3  1503960366  4/16/2016 12:00:00 AM                    2   
4  1503960366  4/17/2016 12:00:00 AM                    1   

   total_minutes_asleep  
0                   327  
1                   384  
2                   412  
3                   340  
4                   700  
Column Names: id                       int64
timestamp               object
total_sleep_records      int64
total_minutes_asleep     int64
dtype: object
The sum of null values are: id                      0
timestamp               0
total_sleep_records     0
total_minutes_asleep    0
dtype: int64
Count of cells BEFORE dropping null: 1652 

Count of cells AFTER dropping null: 1652 

------------------------------------------------------------------
Reduce Memo