In [236]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import warnings
import re
from typing import Union, Optional, List, Dict, Any, Tuple, Set



# Load 'test_data.csv' into the module-level DataFrame `df`, parsing the
# 'date_added' and 'date_modified' columns as datetimes at read time.
df = pd.read_csv(
    'test_data.csv',
    parse_dates=['date_added', 'date_modified']
)




In [239]:
def apply_price_range_filter(df: pd.DataFrame, price_col: str, **price_kwargs) -> pd.Series:
    """
    Build a boolean mask selecting rows whose price satisfies every given bound.

    Args:
        df: Input DataFrame.
        price_col: Name of the column holding the prices.
        **price_kwargs: Any combination of the following (all are AND-ed):
            - price_gt: price strictly greater than the value.
            - price_gte / price_min: price greater than or equal to the value.
            - price_lt: price strictly less than the value.
            - price_lte / price_max: price less than or equal to the value.
            - price_between: ``(low, high)`` pair, both ends inclusive.
            A bound whose value is ``None`` is skipped.

    Returns:
        pd.Series: Boolean mask aligned with ``df.index``. When ``price_col``
        is missing a warning is printed and an all-True mask is returned, so
        the caller can always AND the result with other masks.
    """
    if price_col not in df.columns:
        print(f"Warning: Price column '{price_col}' not found!")
        # Return a mask (not a DataFrame) so the return type is consistent.
        return pd.Series(True, index=df.index)

    prices = df[price_col]
    mask = pd.Series(True, index=df.index)

    # (keyword, symbol shown in the log line, vectorised comparison)
    bound_checks = (
        ('price_gt', '>', prices.gt),
        ('price_gte', '>=', prices.ge),
        ('price_min', '>=', prices.ge),
        ('price_lt', '<', prices.lt),
        ('price_lte', '<=', prices.le),
        ('price_max', '<=', prices.le),
    )
    for keyword, symbol, compare in bound_checks:
        bound = price_kwargs.get(keyword)
        if bound is None:
            continue
        mask &= compare(bound)
        print(f"Applied {keyword} filter: {symbol} {bound}")

    between = price_kwargs.get('price_between')
    if between is not None:
        min_val, max_val = between
        mask &= (prices >= min_val) & (prices <= max_val)
        print(f"Applied price_between filter: {min_val} <= price <= {max_val}")

    return mask


In [None]:
from typing import Optional, Tuple
import pandas as pd
from datetime import timedelta

def apply_date_range_filter(
    df: pd.DataFrame,
    date_col: str,
    handle_nulls: str = 'exclude',
    timezone: Optional[str] = None,
    date_format: Optional[str] = None,
    verbose: bool = False,
    *,
    date_from: Optional[str] = None,
    date_to: Optional[str] = None,
    date_after: Optional[str] = None,
    date_before: Optional[str] = None,
    date_between: Optional[Tuple[str, str]] = None,
    days_ago: Optional[int] = None,
    date_today: Optional[bool] = None
) -> pd.Series:
    """
    Build a boolean mask selecting rows whose date satisfies every given bound.

    Args:
        df: Input DataFrame.
        date_col: Name of the date column. Non-datetime columns are parsed
            with ``pd.to_datetime(..., errors='coerce')``.
        handle_nulls: What to do with null/unparseable dates:
            - 'exclude' (default): null rows are never selected.
            - 'include': null rows are always selected, even when bounds
              are applied.
            - 'error': raise ``ValueError`` if any null is present.
            Any other value applies no explicit null handling (null rows
            then fail every comparison).
        timezone: Timezone used for "now" in ``days_ago`` / ``date_today``
            and for interpreting naive bounds against a tz-aware column.
        date_format: Optional format passed to ``pd.to_datetime`` when the
            column has to be parsed.
        verbose: When True, print how many rows the mask selects.
        date_from: Inclusive lower bound.
        date_to: Inclusive upper bound.
        date_after: Exclusive lower bound.
        date_before: Exclusive upper bound.
        date_between: ``(start, end)`` pair, both ends inclusive.
        days_ago: Keep dates no older than this many days before now.
        date_today: When truthy, keep only dates falling on the current day.

    Returns:
        pd.Series: Boolean mask aligned with ``df.index``.

    Raises:
        KeyError: If ``date_col`` is not a column of ``df``.
        ValueError: If ``handle_nulls='error'`` and nulls are present.
    """
    if date_col not in df.columns:
        raise KeyError(f"Column '{date_col}' not found in DataFrame.")

    date_series = df[date_col]

    # Convert to datetime if not already
    if not pd.api.types.is_datetime64_any_dtype(date_series):
        date_series = pd.to_datetime(date_series, format=date_format, errors='coerce')

    null_mask = date_series.isna()
    if handle_nulls == 'error' and null_mask.any():
        raise ValueError(f"Null values found in '{date_col}'")

    if pd.api.types.is_datetime64_any_dtype(date_series):
        series_tz = date_series.dt.tz
    else:
        series_tz = None

    def _as_bound(value):
        # pandas refuses to compare tz-aware with tz-naive values, so bring
        # every bound to the same tz-awareness as the column.
        stamp = pd.to_datetime(value)
        if series_tz is None and stamp.tzinfo is not None:
            return stamp.tz_localize(None)
        if series_tz is not None and stamp.tzinfo is None:
            return stamp.tz_localize(timezone if timezone is not None else series_tz)
        return stamp

    def _now():
        return pd.Timestamp.now(tz=timezone if timezone is not None else series_tz)

    # Apply filters one by one (only if they exist)
    in_range = pd.Series(True, index=df.index)
    if date_from:
        in_range &= date_series >= _as_bound(date_from)
    if date_to:
        in_range &= date_series <= _as_bound(date_to)
    if date_after:
        in_range &= date_series > _as_bound(date_after)
    if date_before:
        in_range &= date_series < _as_bound(date_before)
    if date_between:
        start, end = date_between
        in_range &= (date_series >= _as_bound(start)) & (date_series <= _as_bound(end))
    if days_ago is not None:
        cutoff = _as_bound(_now() - timedelta(days=days_ago))
        in_range &= date_series >= cutoff
    if date_today:
        today = _as_bound(_now().normalize())
        tomorrow = today + timedelta(days=1)
        in_range &= (date_series >= today) & (date_series < tomorrow)

    # Null rows fail every comparison above, so 'include' has to add them
    # back explicitly after the bounds were evaluated.
    if handle_nulls == 'exclude':
        mask = in_range & ~null_mask
    elif handle_nulls == 'include':
        mask = in_range | null_mask
    else:
        mask = in_range

    if verbose:
        print(f"Date filter on '{date_col}': {int(mask.sum())} of {len(mask)} rows match")

    return mask


