# Data pipeline from Excel files to BigQuery Data Warehouse with Python

## Data pipeline concept

Data pipelines collect data from diverse sources, transform it, and store it. *ETL and ELT are subprocesses of data pipelines*.

A data pipeline is a method in which raw data is ingested from various data sources and then stored in a data lake or data warehouse.

Some data processing is commonly applied before the data flows into the repository.

Data processing may include:

* Filtering
* Masking or labeling
* Aggregations

These steps help ensure appropriate data quality, especially when the destination is a relational database, where the columns and data types of new data must match those of the existing table before the new rows can be appended.
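The three processing steps listed above can be sketched on a toy table. This is a minimal illustration with pandas, not part of the pipeline itself: the `orders` frame, its column names, and the choice of a truncated SHA-256 hash for masking are all assumptions made for the example.

```python
import hashlib

import pandas as pd

# Hypothetical sample data; none of these columns come from the real sources.
orders = pd.DataFrame({
    "customer": ["ana", "luis", "ana", "eva"],
    "phone": ["5511112222", "5533334444", "5511112222", "5555556666"],
    "status": ["delivered", "cancelled", "delivered", "delivered"],
    "total": [120.0, 80.0, 60.0, 200.0],
})

# Filtering: keep only the rows that are relevant downstream.
delivered = orders[orders["status"] == "delivered"].copy()

# Masking: replace a sensitive value with a short one-way hash.
delivered["phone"] = delivered["phone"].map(
    lambda value: hashlib.sha256(value.encode()).hexdigest()[:12]
)

# Aggregation: one row per customer with order count and revenue.
summary = (
    delivered.groupby("customer", as_index=False)
    .agg(orders=("total", "size"), revenue=("total", "sum"))
    .sort_values("customer")
    .reset_index(drop=True)
)
print(summary)
```

The same three steps appear later in this notebook in polars form (`select`, `drop`, and the DuckDB `GROUP BY` query).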


For further information, you can check out: [What is a data pipeline?](https://www.ibm.com/topics/data-pipeline)

## Environment settings

In [14]:
import numpy as np
import pandas as pd
import polars as pl
import gspread
import duckdb
# Import authenticators for Google Sheets (oauth2client) and BigQuery (google.oauth2)
from oauth2client.service_account import ServiceAccountCredentials
from google.oauth2 import service_account
from google.cloud import bigquery
#import connectorx as cx
import warnings
warnings.filterwarnings("ignore")

In [3]:
# Create sheets and drive scopes to authenticate
scopes = ['https://www.googleapis.com/auth/spreadsheets',
        'https://www.googleapis.com/auth/drive',
        'https://www.googleapis.com/auth/analytics.readonly']

# Read google credentials
api = '../APIS/gepp-538-db.json'
# connect to google sheets
gs_credentials = ServiceAccountCredentials.from_json_keyfile_name(api, scopes)
gc = gspread.authorize(gs_credentials)
# connect to big query
bq_credentials = service_account.Credentials.from_service_account_file(api)
project_id = 'gepp-538'
client = bigquery.Client(credentials=bq_credentials,project=project_id)
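Both clients above depend on the same service-account key file, so a missing or incomplete file only surfaces as an authentication error later on. A small pre-flight check can make that failure clearer. The sketch below is an optional addition, not part of the original notebook; the helper name `check_service_account_file` is hypothetical, and the field list covers only the keys that service-account JSON files are generally expected to carry.

```python
import json
from pathlib import Path

# Fields that a Google service-account key file is expected to contain.
REQUIRED_FIELDS = {"type", "project_id", "private_key", "client_email"}


def check_service_account_file(path):
    """Return the project id declared in the key file, or raise ValueError."""
    key_path = Path(path)
    if not key_path.is_file():
        raise ValueError(f"Key file not found: {key_path}")
    info = json.loads(key_path.read_text(encoding="utf-8"))
    missing = REQUIRED_FIELDS - info.keys()
    if missing:
        raise ValueError(f"Key file is missing fields: {sorted(missing)}")
    if info["type"] != "service_account":
        raise ValueError(f"Unexpected credential type: {info['type']!r}")
    return info["project_id"]
```

Calling `check_service_account_file(api)` before building the credentials would also return the project id stored in the file, which could be compared against the hard-coded `project_id`.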

## SQL queries with DuckDB

In [27]:
# detailed user query
mpe_users = pl.read_csv('../usuarios.csv', separator=';', ignore_errors=True)

In [28]:
# create sql queries with duckdb
# read file with duckdb (csv, parquet, json)
duckdb.sql(
    '''
    SELECT
        Origen, COUNT(Nombre) AS usuarios
    FROM '../usuarios.csv'
    GROUP BY Origen
    ORDER BY usuarios DESC
    '''
).pl()  # convert the result to a polars DataFrame
# other output formats include: .df() for pandas, .arrow() for Arrow, and .fetchnumpy() for NumPy arrays

| Origen | usuarios |
|---|---|
| `str` | `i64` |
| WA | 765173 |
| Ecommerce | 97913 |


**Create files from DuckDB queries**

* `duckdb.sql('SELECT 42').write_parquet('out.parquet')` # Write to a Parquet file
* `duckdb.sql('SELECT 42').write_csv('out.csv')` # Write to a CSV file
* `duckdb.sql("COPY (SELECT 42) TO 'out.parquet'")` # Copy to a Parquet file

## Create functions to upload Excel data sources 

In [47]:
def mpe_usuarios(file):

    mpe_usuarios = (
    pl.read_excel(file, sheet_name='Cantidades'
        ).select(
        pl.col('*').map_alias(lambda col_name: col_name.lower().replace(' ', '_'))
        ).with_columns(pl.col('fecha').str.strptime(pl.Datetime, strict=False)
        )
    )
    return mpe_usuarios
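The lambda in the function above lowercases each header and replaces spaces with underscores; accented headers are handled separately in the later functions through explicit `rename` dictionaries. As a possible alternative, a single helper could do both. The sketch below uses only the standard library; `normalize_column_name` is a hypothetical name, and it does not reproduce the manual abbreviations (such as `tel_contacto`) that the rename dictionaries apply.

```python
import unicodedata


def normalize_column_name(name):
    """Lowercase, strip accents, and replace spaces with underscores."""
    # NFKD splits accented letters into a base letter plus a combining mark.
    decomposed = unicodedata.normalize("NFKD", name.strip().lower())
    without_accents = "".join(
        char for char in decomposed if not unicodedata.combining(char)
    )
    return without_accents.replace(" ", "_")


print(normalize_column_name("Fecha Modificación"))
print(normalize_column_name("Teléfono de Contacto"))
```

One caveat of this approach: `ñ` also decomposes, so a header such as `Año` becomes `ano`. If that is not acceptable, the explicit rename dictionaries remain the safer choice.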

In [83]:
def mpe_pedidos(file):
    
    mpe_general = (
        pl.read_excel(file, sheet_name='Pedidos General',
        read_csv_options={'infer_schema_length':0})
    .select(
        pl.col('*').map_alias(lambda col_name: col_name.lower().replace(' ', '_'))
    ).rename(
    {
        'fecha_modificación':'fecha_modificacion',
        'hora_modificación':'hora_modificacion',
        'teléfono_de_contacto':'tel_contacto',
        'alias_de_dirección':'alias_dir',
        'dirección_completa':'dir_completa',
    }
    ).with_columns(
    pl.col('id_bodega').cast(pl.Int64),
    pl.col('cantidad_de_productos').cast(pl.Int64),
    pl.col('total_de_orden').cast(pl.Float64),
    pl.col('promociones').cast(pl.Int64),
    pl.col('ruta_de_entrega').cast(pl.Float64),
    pl.col('nud').cast(pl.Float64),
    pl.col('fecha_de_pedido').str.strptime(pl.Datetime, format='%d-%m-%Y', strict=False),
    pl.col('fecha_modificacion').str.strptime(pl.Datetime, format='%d-%m-%Y', strict=False),
    pl.col('fecha_entrega').str.strptime(pl.Datetime, format='%d-%m-%Y', strict=False),
    ).drop('nombre_completo')
    )
    return mpe_general
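With `strict=False`, date strings that do not match `'%d-%m-%Y'` become nulls instead of raising an error, so bad values can slip through unnoticed. A quick count of failed parses after loading helps catch that. The sketch below shows the idea with the pandas equivalent (`errors="coerce"`), not with polars; the sample values are invented for the example.

```python
import pandas as pd

# Hypothetical raw values: two valid dates, one in another format, one empty.
raw_dates = pd.Series(["27-07-2022", "14-03-2022", "2022/07/27", ""])

# errors="coerce" turns unparseable values into NaT, much like strict=False.
parsed = pd.to_datetime(raw_dates, format="%d-%m-%Y", errors="coerce")

# Keep the original strings that failed, so they can be inspected or logged.
failed = raw_dates[parsed.isna()]
print(f"{len(failed)} of {len(raw_dates)} values could not be parsed")
print(failed.tolist())
```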

In [11]:
def mpe_detalle(file):
    
    mpe_det = (pl.read_excel(file, sheet_name='Pedidos Detalle',
        read_csv_options={'infer_schema_length':0})
    .select(
        pl.col('*').map_alias(lambda col_name: col_name.lower().replace(' ', '_'))
    ).rename(
    {
        'clave_de_promoción':'clave_de_promocion',
        'producto(s)':'productos'
    }
    ).with_columns(
    pl.col('cantidad').cast(pl.Int64),
    pl.col('precio_por_unidad').cast(pl.Float64),
    pl.col('descuento_por_unidad').cast(pl.Float64),
    pl.col('precio_acumulado').cast(pl.Float64),
    pl.col('descuento_acumulado').cast(pl.Float64),
    )
    )
    return mpe_det
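Because `infer_schema_length` is set to 0, every column arrives as text and has to be cast by hand, which is why the functions above repeat one `cast` call per column. One way to keep that manageable is to drive the casts from a single schema dictionary. The sketch below illustrates the idea in pandas rather than polars; `SCHEMA`, `cast_columns`, and the sample rows are assumptions made for the example.

```python
import pandas as pd

# Hypothetical target schema: column name -> pandas nullable dtype.
SCHEMA = {"cantidad": "Int64", "precio_por_unidad": "Float64"}


def cast_columns(frame, schema):
    """Return a copy of frame with the listed columns cast to numeric dtypes."""
    result = frame.copy()
    for column, dtype in schema.items():
        # Unparseable text becomes a missing value instead of raising.
        numeric = pd.to_numeric(result[column], errors="coerce")
        result[column] = numeric.astype(dtype)
    return result


raw = pd.DataFrame({
    "cantidad": ["2", "5", ""],
    "precio_por_unidad": ["10.5", "n/a", "3"],
    "productos": ["agua", "jugo", "refresco"],
})
typed = cast_columns(raw, SCHEMA)
print(typed.dtypes)
```

Columns that are not listed in the schema, such as `productos` here, are left untouched.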

In [84]:
# Open the MPE bodegas (warehouses) workbook by its spreadsheet key
sheet_id = '1RdEyiPZULVGwy274uPsB6NKxg6MtWCw9rHwcuDyCwyw'
workbook = gc.open_by_key(sheet_id)
# Select the worksheet by name
mpe_bodegas = workbook.worksheet('warehouses')
# Save data to table
mpe_bodegas = mpe_bodegas.get_all_values()
# Save accessed data from google sheets to dataframes
mpe_bodegas = pd.DataFrame(mpe_bodegas[1:],columns=mpe_bodegas[0])
# convert to polars dataframe and manipulate data
mpe_bodegas = pl.from_pandas(mpe_bodegas).select(
    pl.col('*').map_alias(lambda col_name: col_name.lower().replace(' ', '_'))
    ).with_columns(pl.col('clave_bodega').cast(pl.Int64)
    ).rename({'clave_bodega':'id_bodega'}
    ).drop(['clave_bodega_destino','tipo_preventa_tradicional','clave_bodega_origen',]
    )

## Execute functions to upload files

In [49]:
mpe_usuarios = mpe_usuarios('~/Downloads/mpe-usuarios.xlsx')

In [13]:
mpe_detalle = mpe_detalle('~/Downloads/mpe-pedidos.xlsx')

In [85]:
mpe_pedidos = mpe_pedidos('~/Downloads/mpe-pedidos.xlsx').join(mpe_bodegas, on='id_bodega', how='left')
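A left join keeps every order, but orders whose `id_bodega` has no match in the warehouse sheet end up with empty warehouse columns. It can be worth checking for those before uploading. The sketch below shows one way to do that with pandas `merge`; the frames and the `id_pedido` and `bodega` columns are invented for the example.

```python
import pandas as pd

# Hypothetical orders and warehouse catalog.
orders = pd.DataFrame({"id_pedido": [1, 2, 3], "id_bodega": [10, 20, 99]})
warehouses = pd.DataFrame({"id_bodega": [10, 20], "bodega": ["Norte", "Centro"]})

joined = orders.merge(
    warehouses,
    on="id_bodega",
    how="left",
    indicator=True,          # adds a _merge column describing each row's origin
    validate="many_to_one",  # fails if a warehouse id is duplicated in the catalog
)

# Orders that found no warehouse are flagged as "left_only".
unmatched = joined.loc[joined["_merge"] == "left_only", "id_bodega"].tolist()
print(unmatched)
```

The `validate` argument also guards against duplicated warehouse ids, which would otherwise silently multiply order rows.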

## Create functions to upload other Excel data sources

In [144]:
# Pedidos General
def getm_pedidos(file):
    
    getm_pedidos = (
    pl.read_excel(file, sheet_name='Pedidos General', 
    read_csv_options={'infer_schema_length':0}
    ).select(
        pl.col('*').map_alias(lambda col_name: col_name.lower().replace(' ', '_'))
    ).rename({
        'fecha_de_creación':'fecha_de_creacion',
        'hora_de_creación':'hora_de_creacion',
        'fecha_de_modificación':'fecha_de_modificacion',
        'hora_de_modificación':'hora_de_modificacion',
    }).with_columns(
        pl.col('id_bodega').cast(pl.Int64),
        pl.col('nud').cast(pl.Int64),
        pl.col('rutappp').cast(pl.Int64),
        pl.col('ruta_de_desarrollo').cast(pl.Int64),
        pl.col('rutavpp').cast(pl.Int64),
        pl.col('monto_total').cast(pl.Float64),
        pl.col('total_de_productos').cast(pl.Int64),
        pl.col('promociones').cast(pl.Int64),
        pl.col('fecha_de_creacion').str.strptime(pl.Datetime, format='%d-%m-%Y', strict=False),
        pl.col('fecha_de_modificacion').str.strptime(pl.Datetime, format='%d-%m-%Y', strict=False),
        pl.col('fecha_de_despacho').str.strptime(pl.Datetime, format='%d-%m-%Y', strict=False),
        pl.col('fecha_de_entrega').str.strptime(pl.Datetime, format='%d-%m-%Y', strict=False),
        pl.col('fecha_cancelado').str.strptime(pl.Datetime, format='%d-%m-%Y', strict=False),
    ).drop('nombre_del_cliente_registrado')
    )
    return getm_pedidos

In [168]:
# Pedidos Detalle
def getm_detalle(file):
    
    getm_detalle = (
    pl.read_excel(file, sheet_name='Pedidos Detalle', 
    read_csv_options={'infer_schema_length':0}
    ).select(
        pl.col('*').map_alias(lambda col_name: col_name.lower().replace(' ', '_'))
    ).rename({
        'clave_de_promoción':'clave_de_promocion',
        'tipo_de_promoción':'tipo_de_promocion',
        'descripción_de_promoción':'descripcion_de_promocion',
        'producto(s)':'productos',
    }).with_columns(
        pl.col('id_bodega').cast(pl.Int64),
        pl.col('nud').cast(pl.Int64),
        pl.col('cantidad').cast(pl.Float64),
        pl.col('precio_por_unidad').cast(pl.Float64),
        pl.col('descuento_por_unidad').cast(pl.Float64),
        pl.col('precio_acumulado').cast(pl.Float64),
        pl.col('descuento_acumulado').cast(pl.Float64),
    )
    )
    return getm_detalle

## Execute functions

In [145]:
getm_pedidos = getm_pedidos('~/Downloads/getm-pedidos.xlsx')

In [169]:
getm_detalle = getm_detalle('~/Downloads/getm-pedidos.xlsx')

## Connect to Google Sheets to ingest more data sources

In [172]:
# Open the GETM push notifications workbook by its spreadsheet key
sheet_id = '1STkVkyDdCjEtTAgMOWbptf8oRqUBMPRV1ufi-fuY8xk'
workbook = gc.open_by_key(sheet_id)
# Select the worksheet by name
getm_push = workbook.worksheet('Push Notifications')
# Save data to table
getm_values_push = getm_push.get_all_values()
# Save accessed data from google sheets to dataframes
getm_push = pd.DataFrame(getm_values_push[1:],columns=getm_values_push[0])
# drop irrelevant columns
getm_push = getm_push.drop(['Año','Mes','Proyecto','Publicación','Caracteres','Espacios','Tipo','ID'],
    axis=1)

In [173]:
# GETM push notifications
# convert to lowercase
getm_push.columns = getm_push.columns.str.lower()
# convert to datetime
getm_push['fecha'] = getm_push['fecha'].apply(pd.to_datetime, dayfirst=True)
# remove the thousands separators (",") and convert to numeric
getm_push['clientes'] = getm_push['clientes'].str.replace(',', '', regex=True).apply(pd.to_numeric)
# rename columns
getm_push = getm_push.rename(columns={'categorias':'categoria','área':'area'})
# drop empty rows
getm_push.dropna(subset=['fecha'], inplace=True)
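The cleaning steps above can be tried on a few invented rows to see what each one does, in particular how empty sheet cells end up being dropped. The sketch below is a vectorized variant of the same idea; the sample values and the `%d/%m/%Y` date format are assumptions, since the real sheet contents are not shown here.

```python
import pandas as pd

# Hypothetical rows as they would arrive from a sheet: everything is text.
push = pd.DataFrame({
    "Fecha": ["25/04/2023", "14/03/2022", ""],
    "Clientes": ["39,766", "85", ""],
})

# Lowercase the headers.
push.columns = push.columns.str.lower()

# Parse day-first dates; empty or malformed cells become NaT.
push["fecha"] = pd.to_datetime(push["fecha"], format="%d/%m/%Y", errors="coerce")

# Strip thousands separators, then convert; empty cells become NaN.
push["clientes"] = pd.to_numeric(
    push["clientes"].str.replace(",", "", regex=False), errors="coerce"
)

# Rows without a valid date are treated as empty and dropped.
push = push.dropna(subset=["fecha"])
print(push)
```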

## Send extracted data sources to BigQuery Data Warehouse

### Create BigQuery schema (first upload)

In [72]:
# create the dataset through the BigQuery API (run once, before the first upload)
#client.create_dataset('back_office')

Dataset(DatasetReference('gepp-538', 'back_office'))
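The `create_dataset` call is commented out above because it fails once the dataset exists. The BigQuery client accepts an `exists_ok` flag for this case, so the cell could stay active across runs. The sketch below wraps that call in a hypothetical helper, `ensure_dataset`; the `RecordingClient` class is only a stand-in so the example can be dry-run without credentials, and in the notebook the real `client` would be passed instead.

```python
def ensure_dataset(client, dataset_id):
    """Create the dataset if it is missing and return whatever the client returns."""
    # With exists_ok=True the client does not raise when the dataset already exists.
    return client.create_dataset(dataset_id, exists_ok=True)


class RecordingClient:
    """Stand-in for bigquery.Client that records calls instead of sending them."""

    def __init__(self):
        self.calls = []

    def create_dataset(self, dataset, exists_ok=False):
        self.calls.append((dataset, exists_ok))
        return dataset


dry_run = RecordingClient()
ensure_dataset(dry_run, "gepp-538.back_office")
print(dry_run.calls)  # [('gepp-538.back_office', True)]
```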

### Store Excel data sources in BigQuery

In [50]:
mpe_usuarios = mpe_usuarios.to_pandas()
mpe_usuarios.to_gbq('gepp-538.back_office.mpe_usuarios',
                    project_id='gepp-538',
                    if_exists='append',
                    credentials=bq_credentials)

100%|███████████████████████████████████████████| 1/1 [00:00<00:00, 9177.91it/s]


In [87]:
mpe_pedidos = mpe_pedidos.to_pandas()
mpe_pedidos.to_gbq('gepp-538.back_office.mpe_pedidos',
                    project_id='gepp-538',
                    if_exists='append',
                    credentials=bq_credentials)

100%|███████████████████████████████████████████| 1/1 [00:00<00:00, 2457.12it/s]


In [52]:
mpe_detalle = mpe_detalle.to_pandas()
mpe_detalle.to_gbq('gepp-538.back_office.mpe_detalle',
                    project_id='gepp-538',
                    if_exists='append',
                    credentials=bq_credentials)

100%|███████████████████████████████████████████| 1/1 [00:00<00:00, 7108.99it/s]
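The upload cells repeat the same call with a different table name each time, so they could be folded into one loop. The sketch below is one possible shape for that, under a few assumptions: `upload_tables` is a hypothetical helper, the frames passed in are already pandas DataFrames (as after `.to_pandas()`), and the default uploader is `pandas_gbq.to_gbq`, which recent pandas releases recommend over the deprecated `DataFrame.to_gbq`. The `uploader` argument exists so the loop can be exercised without touching BigQuery.

```python
def upload_tables(tables, dataset, project_id, credentials,
                  if_exists="append", uploader=None):
    """Upload each frame in `tables` to `<dataset>.<name>` and return the destinations."""
    if uploader is None:
        # Imported lazily so the helper can be defined without pandas-gbq installed.
        import pandas_gbq
        uploader = pandas_gbq.to_gbq

    destinations = []
    for name, frame in tables.items():
        destination = f"{dataset}.{name}"
        uploader(
            frame,
            destination,
            project_id=project_id,
            if_exists=if_exists,
            credentials=credentials,
        )
        destinations.append(destination)
    return destinations
```

In the notebook this could be called as `upload_tables({'mpe_usuarios': mpe_usuarios, 'mpe_pedidos': mpe_pedidos}, 'back_office', 'gepp-538', bq_credentials)`. Note that `if_exists='append'` adds the same rows again whenever a cell is re-run with the same file.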


### Store more data sources

In [147]:
getm_pedidos = getm_pedidos.to_pandas()
getm_pedidos.to_gbq('gepp-538.back_office.getm_pedidos',
                    project_id='gepp-538',
                    if_exists='append',
                    credentials=bq_credentials)

100%|███████████████████████████████████████████| 1/1 [00:00<00:00, 2716.52it/s]


In [171]:
getm_detalle = getm_detalle.to_pandas()
getm_detalle.to_gbq('gepp-538.back_office.getm_detalle',
                    project_id='gepp-538',
                    if_exists='append',
                    credentials=bq_credentials)

100%|███████████████████████████████████████████| 1/1 [00:00<00:00, 8630.26it/s]


### Store Google Sheets data sources

In [176]:
getm_push.to_gbq('gepp-538.back_office.getm_notifications',
                    project_id='gepp-538',
                    if_exists='replace',
                    credentials=bq_credentials)

100%|███████████████████████████████████████████| 1/1 [00:00<00:00, 7294.44it/s]


### Query datasets stored in BigQuery

#### Create query and save data with pandas

In [15]:
# create query to bigquery
query = '''
    SELECT * FROM `gepp-538.transformation.getm_push_notifications`
'''
# run the query and load the result into a pandas DataFrame
getm_notifications = pd.read_gbq(query, credentials=bq_credentials)
getm_notifications.sample(10)

| | fecha | categoria | creatividad | clientes | region | area |
|---|---|---|---|---|---|---|
| 797 | 2022-07-27 00:00:00+00:00 | Ejecución | No olvides tus fotos | | | Digital |
| 932 | 2022-03-14 00:00:00+00:00 | Contenido Premiados | ¿Ya realizaste tu rutina? 🤸 | 85.0 | Distribuidores | Palancas |
| 583 | 2022-06-22 00:00:00+00:00 | Califícanos | ¡Compártenos tus ideas!✅ | 22138.0 | Norte | Digital |
| 166 | 2023-04-25 00:00:00+00:00 | Juegos | Tómate un respiro 😄 | 39766.0 | Metro | |
| 353 | 2022-11-18 00:00:00+00:00 | Ejecución | No olvides compartirnos tus fotos 📷 | 1782.0 | Pacífico | |
| 14 | 2022-07-09 00:00:00+00:00 | Juegos | ¿Ya revisaste tu app? | 29202.0 | Centro | Digital |
| 1103 | 2021-11-24 00:00:00+00:00 | Palancas | ¿Ya hiciste el Canje? | 6544.0 | Metro | Palancas |
| 341 | 2022-11-14 00:00:00+00:00 | Ejecución | ¿Ya mandaste tu foto? 🤔 | 1803.0 | Pacífico | |
| 338 | 2022-11-14 00:00:00+00:00 | Ejecución | ¿Ya mandaste tu foto? 🤔 | 2991.0 | Metro | |
| 322 | 2022-10-26 00:00:00+00:00 | Ejecución | ¿Tu enfriador está impecable? ✨ | 4970.0 | Occidente | |


#### Create query and save data with polars

In [12]:
# create query to bigquery
QUERY = ('''
    SELECT * FROM `gepp-538.transformation.getm_push_notifications`
'''
        )
# API request
query_job = client.query(QUERY)
# Waits for query to finish
rows = query_job.result()
# convert query to polars dataframe
pl_df = pl.from_arrow(rows.to_arrow())
pl_df.sample(10)

| fecha | categoria | creatividad | clientes | region | area |
|---|---|---|---|---|---|
| `datetime[μs, UTC]` | `str` | `str` | `f64` | `str` | `str` |
| 2022-10-18 00:00:00 UTC | Juegos | Mejora tu punt… | 41310.0 | Occidente | `""` |
| 2022-10-11 00:00:00 UTC | Contenido Prem… | Conoce más de … | 3297.0 | Pacífico | `""` |
| 2022-10-03 00:00:00 UTC | Ejecución | ¡Que no se te … | 4939.0 | Norte | `""` |
| 2022-08-25 00:00:00 UTC | Ejecución | ¡Es tiempo de … | 4600.0 | Occidente | `""` |
| 2021-11-11 00:00:00 UTC | Palancas | ¡Felicidades 🎉… | 386.0 | Centro | Palancas |
| 2023-01-30 00:00:00 UTC | Juegos | ¿Un tiempo lib… | 14527.0 | Sur | `""` |
| 2022-08-16 00:00:00 UTC | Califícanos | ¿Tienes algún … | 30968.0 | Metro | `""` |
| 2021-11-15 00:00:00 UTC | Palancas | ¿Ya sabes cómo… | 101.0 | Distribuidores… | Palancas |
| 2023-04-26 00:00:00 UTC | Ejecución | ¿Qué tal unas … | 2838.0 | Metro | `""` |
| 2021-11-12 00:00:00 UTC | Palancas | ¿Estás listo p… | 8333.0 | Occidente | Palancas |


## Contact

<!-- Avatar -->
<img src="../Pictures/profile2.png" alt="me" width="75" height="80">
<!-- Text with color, font, fontsize and specific size -->
<p style="color:#323232; font-family: Helvetica; font-size: 20px;">Jesus L. Monroy<br>Economist | Data Scientist</p>
<!-- Insert url links in logos -->
<!-- Telegram -->
<a href="https://t.me/j3suslm" target="_blank" rel="noreferrer"> <img src="https://upload.wikimedia.org/wikipedia/commons/thumb/e/ef/Telegram_X_2019_Logo.svg/2048px-Telegram_X_2019_Logo.png?size=16&color=3b3b3b" alt="telegram" width="30" height="22" style="padding-left:8px"/></a>
<!-- Twitter -->
<a href="https://www.twitter.com/sqlalchemist" target="_blank" rel="noreferrer"> <img src="https://toppng.com/public/uploads/preview/twitter-x-new-logo-round-icon-png-11692480241tdbz6jparr.webp?size=16&color=3b3b3b" alt="twitter" width="30" height="22" style="padding-left:8px"/></a>
<!-- Github -->
<a href="https://github.com/SqlAlchemist/My-portfolio" target="_blank" rel="noreferrer"> <img src="https://icongr.am/devicon/github-original.svg?size=16&color=3b3b3b" alt="github" width="30" height="30" style="padding-left:8px"/></a>
<!-- Linkedin -->
<a href="https://www.linkedin.com/in/j3sus-lmonroy" target="_blank" rel="noreferrer"> <img src="https://icongr.am/simple/linkedin.svg?size=16&color=3b3b3b" alt="linkedin" width="30" height="30" style="padding-left:8px"/></a>
<!-- Medium -->
<a href="https://medium.com/@jesus_lmonroy" target="_blank" rel="noreferrer"> <img src="https://cdn1.iconfinder.com/data/icons/social-media-and-logos-12/32/Logo_medium-512.png?size=55&color=3b3b3b" alt="medium" width="30" height="33" style="padding-left:8px"/></a>