## CoderHouse - Data Engineering - Kevin Schiebelbein - Deliverable 2

> Before running the code, install the dependencies with `pip3 install -r requeriments.txt` from the notebook's root folder.

Import the required libraries

In [2]:
from utils import getFakeData
from dotenv import dotenv_values
import redshift_connector
import json
import pandas as pd
from datetime import date, timedelta
from sqlalchemy import create_engine

pd.set_option('display.max_columns', None)

Define the configuration for connecting to the data warehouse

In [None]:
config = dotenv_values(".env")
driver = config["DRIVER"]
host = config["HOST"]
db = config["DB"]
user = config["USER"]
password = config["PASSDW"]
port = config["PORT"]
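
A missing key in `.env` surfaces in the cell above as a bare `KeyError` on the first lookup that fails. As a minimal sketch, assuming the same six keys, a small helper can report every missing or empty key at once. The name `require_keys` and the sample values are hypothetical and not part of the original notebook.

```python
# Hypothetical helper, not part of the original notebook: validate a config
# mapping (such as the one returned by dotenv_values) before using it.
REQUIRED_KEYS = ("DRIVER", "HOST", "DB", "USER", "PASSDW", "PORT")


def require_keys(config, keys=REQUIRED_KEYS):
    """Return the config values for `keys`, or raise listing every missing key."""
    missing = [key for key in keys if not config.get(key)]
    if missing:
        raise KeyError(f"Missing or empty keys in .env: {', '.join(missing)}")
    return {key: config[key] for key in keys}


# In-memory mapping standing in for the parsed .env file (made-up values).
sample = {"DRIVER": "postgresql", "HOST": "example-host", "DB": "dev",
          "USER": "me", "PASSDW": "secret", "PORT": "5439"}
settings = require_keys(sample)
print(settings["PORT"])
```

In the notebook this could be called as `require_keys(dotenv_values(".env"))` before the individual variables are assigned.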

Extract the data from the public API and organize it for insertion

In [3]:
try:
  api = "https://fakestoreapi.com/products"
  result = getFakeData(api)
  products = json.loads(result.text)
  print(products)
  df = pd.DataFrame(products)
except Exception as e:
  print(e)


[{'id': 1, 'title': 'Fjallraven - Foldsack No. 1 Backpack, Fits 15 Laptops', 'price': 109.95, 'description': 'Your perfect pack for everyday use and walks in the forest. Stash your laptop (up to 15 inches) in the padded sleeve, your everyday', 'category': "men's clothing", 'image': 'https://fakestoreapi.com/img/81fPKd-2AYL._AC_SL1500_.jpg', 'rating': {'rate': 3.9, 'count': 120}}, {'id': 2, 'title': 'Mens Casual Premium Slim Fit T-Shirts ', 'price': 22.3, 'description': 'Slim-fitting style, contrast raglan long sleeve, three-button henley placket, light weight & soft fabric for breathable and comfortable wearing. And Solid stitched shirts with round neck made for durability and a great fit for casual fashion wear and diehard baseball fans. The Henley style round neckline includes a three-button placket.', 'category': "men's clothing", 'image': 'https://fakestoreapi.com/img/71-3HjGNDUL._AC_SY879._SX._UX._SY._UY_.jpg', 'rating': {'rate': 4.1, 'count': 259}}, {'id': 3, 'title': 'Mens Cotto
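
The cell above prints any exception and moves on, so a malformed response would only show up later, when `df` is used. A minimal sketch of an explicit check follows, assuming the response body is a JSON list of products with the fields visible in the output above. The function `parse_products` and the sample payload are hypothetical.

```python
import json

# Field names taken from the API output shown above.
EXPECTED_FIELDS = {"id", "title", "price", "description", "category", "image", "rating"}


def parse_products(text):
    """Parse the response body and check that every product has the expected fields."""
    products = json.loads(text)
    if not isinstance(products, list):
        raise ValueError("Expected a JSON list of products")
    for product in products:
        missing = EXPECTED_FIELDS - set(product)
        if missing:
            raise ValueError(f"Product {product.get('id')} is missing {sorted(missing)}")
    return products


# Hand-written payload standing in for result.text (illustrative values only).
sample_text = json.dumps([{
    "id": 1, "title": "Sample", "price": 9.99, "description": "demo",
    "category": "demo", "image": "https://example.com/a.jpg",
    "rating": {"rate": 4.0, "count": 10},
}])
print(len(parse_products(sample_text)))
```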

In [13]:
low_price = df[(df["price"] < 50)]
print(len(low_price))

9


In [20]:
low_price = df[(df["price"] < 50)]
low_price.count()

id             9
title          9
price          9
description    9
category       9
image          9
rating         9
dtype: int64

ETL - Split `rating` into separate `rate` and `count` columns

In [None]:
df = pd.concat([df.drop(['rating'], axis=1), df['rating'].apply(pd.Series)], axis=1)
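
The same split can be sketched on a tiny frame to make the before and after shapes visible. This version builds the new columns from the list of dicts instead of `apply(pd.Series)`. It is an alternative under the assumption that every `rating` value is a dict with `rate` and `count`, and the sample rows are illustrative only.

```python
import pandas as pd

# Two hand-written products shaped like the API response (illustrative values only).
products = [
    {"id": 1, "price": 109.95, "rating": {"rate": 3.9, "count": 120}},
    {"id": 2, "price": 22.3, "rating": {"rate": 4.1, "count": 259}},
]
df_demo = pd.DataFrame(products)

# Expand the dicts in `rating` into their own columns, keeping the row index aligned.
rating_cols = pd.DataFrame(df_demo["rating"].tolist(), index=df_demo.index)
df_demo = pd.concat([df_demo.drop(columns=["rating"]), rating_cols], axis=1)
print(df_demo.columns.tolist())
```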

ETL - Check whether each product is on sale and add the result to the DataFrame

In [None]:
df['status'] = ['On Sale' if x < 50 else '' for x in df['price']]
df.head()
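
For comparison, the same flag can be derived from a boolean mask instead of a list comprehension. This is a sketch on made-up prices, using the threshold of 50 from the cell above.

```python
import pandas as pd

# Illustrative prices only; 50 is the threshold used in the cell above.
df_demo = pd.DataFrame({"price": [109.95, 22.3, 50.0]})

# Map the boolean mask to the same labels as the list comprehension.
df_demo["status"] = df_demo["price"].lt(50).map({True: "On Sale", False: ""})
print(df_demo["status"].tolist())
```

A price of exactly 50 is not flagged, matching the strict `<` comparison in the notebook.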

ETL - Add the product date

In [None]:
df["fecha"] = date.today()
df.head()

Insert test - Add records that share an ID but have different dates, so that new rows are created in the DW and the history is preserved

In [None]:
manana = date.today() + timedelta(days=1) 
df.loc[len(df.index)] = [20, "Producto nuevo 1 - Lente camara", 500, "Alguna description 1", "IT", "https://johnstillk8.scusd.edu/sites/main/files/main-images/camera_lense_0.jpeg", 3.5, 9, "On Sale", manana]
df.loc[len(df.index)] = [21, "Producto nuevo 2 - Notebook", 3000, "Alguna description 2", "IT", "https://www.oberlo.com/media/1603969791-image-1.jpg?fit=max&fm=webp&w=1824", 6, 25, "On Sale", manana]
pasado = date.today() + timedelta(days=2) 
df.loc[len(df.index)] = [21, "Producto nuevo editado 1 - Notebook", 3500, "Alguna description 2", "IT", "https://www.oberlo.com/media/1603969791-image-1.jpg?fit=max&fm=webp&w=1824", 6, 25, "On Sale", pasado]
df.tail()
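
The positional lists above only work while the DataFrame keeps exactly that column order. A minimal sketch of a name-based alternative follows, using a reduced set of columns and illustrative values. It is not the notebook's own method.

```python
from datetime import date, timedelta

import pandas as pd

# Minimal frame with a subset of the notebook's columns (illustrative values only).
df_demo = pd.DataFrame([{"id": 1, "title": "Sample", "price": 9.99, "fecha": date.today()}])

# Rows keyed by column name, so the column order of df_demo does not matter.
tomorrow = date.today() + timedelta(days=1)
new_rows = pd.DataFrame([
    {"title": "Producto nuevo 2 - Notebook", "id": 21, "price": 3000, "fecha": tomorrow},
])
df_demo = pd.concat([df_demo, new_rows], ignore_index=True)
print(df_demo[["id", "fecha"]])
```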

ETL - Number of products per category

In [None]:
categories = df.groupby(["category"]).count()
categories
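
`count()` returns one non-null count per column, so the same number is repeated across the row for each category. If a single number per category is enough, `size()` is a compact alternative. The sketch below uses made-up rows.

```python
import pandas as pd

# Illustrative rows only.
df_demo = pd.DataFrame({"category": ["IT", "IT", "jewelery"], "title": ["a", "b", "c"]})

# One row count per category, returned as a named Series.
per_category = df_demo.groupby("category").size().rename("products")
print(per_category)
```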

ETL - Top 10 products by rating

In [None]:
top_10_products = df.sort_values("rate", ascending=False).head(10)
top_10_products
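
Sorting by `rate` alone leaves the order of tied products unspecified. One option, sketched here on made-up data, is to break ties by the number of ratings in `count`. Whether that is the right tie-breaker is an assumption.

```python
import pandas as pd

# Illustrative rows only: products 1 and 2 share the same rate.
df_demo = pd.DataFrame({
    "id": [1, 2, 3],
    "rate": [4.5, 4.5, 3.0],
    "count": [10, 200, 50],
})

# Highest rate first; among equal rates, the product with more ratings wins.
top_2 = df_demo.sort_values(["rate", "count"], ascending=False).head(2)
print(top_2["id"].tolist())
```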

ETL - Drop the description before saving to Redshift

In [None]:
df = df.drop(['description'], axis=1)
df.head()

Insert the data into the destination

In [None]:
# Create a SQLAlchemy engine for pandas and open a Redshift connection with the credentials from .env
connection = create_engine(f'postgresql://{user}:{password}@{host}:{port}/{db}')
with redshift_connector.connect(host=host,database=db,port=int(port),user=user,password=password) as conn:
  conn.autocommit = True
  with conn.cursor() as cursor:

    # Create a staging table that holds the new records until the incremental insert into the final table
    tabla_staging = f"""
      CREATE TABLE IF NOT EXISTS products_staging (
      id INTEGER,
      title VARCHAR(128),
      price FLOAT8,
      category VARCHAR(256),
      image TEXT,
      rate FLOAT8,
      count FLOAT8,
      status VARCHAR(32),
      fecha DATE
      ) DISTKEY(id) SORTKEY(rate);
    """
    truncate_table = "TRUNCATE TABLE products_staging;"
    try:
      cursor.execute(tabla_staging)
      cursor.execute(truncate_table)
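      # Append into the table created and truncated above; 'replace' would drop it and lose the column types and keys
      df.to_sql('products_staging', connection, index=False, if_exists='append')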
      try:
        tabla = f"""
          CREATE TABLE IF NOT EXISTS products (
          id INTEGER,
          title VARCHAR(128),
          price FLOAT8,
          category VARCHAR(256),
          image TEXT,
          rate FLOAT8,
          count FLOAT8,
          status VARCHAR(32),
          fecha DATE
          ) DISTKEY(id) SORTKEY(rate);
        """
        cursor.execute(tabla)

        # Delete the rows in the final table that are being reloaded from staging, then insert the staging rows
        cursor.execute("begin transaction;")
        cursor.execute(f"delete from products using products_staging where products.id = products_staging.id and products.fecha >= '{date.today()}'")
        cursor.execute("insert into products select * from products_staging")
        cursor.execute("end transaction;")
        cursor.close()
      except Exception as e:
        cursor.close()
        print(f"Error saving the data: {e}")
    except Exception as e:
      print(f"Error saving the data: {e}")
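
The delete-then-insert step above can be tried without a cluster. The following is a minimal sketch, assuming an in-memory SQLite database as a stand-in for Redshift, three columns instead of nine, and fixed dates so the result is reproducible. SQLite has no `DELETE ... USING`, so the join is rewritten as a subquery. The table contents are made up.

```python
import sqlite3

# In-memory SQLite database standing in for Redshift (an assumption for the demo).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE products (id INTEGER, title TEXT, fecha TEXT)")
conn.execute("CREATE TABLE products_staging (id INTEGER, title TEXT, fecha TEXT)")

# Final table: one historical row and one row dated "today" that will be reloaded.
conn.executemany("INSERT INTO products VALUES (?, ?, ?)", [
    (21, "Notebook (old)", "2024-01-01"),
    (21, "Notebook (stale)", "2024-01-02"),
])
# Staging table: the fresh load for "today".
conn.execute("INSERT INTO products_staging VALUES (21, 'Notebook (fresh)', '2024-01-02')")

today = "2024-01-02"  # fixed date so the example is reproducible
with conn:  # commits on success, rolls back on error
    # Remove rows from today onward whose id is present in staging.
    conn.execute(
        "DELETE FROM products WHERE fecha >= ? "
        "AND id IN (SELECT id FROM products_staging)",
        (today,),
    )
    # Load the staging rows into the final table.
    conn.execute("INSERT INTO products SELECT * FROM products_staging")

rows = conn.execute("SELECT title, fecha FROM products ORDER BY fecha").fetchall()
print(rows)
```

The row dated before `today` survives, which is how the history is preserved, while the row from `today` is replaced by the staging version.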