**Enunciado:** Realizar una herramienta ETL que permita obtener el datamart ventas por internet (Internet Sales) y ventas por revendedores (Reseller Sales) a partir de la base de datos operacional. Documentar las dimensiones que participan y las tablas de hechos (50%). (Octubre 22)

* ¿Cuáles son las dimensiones que participan?

* ¿Cuál es el nivel de granularidad de los datos?

* ¿Cuáles son los indicadores (Medidas) de cada tabla de hechos?

In [1]:
import pandas as pd
from sqlalchemy import create_engine, inspect
import yaml
import os
import numpy as np

In [2]:
config_path = os.path.join(os.getcwd(), "config.yml")

with open(config_path, 'r') as f:
    config = yaml.safe_load(f)
    config_oltp = config['OLTP']
    config_olap = config['OLAP']

url_oltp = (f"mssql+pyodbc://{config_oltp['user']}:{config_oltp['password']}@{config_oltp['host']},{config_oltp['port']}/{config_oltp['dbname']}"
          f"?driver={config_oltp['drivername'].replace(' ', '+')}")

url_olap = (f"mssql+pyodbc://{config_olap['user']}:{config_olap['password']}@{config_olap['host']},{config_olap['port']}/{config_olap['dbname']}"
           f"?driver={config_olap['drivername'].replace(' ', '+')}")
oltp = create_engine(url_oltp)
olap = create_engine(url_olap)

Leer las tablas

In [3]:
def cargaSegura(engine, schema, table):
    inspector = inspect(engine)

    # Obtener columnas
    columnas = [col["name"] for col in inspector.get_columns(table, schema=schema)]
    columnas_problematicas = []

    # Intentar cargar tabla completa
    try:
        return pd.read_sql_table(table_name=table, con=engine, schema=schema)
    except Exception:
        pass

    # Detectar columnas problemáticas
    for col in columnas:
        try:
            pd.read_sql_query(
                f'SELECT TOP 10 "{col}" FROM "{schema}"."{table}"',
                con=engine
            )
        except Exception:
            columnas_problematicas.append(col)


    # Columnas buenas
    columnas_ok = [col for col in columnas if col not in columnas_problematicas]

    # Si no hay columnas válidas
    if not columnas_ok:
        print(f"⚠ La tabla {schema}.{table} no tiene columnas convertibles. Retornando dataframe vacío.")
        return pd.DataFrame()

    # Cargar solo columnas válidas
    query = (
        f'SELECT {", ".join([f"""\"{c}\"""" for c in columnas_ok])} '
        f'FROM "{schema}"."{table}"'
    )

    df = pd.read_sql_query(query, con=engine)
    return df


def extractHumanResources(conection):
    tablas = [
        "Shift", "Department", "Employee", "EmployeeDepartmentHistory", "EmployeePayHistory"
    ]
    humanResources = {}
    for tabla in tablas:
        df = cargaSegura(conection, "HumanResources", tabla)
        humanResources[tabla] = df
        
    return humanResources

def extractPerson(conection):
    tablas = [
        "PersonPhone", "PhoneNumberType", "Address", "AddressType",
        "StateProvince", "BusinessEntity", "BusinessEntityAddress", "BusinessEntityContact",
        "ContactType", "CountryRegion", "EmailAddress", "Password", "Person"
    ]
    person = {}
    for tabla in tablas:
        df = cargaSegura(conection, "Person", tabla)
        person[tabla] = df
        
    return person

def extractProduction(conection):
    tablas = [
        "Product", "ScrapReason", "ProductCategory", "ProductCostHistory", "ProductDescription",
        "ProductDocument", "ProductInventory", "ProductListPriceHistory", "ProductModel",
        "ProductModelIllustration", "ProductModelProductDescriptionCulture", "BillOfMaterials",
        "ProductPhoto", "ProductProductPhoto", "TransactionHistory", "ProductReview",
        "TransactionHistoryArchive", "ProductSubcategory", "UnitMeasure", "WorkOrder",
        "Culture", "WorkOrderRouting", "Document", "Illustration", "Location"
    ]
    production = {}
    for tabla in tablas:
        df = cargaSegura(conection, "Production", tabla)
        production[tabla] = df
        
    return production

def extractPurchasing(conection):
    tablas = [
        "ShipMethod", "ProductVendor", "Vendor", "PurchaseOrderDetail", "PurchaseOrderHeader"
    ]
    purchasing = {}
    for tabla in tablas:
        df = cargaSegura(conection, "Purchasing", tabla)
        purchasing[tabla] = df
        
    return purchasing

