# Functions

In [117]:
from datetime import date, datetime
from typing import List, Dict, Tuple, Union, Optional, Any

import pandas as pd
from sqlalchemy import create_engine

# Importing DataBase From PostgreSQL

In [118]:
def get_db_engine(db_url: Optional[str] = None):
    """
    Create and return a SQLAlchemy engine.

    Parameters:
    - db_url: Optional SQLAlchemy connection URL. When omitted, the value of the
      DATA_ASSET_LINKAGE_DB_URL environment variable is used if it is set;
      otherwise the original local development URL is used.

    Returns:
    - The object returned by sqlalchemy.create_engine for the resolved URL
    """
    import os  # local import keeps this block self-contained

    # NOTE(review): the fallback URL embeds a username and password in source.
    # Prefer passing db_url or setting DATA_ASSET_LINKAGE_DB_URL outside local development.
    default_url = "postgresql+psycopg2://postgres:password@localhost:5432/Data_Asset_Linkage"
    resolved_url = db_url or os.environ.get("DATA_ASSET_LINKAGE_DB_URL") or default_url
    return create_engine(resolved_url)

# LEVEL 0: Making Functions to fetch the data from the tables 

In [119]:
# Get Customer information Function 
def get_customers(
    customer_id: Union[str, List[str], None] = None,
    name: Union[str, List[str], None] = None,
    city: Union[str, List[str], None] = None,
    update_date: Union[str, date, List[Union[str, date]], None] = None,
    exact_match: bool = True,
    min_update_date: Union[str, date, None] = None,
    max_update_date: Union[str, date, None] = None,
    select_columns: Union[str, List[str], None] = None
) -> pd.DataFrame:
    """
    Fetch rows from the `customer` table, filtered by any combination of fields.

    Parameters:
    - customer_id: Single ID or list of IDs (always an exact match)
    - name: Single name or list of names (exact or partial match, see exact_match)
    - city: Single city or list of cities (exact or partial match, see exact_match)
    - update_date: Single date or list of dates (str 'YYYY-MM-DD' or date object)
    - exact_match: False switches name/city to case-insensitive partial matching
      (ILIKE '%value%'); default True
    - min_update_date: Minimum update_date (inclusive)
    - max_update_date: Maximum update_date (inclusive)
    - select_columns: Column name, comma-separated string, or list/tuple of
      column names to return; all columns when omitted

    Returns:
    - Pandas DataFrame with matching customer records

    Raises:
    - ValueError: if a date string is not in 'YYYY-MM-DD' format

    Notes:
    - A list filter is combined with OR; different filters are combined with AND.
    - An empty list filter matches no rows.
    - select_columns is interpolated into the SQL text verbatim, so it must come
      from trusted code, never from end-user input. Filter values are always
      passed as bound parameters.
    """

    # Handle column selection (tuples are accepted the same way as lists)
    if not select_columns:
        column_str = "*"
    elif isinstance(select_columns, (list, tuple)):
        column_str = ", ".join(select_columns)
    else:
        column_str = select_columns

    # Base query; "WHERE 1=1" lets every filter be appended as " AND ..."
    query = f"SELECT {column_str} FROM customer WHERE 1=1"
    params = {}
    param_counter = 0

    def to_date(value):
        """Convert a 'YYYY-MM-DD' string to a date; pass other values through."""
        if isinstance(value, str):
            return datetime.strptime(value, '%Y-%m-%d').date()
        return value

    def add_condition(field, value, exact=True, is_date=False):
        """Append one filter on `field` to the query and register its parameters."""
        nonlocal query, param_counter
        param_prefix = f"{field}_{param_counter}"
        param_counter += 1

        is_sequence = isinstance(value, (list, tuple))
        values = list(value) if is_sequence else [value]

        # An empty list would otherwise render as "AND ()", which is invalid SQL.
        # "Value is one of nothing" can never be true, so match no rows.
        if not values:
            query += " AND 1=0"
            return

        operator = "=" if exact else "ILIKE"
        conditions = []
        for i, val in enumerate(values):
            param_name = f"{param_prefix}_{i}" if is_sequence else param_prefix
            if is_date:
                val = to_date(val)
            elif not exact:
                # Partial match: wrap the text in wildcards
                val = f"%{val}%"
            conditions.append(f"{field} {operator} %({param_name})s")
            params[param_name] = val

        if is_sequence:
            query += " AND (" + " OR ".join(conditions) + ")"
        else:
            query += f" AND {conditions[0]}"

    # Add filters
    if customer_id is not None:
        add_condition("customer_id", customer_id, exact=True)

    if name is not None:
        add_condition("name", name, exact=exact_match)

    if city is not None:
        add_condition("city", city, exact=exact_match)

    if update_date is not None:
        add_condition("update_date", update_date, exact=True, is_date=True)

    # Date range filters
    if min_update_date:
        params['min_date'] = to_date(min_update_date)
        query += " AND update_date >= %(min_date)s"

    if max_update_date:
        params['max_date'] = to_date(max_update_date)
        query += " AND update_date <= %(max_date)s"

    # Execute the query. A fresh engine is created per call and disposed afterwards.
    engine = get_db_engine()
    try:
        with engine.connect() as conn:
            if params:
                # Use the raw DBAPI cursor so the %(name)s placeholders are bound by the driver
                with conn.connection.cursor() as cursor:
                    cursor.execute(query, params)
                    columns = [desc[0] for desc in cursor.description]
                    data = cursor.fetchall()
                    return pd.DataFrame(data, columns=columns)
            else:
                return pd.read_sql(query, conn)
    finally:
        engine.dispose()