In [243]:
def detect_range_parameters(kwargs: dict) -> Tuple[Dict[str, Any], Dict[str, Any], Dict[str, Any]]:
    """
    Split filter keyword arguments into price, date and regular groups.

    Args:
        kwargs: Dictionary of filter parameters (may be None or empty).

    Returns:
        Tuple ``(price_params, date_params, regular_params)``:
        - price_params: keys that are exactly one of price_min, price_max,
          price_gt, price_gte, price_lt, price_lte, price_between.
        - date_params: keys that are exactly one of date_from, date_to,
          date_after, date_before, date_between, days_ago, date_today, or
          that start with 'date_added_' / 'date_modified_'.
        - regular_params: everything else.

    Notes:
        - A None or empty input yields three empty dictionaries.
        - Values are never inspected: None and empty strings are kept in
          whichever group their key belongs to.
        - Matching is case-sensitive and done on ``str(key)``; the entry is
          stored under the original key object.

    Example:
        {'price_min': 1000, 'date_from': '2025-01-01', 'brand': 'Teledyne'}
        -> ({'price_min': 1000}, {'date_from': '2025-01-01'}, {'brand': 'Teledyne'})
    """
    price_params, date_params, regular_params = {}, {}, {}

    # Covers both None and an empty mapping.
    if not kwargs:
        return price_params, date_params, regular_params

    price_names = frozenset((
        'price_min', 'price_max', 'price_gt', 'price_gte',
        'price_lt', 'price_lte', 'price_between',
    ))
    date_names = frozenset((
        'date_from', 'date_to', 'date_after', 'date_before',
        'date_between', 'days_ago', 'date_today',
    ))
    # Column-specific date keys such as date_added_from or date_modified_to
    column_date_prefixes = ('date_added_', 'date_modified_')

    for name, val in kwargs.items():
        label = str(name)  # tolerate non-string keys
        if label in price_names:
            bucket = price_params
        elif label in date_names or label.startswith(column_date_prefixes):
            bucket = date_params
        else:
            bucket = regular_params
        bucket[name] = val

    return price_params, date_params, regular_params


In [244]:
# Include the filter implementation here
def apply_regular_filters(df: pd.DataFrame, **kwargs) -> pd.Series:
    """
    Combine column=value conditions into a single boolean mask (AND logic).

    Args:
        df: Input DataFrame (never modified).
        **kwargs: One condition per keyword; the key is a column name and
            the value is what to look for. Each condition is evaluated by
            ``_process_single_condition``.

    Returns:
        pd.Series: Boolean mask aligned with ``df.index``.
        - Empty DataFrame: an empty mask named 'empty_mask'.
        - No conditions: an all-True mask named 'no_filter_mask'.
        - No usable condition (unknown columns, all-NaN columns, or the
          condition helper returned None): an all-False mask named
          'no_valid_conditions'.

    Notes:
        Unknown columns and columns containing only NaN are skipped with a
        printed warning. A summary line with the match count is printed
        when at least one condition was applied.
    """
    if df.empty:
        print("Warning: DataFrame is empty")
        return pd.Series(dtype=bool, index=df.index, name='empty_mask')

    if not kwargs:
        print("No filter conditions provided")
        return pd.Series(True, index=df.index, name='no_filter_mask')

    combined = None
    for column, wanted in kwargs.items():
        if column not in df.columns:
            print(f"Warning: Column '{column}' not found in DataFrame. Available columns: {list(df.columns)}")
            continue

        if df[column].isnull().all():
            print(f"Warning: Column '{column}' contains only NaN values. Skipping condition.")
            continue

        current = _process_single_condition(df, column, wanted)
        if current is None:
            continue

        # The first usable mask seeds the result; later ones are AND-ed in.
        combined = current if combined is None else combined & current

    if combined is None:
        print("Warning: No valid filter conditions found. Returning empty result.")
        return pd.Series(False, index=df.index, name='no_valid_conditions')

    hit_count = combined.sum()
    print(f"Filter applied: {hit_count} out of {len(df)} rows match all conditions")
    return combined

