# PRIMERA PARTE (Conexión a Redshift)

In [184]:
import pandas as pd
import psycopg2
import os
from dotenv import load_dotenv
from psycopg2.extras import execute_values
from time import sleep


In [185]:
load_dotenv()

# Load the database credentials from the .env file and open the connection.

CODER_REDSHIFT_HOST = os.environ.get('CODER_REDSHIFT_HOST')
CODER_REDSHIFT_DB = os.environ.get('CODER_REDSHIFT_DB')
CODER_REDSHIFT_USER = os.environ.get('CODER_REDSHIFT_USER')
CODER_REDSHIFT_PASS = os.environ.get('CODER_REDSHIFT_PASS')
CODER_REDSHIFT_PORT = os.environ.get('CODER_REDSHIFT_PORT')

# Bind `conn` before the attempt so a failed connection never leaves the name
# undefined, nor pointing at a stale connection from an earlier run of this
# cell. Later cells can then check `conn is None`.
conn = None

try:
    conn = psycopg2.connect(
        host=CODER_REDSHIFT_HOST,
        dbname=CODER_REDSHIFT_DB,
        user=CODER_REDSHIFT_USER,
        password=CODER_REDSHIFT_PASS,
        port=CODER_REDSHIFT_PORT,
    )
    print("Connected to Redshift successfully!")
except Exception as e:
    # Best effort on purpose: report the problem and keep the notebook alive.
    print("Unable to connect to Redshift.")
    print(e)

Connected to Redshift successfully!


# SEGUNDA PARTE (Conexión y Parsing de Datos a partir de API AlphaVantage)

In [186]:
import requests

# Load variables from the .env file, then read the Alpha Vantage API key
# from the process environment (None when the variable is not set).
load_dotenv()
alphavantage_api_key = os.getenv('ALPHAVANTAGE_API_KEY')

In [207]:
def get_json(symbol):
    """Download the monthly adjusted time series for ``symbol`` from Alpha Vantage.

    Parameters
    ----------
    symbol : str
        Ticker symbol to query, e.g. ``'AAPL'``.

    Returns
    -------
    The decoded JSON body of the HTTP response.

    Raises
    ------
    requests.HTTPError
        If the server answers with a 4xx/5xx status code.
    requests.RequestException
        On connection problems or when the request exceeds the timeout.
    """
    url = "https://www.alphavantage.co/query"
    # Passing the query as `params` lets requests URL-encode the symbol and key.
    # The former `interval=5min` argument is dropped: it is not a parameter of
    # the monthly endpoint.
    params = {
        "function": "TIME_SERIES_MONTHLY_ADJUSTED",
        "symbol": symbol,
        "apikey": alphavantage_api_key,
    }
    # A timeout keeps the notebook from hanging forever on a stalled connection.
    r = requests.get(url, params=params, timeout=30)
    print(r)
    # Fail loudly on HTTP errors instead of trying to parse an error page.
    r.raise_for_status()
    return r.json()
def json_a_diccionario(json):
    """Return a new dict with every top-level key of ``json`` and its value.

    The copy is shallow: nested values are the same objects as in ``json``.
    """
    return {llave: json[llave] for llave in json.keys()}