def extractSales(conection):
    tablas = [
        "CountryRegionCurrency", "CreditCard", "Currency", "CurrencyRate", "Customer",
        "PersonCreditCard", "SalesOrderDetail", "SalesOrderHeader",
        "SalesOrderHeaderSalesReason", "SalesPerson",
        "SalesPersonQuotaHistory", "SalesReason", "SalesTaxRate",
        "SalesTerritory", "SalesTerritoryHistory", "ShoppingCartItem",
        "SpecialOffer", "SpecialOfferProduct", "Store"
    ]
    sales = {}
    for tabla in tablas:
        df = cargaSegura(conection, "Sales", tabla)
        sales[tabla] = df
        
    return sales

In [4]:
def extractEmployeeHierarchy(engine):
    query = """
    SELECT 
        e.BusinessEntityID AS EmployeeID,
        e.NationalIDNumber AS EmployeeNationalIDAlternateKey,
        e.OrganizationNode.ToString() AS OrgNode,
        m.BusinessEntityID AS ParentEmployeeKey,
        m.NationalIDNumber AS ParentEmployeeNationalIDAlternateKey
    FROM HumanResources.Employee e
    LEFT JOIN HumanResources.Employee m
        ON e.OrganizationNode.GetAncestor(1) = m.OrganizationNode;
    """
    return pd.read_sql_query(query, con=engine)

In [5]:
humanResources =  extractHumanResources(oltp)
person = extractPerson(oltp)
production = extractProduction(oltp)
purchasing = extractPurchasing(oltp) #Funciona
sales = extractSales(oltp) #Funciona

  columnas = [col["name"] for col in inspector.get_columns(table, schema=schema)]
  self.meta.reflect(bind=self.con, only=[table_name], views=True)
  self.meta.reflect(bind=self.con, only=[table_name], views=True)
  columnas = [col["name"] for col in inspector.get_columns(table, schema=schema)]
  self.meta.reflect(bind=self.con, only=[table_name], views=True)
  self.meta.reflect(bind=self.con, only=[table_name], views=True)
  columnas = [col["name"] for col in inspector.get_columns(table, schema=schema)]
  self.meta.reflect(bind=self.con, only=[table_name], views=True)
  self.meta.reflect(bind=self.con, only=[table_name], views=True)
  columnas = [col["name"] for col in inspector.get_columns(table, schema=schema)]
  self.meta.reflect(bind=self.con, only=[table_name], views=True)
  self.meta.reflect(bind=self.con, only=[table_name], views=True)
  self.meta.reflect(bind=self.con, only=[table_name], views=True)
  self.meta.reflect(bind=self.con, only=[table_name], views=True)
  self.meta.

In [6]:
sales["Currency"].describe(include='all')

Unnamed: 0,CurrencyCode,Name,ModifiedDate
count,105,105,105
unique,105,105,
top,AED,Emirati Dirham,
freq,1,1,
mean,,,2008-04-30 00:00:00
min,,,2008-04-30 00:00:00
25%,,,2008-04-30 00:00:00
50%,,,2008-04-30 00:00:00
75%,,,2008-04-30 00:00:00
max,,,2008-04-30 00:00:00


In [7]:
def transformDimCurrency(currency):
    dimCurrency = pd.DataFrame(columns=[
        "CurrencyKey", "CurrencyAlternateKey", "CurrencyName"
    ])
    
    dimCurrency["CurrencyAlternateKey"] = currency["CurrencyCode"] 
    dimCurrency["CurrencyName"] = currency["Name"] 
    dimCurrency["CurrencyKey"] = range(1, len(dimCurrency) + 1)
    
    return dimCurrency

In [8]:
dimCurrency = transformDimCurrency(sales["Currency"])
dimCurrency

Unnamed: 0,CurrencyKey,CurrencyAlternateKey,CurrencyName
0,1,AED,Emirati Dirham
1,2,AFA,Afghani
2,3,ALL,Lek
3,4,AMD,Armenian Dram
4,5,ANG,Netherlands Antillian Guilder
...,...,...,...
100,101,VEB,Bolivar
101,102,VND,Dong
102,103,XOF,CFA Franc BCEAO
103,104,ZAR,Rand


In [9]:
hierarchy = extractEmployeeHierarchy(oltp)
hierarchy.head()