def _process_single_condition(df: pd.DataFrame, col: str, value: Any) -> Union[pd.Series, None]:
    """Process a single filter condition."""
    col_dtype = df[col].dtype

    if value is None or (isinstance(value, float) and pd.isna(value)):
        return df[col].isna()

    if isinstance(value, (list, tuple, set)):
        return _handle_multiple_values(df, col, value, col_dtype)

    return _handle_single_value(df, col, value, col_dtype)

def _handle_multiple_values(df: pd.DataFrame, col: str, values: Union[List, Tuple, Set], col_dtype) -> Union[pd.Series, None]:
    """Handle multiple values with OR logic."""
    non_none_values = [v for v in values if v is not None and not (isinstance(v, float) and pd.isna(v))]
    none_values = [v for v in values if v is None or (isinstance(v, float) and pd.isna(v))]

    masks = []

    if non_none_values:
        if pd.api.types.is_object_dtype(col_dtype):
            value_masks = [_handle_single_value(df, col, v, col_dtype) for v in non_none_values]
            value_masks = [m for m in value_masks if m is not None]
            if value_masks:
                combined_mask = value_masks[0]
                for mask in value_masks[1:]:
                    combined_mask = combined_mask | mask
                masks.append(combined_mask)
        else:
            masks.append(df[col].isin(non_none_values))

    if none_values:
        masks.append(df[col].isna())

    if not masks:
        print(f"Warning: No valid values found in list for column '{col}'")
        return None

    final_mask = masks[0]
    for mask in masks[1:]:
        final_mask = final_mask | mask

    return final_mask

def _handle_single_value(df: pd.DataFrame, col: str, value: Any, col_dtype) -> Union[pd.Series, None]:
    """Handle a single value."""
    if pd.api.types.is_object_dtype(col_dtype):
        working_series = df[col].fillna('').astype(str).str.lower()
        escaped_value = re.escape(str(value).lower())
        mask = working_series.str.contains(escaped_value, na=False, regex=True)
        if not mask.any():
            print(f"Warning: Value '{value}' not found in column '{col}'. No matches.")
            return pd.Series(False, index=df.index)
        return mask
    else:
        try:
            if pd.api.types.is_numeric_dtype(col_dtype):
                converted_value = pd.to_numeric(value, errors='coerce')
                if pd.isna(converted_value):
                    print(f"Warning: Value '{value}' cannot be converted to numeric for column '{col}'")
                    return pd.Series(False, index=df.index)
                mask = df[col] == converted_value
            else:
                mask = df[col] == value

            if not mask.any():
                print(f"Warning: Value '{value}' not found in column '{col}'. No matches.")
                return pd.Series(False, index=df.index)

            return mask
        except Exception as e:
            print(f"Warning: Error processing value '{value}' for column '{col}': {str(e)}")
            return pd.Series(False, index=df.index)



