In [5]:
# !pip install awswrangler
# !pip install holidays

In [6]:
import awswrangler as wr
import pandas as pd
import datetime as dt
from datetime import datetime
import holidays

In [7]:
#TIMEFRAME
start_date = '2021-01-01'
end_date = '2023-10-21'

### DFs generation

In [54]:
# DB Setting
bucket_name = 's3://viamericas-datalake-dev-us-east-1-283731589572-athena/'
origin_name = 'AwsDataCatalog'
database_name= 'viamericas'
table_name = 'daily_check_gp'

In [55]:
df = wr.athena.read_sql_table(
    table=table_name,
    database=database_name,
)

  return cls.dispatch_func(func)(*args, **kw)


In [56]:
df.day.max() # Solo para chequear

'2024-02-03'

In [57]:
# Convert the 'date' column to datetime format
df['day'] = pd.to_datetime(df['day'])
# Grouping by 'payer' and 'country' concatenated for this level of granularity
df['payer_country'] = df['payer'] + '_' + df['country']
# Margin (when tx !=0)
df['margin'] = df.apply(lambda row: row['gp'] / row['tx'] if row['tx'] != 0 else 0, axis=1)
df['margin'] = df['margin'].apply(lambda x: float(x)).round(4)

In [58]:
# Specify date range
df = df[(df['date'] >= start_date) & (df['date'] <= end_date)]

In [60]:
#df[df['tx'] == 0] # Hay casos donde tx es 0, pero GP no es cero

In [61]:
#Connection to daily_forex 
forex_table = 'last_daily_forex_country'

rates = wr.athena.read_sql_table(
    table=forex_table,
    database=database_name)

  return cls.dispatch_func(func)(*args, **kw)


In [62]:
rates.head()

Unnamed: 0,symbol,max_feed_date,max_feed_price,country,day
0,USDINR,2022-05-27 23:58:28,77.6922,INDIA,2022-05-27
1,USDMXN,2022-05-27 23:58:28,19.7722,MEXICO,2022-05-27
2,USDBRL,2023-04-15 23:58:27,4.9099,BRAZIL,2023-04-15
3,USDMXN,2023-04-15 23:58:27,18.0108,MEXICO,2023-04-15
4,USDINR,2023-06-13 23:58:27,82.3874,INDIA,2023-06-13


In [63]:
# FOREX - Selecting columns & renaming
rates['day'] = pd.to_datetime(rates['day'])
rates = rates[['day','country','max_feed_price']]

### UNIVERSE

In [64]:
#Filtros
df = df[df['payer'] != 'EXPIRED ORDERS']
df = df[df['amount'] != 0] # Excluding 0 (flag A & Flag C), defined in EDA

In [65]:
df['amount'].sum()

Decimal('25098657847.5725')

In [66]:
# AGING FILTER

def aging_filter(df):
    """
    Filter a DataFrame based on aging criteria described in aging.ipynb

    Args:
        df (pandas.DataFrame): Input DataFrame with columns 'date', 'payer_country', 'amount', and 'tx'.

    Returns:
        pandas.DataFrame: Filtered DataFrame containing only the rows that meet the aging criteria.
    """
    # Find the last date in the sample
    last_date_sample = df['day'].max()

    # Calculate the limit date, one day before the last date in the sample
    limit_date = last_date_sample - pd.Timedelta(days=1)

    # Aggregate data by 'payer_country'
    result = (
        df.groupby('payer_country')
        .agg(
            first_date=('day', 'min'),
            last_date=('day', 'max'),
            total_amount=('amount', 'sum'),
            total_transactions=('tx', 'sum')
        )
        .reset_index()
    )

    # Calculate age of payer
    result['age_payer'] = ((limit_date - result['first_date']).dt.days / 30).round(2)

    # Calculate active time
    result['active_time'] = ((result['last_date'] - result['first_date']).dt.days / 30).round(2)

    # Calculate inactive time
    result['inactive_time'] = ((limit_date - result['last_date']).dt.days / 30).round(2)

    # Sort the DataFrame by 'total_amount' from highest to lowest
    result = result.sort_values(by='total_amount', ascending=False)

    # Filter the DataFrame based on conditions
    aging_universe = result.loc[
        (result.age_payer >= 3) & 
        (result.inactive_time <= 3) & 
        (result.total_amount > 10000) & 
        (result.total_transactions > 50)
    ]
    
    return aging_universe

