In [1]:
import logging
import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.engine.url import URL

# ---------------------------------------------------------------------------
# STEP 0: USER CONFIGURATION
# Connection settings for the Sepidar01 SQL Server database. Edit these three
# values to target your own server before running the pipeline.
# ---------------------------------------------------------------------------
SQL_SERVER_HOST = "WENLINHNBB"  # server or instance, e.g. "localhost\\SQLEXPRESS"
SQL_SERVER_PORT = 1433  # TCP port, usually 1433
SQL_SERVER_DATABASE = "Sepidar01"  # database name

# Timestamped, level-tagged log records for every pipeline stage.
logging.basicConfig(
    datefmt="%Y-%m-%d %H:%M:%S",
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)


def get_sqlalchemy_engine():
    """Create and verify a SQLAlchemy engine for the configured SQL Server.

    The engine uses the ``mssql+pyodbc`` dialect with Windows Integrated
    Authentication (``Trusted_Connection=yes``) and is built from the
    module-level ``SQL_SERVER_HOST``, ``SQL_SERVER_PORT`` and
    ``SQL_SERVER_DATABASE`` settings. A ``SELECT @@VERSION`` round trip is
    executed to confirm the server is reachable before the engine is returned.

    Returns:
        The verified SQLAlchemy engine.

    Raises:
        ValueError: If the host or database setting is empty or still a
            placeholder value.
        ConnectionError: If the engine cannot be created or the probe query
            fails. The underlying exception is attached as ``__cause__``.
    """
    if SQL_SERVER_HOST in ("", "YOUR_SQL_SERVER_HOST_HERE"):
        raise ValueError("Please set SQL_SERVER_HOST to your actual server/instance.")
    if SQL_SERVER_DATABASE in ("", "YOUR_DATABASE_NAME_HERE"):
        raise ValueError("Please set SQL_SERVER_DATABASE to your actual database name.")

    driver_params = {
        "driver": "ODBC Driver 17 for SQL Server",
        "Trusted_Connection": "yes",
    }
    logger.info("➡️ Using Windows Integrated Authentication (Trusted_Connection=yes).")

    # No username/password: credentials come from the Windows session.
    connection_url = URL.create(
        "mssql+pyodbc",
        username=None,
        password=None,
        host=SQL_SERVER_HOST,
        port=int(SQL_SERVER_PORT),
        database=SQL_SERVER_DATABASE,
        query=driver_params,
    )

    engine = None
    try:
        engine = create_engine(connection_url, fast_executemany=True, echo=False)
        with engine.connect() as conn:
            version = conn.execute(text("SELECT @@VERSION")).scalar()

        # Only the first line of the version banner is logged; tolerate a
        # NULL or empty result instead of failing on it.
        version_lines = str(version or "").splitlines()
        banner = version_lines[0] if version_lines else "unknown"
        logger.info(f"✔ Connected to database '{SQL_SERVER_DATABASE}'. "
                    f"Server version: {banner}")
        return engine
    except Exception as e:
        # The engine may exist even though the probe failed; release its
        # connection pool before reporting the error.
        if engine is not None:
            engine.dispose()
        raise ConnectionError(f"❌ Could not create SQLAlchemy engine: {e}") from e


def extract_all_tables(engine):
    """Read the seven source tables from Sepidar01 into DataFrames.

    Every table is fetched in full with ``SELECT *``:

        customer    <- GNR.Party
        invoice     <- SLS.Invoice
        invoiceitem <- SLS.InvoiceItem
        product     <- INV.Item
        employee    <- PAY.Personnel
        inventory   <- INV.ItemStock
        payment     <- RPA.ReceiptHeader

    Args:
        engine: Database connectable handed to ``pandas.read_sql``.

    Returns:
        dict mapping the short keys listed above to the extracted DataFrames.

    Raises:
        RuntimeError: If any table cannot be read. The underlying exception
            is attached as ``__cause__``.
    """
    table_map = {
        "customer":    ("GNR", "Party"),
        "invoice":     ("SLS", "Invoice"),
        "invoiceitem": ("SLS", "InvoiceItem"),
        "product":     ("INV", "Item"),
        "employee":    ("PAY", "Personnel"),
        "inventory":   ("INV", "ItemStock"),
        "payment":     ("RPA", "ReceiptHeader"),
    }

    dfs = {}
    for key, (schema, tbl) in table_map.items():
        # Schema and table names come only from the hard-coded map above, so
        # interpolating them into the statement is safe here.
        fq_name = f"{schema}.[{tbl}]"
        logger.info(f"➡️ Extracting '{fq_name}' …")
        try:
            dfs[key] = pd.read_sql(f"SELECT * FROM {fq_name}", engine)
        except Exception as e:
            raise RuntimeError(f"❌ Failed to extract {fq_name}: {e}") from e
    logger.info("✔ Successfully extracted all seven tables from Sepidar01.")
    return dfs


