In [1]:
from interchange.persistence.file import FileStorage
from interchange.visa import transform, extract, clean, calculate, interchange, store
# Short alias for FileStorage.Layer; its STAGING member is used to pick the
# origin and target layers in the parameter cell below.
layer = FileStorage.Layer

In [2]:
# Identify the file to process. Both values are passed to the file_control
# lookup and to the parquet read/write calls further down.
client_id = "SBSA"
file_id = "B6781ADDCFE0CD800BFA2968A6ED2816"

In [3]:
# Stage parameters: where the extracted data is read from and where the
# cleaned data is written to. `client_id` and `file_id` are already defined in
# the previous cell, so they are not re-assigned here (the former
# `client_id = client_id` / `file_id = file_id` lines were no-ops).
origin_layer = layer.STAGING
target_layer = layer.STAGING
origin_subdir = "200-SMS_EXT_MESSAGES"
target_subdir = "300-SMS_CLN_MESSAGES"

In [4]:
# Echo the resolved run parameters on one line so the saved notebook records
# which file and layers were processed.
print(
    origin_layer,
    target_layer,
    client_id,
    file_id,
    origin_subdir,
    target_subdir,
)

staging staging SBSA B6781ADDCFE0CD800BFA2968A6ED2816 200-SMS_EXT_MESSAGES 300-SMS_CLN_MESSAGES


In [5]:
from datetime import datetime

import pandas as pd

from interchange.logs.logger import Logger
from interchange.persistence.database import Database
from interchange.persistence.file import FileStorage


# Logger named after the current module (__name__).
log = Logger(__name__)
# Storage handle used by the parquet read and write cells below.
fs = FileStorage()


In [6]:
def _load_visa_field_definitions(type_record: str, sort_by: list[str]) -> pd.DataFrame:
    """
    Fetch the Visa field definitions for one record type, sorted ascending.

    Reads the ``type_record``, ``column_name``, ``column_type``,
    ``float_decimals`` and ``date_format`` fields from the ``visa_fields``
    table, passing ``{"type_record": type_record}`` as the ``where`` filter.
    ``float_decimals`` is converted with ``pd.to_numeric``; values that cannot
    be converted become NaN. The frame is returned sorted by ``sort_by``.
    """
    selected_fields = [
        "type_record",
        "column_name",
        "column_type",
        "float_decimals",
        "date_format",
    ]
    numeric_columns = ["float_decimals"]

    database = Database()
    definitions = database.read_records(
        table_name="visa_fields",
        fields=selected_fields,
        where={"type_record": type_record},
    )
    # Coerce the decimal-count column to numbers, asking for the smallest
    # integer dtype where pandas can apply one.
    definitions[numeric_columns] = definitions[numeric_columns].apply(
        pd.to_numeric, errors="coerce", downcast="integer"
    )
    ordered = definitions.sort_values(by=sort_by, ascending=True)
    return ordered


def _retrieve_file_date(
    client_id: str,
    file_id: str,
) -> str:
    """
    Look up the processing date recorded for a file in ``file_control``.

    The table is queried with ``client_id`` and ``file_id`` as the ``where``
    filter and the ``file_processing_date`` of the first returned record is
    handed back (expected as a 'YYYY-MM-DD' string).
    """
    requested_fields = [
        "brand_id",
        "file_type",
        "file_processing_date",
        "landing_file_name",
    ]
    lookup = {
        "client_id": client_id,
        "file_id": file_id,
    }

    database = Database()
    records = database.read_records(
        table_name="file_control",
        fields=requested_fields,
        where=lookup,
    )
    # Only the first record is consulted; `.iloc[0]` raises IndexError when the
    # query returned no rows.
    first_record = records.iloc[0]
    return first_record["file_processing_date"]


def _parse_dates(date_series: pd.Series, date_format: str, file_date: str) -> pd.Series:
    """
    Parse a series of formatted string dates into datetime objects.
    """
    FILE_DATE_FORMAT = "%Y-%m-%d"
    reference_date = datetime.strptime(file_date, FILE_DATE_FORMAT)
    match date_format:
        case date_format if date_format.startswith("%"):
            result = pd.to_datetime(date_series, format=date_format, errors="coerce")
        case "!MMDD":
            TARGET_FORMAT = "%Y%m%d"
            pre = str(reference_date.year) + date_series
            pre = pd.to_datetime(pre, format=TARGET_FORMAT, errors="coerce")
            pre.loc[pre > reference_date] = pre.loc[
                pre > reference_date
            ] - pd.DateOffset(years=1)  # type: ignore
            result = pre
        case "!YDDD":
            TARGET_FORMAT = "%y%j"
            pre = str(reference_date.year)[2] + date_series
            pre = pd.to_datetime(pre, format=TARGET_FORMAT, errors="coerce")
            pre.loc[pre > reference_date] = pre.loc[
                pre > reference_date
            ] - pd.DateOffset(years=10)  # type: ignore
            result = pre
        case _:
            raise NotImplementedError
    return result