Unnamed: 0,EmployeeID,EmployeeNationalIDAlternateKey,OrgNode,ParentEmployeeKey,ParentEmployeeNationalIDAlternateKey
0,1,295847284,,,
1,2,245797967,/1/,,
2,3,509647174,/1/1/,2.0,245797967.0
3,4,112457891,/1/1/1/,3.0,509647174.0
4,5,695256908,/1/1/2/,3.0,509647174.0


In [10]:
def transformDimEmployee(employee, employeePayHistory, employeeDepartmentHistory, department, salesPerson, person, emailAddress, personPhone, hierarchy):
    dimEmployee = pd.DataFrame(columns=[
        "EmployeeKey", "EmployeeNationalIDAlternateKey", "Title", "HireDate", "BirthDate", "LoginID",
        "MaritalStatus", "SalariedFlag", "Gender",
        "VacationHours", "SickLeaveHours", "CurrentFlag", "SalesPersonFlag", "Status"
    ])

    employeePayHistory = (
        employeePayHistory.sort_values("RateChangeDate")
        .groupby("BusinessEntityID")
        .tail(1)
    )
    
    dimEmployee["EmployeeKey"] = employee["BusinessEntityID"]
    dimEmployee["EmployeeNationalIDAlternateKey"] = employee["NationalIDNumber"]
    dimEmployee["Title"] = employee["JobTitle"]
    dimEmployee["HireDate"] = employee["HireDate"]
    dimEmployee["BirthDate"] = employee["BirthDate"]
    dimEmployee["LoginID"] = employee["LoginID"]
    dimEmployee["MaritalStatus"] = employee["MaritalStatus"]
    dimEmployee["SalariedFlag"] = employee["SalariedFlag"].astype(int)
    dimEmployee["Gender"] = employee["Gender"]
    dimEmployee["VacationHours"] = employee["VacationHours"]
    dimEmployee["SickLeaveHours"] = employee["SickLeaveHours"]
    dimEmployee["CurrentFlag"] = employee["CurrentFlag"].astype(int)

    dimEmployee = dimEmployee.merge(
        hierarchy[["EmployeeID", "ParentEmployeeKey", "ParentEmployeeNationalIDAlternateKey"]],
        left_on="EmployeeKey",
        right_on="EmployeeID",
        how="left"
    ).drop(columns=["EmployeeID"])

    dimEmployee = dimEmployee.merge(
        salesPerson[["BusinessEntityID", "TerritoryID"]],
        left_on="EmployeeKey",
        right_on="BusinessEntityID",
        how="left"
    ).drop(columns=["BusinessEntityID"]) \
     .rename(columns={"TerritoryID": "SalesTerritoryKey"})
    
    dimEmployee["SalesTerritoryKey"] = dimEmployee["SalesTerritoryKey"].fillna(11)

    dimEmployee = dimEmployee.merge(
        person[["BusinessEntityID", "FirstName", "LastName", "MiddleName", "NameStyle"]],
        left_on="EmployeeKey",
        right_on="BusinessEntityID",
        how="left"
    ).drop(columns=["BusinessEntityID"])

    dimEmployee["NameStyle"] = dimEmployee["NameStyle"].astype(int)

    dimEmployee = dimEmployee.merge(
        emailAddress[["BusinessEntityID", "EmailAddress"]],
        left_on="EmployeeKey",
        right_on="BusinessEntityID",
        how="left"
    ).drop(columns=["BusinessEntityID"])

    dimEmployee = dimEmployee.merge( 
        personPhone[["BusinessEntityID", "PhoneNumber"]], 
        left_on="EmployeeKey", 
        right_on="BusinessEntityID", 
        how="left" 
    ).drop(columns=["BusinessEntityID"]) \
     .rename(columns={"PhoneNumber": "Phone"})
    
    dimEmployee = dimEmployee.merge(
        employeePayHistory[["BusinessEntityID", "PayFrequency", "Rate"]],
        left_on="EmployeeKey",
        right_on="BusinessEntityID",
        how="left"
    ).drop(columns=["BusinessEntityID"]) \
     .rename(columns={"Rate": "BaseRate"})

    
    dimEmployee = dimEmployee.merge(
        employeeDepartmentHistory[["BusinessEntityID", "DepartmentID", "StartDate", "EndDate"]],
        left_on="EmployeeKey",
        right_on="BusinessEntityID",
        how="left"
    ).merge(
        department[["DepartmentID", "Name"]],
        left_on="DepartmentID",
        right_on="DepartmentID",
        how="left"
    ).rename(columns={"Name": "DepartmentName"}).drop(columns=["BusinessEntityID", "DepartmentID"])


    dimEmployee["SalesPersonFlag"] = np.where(
        (dimEmployee["DepartmentName"].str.contains("Sales", na=False)) &
        (dimEmployee["Title"] != "Vice President of Engineering"),
        1,
        0
    )

    dimEmployee["Status"] = np.where(
        dimEmployee["EndDate"].isna(),
        "Current",
        None
    )

    column_order = [
        "EmployeeKey", "ParentEmployeeKey", "EmployeeNationalIDAlternateKey", "ParentEmployeeNationalIDAlternateKey", 
        "SalesTerritoryKey", "FirstName", "LastName", "MiddleName", "NameStyle", "Title", "HireDate", "BirthDate", 
        "LoginID", "EmailAddress", "Phone", "MaritalStatus", "SalariedFlag", "Gender", "PayFrequency", "BaseRate", 
        "VacationHours", "SickLeaveHours", "CurrentFlag", "SalesPersonFlag", "DepartmentName", "StartDate", "EndDate","Status"
    ]

    dimEmployee = dimEmployee[column_order]
    dimEmployee["EmployeeKey"] = range(1, len(dimEmployee) + 1)
    lookup = dimEmployee.set_index("EmployeeNationalIDAlternateKey")["EmployeeKey"].to_dict()
    dimEmployee["ParentEmployeeKey"] = dimEmployee["ParentEmployeeNationalIDAlternateKey"].map(lookup)

    ceo_key = dimEmployee.loc[
        dimEmployee["Title"] == "Chief Executive Officer", "EmployeeKey"
    ].iloc[0]
    dimEmployee.loc[
        dimEmployee["ParentEmployeeKey"].isna() & (dimEmployee["EmployeeKey"] != ceo_key),
        ["ParentEmployeeKey", "ParentEmployeeNationalIDAlternateKey"]
    ] = [
        ceo_key,
        dimEmployee.loc[dimEmployee["EmployeeKey"] == ceo_key, "EmployeeNationalIDAlternateKey"].iloc[0]
    ]
    
    return dimEmployee

