<a href="https://colab.research.google.com/github/Carlos090989/Data-Engineering/blob/main/Data_Extractor_from_API.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install requests
!pip install pandas
!pip install parquet
!pip install  -q fastparquet
!pip install sqlalchemy
!pip install psycopg2-binary

Collecting parquet
  Downloading parquet-1.3.1-py3-none-any.whl (24 kB)
Collecting thriftpy2 (from parquet)
  Downloading thriftpy2-0.5.1.tar.gz (781 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m781.6/781.6 kB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Collecting ply<4.0,>=3.4 (from thriftpy2->parquet)
  Downloading ply-3.11-py2.py3-none-any.whl (49 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m49.6/49.6 kB[0m [31m5.6 MB/s[0m eta [36m0:00:00[0m
Building wheels for collected packages: thriftpy2
  Building wheel for thriftpy2 (pyproject.toml) ... [?25l[?25hdone
  Created wheel for thriftpy2: filename=thriftpy2-0.5.1-cp310-cp310-linux_x86_64.whl size=1749593 sha256=9cf89c18e0f7c1596d9e3067864a37c0494ac5b99af38dbe659211b3030013cd
  Stored in directory: /root/.cache

In [None]:
import requests
import pandas as pd
import parquet
import fastparquet
import os
from pprint import pprint
from datetime import datetime, timedelta
import sqlalchemy as sa
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from configparser import ConfigParser

In [None]:
# Static extraction: fixed-term deposit ("plazo fijo") interest rates of several banks.
# Neither value carries a slash at the join point, so f"{base_url}/{endpoint}"
# produces exactly one separator between them.
base_url = 'https://api.argentinadatos.com/v1'
endpoint = 'finanzas/tasas/plazoFijo'
data_field = 'results'


def get_data(base_url, endpoint, data_field, params=None, headers=None, timeout=30):
    """Fetch a JSON payload from ``base_url`` + ``endpoint`` with an HTTP GET.

    Args:
        base_url: API root; a trailing slash is tolerated.
        endpoint: Resource path; a leading slash is tolerated.
        data_field: Key that holds the records when the API wraps them in a
            JSON object. A payload that is not an object containing this key
            is returned untouched.
        params: Optional query-string parameters forwarded to ``requests.get``.
        headers: Optional HTTP headers forwarded to ``requests.get``.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The decoded JSON data, or ``None`` when the request fails or the
        response body is not valid JSON (a message is printed in both cases).
    """
    # Join with exactly one slash, however the two parts were written.
    endpoint_url = f"{base_url.rstrip('/')}/{endpoint.lstrip('/')}"

    try:
        response = requests.get(
            endpoint_url, params=params, headers=headers, timeout=timeout
        )
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f"La petición ha fallado. Código de error: {e}")
        return None

    try:
        data = response.json()
    except ValueError:
        # JSON decoding errors are ValueError subclasses, not KeyError.
        print("El formato de respuesta no es el esperado")
        return None

    # Unwrap the records only when the payload really is an envelope object.
    if isinstance(data, dict) and data_field in data:
        return data[data_field]
    return data

json_data = get_data(base_url, endpoint, data_field)
if json_data is None:
    # get_data has already printed the cause; stop here with a clear message
    # instead of letting json_normalize fail on a None input.
    raise RuntimeError("No se pudieron obtener las tasas desde la API")
df_tasas = pd.json_normalize(json_data)
df_tasas.head(25)

Unnamed: 0,entidad,logo,tnaClientes,tnaNoClientes,enlace
0,BANCO DE LA NACION ARGENTINA,https://www.bcra.gob.ar/Imagenes/logosbancos1/...,0.33,,
1,BANCO SANTANDER ARGENTINA S.A.,https://www.bcra.gob.ar/Imagenes/logosbancos1/...,0.31,,
2,BANCO DE GALICIA Y BUENOS AIRES S.A.U.,https://www.bcra.gob.ar/Imagenes/logosbancos1/...,0.3,,
3,BANCO DE LA PROVINCIA DE BUENOS AIRES,https://www.bcra.gob.ar/Imagenes/logosbancos1/...,0.33,,
4,BANCO BBVA ARGENTINA S.A.,https://www.bcra.gob.ar/Imagenes/logosbancos1/...,0.31,,
5,BANCO MACRO S.A.,https://www.bcra.gob.ar/Imagenes/logosbancos1/...,0.34,,
6,HSBC BANK ARGENTINA S.A.,https://www.bcra.gob.ar/Imagenes/logosbancos1/...,0.31,,
7,BANCO CREDICOOP COOPERATIVO LIMITADO,https://www.bcra.gob.ar/Imagenes/logosbancos1/...,0.33,0.33,https://www.bancocredicoop.coop/personas/solic...
8,INDUSTRIAL AND COMMERCIAL BANK OF CHINA (ARGEN...,https://www.bcra.gob.ar/Imagenes/logosbancos1/...,0.31,,
9,BANCO DE LA CIUDAD DE BUENOS AIRES,https://www.bcra.gob.ar/Imagenes/logosbancos1/...,0.31,,


In [None]:
# Make sure the storage folder for the rates dataset exists (no error if it already does).
tasas_dir = 'datalake/bronze/argentinadatos/tasas'
os.makedirs(tasas_dir, exist_ok=True)

In [None]:
# Persist the rates frame as a single parquet file, written with the fastparquet engine.
tasas_parquet_path = "datalake/bronze/argentinadatos/tasas/data.parquet"
df_tasas.to_parquet(tasas_parquet_path, engine="fastparquet")

In [None]:
# Incremental extraction: dollar quotes per exchange house ("casa"), refreshed hourly.
endpoint = "cotizaciones/dolares"

# Query window = the previous clock hour (UTC), from HH:00:00 through HH:59:59.
previous_hour = datetime.utcnow() - timedelta(hours=1)
start_date = previous_hour.strftime("%Y-%m-%dt%H:00:00")
end_date = previous_hour.strftime("%Y-%m-%dt%H:59:59")

params = {
    "start": start_date,
    "end": end_date
    }
# Send the window with the request so the extraction is limited to it.
# NOTE(review): confirm the API honours `start`/`end` and this timestamp format.
cotizaciones = get_data(base_url, endpoint, data_field, params=params)
df_cotizaciones = pd.json_normalize(cotizaciones)
df_cotizaciones.head()

Unnamed: 0,casa,compra,venta,fecha
0,blue,4.0,4.0,2011-01-03
1,mayorista,3.97,3.98,2011-01-03
2,oficial,4.0,4.0,2011-01-03
3,blue,4.0,4.0,2011-01-04
4,mayorista,3.97,3.98,2011-01-04


In [None]:
# Make sure the storage folder for the dollar quotes exists (no error if it already does).
dolar_dir = 'datalake/bronze/argentinadatos/dolar'
os.makedirs(dolar_dir, exist_ok=True)

In [None]:
# Store the quotes as a partitioned dataset, split by the values of the "casa" column.
dolar_dataset_dir = "datalake/bronze/argentinadatos/dolar/"
df_cotizaciones.to_parquet(
    dolar_dataset_dir,
    partition_cols=["casa"],
    engine="fastparquet",
)

In [None]:
# Inside one partition ("casa=blue"), split the data further by year and month.
df_cotizaciones["fecha"] = pd.to_datetime(df_cotizaciones.fecha)
df_cotizaciones["Año"] = df_cotizaciones.fecha.dt.year
df_cotizaciones["Mes"] = df_cotizaciones.fecha.dt.month

# Only the "blue" rows belong under casa=blue; writing the whole frame there
# would file the quotes of every other casa under the wrong partition.
df_cotizaciones_blue = df_cotizaciones[df_cotizaciones["casa"] == "blue"]

df_cotizaciones_blue.to_parquet(
    "datalake/bronze/argentinadatos/dolar/casa=blue",
    engine="fastparquet",
    partition_cols=["Año", "Mes"]
    )


In [None]:
# Give the rate columns human-readable labels.
column_labels = {
    "entidad": "Entidad",
    "tnaClientes": "Tasa Nominal Anual (Clientes)",
    "tnaNoClientes": "Tasa Nominal Anual (No Clientes)",
    "enlace": "Web",
}
df_tasas = df_tasas.rename(columns=column_labels)

In [None]:
# Fill missing values with a per-column placeholder:
# -1 for "Web" and 0.0 for the non-customer rate.
imputation_mapping = dict([
    ("Web", -1),
    ("Tasa Nominal Anual (No Clientes)", 0.0),
])
df_tasas = df_tasas.fillna(value=imputation_mapping)

In [None]:
# Drop the "logo" column: it is considered unnecessary and only takes up memory.
# errors="ignore" keeps this cell re-runnable once the column is already gone.
df_tasas = df_tasas.drop(columns=["logo"], errors="ignore")

In [None]:
# Capitalize each "Entidad" value: first character upper-cased, the rest lower-cased.
entidad_capitalized = df_tasas["Entidad"].str.capitalize()
df_tasas["Entidad"] = entidad_capitalized

In [None]:
# Add an "Id" column that numbers the entities: row index shifted by one.
df_tasas = df_tasas.assign(Id=df_tasas.index + 1)

**Así quedarían los datos procesados.**

In [None]:
# Display up to the first 25 processed rows.
df_tasas.head(n=25)

Unnamed: 0,Entidad,Tasa Nominal Anual (Clientes),Tasa Nominal Anual (No Clientes),Web,Id
0,Banco de la nacion argentina,0.33,0.0,-1,1
1,Banco santander argentina s.a.,0.31,0.0,-1,2
2,Banco de galicia y buenos aires s.a.u.,0.3,0.0,-1,3
3,Banco de la provincia de buenos aires,0.33,0.0,-1,4
4,Banco bbva argentina s.a.,0.31,0.0,-1,5
5,Banco macro s.a.,0.34,0.0,-1,6
6,Hsbc bank argentina s.a.,0.31,0.0,-1,7
7,Banco credicoop cooperativo limitado,0.33,0.33,https://www.bancocredicoop.coop/personas/solic...,8
8,Industrial and commercial bank of china (argen...,0.31,0.0,-1,9
9,Banco de la ciudad de buenos aires,0.31,0.0,-1,10


In [None]:
# Build the PostgreSQL engine from the credentials stored in pipeline.config.

parser = ConfigParser()
if not parser.read('pipeline.config'):
    # ConfigParser.read() silently skips files it cannot open; fail with a clear message instead.
    raise FileNotFoundError("No se encontró el archivo 'pipeline.config'")
credencial_conexion = parser['conexion_database']


def _config_value(key):
    """Return option ``key`` stripped of whitespace and of one pair of enclosing quotes."""
    value = credencial_conexion[key].strip()
    if len(value) >= 2 and value[0] == value[-1] and value[0] in ('"', "'"):
        value = value[1:-1]
    return value


# ConfigParser returns values verbatim, so an entry written as host="example.com"
# would keep its quotes and the host name could not be resolved.
host = _config_value('host')
port = _config_value('port')
database = _config_value('database')
username = _config_value('username')
password = _config_value('password')

# URL.create escapes special characters in the credentials (e.g. "@" or "/" in the
# password) that would corrupt a hand-formatted connection string.
connection_url = sa.engine.URL.create(
    "postgresql",
    username=username,
    password=password,
    host=host,
    port=int(port),
    database=database,
)
conn_string = connection_url.render_as_string(hide_password=False)

engine = create_engine(connection_url)

In [None]:
# Create the destination table (no-op when it already exists).
query = text("""
CREATE TABLE IF NOT EXISTS public.tasas (
    id INT PRIMARY KEY,
    entidad VARCHAR(100),
    tna_clientes FLOAT,
    tna_noclientes FLOAT,
    web TEXT
);
""")

# The load step inserts a "web" column as well; add it to a table that was
# created earlier without it.
add_web_column = text(
    "ALTER TABLE public.tasas ADD COLUMN IF NOT EXISTS web TEXT;"
)

# Run both statements inside a single transaction.
with engine.connect() as conn, conn.begin():
    conn.execute(query)
    conn.execute(add_web_column)

OperationalError: (psycopg2.OperationalError) could not translate host name ""cursodata-cursodataengineer1.l.aivencloud.com"" to address: Name or service not known

(Background on this error at: https://sqlalche.me/e/20/e3q8)

In [None]:
# Put the columns in load order and switch to the snake_case names used by the cloud table.
sql_column_names = {
    "Id": "id",
    "Entidad": "entidad",
    "Tasa Nominal Anual (Clientes)": "tna_clientes",
    "Tasa Nominal Anual (No Clientes)": "tna_noclientes",
    "Web": "web",
}
df_tasas = df_tasas[list(sql_column_names)].rename(columns=sql_column_names)

In [None]:
# Append up to the first 25 rows to public.tasas inside one transaction.
rows_to_load = df_tasas.head(25)
with engine.connect() as conn:
    with conn.begin():
        rows_to_load.to_sql(
            name="tasas",
            con=conn,
            schema="public",
            if_exists="append",
            index=False,
            method="multi",
        )

OperationalError: (psycopg2.OperationalError) could not translate host name ""cursodata-cursodataengineer1.l.aivencloud.com"" to address: Name or service not known

(Background on this error at: https://sqlalche.me/e/20/e3q8)