In [1]:
from Utils.DataLoader import DataLoader
from Utils.DuckDb import DuckDb
from datetime import datetime
import pandas as pd
from typing import List

In [2]:
DUCK_DB_UTILS = DuckDb()

In [3]:
MININUM_USERS_TO_CONSIDER = 100_000
BATCH_SIZE_TO_GET_DATA_FROM_DATABASE = 20_000

In [4]:
def get_user_list(limit: int = 9999999999, offset: int = 0) -> List[str]:
    conn = DUCK_DB_UTILS.get_connection()
    query = '''
        SELECT DISTINCT(msno)
        FROM main.user_logs ul
        ORDER BY msno
        LIMIT ?
        OFFSET ?
    '''
    query_results = conn.execute(query, [limit, offset]).fetchall()
    result = list(
        map(
            lambda qr: qr[0], query_results
        )
    )
    return result

In [5]:
def get_dataset_by_users(msnos: List[str]) -> pd.DataFrame:
    query = '''
        SELECT
            ----------------------
            -- Calculated fields --
            ----------------------
            50 + (0.0051 * num_unq) + (0.0001 * ul.total_secs) AS cost,
            --	t.actual_amount_paid - cost AS net_profit,
            ----------------------
            -- User Logs fields --
            ----------------------
            ul.msno,
            ul.safra,
            ul.num_25,
            ul.num_50,
            ul.num_75,
            ul.num_985,
            ul.num_100,
            ul.num_unq,
            ul.total_secs,
            ul.total_hours,
            -------------------------
            -- Transactions fields --
            -------------------------
            t.msno,
            t.payment_method_id,
            t.payment_plan_days,
            t.plan_list_price,
            t.actual_amount_paid,
            t.is_auto_renew,
            t.is_cancel,
            t.safra,
            t.transaction_date_year,
            t.transaction_date_month,
            t.transaction_date_day,
            t.transaction_date_day_of_week,
            t.transaction_date_day_of_year,
            t.membership_expire_date_year,
            t.membership_expire_date_month,
            t.membership_expire_date_day,
            t.membership_expire_date_day_of_week,
            t.membership_expire_date_day_of_year,
            t.discount,
            t.price_per_month,
            ---------------------
            -- Members columns --
            ---------------------
            m.msno,
            m.safra,
            m.city,
            m.registered_via,
            m.is_active,
            m.registration_init_time_year,
            m.registration_init_time_month,
            m.registration_init_time_day,
            m.registration_init_time_day_of_week,
            m.registration_init_time_day_of_year
        FROM
            main.user_logs ul
        INNER JOIN
            main.transactions t ON
            t.msno == ul.msno
            AND t.safra == ul.safra
        INNER JOIN
            main.members m ON
            m.msno = ul.msno AND m.safra = ul.safra
        WHERE
            ul.msno IN ?
            AND
            t.is_cancel = False
        ORDER BY
            ul.msno, ul.safra
    '''

    conn = DUCK_DB_UTILS.get_connection()
    query_results = conn.execute(query, (msnos,)).fetch_df()
    return query_results

In [6]:
def upload_treated_dataframe_to_duck_db(df: pd.DataFrame):
    conn = DUCK_DB_UTILS.get_connection()
    datetime_string_identifier = datetime.now().strftime('%Y_%m_%d_%Hh%mm')
    table_name = 'treated_dataset_' + datetime_string_identifier
    temp_table = 'temp_' + datetime_string_identifier

    conn.register(temp_table, df)
    conn.execute(f"CREATE TABLE IF NOT EXISTS {table_name} AS SELECT * FROM {temp_table}")

    print(f'Inseridos registros na tabela {table_name}')


In [7]:
users_msno = get_user_list(limit=10, offset=0)

df = get_dataset_by_users(users_msno)

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

In [8]:
def get_dataset() -> pd.DataFrame:
    all_dfs: List[pd.DataFrame] = []

    count = 0
    while count < MININUM_USERS_TO_CONSIDER:
        print(f'Processando count: {count}')

        users_msno = get_user_list(limit=BATCH_SIZE_TO_GET_DATA_FROM_DATABASE, offset=count)
        count += BATCH_SIZE_TO_GET_DATA_FROM_DATABASE

        all_dfs.append(
            get_dataset_by_users(users_msno)
        )

    print(f'Qtd. de dataframes: {len(all_dfs)}')

    all_dfs = list(
        filter(
            lambda df: df.__len__() > 0, all_dfs
        )
    )

    print(f'Qtd. de dataframes pós remoção dos vazios: {len(all_dfs)}')

    result = pd.concat(all_dfs)
    return result