In [11]:
pd.set_option("display.max_columns", None)
pd.set_option("display.width", None)

In [12]:
dim = transformDimEmployee(
    humanResources["Employee"],
    humanResources["EmployeePayHistory"],
    humanResources["EmployeeDepartmentHistory"],
    humanResources["Department"],
    sales["SalesPerson"],
    person["Person"],
    person["EmailAddress"],
    person["PersonPhone"],
    hierarchy
)

In [103]:
def transformDimReseller(customer, salesOrderHeader, personPhone, personAddress, personBusinessEntityAddress, demographics):
    dimReseller = pd.DataFrame(columns=[
        "ResellerKey", "GeographyKey", "ResellerAlternateKey", 
         "OrderFrequency", 
        "OrderMonth", "FirstOrderYear", "LastOrderYear",    "MinPaymentType", "MinPaymentAmount", 
         "IDStore"
    ])

    #demographics = utils_etl.extractStoreDemographics(oltp)

    # Este es para usarlo solo para sacar el CustomerID que va a SalesOrderHeader

    customersNoNulos = customer[
        customer["PersonID"].notna() & customer["StoreID"].notna()
    ].copy()  

    # Renombrar CustomerID a CustomerStoreID
    customersNoNulos = customersNoNulos.rename(columns={"CustomerID": "CustomerStoreID"})

    ####

    customer = customer[customer["StoreID"].notna()]


    dimReseller["ResellerKey"] = customer["CustomerID"]
    dimReseller["ResellerAlternateKey"] = customer["AccountNumber"]
    dimReseller["IDStore"] = customer["StoreID"]


    # Datos que se pueden traer desde demographics
    dimReseller = dimReseller.merge(
        demographics[["BusinessEntityID", "ResellerName", "BusinessType", "NumberEmployees", "AnnualSales", "BankName", "AnnualRevenue", "YearOpened", "ProductLine"]],
        left_on="IDStore",
        right_on="BusinessEntityID",
        how="left"
    ).drop(columns=["BusinessEntityID"])

    # Teléfono
    dimReseller = dimReseller.merge(
        personPhone[["BusinessEntityID", "PhoneNumber"]],
        left_on=dimReseller["IDStore"] - 1, # PersonID es StoreID - 1
        right_on="BusinessEntityID",
        how="left"
    ).drop(columns=["BusinessEntityID"]) \
     .rename(columns={"PhoneNumber": "Phone"})
    
    # Direccion
    dimReseller = dimReseller.merge(
        personBusinessEntityAddress[["BusinessEntityID", "AddressID"]],
        left_on=dimReseller["IDStore"],
        right_on="BusinessEntityID",
        how="left"
    ).drop(columns=["BusinessEntityID"])

    dimReseller = dimReseller.merge(
        personAddress[["AddressID", "AddressLine1", "AddressLine2"]],
        on="AddressID",
        how="left"
    ).drop(columns=["AddressID"])

    # Tipo de negocio 
    codeBusiness = {"BM": "Value Added Reseller", "BS": "Specialty Bike Shop", "OS": "Warehouse"}
    dimReseller["BusinessType"] = dimReseller["BusinessType"].map(codeBusiness)

    # Orders
    dimReseller = dimReseller.merge(
        customersNoNulos[["CustomerStoreID", "StoreID"]],
        left_on=dimReseller["IDStore"],
        right_on="StoreID",
        how="left"
    )
    

    dimReseller = dimReseller.merge(
        salesOrderHeader[["CustomerID", "OrderDate"]],
        left_on=dimReseller["CustomerStoreID"],
        right_on="CustomerID",
        how="left"
    )

    order_counts = dimReseller.groupby("CustomerStoreID")["OrderDate"].count()
    dimReseller["OrderFrequency"] = dimReseller["CustomerStoreID"].map(order_counts)
    dimReseller["OrderMonth"] = dimReseller["OrderDate"].dt.month
    dimReseller["FirstOrderYear"] = dimReseller.groupby("CustomerStoreID")["OrderDate"].transform("min").dt.year
    dimReseller["LastOrderYear"]  = dimReseller.groupby("CustomerStoreID")["OrderDate"].transform("max").dt.year

    # Frecuency
    



    # Pasar las columnas a int
    cols_int = ["NumberEmployees", "YearOpened",  "OrderMonth", "FirstOrderYear", "LastOrderYear", "OrderFrequency"]

    for c in cols_int:
        dimReseller[c] = dimReseller[c].astype("Int64")
    
    column_order = ["ResellerKey", "GeographyKey", "ResellerAlternateKey", "Phone", "BusinessType", "ResellerName", 
                    "NumberEmployees", "OrderFrequency", "OrderMonth", "FirstOrderYear", "LastOrderYear", 
                    "ProductLine", "AddressLine1", "AddressLine2", "AnnualSales", "BankName", "MinPaymentType", 
                    "MinPaymentAmount", "AnnualRevenue", "YearOpened"]
    dimReseller = dimReseller[column_order]
    dimReseller = dimReseller.drop_duplicates(subset=["ResellerKey"])

    return dimReseller

