## Load Libraries

In [213]:
# %pip install pandera

In [217]:
import os
import numpy as np
import pandas as pd

os.getcwd()
import ftplib
import tempfile
import zipfile
from io import BytesIO
import paramiko
import pandera as pa
from pandera.typing.pandas import Index, DataFrame, Series
from tqdm.notebook import tqdm

from datetime import date, timedelta, datetime
import time
from typing import Tuple, Dict, List, Optional, Iterable, Callable, TypeVar, Any

from prefect import task, flow, get_run_logger # type: ignore

from utils import get_latest_zip
from glob import glob
from dotenv import load_dotenv

load_dotenv()

import warnings
warnings.simplefilter('ignore')

## Progress bar

In [219]:
from ipywidgets import IntProgress
for i in tqdm(range(10), desc="Downloading data", bar_format="{desc}: | {percentage:3.0f}%"):
    time.sleep(0.5)

ModuleNotFoundError: No module named 'ipywidgets'

## Download data

In [None]:
def download_data(*, progress=None) -> List[str]:
    sftpHost = os.getenv('ftp_host')
    sftpPort = int(os.getenv('ftp_port'))
    uname = os.getenv('ftp_user')
    pwd = os.getenv('ftp_pass')

    current_date = datetime.now().strftime('%Y%m%d')

    # ---- PARAMIKO CLIENT SETUP (replaces pysftp.CnOpts) ----
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())  # equivalent to cnopts.hostkeys=None

    # ---- CONNECT ----
    client.connect(
        hostname=sftpHost,
        port=sftpPort,
        username=uname,
        password=pwd,
        allow_agent=False,
        look_for_keys=False,
    )

    sftp = client.open_sftp()
    print("Connected to SFTP Server!!!")

    # ---- DELETE LOCAL Vilbev FILES ----
    for filename in os.listdir('.'):
        if filename.startswith('Vilbev-') and filename.endswith('.zip'):
            try:
                os.remove(filename)
                print(f'Deleted existing file: {filename}')
            except Exception as e:
                print(f'Error deleting {filename}: {e}')

    # ---- REMOTE & LOCAL PATHS ----
    remote_file = f"/home/viljoenbev/Vilbev-{current_date}.zip"
    local_file = f"./data/Vilbev-{current_date}.zip"

    # ---- DOWNLOAD ----
    try:
        sftp.get(
            remotepath=remote_file,
            localpath=local_file,
            callback=None  # optionally add progress callback
        )
        print(f'Download is Complete!!! File saved as {local_file}')

    except FileNotFoundError:
        print(f"‚ùå Remote file not found: {remote_file}")
    except Exception as e:
        print(f"‚ùå Error downloading file: {e}")

    # ---- CLEAN UP ----
    sftp.close()
    client.close()

In [None]:
for i in tqdm(range(1)):
    download_data()
    time.sleep(1)