In [246]:
def filter_products_enhanced(df: pd.DataFrame, **kwargs) -> pd.DataFrame:
    """
    Filter products in the DataFrame using combined price range, date range, and regular filters.

    The keyword arguments are split by ``detect_range_parameters`` and each
    group is turned into a boolean mask:
    - Price filters are applied to the 'price' column.
    - General date filters (``date_from``, ``date_to``, ...) are applied to
      'date_added'.
    - Column-specific date filters (``date_added_<suffix>`` /
      ``date_modified_<suffix>``) are applied to the named column.
    - Regular filters are applied to the DataFrame column of the same name.

    All masks are combined with AND logic, meaning rows must satisfy all
    provided conditions.

    Parameters
    ----------
    df : pd.DataFrame
        Input DataFrame containing product data.

    **kwargs : dict
        Filter parameters which may include:

        Price filters (forwarded unchanged to ``apply_price_range_filter``):
            price_min, price_max, price_gt, price_gte, price_lt, price_lte,
            price_between.

        General date filters on 'date_added':
            date_from, date_to, date_after, date_before, date_between,
            days_ago, date_today.

        Column-specific date filters, ``date_added_<suffix>`` or
        ``date_modified_<suffix>``, where ``<suffix>`` is one of:
            - min / from : inclusive lower bound.
            - max / to : inclusive upper bound.
            - after : exclusive lower bound.
            - before : exclusive upper bound.
            - between : (start, end) pair, both inclusive.
            - days_ago : number of days back from now.
            - today : keep only the current day.
          An unknown suffix is ignored with a warning. If two keys resolve
          to the same bound for one column (e.g. ``date_added_min`` and
          ``date_added_from``), the first one is kept and a warning is
          printed.

        Regular filters (example keys, adjust as needed):
            - brand (str): Case-insensitive partial string match on brand.
            - category_id (int): Exact match on category ID.
            - color (str): Case-insensitive partial string match on color.

    Returns
    -------
    pd.DataFrame
        Filtered DataFrame containing only rows that match all given filter criteria.

    Warnings
    --------
    - If the DataFrame is empty or no filter kwargs are provided, a copy of the full DataFrame is returned.
    - Invalid filter keys that do not correspond to DataFrame columns are ignored with a warning.
    - If no rows match the filter criteria, an empty DataFrame is returned with a warning.

    Examples
    --------
    >>> filter_products_enhanced(df, price_gte=100, price_lte=500, brand='sony')
    >>> filter_products_enhanced(df, date_added_min='2025-01-01', category_id=12)
    >>> filter_products_enhanced(df, price_gt=50, date_from='2025-03-01', color='red')
    """
    if df.empty:
        print("Warning: The DataFrame is empty.")
        return df.copy()

    if not kwargs:
        print("No filter conditions provided. Returning full DataFrame.")
        return df.copy()

    working_df = df.copy()

    price_params, date_params, regular_params = detect_range_parameters(kwargs)

    final_mask = pd.Series(True, index=working_df.index)

    if price_params:
        price_mask = apply_price_range_filter(working_df, 'price', **price_params)
        final_mask &= price_mask

    if date_params:
        # apply_date_range_filter only accepts its own keyword names, so the
        # suffix of a column-specific key has to be translated before it is
        # forwarded (a bare 'min' or 'from' would raise TypeError).
        suffix_to_kwarg = {
            'min': 'date_from',
            'from': 'date_from',
            'max': 'date_to',
            'to': 'date_to',
            'after': 'date_after',
            'before': 'date_before',
            'between': 'date_between',
            'days_ago': 'days_ago',
            'today': 'date_today',
        }
        general_date_params = {}
        column_date_params = {'date_added': {}, 'date_modified': {}}

        for key, value in date_params.items():
            for column, collected in column_date_params.items():
                prefix = f"{column}_"
                if not key.startswith(prefix):
                    continue
                target = suffix_to_kwarg.get(key[len(prefix):])
                if target is None:
                    print(f"Warning: Unsupported date filter '{key}' will be ignored.")
                elif target in collected:
                    print(f"Warning: Date filter '{key}' duplicates an earlier bound and will be ignored.")
                else:
                    collected[target] = value
                break
            else:
                general_date_params[key] = value

        if general_date_params:
            date_mask = apply_date_range_filter(working_df, 'date_added', **general_date_params)
            final_mask &= date_mask

        for column, collected in column_date_params.items():
            if collected:
                date_mask = apply_date_range_filter(working_df, column, **collected)
                final_mask &= date_mask

    # Remove keys from regular_params that do not exist in DataFrame columns
    valid_regular_params = {k: v for k, v in regular_params.items() if k in working_df.columns}
    invalid_keys = set(regular_params.keys()) - set(valid_regular_params.keys())
    if invalid_keys:
        print(f"Warning: The following filter columns do not exist and will be ignored: {invalid_keys}")

    if valid_regular_params:
        regular_mask = apply_regular_filters(working_df, **valid_regular_params)
        final_mask &= regular_mask

    result_df = working_df[final_mask]

    if result_df.empty:
        print("Warning: No rows match the given filter conditions.")

    return result_df


In [254]:
def analyze_data(df: pd.DataFrame, 
                 group_by_column: Optional[Union[str, List[str]]] = None,
                 agg_dict: Optional[Dict[str, Union[str, List[str]]]] = None,
                 count_column: Optional[str] = None,
                 include_group_size: bool = False,
                 convert_numeric: bool = True) -> pd.DataFrame:
    """
    Run a grouped or ungrouped summary over a DataFrame.

    Parameters:
    -----------
    df : pd.DataFrame
        Data to analyse.
    group_by_column : str, list of str, or None
        Grouping column(s). None selects the ungrouped code path.
    agg_dict : dict or None
        Mapping of column name to aggregation function(s).
    count_column : str or None
        Column whose value counts should be computed.
        Mutually exclusive with agg_dict.
    include_group_size : bool
        Forwarded to the grouped analysis only.
    convert_numeric : bool
        When True, the frame is first passed through
        ``_convert_numeric_strings``.

    Returns:
    --------
    pd.DataFrame
        Result of ``_grouped_analysis`` or ``_ungrouped_analysis``; an empty
        DataFrame (after a printed warning) when the input is empty.

    Raises:
    -------
    TypeError
        If df is not a pandas DataFrame.
    ValueError
        If both agg_dict and count_column are given.
    """
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input must be a pandas DataFrame.")

    if df.empty:
        print("Warning: Input DataFrame is empty.")
        return pd.DataFrame()

    # Optional numeric-string conversion happens before the argument check,
    # so its log lines appear even when the check below fails.
    frame = _convert_numeric_strings(df) if convert_numeric else df

    if not (agg_dict is None or count_column is None):
        raise ValueError("Cannot specify both agg_dict and count_column. Choose one.")

    if group_by_column is None:
        return _ungrouped_analysis(frame, agg_dict, count_column)

    return _grouped_analysis(frame, group_by_column, agg_dict, count_column, include_group_size)


def _grouped_analysis(df: pd.DataFrame, 
                      group_by_column: Union[str, List[str]],
                      agg_dict: Optional[Dict[str, Union[str, List[str]]]],
                      count_column: Optional[str],
                      include_group_size: bool) -> pd.DataFrame:
    # Normalize group_by_column to list
    if isinstance(group_by_column, str):
        group_by_column = [group_by_column]

    # Validate grouping columns
    if not all(col in df.columns for col in group_by_column):
        missing = [col for col in group_by_column if col not in df.columns]
        raise ValueError(f"Grouping columns not found in DataFrame: {missing}")

    if df[group_by_column].isnull().any().any():
        print("Warning: Missing values found in grouping columns. Dropping them.")
        df = df.dropna(subset=group_by_column)
        if df.empty:
            print("Warning: No data remaining after dropping missing group values.")
            return pd.DataFrame()

    try:
        grouped = df.groupby(group_by_column, dropna=False)
    except Exception as e:
        raise ValueError(f"Error creating groups: {str(e)}")

    if agg_dict:
        result = _apply_aggregations(grouped, agg_dict, df.columns)
    elif count_column:
        result = _apply_value_counts(grouped, count_column, df.columns, group_by_column)
    else:
        # ✅ NEW: Just return the raw grouped rows (i.e. original DataFrame)
        return df

    if include_group_size and not result.empty:
        group_sizes = grouped.size().reset_index(name='group_size')
        result = pd.merge(result, group_sizes, on=group_by_column, how='left')

    return result