In [104]:
def extractStoreDemographics(engine):
    query = """
    WITH XMLNAMESPACES (
        'http://schemas.microsoft.com/sqlserver/2004/07/adventure-works/StoreSurvey' AS ss
    )
    SELECT 
        s.BusinessEntityID AS BusinessEntityID,
        s.Name AS ResellerName,
        s.SalesPersonID AS StorePersonID,

        s.Demographics.value('(ss:StoreSurvey/ss:YearOpened)[1]', 'int') AS YearOpened,
        s.Demographics.value('(ss:StoreSurvey/ss:AnnualSales)[1]', 'money') AS AnnualSales,
        s.Demographics.value('(ss:StoreSurvey/ss:AnnualRevenue)[1]', 'money') AS AnnualRevenue,
        s.Demographics.value('(ss:StoreSurvey/ss:NumberEmployees)[1]', 'int') AS NumberEmployees,
        s.Demographics.value('(ss:StoreSurvey/ss:BankName)[1]', 'nvarchar(100)') AS BankName,
        s.Demographics.value('(ss:StoreSurvey/ss:BusinessType)[1]', 'nvarchar(20)') AS BusinessType,
        s.Demographics.value('(ss:StoreSurvey/ss:Specialty)[1]', 'nvarchar(50)') AS ProductLine

    FROM Sales.Store s;
    """
    
    return pd.read_sql_query(query, con=engine)

storeDemographics = extractStoreDemographics(oltp)

