## Notebook interface for controlling the preprocessing part

### TODOs

1. Make variable summarizations for both datasets
2. send summarizations to lilian and dana
3. create two database tables including indices and push both tables to the database
4. write an interface that takes a list of variables, constructs a query and returns the resulting pandas frame
5. test the interface
6. data wrangling methods for pivoting, remove outliers, and data aggregation
7. provide the interface with a sample query and sample preprocessing in a separate notebook
8. document and cleanUp
9. send to niklas, merge to main branch
10. discuss further steps

In [None]:
# Link to github repo:

import os
import json
import datetime
import csv

import pandas as pd
import gzip
from tqdm.autonotebook import tqdm


'''
%load_ext lab_black
%matplotlib inline
'''
%load_ext autoreload
%autoreload 2

import sys
sys.path.append("../../")


from data_io import File_IO, Database_IO
from data_wrangling import  Data_Wrangling
from data_summarization import Data_Summarization
from setup_config import Setup_Config

## 1. Setup

In [None]:
setup = Setup_Config('config.ini') # loads a setup file with variables in .ini format
# the .ini file contains e.g. database connections and other settings.
#### ATTENTION: add this file to .gitignore so that it is never pushed to the public repo.

file_io = File_IO() # handles input and output operations

data_wrangling = Data_Wrangling(data_io=file_io) # includes all the transformations
data_summarizations = Data_Summarization(data_io=file_io)



# if no database should be used, comment out the following block temporarily
database_io = Database_IO(
    host_ip=setup.config.db.host_ip, 
    port=setup.config.db.port, 
    db_user=setup.config.db.db_user, 
    db_pw=setup.config.db.db_pw, 
    db_name=setup.config.db.db_name) # use default postgres db


## 2. Merge data in one csv file per patient

In [None]:
# Approach: process one patient at a time (unpack the zipped files into a temp folder,
# merge them, empty the temp folder again) -> saves storage and makes the run resumable
import shutil

# the origin folder with the unprocessed, raw data
data_set_name = 'n=101_OPENonOH_07.07.2022'
folder_raw_data = os.path.join(setup.config.files.data_root, 'raw', data_set_name)
folder_patients = file_io.getSubfolders(folder_raw_data)

# the folder to save the preprocessed csv files; created up front so that it can be
# listed on the very first run, before any patient has been merged
folder_merged_data = os.path.join(setup.config.files.data_root, 'merged', data_set_name)
os.makedirs(folder_merged_data, exist_ok=True)

# a temporary directory to unzip data in without modifying the raw dataset
folder_temp = os.path.join(setup.config.files.data_root, 'temp')

# ids of patients whose merged csv already exists (a set, because it is only used for lookups)
allready_done_ids = {
    file_io.get_file_or_folder_name(path=filename).split('.csv')[0]
    for filename in file_io.find_files_with_ending(folder_merged_data, ending='.csv')
}

for patient_folder in folder_patients:

    patient_id = file_io.get_file_or_folder_name(path=patient_folder)
    if patient_id in allready_done_ids:
        continue

    print(patient_id)

    # start from an empty temp dir: otherwise the json search below also finds the
    # unpacked files of every patient processed before this one
    shutil.rmtree(folder_temp, ignore_errors=True)
    os.makedirs(folder_temp, exist_ok=True)

    # 1. unzip all gz, tar, zip files and copy to the temp dir
    file_io.unzip_recursively(root_path=patient_folder, root_path_copy=folder_temp)

    # 2. now get all json files within the temp dir
    json_files = file_io.find_files_with_ending(path=folder_temp, ending='.json')

    # 3. Merge all json files in one file of predefined format
    patients_jsons_dict = {patient_id: json_files}  # the patient_id is used as the output name

    # the outpath becomes combined with the patient identifier passed as key to the dict
    # we discovered that startdate and timeasseconds are massively underrepresented and drop them in the first run
    # code could also be modified in a way to include them in key value store format like the other values
    data_wrangling.group_patients_csv_data(patients_jsons_dict=patients_jsons_dict, outpath=folder_merged_data,
                                            number_threads = 1, column_drop_list = ['startdate', 'timeasseconds'])

