In [1]:
%load_ext watermark
import pandas as pd
import numpy as np
from typing import Type, Optional, Callable
from typing import List, Dict, Union, Tuple

from review_methods_tests import collect_vitals, find_missing, find_missing_loc_dates
from review_methods_tests import use_gfrags_gfoams_gcaps, make_a_summary

import matplotlib.pyplot as plt
import matplotlib as mpl
import matplotlib.colors
from matplotlib.colors import LinearSegmentedColormap, ListedColormap

import setvariables as conf_
import methods_iqaasl as mi

In [2]:
def slice_data_by_date(data: pd.DataFrame, start: str, end: str):
    """
    Return the rows of 'data' whose 'date' value lies between 'start' and 'end'.

    Both limits are inclusive.

    Args:
        data (pd.DataFrame): The input DataFrame. It must have a 'date' column.
        start (str): The lower limit of the date range.
        end (str): The upper limit of the date range.

    Returns:
        pd.DataFrame: The rows of 'data' that fall inside the range.
    """
    on_or_after_start = data.date >= start
    on_or_before_end = data.date <= end
    return data[on_or_after_start & on_or_before_end]



# format_kwargs = dict(precision=2, thousands="'", decimal=",")

# this defines the css rules for the note-book table displays
# header_row = {'selector': 'th:nth-child(1)', 'props': f'background-color: #FFF; font-size:12px; text-align:left;'}
# even_rows = {"selector": 'tr:nth-child(even)', 'props': f'background-color: rgba(139, 69, 19, 0.08);'}
# odd_rows = {'selector': 'tr:nth-child(odd)', 'props': 'background: #FFF;'}
# table_font = {'selector': 'tr', 'props': 'font-size: 10px;'}
# table_data = {'selector': 'td', 'props': 'padding:4px; font-size:12px;'}
# table_css_styles = [even_rows, odd_rows, table_font, header_row, table_data]

def aggregate_dataframe(df: pd.DataFrame,
                        groupby_columns: List[str],
                        aggregation_functions: Dict[str, Union[str, callable]],
                        index: bool = False) -> pd.DataFrame:
    """
    Group a DataFrame and aggregate the requested columns.

    Args:
        df (pd.DataFrame): The data to aggregate.
        groupby_columns (List[str]): The column names that define the groups.
        aggregation_functions (Dict[str, Union[str, callable]]):
            Maps each column to aggregate to the function applied to it,
            either a pandas method name ('sum', 'mean', 'max', 'min', ...)
            or a callable.
        index (bool, optional): When True the groupby columns become the
            index of the result, when False they stay as regular columns.
            Default is False.

    Returns:
        pd.DataFrame: One row per group with the aggregated values.
    """
    groups = df.groupby(groupby_columns, as_index=index)
    return groups.agg(aggregation_functions)
    
def merge_dataframes_on_column_and_index(left_df: pd.DataFrame,
                                         right_df: pd.DataFrame,
                                         left_column: str,
                                         how: str = 'inner',
                                         validate: str = 'many_to_one') -> pd.DataFrame:
    """
    Merge two DataFrames where the left DataFrame is merged on a specified column and
    the right DataFrame is merged on its index.

    Args:
        left_df (pd.DataFrame): The left DataFrame to be merged.
        right_df (pd.DataFrame): The right DataFrame to be merged on its index.
        left_column (str): The column in the left DataFrame to merge on.
        how (str, optional): The type of merge to be performed ('left', 'right', 'outer', or 'inner').
            Default is 'inner'.
        validate (str, optional): The merge validation check handed to pandas.
            Default is 'many_to_one', which requires the index of right_df to be unique.

    Returns:
        pd.DataFrame: A new DataFrame resulting from the merge operation.

    Raises:
        pd.errors.MergeError: If the keys do not satisfy the 'validate' rule.
    """
    # 'validate' must be forwarded, otherwise the documented check never runs
    # and duplicate keys in right_df silently multiply the rows of left_df.
    return left_df.merge(right_df, left_on=left_column, right_index=True, how=how, validate=validate)

def get_top_x_records_with_max_quantity(df: pd.DataFrame, quantity_column: str, id_column: str, x: int):
    """
    Get the top x records with the greatest quantity and their proportion to the total from a DataFrame.

    Args:
        df (pd.DataFrame): The input DataFrame.
        quantity_column (str): The name of the quantity column.
        id_column (str): The name of the ID column.
        x (int): The number of records to return.

    Returns:
        pd.DataFrame: The x rows with the largest quantity, limited to the columns
            id_column, quantity_column and "%". The "%" column is the quantity of
            the row divided by the sum of quantity_column over all of 'df'.
    """
    # The denominator is the total of the whole DataFrame. Dividing by the sum of
    # the selected rows would make the proportions add up to 1 whatever x is.
    total = df[quantity_column].sum()

    # copy so that adding the "%" column never touches a view of 'df'
    top_x_records = df.nlargest(x, quantity_column)[[id_column, quantity_column]].copy()
    top_x_records["%"] = top_x_records[quantity_column] / total

    return top_x_records[[id_column, quantity_column, "%"]]



def calculate_rate_per_unit(df: pd.DataFrame,
                            objects_to_check: List[str],
                            column_of_interest: str = "code",
                            groupby_columns: Optional[List[str]] = None,
                            method: Optional[Dict[str, str]] = None,
                            )-> pd.DataFrame:
    """
    Calculate the rate of occurrence of object(s) for a given unit measurement. Adds the label
    'all' to each record.

    Args:
        df (pd.DataFrame): The input DataFrame. It must contain column_of_interest,
            the groupby columns and every column named in 'method'.
        objects_to_check (List[str]): The values of column_of_interest to keep.
        column_of_interest (str): The column label of the objects being compared.
        groupby_columns (List[str], optional): The columns used for the aggregation.
            Defaults to ['code']. It must include column_of_interest.
        method (Dict[str, str], optional): Maps a column to its aggregation function.
            Defaults to {"pcs_m": "median", "quantity": "sum"}.

    Returns:
        pd.DataFrame: A dataframe indexed by column_of_interest with one column per
            key of 'method' and a 'label' column set to 'all'.
    """
    # The defaults are created per call: a list or dict in the signature is a
    # single object shared by every call of the function.
    if groupby_columns is None:
        groupby_columns = ['code']
    if method is None:
        method = {"pcs_m": "median", "quantity": "sum"}

    # keep only the rows that describe the requested objects
    filtered_df = df[df[column_of_interest].isin(objects_to_check)]

    # aggregate each column in 'method' per group
    object_rates = filtered_df.groupby(groupby_columns, as_index=False).agg(method)

    # index by the object and tag every record as belonging to the 'all' group
    rates = object_rates[[column_of_interest, *method.keys()]].set_index(column_of_interest, drop=True)
    rates["label"] = "all"

    return rates


# fail rate: quantity > 0
def count_objects_with_positive_quantity(df: pd.DataFrame, value_column: str = 'quantity', object_column: str = 'code', sample_column: str = 'loc_date') -> pd.Series:
    """
    Calculate the share of samples in which each object had a quantity greater than zero.

    Args:
        df (pd.DataFrame): The input DataFrame, one row per object and sample.
        value_column (str, optional): The column that holds the quantity. Default is 'quantity'.
        object_column (str, optional): The column that identifies the object. Default is 'code'.
        sample_column (str, optional): The column that identifies a sample. Default is 'loc_date'.

    Returns:
        pd.Series: Indexed by object. The value is the number of rows with a positive
            quantity divided by the number of unique samples in 'df'. Objects that only
            have rows with a quantity of zero are included with a value of 0.
    """
    n_samples = df[sample_column].nunique()

    # the number of rows with a positive quantity, per object
    positive_counts = df[df[value_column] > 0][object_column].value_counts()
    failed = positive_counts / n_samples

    # objects that were recorded but never found: they have rows with a
    # quantity of zero and no row with a positive quantity
    zero_counts = df[df[value_column] == 0][object_column].value_counts()
    never_found = zero_counts[~zero_counts.index.isin(positive_counts.index)] * 0

    return pd.concat([failed, never_found])

