# 🧃 Excel Preprocessing

### 📚 Libraries

In [None]:
import json
from datetime import datetime

import pandas as pd

### 🛼 Data Ingestion

In [None]:
# Load the four RIPS sheets from the workbook in a single read; passing a
# list of sheet names makes pandas return a dict keyed by sheet name.
rips_path = '../data/input/rips.xlsx'
rips_sheets = pd.read_excel(
    rips_path,
    sheet_name=['FACTURA', 'USUARIO', 'CONSULTA', 'PROCEDIMIENTOS'],
)
transaction_df = rips_sheets['FACTURA']
users_df = rips_sheets['USUARIO']
appointments_df = rips_sheets['CONSULTA']
procedures_df = rips_sheets['PROCEDIMIENTOS']

In [None]:
# The validation rules live in the 'JSON' sheet of a separate workbook.
rules_path = '../data/input/reglas.xlsx'
rules_df = pd.read_excel(io=rules_path, sheet_name='JSON')

### 🥋 Rules parsing

In [None]:
def parse_rules(rules_df):
    """Parse the rules dataframe into a nested dictionary.

    Each row of ``rules_df`` describes one field. The columns read are
    ``TABLA``, ``COLUMNA``, ``TIPO DE DATO``, ``FORMATO NULO`` and
    ``FORMATO FECHA``. String cells are stripped of surrounding whitespace
    before being interpreted; the input dataframe is not modified.

    Args:
        rules_df: Dataframe with one rule per row.

    Returns:
        dict: ``{table: {field: rule}}`` where ``table`` is one of
        ``'transaction'``, ``'users'``, ``'appointments'``, ``'procedures'``
        and ``rule`` has the keys ``'dtype'`` (``int``, ``float`` or
        ``str``), ``'can_null'`` (bool), ``'null_value'`` (``False`` when
        nulls are forbidden, otherwise ``''`` or ``None``) and
        ``'date_format'`` (a strftime pattern or ``None``).

    Raises:
        ValueError: If a row has an unknown table, a missing/empty field
            name, an unknown data type, an unknown null format, or a date
            format that cannot round-trip through strftime/strptime.
    """
    # Lookup tables are loop-invariant, so build them once.
    table_map = {
        'FACTURA': 'transaction',
        'USUARIO': 'users',
        'CONSULTA': 'appointments',
        'PROCEDIMIENTOS': 'procedures',
    }
    dtype_map = {
        'entero': int,
        'decimal': float,
        'texto': str,
    }

    def cell(idx, column):
        """Return the cell at (idx, column), stripped when it is a string."""
        value = rules_df.loc[idx, column]
        return value.strip() if isinstance(value, str) else value

    rules = {}
    for idx in rules_df.index:
        # Row number used in error messages; presumably maps the 0-based
        # index to the spreadsheet row (header + 1-based) -- TODO confirm.
        row_number = idx + 2

        df_table = cell(idx, 'TABLA')
        try:
            table = table_map[df_table]
        except KeyError:
            # Report the offending table value (not the data type).
            raise ValueError(
                f"Table not recognized on row {row_number}: {df_table}"
            ) from None

        field = cell(idx, 'COLUMNA')
        # An empty spreadsheet cell arrives as NaN, not as an empty string.
        if pd.isna(field) or field == '':
            raise ValueError(
                f"Field name is empty on row {row_number}"
            )

        df_dtype = cell(idx, 'TIPO DE DATO')
        try:
            dtype = dtype_map[df_dtype]
        except KeyError:
            raise ValueError(
                f"Data type not recognized on row {row_number}: {df_dtype}"
            ) from None

        df_null_treatment = cell(idx, 'FORMATO NULO')
        if df_null_treatment == 'PROHIBIDO':
            can_null = False
            null_value = False
        elif df_null_treatment == 'BLANCO':
            can_null = True
            null_value = ''
        elif df_null_treatment == 'NULO':
            can_null = True
            null_value = None
        else:
            raise ValueError(
                f"Null format invalid on row {row_number}: {df_null_treatment}"
            )

        df_date_format = cell(idx, 'FORMATO FECHA')
        if pd.isna(df_date_format):
            date_format = None
        else:
            date_format = df_date_format
            # Validate the pattern by formatting "now" and parsing it back.
            try:
                now = datetime.now()
                datetime.strptime(now.strftime(date_format), date_format)
            except (TypeError, ValueError) as err:
                raise ValueError(
                    f"Date format invalid on row {row_number}: {date_format}"
                ) from err

        rules.setdefault(table, {})[field] = {
            'dtype': dtype,
            'can_null': can_null,
            'null_value': null_value,
            'date_format': date_format,
        }
    return rules

In [None]:
# Build the nested {table: {field: rule}} dictionary from the rules sheet.
rules = parse_rules(rules_df)
# Bare expression as the last statement of the cell: shows the parsed rules.
rules

### 🎛️ RIPS processing

