In [1]:
from IPython.display import HTML
etl = '<center><img src="https://www.dataprix.com/files/uploads/32image/Respinosamilla_bi/etl_chart.jpg" width="400" ></center>'
etl2 = '<center><img src="https://upload.wikimedia.org/wikipedia/commons/f/fa/Conventional_ETL_Diagram.jpg" width="1000" ></center>'

import sqlalchemy #orm
import pandas as pd #manejo de tablas
from sqlalchemy.orm import sessionmaker
import requests # solicitudes http
import json 
from datetime import datetime
import datetime
import sqlite3 #bd

from dotenv import load_dotenv, dotenv_values #pip install python-dotenv

- Nombre: Nicolás Otárola Teillier
- Profesor: Juan Pablo Salazar
- Ramo: Arquitectura de Sistemas
- Fecha: 04/10/2021
- Git: https://github.com/NicolasOtarolaTeillier/arquitectura-sistemas-2021-2
- Api Spotify: https://developer.spotify.com/console/get-recently-played/
- Cliente sql: https://dbeaver.io/
- Referencias:
 - https://www.youtube.com/watch?v=eg8t2-E69ew
 - https://aprenderbigdata.com/herramientas-etl/
 - https://www.obsbusiness.school/blog/herramientas-etl-que-son-y-cuales-recomendamos
 - https://www.dataprix.com/es/blog-it/respinosamilla/herramientas-etl-son-valen-productos-mas-conocidos-etls-open-source
 - https://github.com/karolina-sowinska/free-data-engineering-course-for-beginners/blob/master/main.py
 - https://www.youtube.com/channel/UCAxnMry1lETl47xQWABvH7g


### Indice:

1. ¿Para que sirven?
2. ¿Cómo funcionan?
3. ¿Cuáles son las más usadas?
4. Ejemplo práctico

 # Herramientas ETL y el patrón de tuberías

### Conceptos clave:

- **Pipeline (Tubería)**: 
 - para los igenieros de datos, **es una ruta de un extremo a otro**, donde cada ruta tiene una o varias fuentes y sistemas de destino para acceder y manipular los datos disponibles. Dentro de cada tubería, los datos pasan por numerosas etapas de transformacion, validacion, normalizacion.
 
- **Extraer (Extract)**. Consiste en la extracción de datos de fuentes heterogéneas.

- **Transformar (Transform**). De los datos en bruto extraídos, pasamos a transformarlos en información y conocimiento útiles para la empresa y para sus objetivos. (limpiar, normalizar, validacion, etc)

- **Cargar (Load)**. Con los datos “purificados”, es decir, convertidos en información útil, pasamos a almacenar esta en un mismo lugar. Es lo que se conoce como almacén de datos o data mart. 

## 1 ¿Para que sirven?

- permite a las organizaciones mover datos desde múltiples fuentes, reformatearlos y limpiarlos, y cargarlos en otra base de datos, data mart, o data warehouse para analizar, o en otro sistema operacional para apoyar un proceso de negocio.

In [2]:
HTML(etl)

## 2 ¿Cómo funcionan?

In [3]:
HTML(etl2)

##### Algunas Funcionalidades generales de los ETL
- Control de la extracción de los datos y su automatización, disminuyendo el tiempo empleado en el descubrimiento de procesos no documentados, minimizando el margen de error y permitiendo mayor flexibilidad.
- Acceso a diferentes tecnologías, haciendo un uso efectivo del hardware, software, datos y recursos humanos existentes.
- Proporcionar la gestión integrada del Data Warehouse y los Data Marts existentes, integrando la extracción, transformación y carga para la construcción del Data Warehouse corporativo y de los Data Marts.
- Uso de la arquitectura de metadatos, facilitando la definición de los objetos de negocio y las reglas de consolidación.
- Acceso a una gran variedad de fuentes de datos diferentes.
- Manejo de excepciones.

## 3 ¿Cuáles son las más usadas?

https://aprenderbigdata.com/herramientas-etl/

## 4 Ejemplo

##### VERIFICACIONES

In [4]:
def check_if_valid_data(df: pd.DataFrame)-> bool:
    # Check if datafreame is empty 
    if df.empty:
        print("no song downloaded, finishing execution")
        return False
        
    # Primary Key Check
    if pd.Series(df['played_at']).is_unique:
        pass
    else:
        raise Exception("Primary Key Check is violated")
    
    # Check for nulls
    if df.isnull().values.any():
        raise Exception("Null valued found")

    # Check that all timestamps are of yesterday's date
    #yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
    #yesterday = yesterday.replace(hour=0, minute=0, second=0,microsecond=0)
    
    #timestamps = df["timestamp"].tolist()
    #for timestamp in tçimestamps:
    #    if datetime.datetime.strptime(timestamp,"%Y-%m-%d") != yesterday:
    #        raise Exception("At least one of the returned songs does not come from within the last 24 hours")
    return True

##### ETL

In [5]:
def ETL(TOKEN,DATABASE_LOCATION, LIMITE, LISTA_REPRODUCCION):
       
                                             # headers for request                             
    headers ={
        "Accept" : "application/json",
        "Content-Type" : "application/json",
        "Authorization" : "Bearer {token}".format(token=TOKEN)
    }
                                            # ETL process : Extract
        
    r = requests.get("{endpoint}?limit={limit}".format(endpoint = LISTA_REPRODUCCION,limit=LIMITE),headers=headers)
    data = r.json()
    print(data)
    
    try :
        data["error"] 
        print("Error : ",data["error"])
        return
    except:
        print("Response recived!")
    
                                            # ETL process : Transform
        
    song_names, artist_names, played_at_list= [],[],[]
    for song in data["items"]:

        song_names.append(song["track"]["name"])
        artist_names.append(song["track"]["album"]["artists"][0]["name"])
        played_at_list.append(song["played_at"])
        
    song_dict = {"song_name": song_names,"artist_name": artist_names,"played_at" : played_at_list}
    
                                        
        
    clean_song_df = pd.DataFrame(song_dict, columns = ["song_name","artist_name","played_at"])
    
                                                 # Validate
    if check_if_valid_data(clean_song_df):
        print("Data valid, proceed to Load stage")
        
                                            # ETL process : Load
    
    engine = sqlalchemy.create_engine(DATABASE_LOCATION)
    conn = sqlite3.connect('my_played_tracks.sqlite')
    cursosr = conn.cursor()
    
    sql_query = """
    CREATE TABLE IF NOT EXISTS my_played_tracks(
        song_name VARCHAR(200),
        artist_name VARCHAR(200),
        played_at VARCHAR(200),
        CONSTRAINT primary_key_constraint PRIMARY KEY (played_at)
    )
    """
    cursosr.execute(sql_query)
    print("Opened database succesfully")
    try :
        clean_song_df.to_sql("my_played_tracks",engine, index=False, if_exists='append')
    except:
        print("Data already exist in the database")
    
    
    display(clean_song_df)
    
    
    

 * Ejemplo de configuración de variables de entorno : 

~~~
TOKEN=BQDZjCRsJWbhf5C8npbQtYnOe
DB=sqlite:///my_played_tracks.sqlite
~~~

In [None]:
%load_ext dotenv
%dotenv
env = dotenv_values(".env") # importar variables de entorno
DATABASE_LOCATION = env["DB"]
TOKEN = env["TOKEN"]
LIMITE = 20
ENDPOINT = "https://api.spotify.com/v1/me/player/recently-played"

ETL(TOKEN, DATABASE_LOCATION, LIMITE, ENDPOINT) # ejecutamos el ETL