In [105]:
dimReseller = transformDimReseller(
    sales["Customer"],
    sales["SalesOrderHeader"],
    person["PersonPhone"],
    person["Address"],
    person["BusinessEntityAddress"],
    storeDemographics
)
dimReseller

Unnamed: 0,ResellerKey,GeographyKey,ResellerAlternateKey,Phone,BusinessType,ResellerName,NumberEmployees,OrderFrequency,OrderMonth,FirstOrderYear,LastOrderYear,ProductLine,AddressLine1,AddressLine2,AnnualSales,BankName,MinPaymentType,MinPaymentAmount,AnnualRevenue,YearOpened
0,1,,AW00000001,245-555-0173,Value Added Reseller,A Bike Store,2,8,7,2011,2012,Road,2251 Elliot Avenue,,300000.0,International Bank,,,30000.0,1970
4,2,,AW00000002,170-555-0127,Specialty Bike Shop,Progressive Sports,10,32,6,2012,2014,Mountain,7943 Walnut Ave,,800000.0,International Security,,,80000.0,1972
20,3,,AW00000003,279-555-0130,Warehouse,Advanced Bike Components,40,24,8,2011,2014,Road,12345 Sterling Avenue,,1500000.0,Primary International,,,150000.0,1974
32,4,,AW00000004,710-555-0173,Value Added Reseller,Modular Cycle Systems,5,32,5,2012,2014,Road,800 Interchange Blvd.,Suite 2501,300000.0,United Security,,,30000.0,1976
48,5,,AW00000005,828-555-0186,Specialty Bike Shop,Metropolitan Sports Supply,13,16,7,2012,2014,Road,482505 Warm Springs Blvd.,,800000.0,Primary Bank & Reserve,,,80000.0,1978
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
7776,30114,,AW00030114,1 (11) 500 555-0116,Value Added Reseller,Recreation Toy Store,11,16,5,2012,2014,Mountain,"39, avenue des Laurentides",,800000.0,International Bank,,,80000.0,1988
7784,30115,,AW00030115,155-555-0140,Value Added Reseller,Retreat Inn,8,16,6,2012,2014,Road,Suite 2502 410 Albert Street,,300000.0,Primary Bank & Reserve,,,30000.0,1982
7792,30116,,AW00030116,433-555-0168,Value Added Reseller,Technical Parts Manufacturing,5,8,5,2013,2014,Touring,Ontario Mills,,300000.0,International Security,,,30000.0,1976
7796,30117,,AW00030117,560-555-0171,Value Added Reseller,Totes & Baskets Company,2,24,7,2011,2014,Road,72540 Blanco Rd.,,300000.0,Guardian Bank,,,30000.0,1970


In [106]:
# 1. Identificar Resellers (PersonID nulo)
resellers = sales["Customer"].loc[
    sales["Customer"]["PersonID"].isna() &
    sales["Customer"]["StoreID"].notna(),
    ["CustomerID", "StoreID"]
]

print("Cantidad de Resellers:", len(resellers))
print("Ejemplos:", resellers.head(), "\n")


# 2. Buscar órdenes realizadas por esos Resellers
reseller_ids = set(resellers["CustomerID"])

reseller_orders = sales["SalesOrderHeader"].loc[
    sales["SalesOrderHeader"]["CustomerID"].isin(reseller_ids)
]

print("Órdenes realizadas por Resellers:", len(reseller_orders))
print(reseller_orders.head(), "\n")


# 3. Confirmar si algún CustomerID en SalesOrderHeader es un Store
header_customers = sales["SalesOrderHeader"][["CustomerID"]].merge(
    sales["Customer"][["CustomerID", "PersonID", "StoreID"]],
    on="CustomerID",
    how="left"
)

header_with_stores = header_customers[header_customers["StoreID"].notna()]

print("SalesOrderHeader que tienen StoreID:", len(header_with_stores))
print(header_with_stores.head())


Cantidad de Resellers: 701
Ejemplos:    CustomerID  StoreID
0           1    934.0
1           2   1028.0
2           3    642.0
3           4    932.0
4           5   1026.0 

Órdenes realizadas por Resellers: 0
Empty DataFrame
Columns: [SalesOrderID, RevisionNumber, OrderDate, DueDate, ShipDate, Status, OnlineOrderFlag, SalesOrderNumber, PurchaseOrderNumber, AccountNumber, CustomerID, SalesPersonID, TerritoryID, BillToAddressID, ShipToAddressID, ShipMethodID, CreditCardID, CreditCardApprovalCode, CurrencyRateID, SubTotal, TaxAmt, Freight, TotalDue, Comment, rowguid, ModifiedDate]
Index: [] 