In [67]:
# Defining Universe
df_aging = aging_filter(df) #Filtering 'payer_country' based on Aging notebook
df_filtered = df[df['payer_country'].isin(df_aging['payer_country'])] # Applying aging filters 
df_filtered['day'] = pd.to_datetime(df_filtered['day'])

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_filtered['day'] = pd.to_datetime(df_filtered['day'])


### VARIABLES

In [68]:
def generate_lag_and_variation(df, num_lags):
    """
    Generate lagged values and variations for a given df

    Args:
        df (pandas.DataFrame): Input df with columns 'symbol' and 'feed_price'.
        num_lags (int): Number of lagged values to generate.

    Returns:
        pandas.DataFrame: df with lagged values and variations added as new columns.
    """
    # Create columns for each day's lag up to the defined maximum
    for i in range(1, num_lags + 1):
        col_name = f'rate_lag_{i}'
        # Shift the 'feed_price' column grouped by 'symbol'
        df[col_name] = df.groupby('country')['max_feed_price'].shift(i)

    # Calculate the variation columns between consecutive lags
    for i in range(1, num_lags):
        col_name = f'var_rate_lag_{i}'
        # Calculate the difference between consecutive lag columns
        df[col_name] = df[f'rate_lag_{i}'] - df[f'rate_lag_{i + 1}']

    return df


In [69]:
rates_number = 30
rates = generate_lag_and_variation(rates, rates_number)

In [71]:
# Primera fusion: traigo rates al df donde filtré el universo
df1 = pd.merge(df_filtered, rates, on=['day', 'country'], how='left')

In [72]:
df1['date'] = pd.to_datetime(df1['date'])

In [73]:
### EFFECT OF CANCELED TRANSACTIONS ###
# ES DISTINTA PORQUE DAILY_CHECK TIENE ALGUNOS FILTROS Y ESTA NO
database_name= 'analytics'
table2_name = 'daily_sales_count_cancelled_v2'##WE LOAD THE BASE WITH CANCELLATIONS

df_canc = wr.athena.read_sql_table(
    table=table2_name,
    database=database_name)

  return cls.dispatch_func(func)(*args, **kw)


In [74]:
df_canc['date'] = pd.to_datetime(df_canc['date'])
df_canc['payer_country'] = df_canc['payer'] +'_'+ df_canc['country']
# Specific date range
df_canc = df_canc[(df_canc['date'] >= start_date) & (df_canc['date'] <= end_date)]

In [75]:
def fill_missing_dates(df, start_date, end_date):
    """
    Fill missing dates in the DataFrame with zero values and ensure all date ranges are covered.

    Args:
        df (pandas.DataFrame): Input DataFrame with columns 'date', 'amount', 'tx_cancelled', 'payer_country', etc.
        start_date (str or datetime.date): Start date of the desired date range.
        end_date (str or datetime.date): End date of the desired date range.

    Returns:
        pandas.DataFrame: DataFrame with missing dates filled and all date ranges covered.
    """
    # Create an empty DataFrame with the specified date range
    date_range = pd.date_range(start=start_date, end=end_date)
    df_fill = pd.DataFrame({'date': date_range, 'amount': 0, 'tx_cancelled': 0})
    df_fill['date'] = pd.to_datetime(df_fill['date']).dt.date

    # Sort the original DataFrame by 'country', 'payer', and 'date'
    df = df.sort_values(by=['country', 'payer', 'date'])

    # Create an empty DataFrame to hold the result
    result_df = pd.DataFrame()

    # Loop through each 'payer_country'
    for payer_country in df['payer_country'].unique():
        # Filter DataFrame by 'payer_country'
        df_aux = df[df['payer_country'] == payer_country]

        # Combine df_aux (payer_country) with df_fill, keeping values from df_aux and filling missing dates
        merged_df = df_aux.set_index('date').combine_first(df_fill.set_index('date')).reset_index()

        # Fill missing values in specified columns
        columns_to_fill = ['payer', 'country', 'payer_country']
        merged_df[columns_to_fill] = merged_df[columns_to_fill].ffill().bfill()

        # Concatenate the result with the final DataFrame
        result_df = pd.concat([result_df, merged_df], ignore_index=True)

    return result_df

In [76]:
# Call the function with the specified start_date and end_date
df_full = fill_missing_dates(df_canc, start_date, end_date)

