Conexiones

In [1]:
from urllib.parse import quote_plus

import pandas as pd
import yaml
from sqlalchemy import create_engine

# Load the connection settings for both databases.
# The file is read as UTF-8 explicitly so the result does not depend on the
# platform's default locale encoding.
with open('../config_fill.yml', 'r', encoding='utf-8') as f:
    config = yaml.safe_load(f)
    config_aw = config['Adventure_Works']
    config_etl = config['ETL_PRO']

# Source database: the URL carries no user/password (empty credentials before
# the '@'); authentication is driven by the trusted_connection setting.
# Spaces in the ODBC driver name are written as '+' for the query string.
url_aw = f"mssql+pyodbc://@{config_aw['host']}/{config_aw['dbname']}?driver={config_aw['driver'].replace(' ', '+')}&trusted_connection={config_aw['trusted_connection']}"

# Target database: user and password are percent-encoded before being placed
# in the URL, otherwise characters such as '@', ':' or '/' would be parsed as
# URL delimiters. str() keeps purely numeric YAML values working.
# NOTE(review): this expects the config file to hold the raw (unencoded)
# credentials — confirm they were not already percent-encoded by hand.
etl_user = quote_plus(str(config_etl['user']))
etl_password = quote_plus(str(config_etl['password']))
url_etl = f"{config_etl['drivername']}://{etl_user}:{etl_password}@{config_etl['host']}:{config_etl['port']}/{config_etl['dbname']}"

aw_engine = create_engine(url_aw)      # SQL Server (OLTP)
etl_engine = create_engine(url_etl)    # Postgres (DWH)


Extraemos las ventas de resellers

In [2]:
# Extract query for reseller sales: rows come from Sales.SalesOrderDetail,
# joined to the order header (dates, salesperson, territory, tax, freight)
# and to the customer (StoreID). The WHERE clause keeps only customers that
# have a StoreID.
# NOTE: TaxAmt and Freight are read from the order header, so the same value
# is repeated on every detail line of an order; summing them over lines
# counts them once per line.
query_fact = """
SELECT
    sod.SalesOrderID,
    sod.SalesOrderDetailID,
    soh.OrderDate,
    soh.DueDate,
    soh.ShipDate,

    c.StoreID,
    soh.SalesPersonID,
    soh.TerritoryID,

    sod.ProductID,
    sod.OrderQty,
    sod.UnitPrice,
    sod.UnitPriceDiscount,
    sod.LineTotal,

    soh.TaxAmt,
    soh.Freight
FROM Sales.SalesOrderDetail sod
JOIN Sales.SalesOrderHeader soh
    ON sod.SalesOrderID = soh.SalesOrderID
JOIN Sales.Customer c
    ON soh.CustomerID = c.CustomerID
WHERE c.StoreID IS NOT NULL;
"""

# Run the extract on the source engine and preview the first rows.
df_fact = pd.read_sql(query_fact, aw_engine)
df_fact.head()


Unnamed: 0,SalesOrderID,SalesOrderDetailID,OrderDate,DueDate,ShipDate,StoreID,SalesPersonID,TerritoryID,ProductID,OrderQty,UnitPrice,UnitPriceDiscount,LineTotal,TaxAmt,Freight
0,43659,1,2011-05-31,2011-06-12,2011-06-07,1046,279,5,776,1,2024.994,0.0,2024.994,1971.5149,616.0984
1,43659,2,2011-05-31,2011-06-12,2011-06-07,1046,279,5,777,3,2024.994,0.0,6074.982,1971.5149,616.0984
2,43659,3,2011-05-31,2011-06-12,2011-06-07,1046,279,5,778,1,2024.994,0.0,2024.994,1971.5149,616.0984
3,43659,4,2011-05-31,2011-06-12,2011-06-07,1046,279,5,771,1,2039.994,0.0,2039.994,1971.5149,616.0984
4,43659,5,2011-05-31,2011-06-12,2011-06-07,1046,279,5,772,1,2039.994,0.0,2039.994,1971.5149,616.0984


In [3]:
# Load the date dimension (date_key / full_date pairs) from the ETL database.
dim_date = pd.read_sql("SELECT date_key, full_date FROM dim_date", etl_engine)

# Align the lookup key with the fact dates: both sides must be datetime64 for
# the values to compare equal.
dim_date["full_date"] = pd.to_datetime(dim_date["full_date"])
date_key_by_date = dim_date.set_index("full_date")["date_key"]

# A repeated full_date would make the lookup ambiguous (and, with a merge,
# would silently duplicate fact rows), so stop here instead.
if date_key_by_date.index.has_duplicates:
    raise ValueError("dim_date.full_date contains duplicate values")

# One lookup per date column of the fact. Assigning the key column directly
# (instead of merge + rename + drop) keeps the row count fixed and lets this
# cell be re-run without producing duplicate key columns.
for source_col, key_col in [
    ("OrderDate", "order_date_key"),
    ("DueDate", "due_date_key"),
    ("ShipDate", "ship_date_key"),
]:
    df_fact[source_col] = pd.to_datetime(df_fact[source_col])
    # Strip any time-of-day so the match happens on the calendar day; dates
    # with no entry in dim_date end up with a null key, as with a left join.
    # NOTE(review): assumes dim_date.full_date is stored at midnight — confirm.
    df_fact[key_col] = df_fact[source_col].dt.normalize().map(date_key_by_date)