def _convert_numeric_strings(df: pd.DataFrame) -> pd.DataFrame:
    """Convert string columns to numeric if they contain numeric values."""
    df = df.copy()
    
    for col in df.columns:
        if df[col].dtype == 'object':
            # Try to convert to numeric
            try:
                # Remove any whitespace first
                if isinstance(df[col].iloc[0], str):
                    df[col] = df[col].str.strip()
                
                # Try converting to numeric
                numeric_series = pd.to_numeric(df[col], errors='coerce')
                
                # If most values converted successfully, use the numeric version
                if numeric_series.notna().sum() > len(df) * 0.5:  # At least 50% are numeric
                    df[col] = numeric_series
                    print(f"Converted column '{col}' from string to numeric.")
                    
            except (AttributeError, ValueError):
                # If conversion fails, keep as string
                continue
    
    return df


def _ungrouped_analysis(df: pd.DataFrame,
                       agg_dict: Optional[Dict[str, Union[str, List[str]]]],
                       count_column: Optional[str]) -> pd.DataFrame:
    """Internal function for ungrouped analysis."""
    
    if agg_dict:
        # Validate columns in agg_dict
        for col in agg_dict.keys():
            if col not in df.columns:
                raise ValueError(f"Column '{col}' in agg_dict not found in DataFrame.")
        
        result = _apply_ungrouped_aggregations(df, agg_dict)
    elif count_column:
        if count_column not in df.columns:
            raise ValueError(f"Count column '{count_column}' not found in DataFrame.")
        
        result = df[count_column].value_counts().reset_index()
        result.columns = [count_column, 'count']
    else:
        # Default: descriptive statistics for numeric columns
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        if len(numeric_cols) == 0:
            print("Warning: No numeric columns found for default analysis.")
            return pd.DataFrame({'message': ['No numeric columns available for analysis']})
        
        result = df[numeric_cols].describe().T.reset_index()
        result.rename(columns={'index': 'column'}, inplace=True)
    
    return result


def _apply_aggregations(grouped, agg_dict: Dict[str, Union[str, List[str]]], df_columns) -> pd.DataFrame:
    """Apply aggregations to grouped data."""
    
    # Validate columns exist
    for col in agg_dict.keys():
        if col not in df_columns:
            raise ValueError(f"Column '{col}' in agg_dict not found in DataFrame.")
    
    # Validate aggregation functions
    valid_aggs = ['mean', 'median', 'sum', 'count', 'std', 'var', 'min', 'max', 'first', 'last', 'nunique']
    
    for col, aggs in agg_dict.items():
        # Handle different types of aggregation specifications
        if isinstance(aggs, str):
            # Single string aggregation
            if aggs not in valid_aggs:
                raise ValueError(f"Invalid aggregation function: {aggs}")
        elif callable(aggs):
            # Single callable function - skip validation, pandas will handle it
            continue
        elif isinstance(aggs, list):
            # List of aggregations
            for agg in aggs:
                if isinstance(agg, str) and agg not in valid_aggs:
                    raise ValueError(f"Invalid aggregation function: {agg}")
                elif not isinstance(agg, str) and not callable(agg):
                    raise ValueError(f"Aggregation function must be a string or callable: {agg}")
        else:
            # Single non-callable, non-string, non-list item
            if not callable(aggs):
                raise ValueError(f"Aggregation function must be a string or callable: {aggs}")
    
    try:
        # Process the aggregation dictionary to handle custom functions
        processed_agg_dict = {}
        for col, aggs in agg_dict.items():
            if isinstance(aggs, str):
                processed_agg_dict[col] = aggs
            elif callable(aggs):
                # Single callable function
                processed_agg_dict[col] = aggs
            elif isinstance(aggs, list):
                # List of functions
                processed_agg_dict[col] = aggs
            else:
                # Single non-callable, non-string item
                processed_agg_dict[col] = aggs
        
        result = grouped.agg(processed_agg_dict).reset_index()
        
        # Flatten column names if multi-level
        if isinstance(result.columns, pd.MultiIndex):
            new_columns = []
            for col in result.columns:
                if isinstance(col, tuple):
                    if col[1] == '' or col[1] is None:
                        new_columns.append(col[0])
                    else:
                        new_columns.append(f"{col[0]}_{col[1]}")
                else:
                    new_columns.append(col)
            result.columns = new_columns
        
        return result
    except Exception as e:
        raise ValueError(f"Error applying aggregations: {str(e)}")


def _apply_value_counts(grouped, count_column: str, df_columns, group_by_column: List[str]) -> pd.DataFrame:
    """Apply value counts to grouped data."""
    
    if count_column not in df_columns:
        raise ValueError(f"Count column '{count_column}' not found in DataFrame.")
    
    try:
        result = grouped[count_column].value_counts().reset_index()
        # Reorder columns to put grouping columns first
        cols = group_by_column + [count_column, 'count']
        result = result[cols]
        return result
    except Exception as e:
        raise ValueError(f"Error applying value counts: {str(e)}")