# pieces per meter for a set of data
def rate_per_unit_cumulative(df: pd.DataFrame, groupby_columns: List, object_labels: List, objects: List, agg_methods: Dict)-> pd.DataFrame:
    """
    Calculate the rate per unit of the objects of interest over all of the data.

    The data is aggregated in two steps. First 'df' is grouped by 'groupby_columns'
    and aggregated with 'agg_methods'. The result is handed to calculate_rate_per_unit,
    which keeps the records whose value in the column objects[0] is in 'object_labels',
    groups them by the columns in 'objects' and applies its default aggregation.

    Args:
        df (pd.DataFrame): The input DataFrame containing data for analysis.
        groupby_columns (List): The columns used to group 'df' in the first step.
        object_labels (List): The values of the column objects[0] to keep.
        objects (List): The columns that identify an object. The first entry is the
            column compared to 'object_labels', the complete list is used to group
            the records in the second step.
        agg_methods (Dict): The aggregation functions for the first step.

    Returns:
        pd.DataFrame: The result of calculate_rate_per_unit with its index moved
            back into a regular column.

    Example:
        groupby_columns = ['loc_date', 'code']
        object_labels = ['G27', 'G30']
        objects = ['code']
        agg_methods = {'pcs_m': 'sum', 'quantity': 'sum'}

        cumulative_rates = rate_per_unit_cumulative(df, groupby_columns, object_labels, objects, agg_methods)
    """
    aggregated = aggregate_dataframe(df, groupby_columns, agg_methods)
    rates = calculate_rate_per_unit(aggregated, object_labels, objects[0], objects)

    # the object identifier is returned as a column, not as the index
    return rates.reset_index(drop=False)


def aggregate_boundaries(df: pd.DataFrame, unit_columns: list, unit_agg: dict, boundary_labels: list, boundary_columns: list, group_agg: dict)-> pd.DataFrame:
    """
    Aggregate data from a dataframe by boundaries and groups.

    Aggregates a dataframe in two steps. First, it performs
    aggregation at the 'unit' level defined by 'unit_columns' and 'unit_agg' to obtain
    test statistics. Then, for each label in 'boundary_labels', it selects the unit
    records whose value in the first unit column equals the label, groups them by
    'boundary_columns' and aggregates them with 'group_agg'.

    Args:
        df (pd.DataFrame): The input DataFrame containing data to be aggregated.
        unit_columns (list): List of columns for 'unit' level aggregation. The first
            entry is the column compared to each boundary label.
        unit_agg (dict): Dictionary specifying the aggregation functions for 'unit' level.
        boundary_labels (list): List of boundary labels to define 'boundaries' for further aggregation.
        boundary_columns (list): List of columns for 'boundary' level aggregation.
        group_agg (dict): Dictionary specifying the aggregation functions for 'boundary' level.

    Returns:
        pd.DataFrame: A DataFrame containing aggregated data at the 'boundary' level with
        additional 'label' column indicating the boundary label.

    Raises:
        ValueError: If 'boundary_labels' is empty, there is nothing to concatenate.
    """
    unit_aggregate = aggregate_dataframe(df, unit_columns, unit_agg)

    boundary_summaries = []
    for label in boundary_labels:
        boundary_mask = unit_aggregate[unit_columns[0]] == label
        # aggregate with the 'group_agg' parameter: the name 'agg_groups' is not
        # defined in this function
        boundary_aggregate = unit_aggregate[boundary_mask].groupby(boundary_columns, as_index=False).agg(group_agg)
        boundary_aggregate['label'] = label
        boundary_summaries.append(boundary_aggregate)

    return pd.concat(boundary_summaries)

def color_gradient(val, cmap: ListedColormap = None, min: float = 0.0, max: float = .9):
    """
    Apply a color gradient to a numerical value for cell styling.

    This function takes a numerical value 'val', maps it to the range defined by 'min'
    and 'max' and looks up the color for that position in the colormap 'cmap'. It
    returns a CSS style for the cell background color.

    Args:
        val (float): The numerical value to be colored using the gradient.
        cmap (ListedColormap, optional): The colormap to use for the color gradient. It is
            called with the normalized value and must return (r, g, b, a) with each
            component between 0 and 1. It is required, the default of None is not usable.
        min (float, optional): The minimum value of the data range. Defaults to 0.0.
        max (float, optional): The maximum value of the data range. Defaults to 0.9.

    Returns:
        str: A CSS style string for cell background color and text color.

    Raises:
        TypeError: If 'cmap' is None.

    Example:
        # Apply a color gradient using a custom colormap 'cmap' to the DataFrame
        df.style.applymap(color_gradient, cmap=my_colormap, min=0.0, max=100.0)
    """
    if cmap is None:
        raise TypeError("color_gradient requires a colormap: cmap is None")

    # Normalize the value to the range [0, 1]. min and max should be the
    # min and max of the data frame in question. The divisor is the width of
    # the range, dividing by max alone is only correct when min is zero.
    span = max - min
    normalized_val = (val - min) / span if span != 0 else 0.0

    # the alpha of the colormap is ignored, the cell is drawn at 50% opacity
    r, g, b, _ = cmap(normalized_val)
    rgba_color = f"rgba({int(r*255)},{int(g*255)},{int(b*255)}, .5)"

    # Return the CSS style with the background color
    return f'background-color: {rgba_color}; color:black'


def boundary_summary(parent_boundary: pd.DataFrame, boundary_summary: pd.DataFrame, object_columns: List, unit: str)-> pd.DataFrame:
    """
    Combine the parent and the boundary summaries into one table.

    The two summaries are stacked and then pivoted on the first object column and
    the 'label' column. The values of the table are taken from the 'unit' column.
    The longer of the two dimensions is used for the rows: if there are at least
    as many objects as labels the objects are the rows, otherwise the labels are.

    Args:
        parent_boundary (pd.DataFrame): The summary of the parent boundary.
        boundary_summary (pd.DataFrame): The summaries of the individual boundaries.
        object_columns (List): The columns identifying the objects, only the first is used.
        unit (str): The column that holds the values of the table.

    Returns:
        pd.DataFrame: The pivoted table. When the objects are the rows, the columns
            are ordered with the boundary labels first and the parent labels last.

    Example:
        parent_boundary = ...
        boundary_summaries = ...
        object_columns = ['code']
        unit = 'pcs_m'

        boundary_result = boundary_summary(parent_boundary, boundary_summaries, object_columns, unit)
    """
    stacked = pd.concat([parent_boundary, boundary_summary])
    object_column = object_columns[0]

    n_objects = stacked[object_column].nunique()
    n_labels = stacked.label.nunique()

    if n_objects < n_labels:
        # more labels than objects: one row per label
        return stacked.pivot(columns=object_column, index="label", values=unit)

    # one row per object, the parent boundary goes in the last column(s)
    by_object = stacked.pivot(index=object_column, columns="label", values=unit)
    column_order = [*boundary_summary.label.unique(), *parent_boundary.label.unique()]
    return by_object[column_order]


def translate_word(X: str, amap: pd.DataFrame, lan: str):
    """
    Look up the translation of a word or phrase in a language mapping DataFrame.

    The index of 'amap' holds the words that can be translated and each column
    holds the translations for one language. A word that is not in the index is
    returned unchanged.

    Args:
        X (str): The word or phrase to be translated.
        amap (pd.DataFrame): The language mapping, indexed by the original word.
        lan (str): The column of 'amap' that holds the target language.

    Returns:
        The value of amap at row 'X' and column 'lan', or 'X' itself if it is
        not in the index of 'amap'.

    Example:
        # Create a DataFrame for language mapping, indexed by the English word
        language_map = pd.DataFrame({'fr': ['pomme', 'banane', 'cerise']},
                                    index=['apple', 'banana', 'cherry'])

        # Translate a word into French
        translated_word = translate_word('apple', language_map, 'fr')
    """
    if X not in amap.index:
        return X
    return amap.loc[X, lan]

