## Extract

In [None]:
import pandas as pd
from sqlalchemy import text
import sys
import os
# sys.path.append(os.path.join(os.getcwd(),'..'))
# sys.path.append('c:\\Users\\andres\\Desktop\\SEPTIMO SEMESTRE\\DATA SCIENCE\\etlexample\\proyectoetl\\utils\\')
from connection import connect
from translate_language import convert_language
from model_loader import ModelRegistry


# Conexion
co_oltp, etl_conn, etl_conn_or = connect()



# Extraccion de datos desde el OLTP
# Se basa en humanresources.employee + person.person + department

query_employee = text("""
SELECT
    e.business_entity_id AS employee_alternate_key,
    p.title,
    p.first_name,
    p.middle_name,
    p.last_name,
    p.suffix,
    e.gender,
    e.marital_status,
    e.birth_date,
    e.hire_date,
    e.salaried_flag,
    e.vacation_hours,
    e.sick_leave_hours,
    e.current_flag,
    e.organization_level,
    e.job_title,
    e.login_id,
    ea.email_address,
    e.national_idnumber as employee_national_idalternate_key,
    pp.phone_number AS phone,
    d.name AS department_name,
    h.start_date,
    h.end_date
FROM hr.employee AS e
INNER JOIN person.person AS p
    ON e.business_entity_id = p.business_entity_id
LEFT JOIN person.email_address AS ea
    ON p.business_entity_id = ea.business_entity_id
LEFT JOIN person.person_phone AS pp
    ON p.business_entity_id = pp.business_entity_id
LEFT JOIN hr.employee_department_history AS h
    ON e.business_entity_id = h.business_entity_id
LEFT JOIN hr.department AS d
    ON h.department_id = d.department_id
""")

contact_table = pd.read_sql("""
SELECT
    p.first_name || ' ' || p.last_name AS emergency_contact_name,
    t1.phone_number AS emergency_contact_phone,
    e.national_idnumber 
FROM
    hr.employee AS e
JOIN
    person.person AS p ON e.business_entity_id = p.business_entity_id
JOIN
    -- Subquery to rank and select the best phone number
    (
        SELECT
            pp.business_entity_id,
            pp.phone_number,
            -- Assign a rank based on the phone number type
            ROW_NUMBER() OVER (
                PARTITION BY pp.business_entity_id 
                ORDER BY
                    CASE pt.name
                        WHEN 'Work' THEN 1  
                        WHEN 'Home' THEN 2
                        WHEN 'Cell' THEN 3  
                        ELSE 4
                    END
            ) AS rn
        FROM
            person.person_phone AS pp
        JOIN
            person.phone_number_type AS pt ON pt.phone_number_type_id = pp.phone_number_type_id
    ) AS t1 ON t1.business_entity_id = e.business_entity_id
WHERE
    t1.rn = 1; -- Select only the row with the highest priority (rank 1)
 


""",co_oltp)

# Enlaza con DimSalesTerritory (territorio de ventas)
df_terr = pd.read_sql(
    text("SELECT sales_territory_key, sales_territory_alternate_key FROM dim_sales_territory;"),
    etl_conn
)

# Algunos empleados pueden tener territorio asignado si son vendedores
# (se usa la tabla sales.sales_person)
sales_person = pd.read_sql(
    text("SELECT business_entity_id AS employee_alternate_key, territory_id FROM sales.sales_person;"),
    co_oltp
)

sales_person = pd.read_sql(
    text("SELECT business_entity_id AS employee_alternate_key, territory_id FROM sales.sales_person;"),
    co_oltp
)

pay_frec_t=pd.read_sql("""
SELECT
    E.national_idnumber,
    LatestPay.pay_frequency
FROM
    hr.employee AS E
-- LEFT JOIN LATERAL is the exact equivalent of OUTER APPLY
LEFT JOIN LATERAL (
    SELECT
        PH.pay_frequency
    FROM
        hr.employee_pay_history AS PH
    WHERE
        PH.business_entity_id = E.business_entity_id
    ORDER BY
        PH.rate_change_date DESC
    LIMIT 1 
) AS LatestPay ON true;
""",co_oltp)

table_base_rate=pd.read_sql("""
                            SELECT e.national_idnumber,h.rate as base_rate
                            FROM hr.employee as e
                            JOIN hr.employee_pay_history as h
                            ON e.business_entity_id=h.business_entity_id
                            WHERE h.rate_change_date=(
                            SELECT MAX(rate_change_date)
                            FROM hr.employee_pay_history
                            WHERE business_entity_id=e.business_entity_id
                            )
                            
                            """,co_oltp)





## Transform

In [44]:
registry = ModelRegistry()
registry.preload_model('en', 'es')
registry.preload_model('en', 'fr')
tokenizer_es, model_es = registry.get_model('en', 'es')
tokenizer_fr, model_fr = registry.get_model('en', 'fr')