# free the storage used by the last unpacked patient
shutil.rmtree(folder_temp, ignore_errors=True)

In [None]:
#frame[~frame[['startdate','timeasseconds']].isnull().any(axis=1)]
#frame[~frame[['starttime']].isnull().any(axis=1)]
#frame['timeasseconds'].unique()

## 3. Create Summarization of data (based on merged csv files)

In [None]:
# 3. use the merged files (predefined csv format) to calculate some statistics

# NOTE(review): hard-coded absolute path of one developer machine; presumably the same
# location as the merged-data folder built from setup.config.files.data_root in section 2 — confirm
folder = '/Users/me/Desktop/Code/VS-Projects/charite/OpenHumansDataTools/data/merged/n=101_OPENonOH_07.07.2022'

file_names = file_io.get_csv_files(folder=folder)

# summarization part in files (currently disabled)
#outpath='/Users/me/Desktop/Code/VS-Projects/charite/OpenHumansDataTools/temp/summarizations'
#data_summarizations.summarize_data(csv_files=file_names, outpath=outpath)

## 4. Insert data in postgres database
First, a postgres database must be created on a server or on localhost.
The database must then be specified in the config.ini file.
Test and activate the database connection in the setup section above.
Assuming the database connection is working properly, the following code creates new database tables and indices and inserts the csv data into the tables.

In [None]:
# 5. insert data from csv files in new database table

# 1. get csv file paths
# 2. create new db table
# 2b optional check if columns are in all csv files the same as well as the type ...
# 3. insert data from all csv files recognized in the given file path to db


# 1. get csv file paths (`folder` is the merged-data folder set in section 3)
file_paths = file_io.get_csv_files(folder=folder)

# choose a name for the target table, where the data will be saved (in current schema of the database specified in the config.ini file)
target_table_name = 'open_uploaded_all'

# 2. create new db table
### creating a new database table, assuming types are the same in all frames

# creating the type mapping statically saves some time compared to deriving the column types dynamically
#first_frame = file_io.read_csv_pandas(file_paths[0])
#type_dict = database_io.get_df_column_types(first_frame) # dictionary of recognized columns and types

# path,value,value_str,starttime,startdate,duration,isValid,timeasseconds,patient_id
# NOTE(review): the merge cell in section 2 passes 'startdate' and 'timeasseconds' in its
# column_drop_list — confirm the merged csv files still contain these two columns
target_typ_dict= {
    'path': str,
    'value': float,
    'value_str': str,
    'starttime': datetime.datetime,
    'startdate': datetime.datetime,
    'duration': float,
    'isValid': bool,
    'timeasseconds' : int,
    'patient_id': int
}

database_io.create_new_db_table(table_name=target_table_name, type_dict=target_typ_dict)

# creates indices, which enable faster filtering of the database tables, e.g. with sql commands
# TODO: If this function runs again, delete the existing index and overwrite. Currently we have to delete the db table manually before or recreate the index.
database_io.create_new_index(table_name=target_table_name, index_name='open_uploaded_all_path', index_cols=['path'] )
database_io.create_new_index(table_name=target_table_name, index_name='open_uploaded_all_patient_id', index_cols=['patient_id'] )

In [None]:
import numpy as np

'''
def convert_dataframe_type(dataframe, target_typ_dict):
    """ Converts the columns of a pandas dataframe to the specified types.
    
    Args:
        dataframe (pd.DataFrame): The pandas dataframe to be converted.
        target_typ_dict (dict): The dictionary containing the target column type information.
        Custom procedure for datetime objects.
    """
    for col, typ in target_typ_dict.items():
        #dataframe[col] = dataframe[col].astype(typ) if typ != datetime.datetime else pd.to_datetime(dataframe[col])
        #dataframe[col] = dataframe[col].replace([np.inf, -np.inf], None).astype(typ) if typ != datetime.datetime else pd.to_datetime(dataframe[col])

        dataframe[col] = dataframe[col].replace((np.nan, ''), (None, None)).astype(typ) if typ != datetime.datetime else pd.to_datetime(dataframe[col])
        
    return dataframe
'''