def transform_sales_fact(dfs):
    """Build the SalesFact frame by enriching invoice lines with lookups.

    Starting from SLS.InvoiceItem, three left joins (each validated as
    many-to-one) attach the invoice header, the customer and the product:

        InvoiceItem.InvoiceRef   -> Invoice.InvoiceId
        Invoice.CustomerPartyRef -> Party.PartyId
        InvoiceItem.ItemRef      -> Item.ItemID

    Amount columns that exist on both the invoice line and the invoice header
    are renamed with a "Line" / "Invoice" prefix before joining, so the merge
    never has to disambiguate them with suffixes.

    Args:
        dfs: dict of extracted DataFrames; the keys "customer", "invoice",
            "invoiceitem" and "product" are read.

    Returns:
        The joined DataFrame, with the quantity and amount columns coerced to
        a numeric dtype (values that cannot be parsed become NaN).
    """
    parties = dfs["customer"]            # GNR.Party
    invoices = dfs["invoice"]            # SLS.Invoice
    invoice_lines = dfs["invoiceitem"]   # SLS.InvoiceItem
    products = dfs["product"]            # INV.Item

    def attach_lookup(facts, lookup, fact_key, lookup_key):
        # Left join that keeps every fact row and insists the lookup key is unique.
        return facts.merge(
            lookup,
            how="left",
            left_on=fact_key,
            right_on=lookup_key,
            validate="many_to_one",
        )

    # Amount columns shared by line and header, with their collision-free names.
    # Insertion order matters: it fixes the header column order and is reused
    # for the numeric coercion list further down.
    line_amount_names = {
        "Price": "LinePrice",
        "PriceInBaseCurrency": "LinePriceInBase",
        "Discount": "LineDiscount",
        "DiscountInBaseCurrency": "LineDiscountInBase",
        "Addition": "LineAddition",
        "AdditionInBaseCurrency": "LineAdditionInBase",
        "Tax": "LineTax",
        "TaxInBaseCurrency": "LineTaxInBase",
        "Duty": "LineDuty",
        "DutyInBaseCurrency": "LineDutyInBase",
        "NetPriceInBaseCurrency": "LineNetInBase",
    }
    invoice_amount_names = {
        "Price": "InvoicePrice",
        "PriceInBaseCurrency": "InvoicePriceInBase",
        "Discount": "InvoiceDiscount",
        "DiscountInBaseCurrency": "InvoiceDiscountInBase",
        "Addition": "InvoiceAddition",
        "AdditionInBaseCurrency": "InvoiceAdditionInBase",
        "Tax": "InvoiceTax",
        "TaxInBaseCurrency": "InvoiceTaxInBase",
        "Duty": "InvoiceDuty",
        "DutyInBaseCurrency": "InvoiceDutyInBase",
        "NetPriceInBaseCurrency": "InvoiceNetInBase",
    }

    # Invoice lines: all columns kept, amounts prefixed with "Line".
    joined = invoice_lines.rename(columns=line_amount_names)

    # Invoice header: identifying columns plus the amounts, prefixed with "Invoice".
    invoice_columns = ["InvoiceId", "CustomerPartyRef", "Number", "Date", *invoice_amount_names]
    invoice_lookup = invoices[invoice_columns].rename(
        columns={"Number": "InvoiceNumber", **invoice_amount_names}
    )
    joined = attach_lookup(joined, invoice_lookup, "InvoiceRef", "InvoiceId")

    # Customer: name fields and the customer flag from GNR.Party.
    customer_lookup = parties[["PartyId", "Name", "LastName", "IsCustomer"]].rename(
        columns={
            "Name": "CustomerFirstName",
            "LastName": "CustomerLastName",
            "IsCustomer": "CustomerFlag",
        }
    )
    joined = attach_lookup(joined, customer_lookup, "CustomerPartyRef", "PartyId")

    # Product: code, title and unit references from INV.Item.
    product_columns = ["ItemID", "Code", "Title", "UnitRef", "SecondaryUnitRef", "SaleUnitRef"]
    product_lookup = products[product_columns].rename(
        columns={"Code": "ProductCode", "Title": "ProductTitle"}
    )
    joined = attach_lookup(joined, product_lookup, "ItemRef", "ItemID")

    salesfact = joined.copy()

    # Coerce the quantity and every prefixed amount column that is present.
    numeric_columns = (
        "Quantity",
        *line_amount_names.values(),
        *invoice_amount_names.values(),
    )
    for column in numeric_columns:
        if column not in salesfact.columns:
            continue
        salesfact[column] = pd.to_numeric(salesfact[column], errors="coerce")

    logger.info(f"✔ SalesFact transformation complete (rows = {len(salesfact):,}).")
    return salesfact