df_emp = pd.read_sql(query_employee, co_oltp)
print(f"Registros extraidos: {len(df_emp)}")
print(df_emp.head(3))



df_emp = df_emp.merge(
    sales_person,
    on='employee_alternate_key',
    how='left'
)

df_emp = df_emp.merge(
    df_terr,
    left_on='territory_id',
    right_on='sales_territory_alternate_key',
    how='left'
).drop(['territory_id', 'sales_territory_alternate_key'], axis=1)

# Transformaciones
df_emp['name_style'] = 0
df_emp['sales_person_flag'] = df_emp['sales_territory_key'].notnull().astype(int)
df_emp['current_flag'] = df_emp['current_flag'].astype(int)

# Limpia columnas textuales nulas
for col in ['department_name', 'title', 'job_title']:
    df_emp[col] = df_emp[col].fillna('Unknown')



# Traduccion opcional de departamentos
df_emp = convert_language('department_name', 'department_name_es', tokenizer_es,model_es,df_emp)
df_emp = convert_language('department_name', 'department_name_fr', tokenizer_fr,model_fr,df_emp)


# Selecciona las columnas finales 
final_columns = [
    'employee_alternate_key',
    'sales_territory_key',
    'first_name',
    'last_name',
    'middle_name',
    'title',
    'gender',
    'marital_status',
    'birth_date',
    'hire_date',
    'login_id',
    'email_address',
    'phone',
    'salaried_flag',
    'vacation_hours',
    'sick_leave_hours',
    'current_flag',
    'sales_person_flag',
    'department_name',
    'start_date',
    'end_date'
]

ModelRegistry inicializado.
--- Loading model Helsinki-NLP/opus-mt-en-es (this should only happen once). ---




--- Model Helsinki-NLP/opus-mt-en-es loaded and cached. ---
--- Loading model Helsinki-NLP/opus-mt-en-fr (this should only happen once). ---
--- Model Helsinki-NLP/opus-mt-en-fr loaded and cached. ---
Registros extraidos: 296
   employee_alternate_key title first_name middle_name   last_name suffix  \
0                       1  None        Ken           J     Sánchez   None   
1                       2  None      Terri         Lee       Duffy   None   
2                       3  None    Roberto        None  Tamburello   None   

  gender marital_status  birth_date   hire_date  ...  current_flag  \
0      M              S  1969-01-29  2009-01-14  ...          True   
1      F              S  1971-08-01  2008-01-31  ...          True   
2      M              M  1974-11-12  2007-11-11  ...          True   

   organization_level                      job_title  \
0                 NaN        Chief Executive Officer   
1                 1.0  Vice President of Engineering   
2               

In [45]:
df_emp = pd.read_sql(query_employee, co_oltp)
print(f"Registros extraidos: {len(df_emp)}")
print(df_emp.head(3))

# Enlaza con DimSalesTerritory (territorio de ventas)
df_terr = pd.read_sql(
    text("SELECT sales_territory_key, sales_territory_alternate_key FROM dim_sales_territory;"),
    etl_conn
)

# Algunos empleados pueden tener territorio asignado si son vendedores
# (se usa la tabla sales.sales_person)


df_emp = df_emp.merge(
    sales_person,
    on='employee_alternate_key',
    how='left'
)

df_emp = df_emp.merge(
    df_terr,
    left_on='territory_id',
    right_on='sales_territory_alternate_key',
    how='left'
).drop(['territory_id', 'sales_territory_alternate_key'], axis=1)

# Transformaciones
df_emp['name_style'] = 0
df_emp['sales_person_flag'] = df_emp['sales_territory_key'].notnull().astype(int)
df_emp['current_flag'] = df_emp['current_flag'].astype(int)

# Limpia columnas textuales nulas
for col in ['department_name', 'title', 'job_title']:
    df_emp[col] = df_emp[col].fillna('Unknown')



# Traduccion opcional de departamentos
df_emp = convert_language('department_name', 'department_name_es', tokenizer_es,model_es,df_emp)
df_emp = convert_language('department_name', 'department_name_fr', tokenizer_fr,model_fr,df_emp)


# Selecciona las columnas finales 
final_columns = [
    'employee_alternate_key',
    'sales_territory_key',
    'first_name',
    'last_name',
    'middle_name',
    'title',
    'gender',
    'marital_status',
    'birth_date',
    'hire_date',
    'login_id',
    'email_address',
    'phone',
    'salaried_flag',
    'vacation_hours',
    'sick_leave_hours',
    'current_flag',
    'sales_person_flag',
    'department_name',
    'start_date',
    'end_date'
]

Registros extraidos: 296
   employee_alternate_key title first_name middle_name   last_name suffix  \