def _apply_default_aggregations(grouped, df: pd.DataFrame, group_by_column: List[str]) -> pd.DataFrame:
    """Apply default aggregations to numeric columns."""
    
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    numeric_cols = [col for col in numeric_cols if col not in group_by_column]
    
    if len(numeric_cols) == 0:
        print("Warning: No numeric columns found for default analysis.")
        return pd.DataFrame()
    
    try:
        result = grouped[numeric_cols].agg(['count', 'mean', 'std', 'min', 'max']).reset_index()
        
        # Flatten column names if multi-level
        if isinstance(result.columns, pd.MultiIndex):
            new_columns = []
            for col in result.columns:
                if isinstance(col, tuple):
                    if col[1] == '' or col[1] is None:
                        new_columns.append(col[0])
                    else:
                        new_columns.append(f"{col[0]}_{col[1]}")
                else:
                    new_columns.append(col)
            result.columns = new_columns
        
        return result
    except Exception as e:
        raise ValueError(f"Error applying default aggregations: {str(e)}")

def _apply_ungrouped_aggregations(df: pd.DataFrame, agg_dict: Dict[str, Union[str, List[Union[str, Any]], Any]]) -> pd.DataFrame:
    """Apply aggregations to ungrouped data."""

    try:
        result_dict = {}

        for col, aggs in agg_dict.items():
            if isinstance(aggs, str) or callable(aggs):
                aggs = [aggs]

            for agg in aggs:
                if agg == 'count':
                    result_dict[f"{col}_{agg}"] = df[col].count()
                elif agg == 'nunique':
                    result_dict[f"{col}_{agg}"] = df[col].nunique()
                elif agg in ['mean', 'median', 'sum', 'std', 'var', 'min', 'max']:
                    if df[col].dtype in ['object', 'string']:
                        print(f"Warning: Cannot apply {agg} to non-numeric column '{col}'. Skipping.")
                        continue
                    try:
                        result_dict[f"{col}_{agg}"] = getattr(df[col], agg)()
                    except Exception:
                        print(f"Warning: Could not apply {agg} to column '{col}'. Skipping.")
                        continue
                elif agg in ['first', 'last']:
                    try:
                        result_dict[f"{col}_{agg}"] = getattr(df[col], agg)()
                    except Exception:
                        print(f"Warning: Could not apply {agg} to column '{col}'. Skipping.")
                        continue
                elif callable(agg):
                    try:
                        result_dict[f"{col}_custom"] = agg(df[col])
                    except Exception:
                        print(f"Warning: Could not apply custom function to column '{col}'. Skipping.")
                        continue
                else:
                    raise ValueError(f"Invalid aggregation function: {agg}")

        return pd.DataFrame([result_dict])

    except Exception as e:
        raise ValueError(f"Error applying ungrouped aggregations: {str(e)}")



In [255]:
def get_unique_values(
    df: pd.DataFrame,
    columns: Union[str, List[str]],
    dropna: bool = False,
    as_list: bool = False
) -> Union[pd.Series, List, Dict[str, Union[pd.Series, List]]]:
    """
    Collect the distinct values of one or more DataFrame columns.

    Parameters:
        df (pd.DataFrame): Input DataFrame; must not be empty.
        columns (str or list of str): Column name(s). Surrounding whitespace
            in a name is ignored.
        dropna (bool): If True, NaN is removed from the distinct values.
        as_list (bool): If True, each result is a plain list, otherwise a Series.

    Returns:
        pd.Series, list, or dict: The distinct values directly when exactly
        one column was resolved, otherwise a dict keyed by column name.

    Raises:
        TypeError: If df is not a DataFrame or columns has the wrong type.
        ValueError: If df is empty or a column does not exist.
    """
    if not isinstance(df, pd.DataFrame):
        raise TypeError("'df' must be a pandas DataFrame.")

    if isinstance(columns, str):
        columns = [columns]
    elif not (isinstance(columns, list) and all(isinstance(entry, str) for entry in columns)):
        raise TypeError("'columns' must be a string or list of strings.")

    if df.empty:
        raise ValueError("Input DataFrame is empty.")

    collected = {}
    for raw_name in columns:
        name = raw_name.strip()
        if name not in df.columns:
            raise ValueError(f"Column '{name}' not found in DataFrame.")

        distinct = pd.Series(df[name].unique())
        if dropna:
            distinct = distinct.dropna()

        collected[name] = distinct.tolist() if as_list else distinct

    # A single resolved column is returned bare rather than wrapped in a dict.
    if len(collected) == 1:
        (only,) = collected.values()
        return only
    return collected





