
In this project, the ETL process is implemented in Python with the SQLAlchemy library, which, like other libraries such as sqlite3, can be used to connect to and query a database. The steps of the ETL process are as follows:


1. Extract: only the Invoice table of the chinook.db database is used (it is named `invoices` in the database).
2. Transform:
- The average billing amount per invoice is computed for each country.
- Null values in the invoices are filled with "Ninguno" and "12345".
3. Load: the transformed data is loaded back into chinook.db as a new table named "New_Invoice".

In [None]:
import pandas as pd

In [None]:
import sqlalchemy

In [None]:
sqlalchemy.__version__

'1.4.31'

In [None]:
# Connection to the database

connection_uri = "sqlite:///chinook.db"


In [None]:
# Create an engine: db_engine
db_engine = sqlalchemy.create_engine(connection_uri)

## ETL
### 1. Preprocessing of Extract

In [None]:
# Build an inspector from the engine created above to look up the table names in the database
inspector = sqlalchemy.inspect(db_engine)
table_names = inspector.get_table_names()
table_names

['New_Invoice',
 'albums',
 'artists',
 'customers',
 'employees',
 'genres',
 'invoice_items',
 'invoices',
 'media_types',
 'playlist_track',
 'playlists',
 'sqlite_sequence',
 'sqlite_stat1',
 'tracks']

In [None]:
# Open a connection from the engine: con
con = db_engine.connect()

In [None]:
# Run a small test query
qr_invoice = con.execute("SELECT * FROM invoices")

type(qr_invoice)

sqlalchemy.engine.cursor.LegacyCursorResult

In [None]:
qr_invoice.keys()

RMKeyView(['InvoiceId', 'CustomerId', 'InvoiceDate', 'BillingAddress', 'BillingCity', 'BillingState', 'BillingCountry', 'BillingPostalCode', 'Total'])

In [None]:
df = pd.DataFrame(qr_invoice.fetchall())
df.columns = qr_invoice.keys()

In [None]:
df

Unnamed: 0,InvoiceId,CustomerId,InvoiceDate,BillingAddress,BillingCity,BillingState,BillingCountry,BillingPostalCode,Total
0,1,2,2009-01-01 00:00:00,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,1.98
1,2,4,2009-01-02 00:00:00,Ullevålsveien 14,Oslo,,Norway,0171,3.96
2,3,8,2009-01-03 00:00:00,Grétrystraat 63,Brussels,,Belgium,1000,5.94
3,4,14,2009-01-06 00:00:00,8210 111 ST NW,Edmonton,AB,Canada,T6G 2C7,8.91
4,5,23,2009-01-11 00:00:00,69 Salem Street,Boston,MA,USA,2113,13.86
...,...,...,...,...,...,...,...,...,...
407,408,25,2013-12-05 00:00:00,319 N. Frances Street,Madison,WI,USA,53703,3.96
408,409,29,2013-12-06 00:00:00,796 Dundas Street West,Toronto,ON,Canada,M6J 1V1,5.94
409,410,35,2013-12-09 00:00:00,"Rua dos Campeões Europeus de Viena, 4350",Porto,,Portugal,,8.91
410,411,44,2013-12-14 00:00:00,Porthaninkatu 9,Helsinki,,Finland,00530,13.86
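
The execute, fetchall and keys steps above can also be collapsed into a single pandas call. The following is a minimal sketch, assuming a toy in-memory table rather than chinook.db; the rows and the name `df_demo` are hypothetical. `pd.read_sql_query` accepts a `sqlite3` connection, as used here, or a SQLAlchemy connection.

```python
import sqlite3

import pandas as pd

# In-memory database with a toy "invoices" table (hypothetical data, not chinook.db)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE invoices (InvoiceId INTEGER, BillingCountry TEXT, Total REAL)")
conn.executemany(
    "INSERT INTO invoices VALUES (?, ?, ?)",
    [(1, "Germany", 1.98), (2, "Norway", 3.96), (3, "Germany", 5.94)],
)

# One call runs the query and builds the dataframe, column names included
df_demo = pd.read_sql_query("SELECT * FROM invoices", conn)
conn.close()

print(df_demo.columns.tolist())
print(len(df_demo))
```

Because the column names come from the query result, there is no separate step that assigns `df.columns`.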


### 2. Transformation preprocessing
- The average billing amount per invoice is computed for each country.
- Null values in the invoices are filled with "Ninguno" and "12345".

The transformed data will then be loaded into chinook.db as a new table named "New_Invoice".

Computing the average billing amount per invoice, by country.

In [None]:
# Compute the average billing amount per country
df_g = df.groupby(['BillingCountry'])[['Total']].mean()
df_g

BillingCountry,Total
Argentina,5.374286
Australia,5.374286
Austria,6.088571
Belgium,5.374286
Brazil,5.431429
Canada,5.427857
Chile,6.66
Czech Republic,6.445714
Denmark,5.374286
Finland,5.945714


In [None]:
df_g = df_g.reset_index()
df_g.rename(columns = {"Total":"Promedio"}, inplace=True)
df_g.head(5)

Unnamed: 0,BillingCountry,Promedio
0,Argentina,5.374286
1,Australia,5.374286
2,Austria,6.088571
3,Belgium,5.374286
4,Brazil,5.431429


In [None]:
# Merge the computed average back into the dataset
df = df.merge(df_g, how="left", left_on = "BillingCountry", right_on = "BillingCountry")
df.head(5)

Unnamed: 0,InvoiceId,CustomerId,InvoiceDate,BillingAddress,BillingCity,BillingState,BillingCountry,BillingPostalCode,Total,Promedio
0,1,2,2009-01-01 00:00:00,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,1.98,5.588571
1,2,4,2009-01-02 00:00:00,Ullevålsveien 14,Oslo,,Norway,0171,3.96,5.66
2,3,8,2009-01-03 00:00:00,Grétrystraat 63,Brussels,,Belgium,1000,5.94,5.374286
3,4,14,2009-01-06 00:00:00,8210 111 ST NW,Edmonton,AB,Canada,T6G 2C7,8.91,5.427857
4,5,23,2009-01-11 00:00:00,69 Salem Street,Boston,MA,USA,2113,13.86,5.747912
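
The groupby, rename and merge steps above can also be written as a single `groupby(...).transform("mean")`, which returns one value per original row. This is only a sketch of an equivalent alternative, not the approach used in this notebook; the dataframe `toy` and its values are hypothetical.

```python
import pandas as pd

# Toy invoices: two for country "A", one for country "B" (hypothetical data)
toy = pd.DataFrame(
    {"BillingCountry": ["A", "A", "B"], "Total": [2.0, 4.0, 10.0]}
)

# transform("mean") broadcasts each group's mean back onto its rows,
# so no intermediate dataframe or merge is needed
toy["Promedio"] = toy.groupby("BillingCountry")["Total"].transform("mean")

print(toy)
```

Both rows of country "A" receive the same average, just as every invoice of a country shares one `Promedio` value after the merge.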


##### Processing the null values in the attributes and filling them with "Ninguno" and "12345".

In [None]:
# Check the number of null fields in each attribute
display('Number of null fields per attribute', df.isnull().sum())

'Number of null fields per attribute'

InvoiceId              0
CustomerId             0
InvoiceDate            0
BillingAddress         0
BillingCity            0
BillingState         202
BillingCountry         0
BillingPostalCode     28
Total                  0
Promedio               0
dtype: int64

In [None]:
df = df.fillna({"BillingState": "Ninguno", "BillingPostalCode": "12345"})

In [None]:
display('Number of null fields per attribute: ', df.isnull().sum())

'Number of null fields per attribute: '

InvoiceId            0
CustomerId           0
InvoiceDate          0
BillingAddress       0
BillingCity          0
BillingState         0
BillingCountry       0
BillingPostalCode    0
Total                0
Promedio             0
dtype: int64
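
Passing a dictionary to `fillna` fills only the columns it names, each with its own value. The sketch below illustrates this on a hypothetical three-column dataframe (`toy`), where `BillingCity` is deliberately left out of the dictionary.

```python
import pandas as pd

# Toy data with one null per column (hypothetical values)
toy = pd.DataFrame(
    {
        "BillingCity": ["Oslo", None],
        "BillingState": [None, "AB"],
        "BillingPostalCode": ["0171", None],
    }
)

# Only the columns named in the dictionary are filled
filled = toy.fillna({"BillingState": "Ninguno", "BillingPostalCode": "12345"})

# BillingCity was not in the dictionary, so its null is still counted
print(filled.isnull().sum())
```

In the invoice data only `BillingState` and `BillingPostalCode` contain nulls, so filling those two columns is enough to bring every count to zero.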

In [None]:
df

Unnamed: 0,InvoiceId,CustomerId,InvoiceDate,BillingAddress,BillingCity,BillingState,BillingCountry,BillingPostalCode,Total,Promedio
0,1,2,2009-01-01 00:00:00,Theodor-Heuss-Straße 34,Stuttgart,Ninguno,Germany,70174,1.98,5.588571
1,2,4,2009-01-02 00:00:00,Ullevålsveien 14,Oslo,Ninguno,Norway,0171,3.96,5.660000
2,3,8,2009-01-03 00:00:00,Grétrystraat 63,Brussels,Ninguno,Belgium,1000,5.94,5.374286
3,4,14,2009-01-06 00:00:00,8210 111 ST NW,Edmonton,AB,Canada,T6G 2C7,8.91,5.427857
4,5,23,2009-01-11 00:00:00,69 Salem Street,Boston,MA,USA,2113,13.86,5.747912
...,...,...,...,...,...,...,...,...,...,...
407,408,25,2013-12-05 00:00:00,319 N. Frances Street,Madison,WI,USA,53703,3.96,5.747912
408,409,29,2013-12-06 00:00:00,796 Dundas Street West,Toronto,ON,Canada,M6J 1V1,5.94,5.427857
409,410,35,2013-12-09 00:00:00,"Rua dos Campeões Europeus de Viena, 4350",Porto,Ninguno,Portugal,12345,8.91,5.517143
410,411,44,2013-12-14 00:00:00,Porthaninkatu 9,Helsinki,Ninguno,Finland,00530,13.86,5.945714


### 3. Load preprocessing
* The load into the SQL database begins here

In [None]:
con

<sqlalchemy.engine.base.Connection at 0x7fb25c67e450>

In [None]:
sqlite_table = "New_Invoice"
df.to_sql(sqlite_table, con, if_exists='fail')

ValueError: ignored
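
The `ValueError` is most likely raised because `New_Invoice` already appears in the table list returned by the inspector: with `if_exists='fail'`, `to_sql` refuses to write to an existing table. The sketch below reproduces that behaviour on a throwaway in-memory database, with hypothetical data, and shows `if_exists='replace'` as one way to rerun the load. Passing `index=False` is an assumption of this sketch; the notebook keeps the default and writes the index as an extra column.

```python
import sqlite3

import pandas as pd

conn = sqlite3.connect(":memory:")
toy = pd.DataFrame({"InvoiceId": [1, 2], "Total": [1.98, 3.96]})

# The first load creates the table
toy.to_sql("New_Invoice", conn, if_exists="fail", index=False)

# A second load with if_exists="fail" raises ValueError because the table exists
try:
    toy.to_sql("New_Invoice", conn, if_exists="fail", index=False)
    second_load_failed = False
except ValueError:
    second_load_failed = True

# if_exists="replace" drops the table and recreates it from the dataframe
toy.to_sql("New_Invoice", conn, if_exists="replace", index=False)
row_count = conn.execute("SELECT COUNT(*) FROM New_Invoice").fetchone()[0]
conn.close()

print(second_load_failed, row_count)
```

Whether to replace or to append depends on the pipeline; `replace` discards whatever the table previously contained.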

In [None]:
con.close()

### 4. Creating the ETL functions

Extraction functions: 
Function names:
- **extract_database**
- **extract_table_to_pandas**

In [None]:
# Function that creates the engine and opens the connection to the database. 
def extract_database(path):
    '''
    Input:
        path: Connection URI of the database
        
    Output:
        return 
        db_engine : Engine bound to the database
        db_connect: Open connection obtained from the engine
    '''
    db_engine = sqlalchemy.create_engine(path)
    db_connect = db_engine.connect()
    
    return db_engine, db_connect

In [None]:
# Function that extracts a table into a dataframe

def extract_table_to_pandas(tablename, db_connect):
    '''
    Input:
        tablename : Name of the table to be extracted
        db_connect: Open connection obtained from the engine
    Output:
        return 
        df        : The dataframe to be transformed
    '''
    query = "SELECT * FROM {}".format(tablename)
    result = db_connect.execute(query)

    df = pd.DataFrame(result.fetchall())
    df.columns = result.keys()
    
    return df
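
Since the table name is formatted directly into the SQL string, it may be worth checking it against the tables that actually exist before running the query. The following is a minimal sketch of that idea using the standard-library `sqlite3` module; the helper `select_all` and the toy table are hypothetical and not part of this notebook.

```python
import sqlite3


def select_all(conn, tablename):
    """Return all rows of `tablename`, refusing names that are not existing tables."""
    existing = {
        row[0]
        for row in conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
    }
    if tablename not in existing:
        raise ValueError(f"Unknown table: {tablename!r}")
    # The name matched the database's own table list, so it can be interpolated
    return conn.execute(f'SELECT * FROM "{tablename}"').fetchall()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE invoices (InvoiceId INTEGER, Total REAL)")
conn.execute("INSERT INTO invoices VALUES (1, 1.98)")

rows = select_all(conn, "invoices")

# A name that is not an existing table is rejected before any query is built
try:
    select_all(conn, "invoices; DROP TABLE invoices")
    rejected = False
except ValueError:
    rejected = True
conn.close()

print(rows, rejected)
```

With SQLAlchemy, the list returned by `inspector.get_table_names()` could serve the same purpose.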

Creating the transformation functions: 
Function names:
- **transform_avg_billing**
- **transform_fill_null**

In [None]:
# Transformation function for the average 
def transform_avg_billing(data):
    '''
    Group by country and compute the average billing amount per invoice
    Input:
        data : The dataframe to transform
    Output:
        return the input dataframe with an added "Average" column
    '''
    # Compute the average per country
    df_g = data.groupby(['BillingCountry'])[['Total']].mean()
    df_g = df_g.reset_index()
    df_g.rename(columns = {"Total":"Average"}, inplace=True)
    
    df = data.merge(df_g, how="left", left_on = "BillingCountry", right_on = "BillingCountry")

    return df

In [None]:
# Transformation function that fills in the missing values
def transform_fill_null(data):
    '''
    Fill null values:
        "BillingState"      -> "None"
        "BillingPostalCode" -> "99999"
    Input:
        data : The dataframe to transform
    Output:
        return the dataframe with the null values filled
    '''
    # Fill in the missing values
    data = data.fillna({"BillingState": "None", "BillingPostalCode":"99999"})

    return data

Creating the load function: 

In [None]:
# Function that loads a dataframe into a table of the database
def loading_to_sql(data, connect, sqlite_table='New_Table'):
    '''
    Input:
        data        : The dataframe to load
        connect     : Open connection obtained from the engine
        sqlite_table: Name of the target table
    Note: the connection is closed once the load is done.
    '''

    data.to_sql(sqlite_table, connect, if_exists='fail')
    connect.close()
    return 'Loading is Done'

##### 4.4. Testing the ETL functions

In [None]:
# Main block where the ETL functions are run
path = "sqlite:///chinook.db"

# Extract the invoices table from the database
engine, connect = extract_database(path)

tablename = 'invoices'
extract = extract_table_to_pandas(tablename, connect)

# Transform
transform = transform_avg_billing(extract)
transform = transform_fill_null(transform)

# Load: dataframe and target table name to pass to loading_to_sql
data = transform
sqlite_table = "New_Invoice"
print(data)


     InvoiceId  CustomerId  ...  Total   Average
0            1           2  ...   1.98  5.588571
1            2           4  ...   3.96  5.660000
2            3           8  ...   5.94  5.374286
3            4          14  ...   8.91  5.427857
4            5          23  ...  13.86  5.747912
..         ...         ...  ...    ...       ...
407        408          25  ...   3.96  5.747912
408        409          29  ...   5.94  5.427857
409        410          35  ...   8.91  5.517143
410        411          44  ...  13.86  5.945714
411        412          58  ...   1.99  5.789231

[412 rows x 10 columns]

Exception during reset or similar
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/pool/base.py", line 739, in _finalize_fairy
    fairy._reset(pool)
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/pool/base.py", line 988, in _reset
    pool._dialect.do_rollback(self)
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/default.py", line 682, in do_rollback
    dbapi_connection.rollback()
sqlite3.ProgrammingError: SQLite objects created in a thread can only be used in that same thread. The object was created in thread id 139776164472704 and this is thread id 139775504013056.
Exception closing connection <sqlite3.Connection object at 0x7f200356de30>
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/pool/base.py", line 739, in _finalize_fairy
    fairy._reset(pool)
  File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/pool/base.py", line 988, in _reset
    pool._dialect.do_rol

In [None]:
# Results obtained
transform

Unnamed: 0,InvoiceId,CustomerId,InvoiceDate,BillingAddress,BillingCity,BillingState,BillingCountry,BillingPostalCode,Total,Average
0,1,2,2009-01-01 00:00:00,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,1.98,5.588571
1,2,4,2009-01-02 00:00:00,Ullevålsveien 14,Oslo,,Norway,0171,3.96,5.660000
2,3,8,2009-01-03 00:00:00,Grétrystraat 63,Brussels,,Belgium,1000,5.94,5.374286
3,4,14,2009-01-06 00:00:00,8210 111 ST NW,Edmonton,AB,Canada,T6G 2C7,8.91,5.427857
4,5,23,2009-01-11 00:00:00,69 Salem Street,Boston,MA,USA,2113,13.86,5.747912
...,...,...,...,...,...,...,...,...,...,...
407,408,25,2013-12-05 00:00:00,319 N. Frances Street,Madison,WI,USA,53703,3.96,5.747912
408,409,29,2013-12-06 00:00:00,796 Dundas Street West,Toronto,ON,Canada,M6J 1V1,5.94,5.427857
409,410,35,2013-12-09 00:00:00,"Rua dos Campeões Europeus de Viena, 4350",Porto,,Portugal,99999,8.91,5.517143
410,411,44,2013-12-14 00:00:00,Porthaninkatu 9,Helsinki,,Finland,00530,13.86,5.945714


In [None]:
# Check that no attribute has null values left
transform.isnull().sum()

InvoiceId            0
CustomerId           0
InvoiceDate          0
BillingAddress       0
BillingCity          0
BillingState         0
BillingCountry       0
BillingPostalCode    0
Total                0
Average              0
dtype: int64

We close the connection to the database to avoid errors.

In [None]:

con.close()

In [None]:
# Final query on the newly created table, to check that the null fields have been filled with "None"
# and "99999"
inspector.get_columns('New_Invoice')

# The with block closes the connection on exit
with engine.connect() as con:
    
    rs = con.execute('SELECT * FROM New_Invoice')
    
    for row in rs:
        print(row)

(0, 1, 2, '2009-01-01 00:00:00', 'Theodor-Heuss-Straße 34', 'Stuttgart', 'None', 'Germany', '70174', 1.98, 5.588571428571428)
(1, 2, 4, '2009-01-02 00:00:00', 'Ullevålsveien 14', 'Oslo', 'None', 'Norway', '0171', 3.96, 5.659999999999999)
(2, 3, 8, '2009-01-03 00:00:00', 'Grétrystraat 63', 'Brussels', 'None', 'Belgium', '1000', 5.94, 5.374285714285714)
(3, 4, 14, '2009-01-06 00:00:00', '8210 111 ST NW', 'Edmonton', 'AB', 'Canada', 'T6G 2C7', 8.91, 5.427857142857142)
(4, 5, 23, '2009-01-11 00:00:00', '69 Salem Street', 'Boston', 'MA', 'USA', '2113', 13.86, 5.747912087912091)
(5, 6, 37, '2009-01-19 00:00:00', 'Berger Straße 10', 'Frankfurt', 'None', 'Germany', '60316', 0.99, 5.588571428571428)
(6, 7, 38, '2009-02-01 00:00:00', 'Barbarossastraße 19', 'Berlin', 'None', 'Germany', '10779', 1.98, 5.588571428571428)
(7, 8, 40, '2009-02-01 00:00:00', '8, Rue Hanovre', 'Paris', 'None', 'France', '75002', 1.98, 5.574285714285712)
(8, 9, 42, '2009-02-02 00:00:00', '9, Place Louis Barthou', 'Bordea