# ETL con Python


1. Usaremos la libreria psycopg2 para conectarmos a PostgreSQL
2. Tambien con la libreria pandas podemos ejecutar diferentes consultas
3. Simulando "secretos" guardaremos los valores de conexión a la base de datos en un archivo json (host,port,database,user and password)
4. Los procesos de transformación se haran con uso del motor de SQL (simulando un entorno correcto)

## Configuración de entorno

In [1]:
import psycopg2
from psycopg2.extras import execute_values
import pandas as pd
import json
import datetime

In [2]:
# Simulando secretos
with open('./files_config/config_pg.json','r') as f:
    config_pg = json.load(f)

## Conexión a la base de datos

In [3]:
conn_pg = psycopg2.connect(host=config_pg['dev']['host'],
                           port=config_pg['dev']['puerto'],
                           database=config_pg['dev']['db_nm'],
                           user=config_pg['dev']['usr'],
                           password=config_pg['dev']['pass'])
curs_pg = conn_pg.cursor()

In [4]:
# Ejecutamos una consulta
curs_pg.execute(""" SELECT * FROM information_schema.columns
                    WHERE table_schema = 'brz_sales_mx' AND table_name = 'dim_salesperson'; """ )

# Recorremos los resultados y los mostramos
for row_pg in curs_pg.fetchall() :
    print(row_pg)