SalesOrderHeader que tienen StoreID: 3806
   CustomerID  PersonID  StoreID
0       29825    1045.0   1046.0
1       29672     721.0    722.0
2       29734     851.0    852.0
3       29994    1417.0   1418.0
4       29565     483.0    484.0


In [107]:
def transformFactResellerSales(product, salesOrderDetail, salesOrderHeader, dimCurrency, currencyRate, stateProvince, salesTaxRate, dimReseller):
  
  # Start with salesOrderDetail instead of empty DataFrame
  salesOrderDetail = salesOrderDetail.copy()
  salesOrderDetail["SalesOrderLineNumber"] = (
    salesOrderDetail.groupby("SalesOrderID").cumcount() + 1
  )

  # Start building factResellerSales from salesOrderDetail
  factResellerSales = salesOrderDetail[["ProductID", "SalesOrderID", "SpecialOfferID", 
                                         "SalesOrderLineNumber", "OrderQty", "UnitPrice", 
                                         "UnitPriceDiscount", "LineTotal", "CarrierTrackingNumber"]].rename(
    columns={"ProductID": "ProductKey"}
  )

  # Now merge with salesOrderHeader
  factResellerSales = factResellerSales.merge(
        salesOrderHeader[["SalesOrderID", "SalesOrderNumber", "RevisionNumber", "OrderDate", 
                         "DueDate", "ShipDate", "TerritoryID", "CustomerID",
                         "Freight", "CurrencyRateID"]],
        on="SalesOrderID",
        how="left"
  ).rename(columns={
      "SpecialOfferID": "PromotionKey", 
      "OrderQty": "OrderQuantity", 
      "UnitPriceDiscount": "UnitPriceDiscountPct", 
      "TerritoryID": "SalesTerritoryKey", 
      "LineTotal": "SalesAmount"
  }).drop(columns=["SalesOrderID"])

  factResellerSales = factResellerSales.merge(
        dimReseller[["ResellerKey", "ResellerAlternateKey"]],
        left_on="CustomerID",
        right_on="ResellerKey",
        how="left"
  ).drop(columns=["CustomerID"])

  factResellerSales = factResellerSales[factResellerSales["ResellerKey"].notna()]

  factResellerSales = factResellerSales.merge(
        product[["ProductID", "StandardCost"]],
        left_on="ProductKey",
        right_on="ProductID",
        how="left"
    ).rename(columns={"StandardCost": "ProductStandardCost"}) \
     .drop(columns=["ProductID"])
  
  factResellerSales = factResellerSales.merge(
    currencyRate[["CurrencyRateID", "ToCurrencyCode"]],
    on="CurrencyRateID",
    how="left"
  ).drop(columns=["CurrencyRateID"]).merge(
    dimCurrency[["CurrencyAlternateKey", "CurrencyKey"]],
    left_on="ToCurrencyCode",
    right_on="CurrencyAlternateKey",
    how="left"
  ).drop(columns=["CurrencyAlternateKey", "ToCurrencyCode"])

  factResellerSales = factResellerSales.merge(
     stateProvince[["StateProvinceID", "TerritoryID"]],
     left_on="SalesTerritoryKey",
     right_on="TerritoryID",
     how="left"
  ).drop(columns=["TerritoryID"]).merge(
     salesTaxRate[["StateProvinceID", "TaxRate"]],
     on="StateProvinceID",
     how="left"
  ).drop(columns=["StateProvinceID"])

  factResellerSales["OrderDateKey"] = (
    factResellerSales["OrderDate"]
        .dt.strftime("%Y%m%d")
        .astype("Int64")
)

  factResellerSales["DueDateKey"] = (
      factResellerSales["DueDate"]
          .dt.strftime("%Y%m%d")
          .astype("Int64")
  )

  factResellerSales["ShipDateKey"] = (
      factResellerSales["ShipDate"]
          .dt.strftime("%Y%m%d")
          .astype("Int64")
  )

  factResellerSales["ExtendedAmount"] = factResellerSales["UnitPrice"] * factResellerSales["OrderQuantity"]
  factResellerSales["DiscountAmount"] = factResellerSales["ExtendedAmount"] * factResellerSales["UnitPriceDiscountPct"]
  factResellerSales["TotalProductCost"] = factResellerSales["ProductStandardCost"] * factResellerSales["OrderQuantity"]
  factResellerSales["TaxAmt"] = (factResellerSales["ExtendedAmount"] - factResellerSales["DiscountAmount"]) * (factResellerSales["TaxRate"] / 100)

  factResellerSales = factResellerSales.drop(columns=["TaxRate"])

  column_order = ["ProductKey", "OrderDateKey", "DueDateKey", "ShipDateKey", "ResellerKey", "PromotionKey", "CurrencyKey",
    "SalesTerritoryKey", "SalesOrderNumber", "SalesOrderLineNumber", "RevisionNumber", "OrderQuantity", 
    "UnitPrice", "ExtendedAmount", "UnitPriceDiscountPct", "DiscountAmount", "ProductStandardCost", "TotalProductCost",
    "SalesAmount", "TaxAmt", "Freight", "CarrierTrackingNumber", "OrderDate", "DueDate", "ShipDate"]
  
  factResellerSales = factResellerSales[column_order]
  
  return factResellerSales