def capitalize_index(X):
    """Return 'X' in title case, the result of calling its title() method."""
    titled = X.title()
    return titled

def translate_for_display(df: pd.DataFrame, amap: pd.DataFrame, lan: str):
    """
    Translate the column names and index labels of a DataFrame for display.

    Column names and index labels that are in the index of 'amap' are replaced by
    the value in the 'lan' column of 'amap'. Labels that are not in 'amap' are kept.

    Note that 'df' is changed in place: the columns are renamed, a helper column
    called 'new_index' is written and then used as the index, and the names of the
    index and of the columns are removed. The object that is returned is 'df' itself.

    Args:
        df (pd.DataFrame): The input DataFrame for translation.
        amap (pd.DataFrame): The language mapping, indexed by the original word.
        lan (str): The column of 'amap' that holds the target language.

    Returns:
        pd.DataFrame: 'df' with translated column names and index labels.

    Example:
        # Create a DataFrame to be translated
        data = {'apple': [1, 2, 3], 'banana': [4, 5, 6]}
        original_df = pd.DataFrame(data)

        # Create a language mapping DataFrame, indexed by the English word
        language_map = pd.DataFrame({'fr': ['pomme', 'banane']},
                                    index=['apple', 'banana'])

        # Translate the column names and index labels for display in French
        translated_df = translate_for_display(original_df, language_map, 'fr')
    """
    # only the columns that have an entry in the map are renamed
    column_translations = {name: amap.loc[name, lan] for name in df.columns if name in amap.index}
    df.rename(columns=column_translations, inplace=True)

    # every index label gets an entry, untranslated labels map to themselves
    index_translations = {label: translate_word(label, amap, lan) for label in df.index}

    def translated_label(label):
        return index_translations[label]

    df['new_index'] = df.index.map(translated_label)
    df.set_index('new_index', drop=True, inplace=True)

    # no axis names in the displayed table
    df.index.name = None
    df.columns.name = None

    return df

def translated_and_style_for_display(df, amap, lan, gradient: bool = True):
    """
    Translate a DataFrame and return it as a styled table.

    The column names and index labels are translated with translate_for_display.
    The values are formatted, the table css is set, the cells are optionally colored
    with color_gradient and the column and index labels are displayed in title case.

    NOTE(review): format_kwargs, table_css_styles and newcmp are read as globals.
    None of them is defined in the visible part of this file, they must exist
    when this function is called - confirm where they are set.

    Args:
        df (pd.DataFrame): The input DataFrame to be translated and styled.
        amap (pd.DataFrame): The language mapping, indexed by the original word.
        lan (str): The column of 'amap' that holds the target language.
        gradient (bool, optional): Whether to apply a color gradient to the cells. Defaults to True.

    Returns:
        Styler: A styled DataFrame ready for display with translated labels and styling.

    Example:
        # Create a DataFrame to be translated and styled
        data = {'apple': [1, 2, 3], 'banana': [4, 5, 6]}
        original_df = pd.DataFrame(data)

        # Create a language mapping DataFrame, indexed by the English word
        language_map = pd.DataFrame({'fr': ['pomme', 'banane']},
                                    index=['apple', 'banana'])

        # Translate, style, and format the DataFrame for display in French
        styled_df = translated_and_style_for_display(original_df, language_map, 'fr', gradient=True)
    """
    translated = translate_for_display(df, amap, lan)

    styled = translated.style.format(**format_kwargs)
    styled = styled.set_table_styles(table_css_styles)

    if gradient:
        styled = styled.applymap(color_gradient, cmap=newcmp)

    # title case for the column labels first, then for the index labels
    styled = styled.format_index(str.title, axis=1)
    return styled.format_index(str.title, axis=0)


def display_tabular_data_by_column_values(df, column_one: dict, column_two: dict, index: str):
    """
    Select the rows that reach a threshold in at least one of two columns.

    A row is kept when its value in the first column is greater than or equal to
    the first threshold, or its value in the second column is greater than or equal
    to the second threshold. The selection is a copy of the data, indexed by the
    'index' column with the name of the index removed for a cleaner display.

    Args:
        df (pd.DataFrame): The input DataFrame containing tabular data.
        column_one (dict): The first condition: {'column': <column name>, 'val': <threshold>}.
        column_two (dict): The second condition, in the same format as 'column_one'.
        index (str): The column to be set as the index for the resulting DataFrame.

    Returns:
        pd.DataFrame: The selected rows with 'index' as the index and the index name removed.

    Example:
        # Create a sample DataFrame 'data_df'
        data_df = pd.DataFrame({'Name': ['Alice', 'Bob', 'Charlie'],
                                'Age': [25, 30, 35],
                                'Salary': [50000, 60000, 70000]})

        # Define the thresholds for 'Age' and 'Salary'
        column_one = {'column': 'Age', 'val': 30}
        column_two = {'column': 'Salary', 'val': 65000}

        # Keep the rows where 'Age' >= 30 or 'Salary' >= 65000, indexed by 'Name'
        filtered_data = display_tabular_data_by_column_values(data_df, column_one, column_two, 'Name')
    """
    meets_first = df[column_one["column"]] >= column_one["val"]
    meets_second = df[column_two["column"]] >= column_two["val"]

    selected = df[meets_first | meets_second].copy()
    selected = selected.set_index(index, drop=True)
    selected.index.name = None
    return selected



def summary_of_parent_and_child_features(df: pd.DataFrame,
                                         cumulative_columns: List = None,
                                         boundary_labels: List = None,
                                         object_labels: List = None,
                                         object_columns: List = None,
                                         unit_agg: dict = None,
                                         unit_columns: List = None,
                                         agg_groups: dict = None)-> pd.DataFrame:
    """
    Summarize the objects of interest for a parent boundary and its child boundaries.

    The summary is built in three steps:

    1. rate_per_unit_cumulative gives the result for all of the data (the parent).
    2. aggregate_boundaries gives the result for each label in 'boundary_labels'
       (the children).
    3. boundary_summary combines both into one table of the 'pcs_m' values.

    Args:
        df (pd.DataFrame): The input DataFrame containing data for analysis.
        cumulative_columns (List, optional): The groupby columns for the parent summary.
        boundary_labels (List, optional): The labels of the child boundaries.
        object_labels (List, optional): The labels of the objects of interest.
        object_columns (List, optional): The columns identifying the objects.
        unit_agg (dict, optional): Aggregation methods for unit summaries. It is
            used by both the parent and the child summaries.
        unit_columns (List, optional): The groupby columns for the unit summaries.
        agg_groups (dict, optional): Aggregation methods for boundary summaries.

    Returns:
        pd.DataFrame: The table produced by boundary_summary for the unit 'pcs_m'.

    Example:
        # Define parameters for generating the summary
        cumulative_columns = ['quantity', 'total_weight']
        boundary_labels = ['Boundary 1', 'Boundary 2']
        object_labels = ['Object 1', 'Object 2']
        object_columns = ['object_id', 'object_name']
        unit_agg = {'quantity': 'sum', 'total_weight': 'mean'}
        unit_columns = ['unit_id', 'unit_name']
        agg_groups = {'quantity': 'sum', 'total_weight': 'mean'}

        # Generate the summary of parent and child features
        summary_df = summary_of_parent_and_child_features(data_df, cumulative_columns, boundary_labels,
                                                         object_labels, object_columns, unit_agg, unit_columns, agg_groups)
    """
    parent = rate_per_unit_cumulative(df, cumulative_columns, object_labels, object_columns, unit_agg)
    children = aggregate_boundaries(df, unit_columns, unit_agg, boundary_labels, object_columns, agg_groups)

    return boundary_summary(parent, children, object_columns, "pcs_m")