In [256]:
def sort_columns(
    df: pd.DataFrame,
    columns: Union[str, List[str]],
    ascending: Union[bool, List[bool]] = True,
    na_position: str = 'last',
    inplace: bool = False,
    ignore_index: bool = False
) -> Union[pd.DataFrame, None]:
    """
    Order the rows of a DataFrame by the given column(s).

    Arguments are validated here and the actual sorting is delegated to
    DataFrame.sort_values.

    Parameters:
        df (pd.DataFrame): Frame to sort.
        columns (str or list of str): Sort key column(s); surrounding
            whitespace in each name is stripped before use.
        ascending (bool or list of bool): One flag for all keys, or one
            flag per key.
        na_position (str): Passed through to sort_values ('first' or 'last').
        inplace (bool): Passed through to sort_values.
        ignore_index (bool): Passed through to sort_values.

    Returns:
        pd.DataFrame or None: Whatever sort_values returns (None when
        inplace=True).

    Raises:
        TypeError: If 'df', 'columns' or 'ascending' has the wrong type.
        ValueError: If a column is missing or the 'ascending' list length
            differs from the number of columns.
    """
    if not isinstance(df, pd.DataFrame):
        raise TypeError("'df' must be a pandas DataFrame.")

    if isinstance(columns, str):
        requested = [columns]
    elif isinstance(columns, list) and all(isinstance(name, str) for name in columns):
        requested = columns
    else:
        raise TypeError("'columns' must be a string or list of strings.")

    # Resolve every key up front; the error reports the name as the caller wrote it.
    sort_keys = []
    for raw_name in requested:
        key = raw_name.strip()
        if key not in df.columns:
            raise ValueError(f"Column '{raw_name}' not found in DataFrame.")
        sort_keys.append(key)

    if isinstance(ascending, bool):
        directions = [ascending] * len(requested)
    elif not isinstance(ascending, list):
        raise TypeError("'ascending' must be a bool or list of bools.")
    elif len(ascending) != len(requested):
        raise ValueError("'ascending' list must match number of columns.")
    elif not all(isinstance(flag, bool) for flag in ascending):
        raise TypeError("All 'ascending' values must be bool.")
    else:
        directions = ascending

    return df.sort_values(
        by=sort_keys,
        ascending=directions,
        na_position=na_position,
        inplace=inplace,
        ignore_index=ignore_index
    )


In [257]:
def find_missing_values(df):
    """
    Detect missing values in a DataFrame, counting common placeholders as missing.

    Besides real NaN/None cells, a cell is treated as missing when it equals
    one of the placeholder strings listed in the function body ('#########',
    'N/A', 'NULL', 'None', ...) or is an empty / whitespace-only string.
    The input DataFrame is not modified.

    Parameters:
    df (pd.DataFrame): Input DataFrame

    Returns:
    dict: Dictionary containing:
        - 'summary': DataFrame with missing value statistics per column,
          sorted by missing count (descending)
        - 'locations': DataFrame with one row per missing cell and the
          columns 'Row_Index' (index label), 'Column' and 'Original_Value'
          (the value held by the input before placeholder replacement)
        - 'total_missing': Total count of missing values
        - 'missing_percentage': Overall percentage of missing values
        - 'total_cells': Number of cells in the DataFrame
        - 'columns_with_missing': Names of columns with at least one missing value
        - 'columns_without_missing': Names of columns with no missing value
      For an empty DataFrame 'summary' and 'locations' are empty frames and
      all counts are zero.

    Raises:
    ValueError: If df is not a pandas DataFrame.
    """

    # Input validation
    if not isinstance(df, pd.DataFrame):
        raise ValueError("Input must be a pandas DataFrame")

    if df.empty:
        # Same key set as the regular result so consumers need no special case.
        return {
            'summary': pd.DataFrame(),
            'locations': pd.DataFrame(),
            'total_missing': 0,
            'missing_percentage': 0.0,
            'total_cells': 0,
            'columns_with_missing': [],
            'columns_without_missing': df.columns.tolist()
        }

    # Replace common placeholder patterns with NaN
    placeholder_patterns = [
        '#########',  # Your specific pattern
        '######### ',
        '##############',  # Longer pattern for dates
        'N/A',
        'n/a',
        'NA',
        'na',
        'NULL',
        'null',
        'Null',
        'None',
        'none',
        'NONE',
        '',
        ' ',
        '  ',
        '   '
    ]

    # replace() returns a new object, so the caller's frame stays untouched;
    # passing the whole list does the work in one pass instead of one per pattern.
    df_copy = df.replace(placeholder_patterns, np.nan)

    # Also handle whitespace-only strings
    df_copy = df_copy.replace(r'^\s*$', np.nan, regex=True)

    total_rows = len(df_copy)
    null_mask = df_copy.isnull()

    # Calculate missing values per column
    missing_counts = null_mask.sum()
    missing_percentages = (missing_counts / total_rows) * 100

    # Create summary DataFrame
    summary = pd.DataFrame({
        'Column': df_copy.columns,
        'Missing_Count': missing_counts.values,
        'Missing_Percentage': missing_percentages.values,
        'Total_Rows': total_rows,
        'Non_Missing_Count': total_rows - missing_counts.values
    })

    # Sort by missing count (descending)
    summary = summary.sort_values('Missing_Count', ascending=False)

    # Find locations of missing values. Everything is addressed by POSITION
    # and only translated to labels for reporting, so non-default, string or
    # duplicated indexes resolve to the correct original cell.
    # Transposing makes nonzero() walk column by column, rows in order.
    col_positions, row_positions = np.nonzero(null_mask.to_numpy().T)
    row_labels = df.index.tolist()
    column_labels = df.columns.tolist()
    locations = [
        {
            'Row_Index': row_labels[row_pos],
            'Column': column_labels[col_pos],
            'Original_Value': df.iat[row_pos, col_pos]
        }
        for col_pos, row_pos in zip(col_positions.tolist(), row_positions.tolist())
    ]

    # Explicit columns keep the schema stable even when nothing is missing.
    locations_df = pd.DataFrame(
        locations, columns=['Row_Index', 'Column', 'Original_Value']
    )

    # Calculate overall statistics
    total_cells = df_copy.shape[0] * df_copy.shape[1]
    total_missing = missing_counts.sum()
    overall_missing_percentage = (total_missing / total_cells) * 100 if total_cells > 0 else 0

    has_missing = summary['Missing_Count'] > 0

    return {
        'summary': summary,
        'locations': locations_df,
        'total_missing': int(total_missing),
        'missing_percentage': round(overall_missing_percentage, 2),
        'total_cells': total_cells,
        'columns_with_missing': summary.loc[has_missing, 'Column'].tolist(),
        'columns_without_missing': summary.loc[~has_missing, 'Column'].tolist()
    }

