In [1]:
from utils.utils_bigquery import *
from google.oauth2 import service_account
from google.cloud import bigquery
import pandas as pd
from datetime import *
import requests

In [2]:
# Connect to BigQuery through the BigQueryUtils wrapper.
# NOTE(review): this rebinds the name `bigquery`, which the import cell had
# bound to the google.cloud `bigquery` module; from here on `bigquery` is the
# BigQueryUtils instance.
# NOTE(review): key_path is read here before the configuration cell below
# assigns it; presumably it is provided by the star import of
# utils.utils_bigquery — confirm.
bigquery = BigQueryUtils(key_path)

In [3]:
# Path to the GCP credentials (plain self-assignment: key_path must already
# exist in the namespace when this cell runs)
key_path = key_path

# BigQuery destination: project, dataset and table, plus the fully qualified
# "<project>.<dataset>.<table>" name
project = project_id
dataset, table = 'bronze', 'bronze_fred_macro_data'
table_conca = '{}.{}.{}'.format(project, dataset, table)

In [4]:
# FRED API key, read from the FRED_API_KEY environment variable so the secret
# is never stored in the notebook or its version history.
# Any key that was previously committed in this file should be treated as
# leaked and rotated in the FRED account.
import os

api_key = os.environ.get('FRED_API_KEY')
if not api_key:
    # Fail fast: without a key every request below would be rejected
    raise RuntimeError(
        'FRED_API_KEY environment variable is not set; '
        'define it before running this notebook.'
    )

In [5]:
# FRED API: root URL and the path of the observations endpoint
# (the request URL is built as base_url + obs_endpoint)
base_url = "https://api.stlouisfed.org/fred/"
obs_endpoint = 'series/observations'

# Series to download: series id -> human-readable label (labels are in Spanish)
macro_data = dict(
    GDP="Producto Interno Bruto (PIB)",
    CPIAUCSL="Índice de Precios al Consumidor (IPC)",
    UNRATE="Tasa de Desempleo",
    FEDFUNDS="Tasa de Interés de la Reserva Federal",
    RSAFS="Ventas Minoristas",
    PPIACO="Índice de Precios al Productor (IPP)",
    UMCSENT="Confianza del Consumidor (UMich Sentiment)",
    BOPGSTB="Balanza Comercial",
    NAPM="Índice de Gestores de Compras (PMI)",
    WCESTP11="Inventarios de Petróleo",
)

# Observation window: fixed start, end is the day the cell is executed
start_date = '2015-01-01'
end_date = date.today()

# Time-series options (frequency and units codes)
ts_frequency, ts_units = 'd', 'pc1'

In [6]:
# Request parameters for a single series (GDP), including the 'units' option.
# NOTE(review): the download loop in the next cell reassigns obs_params for
# every series (without 'units'), so this dict is not used by any request in
# the visible code.
obs_params = dict(
    series_id='GDP',
    api_key=api_key,
    file_type="json",
    observation_start=start_date,
    observation_end=end_date,
    units=ts_units,
)

In [7]:
# Download the observations of every series in macro_data and stack them into
# `df`: one row per observation, indexed by date and tagged with its series id.
# NOTE(review): ts_frequency / ts_units from the configuration cell are not
# sent with these requests — confirm that raw values are what is wanted.
frames = []

for series_id, name in macro_data.items():
    obs_params = {
        "series_id": series_id,
        "api_key": api_key,
        "file_type": "json",
        "observation_start": start_date,
        "observation_end": end_date
    }

    # The timeout keeps a stalled connection from hanging the notebook forever
    response = requests.get(base_url + obs_endpoint, params=obs_params, timeout=30)

    if response.status_code != 200:
        print(f"Error {response.status_code} al obtener datos para {name} (Series ID: {series_id})")
        continue

    observations = response.json().get('observations', [])
    if not observations:
        # An empty payload would produce a frame without 'date'/'value' columns
        print(f"Sin observaciones para {name} (Series ID: {series_id})")
        continue

    obs_data = pd.DataFrame(observations)
    obs_data['date'] = pd.to_datetime(obs_data['date'])
    obs_data = obs_data.set_index('date')
    # FRED reports missing observations as "."; coerce those to NaN instead of
    # letting the float conversion abort the whole download
    obs_data['value'] = pd.to_numeric(obs_data['value'], errors='coerce').astype(float)
    obs_data['serie_id'] = series_id
    frames.append(obs_data)

    # Preview of what was downloaded for this series
    print(f"Datos para {name} (Series ID: {series_id}):")
    print(obs_data.head())

# Concatenate once at the end (growing a DataFrame inside the loop copies all
# previous rows on every iteration); empty frame when nothing was downloaded
df = pd.concat(frames, ignore_index=False) if frames else pd.DataFrame()


Datos para Producto Interno Bruto (PIB) (Series ID: GDP):
           realtime_start realtime_end      value serie_id