In [3]:
def collect_survey_data_for_report(a_func: Callable = None, **kwargs)-> pd.DataFrame:
    """
    Collect the survey data for a report.

    Args:
        a_func (Callable, optional): A replacement data source. If it is given, it is
            called with 'kwargs' and its result is returned as is.
        **kwargs: The keyword arguments for 'a_func'. Not used without 'a_func'.

    Returns:
        pd.DataFrame: The result of 'a_func', or by default the result of
            mi.combine_survey_files for the files listed in conf_.survey_files.
    """
    if a_func is None:
        # the default source: the survey files named in the settings
        return mi.combine_survey_files(conf_.survey_files)

    return a_func(**kwargs)

def collect_env_data_for_report(a_func: Callable = None, **kwargs)-> Tuple[pd.DataFrame, ...]:
    """
    Collect the environmental data that accompanies the survey data.

    Args:
        a_func (Callable, optional): A replacement data source. If it is given, it is
            called with 'kwargs' and its result is returned as is. The callers in this
            file unpack six values, so it should return the same six DataFrames as
            the default source.
        **kwargs: The keyword arguments for 'a_func'. Not used without 'a_func'.

    Returns:
        Tuple[pd.DataFrame, ...]: By default six DataFrames read from the csv files
            named in the settings, in this order: codes (indexed by 'code'), beaches
            (indexed by 'slug'), land_cover, land_use, streets and
            river_intersect_lakes.
    """
    if a_func is not None:
        return a_func(**kwargs)

    codes = pd.read_csv(conf_.code_data).set_index("code")
    beaches = pd.read_csv(conf_.beach_data).set_index("slug")
    land_cover = pd.read_csv(conf_.land_cover_data)
    land_use = pd.read_csv(conf_.land_use_data)
    streets = pd.read_csv(conf_.street_data)
    river_intersect_lakes = pd.read_csv(conf_.intersection_attributes)

    return codes, beaches, land_cover, land_use, streets, river_intersect_lakes

def language_maps(func: Callable = None, **kwargs):
    """
    Collect the language maps used to translate the display.

    Args:
        func (Callable, optional): A replacement data source. If it is given, it is
            called with 'kwargs' and its result is returned as is.
        **kwargs: The keyword arguments for 'func'. Not used without 'func'.

    Returns:
        The result of 'func', or by default a dictionary with one DataFrame for each
        entry of conf_.language_maps. Each DataFrame is read from the csv file of
        that entry and indexed by its 'en' column.
    """
    if func is None:
        return {language: pd.read_csv(path).set_index('en') for language, path in conf_.language_maps.items()}

    return func(**kwargs)

def check_for_top_label(alabel: str = None, df: pd.DataFrame = None, a_map: pd.DataFrame = None)-> pd.DataFrame:
    """
    Make sure that the column 'alabel' is part of the survey data.

    Args:
        alabel (str): The name of the column that is needed.
        df (pd.DataFrame): The survey data. It must have a 'slug' column if
            'alabel' has to be added.
        a_map (pd.DataFrame): The source of the missing column. Its index holds
            the values of df.slug and must be unique.

    Returns:
        pd.DataFrame: 'df' itself if the column is present, otherwise a new
            DataFrame with the column 'alabel' of 'a_map' merged on 'slug'.
    """
    if alabel not in df.columns:
        label_by_slug = a_map[alabel]
        return df.merge(label_by_slug, left_on='slug', right_index=True, validate='many_to_one')

    return df
    
def use_parent_groups_or_gfrags(df, label: str = None, gfrags: bool = True, parent_group: bool = False, func: Callable = None, **kwargs)-> pd.DataFrame:
    """
    Combine the fragmented-object codes and aggregate the surveys to the object level.

    Args:
        df (pd.DataFrame): The survey data.
        label (str, optional): The column of the report level. It is added to the
            groupby columns so that it is kept in the result.
        gfrags (bool, optional): Whether to replace the codes with
            use_gfrags_gfoams_gcaps before aggregating. Defaults to True.
        parent_group (bool, optional): Accepted for compatibility. It does not
            change the result: the same replacement is applied for both values.
        func (Callable, optional): A replacement method. If it is given, it is
            called with 'kwargs' and its result is returned as is.
        **kwargs: The keyword arguments for 'func'. Not used without 'func'.

    Returns:
        pd.DataFrame: The result of 'func', or the survey data grouped by 'label'
            and conf_.code_result_columns and aggregated with conf_.unit_agg.
    """
    if func is not None:
        return func(**kwargs)

    # Start from the unchanged data so that gfrags=False is a valid request.
    # NOTE(review): 'codes' is read as a global, it must be defined before the call.
    d = use_gfrags_gfoams_gcaps(df, codes) if gfrags else df

    # the surveys need to be aggregated to the object level:
    # after changing code names there will be duplicates on
    # the columns loc_date and code, which is not allowed.
    # A label of None is left out, it is not a column.
    groupby_cols = list({label, *conf_.code_result_columns} - {None})

    return aggregate_dataframe(d, groupby_cols, conf_.unit_agg)
def add_column_to_work_data(df, key: str = 'slug', feature: str = None, amap: pd.DataFrame = None)-> pd.DataFrame:
    """
    Add one column from a lookup table to the survey data.

    Args:
        df (pd.DataFrame): The survey data.
        key (str, optional): The column of 'df' that is matched to the index of
            'amap'. Defaults to 'slug'.
        feature (str): The column of 'amap' to add.
        amap (pd.DataFrame): The lookup table. Its index must be unique, the
            merge is validated as many-to-one.

    Returns:
        pd.DataFrame: A new DataFrame with the rows of 'df' that have a match in
            'amap' and the additional column 'feature'.
    """
    feature_by_key = amap[feature]
    return df.merge(feature_by_key, left_on=key, right_index=True, validate='many_to_one')

def add_columns_to_work_data(df, keys_features)-> pd.DataFrame:
    """
    Add several columns from lookup tables to a copy of the survey data.

    Args:
        df (pd.DataFrame): The survey data. It is not changed.
        keys_features: An iterable of dictionaries, one for each column to add,
            with the entries 'key' (the column of the survey data to match),
            'feature' (the column to add) and 'map' (the lookup table).

    Returns:
        pd.DataFrame: A copy of 'df' with the additional columns and a new
            index that counts from zero.
    """
    work_data = df.copy()

    for request in keys_features:
        work_data = add_column_to_work_data(
            work_data,
            key=request['key'],
            feature=request['feature'],
            amap=request['map'],
        )

    return work_data.reset_index(drop=True)

def report_data(a_start, df, add_columns: List = None, use_gfrags: bool = True):
    """
    Prepare the survey data for a report.

    NOTE(review): 'beaches' is read as a global, it must be defined before the call.

    Args:
        a_start (dict): The request. The first entry sets the limit of the report:
            its key is the column of the report level and its value is the label
            of interest. The entries 'start_date', 'end_date' and 'language' are
            required as well.
        df (pd.DataFrame): The survey data.
        add_columns (List, optional): The columns to add, in the format expected
            by add_columns_to_work_data.
        use_gfrags (bool, optional): Whether to apply use_parent_groups_or_gfrags.
            Defaults to True.

    Returns:
        tuple: The [column, label] pair of the report, the display language, the
            data for the report and all the data from the date range.
    """
    # the first entry of the request sets the limit of the report:
    # we are interested in the summary of this data or of something
    # contained within it. It is used throughout the reporting process.
    level, name = list(a_start.items())[0]
    top_label = [level, name]

    # slice the survey data by the requested dates first,
    # everything that follows works on the smaller data set
    in_range = slice_data_by_date(df.copy(), start=a_start['start_date'], end=a_start['end_date'])

    # add the column of the report level to the
    # survey data if it is missing
    work_data = check_for_top_label(level, df=in_range, a_map=beaches)

    # combine the fragmented codes and/or add the requested columns
    if use_gfrags:
        work_data = use_parent_groups_or_gfrags(work_data, label=level)

    if add_columns is not None:
        work_data = add_columns_to_work_data(work_data, add_columns)

    # this is the data for the report
    report_rows = work_data[work_data[level].isin([name])].copy()

    return top_label, a_start['language'], report_rows, work_data


# the ordered list of reporting categories, read from the project settings.
# It is the default for columns_of_interest in categorize_work_data.
geo_h = conf_.geo_h