In [120]:
# Get Accounts information Function
def get_accounts(
    account_no: Union[str, List[str], None] = None,
    account_type: Union[str, List[str], None] = None,
    customer_id: Union[str, List[str], None] = None,
    account_status: Union[str, List[str], None] = None,
    activation_date: Union[str, date, List[Union[str, date]], None] = None,
    exact_match: bool = True,
    min_activation_date: Union[str, date, None] = None,
    max_activation_date: Union[str, date, None] = None,
    select_columns: Union[str, List[str], None] = None
) -> pd.DataFrame:
    """
    Fetch rows from the `accounts` table, filtered by any combination of fields.

    Parameters:
    - account_no: Single or list of account numbers (always an exact match)
    - account_type: Single or list of account types (exact or partial, see exact_match)
    - customer_id: Single or list of customer IDs (always an exact match)
    - account_status: Single or list of statuses (exact or partial, see exact_match)
    - activation_date: Single or list of activation dates (str 'YYYY-MM-DD' or date object)
    - exact_match: False switches account_type/account_status to case-insensitive
      partial matching (ILIKE '%value%'); default True
    - min_activation_date: Inclusive minimum activation date
    - max_activation_date: Inclusive maximum activation date
    - select_columns: Column name, comma-separated string, or list/tuple of
      column names to return; all columns when omitted

    Returns:
    - Pandas DataFrame with matching account records

    Raises:
    - ValueError: if a date string is not in 'YYYY-MM-DD' format

    Notes:
    - A list filter is combined with OR; different filters are combined with AND.
    - An empty list filter matches no rows.
    - select_columns is interpolated into the SQL text verbatim, so it must come
      from trusted code, never from end-user input. Filter values are always
      passed as bound parameters.
    """

    # Handle column selection (tuples are accepted the same way as lists)
    if not select_columns:
        column_str = "*"
    elif isinstance(select_columns, (list, tuple)):
        column_str = ", ".join(select_columns)
    else:
        column_str = select_columns

    # "WHERE 1=1" lets every filter be appended as " AND ..."
    query = f"SELECT {column_str} FROM accounts WHERE 1=1"
    params = {}
    param_counter = 0

    def to_date(value):
        """Convert a 'YYYY-MM-DD' string to a date; pass other values through."""
        if isinstance(value, str):
            return datetime.strptime(value, '%Y-%m-%d').date()
        return value

    def add_condition(field, value, exact=True, is_date=False):
        """Append one filter on `field` to the query and register its parameters."""
        nonlocal query, param_counter
        param_prefix = f"{field}_{param_counter}"
        param_counter += 1

        is_sequence = isinstance(value, (list, tuple))
        values = list(value) if is_sequence else [value]

        # An empty list would otherwise render as "AND ()", which is invalid SQL.
        # "Value is one of nothing" can never be true, so match no rows.
        if not values:
            query += " AND 1=0"
            return

        operator = "=" if exact else "ILIKE"
        conditions = []
        for i, val in enumerate(values):
            param_name = f"{param_prefix}_{i}" if is_sequence else param_prefix
            if is_date:
                val = to_date(val)
            elif not exact:
                # Partial match: wrap the text in wildcards
                val = f"%{val}%"
            conditions.append(f"{field} {operator} %({param_name})s")
            params[param_name] = val

        if is_sequence:
            query += " AND (" + " OR ".join(conditions) + ")"
        else:
            query += f" AND {conditions[0]}"

    # Apply filters
    if account_no is not None:
        add_condition("account_no", account_no, exact=True)
    if account_type is not None:
        add_condition("account_type", account_type, exact=exact_match)
    if customer_id is not None:
        add_condition("customer_id", customer_id, exact=True)
    if account_status is not None:
        add_condition("account_status", account_status, exact=exact_match)
    if activation_date is not None:
        add_condition("activation_date", activation_date, exact=True, is_date=True)
    if min_activation_date:
        query += " AND activation_date >= %(min_date)s"
        params['min_date'] = to_date(min_activation_date)
    if max_activation_date:
        query += " AND activation_date <= %(max_date)s"
        params['max_date'] = to_date(max_activation_date)

    # Execute the query. A fresh engine is created per call and disposed afterwards.
    engine = get_db_engine()
    try:
        with engine.connect() as conn:
            if params:
                # Use the raw DBAPI cursor so the %(name)s placeholders are bound by the driver
                with conn.connection.cursor() as cursor:
                    cursor.execute(query, params)
                    columns = [desc[0] for desc in cursor.description]
                    data = cursor.fetchall()
                    return pd.DataFrame(data, columns=columns)
            else:
                return pd.read_sql(query, conn)
    finally:
        engine.dispose()