_TRUE_TOKENS = frozenset({"true", "t", "yes", "y", "1"})
_FALSE_TOKENS = frozenset({"false", "f", "no", "n", "0"})


def _parse_bool(raw):
    """Map one non-missing cell to True/False, or None if a string is no known boolean token."""
    if isinstance(raw, str):
        token = raw.strip().lower()
        if token in _TRUE_TOKENS:
            return True
        if token in _FALSE_TOKENS:
            return False
        return None
    return bool(raw)


def convert_datatypes(target_typ_dict, df):
    """
    Convert the columns of a pandas dataframe to the types given in target_typ_dict.

    Values that cannot be converted become missing (NaN / NaT / None) instead of
    raising, and values that are already missing stay missing.

    Args:
        target_typ_dict (dict): maps column name -> target type; handled types are
            str, float, datetime.datetime, bool and int. Other types leave the column untouched.
        df (pd.DataFrame): the frame to convert. It is modified in place.

    Returns:
        pd.DataFrame: the same frame object that was passed in.

    Notes:
        Columns listed in target_typ_dict but absent from df are skipped.
        An int column that contains missing values keeps a float dtype.
    """
    for key, value in target_typ_dict.items():
        if key not in df.columns:
            continue  # nothing to convert; the frame may lack optional columns

        column = df[key]
        if value is str:
            # astype(str) would turn NaN into the literal string 'nan'
            df[key] = column.map(str, na_action="ignore")
        elif value is float:
            # astype(float) raises on the first unparsable value; coerce to NaN instead
            df[key] = pd.to_numeric(column, errors="coerce").astype(float)
        elif value is datetime.datetime:
            df[key] = pd.to_datetime(column, errors="coerce")
        elif value is bool:
            # astype(bool) maps every non-empty string (even "False") and NaN to True
            df[key] = column.map(_parse_bool, na_action="ignore")
        elif value is int:
            df[key] = pd.to_numeric(column, downcast="integer", errors="coerce")
    return df

In [None]:
# load the first csv file as a sample frame and preview its first rows
first_frame = file_io.read_csv_pandas(file_paths[0])
first_frame.head()

In [None]:
# convert the sample frame to the column types of the target table
# (convert_datatypes assigns into the frame it receives, so first_frame is changed as well)
frame = convert_datatypes(target_typ_dict, first_frame)

In [None]:
frame.head(5) # check whether the conversion has worked properly

In [None]:
# insert the first rows of the sample frame into the database as a smoke test;
# uses target_table_name so that the rows go into the table created above even if it is renamed
database_io.append_frame_to_sql_table(table_name=target_table_name, data=frame.head(5))

In [None]:
# 3. insert data from all csv files recognized in the given file path to db
'''
for file_path in file_paths:
    frame = file_io.read_csv_pandas(file_path)
    database_io.append_frame_to_sql_table(table_name=target_table_name,data=frame)

for file_path in file_paths:
    frame = file_io.read_csv_pandas(file_path)
    type_dict = database_io.get_df_column_types(first_frame)
    print(type_dict)
'''    

In [None]:
# 4. TODO: merge the summarization files to one summarization file

# folder holding the per-file summarization csv files
folder = '/Users/me/Desktop/Code/VS-Projects/charite/OpenHumansDataTools/temp/summarizations'
file_paths = file_io.get_csv_files(folder=folder)

# read every summarization file into its own frame, then combine them
frames = [file_io.read_csv_pandas(csv_path) for csv_path in file_paths]
merged_frame = data_summarizations.merge_pandas_frames(frames)

In [None]:
# hand the merged summarization frame to file_io for writing to a single csv file
# NOTE(review): outpath lies inside the folder the previous cell lists its input csv files
# from, so a re-run would presumably pick up summarization.csv as an input — confirm
outpath='/Users/me/Desktop/Code/VS-Projects/charite/OpenHumansDataTools/temp/summarizations/summarization.csv'
file_io.pandas_to_csv(frame=merged_frame, outpath=outpath)