def categorize_work_data(df, labels, columns_of_interest: List[str] = geo_h, sample_id: str = 'loc_date'):
    """
    List the reporting categories that are available for one report.

    Args:
        df (pd.DataFrame): The survey data.
        labels: The [column, label] pair that defines the report. Only the rows
            where df[column] equals label are considered.
        columns_of_interest (List[str], optional): The reporting categories. The
            last two entries are treated as the administrative levels (canton
            and city in the default order). Defaults to geo_h.
        sample_id (str, optional): The column that identifies a sample.
            Defaults to 'loc_date'.

    Returns:
        dict: {label: categories}, where categories maps each reporting category
            to the array of its unique values. The unique sample ids are stored
            under the key 'samples'.
    """
    data = df[df[labels[0]] == labels[1]].copy()

    summaries = columns_of_interest

    # if city is selected the available boundaries
    # are geographic. A city is in only one canton.
    # If canton is selected then city becomes a category
    # for which a report can be produced
    if labels[0] == columns_of_interest[-1]:
        summaries = columns_of_interest[:-2]
    if labels[0] == columns_of_interest[-2]:
        summaries = [*columns_of_interest[:-2], columns_of_interest[-1]]

    # remove duplicates and keep the order, so that the
    # order of the result is the same on every run
    new_columns = list(dict.fromkeys([sample_id, *summaries]))

    res = {column: data[column].unique() for column in new_columns}

    # the samples are stored under a fixed key. The entry to move is the one of
    # sample_id: popping 'loc_date' fails for any other sample column.
    res['samples'] = res.pop(sample_id)

    return {labels[1]: res}

def a_summary_of_one_vector(df, unit_columns, unit_agg, describe='pcs_m'):
    """
    Summarize the distribution of one column of the sample totals.

    The data is aggregated with aggregate_dataframe, then the column 'describe'
    of the result is summarized with pandas describe. The sum of the 'quantity'
    column of the aggregated data is added as the row 'total'.

    Args:
        df (pd.DataFrame): The survey data.
        unit_columns: The groupby columns that define a sample.
        unit_agg: The aggregation functions for the sample totals. The result
            must have the columns 'describe' and 'quantity'.
        describe (str, optional): The column to summarize. Defaults to 'pcs_m'.

    Returns:
        pd.DataFrame: One column named after 'describe' with one row for each
            statistic. The column has the object dtype so that the rows 'count'
            and 'total' can hold integers.
    """
    totals = aggregate_dataframe(df, unit_columns, unit_agg)

    statistics = totals[describe].describe()
    statistics["total"] = totals.quantity.sum()

    summary = pd.DataFrame(statistics)
    summary[describe] = summary[describe].astype(object)

    # count and total are whole numbers, display them as integers
    for row in ('count', 'total'):
        summary.loc[row, describe] = int(summary.loc[row, describe])

    return summary

# a report class

## basic requirements

1. define the limits of the request
   * temporal
   * geographic (includes features and parent boundaries)
   * object types
   * level of aggregation

2. define what codes are being used

The default setting is to combine all the fragmented plastics into one group (all sizes) and to do the same for fragmented expanded polystyrene and plastic bottle tops. This results in three codes that represent objects that are very similar. This topic has been addressed many times. These groups register non-trivial quantities at most surveys. However, the differentiation of these objects into their respective subgroups, i.e. plastic caps for drinking vs. plastic caps for household cleaners, is not considered a priority by all groups that have collected data in the past.

* Gfrags
* Gfoams
* Gcaps

3. define the reporting language

The reporting language can be either French, German or English. We would like Italian but we have no resource for that service in Ticino.

__Note:__ The reporting language is only applied at the moment of display. The column names, feature labels and other underlying identifying criteria for the data remain unchanged. The column name definitions and translations are in the _random variables_ section.
   
The testing_data_models notebook shows that, given the following set of variables, summary reports and test statistics can be generated for any combination of data:

   * `df (pd.DataFrame)`: The input DataFrame containing data for analysis.
   * `cumulative_columns (List, optional)`: List of columns to be considered for cumulative values.
   * `boundary_labels (List, optional)`: List of labels for boundary summaries.
   * `object_labels (List, optional)`: List of labels for individual objects.
   * `object_columns (List, optional)`: List of columns identifying objects.
   * `unit_agg (dict, optional)`: Aggregation methods for unit summaries.
   * `unit_columns (List, optional)`: List of columns for unit summaries.
   * `agg_groups (dict, optional)`: Aggregation methods for boundary summaries.


### Work data

A report can be defined by providing the temporal and geographic bounds of interest. Below is the current method. 

```python

# request
canton = 'Bern'
start_date = '2019-01-01'
end_date = '2022-01-01'
language = 'fr'

# starting data, can be MySQL or NoSQL calls or API calls
# the three methods accept a callable as long as the output
# is pd.DataFrame 
c_l = language_maps()
surveys = collect_survey_data_for_report()
codes, beaches, land_cover, land_use, streets, river_intersect_lakes = collect_env_data_for_report()

# temporal and geographic boundaries
# user defined input in dictionary format
boundaries = dict(canton='Bern', language='fr', start_date='2019-01-01', end_date='2022-01-01')

# columns to be added to the survey data that are not stored with the survey data. Note that codes
# and beaches are part of the initial data. These operations can be performed either on the
# server side or the client side. The codes and beach data are small and can be stored on a 
# remote device. The same for data from any single region. If using a browser IndexedDB is very
# efficient for this type of operation. The index of <codes> contains the values of surveys.code
# and the index of <beaches> contains the values of surveys.slug
add_columns = [
    {'key':'code', 'feature':'groupname', 'map':codes},
    {'key':'slug', 'feature':'feature_type', 'map':beaches}
]

# the level and label of the report
# the language for display
# the data for the report and all other
# available data from the data range
top_label, language, w_df, w_di = report_data(boundaries, surveys, add_columns=add_columns)

# define the language map
l_map = c_l[language].copy()
w_df.head()
```

Which produces the following untranslated output.

In [4]:
# starting data, can be MySQL or NoSQL calls
# the three methods accept a Callable, as long
# as the output is pd.DataFrame
c_l = language_maps()
surveys = collect_survey_data_for_report()
codes, beaches, land_cover, land_use, streets, river_intersect_lakes = collect_env_data_for_report()

# temporal and geographic boundaries
# user defined input. The first entry sets the level
# (column) and the label of the report
boundaries = dict(canton='Valais', language='fr', start_date='2019-01-01', end_date='2022-01-01')

# columns to be added to the survey data that are
# not stored with the survey data. Note that codes
# and beaches are part of the initial data. The index of
# codes contains the values of surveys.code and the index
# of beaches contains the values of surveys.slug
add_columns = [
    {'key':'code', 'feature':'groupname', 'map':codes},
    {'key':'slug', 'feature':'feature_type', 'map':beaches}
]

# the level and label of the report
# the language for display
# the data for the report and all other
# data from the date range
top_label, language, w_df, w_di = report_data(boundaries, surveys, add_columns=add_columns)

# define the language map
l_map = c_l[language].copy()
w_df.head().style.set_table_styles(conf_.table_css_styles)

KeyError: "None of ['code'] are in the columns"

### Reporting categories

The first variable of the input is used to define the hierarchy of the report. For administrative purposes a vertical approach that reflects areas of responsibility is important. For estimating values the geographic/topographic attributes are more important.

The survey data is labeled for these purposes. The columns `parent_boundary`, `feature_type` and `feature_name` are the topographic features. 

1. `parent_boundary`: the name of the river basin, catchment area, park, geographic region or other zone defined by Swiss geo admin.
2. `feature_type`: lake, river or park
3. `feature_name`: the name of the lake, river or park

The `geo_h` array sets the order for reporting. Reports for cantons can contain subreports for all the values in the array, by default the cantonal results will reference the IQAASL report for threshold or prior results. Reports for cities will contain only geographic categories with reference to cantonal results.