In [121]:
# Get Transaction information Function
def get_transactions(
    transaction_id: Union[int, List[int], None] = None,
    account_no: Union[str, List[str], None] = None,
    customer_id: Union[str, List[str], None] = None,
    amount: Union[float, List[float], None] = None,
    min_amount: Optional[float] = None,
    max_amount: Optional[float] = None,
    transaction_time: Union[str, datetime, List[Union[str, datetime]], None] = None,
    min_transaction_time: Union[str, datetime, None] = None,
    max_transaction_time: Union[str, datetime, None] = None,
    select_columns: Union[str, List[str], None] = None
) -> pd.DataFrame:
    """
    Fetch rows from the `transactions` table, filtered by any combination of fields.

    Parameters:
    - transaction_id: single ID or list
    - account_no: single or list of account numbers
    - customer_id: single or list of customer IDs
    - amount: exact amount or list of exact amounts
    - min_amount: inclusive lower bound for amount
    - max_amount: inclusive upper bound for amount
    - transaction_time: exact timestamp or list of timestamps
      (str 'YYYY-MM-DD HH:MM:SS' or datetime object)
    - min_transaction_time: inclusive lower bound (same formats as transaction_time)
    - max_transaction_time: inclusive upper bound (same formats as transaction_time)
    - select_columns: Column name, comma-separated string, or list/tuple of
      columns to return; all columns when omitted

    Returns:
    - Pandas DataFrame with matching transactions

    Raises:
    - ValueError: if a timestamp string is not in 'YYYY-MM-DD HH:MM:SS' format

    Notes:
    - All filters are exact matches; a list filter is combined with OR and
      different filters are combined with AND.
    - An empty list filter matches no rows.
    - select_columns is interpolated into the SQL text verbatim, so it must come
      from trusted code, never from end-user input. Filter values are always
      passed as bound parameters.
    """

    # Handle selected columns (tuples are accepted the same way as lists)
    if not select_columns:
        column_str = "*"
    elif isinstance(select_columns, (list, tuple)):
        column_str = ", ".join(select_columns)
    else:
        column_str = select_columns

    # "WHERE 1=1" lets every filter be appended as " AND ..."
    query = f"SELECT {column_str} FROM transactions WHERE 1=1"
    params = {}
    param_counter = 0

    def to_timestamp(value):
        """Convert a 'YYYY-MM-DD HH:MM:SS' string to a datetime; pass other values through."""
        if isinstance(value, str):
            return datetime.strptime(value, '%Y-%m-%d %H:%M:%S')
        return value

    def add_condition(field, value, is_date=False):
        """Append one equality filter on `field` and register its parameters."""
        nonlocal query, param_counter
        param_prefix = f"{field}_{param_counter}"
        param_counter += 1

        is_sequence = isinstance(value, (list, tuple))
        values = list(value) if is_sequence else [value]

        # An empty list would otherwise render as "AND ()", which is invalid SQL.
        # "Value is one of nothing" can never be true, so match no rows.
        if not values:
            query += " AND 1=0"
            return

        conditions = []
        for i, val in enumerate(values):
            param_name = f"{param_prefix}_{i}" if is_sequence else param_prefix
            if is_date:
                val = to_timestamp(val)
            conditions.append(f"{field} = %({param_name})s")
            params[param_name] = val

        if is_sequence:
            query += " AND (" + " OR ".join(conditions) + ")"
        else:
            query += f" AND {conditions[0]}"

    # Apply filters
    if transaction_id is not None:
        add_condition("transaction_id", transaction_id)
    if account_no is not None:
        add_condition("account_no", account_no)
    if customer_id is not None:
        add_condition("customer_id", customer_id)
    if amount is not None:
        add_condition("amount", amount)
    # "is not None" (rather than truthiness) keeps a bound of 0 effective
    if min_amount is not None:
        query += " AND amount >= %(min_amount)s"
        params['min_amount'] = min_amount
    if max_amount is not None:
        query += " AND amount <= %(max_amount)s"
        params['max_amount'] = max_amount
    if transaction_time is not None:
        add_condition("transaction_time", transaction_time, is_date=True)
    if min_transaction_time:
        query += " AND transaction_time >= %(min_time)s"
        params['min_time'] = to_timestamp(min_transaction_time)
    if max_transaction_time:
        query += " AND transaction_time <= %(max_time)s"
        params['max_time'] = to_timestamp(max_transaction_time)

    # Execute query. A fresh engine is created per call and disposed afterwards.
    engine = get_db_engine()
    try:
        with engine.connect() as conn:
            if params:
                # Use the raw DBAPI cursor so the %(name)s placeholders are bound by the driver
                with conn.connection.cursor() as cursor:
                    cursor.execute(query, params)
                    columns = [desc[0] for desc in cursor.description]
                    data = cursor.fetchall()
                    return pd.DataFrame(data, columns=columns)
            else:
                return pd.read_sql(query, conn)
    finally:
        engine.dispose()


# LEVEL 1: Merging/Joining The Tables 

In [124]:
def multi_join_cust_acc(
    join_type: str = "left",
    customer_on: Union[str, List[str]] = "customer_id",
    account_on: Union[str, List[str]] = "customer_id",
    customer_filters: Optional[dict] = None,
    account_filters: Optional[dict] = None,
    cust: Optional[pd.DataFrame] = None,
    acc: Optional[pd.DataFrame] = None,
    select_columns: Union[List[str], str, None] = None
) -> pd.DataFrame:
    """
    Merge customer rows (left side) with account rows (right side).

    Parameters:
    - join_type (str): Merge strategy such as 'left' or 'inner'; case-insensitive. Default 'left'.
    - customer_on (str or List[str]): Join column(s) on the customer side.
    - account_on (str or List[str]): Join column(s) on the account side.
    - customer_filters (dict): Keyword arguments for get_customers(); used only when cust is None.
    - account_filters (dict): Keyword arguments for get_accounts(); used only when acc is None.
    - cust (pd.DataFrame): Optional ready-made customer data; skips the fetch.
    - acc (pd.DataFrame): Optional ready-made account data; skips the fetch.
    - select_columns (str or List[str]): Column(s) to keep in the result; all when omitted.

    Returns:
    - pd.DataFrame: The merged data. Account columns whose names clash with
      customer columns carry a '_dup' suffix, except clashing account join
      keys, which are removed.
    """

    # Left side: caller-supplied frame wins over a database fetch
    if cust is None:
        left_frame = get_customers(**(customer_filters or {}))
    else:
        left_frame = cust

    # Right side: same rule
    if acc is None:
        right_frame = get_accounts(**(account_filters or {}))
    else:
        right_frame = acc

    # A single key name becomes a one-element list
    left_keys = customer_on if not isinstance(customer_on, str) else [customer_on]
    right_keys = account_on if not isinstance(account_on, str) else [account_on]

    joined = pd.merge(
        left=left_frame,
        right=right_frame,
        how=join_type.lower(),
        left_on=left_keys,
        right_on=right_keys,
        suffixes=('', '_dup'),
    )

    # Remove the suffixed copies of the account-side join keys
    for key in right_keys:
        shadow = f"{key}_dup"
        if shadow in joined.columns:
            joined = joined.drop(columns=[shadow])

    if not select_columns:
        return joined

    wanted = [select_columns] if isinstance(select_columns, str) else select_columns
    return joined[wanted]