def format_json(json, symbol):
    """Turn an Alpha Vantage monthly-adjusted payload into a typed DataFrame.

    Parameters
    ----------
    json : dict
        Decoded API response; it must contain the
        ``'Monthly Adjusted Time Series'`` entry.
    symbol : str
        Ticker symbol stored in the ``symbol`` column of every row.

    Returns
    -------
    pandas.DataFrame
        One row per period with the columns ``open``, ``high``, ``low``,
        ``close``, ``adjusted close``, ``volume`` (rounded to whole millions),
        ``dividend amount``, ``date`` and ``symbol``, on a 0..n-1 index.

    Raises
    ------
    KeyError
        If the payload has no time series. The message includes the API's own
        explanation when one is present in the payload.
    """
    series_key = 'Monthly Adjusted Time Series'
    if series_key not in json:
        # Without this check the failure is a bare KeyError that hides the
        # reason (e.g. a rate-limit note) carried by the payload itself.
        detail = next(
            (json[k] for k in ('Error Message', 'Note', 'Information') if k in json),
            'no detail provided',
        )
        raise KeyError(
            f"'{series_key}' missing from API response for {symbol}: {detail}"
        )

    # Outer keys are dates and inner keys are fields, so transpose to get one
    # row per date.
    df = pd.DataFrame(json[series_key]).T
    # Field names look like '1. open'; strip the 3-character numeric prefix.
    df = df.rename(columns=lambda x: x[3:])

    # Move the date out of the index into a regular column.
    df['date'] = df.index
    df = df.reset_index(drop=True)
    df['date'] = pd.to_datetime(df['date'])

    for col in ('open', 'high', 'low', 'close', 'adjusted close', 'dividend amount'):
        df[col] = pd.to_numeric(df[col])

    # Express volume in whole millions.
    df['volume'] = (pd.to_numeric(df['volume']) / 1000000).round().astype(int)

    df['symbol'] = symbol
    df['symbol'] = df['symbol'].astype(str)
    return df

In [197]:
# Fetch the AAPL payload, convert it to a typed DataFrame and preview the
# first two rows. `data` keeps the raw payload and is reused by later cells.
data = get_json('AAPL')
df_aapl = format_json(data, 'AAPL')
df_aapl.head(2)

Unnamed: 0,open,high,low,close,adjusted close,volume,dividend amount,date,symbol
0,171.22,179.85,170.82,179.8,179.8,394,0.0,2023-10-11,AAPL
1,189.485,189.98,167.62,171.21,171.21,1338,0.0,2023-09-29,AAPL


In [208]:
# Shallow-copy the most recently fetched payload for inspection. The result
# is named `data_dict` so the built-in `dict` stays usable in later cells.
data_dict = json_a_diccionario(data)
data_dict