```python


geo_h = ['parent_boundary', 'feature_type',  'feature_name','canton', 'city']


def categorize_work_data(df, labels, columns_of_interest: List[str] = geo_h, sample_id: str = 'loc_date'):
    """Collect the unique values of the reporting columns for one boundary.

    The rows of ``df`` where column ``labels[0]`` equals ``labels[1]`` are
    selected. For the sample identifier and each reporting column, the
    unique values found in that selection are collected.

    Args:
        df: The survey data. It must contain ``labels[0]``, ``sample_id``
            and the reporting columns that are retained.
        labels: A pair ``(column name, value)`` that defines the boundary.
        columns_of_interest: The reporting hierarchy. The last two entries
            are treated specially (with the default they are canton, city).
        sample_id: The column that identifies a sample.

    Returns:
        A dict with the single key ``labels[1]``. Its value maps each
        retained column name to the array of its unique values; the unique
        sample identifiers are stored under the key ``'samples'``.
    """
    data = df[df[labels[0]] == labels[1]]

    summaries = columns_of_interest

    # If the boundary is the last column (city by default) only the columns
    # before the last two are reported: a city is in only one canton.
    # If the boundary is the second to last column (canton by default) the
    # last column (city) becomes a category for which a report can be made.
    if labels[0] == columns_of_interest[-1]:
        summaries = columns_of_interest[:-2]
    if labels[0] == columns_of_interest[-2]:
        summaries = [*columns_of_interest[:-2], columns_of_interest[-1]]

    # dict.fromkeys removes duplicates and, unlike set, keeps a
    # deterministic column order.
    new_columns = list(dict.fromkeys([sample_id, *summaries]))

    res = {an_attribute: data[an_attribute].unique() for an_attribute in new_columns}

    # The samples are stored under a fixed key. Pop with sample_id, not a
    # hard-coded 'loc_date', so that a custom sample identifier works.
    res['samples'] = res.pop(sample_id)

    return {labels[1]: res}

# this categorizes the survey data into search terms
# the available data or reporting categories are retrieved
# by getting the length of the array for each category
# if the category is not present then the data is not available
parent_categories = categorize_work_data(w_df, top_label)
p_vals = parent_categories[boundaries[top_label[0]]]

# the type and number of reports available
reporting_categories = {k:len(v) for k, v in p_vals.items()}
reporting_categories
```

Which gives the following result:

In [None]:
# Categorize the survey data for the top-level boundary and keep the
# entry that belongs to the boundary value selected in boundaries.
parent_categories = categorize_work_data(w_df, top_label)
p_vals = parent_categories[boundaries[top_label[0]]]

# The type and number of reports available: the number of values held by
# each category. A category that is absent has no data available.
reporting_categories = {name: len(p_vals[name]) for name in p_vals}
reporting_categories

In [None]:
# Categorize only the records whose feature type is the first one listed
# in p_vals['feature_type'].
river_features = categorize_work_data(w_df[w_df.feature_type == p_vals['feature_type'][0]], top_label)
# Display the result computed above. The cell previously displayed
# lake_features, a name that this cell never assigns.
river_features

In [None]:
break

In [None]:
# beaches.loc[['lavey-les-bains-2','lavey-les-bains'] , 'canton'] = 'Vaud'

In [None]:
# beaches.to_csv("data/end_process/codes.csv", index=True)

In [None]:
# the big picture


def identify_parent_child(label, geo_labels):
    """Return the positions one step below and one step above ``label``.

    Args:
        label: A key of ``geo_labels``.
        geo_labels: A mapping of label to its numeric position.

    Returns:
        A tuple ``(child, parent)``. The child is the position of ``label``
        plus one, capped at the largest position in ``geo_labels``. The
        parent is the position minus one, never lower than zero.
    """
    position = geo_labels[label]
    deepest = max(geo_labels.values())
    child_ = min(position + 1, deepest)
    parent_ = max(position - 1, 0)
    return child_, parent_
        
    

# print(geo_h[parent], parent, geo_h[child], child)


def display_summary_of_work_data(a_summary: dict = None, a_map: pd.DataFrame = None, not_translated: List[str] = None, **kwargs):
    """Replace the collections of a summary with their lengths.

    Args:
        a_summary: A mapping of label to a sized collection of values.
        a_map: Not used by this function; accepted for compatibility.
        not_translated: Labels that are skipped, except for the first one,
            whose value is always replaced by its length. Defaults to
            ``['samples']``.
        **kwargs: Not used by this function; accepted for compatibility.

    Returns:
        A shallow copy of ``a_summary`` where the value of the first
        ``not_translated`` label, and of every label that is not in
        ``not_translated``, is replaced by ``len(value)``. The input
        mapping is not modified.
    """
    # A None sentinel replaces the former mutable list default.
    if not_translated is None:
        not_translated = ['samples']

    d = a_summary.copy()

    # The first skipped label is still counted.
    d[not_translated[0]] = len(d[not_translated[0]])

    # Values are replaced in place; no key is added or removed, so
    # iterating over the dict while assigning is safe.
    for label in d:
        if label not in not_translated:
            d[label] = len(d[label])

    return d

def translate_array(X, func: Callable = translate_word, **kwargs):
    """Apply ``func`` to every element of ``X`` and return the results as a list.

    The keyword arguments are forwarded unchanged to each call of ``func``.
    """
    translated = []
    for element in X:
        translated.append(func(element, **kwargs))
    return translated

# The numeric position of each entry of geo_h.
geo_labels = {level: position for level, position in zip(geo_h, np.arange(len(geo_h)))}

# The positions one step below (child) and one step above (parent) the
# column named by top_label[0].
child, parent = identify_parent_child(top_label[0], geo_labels)

# parent_categories = categorize_work_data(w_df, top_label)
# canton_class = list(summary_canton.keys())
# p_categories = parent_categories[top_label[1]]

# summary_canton = display_summary_of_work_data(parent_categories[top_label[1]], a_map=c_l, lan='fr')

# summary_canton



In [5]:
codes

NameError: name 'codes' is not defined

In [None]:
parent_categories = categorize_work_data(w_df, top_label)
parent_categories[boundaries['canton']]

In [None]:

# The categories of the top-level boundary, keyed by the boundary value.
p_categories = parent_categories[top_label[1]]
# NOTE(review): in the visible part of this notebook canton_class is only
# assigned in a commented-out line -- confirm it is defined before this
# cell is run.
p_categories[canton_class[0]]

In [None]:
p_categories[canton_class[1]]

In [None]:
p_categories[canton_class[2]]

In [None]:
p_categories[canton_class[3]]

In [None]:



# The unique values of the child and parent columns of the hierarchy.
children = w_df[geo_h[child]].unique()
parents = w_df[geo_h[parent]].unique()

# One (column name, value) label per unique value. The parent labels are
# built from parents alone: pairing them with a list sized by
# len(children) dropped parent labels whenever there were fewer children
# than parents.
chil_label = [(geo_h[child], a_child) for a_child in children]
p_labels = [(geo_h[parent], a_parent) for a_parent in parents]

print(child)
print(geo_h[child])
print(children)
print(parent)
print(geo_h[parent])
print(parents)

In [None]:
# Categorize the work data once per parent label and merge the results
# into a single dict.
child_sum = {}
for alabel in p_labels:
    child_sum.update(categorize_work_data(w_df, alabel))

# Reduce every summary to the number of values per category.
display_sums = {
    name: display_summary_of_work_data(summary, a_map=c_l, lan='fr')
    for name, summary in child_sum.items()
}

pd.DataFrame(display_sums)

In [None]:
# Categorize the work data once per child label and merge the results
# into a single dict.
summary_of_child_data = {}
pair_label = [(geo_h[parent], a_parent) for a_parent in parents]

for alabel in chil_label:
    summary_of_child_data.update(categorize_work_data(w_df, alabel))

# Built once, after the loop: only the final value was ever used, and
# building it outside the loop also defines it when chil_label is empty.
display_sums = {x: display_summary_of_work_data(summary_of_child_data[x], a_map=c_l, lan='fr') for x in summary_of_child_data.keys()}