In [77]:
def generate_tx_lags_and_variation(df, tx_count):
    """
    Generate lag columns for cancelled transactions and their variations.

    Args:
    - df: DataFrame containing transaction data
    - tx_count: Number of periods for lag calculation

    Returns:
    - df: DataFrame with added lag and variation columns
    """
    # Sort the dataset based on country, payer, and date
    df = df.sort_values(by=['country', 'payer', 'date'])

    # Create columns for each day's lag up to the defined maximum
    for i in range(1, tx_count + 1):
        col_name = f'tx_cancelled_lag_{i}'
        # Shift the 'tx_cancelled' column grouped by 'country' and 'payer'
        df[col_name] = df.groupby(['country', 'payer'])['tx_cancelled'].shift(i)

    # Calculate the variation columns between consecutive delays
    for i in range(1, tx_count):
        col_name = f'var_tx_cancelled_lag_{i}'
        # Calculate the difference between consecutive lag columns
        df[col_name] = df[f'tx_cancelled_lag_{i}'] - df[f'tx_cancelled_lag_{i + 1}']

    return df

In [78]:
# Call the function and assign the result back to df2
tx_cancelled_lags = 30
df2 = generate_tx_lags_and_variation(df_full, tx_cancelled_lags)

In [79]:
# Coupon ratio
df1['ratio_coupon_tx']=df1.coupon_count/df1.tx

In [80]:
df1.ratio_coupon_tx.min();df1.ratio_coupon_tx.max()

5.0

In [81]:
def generate_coupon_tx_lags(df, tx_count):
    """
    Generate lag columns for coupon_tx ratio

    Args:
    - df: DataFrame containing transaction data
    - tx_count: Number of periods for lag calculation

    Returns:
    - df: DataFrame with added lag and variation columns
    """
    # Sort the dataset based on country, payer, and date
    df = df.sort_values(by=['country', 'payer', 'date'])

    # Create columns for each day's lag up to the defined maximum
    for i in range(1, tx_count + 1):
        col_name = f'ratio_coupon_tx_lag_{i}'
        # Shift the 'ratio_coupon_tx' column grouped by 'country' and 'payer'
        df[col_name] = df.groupby(['country', 'payer'])['ratio_coupon_tx'].shift(i)

    return df

In [82]:
# Call the function and assign the result back to df1
tx_ratio_coupon_tx_lags = 30
df1 = generate_coupon_tx_lags(df1, tx_ratio_coupon_tx_lags)

In [83]:
def generate_tx_lags(df, tx_count):
    """
    Generate lags columns for txs

    Args:
    - df: DataFrame containing transaction data
    - tx_count: Number of periods for lag calculation

    Returns:
    - df: DataFrame with added lag and variation columns
    """
    # Sort the dataset based on country, payer, and date
    df = df.sort_values(by=['country', 'payer', 'date'])

    # Create columns for each day's lag up to the defined maximum
    for i in range(1, tx_count + 1):
        col_name = f'tx_lag_{i}'
        # Shift the 'tx' column grouped by 'country' and 'payer'
        df[col_name] = df.groupby(['country', 'payer'])['tx'].shift(i)

    return df

In [84]:
# Call the function and assign the result back to df1
tx_lags = 30
df1 = generate_tx_lags(df1, tx_lags)

In [85]:
def generate_margin_lags(df, margin_lags):
    """
    Generate lag columns for margin

    Args:
    - df: DataFrame containing transaction data
    - margin_lags: Number of periods for lag calculation

    Returns:
    - df: DataFrame with added lag columns for margin
    """
    # Sort the dataset based on country, payer, and date
    df = df.sort_values(by=['country', 'payer', 'date'])

    # Create columns for each day's lag up to the defined maximum
    for i in range(1, margin_lags + 1):
        col_name = f'margin_lag_{i}'
        # Shift the 'margin' column grouped by 'country' and 'payer'
        df[col_name] = df.groupby(['country', 'payer'])['margin'].shift(i)

    return df

In [86]:
# Call the function and assign the result back to df1
margin_lags = 10
df1 = generate_margin_lags(df1, margin_lags)

In [87]:
df1.tail()