{'Meta Data': {'1. Information': 'Monthly Adjusted Prices and Volumes',
  '2. Symbol': 'AAPL',
  '3. Last Refreshed': '2023-10-11',
  '4. Time Zone': 'US/Eastern'},
 'Monthly Adjusted Time Series': {'2023-10-11': {'1. open': '171.2200',
   '2. high': '179.8500',
   '3. low': '170.8200',
   '4. close': '179.8000',
   '5. adjusted close': '179.8000',
   '6. volume': '393793087',
   '7. dividend amount': '0.0000'},
  '2023-09-29': {'1. open': '189.4850',
   '2. high': '189.9800',
   '3. low': '167.6200',
   '4. close': '171.2100',
   '5. adjusted close': '171.2100',
   '6. volume': '1337873796',
   '7. dividend amount': '0.0000'},
  '2023-08-31': {'1. open': '196.2350',
   '2. high': '196.7300',
   '3. low': '171.9600',
   '4. close': '187.8700',
   '5. adjusted close': '187.8700',
   '6. volume': '1323817340',
   '7. dividend amount': '0.2400'},
  '2023-07-31': {'1. open': '193.7800',
   '2. high': '198.2300',
   '3. low': '186.6000',
   '4. close': '196.4500',
   '5. adjusted close': '1

In [189]:
# Fetch the AMZN payload (overwriting `data`), convert it to a typed
# DataFrame and preview the first two rows.
data = get_json('AMZN')
df_amzn = format_json(data, 'AMZN')
df_amzn.head(2)

Unnamed: 0,open,high,low,close,adjusted close,volume,dividend amount,date,symbol
0,127.28,132.05,124.13,131.83,131.83,352,0.0,2023-10-11,AMZN
1,139.455,145.86,123.04,127.12,127.12,1121,0.0,2023-09-29,AMZN


In [190]:
# Fetch the GOOG payload (overwriting `data`), convert it to a typed
# DataFrame and preview the first two rows.
data = get_json('GOOG')
df_goog = format_json(data, 'GOOG')
df_goog.head(2)

Unnamed: 0,open,high,low,close,adjusted close,volume,dividend amount,date,symbol
0,132.155,142.22,132.065,141.7,141.7,155,0.0,2023-10-11,GOOG
1,138.43,139.93,128.19,131.85,131.85,390,0.0,2023-09-29,GOOG


In [191]:
# Fetch the MSFT payload (overwriting `data`), convert it to a typed
# DataFrame and preview the first two rows.
data = get_json('MSFT')
df_msft = format_json(data, 'MSFT')
df_msft.head(2)

Unnamed: 0,open,high,low,close,adjusted close,volume,dividend amount,date,symbol
0,316.28,332.82,311.215,332.42,332.42,165,0.0,2023-10-11,MSFT
1,331.31,340.86,309.45,315.75,315.75,417,0.0,2023-09-29,MSFT


In [192]:
# Fetch the IBM payload (overwriting `data`), convert it to a typed
# DataFrame and preview the first two rows.
data = get_json('IBM')
df_ibm = format_json(data, 'IBM')
df_ibm.head(2)

Unnamed: 0,open,high,low,close,adjusted close,volume,dividend amount,date,symbol
0,140.04,143.415,139.86,143.22,143.22,24,0.0,2023-10-11,IBM
1,147.26,151.9299,139.61,140.3,140.3,83,0.0,2023-09-29,IBM


In [None]:
# Display the data type of each column, for use when creating the table.
df_aapl.dtypes

open                      float64
high                      float64
low                       float64
close                     float64
adjusted close            float64
volume                      int32
dividend amount           float64
date               datetime64[ns]
symbol                     object
dtype: object

In [None]:
def cargar_en_redshift(conn, table_name, dataframe):
    """Create ``table_name`` if it does not exist and insert every row of ``dataframe``.

    Column names and SQL types are derived from the dataframe itself, so the
    same function works for any frame, not only the stock-price layout.

    Parameters
    ----------
    conn
        Open psycopg2 connection (used for ``cursor()``, ``commit()`` and
        ``rollback()``).
    table_name : str
        Target table name. It is interpolated into the SQL text as-is, so it
        must come from trusted code, never from user input.
    dataframe : pandas.DataFrame
        Rows to load; column names become quoted SQL identifiers.

    Returns
    -------
    None. Errors are printed and the transaction is rolled back rather than
    re-raised, so a failed load does not stop the notebook.
    """
    dtypes = dataframe.dtypes
    cols = list(dtypes.index)
    print(cols)

    # pandas dtype name -> SQL column type; anything unlisted falls back to
    # VARCHAR(255). 'int64' is listed because it is the usual result of
    # `.astype(int)` outside of Windows; without it integer columns were
    # created as VARCHAR.
    type_map = {
        'float64': 'FLOAT',
        'float32': 'REAL',
        'int64': 'BIGINT',
        'int32': 'INT',
        'bool': 'BOOLEAN',
        'datetime64[ns]': 'TIMESTAMP',
        'object': 'VARCHAR(255)',
    }
    sql_dtypes = [type_map.get(str(dtype), 'VARCHAR(255)') for dtype in dtypes.values]

    # Quote every identifier once and reuse it for both statements so that
    # CREATE TABLE and INSERT can never disagree on the column list.
    quoted_cols = [f'"{name}"' for name in cols]
    column_defs = [f'{name} {data_type}' for name, data_type in zip(quoted_cols, sql_dtypes)]

    table_schema = f"""
        CREATE TABLE IF NOT EXISTS {table_name} ({', '.join(column_defs)});
        """
    insert_sql = f"INSERT INTO {table_name} ({', '.join(quoted_cols)}) VALUES %s"

    cur = conn.cursor()
    try:
        cur.execute(table_schema)

        # itertuples is used instead of `.values` so rows are built column by
        # column; for an all-numeric frame `.values` would yield numpy
        # scalars, which psycopg2 may be unable to adapt.
        values = list(dataframe.itertuples(index=False, name=None))

        # psycopg2 opens the transaction implicitly on the first execute, so
        # a single commit() covers both the CREATE TABLE and the INSERT.
        execute_values(cur, insert_sql, values)
        conn.commit()
        print('Proceso terminado')
    except Exception as e:
        print(f"Error: {e}")
        conn.rollback()  # Undo the partial work of the failed transaction
    finally:
        cur.close()

def drop_table(conn, table_name):
    """Drop ``table_name`` if it exists and commit the change.

    Parameters
    ----------
    conn
        Open database connection (used for ``cursor()``, ``commit()`` and
        ``rollback()``).
    table_name : str
        Table to drop. It is interpolated into the SQL text as-is, so it must
        come from trusted code, never from user input.

    Returns
    -------
    None. Errors are printed and the transaction is rolled back rather than
    re-raised.
    """
    cur = conn.cursor()
    try:
        cur.execute(f"DROP TABLE IF EXISTS {table_name}")
        conn.commit()
        print('Proceso terminado')
    except Exception as e:
        print(f"Error: {e}")
        conn.rollback()  # Undo the failed transaction
    finally:
        # Release the cursor on both the success and the error path.
        cur.close()


In [None]:
# Drop any existing `apple_example` table before it is loaded again.
drop_table(conn=conn, table_name='apple_example')

Proceso terminado


In [None]:
# Create `apple_example` (if missing) and insert the AAPL rows into it.
cargar_en_redshift(conn=conn, table_name='apple_example', dataframe=df_aapl)

['open', 'high', 'low', 'close', 'adjusted close', 'volume', 'dividend amount', 'date', 'symbol']
Proceso terminado


# TERCERA PARTE (Unión de Datos y Subida a Redshift)

In [None]:
# Stack the per-symbol frames, order the rows from the most recent date to
# the oldest, and renumber the index from zero.
result_df = (
    pd.concat([df_aapl, df_amzn, df_goog, df_msft, df_ibm], ignore_index=True)
    .sort_values(by=['date'], ascending=False)
    .reset_index(drop=True)
)
result_df.head(10)

Unnamed: 0,open,high,low,close,adjusted close,volume,dividend amount,date,symbol
0,171.22,179.85,170.82,179.8,179.8,394,0.0,2023-10-11,AAPL
1,32.95,34.11,32.76,33.1,33.1,205,0.0,2023-10-11,PFE
2,316.28,332.82,311.215,332.42,332.42,165,0.0,2023-10-11,MSFT
3,127.28,132.05,124.13,131.83,131.83,352,0.0,2023-10-11,AMZN
4,132.155,142.22,132.065,141.7,141.7,155,0.0,2023-10-11,GOOG
5,140.04,143.415,139.86,143.22,143.22,24,0.0,2023-10-11,IBM
6,138.43,139.93,128.19,131.85,131.85,390,0.0,2023-09-29,GOOG
7,35.64,36.29,31.7744,33.17,33.17,555,0.0,2023-09-29,PFE
8,331.31,340.86,309.45,315.75,315.75,417,0.0,2023-09-29,MSFT
9,147.26,151.9299,139.61,140.3,140.3,83,0.0,2023-09-29,IBM


In [None]:
# Drop any existing `monthly_stocks_over_time` table before it is loaded again.
drop_table(conn=conn, table_name='monthly_stocks_over_time')

Proceso terminado


In [None]:
# Create `monthly_stocks_over_time` (if missing) and insert the combined rows.
cargar_en_redshift(conn=conn, table_name='monthly_stocks_over_time', dataframe=result_df)

['open', 'high', 'low', 'close', 'adjusted close', 'volume', 'dividend amount', 'date', 'symbol']
Proceso terminado