In [4]:
# Load the reseller_key / reseller_id pairs from the ETL database.
dim_reseller = pd.read_sql("SELECT reseller_key, reseller_id FROM dim_reseller", etl_engine)

# Remove lookup columns left over from a previous run of this cell; otherwise
# a second merge would suffix them (_x / _y) and "reseller_key" would be gone.
df_fact = df_fact.drop(columns=["reseller_key", "reseller_id"], errors="ignore")

# Left join keeps every fact row; rows whose StoreID has no match get a null
# reseller_key. validate= raises MergeError if reseller_id is not unique in
# the dimension, which would otherwise duplicate fact rows silently.
df_fact = df_fact.merge(
    dim_reseller,
    left_on="StoreID",
    right_on="reseller_id",
    how="left",
    validate="many_to_one",
)


In [5]:
# Load the salesperson_key / salesperson_id pairs from the ETL database.
dim_salesperson = pd.read_sql("SELECT salesperson_key, salesperson_id FROM dim_salesperson", etl_engine)

# Remove lookup columns left over from a previous run of this cell; otherwise
# a second merge would suffix them (_x / _y) and "salesperson_key" would be gone.
df_fact = df_fact.drop(columns=["salesperson_key", "salesperson_id"], errors="ignore")

# Left join keeps every fact row; rows whose SalesPersonID has no match get a
# null salesperson_key. validate= raises MergeError if salesperson_id is not
# unique in the dimension, which would otherwise duplicate fact rows silently.
df_fact = df_fact.merge(
    dim_salesperson,
    left_on="SalesPersonID",
    right_on="salesperson_id",
    how="left",
    validate="many_to_one",
)


In [6]:
# Load the product_key / product_id pairs from the ETL database.
dim_product = pd.read_sql("SELECT product_key, product_id FROM dim_product", etl_engine)

# Remove lookup columns left over from a previous run of this cell; otherwise
# a second merge would suffix them (_x / _y) and "product_key" would be gone.
df_fact = df_fact.drop(columns=["product_key", "product_id"], errors="ignore")

# Left join keeps every fact row; rows whose ProductID has no match get a
# null product_key. validate= raises MergeError if product_id is not unique
# in the dimension, which would otherwise duplicate fact rows silently.
df_fact = df_fact.merge(
    dim_product,
    left_on="ProductID",
    right_on="product_id",
    how="left",
    validate="many_to_one",
)


In [7]:
# Load the territory_key / territory_id pairs from the ETL database.
dim_territory = pd.read_sql("SELECT territory_key, territory_id FROM dim_territory", etl_engine)

# Remove lookup columns left over from a previous run of this cell; otherwise
# a second merge would suffix them (_x / _y) and "territory_key" would be gone.
df_fact = df_fact.drop(columns=["territory_key", "territory_id"], errors="ignore")

# Left join keeps every fact row; rows whose TerritoryID has no match get a
# null territory_key. validate= raises MergeError if territory_id is not
# unique in the dimension, which would otherwise duplicate fact rows silently.
df_fact = df_fact.merge(
    dim_territory,
    left_on="TerritoryID",
    right_on="territory_id",
    how="left",
    validate="many_to_one",
)


In [8]:
# Derived measures per fact row:
#   extended_amount = OrderQty * UnitPrice
#   discount_amount = extended_amount * UnitPriceDiscount
#   sales_amount    = extended_amount - discount_amount
gross_amount = df_fact["OrderQty"].mul(df_fact["UnitPrice"])
discount_value = gross_amount.mul(df_fact["UnitPriceDiscount"])

df_fact["extended_amount"] = gross_amount
df_fact["discount_amount"] = discount_value
df_fact["sales_amount"] = gross_amount.sub(discount_value)


In [9]:
# Columns that make up the fact table, listed by group; their concatenation
# fixes the column order of the resulting frame.
order_id_cols = ["SalesOrderID", "SalesOrderDetailID"]
date_key_cols = ["order_date_key", "due_date_key", "ship_date_key"]
dimension_key_cols = ["reseller_key", "salesperson_key", "product_key", "territory_key"]
measure_cols = [
    "OrderQty",
    "UnitPrice",
    "UnitPriceDiscount",
    "extended_amount",
    "discount_amount",
    "sales_amount",
    "TaxAmt",
    "Freight",
]

# Selecting with a list raises KeyError if any expected column is missing.
fact_reseller = df_fact[order_id_cols + date_key_cols + dimension_key_cols + measure_cols]


In [10]:
# Write the fact frame to the ETL database as table "fact_reseller_sales".
# if_exists="replace" drops and recreates any existing table with that name;
# index=False leaves the DataFrame index out of the table.
# Kept as the cell's last expression so its return value is displayed.
fact_reseller.to_sql(
    name="fact_reseller_sales",
    con=etl_engine,
    index=False,
    if_exists="replace",
)


919