def _clean_field_values(
    field_series: pd.Series, field_defs: pd.DataFrame, file_date: str
) -> pd.Series:
    """
    Perform data cleaning on a series of values depending on its target data type.
    """
    name = field_series.name
    definition = field_defs[field_defs["column_name"] == name].iloc[0]
    match definition["column_type"]:
        case "str":
            result = field_series.str.strip().replace("", " ")
        case "int":
            pre = field_series.str.strip()
            result = pd.to_numeric(pre, errors="coerce").astype("Int64")
        case "float":
            mapping = {
                "}": "0",
                "{": "0",
                "A": "1",
                "B": "2",
                "C": "3",
                "D": "4",
                "E": "5",
                "F": "6",
                "G": "7",
                "H": "8",
                "I": "9",
                "J": "1",
                "K": "2",
                "L": "3",
                "M": "4",
                "N": "5",
                "O": "6",
                "P": "7",
                "Q": "8",
                "R": "9",
            }
            float_decimals = definition["float_decimals"]
            if not float_decimals > 0:
                raise ValueError
            field_series = field_series.replace(mapping, regex=True)
            pre = field_series.str.strip()
            result = pd.to_numeric(pre, errors="coerce") / (10**float_decimals)
        case "date":
            date_format = definition["date_format"]
            if not date_format:
                raise ValueError
            pre = field_series.str.strip()
            result = _parse_dates(pre, date_format, file_date)
        case _:
            raise NotImplementedError
    return result

In [7]:
# Load the field definitions for record type "sms". An empty sort_by list is
# passed, so no sort key is requested.
field_defs = _load_visa_field_definitions("sms", sort_by=[])
print(field_defs)

2025-11-26 12:34:04,263 :: PID 26580 :: TID 12004 :: database._create_connection :: Line 33 :: DEBUG :: Connected to SQLite database
2025-11-26 12:34:04,265 :: PID 26580 :: TID 12004 :: database.read_records :: Line 138 :: DEBUG :: Attempting to execute SELECT SQL statement
2025-11-26 12:34:04,267 :: PID 26580 :: TID 12004 :: database._execute :: Line 57 :: DEBUG :: SQL statement executed successfully
2025-11-26 12:34:04,273 :: PID 26580 :: TID 12004 :: database._close_connection :: Line 44 :: DEBUG :: Closed connection to SQLite database


    type_record                                        column_name  \
0           sms                             online_settlement_date   
1           sms                          issuer_acquirer_indicator   
2           sms                                   local_draft_date   
3           sms                                 pos_condition_code   
4           sms                                       message_type   
..          ...                                                ...   
124         sms  optional_issuer_isa_amount_in_settlement_currency   
125         sms                      merchant_volume_indicator_sms   
126         sms                                  dcc_indicator_sms   
127         sms                          fee_program_indicator_sms   
128         sms                   recurring_payment_indicator_flag   

    column_type  float_decimals date_format  
0          date             NaN      %y%m%d  
1           str             NaN              
2          date      

In [8]:
# Look up the processing date of this client/file; it is passed to the
# cleaning step below as the reference date for date fields.
file_date = _retrieve_file_date(client_id, file_id)
print(file_date)

2025-11-26 12:34:04,290 :: PID 26580 :: TID 12004 :: database._create_connection :: Line 33 :: DEBUG :: Connected to SQLite database
2025-11-26 12:34:04,291 :: PID 26580 :: TID 12004 :: database.read_records :: Line 138 :: DEBUG :: Attempting to execute SELECT SQL statement
2025-11-26 12:34:04,292 :: PID 26580 :: TID 12004 :: database._execute :: Line 57 :: DEBUG :: SQL statement executed successfully
2025-11-26 12:34:04,294 :: PID 26580 :: TID 12004 :: database._close_connection :: Line 44 :: DEBUG :: Closed connection to SQLite database


