In [78]:
# Import libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import datetime
from collections import defaultdict

In [79]:
# Fetch data from Cloud Storage
from google.cloud import storage

client = storage.Client()
eod_balance = pd.read_csv(
    "gs://berkabank/production/data/02_elaboration/eod_balance.csv"
)

In [80]:
eod_balance.head()

Unnamed: 0,account_id,balance_date,end_of_day_balance,daily_amount_flow,account_creation_date
0,576,1993-01-01,900.0,900.0,1993-01-01
1,704,1993-01-01,1000.0,1000.0,1993-01-01
2,2378,1993-01-01,700.0,700.0,1993-01-01
3,3818,1993-01-01,600.0,600.0,1993-01-01
4,576,1993-01-02,900.0,0.0,1993-01-01


In [81]:
import os

PIPELINE_NAME = "production"
process_eod_balance_component_args = {
    "column_mapping": {
        "balance_date": "balance_date",
        "account_creation_date": "account_creation_date",
        "account_id": "account_id",
        "daily_amount_flow": "daily_amount_flow",
        "n_transactions": "n_transactions",
        "is_primary": "is_primary",
        "end_of_day_balance": "end_of_day_balance",
        "low_balance_flag": "low_balance_flag",
        "streak_id": "streak_id",
        "low_balance_streak": "low_balance_streak",
        "target": "target",
    },
    "input_bucket_path": f"gs://{os.environ.get('BUCKET_NAME')}/{PIPELINE_NAME}/data/02_elaboration/",
    "output_bucket_path": f"gs://{os.environ.get('BUCKET_NAME')}/{PIPELINE_NAME}/data/03_primary/",
    "input_files": ["eod_balance"],
    "output_files": ["eod_balance_preprocessed"],
    "transaction_usage_flag": 50,
    "seniority_account_flag": 30,
    "target_balance": 20000,
    "incident_duration_days": 20,
    "compute_target": True,
}

In [82]:
from dataclasses import dataclass
import pandas as pd
import numpy as np
from typing import List, Optional, Dict
from pydantic import BaseModel


@dataclass
class EODBalancePreprocessing:
    transaction_usage_flag: int
    seniority_account_flag: int
    target_balance: int
    incident_duration_days: int
    eod_balance: pd.DataFrame
    column_mapping: Dict
    compute_target: bool

    def __post_init__(self):
        # Convert the columns to the correct data types
        self.eod_balance[self.column_mapping["balance_date"]] = pd.to_datetime(
            self.eod_balance[self.column_mapping["balance_date"]]
        )
        self.eod_balance[self.column_mapping["account_creation_date"]] = pd.to_datetime(
            self.eod_balance[self.column_mapping["account_creation_date"]]
        )

        # Sort the DataFrame
        self.eod_balance = self.eod_balance.sort_values(
            [self.column_mapping["account_id"], self.column_mapping["balance_date"]]
        )

    def compute_n_transactions(self) -> pd.DataFrame:
        # Compute the number of cumulative transactions over the period for non-zero transactions
        non_zero_transactions = self.eod_balance[
            self.eod_balance[self.column_mapping["daily_amount_flow"]] != 0
        ]
        non_zero_transactions[self.column_mapping["n_transactions"]] = (
            non_zero_transactions.groupby(self.column_mapping["account_id"]).cumcount()
        )

        # Join with the original DataFrame and forward fill to maintain the count on days with 0 transactions
        self.eod_balance = self.eod_balance.join(
            non_zero_transactions[[self.column_mapping["n_transactions"]]],
            rsuffix="_non_zero",
        )
        self.eod_balance[self.column_mapping["n_transactions"]] = (
            self.eod_balance[self.column_mapping["n_transactions"]]
            .fillna(method="ffill")
            .fillna(0)
        )

        return self.eod_balance

    def calculate_seniority(self) -> pd.DataFrame:

        self.eod_balance = self.compute_n_transactions()

        # Calculate seniority as the number of days since account creation
        self.eod_balance["days_since_account_creation"] = (
            self.eod_balance[self.column_mapping["balance_date"]]
            - self.eod_balance[self.column_mapping["account_creation_date"]]
        ).dt.days
        return self.eod_balance

    def calculate_primary_flag(self) -> pd.DataFrame:
        # Each client is non primary by default, we will update this value by the time when requirements are met
        self.eod_balance[self.column_mapping["is_primary"]] = False
        self.eod_balance.loc[
            (
                self.eod_balance[self.column_mapping["n_transactions"]]
                >= self.transaction_usage_flag
            )
            & (
                self.eod_balance["days_since_account_creation"]
                >= self.seniority_account_flag
            ),
            self.column_mapping["is_primary"],
        ] = True
        return self.eod_balance

    def calculate_balance_incidents(self) -> pd.DataFrame:
        # Sort the DataFrame
        self.eod_balance = self.eod_balance.sort_values(
            [self.column_mapping["account_id"], self.column_mapping["balance_date"]]
        )
        # Create 'low_balance_flag'
        self.eod_balance[self.column_mapping["low_balance_flag"]] = (
            self.eod_balance[self.column_mapping["end_of_day_balance"]]
            < self.target_balance
        )
        # Create 'streak_id'
        self.eod_balance[self.column_mapping["streak_id"]] = (
            self.eod_balance[self.column_mapping["low_balance_flag"]]
            != self.eod_balance.groupby(self.column_mapping["account_id"])[
                self.column_mapping["low_balance_flag"]
            ].shift()
        ).cumsum()
        # Create 'low_balance_streak'
        self.eod_balance[self.column_mapping["low_balance_streak"]] = (
            self.eod_balance.groupby(
                [self.column_mapping["account_id"], self.column_mapping["streak_id"]]
            )[self.column_mapping["low_balance_flag"]].cumsum()
        )
        return self.eod_balance

    def calculate_target(self) -> pd.DataFrame:
        self.eod_balance[self.column_mapping["target"]] = (
            self.eod_balance[self.column_mapping["low_balance_streak"]]
            >= self.incident_duration_days
        ) & (self.eod_balance[self.column_mapping["is_primary"]])
        return self.eod_balance

    def run(self) -> pd.DataFrame:
        self.calculate_seniority()
        self.calculate_primary_flag()
        self.calculate_balance_incidents()
        if self.compute_target:
            self.calculate_target()
        return self.eod_balance