pd.DataFrame(display_sums).T

In [None]:
unit_columns = ["loc_date", "slug", top_label[0]]
# a_summary_of_one_vector(w_df[w_df.parent_boundary.isin(summary_of_parent_data['p']['parent_boundary'])].copy(), unit_columns, conf_r.unit_agg, describe='pcs_m')

## Aggregate a set of data by sample (location and date)

Use the loc_date column in the survey data. Use the IQAASL period and the four river basins to test against the federal report.

### Before aggregating does the number of locations, cities, samples and quantity match the federal report?

__The feature types include lakes and rivers; the Alps were considered separately__

From https://hammerdirt-analyst.github.io/IQAASL-End-0f-Sampling-2021/lakes_rivers.html#

1. cities = yes
2. samples = yes
3. locations = yes
4. quantity = No it is short 50 pieces
5. start and end date = yes

### Number of lakes, rivers, parks, cities and cantons

In [None]:
# NOTE(review): this cell looks stale. The categorize_work_data shown in
# this notebook expects labels to be a (column name, value) pair and
# returns a dict, so passing the array of canton names and then indexing
# the result with a list of columns would presumably fail -- confirm which
# helper was intended here.
summary_of_work_data = categorize_work_data(code_result_df, code_result_df.canton.unique())
summary_of_work_data = summary_of_work_data[['samples', 'cities', 'lakes', 'rivers', 'parks', 'quantity']]
translated_and_style_for_display(summary_of_work_data, l_mapi, lang, gradient=False)

### aggregate to sample

The assessments are made on a per sample basis. That means that we can look at an individual object value at each sample. The sum of all the individual objects in a survey is the total for that survey. Dividing the totals by the length of the survey gives the assessment metric: _pieces of trash per meter_.

1. Are the quantiles of the current data equal to the federal report? Yes
2. Are the material totals equal to the federal report? No, plastics is off by 50 pcs
3. Are the fail rates of the most common objects equal to the federal report? Yes
4. Is the % of total of the most common objects equal to the federal report? Yes
5. Is the median pieces/meter of the most common objects equal to the federal report? Yes
6. Is the quantity of the most common objects equal to the federal report? Yes

#### The summary of survey totals

fig 1.5 in IQAASL

In [None]:
# The sample is the basic unit of the analysis.
# loc_date is the unique identifier for each sample.
unit_columns = ["loc_date", "slug", "parent_boundary"]

# The quantiles of the sample-total pcs/m: the helper is given a copy of
# the data, the columns that define a sample and the unit aggregation, and
# is asked to describe the pcs_m column.
vector_summary = a_summary_of_one_vector(code_result_df.copy(), unit_columns, unit_agg, describe='pcs_m')

# Display the summary without a color gradient.
translated_and_style_for_display(vector_summary,l_mapi, lang, gradient=False)

#### Material totals and proportions

fig 1.5 IQAASL

In [None]:
# Add the material label to each code: the survey column 'code' is matched
# with the index of codes["material"].
merged_result = merge_dataframes_on_column_and_index(code_result_df.copy(), codes["material"], 'code', how='inner', validate=True)

# Sum the quantity per material.
materials = aggregate_dataframe(merged_result.copy(), ["material"], {"quantity":"sum"})

# Add the % of total for display. The value is stored as a fraction of the
# total quantity (between 0 and 1), not multiplied by 100.
materials["%"] = materials.quantity/materials.quantity.sum()

translated_and_style_for_display(materials.set_index('material', drop=True),l_mapi, lang, gradient=False)

#### Quantity, median pcs/m, fail rate, and % of total

Summary results for all the codes in the parent_boundary

In [None]:
# Sum the cumulative quantity for each code and calculate the median pcs/meter.
code_totals = aggregate_dataframe(code_result_df.copy(), ["code"], {"quantity":"sum", "pcs_m":"median"})

# Collect the records for every code (the requested number of records is
# the length of code_totals).
abundant = get_top_x_records_with_max_quantity(code_totals.copy(), "quantity", "code", len(code_totals))

# Identify the objects that were found in at least 50% of the samples:
# first calculate the quantity per sample for each code and sample.
occurrences = aggregate_dataframe(code_result_df, ["loc_date", "code"], {"quantity":"sum"})

# Count the number of times that an object was counted > 0
# and divide it by the total number of samples.
event_counts  = count_objects_with_positive_quantity(occurrences)

# Calculate the rate of occurrence per unit of measure.
rates = calculate_rate_per_unit(code_result_df, code_result_df.code.unique())

# Add the unit rates and fail rates.
abundance = merge_dataframes_on_column_and_index(abundant, rates["pcs_m"], left_column="code", validate="one_to_one")
abundance["fail rate"] = abundance.code.apply(lambda x: event_counts.loc[x])

# This is the complete inventory with summary statistics for each object.
# The sort had been fused into the comment above and was never executed;
# it must run before the index is reset so that row label n holds the
# record with the (n+1)-th largest quantity.
abundance.sort_values(by="quantity", inplace=True, ascending=False)
abundance.reset_index(inplace=True, drop=True)

In [None]:
# codes = pd.read_csv(code_data)
# codes.rename(columns={"en":"code"}, inplace=True)
# codes.set_index("code", drop=True, inplace=True)
# codes.drop('code', inplace=True)
# codes.to_csv('data/end_process/codes.csv', index=True)

### The most common objects

fig 1.6 iqaasl

In [None]:
# The two selection criteria used to slice the inventory by column:
# the quantity found at row label 10 of abundance, and a fail rate of 0.5.
column_one = dict(column='quantity', val=abundance.loc[10, 'quantity'])
column_two = dict(column='fail rate', val=0.5)

# Use a copy of the inventory to find the most common objects.
the_most_common = display_tabular_data_by_column_values(abundance.copy(), column_one, column_two, 'code')

translated_and_style_for_display(the_most_common.copy(),l_mapi, lang, gradient=False)

### Results by groupname and feature boundary

In [None]:
# Grouping columns for the cumulative results and for the results per
# parent boundary.
cumulative_columns = ["loc_date", "groupname"]
unit_columns = ["parent_boundary", "loc_date", "groupname"]

# The objects of interest are the group names found in the data.
object_columns = ["groupname"]
object_labels = code_result_df.groupname.unique()

# The boundaries are the parent boundaries found in the data.
boundary_labels = code_result_df.parent_boundary.unique()

args = dict(
    cumulative_columns=cumulative_columns,
    object_labels=object_labels,
    boundary_labels=boundary_labels,
    object_columns=object_columns,
    unit_agg=unit_agg,
    unit_columns=unit_columns,
    agg_groups=agg_groups,
)

tix = summary_of_parent_and_child_features(code_result_df.copy(), **args)
translated_and_style_for_display(tix,l_mapi, lang, gradient=True)

### Most common codes by feature boundary

In [None]:
# Grouping columns for the cumulative results and for the results per
# parent boundary.
cumulative_columns = ["loc_date", "code"]
unit_columns = ["parent_boundary", "loc_date", "code"]

# The objects of interest are the index values of the most common objects.
object_columns = ["code"]
codes_of_interest = the_most_common.index

# The boundaries are the parent boundaries found in the data.
boundary_labels = code_result_df.parent_boundary.unique()

# Keep only the records of the codes of interest.
data = code_result_df[code_result_df.code.isin(codes_of_interest)].copy()

args = dict(
    cumulative_columns=cumulative_columns,
    object_labels=codes_of_interest,
    boundary_labels=boundary_labels,
    object_columns=object_columns,
    unit_agg=unit_agg,
    unit_columns=unit_columns,
    agg_groups=agg_groups,
)

tix = summary_of_parent_and_child_features(data.copy(), **args)

translated_and_style_for_display(tix,l_mapi, lang, gradient=True)

### Most common codes by canton



In [None]:
# Grouping columns for the results per canton. cumulative_columns and
# codes_of_interest are not assigned in this cell; they are read from the
# notebook state.
unit_columns = ["canton", "loc_date", "code"]
object_columns = ["code"]