In [9]:
full_dataframe = get_dataset()

Processando count: 0


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Processando count: 20000


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Processando count: 40000


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Processando count: 60000


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Processando count: 80000


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Qtd. de dataframes: 5
Qtd. de dataframes pós remoção dos vazios: 5


In [10]:
def get_next_safras(safra: int, month_qty: int) -> int:
    str_safra = str(safra)
    year, month = int(str_safra[:4]), int(str_safra[4:])

    month -= 1

    month += month_qty

    year += month // 12
    month = month % 12

    month += 1

    formatted_month = f'0{month}' if month < 10 else (month)
    return int(f'{year}{formatted_month}')


In [14]:
def calc_past_months_columns(df: pd.DataFrame, users_msno: List[str], cols: List[str]) -> pd.DataFrame:
    df_by_users = {}

    print(f'Separando DataFrames por usuários')
    for index, usr in enumerate(users_msno):
        if index % 1000 == 0:
            print(f'-> {index} / {len(users_msno)}')
            
        df_by_users[usr] = df[df['msno'] == usr]


    def process_user_row(user_df: pd.DataFrame, row: pd.Series) -> pd.DataFrame:
        current_safra = row['safra']

        safras_to_consider = [
            # -5, # previous safras
            # -4,
            # -3,
            -2,
            -1,

            +1, # next safra
        ]

        if not all_safras_exist(user_df, current_safra, safras_to_consider):
            # print(f'Linha sem safras {safras_to_consider}')
            return row
        
        for col in cols:
            for safra_modifier in safras_to_consider:
                safra = get_next_safras(current_safra, safra_modifier)
                safra_row = user_df[user_df['safra'] == safra].reset_index()
                title = f'{col}{safra_modifier if safra_modifier < 0 else f"+{safra_modifier}"}M'
                row[title] = safra_row[col][0]

        # print(f'Linha {row} ajustada')
        return row


    def all_safras_exist(user_df: pd.DataFrame, current_safra: int, safras_to_consider: List[int]) -> bool:
        for safra_modifier in safras_to_consider:
            next_safra = get_next_safras(current_safra, safra_modifier)
            filtered = user_df[user_df['safra'] == next_safra]

            if len(filtered) == 0:
                return False
            
        return True


    rows = []
    users_qty = len(df_by_users.values())
    count = 0
    for msno, user_df in df_by_users.items():
        # print(f'Processando usuário {msno}')

        count += 1
        print(f'Processando usuário {count}/{users_qty} ({msno})')

        for _, user_row in user_df.iterrows():
            user_row = process_user_row(user_df, user_row)
            rows.append(user_row)
            # print(user_row)

    result = pd.DataFrame(rows)
    return result

In [15]:
users_msno = list(full_dataframe['msno'].unique())

In [16]:
treated_df = calc_past_months_columns(
    full_dataframe,
    users_msno,
    cols=['cost', 'num_unq', 'total_secs'])