In [109]:
fact = transformFactResellerSales(
    production["Product"],
    sales["SalesOrderDetail"],
    sales["SalesOrderHeader"],
    dimCurrency,
    sales["CurrencyRate"],
    person["StateProvince"],
    sales["SalesTaxRate"],
    dimReseller
)

fact
fact[fact["SalesOrderNumber"] == "SO43659"]
#dimCustomer[dimCustomer["CustomerKey"] == 11146]

Unnamed: 0,ProductKey,OrderDateKey,DueDateKey,ShipDateKey,ResellerKey,PromotionKey,CurrencyKey,SalesTerritoryKey,SalesOrderNumber,SalesOrderLineNumber,RevisionNumber,OrderQuantity,UnitPrice,ExtendedAmount,UnitPriceDiscountPct,DiscountAmount,ProductStandardCost,TotalProductCost,SalesAmount,TaxAmt,Freight,CarrierTrackingNumber,OrderDate,DueDate,ShipDate
0,776,20110531,20110612,20110607,29825.0,1,,5,SO43659,1,8,1,2024.9940,2024.994,0.0,0.0,1898.0944,1898.0944,2024.994,,616.0984,4911-403C-98,2011-05-31,2011-06-12,2011-06-07
1,776,20110531,20110612,20110607,29825.0,1,,5,SO43659,1,8,1,2024.9940,2024.994,0.0,0.0,1898.0944,1898.0944,2024.994,161.999520,616.0984,4911-403C-98,2011-05-31,2011-06-12,2011-06-07
2,776,20110531,20110612,20110607,29825.0,1,,5,SO43659,1,8,1,2024.9940,2024.994,0.0,0.0,1898.0944,1898.0944,2024.994,,616.0984,4911-403C-98,2011-05-31,2011-06-12,2011-06-07
3,776,20110531,20110612,20110607,29825.0,1,,5,SO43659,1,8,1,2024.9940,2024.994,0.0,0.0,1898.0944,1898.0944,2024.994,,616.0984,4911-403C-98,2011-05-31,2011-06-12,2011-06-07
4,776,20110531,20110612,20110607,29825.0,1,,5,SO43659,1,8,1,2024.9940,2024.994,0.0,0.0,1898.0944,1898.0944,2024.994,,616.0984,4911-403C-98,2011-05-31,2011-06-12,2011-06-07
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
139,711,20110531,20110612,20110607,29825.0,1,,5,SO43659,12,8,4,20.1865,80.746,0.0,0.0,13.0863,52.3452,80.746,,616.0984,4911-403C-98,2011-05-31,2011-06-12,2011-06-07
140,711,20110531,20110612,20110607,29825.0,1,,5,SO43659,12,8,4,20.1865,80.746,0.0,0.0,13.0863,52.3452,80.746,5.854085,616.0984,4911-403C-98,2011-05-31,2011-06-12,2011-06-07
141,711,20110531,20110612,20110607,29825.0,1,,5,SO43659,12,8,4,20.1865,80.746,0.0,0.0,13.0863,52.3452,80.746,,616.0984,4911-403C-98,2011-05-31,2011-06-12,2011-06-07
142,711,20110531,20110612,20110607,29825.0,1,,5,SO43659,12,8,4,20.1865,80.746,0.0,0.0,13.0863,52.3452,80.746,,616.0984,4911-403C-98,2011-05-31,2011-06-12,2011-06-07
