# utils

> Helpers for loading, filtering, and aggregating input data into `ds` / `unique_id` / `y` columns

In [None]:
#| default_exp utils

In [None]:
#| hide
from nbdev.showdoc import *

In [None]:
#| export
import pandas as pd
from pulsepoint.core import *
from typing import Union, Callable
import pandas as pd

## Processing Data

#### Filtering Input Data:

We will need to be able to filter the input data to fit our testing needs. `_filter_dataframe` does this: it takes a pandas DataFrame and a set of filters, and returns only the rows that match every filter.

In [None]:
#| export
def _filter_dataframe(df,     # A pandas DataFrame
                     filters, # dictonary or list of dictionaries
                     )-> pd.DataFrame:
    """Filter a DataFrame using a dictionary or a list of dictionaries with multiple filter conditions.
    
    Filter Examples:
    You can pass in a single value like {"State":"Wisconsin"}.
    You can also pass in a list {"Cities":["La Crosse","Madison","Eau Claire","Milwaukee"]}
    
    """
    
    if isinstance(filters, dict): filters = [filters]
    
    for filter_dict in filters:
        for column, value in filter_dict.items():

            if isinstance(value, list):      df = df[df[column].isin(value)]
            else:                            df = df[df[column] == value]
    
    return df

#### Removing Dimensions with few Observations:

In [None]:
#| export
def _rm_small_dims(df,threshold:int):
    """Remove Dimensions that have less than N observations"""
    val_drop = list(df['unique_id'].value_counts()[df['unique_id'].value_counts() < threshold].index)
    df = df[~df['unique_id'].isin(val_drop)]
    return df

#### Check Names and Data Types

In [None]:
#| export

def _name_type_check(df,dimension,date_col):
    """Check datatypes and names of columns"""
    if dimension: 
        df = df.rename(columns={date_col: 'ds', dimension: 'unique_id'})
    else:
        df = df.rename(columns={date_col: 'ds'})
        df['unique_id'] = 'Total'
    
    if df['y'].dtype != 'float64': df['y'] = df['y'].astype(float)

    return df

#### Process Metric Column:

In [None]:
#| export

def _process_metric_col(df,metric_col):

    if callable(metric_col): df['y'] = metric_col(grpd_df)
    else:
        if metric_col in df.columns: df = df.rename(columns={metric_col: 'y'})
        else: raise ValueError(f"metric_col '{metric_col}' not found in the dataframe columns.")
    
    return df

#### Putting Everything together: `_process_data`

In [None]:
#| export

def _process_data(
    path: str, 
    dimension: str = None,
    date_col: str = 'ds', 
    metric_col: Union[str, Callable] = 'y',
    filters: list[dict] = None,
    sz_threshold = 50):
    """Filters and aggregates data

    Reads the feather file at `path`, applies `filters` (if any), sums the numeric
    columns per (`dimension`, `date_col`) group -- or per `date_col` alone when no
    string `dimension` is given -- and then:

    1. derives the 'y' column from `metric_col` (a column name, or a callable that is
       given the aggregated frame),
    2. renames the date/dimension columns to 'ds'/'unique_id' and casts 'y' to float,
    3. drops every `unique_id` with fewer than `sz_threshold` rows.

    Raises a ValueError when there is no numeric column to aggregate, or when
    `metric_col` names a column that is missing after aggregation.
    """

    df = pd.read_feather(path)

    if filters: df = _filter_dataframe(df, filters)

    if dimension and isinstance(dimension, str): idxs = [dimension, date_col]
    else:                                        idxs = [date_col]

    # A grouping key must not also be an aggregated value: a numeric `dimension` or
    # `date_col` would otherwise be passed to `pivot_table` as both index and value.
    numeric_df = df.select_dtypes(include=['int64', 'float64', 'int32', 'float32'])
    num_cols = [col for col in numeric_df.columns if col not in idxs]

    if not num_cols: raise ValueError(f"No numerical columns found. \nThe {metric_col} column(s) should be of type int or float")

    grpd_df = pd.pivot_table(df, values=num_cols, index=idxs, aggfunc='sum').reset_index()
    agg_df = _process_metric_col(grpd_df, metric_col) 
    fnl_df = _name_type_check(agg_df,dimension,date_col)
    fnl_df = _rm_small_dims(fnl_df,sz_threshold)

    return fnl_df

In [None]:
#| hide
# Run nbdev's export step from inside the notebook.
# NOTE(review): presumably this writes the `#| export` cells to the module named by `#| default_exp` -- confirm against the nbdev docs.
import nbdev; nbdev.nbdev_export()