Separando DataFrames por usuários
-> 0 / 24998
-> 1000 / 24998
-> 2000 / 24998
-> 3000 / 24998
-> 4000 / 24998
-> 5000 / 24998
-> 6000 / 24998
-> 7000 / 24998
-> 8000 / 24998
-> 9000 / 24998
-> 10000 / 24998
-> 11000 / 24998
-> 12000 / 24998
-> 13000 / 24998
-> 14000 / 24998
-> 15000 / 24998
-> 16000 / 24998
-> 17000 / 24998
-> 18000 / 24998
-> 19000 / 24998
-> 20000 / 24998
-> 21000 / 24998
-> 22000 / 24998
-> 23000 / 24998
-> 24000 / 24998
Processando usuário 1/24998 (+++FOrTS7ab3tIgIh8eWwX4FqRv8w/FoiOuyXsFvphY=)
Processando usuário 2/24998 (+++hVY1rZox/33YtvDgmKA2Frg/2qhkz12B9ylCvh8o=)
Processando usuário 3/24998 (+++l/EXNMLTijfLBa8p2TUVVVp2aFGSuUI/h7mLmthw=)
Processando usuário 4/24998 (+++snpr7pmobhLKUgSHTv/mpkqgBT0tQJ0zQj6qKrqc=)
Processando usuário 5/24998 (++/9R3sX37CjxbY/AaGvbwr3QkwElKBCtSvVzhCBDOk=)
Processando usuário 6/24998 (++/UDNo9DLrxT8QVGiDi1OnWfczAdEwThaVyD0fXO50=)
Processando usuário 7/24998 (++/ZHqwUNa7U21Qz+zqteiXlZapxey86l6eEorrak/g=)
Processando usuário 8/24998 (

In [17]:
treated_df.head()

Unnamed: 0,cost,msno,safra,num_25,num_50,num_75,num_985,num_100,num_unq,total_secs,...,registration_init_time_day_of_year,cost-2M,cost-1M,cost+1M,num_unq-2M,num_unq-1M,num_unq+1M,total_secs-2M,total_secs-1M,total_secs+1M
0,55.9125,+++FOrTS7ab3tIgIh8eWwX4FqRv8w/FoiOuyXsFvphY=,201609,60,12,14,7,171,179,49996,...,87,,,,,,,,,
1,67.3556,+++hVY1rZox/33YtvDgmKA2Frg/2qhkz12B9ylCvh8o=,201611,128,71,50,101,470,488,148668,...,159,,,,,,,,,
2,80.6741,+++hVY1rZox/33YtvDgmKA2Frg/2qhkz12B9ylCvh8o=,201612,193,88,98,142,855,979,256812,...,159,,,,,,,,,
3,82.2167,+++l/EXNMLTijfLBa8p2TUVVVp2aFGSuUI/h7mLmthw=,201601,63,21,20,16,1050,989,271728,...,322,,,,,,,,,
4,64.6709,+++l/EXNMLTijfLBa8p2TUVVVp2aFGSuUI/h7mLmthw=,201607,72,17,17,17,459,513,120546,...,322,,,,,,,,,


In [20]:
debug_df = treated_df[treated_df['msno'] == '+++l/EXNMLTijfLBa8p2TUVVVp2aFGSuUI/h7mLmthw=']
# debug_df[['safra', 'msno', 'cost', 'cost-1M', 'cost+1M']]
# debug_df[['safra', 'msno', 'num_unq-2M', 'num_unq-1M', 'num_unq', 'num_unq+1M']]
debug_df[['safra', 'msno', 'total_secs-2M', 'total_secs-1M', 'total_secs', 'total_secs+1M', 'num_unq-2M', 'num_unq-1M', 'num_unq', 'num_unq+1M']]

Unnamed: 0,safra,msno,total_secs-2M,total_secs-1M,total_secs,total_secs+1M,num_unq-2M,num_unq-1M,num_unq,num_unq+1M
3,201601,+++l/EXNMLTijfLBa8p2TUVVVp2aFGSuUI/h7mLmthw=,,,271728,,,,989,
4,201607,+++l/EXNMLTijfLBa8p2TUVVVp2aFGSuUI/h7mLmthw=,,,120546,,,,513,
5,201608,+++l/EXNMLTijfLBa8p2TUVVVp2aFGSuUI/h7mLmthw=,,,185178,,,,747,
6,201609,+++l/EXNMLTijfLBa8p2TUVVVp2aFGSuUI/h7mLmthw=,120546.0,185178.0,173410,190726.0,513.0,747.0,707,791.0
7,201610,+++l/EXNMLTijfLBa8p2TUVVVp2aFGSuUI/h7mLmthw=,185178.0,173410.0,190726,133337.0,747.0,707.0,791,545.0
8,201611,+++l/EXNMLTijfLBa8p2TUVVVp2aFGSuUI/h7mLmthw=,173410.0,190726.0,133337,154978.0,707.0,791.0,545,676.0
9,201612,+++l/EXNMLTijfLBa8p2TUVVVp2aFGSuUI/h7mLmthw=,,,154978,,,,676,


In [21]:
treated_df.__len__()

151620

In [22]:
treated_df['msno'].unique()

array(['+++FOrTS7ab3tIgIh8eWwX4FqRv8w/FoiOuyXsFvphY=',
       '+++hVY1rZox/33YtvDgmKA2Frg/2qhkz12B9ylCvh8o=',
       '+++l/EXNMLTijfLBa8p2TUVVVp2aFGSuUI/h7mLmthw=', ...,
       '/C5D00PhkQD8J4p+UA78qdqdzw6HydQXEatUaVXZRyw=',
       '/C5TDKmiZPSxEPeBpQAZBvPKpq6qQ34gQikiwl25ypw=',
       '/C5czCZJNpjposurNQc9fD+xwgUfTU8vhIC3SRGurRM='], dtype=object)

In [23]:
user = 'hCxil8JInsp8LQflkJSBx8XaY18EPgctARico3NVxAs='

debug_df = treated_df[treated_df['msno'] == user]
debug_df[['safra', 'msno', 'cost', 'cost-1M', 'cost+1M']]

Unnamed: 0,safra,msno,cost,cost-1M,cost+1M


In [24]:
upload_treated_dataframe_to_duck_db(treated_df)

Inseridos registros na tabela treated_dataset_2025_03_11_08h03m