def print_missing_report(missing_info):
    """
    Print a formatted report of missing values.

    Parameters:
    missing_info (dict): Output from find_missing_values function. The keys
        'summary', 'locations', 'total_missing' and 'missing_percentage' are
        required. 'total_cells', 'columns_with_missing' and
        'columns_without_missing' are optional and default to 0 / empty
        lists, so a reduced result (such as one produced for an empty
        DataFrame) is reported instead of raising KeyError.
    """
    rule = "=" * 60

    # Optional keys: fall back to neutral values rather than crashing.
    total_cells = missing_info.get('total_cells', 0)
    with_missing = missing_info.get('columns_with_missing', [])
    without_missing = missing_info.get('columns_without_missing', [])

    summary = missing_info['summary']
    locations = missing_info['locations']

    print(rule)
    print("MISSING VALUES ANALYSIS REPORT")
    print(rule)

    print("\nOVERALL STATISTICS:")
    print(f"Total cells: {total_cells:,}")
    print(f"Total missing values: {missing_info['total_missing']:,}")
    print(f"Overall missing percentage: {missing_info['missing_percentage']}%")

    print(f"\nCOLUMNS WITH MISSING VALUES: {len(with_missing)}")
    print(f"COLUMNS WITHOUT MISSING VALUES: {len(without_missing)}")

    if not summary.empty:
        print("\nMISSING VALUES BY COLUMN:")
        print(summary.to_string(index=False))

    # Only a preview: the full list can be as large as the DataFrame itself.
    if not locations.empty:
        print("\nFIRST 10 MISSING VALUE LOCATIONS:")
        print(locations.head(10).to_string(index=False))

    print(rule)

In [258]:
def summarize_by_period(
    df: pd.DataFrame,
    date_col: str,
    freq: str,
) -> pd.DataFrame:
    """
    Add a time period column and call analyze_data grouped by it.

    The function works on a copy of df. The date column is parsed with
    pd.to_datetime(errors='coerce'), so unparseable entries become NaT.
    Rows with a NaT date receive a missing period label instead of a
    fabricated text label such as '<NA>-W<NA>' or 'NaT'.

    Parameters:
        df (pd.DataFrame): Input data; left unmodified.
        date_col (str): Name of the column holding the dates.
        freq (str): Period granularity and the helper columns it adds:
            'day'     -> 'day_name' (weekday name)
            'week'    -> 'week_number', 'year' (ISO calendar), 'week_label'
                         such as '2023-W5'
            'month'   -> 'month', 'year', 'month_label' such as '2023-January'
            'quarter' -> 'quarter_label' such as '2023Q1'
            'year'    -> 'year_label'

    Returns:
        pd.DataFrame: Whatever analyze_data returns for the labelled frame
        (analyze_data is defined elsewhere in this file).

    Raises:
        ValueError: If freq is not one of the supported values.
    """
    df = df.copy()
    df[date_col] = pd.to_datetime(df[date_col], errors='coerce')

    dates = df[date_col]
    has_date = dates.notna()

    if freq == 'day':
        df['day_name'] = dates.dt.day_name()
        group_col = 'day_name'

    elif freq == 'week':
        iso = dates.dt.isocalendar()
        df['week_number'] = iso.week
        df['year'] = iso.year
        week_label = iso.year.astype(str) + '-W' + iso.week.astype(str)
        # Missing ISO fields stringify to '<NA>'; blank those labels so NaT
        # rows do not form a bogus '<NA>-W<NA>' period.
        df['week_label'] = week_label.where(has_date)
        group_col = 'week_label'

    elif freq == 'month':
        df['month'] = dates.dt.month_name()
        df['year'] = dates.dt.year
        # dt.year turns into float as soon as one date is NaT, which would
        # render labels as '2023.0-January'; format the year from the
        # timestamp itself instead. NaT rows yield a missing label.
        df['month_label'] = dates.dt.strftime('%Y') + '-' + df['month']
        group_col = 'month_label'

    elif freq == 'quarter':
        quarter_label = dates.dt.to_period('Q').astype(str)
        # A NaT period stringifies to the text 'NaT'; keep it missing.
        df['quarter_label'] = quarter_label.where(has_date)
        group_col = 'quarter_label'

    elif freq == 'year':
        df['year_label'] = dates.dt.year
        group_col = 'year_label'

    else:
        raise ValueError("Frequency must be one of: 'day', 'week', 'month', 'quarter', 'year'")

    # Call your analyze_data function with the new group_by column
    analyzed_df = analyze_data(
        df,
        group_by_column=group_col
    )

    return analyzed_df