In [83]:
print(process_eod_balance_component_args)

eod_balance.head()

{'column_mapping': {'balance_date': 'balance_date', 'account_creation_date': 'account_creation_date', 'account_id': 'account_id', 'daily_amount_flow': 'daily_amount_flow', 'n_transactions': 'n_transactions', 'is_primary': 'is_primary', 'end_of_day_balance': 'end_of_day_balance', 'low_balance_flag': 'low_balance_flag', 'streak_id': 'streak_id', 'low_balance_streak': 'low_balance_streak', 'target': 'target'}, 'input_bucket_path': 'gs://berkabank/production/data/02_elaboration/', 'output_bucket_path': 'gs://berkabank/production/data/03_primary/', 'input_files': ['eod_balance'], 'output_files': ['eod_balance_preprocessed'], 'transaction_usage_flag': 50, 'seniority_account_flag': 30, 'target_balance': 20000, 'incident_duration_days': 20, 'compute_target': True}


Unnamed: 0,account_id,balance_date,end_of_day_balance,daily_amount_flow,account_creation_date
0,576,1993-01-01,900.0,900.0,1993-01-01
1,704,1993-01-01,1000.0,1000.0,1993-01-01
2,2378,1993-01-01,700.0,700.0,1993-01-01
3,3818,1993-01-01,600.0,600.0,1993-01-01
4,576,1993-01-02,900.0,0.0,1993-01-01


In [84]:
transaction_usage_flag = process_eod_balance_component_args["transaction_usage_flag"]
seniority_account_flag = process_eod_balance_component_args["seniority_account_flag"]
target_balance = process_eod_balance_component_args["target_balance"]
incident_duration_days = process_eod_balance_component_args["incident_duration_days"]
column_mapping = process_eod_balance_component_args["column_mapping"]
compute_target = process_eod_balance_component_args["compute_target"]

In [85]:
# Process end-of-day balance
eod_balance_processor = EODBalancePreprocessing(
    transaction_usage_flag=transaction_usage_flag,
    seniority_account_flag=seniority_account_flag,
    target_balance=target_balance,
    incident_duration_days=incident_duration_days,
    compute_target=True,
    eod_balance=eod_balance,
    column_mapping=column_mapping,
)
eod_balance_preprocessed = eod_balance_processor.run()

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  non_zero_transactions[self.column_mapping["n_transactions"]] = (
  self.eod_balance[self.column_mapping["n_transactions"]]


In [86]:
eod_balance_preprocessed.to_csv("gs://berkabank/production/data/03_primary/eod_balance_preprocessed.csv",index=False)