# The boundaries are the cantons found in the data.
boundary_labels = code_result_df.canton.unique()

# Keep only the records of the codes of interest.
data = code_result_df[code_result_df.code.isin(codes_of_interest)].copy()

args = dict(
    cumulative_columns=cumulative_columns,
    object_labels=codes_of_interest,
    boundary_labels=boundary_labels,
    object_columns=object_columns,
    unit_agg=unit_agg,
    unit_columns=unit_columns,
    agg_groups=agg_groups,
)

tix = summary_of_parent_and_child_features(data, **args)

# The transposed table is displayed.
translated_and_style_for_display(tix.T,l_mapi, lang, gradient=True)

### Most common codes: canton-municipal

#### Bern

In [None]:
canton = "Bern"

# Only the records of the selected canton.
with_cantons = code_result_df[code_result_df.canton == canton].copy()

unit_columns = ["city", "loc_date", "code"]
# the column that holds the labels of interest
object_columns = ["code"]
# the labels of interest for the boundary conditions
boundary_labels = with_cantons.city.unique()

# with_cantons is already limited to the selected canton, so only the
# codes of interest are filtered here. The former extra filter on the
# literal canton name duplicated the canton variable and would have
# emptied the data if that variable were changed.
ddata = with_cantons[with_cantons.code.isin(codes_of_interest)].copy()

args = {
    'cumulative_columns':cumulative_columns,
    'object_labels':codes_of_interest,
    'boundary_labels':boundary_labels,
    'object_columns':object_columns,
    'unit_agg':unit_agg,
    'unit_columns':unit_columns,
    'agg_groups':agg_groups
}

tix = summary_of_parent_and_child_features(ddata, **args)
# The transposed table is displayed.
translated_and_style_for_display(tix.T,l_mapi, lang, gradient=True)

#### Valais

In [None]:
canton = "Valais"

# Only the records of the selected canton.
with_cantons = code_result_df[code_result_df.canton == canton].copy()

# The results are grouped per city; the labels of interest are held in
# the code column.
unit_columns = ["city", "loc_date", "code"]
object_columns = ["code"]

# The boundaries are the cities found in the selected canton.
boundary_labels = with_cantons.city.unique()

# Keep only the records of the codes of interest.
of_interest = with_cantons.code.isin(codes_of_interest)
ddata = with_cantons[of_interest].copy()
del of_interest

args = dict(
    cumulative_columns=cumulative_columns,
    object_labels=codes_of_interest,
    boundary_labels=boundary_labels,
    object_columns=object_columns,
    unit_agg=unit_agg,
    unit_columns=unit_columns,
    agg_groups=agg_groups,
)

tix = summary_of_parent_and_child_features(ddata, **args)
translated_and_style_for_display(tix,l_mapi, lang, gradient=True)

In [None]:
%watermark -a hammerdirt-analyst -co --iversions

In [None]:
# import re

# def remove_special_characters_and_lowercase(input_string):
#     # Use a regular expression to remove non-alphanumeric characters
#     clean_string = re.sub(r'[^a-zA-Z0-9]', '', input_string)
#     # Convert the cleaned string to lowercase
#     lowercase_string = clean_string.lower()
#     return lowercase_string

# # Example usage:
# input_string = "Hello*.$, World! 123"
# result = remove_special_characters_and_lowercase(input_string)

# def is_iso_date(date_string):
#     # Define a regular expression pattern for ISO date (YYYY-MM-DD)
#     iso_date_pattern = r'^\d{4}-\d{2}-\d{2}$'

#     if re.match(iso_date_pattern, date_string):
#         return date_string
#     else:
#         return False



# def validate_a_string_format(input_str, expected_format: str = r'^\d{4}-\d{2}-\d{2}$', string_rep: str = 'YYYY-MM-DD'):
#     # Define the expected format using a regular expression pattern
    
#     if not re.match(expected_format, input_str):
#         raise ValueError(f"Is not the right format. Should be: {string_rep}")
    
#     return input_str
    

# def collect_survey_data_for_report(a_function: Callable = None, **kwargs)-> pd.DataFrame:
#     # if a function is provided returns the called function
#     # the function should return a data rame with the same column
#     # names as the standard set of data otherwise the default method
#     # and file locations are are used.
#     if a_function:
#         return a_function(**kwargs)
#     else:
#         surveys = combine_survey_files(conf_r.survey_files)
#         return surveys
        
    

# def collect_env_data_for_report(a_function: Callable = None, **kwargs)-> pd.DataFrame:
#     # if a function is provided returns the called function
#     # the function should return a data rame with the same column
#     # names as the standard set of data otherwise the default method
#     # and file locations are are used.
#     if a_function:
#         return a_function(**kwargs)
#     if 'all' in kwargs.keys():
#         codes = pd.read_csv(conf_r.code_data).set_index("code")
#         beaches = pd.read_csv(conf_r.beach_data).set_index("slug")
#         land_cover = pd.read_csv(conf_r.land_cover_data)
#         land_use = pd.read_csv(conf_r.land_use_data)
#         streets = pd.read_csv(conf_r.street_data)
#         river_intersect_lakes = pd.read_csv(conf_r.intersection_attributes)
    
    
    
#     return codes, beaches, land_cover, land_use, streets, river_intersect_lakes
            
    

In [None]:
# def apply_gradient_style(df, gradient_func: Callable = None, **kwargs):
#     """
#     Apply a custom gradient style to a DataFrame using a specified gradient function.

#     Args:
#         df (pd.DataFrame): The input DataFrame.
#         gradient_func (callable): A function that takes a value and returns CSS styles for styling.

#     Returns:
#         pd.io.formats.style.Styler: A styled representation of the DataFrame.

#     Example:
#         def gradient(val):
#             cmap = 'coolwarm'
#             vmin = df.values.min().min()
#             vmax = df.values.max().max()
#             norm = plt.Normalize(vmin, vmax)
#             colormap = plt.cm.get_cmap(cmap)
#             color = colormap(norm(val))
#             hex_color = "#{:02x}{:02x}{:02x}".format(int(color[0] * 255), int(color[1] * 255), int(color[2] * 255))
#             return f'background-color: {hex_color}'

#         styled_df = apply_gradient_style(df, gradient)
#     """
#     # Use the .style.applymap method to apply the gradient function to each cell in the DataFrame
#     styled_df = df.style.applymap(gradient_func, **kwargs)

#     return styled_df
    
# def apply_table_style_format(df, style: List = None, format_kwargs: Dict = None):
    
       
#     return df.style.set_table_styles(style).format(**format_kwargs)

# a decorator for data frames
# def apply_style_to_dataframe(mymethod, **kwargs):
#     """
#     Apply styling to a DataFrame using a custom method or gradient function.
    
#     This decorator function allows you to apply styling to a DataFrame returned by a function.
#     It takes a custom method and a gradient function as input and applies these functions to
#     the DataFrame to style it based on specified criteria.
    
#     Args:
#     mymethod (callable): A custom method that applies styling to the DataFrame.
#     gradient_func (callable): A function that defines a color gradient for styling.
#     **kwargs: Additional keyword arguments to pass to the custom method.
    
#     Returns:
#     callable: A decorator function that can be used to style a DataFrame returned by a function.
    
#     Example:
#     def my_custom_styling(df, gradient_func, **kwargs):
#         # Apply styling to the DataFrame using the gradient function and optional kwargs
#         styled_df = df.style.applymap(gradient_func, **kwargs)
#         return styled_df
    
#     @apply_style_to_dataframe(my_custom_styling, my_gradient_function, colormap='coolwarm')
#     def create_dataframe():
#         # Create and return a DataFrame
#         ...
    
#     styled_result = create_dataframe()
#     """
#     def decorator(function):
    
#         def wrapper(*args):
#             # Call the original function to get the DataFrame
#             df = function(*args)
                                 
#             return mymethod(df,**kwargs)
        
#         return wrapper
#     return decorator


# @apply_style_to_dataframe(apply_gradient_style, gradient_func=color_gradient, min=0, max=1, cmap=newcmp)