## 5. What is next?

In the next notebook we provide a user interface to use the database efficiently and filter out the required paths.

In [None]:
# 5. database setup

# Next TODOs: Create postgres DB (Done)
# Implement data io to read/write from to db
# filter and insert data into db
# create indices to optimize performance and optimize docker postgres performance

# provide an interface to access the data


In [None]:
# 5. Filter relevant data based on txt file path selection and insert in postgres db table

In [None]:
# 6. use csv file or database to access data

In [None]:
# 7. transform table in pivoted cleaned and accessable table (in postgres with login)

## Playground

Code snippets here are work in progress.

In [None]:
# 4. merge the summarizations

In [None]:
#merged_stats = merge_csv_files(stat_file_names)
#merged_stats

### Playground - ChatGPT Snippets

In [None]:
import numpy as np

# Pool the per-file summary rows into one row per path.
# Columns that can be pooled with a plain groupby reduction:
aggregation = {
    "value_count": "sum",
    "max": "max",
    "min": "min",
    "mean_duration": "mean",  # NOTE(review): unweighted mean of means — confirm this is intended
}

# The pooled mean is sum(mean_i * count_i) / sum(count_i) within each path.
# min_count=1 keeps NaN for paths that have no numeric mean at all instead of reporting 0.
weighted_total = (
    (merged_stats["mean"] * merged_stats["value_count"])
    .groupby(merged_stats["path"])
    .sum(min_count=1)
)

df_aggregated = merged_stats.groupby("path").agg(aggregation)
df_aggregated["mean"] = weighted_total / df_aggregated["value_count"]
df_aggregated = df_aggregated[["value_count", "mean", "max", "min", "mean_duration"]]
df_aggregated

#return df_aggregated

In [None]:

# group by path and aggregate
# the pooled mean is sum(mean_i * count_i) / sum(count_i) within each path;
# min_count=1 keeps NaN for paths that have no numeric mean at all instead of reporting 0
weighted_total = (df['mean'] * df['value_count']).groupby(df['path']).sum(min_count=1)
out = df.groupby('path').agg({'value_count': 'sum', 'max': 'max', 'min': 'min', 'mean_duration': 'mean'})
out['mean'] = weighted_total / out['value_count']
out = out[['value_count', 'mean', 'max', 'min', 'mean_duration']]

In [None]:
# load one csv file as input for the playground cells
# NOTE(review): file_names is listed from the merged-data folder in section 3, while the
# aggregation cells read summary columns (value_count, mean, ...) — confirm the intended input
df = pd.read_csv(file_names[0])

In [None]:
import numpy as np
import pandas as pd

# define list of file names (placeholders for the per-file summarization csv files)
csv_files = ['file1.csv', 'file2.csv', 'file3.csv', 'file4.csv']

# read every file once and stack the rows; the loop variable is deliberately not
# called `csv`, which would shadow the csv module imported at the top of the notebook
summaries = pd.concat([pd.read_csv(csv_path) for csv_path in csv_files], ignore_index=True)

# turn the per-row means into totals so that they can be pooled weighted by value_count
summaries['mean'] = summaries['mean'] * summaries['value_count']
summaries['mean_duration'] = summaries['mean_duration'] * summaries['value_count']

# pool across ALL files per path; min and max are true extremes and must not be weighted
grouped = summaries.groupby('path')
merged_df = grouped.agg({'value_count': 'sum', 'min': 'min', 'max': 'max'})

# min_count=1 keeps NaN for paths without any numeric value instead of reporting 0
merged_df['mean'] = grouped['mean'].sum(min_count=1) / merged_df['value_count']
merged_df['mean_duration'] = grouped['mean_duration'].sum(min_count=1) / merged_df['value_count']

# keep `path` as a regular column so that every row stays identifiable
merged_df = merged_df.reset_index()

# display dataframe
print(merged_df)