In [None]:
def process_datetime(timestamp):
    """Format a timestamp as a date or date-time string.

    Args:
        timestamp: Object exposing ``hour``, ``minute``, ``second`` and
            ``strftime`` (e.g. ``datetime`` or ``pandas.Timestamp``).

    Returns:
        str: ``'%Y-%m-%d'`` when the time of day is exactly midnight,
        otherwise ``'%Y-%m-%d %H:%M:%S'``.
    """
    # Seconds count too: 00:00:45 is a real time of day and must not be
    # collapsed into a bare date.
    has_time = any((timestamp.hour, timestamp.minute, timestamp.second))
    pattern = '%Y-%m-%d %H:%M:%S' if has_time else '%Y-%m-%d'
    return timestamp.strftime(pattern)


def create_transaction(transaction_df):
    """Build the transaction dict from the row labelled 0 of the dataframe.

    Null cells are replaced by empty strings. When the row cannot be read
    or inspected (``KeyError`` / ``ValueError``), every column of the
    dataframe is mapped to an empty string instead.
    """
    blank = ""
    try:
        first_row = transaction_df.loc[0].to_dict()
        return {
            column: blank if pd.isnull(value) else value
            for column, value in first_row.items()
        }
    except (ValueError, KeyError):
        return {column: blank for column in transaction_df.columns}


def create_user(users, index=0):
    """Return the row labelled ``index`` of ``users`` as a cleaned dict.

    Null values become empty strings; non-null values whose column name
    contains 'fecha' (case-insensitive) go through ``process_datetime``.
    """
    record = users.loc[index].to_dict()
    for column, value in record.items():
        if pd.isnull(value):
            record[column] = ""
        elif 'fecha' in column.lower():
            record[column] = process_datetime(value)
    return record


def create_appointment(appointments, index=0):
    """Return the row labelled ``index`` of ``appointments`` as a dict.

    Nulls are replaced by empty strings and non-null values in columns
    whose name contains 'fecha' (any casing) are formatted with
    ``process_datetime``; everything else is copied unchanged.
    """
    raw = appointments.loc[index].to_dict()
    cleaned = {}
    for name, value in raw.items():
        if pd.isnull(value):
            cleaned[name] = ""
            continue
        is_date = 'fecha' in name.lower()
        cleaned[name] = process_datetime(value) if is_date else value
    return cleaned


def create_procedure(procedures, index=0):
    """Return the row labelled ``index`` of ``procedures`` as a dict.

    Null cells map to ``""``; non-null cells of columns whose name contains
    'fecha' (case-insensitive) are passed through ``process_datetime``.
    """
    def clean(column, value):
        # Null check comes first so date formatting never sees a null.
        if pd.isnull(value):
            return ""
        if 'fecha' in column.lower():
            return process_datetime(value)
        return value

    row = procedures.loc[index].to_dict()
    return {column: clean(column, value) for column, value in row.items()}

In [None]:
# The transaction record is the root of the output document; every user is
# nested under its 'usuarios' key with an initially empty 'servicios' dict.
main_dict = create_transaction(transaction_df)
main_dict['usuarios'] = []

for i in range(len(users_df)):
    user = {**create_user(users_df, i), 'servicios': {}}
    main_dict['usuarios'].append(user)

In [None]:
# Column that links a service row to its user, and the user-side key it
# has to match.
id_key = 'DocumentoIdentificacionUS'
id_map = {id_key: 'numDocumentoIdentificacion'}

for i in range(len(appointments_df)):
    appointment = create_appointment(appointments_df, i)
    # The linking id is only needed for matching; drop it from the payload.
    user_id = appointment.pop(id_key)

    # Attach to the first user with a matching id; rows without a match
    # are not added anywhere.
    for user in main_dict['usuarios']:
        if user[id_map[id_key]] != user_id:
            continue
        user['servicios'].setdefault('consultas', []).append(appointment)
        break

In [None]:
# Attach every procedure row to the user it belongs to.
for i in range(len(procedures_df)):
    # Use the procedure-specific builder (the appointment builder was being
    # called here by mistake).
    procedure = create_procedure(procedures_df, i)
    # The linking id is only needed for matching; drop it from the payload.
    user_id = procedure[id_key]
    del procedure[id_key]

    # Attach to the first user with a matching id; rows without a match
    # are not added anywhere.
    for user in main_dict['usuarios']:
        if user[id_map[id_key]] == user_id:
            if 'procedimientos' not in user['servicios']:
                user['servicios'].update({'procedimientos': []})
            user['servicios']['procedimientos'].append(procedure)
            break

In [None]:
# Preview the assembled document as indented JSON.
serialized = json.dumps(main_dict, indent=2)
print(serialized)

In [None]:
# Persist the assembled document next to the notebook as indented JSON.
with open('output.json', 'w') as output_file:
    output_file.write(json.dumps(main_dict, indent=4))