2025-11-18


In [9]:
# Read the extracted messages for this client/file from the origin layer.
data = fs.read_parquet(
    origin_layer,
    client_id,
    file_id,
    subdir=origin_subdir,
)

2025-11-26 12:34:04,305 :: PID 26580 :: TID 12004 :: database._create_connection :: Line 33 :: DEBUG :: Connected to SQLite database
2025-11-26 12:34:04,306 :: PID 26580 :: TID 12004 :: database.read_records :: Line 138 :: DEBUG :: Attempting to execute SELECT SQL statement
2025-11-26 12:34:04,308 :: PID 26580 :: TID 12004 :: database._execute :: Line 57 :: DEBUG :: SQL statement executed successfully
2025-11-26 12:34:04,309 :: PID 26580 :: TID 12004 :: database._close_connection :: Line 44 :: DEBUG :: Closed connection to SQLite database


In [10]:
# Display the frame exactly as it was read, before any cleaning is applied.
data

Unnamed: 0_level_0,issuer_acquirer_indicator,mvv_code,remote_terminal_indicator,charge_indicator,product_id_sms,business_application_identifier,account_funding_source,affiliate_id,settlement_date_sms,draft_identifier_sms,...,application_draft_counter,application_interchange_profile_sms,cryptogram_draft_type,terminal_country_code,terminal_draft_date,cryptogram_amount,cryptogram_currency_code,cryptogram_cashback_amount,issuer_discretionary_data,issuer_script_results_sms
record,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
0,A,,,,F,,D,4424630002,111825,385321443675101,...,00000,,00,000,000000,000000000000,000,000000000000,,
1,A,,,,F,,D,4424630002,111825,385321445950920,...,00000,,00,000,000000,000000000000,000,000000000000,,
2,A,,,,F,,D,7374180002,111825,385321456835123,...,00000,,00,000,000000,000000000000,000,000000000000,,
3,A,,,,F,,D,4028240002,111825,585321463117068,...,00233,,00,000,000000,000000000000,000,000000000000,,
4,A,,,,N,,D,4840020002,111825,385321464460423,...,00293,,00,000,000000,000000000000,000,000000000000,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
35969,A,,,,E,,D,4529010004,111825,345322343424710,...,00060,1800,01,710,251118,000000505000,710,000000000000,,
35970,A,,,,E,,D,4519700004,111825,355322351475509,...,00014,1800,01,710,251118,000000250000,710,000000000000,,
35971,A,,,,E,,D,4519700004,111825,585322351546438,...,00003,1800,01,710,251118,000000155000,710,000000000000,,
35972,A,,,,E,,D,4786890004,111825,385322370600256,...,,,,,,,,,,


In [11]:
# Clean each column of the extracted frame according to its field definition,
# then place the cleaned columns side by side in a new frame.
fields = [
    _clean_field_values(column, field_defs, file_date)
    for _, column in data.items()
]
clean_df = pd.concat(fields, axis=1)

In [14]:
# Inspect the cleaned draft_amount column.
clean_df["draft_amount"]

record
0         2000.0
1         5000.0
2          850.0
3        10050.0
4         1000.0
          ...   
35969     5050.0
35970     2500.0
35971     1550.0
35972     1000.0
35973    10050.0
Name: draft_amount, Length: 35974, dtype: float64

In [13]:
# Write the cleaned frame for this client/file to the target layer, under
# target_subdir.
fs.write_parquet(clean_df, target_layer, client_id, file_id, subdir=target_subdir)

2025-11-26 12:34:05,774 :: PID 26580 :: TID 12004 :: file.write_parquet :: Line 128 :: DEBUG :: Writing SBSA file B6781ADDCFE0CD800BFA2968A6ED2816 to parquet
2025-11-26 12:34:05,785 :: PID 26580 :: TID 12004 :: database._create_connection :: Line 33 :: DEBUG :: Connected to SQLite database
2025-11-26 12:34:05,787 :: PID 26580 :: TID 12004 :: database.read_records :: Line 138 :: DEBUG :: Attempting to execute SELECT SQL statement
2025-11-26 12:34:05,788 :: PID 26580 :: TID 12004 :: database._execute :: Line 57 :: DEBUG :: SQL statement executed successfully
2025-11-26 12:34:05,788 :: PID 26580 :: TID 12004 :: database._close_connection :: Line 44 :: DEBUG :: Closed connection to SQLite database