In [125]:
def multi_join_cust_tran(
    join_type: str = "left",
    customer_on: Union[str, List[str]] = "customer_id",
    transaction_on: Union[str, List[str]] = "customer_id",
    customer_filters: Optional[dict] = None,
    transaction_filters: Optional[dict] = None,
    cust: Optional[pd.DataFrame] = None,
    tran: Optional[pd.DataFrame] = None,
    select_columns: Union[List[str], str, None] = None
) -> pd.DataFrame:
    """
    Merge customer rows (left side) with transaction rows (right side).

    Parameters:
    - join_type (str): Merge strategy such as 'left' or 'inner'; case-insensitive. Default 'left'.
    - customer_on (str or List[str]): Join column(s) on the customer side.
    - transaction_on (str or List[str]): Join column(s) on the transaction side.
    - customer_filters (dict): Keyword arguments for get_customers(); used only when cust is None.
    - transaction_filters (dict): Keyword arguments for get_transactions(); used only when tran is None.
    - cust (pd.DataFrame): Optional ready-made customer data; skips the fetch.
    - tran (pd.DataFrame): Optional ready-made transaction data; skips the fetch.
    - select_columns (str or List[str], optional): Column(s) to keep in the result; all when omitted.

    Returns:
    - pd.DataFrame: The merged data. Transaction columns whose names clash with
      customer columns carry a '_dup' suffix, except clashing transaction join
      keys, which are removed.
    """

    # Caller-supplied frames win over database fetches
    if cust is None:
        left_frame = get_customers(**(customer_filters or {}))
    else:
        left_frame = cust

    if tran is None:
        right_frame = get_transactions(**(transaction_filters or {}))
    else:
        right_frame = tran

    # A single key name becomes a one-element list
    customer_keys = customer_on if not isinstance(customer_on, str) else [customer_on]
    transaction_keys = transaction_on if not isinstance(transaction_on, str) else [transaction_on]

    joined = pd.merge(
        left=left_frame,
        right=right_frame,
        how=join_type.lower(),
        left_on=customer_keys,
        right_on=transaction_keys,
        suffixes=('', '_dup'),
    )

    # Remove the suffixed copies of the transaction-side join keys
    for key in transaction_keys:
        shadow = f"{key}_dup"
        if shadow in joined.columns:
            joined = joined.drop(columns=[shadow])

    if not select_columns:
        return joined

    wanted = [select_columns] if isinstance(select_columns, str) else select_columns
    return joined[wanted]

In [126]:
def multi_join_acc_tran(
    join_type: str = "left",
    account_on: Union[str, List[str]] = "account_no",
    transaction_on: Union[str, List[str]] = "account_no",
    account_filters: Optional[dict] = None,
    transaction_filters: Optional[dict] = None,
    acc: Optional[pd.DataFrame] = None,
    tran: Optional[pd.DataFrame] = None,
    select_columns: Union[str, List[str], None] = None
) -> pd.DataFrame:
    """
    Merge account rows (left side) with transaction rows (right side).

    Parameters:
    - join_type (str): Merge strategy such as 'left' or 'inner'; case-insensitive. Default 'left'.
    - account_on (str or List[str]): Join column(s) on the account side.
    - transaction_on (str or List[str]): Join column(s) on the transaction side.
    - account_filters (dict): Keyword arguments for get_accounts(); used only when acc is None.
    - transaction_filters (dict): Keyword arguments for get_transactions(); used only when tran is None.
    - acc (pd.DataFrame): Optional ready-made account data; skips the fetch.
    - tran (pd.DataFrame): Optional ready-made transaction data; skips the fetch.
    - select_columns (str or List[str], optional): Column(s) to keep in the result; all when omitted.

    Returns:
    - pd.DataFrame: The merged data. Transaction columns whose names clash with
      account columns carry a '_dup' suffix, except clashing transaction join
      keys, which are removed.
    """

    # Caller-supplied frames win over database fetches
    if acc is None:
        left_frame = get_accounts(**(account_filters or {}))
    else:
        left_frame = acc

    if tran is None:
        right_frame = get_transactions(**(transaction_filters or {}))
    else:
        right_frame = tran

    # A single key name becomes a one-element list
    account_keys = account_on if not isinstance(account_on, str) else [account_on]
    transaction_keys = transaction_on if not isinstance(transaction_on, str) else [transaction_on]

    joined = pd.merge(
        left=left_frame,
        right=right_frame,
        how=join_type.lower(),
        left_on=account_keys,
        right_on=transaction_keys,
        suffixes=('', '_dup'),
    )

    # Remove the suffixed copies of the transaction-side join keys
    for key in transaction_keys:
        shadow = f"{key}_dup"
        if shadow in joined.columns:
            joined = joined.drop(columns=[shadow])

    if not select_columns:
        return joined

    wanted = [select_columns] if isinstance(select_columns, str) else select_columns
    return joined[wanted]

# LEVEL 3: Super Function