def load_salesfact_to_sql(df, engine):
    """Write the SalesFact DataFrame to the ``dbo.SalesFact`` table.

    The table is written with ``if_exists="replace"``, so an existing
    ``dbo.SalesFact`` is dropped and recreated from ``df`` on every run. The
    DataFrame index is not written.

    Args:
        df: SalesFact DataFrame to persist.
        engine: SQLAlchemy engine connected to the target database.

    Raises:
        RuntimeError: If the write fails. The underlying exception is
            attached as ``__cause__``.
    """
    try:
        logger.info("➡️ Loading SalesFact into dbo.SalesFact (if_exists='replace')…")
        # method="multi" is deliberately not passed: it was removed to avoid
        # a double-parenthesis syntax error in the generated INSERT.
        df.to_sql(
            name="SalesFact",
            schema="dbo",
            con=engine,
            if_exists="replace",
            index=False,
        )
        logger.info(f"✔ dbo.SalesFact loaded (rows = {len(df):,}).")
    except Exception as e:
        raise RuntimeError(f"❌ Failed to load dbo.SalesFact: {e}") from e


def save_all_snapshots(dfs, salesfact_df):
    """Save every raw table and SalesFact as CSV and Parquet snapshots.

    Files are written with relative names, i.e. into the current working
    directory. Each raw table is stored as ``<Name>.csv`` / ``<Name>.parquet``
    and SalesFact as ``SalesFact_snapshot.csv`` /
    ``SalesFact_snapshot.parquet``. The DataFrame index is not written.

    Args:
        dfs: dict mapping table keys to the extracted DataFrames.
        salesfact_df: The transformed SalesFact DataFrame.

    Raises:
        RuntimeError: If any file cannot be written. The underlying exception
            is attached as ``__cause__``.
    """
    try:
        for key, df in dfs.items():
            # Only the first letter is upper-cased:
            # "customer" -> "Customer", "invoiceitem" -> "Invoiceitem".
            name = key[0].upper() + key[1:]
            df.to_csv(f"{name}.csv", index=False)
            df.to_parquet(f"{name}.parquet", index=False)
            logger.info(f"✔ Saved snapshot: {name}.csv / {name}.parquet")

        salesfact_df.to_csv("SalesFact_snapshot.csv", index=False)
        salesfact_df.to_parquet("SalesFact_snapshot.parquet", index=False)
        logger.info("✔ Saved SalesFact_snapshot.csv / SalesFact_snapshot.parquet")
    except Exception as e:
        raise RuntimeError(f"❌ Failed to save snapshots: {e}") from e


def run_full_etl():
    """Run the pipeline end to end: connect, extract, transform, load, snapshot.

    This is the top-level boundary of the pipeline. Any exception raised by a
    stage is logged together with its traceback and is not re-raised, so the
    function always returns ``None``. The engine, if one was created, is
    disposed whether the run succeeded or failed.
    """
    engine = None
    try:
        engine = get_sqlalchemy_engine()
        dfs = extract_all_tables(engine)
        salesfact_df = transform_sales_fact(dfs)
        load_salesfact_to_sql(salesfact_df, engine)
        save_all_snapshots(dfs, salesfact_df)
        logger.info("🎉 === ETL PIPELINE FOR Sepidar01 COMPLETED SUCCESSFULLY ===")
    except Exception as exc:
        # logger.exception keeps the traceback, which the message alone loses.
        logger.exception(f"✘ ETL aborted due to exception:\n{exc}")
    finally:
        if engine is not None:
            try:
                engine.dispose()
                logger.info("Disposed SQLAlchemy engine.")
            except Exception as dispose_error:
                # Cleanup stays best-effort, but a failure is now visible in
                # the log instead of being silently discarded.
                logger.warning("Could not dispose SQLAlchemy engine: %s", dispose_error)

# Execute the ETL pipeline as soon as this cell/script runs. Failures are
# logged inside run_full_etl() rather than raised here.
run_full_etl()


2025-06-02 08:01:59 - INFO - ➡️ Using Windows Integrated Authentication (Trusted_Connection=yes).
2025-06-02 08:01:59 - INFO - ✔ Connected to database 'Sepidar01'. Server version: Microsoft SQL Server 2022 (RTM-GDR) (KB5046861) - 16.0.1135.2 (X64) 
2025-06-02 08:01:59 - INFO - ➡️ Extracting 'GNR.[Party]' …
2025-06-02 08:01:59 - INFO - ➡️ Extracting 'SLS.[Invoice]' …
2025-06-02 08:01:59 - INFO - ➡️ Extracting 'SLS.[InvoiceItem]' …
2025-06-02 08:01:59 - INFO - ➡️ Extracting 'INV.[Item]' …
2025-06-02 08:01:59 - INFO - ➡️ Extracting 'PAY.[Personnel]' …
2025-06-02 08:01:59 - INFO - ➡️ Extracting 'INV.[ItemStock]' …
2025-06-02 08:01:59 - INFO - ➡️ Extracting 'RPA.[ReceiptHeader]' …
2025-06-02 08:01:59 - INFO - ✔ Successfully extracted all seven tables from Sepidar01.
2025-06-02 08:01:59 - INFO - ✔ SalesFact transformation complete (rows = 308).
2025-06-02 08:01:59 - INFO - ➡️ Loading SalesFact into dbo.SalesFact (if_exists='replace')…
2025-06-02 08:02:00 - INFO - ✔ dbo.SalesFact loaded (rows