date                                                      
2015-01-01     2024-08-29   2024-08-29  18063.529      GDP
2015-04-01     2024-08-29   2024-08-29  18279.784      GDP
2015-07-01     2024-08-29   2024-08-29  18401.626      GDP
2015-10-01     2024-08-29   2024-08-29  18435.137      GDP
2016-01-01     2024-08-29   2024-08-29  18525.933      GDP
Datos para Índice de Precios al Consumidor (IPC) (Series ID: CPIAUCSL):
           realtime_start realtime_end    value  serie_id
date                                                     
2015-01-01     2024-08-29   2024-08-29  234.747  CPIAUCSL
2015-02-01     2024-08-29   2024-08-29  235.342  CPIAUCSL
2015-03-01     2024-08-29   2024-08-29  235.976  CPIAUCSL
2015-04-01     2024-08-29   2024-08-29  236.222  CPIAUCSL
2015-05-01     2024-08-29   2024-08-29  237.001  CPIAUCSL
Datos para Tasa de Desempleo (Series ID: UNRATE):
 

In [8]:
# Move the date index into a regular 'date' column.
# Guarded so that re-running this cell is harmless: a second reset_index()
# would turn the fresh integer index into a spurious 'index' column.
if 'date' not in df.columns:
    df = df.reset_index()
df

Unnamed: 0,date,realtime_start,realtime_end,value,serie_id
0,2015-01-01,2024-08-29,2024-08-29,18063.529,GDP
1,2015-04-01,2024-08-29,2024-08-29,18279.784,GDP
2,2015-07-01,2024-08-29,2024-08-29,18401.626,GDP
3,2015-10-01,2024-08-29,2024-08-29,18435.137,GDP
4,2016-01-01,2024-08-29,2024-08-29,18525.933,GDP
...,...,...,...,...,...
836,2024-02-01,2024-08-29,2024-08-29,-69006.000,BOPGSTB
837,2024-03-01,2024-08-29,2024-08-29,-68582.000,BOPGSTB
838,2024-04-01,2024-08-29,2024-08-29,-74462.000,BOPGSTB
839,2024-05-01,2024-08-29,2024-08-29,-75006.000,BOPGSTB


In [9]:
# Columns that make up the id field
id_fields = ['serie_id', 'date']

# Call generate_id once per row (axis=1), passing it the list of fields, and
# store the result in the 'id' column
df['id'] = df.apply(func=generate_id, fields=id_fields, axis=1)

df

Unnamed: 0,date,realtime_start,realtime_end,value,serie_id,id
0,2015-01-01,2024-08-29,2024-08-29,18063.529,GDP,c9c54655bf3f26795cde838f06a25bd7
1,2015-04-01,2024-08-29,2024-08-29,18279.784,GDP,14c74cebfff6bb3f9917359c00a3a0bb
2,2015-07-01,2024-08-29,2024-08-29,18401.626,GDP,ffff25f5e55f40990b2e90a4c7165ceb
3,2015-10-01,2024-08-29,2024-08-29,18435.137,GDP,f1ed1f9a501d4609d184eafeb2574206
4,2016-01-01,2024-08-29,2024-08-29,18525.933,GDP,54b3d90583a5b0d5287deae543708ac6
...,...,...,...,...,...,...
836,2024-02-01,2024-08-29,2024-08-29,-69006.000,BOPGSTB,3f051cb51254dc9c37be85c0e8153a91
837,2024-03-01,2024-08-29,2024-08-29,-68582.000,BOPGSTB,7a284d8ee4a1d47c2a932f8de30235e8
838,2024-04-01,2024-08-29,2024-08-29,-74462.000,BOPGSTB,b040d0737ceada938aea776e262cc96f
839,2024-05-01,2024-08-29,2024-08-29,-75006.000,BOPGSTB,0f87a39401474be535e8a202a9ac1cd9


In [11]:
# Incremental load into BigQuery.
# Only the lookup of new records is inside the try block, so that a failure
# while appending can never trigger the full-table replace below.
try:
    # Keep only the records that are not yet in the destination table
    df_incremental = bigquery.select_for_incremental(id='id', table=table_conca, new_df=df)
except Exception as e:
    # Fallback for the case where BigQuery has no data yet: persist the whole df.
    # NOTE(review): every exception from the lookup lands here; ideally this
    # would catch only the "table not found" error raised by the helper — confirm
    # which exception type that is.
    print(f'Exception encountered: {e}')
    if df.empty:
        # Never overwrite the destination table with an empty frame
        raise
    bigquery.save_dataframe(df, project, dataset, table, if_exists='replace', schema=None)
    print('New data persisted')
else:
    if not df_incremental.empty:
        # Append the new records to the existing table
        bigquery.save_dataframe(df_incremental, project, dataset, table, if_exists='append', schema=None)
        print('New records loaded')
    else:
        print('No new records to load.')

No new records to load.