In [127]:
def super_join_cust_acc_tran(
    join_type: str = "left",
    customer_key: Union[str, List[str]] = "customer_id",
    account_key: Union[str, List[str]] = "account_no",
    transaction_key: Union[str, List[str]] = "transaction_id",
    cust: Optional[pd.DataFrame] = None,
    acc: Optional[pd.DataFrame] = None,
    tran: Optional[pd.DataFrame] = None,
    cust_acc_df: Optional[pd.DataFrame] = None,
    cust_tran_df: Optional[pd.DataFrame] = None,
    acc_tran_df: Optional[pd.DataFrame] = None,
    select_columns: Union[str, List[str], None] = None
) -> pd.DataFrame:
    """
    Joins customer-account, customer-transaction, and account-transaction data
    into a unified DataFrame.

    The customer-account data is merged with the customer-transaction data on
    the customer keys, and that result is merged with the account-transaction
    data on the account keys.

    Parameters:
    - join_type (str): Type of join to use for both merges; case-insensitive (default='left').
    - customer_key (str or list of str): Keys for the first merge.
    - account_key (str or list of str): Keys for the second merge.
    - transaction_key (str or list of str): Transaction key columns whose suffixed
      duplicates ('<key>_cust_tran', '<key>_acc_tran') are dropped from the result.
    - cust, acc, tran (pd.DataFrame): Optional pre-fetched raw DataFrames, passed to the
      pairwise join functions when the matching pre-joined DataFrame is not supplied.
    - cust_acc_df, cust_tran_df, acc_tran_df (pd.DataFrame): Optional pre-joined DataFrames.
    - select_columns (str or List[str], optional): If specified, returns only selected columns.

    Returns:
    - pd.DataFrame: Fully merged DataFrame of customers, accounts, and transactions.
      Other clashing columns keep their '_cust_tran' / '_acc_tran' suffixes.

    NOTE(review): the first merge pairs every account row of a customer with every
    transaction row of that customer, so row counts can multiply when a customer has
    several accounts and transactions — confirm this is the intended result shape.
    """

    # Build any missing pre-joined frame with the pairwise join functions.
    # The original called get_cust_acc / get_cust_tran / get_acc_tran, which are not
    # defined in this file; multi_join_* are the functions with the matching
    # cust/acc/tran keyword arguments.
    if cust_acc_df is None:
        cust_acc_df = multi_join_cust_acc(cust=cust, acc=acc)
    if cust_tran_df is None:
        cust_tran_df = multi_join_cust_tran(cust=cust, tran=tran)
    if acc_tran_df is None:
        acc_tran_df = multi_join_acc_tran(acc=acc, tran=tran)

    # Normalize join keys to lists
    customer_keys = [customer_key] if isinstance(customer_key, str) else customer_key
    account_keys = [account_key] if isinstance(account_key, str) else account_key
    transaction_keys = [transaction_key] if isinstance(transaction_key, str) else transaction_key

    # Accept 'LEFT', 'Inner', ... like the pairwise join functions do
    how = join_type.lower()

    # Merge customer-account with customer-transaction on customer keys
    cust_merge = pd.merge(
        cust_acc_df,
        cust_tran_df,
        how=how,
        on=customer_keys,
        suffixes=('', '_cust_tran')
    )

    # Merge with account-transaction on account keys
    final_merge = pd.merge(
        cust_merge,
        acc_tran_df,
        how=how,
        on=account_keys,
        suffixes=('', '_acc_tran')
    )

    # Drop duplicate transaction join keys if present
    duplicate_cols = [
        f"{key}{suffix}"
        for key in transaction_keys
        for suffix in ('_cust_tran', '_acc_tran')
    ]
    for dup_col in duplicate_cols:
        if dup_col in final_merge.columns:
            final_merge = final_merge.drop(columns=[dup_col])

    # Return only selected columns if provided
    if select_columns:
        if isinstance(select_columns, str):
            select_columns = [select_columns]
        final_merge = final_merge[select_columns]

    return final_merge


In [128]:
# TODO (website): if the user doesn't want to write custom code, they should be able to add the
# rules directly using only the functions above; if they prefer custom code, they can write just that.
# Don't require the user to provide both for the rules to execute properly.

In [129]:
# check how to make it platform independent like A--->B--->C
#                                                A<---B<---C

In [130]:
# A : our code of everything in pandas
# B : Connect it with FAST API
# C : Say the client is using Java and he wants to access the Data Frame 

# Single Functions for each level 