0                       1  None        Ken           J     Sánchez   None   
1                       2  None      Terri         Lee       Duffy   None   
2                       3  None    Roberto        None  Tamburello   None   

  gender marital_status  birth_date   hire_date  ...  current_flag  \
0      M              S  1969-01-29  2009-01-14  ...          True   
1      F              S  1971-08-01  2008-01-31  ...          True   
2      M              M  1974-11-12  2007-11-11  ...          True   

   organization_level                      job_title  \
0                 NaN        Chief Executive Officer   
1                 1.0  Vice President of Engineering   
2                 2.0            Engineering Manager   

                   login_id                 email_address  \
0      adventure-works\ken0      ken0@adventure-works.com   
1    adventure-works\terri0    terri0@

In [46]:
# base_rate

table_base_rate

df_emp=df_emp.merge(
    table_base_rate,
    left_on='employee_national_idalternate_key',
    right_on='national_idnumber',
    how='left'
)

df_emp



Unnamed: 0,employee_alternate_key,title,first_name,middle_name,last_name,suffix,gender,marital_status,birth_date,hire_date,...,department_name,start_date,end_date,sales_territory_key,name_style,sales_person_flag,department_name_es,department_name_fr,national_idnumber,base_rate
0,1,Unknown,Ken,J,Sánchez,,M,S,1969-01-29,2009-01-14,...,Executive,2009-01-14,,,0,0,Ejecutivo,Pouvoir exécutif,295847284,125.5000
1,2,Unknown,Terri,Lee,Duffy,,F,S,1971-08-01,2008-01-31,...,Engineering,2008-01-31,,,0,0,Ingeniería,Ingénierie,245797967,63.4615
2,3,Unknown,Roberto,,Tamburello,,M,M,1974-11-12,2007-11-11,...,Engineering,2007-11-11,,,0,0,Ingeniería,Ingénierie,509647174,43.2692
3,4,Unknown,Rob,,Walters,,M,S,1974-12-23,2007-12-05,...,Tool Design,2010-05-31,,,0,0,Diseño de herramientas,Conception d'outils,112457891,29.8462
4,4,Unknown,Rob,,Walters,,M,S,1974-12-23,2007-12-05,...,Engineering,2007-12-05,2010-05-30,,0,0,Ingeniería,Ingénierie,112457891,29.8462
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
291,286,Unknown,Lynn,N,Tsoflias,,F,S,1977-02-14,2013-05-30,...,Sales,2013-05-30,,9.0,0,1,Ventas,Ventes,758596752,23.0769
292,287,Unknown,Amy,E,Alberts,,F,M,1957-09-20,2012-04-16,...,Sales,2012-04-16,,,0,0,Ventas,Ventes,982310417,48.1010
293,288,Unknown,Rachel,B,Valdez,,F,S,1975-07-09,2013-05-30,...,Sales,2013-05-30,,8.0,0,1,Ventas,Ventes,954276278,23.0769
294,289,Unknown,Jae,B,Pak,,F,M,1968-03-17,2012-05-30,...,Sales,2012-05-30,,10.0,0,1,Ventas,Ventes,668991357,23.0769


In [47]:
df_emp=df_emp.merge(
    contact_table,
    left_on='employee_national_idalternate_key',
    right_on='national_idnumber',
    how='left'
)

In [48]:
def get_sales_employee_image(row):

    filename_path=f"../images/employee/employee_{row['employee_national_idalternate_key']}.png"
    with open(filename_path,'rb') as f:
        image_bytes=f.read()
    return image_bytes

df_emp['employee_photo']=df_emp.apply(get_sales_employee_image,axis=1)


df_emp

