In [None]:
# Instalación de librerías:

%pip install pandas
%pip install requests
%pip install psycopg2
%pip install datetime

In [112]:
# Importación de librerías a utilizar:

import os
import datetime
import json
import pandas as pd
import requests as req
import psycopg2 as pg2
from psycopg2.extras import execute_values

In [113]:
# Seteo de variables presentes en el archivo '.env' y en el archivo 'variables.json':

CLAVE_API = os.environ.get("API_KEY")
USUARIO_BD = os.environ.get("DB_USER")
CONTRASENA_BD = os.environ.get("DB_PASSWORD")
HOST_BD = os.environ.get("DB_HOST")
PUERTO_BD = os.environ.get("DB_PORT")
NOMBRE_BD = os.environ.get("DB_NAME")

with open('variables.json') as json_file:
        variables_json = json.load(json_file)

nombres = variables_json["Nombres_Empresas"]
columnas_df = variables_json["Columnas_DataFrame"]
dict_aws = variables_json["Columnas_Redshift"]
list_aws = list(variables_json["Columnas_Redshift"].keys())

In [114]:
# Configuración para mostrar un número definido de registros y campos

pd.set_option('display.max_rows', 10)
pd.set_option('display.max_columns', 10)

In [115]:
# Extracción desde la API de los datos diarios de "Los 7 Magníficos" (Alphabet, Amazon, Apple, Meta, Microsoft, Nvidia y Tesla) correspondiente al NASDAQ tomando en cuenta el último año (sin contar Sabados y Domingos):

api = 'https://api.twelvedata.com/time_series?symbol=GOOGL,AMZN,AAPL,META,MSFT,NVDA,TSLA&exchange=NASDAQ&interval=1day&format=JSON&outputsize=260&apikey=' + CLAVE_API
peticion = req.get(api)

peticion

In [116]:
# Conversión de datos a formato JSON:

datos_json = peticion.json()

datos_json

In [117]:
# Normalización de datos y agregado de columnas:

datos_df = pd.DataFrame()
claves = list(datos_json.keys())
contador = 0

for empresa in datos_json:
    tabla = pd.json_normalize(datos_json[empresa]['values'])
    tabla['Codigo'] = claves[contador]
    tabla['Empresa'] = nombres[claves[contador]]
    datos_df = pd.concat([datos_df, tabla], axis = 0)
    contador += 1

datos_df.columns = columnas_df

datos_df = datos_df.drop_duplicates()
datos_df = datos_df.sort_values(by = ['Dia', 'Empresa'], ascending = [False, True],ignore_index = True)

datos_df['Clave_Compuesta'] = datos_df.Codigo.str.cat(datos_df.Dia)
datos_df['Columna_Temporal'] = datetime.datetime.now()

datos_df

In [None]:
# Conexión a la base de datos en Amazon Redshift y creación del cursor:

try:
    conexion = pg2.connect(host = HOST_BD, port = PUERTO_BD, dbname = NOMBRE_BD, user = USUARIO_BD, password = CONTRASENA_BD)
    print('Conexión exitosa a la base de datos.')
except Exception as e:
    print('Conexión fallida a la base de datos.')
    print(e)

cursor = conexion.cursor()

In [120]:
# Creación de las tablas principal y staging en Amazon Redshift:

columnas_query_create = ''
contador_create = 1

for i in dict_aws:
        if contador_create != len(dict_aws):
                columnas_query_create = columnas_query_create + i + ' ' + dict_aws[i] + ','
        else:
                columnas_query_create = columnas_query_create + i + ' ' + dict_aws[i]
        contador_create += 1

query_create = 'CREATE TABLE IF NOT EXISTS b_arganaraz_londero_coderhouse.cotizacion_magnificos(' + columnas_query_create + ');' + '''

CREATE TABLE IF NOT EXISTS b_arganaraz_londero_coderhouse.cotizacion_magnificos_staging(''' + columnas_query_create + ');'

cursor.execute(query_create)
conexion.commit()

In [121]:
# Insercion de datos a la tabla staging creada en Amazon Redshift:

columnas_query_insert = ''
contador_insert = 1

for x in dict_aws:
        if contador_insert != len(dict_aws):
                columnas_query_insert = columnas_query_insert + x + ', '
        else:
                columnas_query_insert = columnas_query_insert + x
        contador_insert += 1

query_insert = 'INSERT INTO b_arganaraz_londero_coderhouse.cotizacion_magnificos_staging(' + columnas_query_insert + ') VALUES %s;'

valores = [tuple(var) for var in datos_df[list_aws].to_numpy()]
execute_values(cursor, query_insert, valores)
conexion.commit()

In [122]:
# Actualización incremental de la tabla principal en base a los datos de la tabla staging en Amazon Redshift:

query_incremental = '''
DELETE FROM b_arganaraz_londero_coderhouse.cotizacion_magnificos 
USING b_arganaraz_londero_coderhouse.cotizacion_magnificos_staging 
WHERE b_arganaraz_londero_coderhouse.cotizacion_magnificos.Clave_Compuesta = b_arganaraz_londero_coderhouse.cotizacion_magnificos_staging.Clave_Compuesta;

INSERT INTO b_arganaraz_londero_coderhouse.cotizacion_magnificos SELECT * FROM b_arganaraz_londero_coderhouse.cotizacion_magnificos_staging;

DROP TABLE b_arganaraz_londero_coderhouse.cotizacion_magnificos_staging;
'''

cursor.execute(query_incremental)
conexion.commit()

In [None]:
# Seleccion de los datos insertados a la tabla creada en Amazon Redshift para comprobación de carga correcta:

query_seleccion = '''
        SELECT * FROM b_arganaraz_londero_coderhouse.cotizacion_magnificos
        '''

cursor.execute(query_seleccion)
conexion.commit()
resultados = cursor.fetchall()

resultados = pd.DataFrame(resultados)
resultados.columns = list_aws
resultados.set_index('Clave_Compuesta', inplace = True)

resultados

In [124]:
# Cierre de cursor y conexión:

cursor.close()
conexion.close()