[A

[A[A

Downloading Data:   0%|          | 0/100 [00:00<?, ?it/s]

Connected to SFTP Server!!!
Download is Complete!!! File saved as ./data/Vilbev-20260129.zip




100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 1/1 [00:01<00:00,  1.72s/it]


## Unzip file

In [None]:
def extract_data() -> pd.DataFrame:
    """
    Extract the first CSV file from a ZIP archive and load it into a pandas DataFrame.
    Handles:
    - file existence checks
    - multiple CSV files (selects first match)
    - safe extraction into a temp folder
    - consistent return behavior
    """
    zip_file_path = get_latest_zip(os.getenv('BASE_DIR'))

    if not os.path.exists(zip_file_path):
        raise FileNotFoundError(f"‚ùå ZIP file does not exist: {zip_file_path}")

    print(f"üì¶ Reading ZIP archive: {zip_file_path}")

    with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:

        # List all files
        file_list = zip_ref.namelist()
        print("üìÅ Files inside ZIP:", file_list)

        # find CSV file(s)
        csv_files = [f for f in file_list if f.lower().endswith(".csv")]

        if not csv_files:
            raise ValueError("‚ùå No CSV file found inside ZIP.")

        # Use the first CSV file found
        csv_file_name = csv_files[0]
        print(f"üìÑ Found CSV file: {csv_file_name}")

        # Ensure extraction directory exists
        extract_dir = "data"
        os.makedirs(extract_dir, exist_ok=True)

        # Extract file (optional but useful for debugging)
        extracted_path = zip_ref.extract(csv_file_name, path=extract_dir)
        print(f"üì§ Extracted to: {extracted_path}")

        # Load CSV into pandas directly from ZIP
        with zip_ref.open(csv_file_name) as csv_file:
            try:
                df = pd.read_csv(csv_file)
                print(f"‚úÖ Loaded CSV: {csv_file_name}")
            except Exception as e:
                raise ValueError(f"‚ùå Failed to read CSV inside ZIP: {e}")

    return df

In [None]:
raw = extract_data()

## Transform data

In [None]:
def transform_data(df: pd.DataFrame) -> pd.DataFrame:
    """
    Function to transform Viljoen Beverages data

    Args:
        df: Input dataframe to transform
        returns: Transformed dataframe

    """

    # Standard column layout
    columns=[
        'SellerID','GUID','Date','Reference','Customer_Code','Name','Physical_Address1',\
        'Physical_Address2','Physical_Address3','Physical_Address4','Telephone',\
        'Stock_Code','Description','Price_Ex_Vat','Quantity','RepCode','ProductBarCodeID'
        ]
    # Create an empty dataframe
    df1=pd.DataFrame(columns=columns)

    # Build the dataframe
    df1['Date']=df['Date']
    df1['SellerID']='VILJOEN'
    df1['GUID']=0
    df1['Reference']=df['Reference']
    df1['Customer_Code']=df['Customer code']
    df1['Name']=df['Customer name']
    df1['Physical_Address1']=df['Physical_Address1']
    df1['Physical_Address2']=df['Physical_Address2']
    df1['Physical_Address3']=df['Physical_Address3']
    df1['Physical_Address4']=(
        df['Deliver1'].fillna('').astype(str) +' '+
        df['Deliver2'].fillna('').astype(str) +' '+
        df['Deliver3'].fillna('').astype(str) +' '+
        df['Deliver4'].fillna('').astype(str)
        ).str.strip()

    df1['Telephone']=df['Telephone']
    df1['Stock_Code']=df['Product code']
    df1['Description']=df['Product description']
    df1['Price_Ex_Vat']=round(abs(df['Value']/df['Quantity']),2)
    df1['Quantity']=df['Quantity']
    df1['RepCode']=df['Rep']
    df1['ProductBarCodeID']=''

    print("‚úÖ DATA TRANSFORMATION IN PROGRESS!!")
    print(f"‚úÖ Total quantity: {np.sum(df1['Quantity']):.0f}")

    df2=df1.copy()
    df2['Date']=pd.to_datetime(df2['Date'])
    df2['Date']=df2['Date'].apply(lambda x: x.strftime("%Y-%m-%d"))

    #   INTELLIGENT NAME BACKFILLING
    # -----------------------------------

    print("üß† Handling missing buyer names intelligently..........")
    # 1) Use Customer_Code as primary matching key
    # -----------------------------
    df1['Name'] = df1.groupby('Customer_Code')['Name'].transform(
        lambda x: x.fillna(x.mode().iloc[0]) if x.mode().size > 0 else x
    )
    # 2) Use Address fields as secondary matching key
    # -----------------------------
    df1['Name'] = df1.groupby(
        ['Physical_Address1', 'Physical_Address2', 'Physical_Address3', 'Physical_Address4']
    )['Name'].transform(
        lambda x: x.fillna(x.mode().iloc[0]) if x.mode().size > 0 else x
    )
    # 3) Use telephone number as fallback
    # -----------------------------
    df1['Name'] = df1.groupby('Telephone')['Name'].transform(
        lambda x: x.fillna(x.mode().iloc[0]) if x.mode().size > 0 else x
    )
    # 4) Global fallback (only for final unresolved missing names)
    # -----------------------------
    df1['Name'].fillna('SPAR NORTH RAND (11691)', inplace=True)
    print("‚úÖ Missing buyer names fixed.....")

    #   DATE FORMAT CLEANING
    # -----------------------------
    print("‚úÖ Date fomat cleaned.........")
    df1['Date'] = pd.to_datetime(df1['Date'], errors="coerce").dt.strftime("%Y-%m-%d")
    print("‚úÖ Data transformation complete!")

    return df1

In [None]:
df = transform_data(raw)

In [None]:
df

## Validate data

In [None]:
def validate_data(df: pd.DataFrame):
    """
    Function to validate data
    """
    # logger = get_run_logger()
    class Schema(pa.DataFrameModel):
        # 1. Check data types and uniqueness
        SellerID: Series[str] = pa.Field(nullable=False)  # seller IDs must be non-null
        GUID: Series[int] = pa.Field(ge=0, nullable=False)  # must be non-null

        # 2. Dates coerced to proper datetime
        Date: Series[pd.Timestamp] = pa.Field(coerce=False, nullable=False) # must be non-null

        # 3. Reference and customer codes
        Reference: Series[str] = pa.Field(nullable=False) # must be non-null
        Customer_Code: Series[str] = pa.Field(str_matches=r"^[A-Z0-9]+$", nullable=False)  # must be non-null

        # 4. Customer details
        Name: Series[str] = pa.Field(nullable=False) # must be non-null
        Physical_Address1: Series[str] = pa.Field(nullable=True)
        Physical_Address2: Series[str] = pa.Field(nullable=True)
        Physical_Address3: Series[str] = pa.Field(nullable=True)
        Physical_Address4: Series[str] = pa.Field(nullable=True)

        # 5. Telephone validation (basic regex for digits, spaces, +, -)
        Telephone: Series[str] = pa.Field(nullable=True)

        # 6. Product details
        Stock_Code: Series[str] = pa.Field(nullable=False) # must be non-null
        Description: Series[str] = pa.Field(nullable=False) # must be non-null
        Price_Ex_Vat: Series[float] = pa.Field(ge=0.0, nullable=False)  # must be non-null
        Quantity: Series[int] = pa.Field(nullable=False)  # must be non-null

        # 7. Rep and barcode
        RepCode: Series[str] = pa.Field(nullable=True)
        ProductBarCodeID: Series[str] = pa.Field(nullable=True)  # typical EAN/UPC

        class Config:
            strict = True  # enforce exact schema
            coerce = True  # auto-convert types where possible

    try:
        # lazy=True means "find all errors before crashing"
        Schema.validate(df, lazy=True)
        print("‚úÖ Data passed validation! Proceeding to ETL...")

    except pa.errors.SchemaErrors as err:
        print("‚ö†Ô∏è Data Contract Breached!.......\n")
        print(f"‚ùå Total errors found: {len(err.failure_cases)}")

        # Let's look at the specific failures
        print("\n*********‚ö†Ô∏èFailure Report‚ö†Ô∏è************\n")
        print(err.failure_cases[['column', 'check', 'failure_case']])

In [None]:
validate_data(df)