Unnamed: 0,employee_alternate_key,title,first_name,middle_name,last_name,suffix,gender,marital_status,birth_date,hire_date,...,name_style,sales_person_flag,department_name_es,department_name_fr,national_idnumber_x,base_rate,emergency_contact_name,emergency_contact_phone,national_idnumber_y,employee_photo
0,1,Unknown,Ken,J,Sánchez,,M,S,1969-01-29,2009-01-14,...,0,0,Ejecutivo,Pouvoir exécutif,295847284,125.5000,Ken Sánchez,697-555-0142,295847284,b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x01...
1,2,Unknown,Terri,Lee,Duffy,,F,S,1971-08-01,2008-01-31,...,0,0,Ingeniería,Ingénierie,245797967,63.4615,Terri Duffy,819-555-0175,245797967,b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x01...
2,3,Unknown,Roberto,,Tamburello,,M,M,1974-11-12,2007-11-11,...,0,0,Ingeniería,Ingénierie,509647174,43.2692,Roberto Tamburello,212-555-0187,509647174,b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x01...
3,4,Unknown,Rob,,Walters,,M,S,1974-12-23,2007-12-05,...,0,0,Diseño de herramientas,Conception d'outils,112457891,29.8462,Rob Walters,612-555-0100,112457891,b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x01...
4,4,Unknown,Rob,,Walters,,M,S,1974-12-23,2007-12-05,...,0,0,Ingeniería,Ingénierie,112457891,29.8462,Rob Walters,612-555-0100,112457891,b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x01...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
291,286,Unknown,Lynn,N,Tsoflias,,F,S,1977-02-14,2013-05-30,...,0,1,Ventas,Ventes,758596752,23.0769,Lynn Tsoflias,1 (11) 500 555-0190,758596752,b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\...
292,287,Unknown,Amy,E,Alberts,,F,M,1957-09-20,2012-04-16,...,0,0,Ventas,Ventes,982310417,48.1010,Amy Alberts,775-555-0164,982310417,b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x01...
293,288,Unknown,Rachel,B,Valdez,,F,S,1975-07-09,2013-05-30,...,0,1,Ventas,Ventes,954276278,23.0769,Rachel Valdez,1 (11) 500 555-0140,954276278,b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x01...
294,289,Unknown,Jae,B,Pak,,F,M,1968-03-17,2012-05-30,...,0,1,Ventas,Ventes,668991357,23.0769,Jae Pak,1 (11) 500 555-0145,668991357,b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x01...


In [49]:
#pay_ferecuency

df_emp=df_emp.merge(
    pay_frec_t,
    right_on='national_idnumber',
    left_on='employee_national_idalternate_key',
    how='left'
)

In [50]:
#status

df_emp['status']=df_emp['end_date'].apply(lambda x: 'Current' if x is None else None)
df_emp

Unnamed: 0,employee_alternate_key,title,first_name,middle_name,last_name,suffix,gender,marital_status,birth_date,hire_date,...,department_name_fr,national_idnumber_x,base_rate,emergency_contact_name,emergency_contact_phone,national_idnumber_y,employee_photo,national_idnumber,pay_frequency,status
0,1,Unknown,Ken,J,Sánchez,,M,S,1969-01-29,2009-01-14,...,Pouvoir exécutif,295847284,125.5000,Ken Sánchez,697-555-0142,295847284,b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x01...,295847284,2,Current
1,2,Unknown,Terri,Lee,Duffy,,F,S,1971-08-01,2008-01-31,...,Ingénierie,245797967,63.4615,Terri Duffy,819-555-0175,245797967,b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x01...,245797967,2,Current
2,3,Unknown,Roberto,,Tamburello,,M,M,1974-11-12,2007-11-11,...,Ingénierie,509647174,43.2692,Roberto Tamburello,212-555-0187,509647174,b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x01...,509647174,2,Current
3,4,Unknown,Rob,,Walters,,M,S,1974-12-23,2007-12-05,...,Conception d'outils,112457891,29.8462,Rob Walters,612-555-0100,112457891,b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x01...,112457891,2,Current
4,4,Unknown,Rob,,Walters,,M,S,1974-12-23,2007-12-05,...,Ingénierie,112457891,29.8462,Rob Walters,612-555-0100,112457891,b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x01...,112457891,2,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
291,286,Unknown,Lynn,N,Tsoflias,,F,S,1977-02-14,2013-05-30,...,Ventes,758596752,23.0769,Lynn Tsoflias,1 (11) 500 555-0190,758596752,b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\...,758596752,2,Current
292,287,Unknown,Amy,E,Alberts,,F,M,1957-09-20,2012-04-16,...,Ventes,982310417,48.1010,Amy Alberts,775-555-0164,982310417,b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x01...,982310417,2,Current
293,288,Unknown,Rachel,B,Valdez,,F,S,1975-07-09,2013-05-30,...,Ventes,954276278,23.0769,Rachel Valdez,1 (11) 500 555-0140,954276278,b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x01...,954276278,2,Current
294,289,Unknown,Jae,B,Pak,,F,M,1968-03-17,2012-05-30,...,Ventes,668991357,23.0769,Jae Pak,1 (11) 500 555-0145,668991357,b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x01...,668991357,2,Current


In [51]:
df_emp=df_emp.drop(['national_idnumber_x',
 'department_name_es',
 'suffix',
 'organization_level',
 'national_idnumber_y',
 'employee_alternate_key',
 'department_name_fr',
 'job_title',
 'national_idnumber'],axis=1)


df_emp=df_emp.rename(columns={'employee_national_idalternate_key':'employee_national_id_alternate_key'})

df_emp['salaried_flag']=df_emp['salaried_flag'].apply(lambda x: 1 if x else 0)

## Load

In [52]:
df_emp.to_sql("dim_employee",etl_conn,if_exists='append',index=False)

296