('dev_ventas', 'brz_sales_mx', 'dim_salesperson', 'employeeid', 1, None, 'NO', 'smallint', None, None, 16, 2, 0, None, None, None, None, None, None, None, None, None, None, None, None, 'dev_ventas', 'pg_catalog', 'int2', None, None, None, None, '1', 'NO', 'NO', None, None, None, None, None, 'NO', 'NEVER', None, 'YES')
('dev_ventas', 'brz_sales_mx', 'dim_salesperson', 'employeekey', 2, None, 'NO', 'integer', None, None, 32, 2, 0, None, None, None, None, None, None, None, None, None, None, None, None, 'dev_ventas', 'pg_catalog', 'int4', None, None, None, None, '2', 'NO', 'NO', None, None, None, None, None, 'NO', 'NEVER', None, 'YES')
('dev_ventas', 'brz_sales_mx', 'dim_salesperson', 'namemain', 3, None, 'NO', 'character varying', 100, 400, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, 'dev_ventas', 'pg_catalog', 'varchar', None, None, None, None, '3', 'NO', 'NO', None, None, None, None, None, 'NO', 'NEVER', None, 'YES')
('dev_ventas', 'brz_sale

In [5]:
display(pd.read_sql_query(""" SELECT * FROM information_schema.columns
                          WHERE table_schema = 'brz_sales_mx' AND table_name = 'dim_salesperson'; """,con=conn_pg))

  display(pd.read_sql_query(""" SELECT * FROM information_schema.columns


Unnamed: 0,table_catalog,table_schema,table_name,column_name,ordinal_position,column_default,is_nullable,data_type,character_maximum_length,character_octet_length,...,is_identity,identity_generation,identity_start,identity_increment,identity_maximum,identity_minimum,identity_cycle,is_generated,generation_expression,is_updatable
0,dev_ventas,brz_sales_mx,dim_salesperson,employeeid,1,,NO,smallint,,,...,NO,,,,,,NO,NEVER,,YES
1,dev_ventas,brz_sales_mx,dim_salesperson,employeekey,2,,NO,integer,,,...,NO,,,,,,NO,NEVER,,YES
2,dev_ventas,brz_sales_mx,dim_salesperson,namemain,3,,NO,character varying,100.0,400.0,...,NO,,,,,,NO,NEVER,,YES
3,dev_ventas,brz_sales_mx,dim_salesperson,lastname,4,,NO,character varying,100.0,400.0,...,NO,,,,,,NO,NEVER,,YES
4,dev_ventas,brz_sales_mx,dim_salesperson,position,5,,NO,character varying,100.0,400.0,...,NO,,,,,,NO,NEVER,,YES
5,dev_ventas,brz_sales_mx,dim_salesperson,mail,6,,NO,character varying,100.0,400.0,...,NO,,,,,,NO,NEVER,,YES
6,dev_ventas,brz_sales_mx,dim_salesperson,usr_load,7,,NO,character varying,10.0,40.0,...,NO,,,,,,NO,NEVER,,YES
7,dev_ventas,brz_sales_mx,dim_salesperson,ts_creation,8,,NO,timestamp without time zone,,,...,NO,,,,,,NO,NEVER,,YES


## Lectura de insumos para ingestar en las diferentes capas

In [6]:
# Read_dataframes

pd_sp = pd.read_csv('./files_sales/persons.csv')
pd_region = pd.read_csv('./files_sales/region.csv')
pd_products = pd.read_csv('./files_sales/products.csv')
pd_dates = pd.read_csv('./files_sales/dates.csv')
pd_sales = pd.read_csv('./files_sales/sales.csv')

## Ingesta de tabla de personas

In [7]:
pd_sp.head()

Unnamed: 0,EmployeeID,EmployeeKey,NameMain,LastName,Position,Mail
0,500,90841,Nicki,Mendoza,Sales Representative,nicki_mendoza@salesmx.com
1,501,52104,Carlos,Cervantes,Sales Representative,carlos_cervantes@salesmx.com
2,502,74241,Sarah,Vega,Sales Representative,sarah_vega@salesmx.com
3,503,48316,Ramon,Xiao,Sales Representative,ramon_xiao@salesmx.com
4,504,26118,Sarah,Jiang,Manager,sarah_jiang@salesmx.com


### Inserción de datos a la capa BRONCE

Insertaremos datos de prueba a la tabla **brz_sales_mx.dim_salesperson**

In [8]:
pd_sp_test = pd_sp[pd_sp['Position'] == 'Director of sales']

# Agregando columnas de control
pd_sp_test['usr_load'] = config_pg['dev']['usr_developer']
pd_sp_test['dt_creation'] = datetime.datetime.now()
display(pd_sp_test)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  pd_sp_test['usr_load'] = config_pg['dev']['usr_developer']
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  pd_sp_test['dt_creation'] = datetime.datetime.now()


Unnamed: 0,EmployeeID,EmployeeKey,NameMain,LastName,Position,Mail,usr_load,dt_creation
17,517,66754,Thalia,Vega,Director of sales,thalia_vega@salesmx.com,hs_mx,2024-01-07 16:53:59.361534
42,542,80518,Jason,Monroy,Director of sales,jason_monroy@salesmx.com,hs_mx,2024-01-07 16:53:59.361534


In [9]:
tp_to_send = list(zip(*map(pd_sp_test.get, pd_sp_test))) # forma de enviar datos
display(tp_to_send)

[(517,
  66754,
  'Thalia',
  'Vega',
  'Director of sales',
  'thalia_vega@salesmx.com',
  'hs_mx',
  Timestamp('2024-01-07 16:53:59.361534')),
 (542,
  80518,
  'Jason',
  'Monroy',
  'Director of sales',
  'jason_monroy@salesmx.com',
  'hs_mx',
  Timestamp('2024-01-07 16:53:59.361534'))]

In [11]:
display(pd.read_sql_query(""" SELECT * FROM brz_sales_mx.dim_salesperson""",con=conn_pg))

  display(pd.read_sql_query(""" SELECT * FROM brz_sales_mx.dim_salesperson""",con=conn_pg))


Unnamed: 0,employeeid,employeekey,namemain,lastname,position,mail,usr_load,ts_creation


In [12]:
brz_sp_qry = """INSERT INTO brz_sales_mx.dim_salesperson (employeeid,employeekey,namemain,lastname,position,mail,usr_load,ts_creation) VALUES %s"""

execute_values(curs_pg,
               brz_sp_qry,
               tp_to_send)

conn_pg.commit()

In [13]:
display(pd.read_sql_query("""SELECT * FROM brz_sales_mx.dim_salesperson""",con=conn_pg))

  display(pd.read_sql_query("""SELECT * FROM brz_sales_mx.dim_salesperson""",con=conn_pg))


Unnamed: 0,employeeid,employeekey,namemain,lastname,position,mail,usr_load,ts_creation
0,517,66754,Thalia,Vega,Director of sales,thalia_vega@salesmx.com,hs_mx,2024-01-07 16:53:59.361534
1,542,80518,Jason,Monroy,Director of sales,jason_monroy@salesmx.com,hs_mx,2024-01-07 16:53:59.361534


Trunquemos la tabla viendo que la inserción fue correcta

In [14]:
curs_pg.execute("TRUNCATE brz_sales_mx.dim_salesperson")
conn_pg.commit()

In [15]:
display(pd.read_sql_query("""SELECT * FROM brz_sales_mx.dim_salesperson""",con=conn_pg))

  display(pd.read_sql_query("""SELECT * FROM brz_sales_mx.dim_salesperson""",con=conn_pg))


Unnamed: 0,employeeid,employeekey,namemain,lastname,position,mail,usr_load,ts_creation


Insertemos los datos a las capas correspondientes

In [16]:
# Transform and Load to "Bronce" layer
pd_sp['usr_load'] = config_pg['dev']['usr_developer']
pd_sp['dt_creation'] = datetime.datetime.now()
tp_to_send = list(zip(*map(pd_sp.get, pd_sp)))

In [17]:
len(tp_to_send)

55

In [18]:
# INSERTING ALL DATA
brz_sp_qry = """INSERT INTO brz_sales_mx.dim_salesperson (employeeid,employeekey,namemain,lastname,position,mail,usr_load,ts_creation) VALUES %s"""

execute_values(curs_pg,
               brz_sp_qry,
               tp_to_send)

conn_pg.commit()

In [19]:
# PREVIEW OF DATA
display(pd.read_sql_query("""SELECT * FROM brz_sales_mx.dim_salesperson""",con=conn_pg).head())

  display(pd.read_sql_query("""SELECT * FROM brz_sales_mx.dim_salesperson""",con=conn_pg).head())


Unnamed: 0,employeeid,employeekey,namemain,lastname,position,mail,usr_load,ts_creation
0,500,90841,Nicki,Mendoza,Sales Representative,nicki_mendoza@salesmx.com,hs_mx,2024-01-07 16:54:19.871339
1,501,52104,Carlos,Cervantes,Sales Representative,carlos_cervantes@salesmx.com,hs_mx,2024-01-07 16:54:19.871339
2,502,74241,Sarah,Vega,Sales Representative,sarah_vega@salesmx.com,hs_mx,2024-01-07 16:54:19.871339
3,503,48316,Ramon,Xiao,Sales Representative,ramon_xiao@salesmx.com,hs_mx,2024-01-07 16:54:19.871339
4,504,26118,Sarah,Jiang,Manager,sarah_jiang@salesmx.com,hs_mx,2024-01-07 16:54:19.871339


In [20]:
# INSERTED RECORDS
display(pd.read_sql_query("""SELECT COUNT(*) FROM brz_sales_mx.dim_salesperson""",con=conn_pg).head())

  display(pd.read_sql_query("""SELECT COUNT(*) FROM brz_sales_mx.dim_salesperson""",con=conn_pg).head())


Unnamed: 0,count
0,55


### Inserción a la capa SILVER

Insertaremos datos a la tabla **slv_sales_mx.dim_salesperson**

In [21]:
# INSERT OVERWRITE
curs_pg.execute("TRUNCATE slv_sales_mx.dim_salesperson")
conn_pg.commit()

In [22]:
# TRANSFORM AND LOAD TO SILVER LAYER
slv_sp_qry = f"""
WITH BASE AS
(
SELECT
	ROW_NUMBER() OVER(PARTITION BY CONCAT(NAMEMAIN,' ',LASTNAME) ORDER BY EMPLOYEEID DESC) AS ID_RN,
	employeeid AS EMPLOYEE_ID,
	employeekey AS EMPLOYEE_KEY,
	CONCAT(NAMEMAIN,' ',LASTNAME) AS FULL_NAME,
	position,
	mail,
	'{config_pg['dev']['usr_developer']}' AS USR_LOAD,
	CURRENT_TIMESTAMP AS TS_CREATION
FROM brz_sales_mx.dim_salesperson
)
INSERT INTO slv_sales_mx.dim_salesperson (employee_id,employee_key,full_name,position,mail,usr_load,ts_creation)
SELECT EMPLOYEE_ID,EMPLOYEE_KEY,FULL_NAME,POSITION,MAIL,USR_LOAD,TS_CREATION
FROM BASE
WHERE ID_RN = 1"""

curs_pg.execute(slv_sp_qry)
conn_pg.commit()

In [23]:
display(pd.read_sql_query("""SELECT * FROM slv_sales_mx.dim_salesperson""",con=conn_pg).head())

  display(pd.read_sql_query("""SELECT * FROM slv_sales_mx.dim_salesperson""",con=conn_pg).head())


Unnamed: 0,employee_id,employee_key,full_name,position,mail,usr_load,ts_creation
0,536,52677,Alejandro Monroy,Sales Representative,alejandro_monroy@salesmx.com,hs_mx,2024-01-07 16:54:26.531824
1,554,12198,Alejandro Parker,Manager,alejandro_parker@salesmx.com,hs_mx,2024-01-07 16:54:26.531824
2,521,86850,Alejandro Smith,Sales Representative,alejandro_smith@salesmx.com,hs_mx,2024-01-07 16:54:26.531824
3,531,23532,Alejandro Sotelo,Sales Representative,alejandro_sotelo@salesmx.com,hs_mx,2024-01-07 16:54:26.531824
4,533,99628,Alejandro Wayne,Sales Representative,alejandro_wayne@salesmx.com,hs_mx,2024-01-07 16:54:26.531824


In [24]:
display(pd.read_sql_query("""SELECT COUNT(*) FROM slv_sales_mx.dim_salesperson""",con=conn_pg).head())

  display(pd.read_sql_query("""SELECT COUNT(*) FROM slv_sales_mx.dim_salesperson""",con=conn_pg).head())


Unnamed: 0,count
0,50


## Ingesta de la tabla region

In [25]:
display(pd_region.head())

Unnamed: 0,TerritoryKey,Note,Rate
0,1,Northwest|United States|North America,0.15
1,2,Northeast|United States|North America,0.25
2,3,Central|United States|North America,0.64
3,4,Southwest|Mexico|North America,0.11
4,5,South MX|Mexico|North America,0.25


In [26]:
pd_region.shape

(13, 3)

### Inserción a la capa BRONCE

Insertaremos datos a la tabla **brz_sales_mx.dim_region**

In [27]:
# INSERT OVERWRITE
curs_pg.execute("TRUNCATE brz_sales_mx.dim_region")
conn_pg.commit()

In [28]:
# SENDING DATA TO POSTGRESQL

# Agregando columnas de control
pd_region['usr_load'] = config_pg['dev']['usr_developer']
pd_region['dt_creation'] = datetime.datetime.now()

tp_lst_region = list(zip(*map(pd_region.get, pd_region))) # forma de enviar datos

brz_region_qry = """INSERT INTO brz_sales_mx.dim_region (territorykey,note,rate,usr_load,ts_creation) VALUES %s"""

execute_values(curs_pg,
               brz_region_qry,
               tp_lst_region)

conn_pg.commit()

In [29]:
# VERIFYING
display(pd.read_sql_query("""SELECT * FROM brz_sales_mx.dim_region""",con=conn_pg).head())

  display(pd.read_sql_query("""SELECT * FROM brz_sales_mx.dim_region""",con=conn_pg).head())


Unnamed: 0,territorykey,note,rate,usr_load,ts_creation
0,1,Northwest|United States|North America,0.15,hs_mx,2024-01-07 16:54:34.658388
1,2,Northeast|United States|North America,0.25,hs_mx,2024-01-07 16:54:34.658388
2,3,Central|United States|North America,0.64,hs_mx,2024-01-07 16:54:34.658388
3,4,Southwest|Mexico|North America,0.11,hs_mx,2024-01-07 16:54:34.658388
4,5,South MX|Mexico|North America,0.25,hs_mx,2024-01-07 16:54:34.658388


In [30]:
# VERIFYING
display(pd.read_sql_query("""SELECT COUNT(*) FROM brz_sales_mx.dim_region""",con=conn_pg).head())

  display(pd.read_sql_query("""SELECT COUNT(*) FROM brz_sales_mx.dim_region""",con=conn_pg).head())


Unnamed: 0,count
0,13


### Inserción a la capa SILVER

Insertaremos datos a la tabla **slv_sales_mx.dim_region**

In [31]:
# INSERT OVERWRITE
curs_pg.execute("TRUNCATE slv_sales_mx.dim_region")
conn_pg.commit()

In [32]:
# TRANSFORM AND LOAD TO SILVER LAYER
slv_region_qry = f"""
INSERT INTO slv_sales_mx.dim_region (territory_id,region,country,continent_group,rate,usr_load,ts_creation)
SELECT
TERRITORYKEY AS TERRITORY_ID,
SPLIT_PART(NOTE, '|', 1) AS REGION,
SPLIT_PART(NOTE, '|', 2) AS COUNTRY,
SPLIT_PART(NOTE, '|', 3) AS CONTINENT_GROUP,
RATE,
'{config_pg['dev']['usr_developer']}' AS USR_LOAD,
CURRENT_TIMESTAMP AS TS_CREATION
FROM brz_sales_mx.dim_region"""

curs_pg.execute(slv_region_qry)
conn_pg.commit()

In [33]:
# VERIFYING
display(pd.read_sql_query("""SELECT * FROM slv_sales_mx.dim_region""",con=conn_pg).head())

  display(pd.read_sql_query("""SELECT * FROM slv_sales_mx.dim_region""",con=conn_pg).head())


Unnamed: 0,territory_id,region,country,continent_group,rate,usr_load,ts_creation
0,1,Northwest,United States,North America,0.15,hs_mx,2024-01-07 16:54:41.465090
1,2,Northeast,United States,North America,0.25,hs_mx,2024-01-07 16:54:41.465090
2,3,Central,United States,North America,0.64,hs_mx,2024-01-07 16:54:41.465090
3,4,Southwest,Mexico,North America,0.11,hs_mx,2024-01-07 16:54:41.465090
4,5,South MX,Mexico,North America,0.25,hs_mx,2024-01-07 16:54:41.465090


In [34]:
# VERIFYING
display(pd.read_sql_query("""SELECT COUNT(*) FROM slv_sales_mx.dim_region""",con=conn_pg).head())

  display(pd.read_sql_query("""SELECT COUNT(*) FROM slv_sales_mx.dim_region""",con=conn_pg).head())


Unnamed: 0,count
0,13


## Ingesta de la tabla productos

In [35]:
display(pd_products.head())

Unnamed: 0,ProductName,StandarCost,Category,Subcategory,Color,SizeStr,IndexStr,KeyProduct
0,Bikes Mountain,974.028973,Bikes,Mountain,Black,XS,0,BIKMOU00000
1,Bikes Mountain,2597.699463,Bikes,Mountain,Black,S,1,BIKMOU00001
2,Bikes Mountain,2260.763557,Bikes,Mountain,Black,M,2,BIKMOU00002
3,Bikes Mountain,720.622975,Bikes,Mountain,Black,L,3,BIKMOU00003
4,Bikes Mountain,7799.992997,Bikes,Mountain,Black,XL,4,BIKMOU00004


In [36]:
pd_products.shape

(801, 8)

### Inserción a la capa BRONCE

Insertaremos datos a la tabla **brz_sales_mx.dim_products**

In [37]:
# INSERT OVERWRITE
curs_pg.execute("TRUNCATE brz_sales_mx.dim_products")
conn_pg.commit()

In [38]:
# SENDING DATA TO POSTGRESQL

# Agregando columnas de control
pd_products['usr_load'] = config_pg['dev']['usr_developer']
pd_products['dt_creation'] = datetime.datetime.now()

tp_lst_products = list(zip(*map(pd_products.get, pd_products))) # forma de enviar datos

brz_products_qry = """INSERT INTO brz_sales_mx.dim_products (productname,standarcost,category,subcategory,color,sizestr,indexstr,keyproduct,usr_load,ts_creation) VALUES %s"""

execute_values(curs_pg,
               brz_products_qry,
               tp_lst_products)

conn_pg.commit()

In [39]:
# VERIFYING
display(pd.read_sql_query("""SELECT * FROM brz_sales_mx.dim_products""",con=conn_pg).head())

  display(pd.read_sql_query("""SELECT * FROM brz_sales_mx.dim_products""",con=conn_pg).head())


Unnamed: 0,productname,standarcost,category,subcategory,color,sizestr,indexstr,keyproduct,usr_load,ts_creation
0,Bikes Mountain,974.028973,Bikes,Mountain,Black,XS,0,BIKMOU00000,hs_mx,2024-01-07 16:54:51.625578
1,Bikes Mountain,2597.699463,Bikes,Mountain,Black,S,1,BIKMOU00001,hs_mx,2024-01-07 16:54:51.625578
2,Bikes Mountain,2260.763557,Bikes,Mountain,Black,M,2,BIKMOU00002,hs_mx,2024-01-07 16:54:51.625578
3,Bikes Mountain,720.622975,Bikes,Mountain,Black,L,3,BIKMOU00003,hs_mx,2024-01-07 16:54:51.625578
4,Bikes Mountain,7799.992997,Bikes,Mountain,Black,XL,4,BIKMOU00004,hs_mx,2024-01-07 16:54:51.625578


In [40]:
# VERIFYING
display(pd.read_sql_query("""SELECT COUNT(*) FROM brz_sales_mx.dim_products""",con=conn_pg).head())

  display(pd.read_sql_query("""SELECT COUNT(*) FROM brz_sales_mx.dim_products""",con=conn_pg).head())


Unnamed: 0,count
0,801


### Inserción a la capa SILVER

Insertaremos datos a la tabla **slv_sales_mx.dim_products**

In [41]:
# INSERT OVERWRITE
curs_pg.execute("TRUNCATE slv_sales_mx.dim_products")
conn_pg.commit()

In [42]:
# TRANSFORM AND LOAD TO SILVER LAYER
slv_products_qry = f"""
INSERT INTO slv_sales_mx.dim_products (product_id,product_name,standar_cost,category,subcategory,color_product,size_product,usr_load,ts_creation)
SELECT
KEYPRODUCT AS PRODUCT_ID,
PRODUCTNAME AS PRODUCT_NAME,
STANDARCOST AS STANDAR_COST,
CATEGORY,
SUBCATEGORY,
COLOR AS COLOR_PRODUCT,
SIZESTR AS SIZE_PRODUCT,
'{config_pg['dev']['usr_developer']}' AS USR_LOAD,
CURRENT_TIMESTAMP AS TS_CREATION
FROM brz_sales_mx.dim_products"""

curs_pg.execute(slv_products_qry)
conn_pg.commit()

In [43]:
# VERIFYING
display(pd.read_sql_query("""SELECT * FROM slv_sales_mx.dim_products""",con=conn_pg).head())

  display(pd.read_sql_query("""SELECT * FROM slv_sales_mx.dim_products""",con=conn_pg).head())


Unnamed: 0,product_id,product_name,standar_cost,category,subcategory,color_product,size_product,usr_load,ts_creation
0,BIKMOU00000,Bikes Mountain,974.03,Bikes,Mountain,Black,XS,hs_mx,2024-01-07 16:54:58.827464
1,BIKMOU00001,Bikes Mountain,2597.7,Bikes,Mountain,Black,S,hs_mx,2024-01-07 16:54:58.827464
2,BIKMOU00002,Bikes Mountain,2260.76,Bikes,Mountain,Black,M,hs_mx,2024-01-07 16:54:58.827464
3,BIKMOU00003,Bikes Mountain,720.62,Bikes,Mountain,Black,L,hs_mx,2024-01-07 16:54:58.827464
4,BIKMOU00004,Bikes Mountain,7799.99,Bikes,Mountain,Black,XL,hs_mx,2024-01-07 16:54:58.827464


In [44]:
# VERIFYING
display(pd.read_sql_query("""SELECT COUNT(*) FROM slv_sales_mx.dim_products""",con=conn_pg).head())

  display(pd.read_sql_query("""SELECT COUNT(*) FROM slv_sales_mx.dim_products""",con=conn_pg).head())


Unnamed: 0,count
0,801


## Ingesta de la tabla calendario

In [45]:
display(pd_dates.head())

Unnamed: 0,Date_dt,Year_dt,Month_dt,Day_dt,WeekDay_dt
0,2017-01-01,2017,1,1,6
1,2017-01-02,2017,1,2,0
2,2017-01-03,2017,1,3,1
3,2017-01-04,2017,1,4,2
4,2017-01-05,2017,1,5,3


In [46]:
pd_dates.shape

(2922, 5)

### Inserción a la capa BRONCE

Insertaremos datos a la tabla **brz_sales_mx.dim_calendar**

In [47]:
# INSERT OVERWRITE
curs_pg.execute("TRUNCATE brz_sales_mx.dim_calendar")
conn_pg.commit()

In [48]:
# SENDING DATA TO POSTGRESQL

# Agregando columnas de control
pd_dates['usr_load'] = config_pg['dev']['usr_developer']
pd_dates['dt_creation'] = datetime.datetime.now()

tp_lst_cal = list(zip(*map(pd_dates.get, pd_dates))) # forma de enviar datos

brz_cal_qry = """INSERT INTO brz_sales_mx.dim_calendar (date_dt,year_dt,month_dt,day_dt,weekday_dt,usr_load,ts_creation) VALUES %s"""

execute_values(curs_pg,
               brz_cal_qry,
               tp_lst_cal)

conn_pg.commit()

In [49]:
# VERIFYING
display(pd.read_sql_query("""SELECT * FROM brz_sales_mx.dim_calendar""",con=conn_pg).head())

  display(pd.read_sql_query("""SELECT * FROM brz_sales_mx.dim_calendar""",con=conn_pg).head())


Unnamed: 0,date_dt,year_dt,month_dt,day_dt,weekday_dt,usr_load,ts_creation
0,2017-01-01,2017,1,1,6,hs_mx,2024-01-07 17:01:42.314832
1,2017-01-02,2017,1,2,0,hs_mx,2024-01-07 17:01:42.314832
2,2017-01-03,2017,1,3,1,hs_mx,2024-01-07 17:01:42.314832
3,2017-01-04,2017,1,4,2,hs_mx,2024-01-07 17:01:42.314832
4,2017-01-05,2017,1,5,3,hs_mx,2024-01-07 17:01:42.314832


In [50]:
# VERIFYING
display(pd.read_sql_query("""SELECT COUNT(*) FROM brz_sales_mx.dim_calendar""",con=conn_pg).head())

  display(pd.read_sql_query("""SELECT COUNT(*) FROM brz_sales_mx.dim_calendar""",con=conn_pg).head())


Unnamed: 0,count
0,2922


### Inserción a la capa SILVER

Insertaremos datos a la tabla **slv_sales_mx.dim_calendar**

In [51]:
# INSERT OVERWRITE
curs_pg.execute("TRUNCATE slv_sales_mx.dim_calendar")
conn_pg.commit()

In [52]:
# TRANSFORM AND LOAD TO SILVER LAYER
slv_products_qry = f"""
INSERT INTO slv_sales_mx.dim_calendar (date_ymd,yr,mnth,dy,wk_dy,usr_load,ts_creation)
SELECT
DATE_DT AS DATE_YMD,
YEAR_DT AS YR,
MONTH_DT AS MNTH,
DAY_DT AS DY,
WEEKDAY_DT AS WK_DY,
'{config_pg['dev']['usr_developer']}' AS USR_LOAD,
CURRENT_TIMESTAMP AS TS_CREATION
FROM brz_sales_mx.dim_calendar"""

curs_pg.execute(slv_products_qry)
conn_pg.commit()

In [53]:
# VERIFYING
display(pd.read_sql_query("""SELECT * FROM slv_sales_mx.dim_calendar""",con=conn_pg).head())

  display(pd.read_sql_query("""SELECT * FROM slv_sales_mx.dim_calendar""",con=conn_pg).head())


Unnamed: 0,date_ymd,yr,mnth,dy,wk_dy,usr_load,ts_creation
0,2017-01-01,2017,1,1,6,hs_mx,2024-01-07 17:08:14.068497
1,2017-01-02,2017,1,2,0,hs_mx,2024-01-07 17:08:14.068497
2,2017-01-03,2017,1,3,1,hs_mx,2024-01-07 17:08:14.068497
3,2017-01-04,2017,1,4,2,hs_mx,2024-01-07 17:08:14.068497
4,2017-01-05,2017,1,5,3,hs_mx,2024-01-07 17:08:14.068497


In [54]:
# VERIFYING
display(pd.read_sql_query("""SELECT COUNT(*) FROM slv_sales_mx.dim_calendar""",con=conn_pg).head())

  display(pd.read_sql_query("""SELECT COUNT(*) FROM slv_sales_mx.dim_calendar""",con=conn_pg).head())


Unnamed: 0,count
0,2922


## Ingesta de la tabla ventas

In [55]:
display(pd_sales.head())

Unnamed: 0,OrderDate,KeyProduct,TerritoryKey,EmployeeKey,Quantity
0,2023-01-20,CLOSHO00129,1,508,5
1,2020-09-13,COMCHA00355,5,545,12
2,2021-08-08,BIKMOU00003,11,509,14
3,2021-11-02,COMPED00419,3,547,7
4,2020-08-18,BIKROA00070,7,531,14


In [56]:
pd_sales.shape

(350000, 5)

### Inserción a la capa BRONCE

Insertaremos datos a la tabla **brz_sales_mx.fact_sales**

In [57]:
# INSERT OVERWRITE
curs_pg.execute("TRUNCATE brz_sales_mx.fact_sales")
conn_pg.commit()

In [58]:
# SENDING DATA TO POSTGRESQL

# Agregando columnas de control
pd_sales['usr_load'] = config_pg['dev']['usr_developer']
pd_sales['dt_creation'] = datetime.datetime.now()

tp_lst_sales = list(zip(*map(pd_sales.get, pd_sales))) # forma de enviar datos

brz_sales_qry = """INSERT INTO brz_sales_mx.fact_sales (orderdate,keyproduct,territorykey,employeekey,quantity,usr_load,ts_creation) VALUES %s"""

execute_values(curs_pg,
               brz_sales_qry,
               tp_lst_sales)

conn_pg.commit()

In [59]:
# VERIFYING
display(pd.read_sql_query("""SELECT * FROM brz_sales_mx.fact_sales""",con=conn_pg).head())

  display(pd.read_sql_query("""SELECT * FROM brz_sales_mx.fact_sales""",con=conn_pg).head())


Unnamed: 0,orderdate,keyproduct,territorykey,employeekey,quantity,usr_load,ts_creation
0,2023-01-20,CLOSHO00129,1,508,5,hs_mx,2024-01-07 17:14:39.234961
1,2020-09-13,COMCHA00355,5,545,12,hs_mx,2024-01-07 17:14:39.234961
2,2021-08-08,BIKMOU00003,11,509,14,hs_mx,2024-01-07 17:14:39.234961
3,2021-11-02,COMPED00419,3,547,7,hs_mx,2024-01-07 17:14:39.234961
4,2020-08-18,BIKROA00070,7,531,14,hs_mx,2024-01-07 17:14:39.234961


In [60]:
# VERIFYING
display(pd.read_sql_query("""SELECT COUNT(*) FROM brz_sales_mx.fact_sales""",con=conn_pg).head())

  display(pd.read_sql_query("""SELECT COUNT(*) FROM brz_sales_mx.fact_sales""",con=conn_pg).head())


Unnamed: 0,count
0,350000


### Inserción a la capa SILVER

Insertaremos datos a la tabla **slv_sales_mx.fact_sales**

In [61]:
# INSERT OVERWRITE
curs_pg.execute("TRUNCATE slv_sales_mx.fact_sales")
conn_pg.commit()

In [62]:
# TRANSFORM AND LOAD TO SILVER LAYER
slv_sales_qry = f"""
INSERT INTO slv_sales_mx.fact_sales (order_date,product_id,territory_id,employee_id,quantity,usr_load,ts_creation)
SELECT
ORDERDATE AS ORDER_DATE,
KEYPRODUCT AS PRODUCT_ID,
TERRITORYKEY AS TERRITORY_ID,
EMPLOYEEKEY AS EMPLOYEE_ID,
QUANTITY AS QUANTITY,
'{config_pg['dev']['usr_developer']}' AS USR_LOAD,
CURRENT_TIMESTAMP AS TS_CREATION
FROM brz_sales_mx.fact_sales"""

curs_pg.execute(slv_sales_qry)
conn_pg.commit()

In [63]:
# VERIFYING
display(pd.read_sql_query("""SELECT * FROM slv_sales_mx.fact_sales""",con=conn_pg).head())

  display(pd.read_sql_query("""SELECT * FROM slv_sales_mx.fact_sales""",con=conn_pg).head())


Unnamed: 0,order_date,product_id,territory_id,employee_id,quantity,usr_load,ts_creation
0,2023-01-20,CLOSHO00129,1,508,5,hs_mx,2024-01-07 17:20:06.130650
1,2020-09-13,COMCHA00355,5,545,12,hs_mx,2024-01-07 17:20:06.130650
2,2021-08-08,BIKMOU00003,11,509,14,hs_mx,2024-01-07 17:20:06.130650
3,2021-11-02,COMPED00419,3,547,7,hs_mx,2024-01-07 17:20:06.130650
4,2020-08-18,BIKROA00070,7,531,14,hs_mx,2024-01-07 17:20:06.130650


In [64]:
# VERIFYING
display(pd.read_sql_query("""SELECT COUNT(*) FROM slv_sales_mx.fact_sales""",con=conn_pg).head())

  display(pd.read_sql_query("""SELECT COUNT(*) FROM slv_sales_mx.fact_sales""",con=conn_pg).head())


Unnamed: 0,count
0,350000


In [65]:
conn_pg.close()

In [66]:
curs_pg.close()