In [131]:
def get_table_data(
    table_name: str,
    filters: Optional[Dict[str, Any]] = None,
    date_fields: Optional[List[str]] = None,
    min_max_fields: Optional[Dict[str, Dict[str, Union[str, datetime, float]]]] = None,
    select_columns: Union[str, List[str], None] = None,
    df: Optional[pd.DataFrame] = None
) -> pd.DataFrame:
    """
    Generalized function to fetch records from any table or passed DataFrame (customer, accounts, transactions).

    Parameters:
    - table_name: Name of the table (ignored if df is provided)
    - filters: Dictionary of exact values or lists to filter (e.g., {'name': ['Alice', 'Bob'], 'city': 'Delhi'}).
      A list matches any of its values; an empty list matches no rows.
    - date_fields: List of fields that are dates or datetimes. String values for these fields are
      parsed as 'YYYY-MM-DD' or, failing that, 'YYYY-MM-DD HH:MM:SS'.
    - min_max_fields: Dict with inclusive min/max filters. Format:
        {
            'update_date': {'min': '2024-01-01', 'max': '2024-12-31'},
            'amount': {'min': 1000, 'max': 100000}
        }
    - select_columns: Comma string or list of columns to return; all columns when omitted
    - df: Optional. If provided, will filter on this DataFrame instead of querying the DB

    Returns:
    - Filtered DataFrame

    Raises:
    - ValueError: if a date-field string matches neither supported format
    - KeyError: (DataFrame path) if a filter field or selected column is not in df

    Notes:
    - On the database path table_name, the filter field names and select_columns are
      interpolated into the SQL text verbatim, so they must come from trusted code,
      never from end-user input. Filter values are always passed as bound parameters.
    """

    filters = filters or {}
    date_fields = date_fields or []
    min_max_fields = min_max_fields or {}

    def parse_value(val, is_date=False):
        """Parse date-field strings into datetime objects; pass everything else through."""
        if not (is_date and isinstance(val, str)):
            return val
        try:
            return datetime.strptime(val, '%Y-%m-%d')
        except ValueError:
            # Not a plain date: try the full timestamp format (raises ValueError if that fails too)
            return datetime.strptime(val, '%Y-%m-%d %H:%M:%S')

    # --------------------------------------------
    # If df is provided, apply filtering directly
    # --------------------------------------------
    if df is not None:
        df_copy = df.copy()

        # Apply filters
        for field, value in filters.items():
            is_date = field in date_fields
            if isinstance(value, (list, tuple)):
                mask = df_copy[field].isin([parse_value(v, is_date) for v in value])
            else:
                mask = df_copy[field] == parse_value(value, is_date)
            df_copy = df_copy[mask]

        # Apply min/max filters
        for field, bounds in min_max_fields.items():
            is_date = field in date_fields
            if 'min' in bounds:
                df_copy = df_copy[df_copy[field] >= parse_value(bounds['min'], is_date)]
            if 'max' in bounds:
                df_copy = df_copy[df_copy[field] <= parse_value(bounds['max'], is_date)]

        # Select columns
        if select_columns:
            if isinstance(select_columns, str):
                if select_columns in df_copy.columns:
                    wanted = [select_columns]
                else:
                    # Accept the same comma-separated form the database path accepts
                    wanted = [col.strip() for col in select_columns.split(",") if col.strip()]
            else:
                wanted = list(select_columns)
            df_copy = df_copy[wanted]

        return df_copy

    # -----------------------------------------------------
    # Else: fallback to database query using psycopg2/sql
    # -----------------------------------------------------
    if not select_columns:
        column_str = "*"
    elif isinstance(select_columns, (list, tuple)):
        column_str = ", ".join(select_columns)
    else:
        column_str = select_columns

    # "WHERE 1=1" lets every filter be appended as " AND ..."
    query = f"SELECT {column_str} FROM {table_name} WHERE 1=1"
    params = {}

    # The running index keeps parameter names unique per filter
    for param_counter, (field, value) in enumerate(filters.items()):
        is_date = field in date_fields
        prefix = f"{field}_{param_counter}"

        if isinstance(value, (list, tuple)):
            if not value:
                # An empty list would render as "AND ()", which is invalid SQL; match no rows
                query += " AND 1=0"
                continue
            clause_parts = []
            for i, val in enumerate(value):
                pname = f"{prefix}_{i}"
                clause_parts.append(f"{field} = %({pname})s")
                params[pname] = parse_value(val, is_date)
            query += " AND (" + " OR ".join(clause_parts) + ")"
        else:
            params[prefix] = parse_value(value, is_date)
            query += f" AND {field} = %({prefix})s"

    for field, bounds in min_max_fields.items():
        is_date = field in date_fields
        if 'min' in bounds:
            query += f" AND {field} >= %(min_{field})s"
            params[f"min_{field}"] = parse_value(bounds['min'], is_date)
        if 'max' in bounds:
            query += f" AND {field} <= %(max_{field})s"
            params[f"max_{field}"] = parse_value(bounds['max'], is_date)

    # Create the engine only once the query is fully built, so a parsing error
    # above cannot leave an undisposed engine behind.
    engine = get_db_engine()
    try:
        with engine.connect() as conn:
            if params:
                # Use the raw DBAPI cursor so the %(name)s placeholders are bound by the driver
                with conn.connection.cursor() as cursor:
                    cursor.execute(query, params)
                    columns = [desc[0] for desc in cursor.description]
                    rows = cursor.fetchall()
                    return pd.DataFrame(rows, columns=columns)
            else:
                return pd.read_sql(query, conn)
    finally:
        engine.dispose()


In [102]:
def universal_data_join(
    tables_to_join: List[str],
    join_type: str = "left",
    join_keys: Optional[Dict[str, str]] = None,
    filters: Optional[Dict[str, dict]] = None,
    select_columns: Union[List[str], str, None] = None,
    preloaded_data: Optional[Dict[str, pd.DataFrame]] = None
) -> pd.DataFrame:
    """
    Dynamically joins selected tables (customer, account, transaction) based on input order.

    The first table is the starting frame; each following table is merged onto the
    running result using the previous table's join key on the left and its own
    join key on the right.

    Parameters:
    - tables_to_join: List of table names to join, in order (e.g. ["customer", "account"])
    - join_type: Join type ('left', 'inner', etc.); case-insensitive
    - join_keys: Dict of table_name → join column in that table (one entry per table)
    - filters: Optional dict of table_name → keyword arguments for that table's fetch function
    - select_columns: Optional final column or list of columns to return
    - preloaded_data: Optional dict of table_name → DataFrame; a preloaded table is not fetched

    Returns:
    - pd.DataFrame: Final merged result. Clashing columns from a right-hand table
      carry a '_<table_name>' suffix.

    Raises:
    - ValueError: if fewer than two tables are given, join_keys is missing, or a
      table that must be fetched has no fetch function
    - KeyError: if a table in tables_to_join has no entry in join_keys

    NOTE(review): each table has a single join key, so a chain such as
    customer → account → transaction joins account to transaction on the same
    account column that was used to join customer to account — confirm this
    matches the intended relationships before joining three tables.
    """
    # Explicit raises instead of assert: assert statements are removed under "python -O"
    if len(tables_to_join) < 2:
        raise ValueError("You must select at least two tables to join.")
    if join_keys is None:
        raise ValueError("You must specify join keys for each table.")

    # Fail fast, before any data is fetched, when a join key is missing
    missing_keys = [table for table in tables_to_join if table not in join_keys]
    if missing_keys:
        raise KeyError(f"No join key specified for table(s): {missing_keys}")

    filters = filters or {}
    preloaded_data = preloaded_data or {}
    how = join_type.lower()

    def fetch(table):
        """Fetch one table from the database with that table's filters."""
        # Resolved lazily so that fully preloaded joins never touch the fetch functions
        fetch_functions = {
            "customer": get_customers,
            "account": get_accounts,
            "transaction": get_transactions
        }
        if table not in fetch_functions:
            raise ValueError(f"No fetch function available for table '{table}'")
        return fetch_functions[table](**(filters.get(table) or {}))

    dataframes = {
        table: preloaded_data[table] if table in preloaded_data else fetch(table)
        for table in tables_to_join
    }

    # Start joining
    merged = dataframes[tables_to_join[0]]

    for left_table, right_table in zip(tables_to_join, tables_to_join[1:]):
        merged = pd.merge(
            merged,
            dataframes[right_table],
            how=how,
            left_on=join_keys[left_table],
            right_on=join_keys[right_table],
            suffixes=('', f'_{right_table}')
        )

    if select_columns:
        if isinstance(select_columns, str):
            select_columns = [select_columns]
        merged = merged[select_columns]

    return merged