Unnamed: 0,payer,country,date,tx,amount,coupon_count,gp,day,payer_country,margin,...,margin_lag_1,margin_lag_2,margin_lag_3,margin_lag_4,margin_lag_5,margin_lag_6,margin_lag_7,margin_lag_8,margin_lag_9,margin_lag_10
31853,VIAMERICAS USA DEPOSITS,UNITED STATES (ATM & DEPOSITS),2023-08-10,1,951.5,0,4.7575,2023-08-10,VIAMERICAS USA DEPOSITS_UNITED STATES (ATM & D...,4.7575,...,2.5,2.0866,1.499,1.25,0.75,10.726,7.3809,1.5,4.0963,1.5
36124,VIAMERICAS USA DEPOSITS,UNITED STATES (ATM & DEPOSITS),2023-08-11,1,250.48,0,1.2524,2023-08-11,VIAMERICAS USA DEPOSITS_UNITED STATES (ATM & D...,1.2524,...,4.7575,2.5,2.0866,1.499,1.25,0.75,10.726,7.3809,1.5,4.0963
61640,VIAMERICAS USA DEPOSITS,UNITED STATES (ATM & DEPOSITS),2023-08-18,1,200.0,0,1.0,2023-08-18,VIAMERICAS USA DEPOSITS_UNITED STATES (ATM & D...,1.0,...,1.2524,4.7575,2.5,2.0866,1.499,1.25,0.75,10.726,7.3809,1.5
104,VIAMERICAS USA DEPOSITS,UNITED STATES (ATM & DEPOSITS),2023-08-25,1,1000.0,0,4.0172673,2023-08-25,VIAMERICAS USA DEPOSITS_UNITED STATES (ATM & D...,4.0173,...,1.0,1.2524,4.7575,2.5,2.0866,1.499,1.25,0.75,10.726,7.3809
60151,VIAMERICAS USA DEPOSITS,UNITED STATES (ATM & DEPOSITS),2023-09-28,1,1000.0,0,5.0,2023-09-28,VIAMERICAS USA DEPOSITS_UNITED STATES (ATM & D...,5.0,...,4.0173,1.0,1.2524,4.7575,2.5,2.0866,1.499,1.25,0.75,10.726


In [92]:
df_final = pd.merge(df1,df2, on=['date','payer','country', 'payer_country', 'amount'], how='outer')
df_final['date'] = pd.to_datetime(df_final['date'])

In [93]:
df_final.loc[(df_final['payer_country'] == 'ELEKTRA (MEXICO)_MEXICO') & (df_final['date'] == '2022-07-04')]

Unnamed: 0,payer,country,date,tx,amount,coupon_count,gp,day,payer_country,margin,...,var_tx_cancelled_lag_20,var_tx_cancelled_lag_21,var_tx_cancelled_lag_22,var_tx_cancelled_lag_23,var_tx_cancelled_lag_24,var_tx_cancelled_lag_25,var_tx_cancelled_lag_26,var_tx_cancelled_lag_27,var_tx_cancelled_lag_28,var_tx_cancelled_lag_29
58340,ELEKTRA (MEXICO),MEXICO,2022-07-04,8364,4152238.92,90,54173.7271,2022-07-04,ELEKTRA (MEXICO)_MEXICO,6.477,...,-75,73,-8,156,80,23,-69,-93,-29,-30


### DUMMIES

In [97]:
def mark_us_holidays(df):
    """
    Mark US holidays

    Args:
        df (df): DataFrame containing a 'date' column in datetime format.

    Returns:
        DataFrame: DataFrame with an additional 'is_holiday' column, where 1 indicates a US holiday and 0 otherwise.
    """
    # Load US holidays
    us_holidays = holidays.US()
    
    # Define a function to check if a date is a holiday in the US
    def is_us_holiday(date):
        return 1 if date in us_holidays else 0
    
    # Apply the function to the 'date' column and create a new 'is_holiday' column
    df_final['is_holiday'] = df_final['date'].apply(is_us_holiday)
    
    return df

In [98]:
#Applying holiday function 
df_final = mark_us_holidays(df_final)

In [99]:
def calculate_var_30ds(window, row, df_final):
    """
    Calculate the var_30ds metric for a given row based on the average amount in the last 30 days.

    Args:
        window (int): The size of the window in days.
        row (pd.Series): The row containing the data for which var_30ds is to be calculated.
        df_final (pd.DataFrame): The DataFrame containing the historical data.

    Returns:
        float: The calculated var_30ds metric, or 0 if there are insufficient data or if division by zero occurs.
        None: If the row does not correspond to a holiday.

    """
    if row['is_holiday'] == 1:
        # Filter the DataFrame to get only the last 30 days for the current 'payer_country'
        filtered_df = df_final[(df_final['payer_country'] == row['payer_country']) & 
                               (df_final['date'] >= (row['date'] - pd.Timedelta(days=window))) & 
                               (df_final['date'] < row['date'])]
        
        # Check if there are sufficient data points to calculate the average and avoid division by zero errors
        if len(filtered_df) > 0:
            # Calculate the average 'amount' for the current 'payer_country' in the last 30 days
            avg_amount = filtered_df['amount'].mean()
#            print(row['payer_country'],row['date'], avg_amount)
            
            # Calculate var_30ds based on the specified formula
            if avg_amount != 0 and row['amount'] != 0:
                var_30ds = (float(row['amount']) / float(avg_amount)) - 1
                return var_30ds
            else:
                return 0
        else:
            return 0
    else:
        return None


In [100]:
# Aplicar la función calculate_var_30ds a cada fila del DataFrame
window = 30
df_final['var_30ds'] = df_final.apply(lambda row: calculate_var_30ds(window, row, df_final), axis=1)

In [101]:
#df_final[df_final['is_holiday'] == 1]['date'].unique()

In [102]:
def mark_fourth_july(df):
    """
    Mark the Fourth of July in the DataFrame.

    Args:
        df (DataFrame): DataFrame containing a 'date' column in datetime format.

    Returns:
        DataFrame: DataFrame with an additional 'is_fourth_of_july' column.
    """
    # Check if the date is the Fourth of July
    df['is_fourth_of_july'] = (
        (df['date'].dt.month == 7) & (df['date'].dt.day == 4)
    ).astype(int)
    
    return df

In [103]:
df_final = mark_fourth_july(df_final)

In [None]:
# def calculate_var_30ds(window, row, df_final):
#     if row['is_holiday'] == 1:
#         # Filtrar el DataFrame para obtener solo los últimos 30 días para el 'payer_country' actual
#         filter_condition = (df_final['payer_country'] == row['payer_country']) & (df_final['date'] >= (row['date'] - pd.Timedelta(days=window))) & (df_final['date'] < row['date'])
#         filtered_df = df_final[filter_condition]
        
#         # Calcular el promedio de 'amount' para el 'payer_country' actual en los últimos 30 días
#         avg_amount = filtered_df['amount'].mean()
# #        print(avg_amount, row['amount'])

#         if (row['payer_country'] == 'ELEKTRA (MEXICO)_MEXICO') & (row['date'] == datetime.strptime('2023-09-04', '%Y-%m-%d')):
#             print(filtered_df)

        
#         # Calcular var_30ds según la fórmula especificada
#         if avg_amount != 0 and row['amount'] != 0:
# #            print('OK', row['payer_country'], row['date'], avg_amount, row['amount'], filtered_df['date'].min())
#             var_30ds = float(row['amount']) / float(avg_amount) - 1 # Convertir avg_amount a flotante antes de la división
#             return var_30ds  
#         else:
#             return 0
#     else:
#         return None

In [105]:
df_final.loc[(df_final['payer_country'] == 'ELEKTRA (MEXICO)_MEXICO') & (df_final['date'] == '2022-07-04')]

Unnamed: 0,payer,country,date,tx,amount,coupon_count,gp,day,payer_country,margin,...,var_tx_cancelled_lag_23,var_tx_cancelled_lag_24,var_tx_cancelled_lag_25,var_tx_cancelled_lag_26,var_tx_cancelled_lag_27,var_tx_cancelled_lag_28,var_tx_cancelled_lag_29,is_holiday,var_30ds,is_fourth_of_july
58340,ELEKTRA (MEXICO),MEXICO,2022-07-04,8364,4152238.92,90,54173.7271,2022-07-04,ELEKTRA (MEXICO)_MEXICO,6.477,...,156,80,23,-69,-93,-29,-30,1,-0.431529,1


In [108]:
bucket = 'viamericas-datalake-dev-us-east-1-283731589572-analytics'
prefix_abt = 'ABTv2'
file_name = 'ABTv2.csv'

# Ruta S3
s3_path = f"s3://{bucket}/{prefix_abt}/{file_name}"

wr.s3.to_csv(df_final, path=s3_path, index=False)

{'paths': ['s3://viamericas-datalake-dev-us-east-1-283731589572-analytics/ABTv2/ABTv2.csv'],
 'partitions_values': {}}