In [132]:
# Example call: join the customer and account tables on customer_id (default
# 'left' join), passing city="Jaipur" to the customer fetch and
# account_type="Savings" to the account fetch, and keep four columns.
universal_data_join(tables_to_join=["customer","account"],
                   join_keys={"customer":"customer_id","account":"customer_id"},
                    filters={"customer":{"city":"Jaipur"},
                            "account":{"account_type":"Savings"}
                            },
                    select_columns=["name","city","account_type","account_status"]
                   )

Unnamed: 0,name,city,account_type,account_status
0,Rohan Mehta,Jaipur,Savings,Active
1,Vivek Choudhary,Jaipur,Savings,Suspended
2,Neeta Sharma,Jaipur,,
3,Amit Sharma,Jaipur,,


# Combining the Above Two Functions into a Class

In [133]:
# Base Table names are:
#1.customer
#2.accounts
#3.transactions

In [134]:
# combine the two function into a class such that it can be accesed anywhere
class DataJoiner:
    """Fetch, filter and join tables from the database or from DataFrames.

    The instance stores a zero-argument callable that returns a database
    engine; that callable is invoked every time a table has to be read from
    the database.
    """

    def __init__(self, db_engine_func):
        # A function that returns a DB engine (called on every DB fetch).
        self.get_db_engine = db_engine_func

    @staticmethod
    def _parse_value(val, is_date=False):
        """Turn a date string into ``datetime``; return other values as-is.

        Accepts 'YYYY-MM-DD' and 'YYYY-MM-DD HH:MM:SS'. Any other string in a
        date field raises ValueError (from the second ``strptime``).
        """
        if is_date and isinstance(val, str):
            try:
                return datetime.strptime(val, '%Y-%m-%d')
            except ValueError:
                return datetime.strptime(val, '%Y-%m-%d %H:%M:%S')
        return val

    def _filter_dataframe(self, df, filters, date_fields, min_max_fields,
                          select_columns):
        """Apply equality / membership / range filters to a copy of *df*."""
        result = df.copy()

        for field, value in filters.items():
            is_date = field in date_fields
            if isinstance(value, (list, tuple)):
                wanted = [self._parse_value(v, is_date) for v in value]
                result = result[result[field].isin(wanted)]
            else:
                result = result[result[field] == self._parse_value(value, is_date)]

        for field, bounds in min_max_fields.items():
            is_date = field in date_fields
            if 'min' in bounds:
                result = result[result[field] >= self._parse_value(bounds['min'], is_date)]
            if 'max' in bounds:
                result = result[result[field] <= self._parse_value(bounds['max'], is_date)]

        if select_columns:
            # A bare string selects one column; any other sequence is turned
            # into a list so that a tuple is not mistaken for a single key.
            if isinstance(select_columns, str):
                select_columns = [select_columns]
            result = result[list(select_columns)]

        return result

    def _build_query(self, table_name, filters, date_fields, min_max_fields,
                     select_columns):
        """Return ``(sql, params)`` for a filtered SELECT on *table_name*.

        Values are bound through pyformat placeholders (``%(name)s``).
        NOTE: *table_name*, the filter field names and *select_columns* are
        interpolated into the SQL text as-is, so they must come from trusted
        code, never from end-user input.
        """
        if not select_columns:
            # None, "" or an empty list all mean "every column".
            column_str = "*"
        elif isinstance(select_columns, str):
            column_str = select_columns
        else:
            column_str = ", ".join(select_columns)

        clauses = ["1=1"]
        params = {}

        for counter, (field, value) in enumerate(filters.items()):
            is_date = field in date_fields
            prefix = f"{field}_{counter}"
            if isinstance(value, (list, tuple)):
                if not value:
                    # An empty list matches nothing (same as DataFrame.isin([]));
                    # emitting "()" here would be invalid SQL.
                    clauses.append("1=0")
                    continue
                parts = []
                for i, val in enumerate(value):
                    pname = f"{prefix}_{i}"
                    parts.append(f"{field} = %({pname})s")
                    params[pname] = self._parse_value(val, is_date)
                clauses.append("(" + " OR ".join(parts) + ")")
            else:
                clauses.append(f"{field} = %({prefix})s")
                params[prefix] = self._parse_value(value, is_date)

        for field, bounds in min_max_fields.items():
            is_date = field in date_fields
            if 'min' in bounds:
                clauses.append(f"{field} >= %(min_{field})s")
                params[f"min_{field}"] = self._parse_value(bounds['min'], is_date)
            if 'max' in bounds:
                clauses.append(f"{field} <= %(max_{field})s")
                params[f"max_{field}"] = self._parse_value(bounds['max'], is_date)

        query = f"SELECT {column_str} FROM {table_name} WHERE " + " AND ".join(clauses)
        return query, params

    def get_table_data(
        self,
        table_name: str,
        filters: Optional[Dict[str, Any]] = None,
        date_fields: Optional[List[str]] = None,
        min_max_fields: Optional[Dict[str, Dict[str, Union[str, datetime, float]]]] = None,
        select_columns: Union[str, List[str], None] = None,
        df: Optional[pd.DataFrame] = None
    ) -> pd.DataFrame:
        """
        Generalized function to fetch records from any table or from a passed DataFrame.

        Parameters:
        - table_name: Name of the table (ignored if df is provided)
        - filters: Dict of field -> exact value, or field -> list/tuple of
          accepted values (an empty list matches no rows)
        - date_fields: Fields whose string filter values are parsed as
          'YYYY-MM-DD' or 'YYYY-MM-DD HH:MM:SS'
        - min_max_fields: Dict of field -> {'min': ..., 'max': ...}; both
          bounds are inclusive and each is optional
        - select_columns: Column name or list of column names to return;
          None or an empty list returns all columns
        - df: Optional. If provided, a copy of this DataFrame is filtered
          and the database is not touched

        Returns:
        - pd.DataFrame: Filtered DataFrame

        Raises:
        - ValueError: a date-field string matches neither accepted format
        - KeyError: (DataFrame path) a filter or selected column is missing
        """
        filters = filters or {}
        date_fields = date_fields or []
        min_max_fields = min_max_fields or {}

        if df is not None:
            return self._filter_dataframe(
                df, filters, date_fields, min_max_fields, select_columns
            )

        # DB fetching
        query, params = self._build_query(
            table_name, filters, date_fields, min_max_fields, select_columns
        )
        engine = self.get_db_engine()

        with engine.connect() as conn:
            if not params:
                return pd.read_sql(query, conn)
            # Parameterized queries go through the raw DBAPI cursor so the
            # pyformat placeholders are bound by the driver.
            with conn.connection.cursor() as cursor:
                cursor.execute(query, params)
                rows = cursor.fetchall()
                cols = [desc[0] for desc in cursor.description]
                return pd.DataFrame(rows, columns=cols)

    def universal_data_join(
        self,
        tables_to_join: List[str],
        join_type: str = "left",
        join_keys: Optional[Dict[str, str]] = None,
        filters: Optional[Dict[str, dict]] = None,
        select_columns: Union[List[str], str, None] = None,
        preloaded_data: Optional[Dict[str, pd.DataFrame]] = None,
        date_fields: Optional[Dict[str, List[str]]] = None,
        min_max_fields: Optional[Dict[str, Dict[str, Dict[str, Union[str, float]]]]] = None
    ) -> pd.DataFrame:
        """
        Dynamically joins the selected tables in the order given.

        Each table is first filtered on its own (in the database, or in
        memory when it is supplied through preloaded_data); the filtered
        tables are then merged left to right. Every table has a single join
        key, used both when it is the right-hand and the left-hand side of
        consecutive merges.

        Parameters:
        - tables_to_join: List of table names to join, in order
        - join_type: Type of join, passed to pd.merge as `how`
        - join_keys: Dict of table_name -> join column in that table
        - filters: Optional filters per table
        - select_columns: Optional column name or list of columns to return
        - preloaded_data: Optional dict of DataFrames per table; the per-table
          filters are applied to these as well
        - date_fields: Dict of table_name -> date columns
        - min_max_fields: Dict of table_name -> field -> {min, max}

        Returns:
        - pd.DataFrame: Final merged DataFrame

        Raises:
        - AssertionError: fewer than two tables, or join_keys is None
        - KeyError: a table in tables_to_join has no entry in join_keys
        """
        # Raised explicitly (not via `assert`) so the checks survive
        # `python -O`; AssertionError is kept for backward compatibility.
        if len(tables_to_join) < 2:
            raise AssertionError("You must join at least two tables.")
        if join_keys is None:
            raise AssertionError("You must specify join keys for each table.")

        # Fail fast, before any table is fetched.
        missing = [table for table in tables_to_join if table not in join_keys]
        if missing:
            raise KeyError(f"No join key specified for table(s): {missing}")

        filters = filters or {}
        date_fields = date_fields or {}
        min_max_fields = min_max_fields or {}
        preloaded_data = preloaded_data or {}

        dataframes = {}
        for table in tables_to_join:
            # df=None makes get_table_data read from the database; otherwise
            # the preloaded frame is filtered in memory.
            dataframes[table] = self.get_table_data(
                table_name=table,
                filters=filters.get(table, {}),
                date_fields=date_fields.get(table, []),
                min_max_fields=min_max_fields.get(table, {}),
                select_columns=None,
                df=preloaded_data.get(table),
            )

        merged = dataframes[tables_to_join[0]]

        for left_table, right_table in zip(tables_to_join, tables_to_join[1:]):
            merged = pd.merge(
                merged,
                dataframes[right_table],
                how=join_type,
                left_on=join_keys[left_table],
                right_on=join_keys[right_table],
                # Clashing column names keep the left name unchanged and get
                # a `_<right_table>` suffix on the right.
                suffixes=('', f'_{right_table}')
            )

        if select_columns:
            if isinstance(select_columns, str):
                select_columns = [select_columns]
            merged = merged[list(select_columns)]

        return merged

In [None]:
# NOTE(review): called with no arguments. DataJoiner.universal_data_join
# requires `tables_to_join` and `join_keys`; the module-level function's
# signature is not visible here — presumably it needs arguments too; confirm
# what this unexecuted scratch cell is meant to run.
universal_data_join()