## Description

The goal of this notebook is to perform a simple data ingestion into AWS RDS, using Python and the psycopg2 library. A notebook was chosen mainly for fast development, since this is a one-time task in this project.

* This RDS instance exists only to simulate a data migration using DMS in AWS.

## IMPORTS

In [1]:
import requests
import pandas as pd
import boto3
import json
import datetime
import pprint
import os
from dotenv import load_dotenv

### Custom modules

In [2]:
from resources.brapi_api import Brapi_STOCKS_API, Brapi_notSTOCKS__API
from resources.historical_process import process_Df_hist_stocks, process_inflation_hist, process_dividends_df, move_column
from resources.postegres import PostgresDb, infer_postgres_schema

load_dotenv('resources/.env')
TOKEN_GENERATOR = os.getenv("token_generator")
POSTGRES_USER = os.getenv("user_postegress")
POSTGRES_PASSWORD = os.getenv("pass_postegress")
POSTGRES_HOST = os.getenv("host_postegress")
POSTGRES_DB = os.getenv("db_postegress")
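`os.getenv` returns `None` when a variable is not set, so a typo in `resources/.env` would only show up later as a connection or authentication error. A minimal sketch of an early check, assuming the variable names used in the cell above; `missing_env_vars` is a hypothetical helper, not part of the project modules:

```python
import os

# Names taken from the os.getenv calls in the cell above.
REQUIRED_VARS = [
    "token_generator",
    "user_postegress",
    "pass_postegress",
    "host_postegress",
    "db_postegress",
]


def missing_env_vars(names, environ=None):
    """Return the names that are unset or empty in the given environment."""
    env = os.environ if environ is None else environ
    return [name for name in names if not env.get(name)]


# Example with an explicit mapping, so the check does not depend on the real environment.
example_env = {"token_generator": "abc", "user_postegress": "admin", "host_postegress": ""}
print(missing_env_vars(REQUIRED_VARS, example_env))
# ['pass_postegress', 'host_postegress', 'db_postegress']
```

Calling `missing_env_vars(REQUIRED_VARS)` right after `load_dotenv` and stopping when the list is not empty would make a broken `.env` fail fast.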


### STOCKS and FIIS

In [3]:
# Input vars

tickers = ["PETR4", "WEGE3", "VALE3", "RAIZ4", "ITSA4", "KLBN4", "B3SA3", "ALPA3", "ELET3", "MGLU3"]



params = {
    'range': '6mo',
    'interval': '1d',
    'fundamental': 'true',
    'modules': 'balanceSheetHistory',
    'token': TOKEN_GENERATOR,
}


URL = "https://brapi.dev/api/quote/"

### *Requesting Stocks historical data from Brapi API*

In [4]:
# A thin wrapper class around requests for making the API call.
# Note: it exists only to keep the code organized, since it will be reused a lot.

api = Brapi_STOCKS_API(
    url=URL,
    tickers=tickers, 
    params=params
)

data = api.get_quotes()
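The internals of `Brapi_STOCKS_API` live in `resources/brapi_api.py` and are not shown in this notebook. As a rough sketch of what such a wrapper has to do before calling `requests`, the snippet below builds one request URL per ticker. The one-ticker-per-call layout and the `build_quote_url` helper are assumptions for illustration, not the actual class:

```python
from urllib.parse import urlencode


def build_quote_url(base_url, ticker, params):
    """Join the base URL with one ticker and append the encoded query string."""
    return base_url.rstrip("/") + "/" + ticker + "?" + urlencode(params)


# The token is left out on purpose; in the notebook it comes from the environment.
example_params = {"range": "6mo", "interval": "1d"}

for ticker in ["PETR4", "WEGE3"]:
    print(build_quote_url("https://brapi.dev/api/quote/", ticker, example_params))
# https://brapi.dev/api/quote/PETR4?range=6mo&interval=1d
# https://brapi.dev/api/quote/WEGE3?range=6mo&interval=1d
```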

In [5]:
# Behind this function, pydantic enforces the column types, so we have more
# confidence in what will be ingested into the database. The rest is simple JSON handling with pandas.

historical_df = process_Df_hist_stocks(data)


# Key column used by the update logic when pushing the data to the database.
col_aux = 'update_key'
historical_df[col_aux] = historical_df.date + historical_df.symbol

# PostgreSQL folds unquoted column names to lowercase, so lowercase them here to avoid mismatches.
historical_df.columns = historical_df.columns.str.lower()

historical_df.head()

|   | date | open | high | low | close | volume | adjustedclose | symbol | update_key |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 2023-11-21 | 36.54 | 36.54 | 35.91 | 36.36 | 49597800 | 32.6339 | PETR4 | 2023-11-21PETR4 |
| 1 | 2023-11-22 | 34.9 | 35.16 | 34.25 | 35.16 | 58578300 | 32.7786 | PETR4 | 2023-11-22PETR4 |
| 2 | 2023-11-23 | 35.13 | 35.25 | 34.81 | 35.17 | 24068100 | 32.788 | PETR4 | 2023-11-23PETR4 |
| 3 | 2023-11-24 | 34.98 | 35.82 | 34.71 | 35.27 | 53456200 | 32.8812 | PETR4 | 2023-11-24PETR4 |
| 4 | 2023-11-27 | 35.0 | 35.4 | 34.75 | 35.07 | 36996000 | 32.6947 | PETR4 | 2023-11-27PETR4 |
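To make the type-enforcement idea concrete without showing the project's pydantic models (they are inside `process_Df_hist_stocks` and not visible here), the sketch below uses a plain standard-library dataclass as a stand-in. The `QuoteRow` class and its reduced field list are assumptions for illustration only:

```python
from dataclasses import dataclass


@dataclass
class QuoteRow:
    """One row of historical quotes, with a subset of the columns shown above."""
    date: str
    open: float
    close: float
    volume: int
    symbol: str

    def __post_init__(self):
        # Coerce each numeric field to its declared type; bad input raises ValueError.
        self.open = float(self.open)
        self.close = float(self.close)
        self.volume = int(self.volume)

    @property
    def update_key(self):
        # Same rule as the notebook: date concatenated with symbol.
        return self.date + self.symbol


# Values arrive as strings here to show the coercion.
row = QuoteRow(date="2023-11-21", open="36.54", close="36.36", volume="49597800", symbol="PETR4")
print(row.volume, row.update_key)
# 49597800 2023-11-21PETR4
```

A row with a non-numeric `volume` fails at construction time instead of reaching the database, which is the same benefit the notebook gets from pydantic.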


#### *Load to RDS*

In [6]:
table_stocks = "stocks_historical"
schema_name_rds = "firsttry"
schema_stocks = infer_postgres_schema(historical_df)

In [7]:
def save_db_postgress_RDS(dataframe, schema_name_rds, table_name_rds, columns_schema_rds, key_column_rds):
    """Create or update a table in the RDS PostgreSQL instance from a DataFrame."""
    # 5432 is the default PostgreSQL port.
    db = PostgresDb(POSTGRES_HOST, POSTGRES_DB, POSTGRES_USER, POSTGRES_PASSWORD, 5432)
    db.connect()
    # key_column_rds identifies the rows to update.
    db.create_update_table(dataframe, schema_name_rds, table_name_rds, columns_schema_rds, key_column_rds)
    print("Data saved into PostgreSQL")

In [8]:
stocks_load_rds = save_db_postgress_RDS(historical_df, schema_name_rds, table_stocks, schema_stocks, col_aux)

Data saved into PostgreSQL
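What `create_update_table` does with the key column is defined in `resources/postegres.py` and is not shown here. One common way to implement a "create or update" load in PostgreSQL is `INSERT ... ON CONFLICT ... DO UPDATE`, which requires a unique constraint on the key column. The sketch below only builds the statement text with psycopg2-style `%s` placeholders; `build_upsert_sql` is a hypothetical helper and may differ from the project's approach:

```python
def build_upsert_sql(schema, table, columns, key_column):
    """Build a parameterized INSERT ... ON CONFLICT statement as a string."""
    cols = ", ".join(f'"{c}"' for c in columns)
    placeholders = ", ".join(["%s"] * len(columns))
    # Every non-key column is overwritten with the incoming value on conflict.
    updates = ", ".join(f'"{c}" = EXCLUDED."{c}"' for c in columns if c != key_column)
    return (
        f'INSERT INTO "{schema}"."{table}" ({cols}) VALUES ({placeholders}) '
        f'ON CONFLICT ("{key_column}") DO UPDATE SET {updates}'
    )


sql = build_upsert_sql("firsttry", "stocks_historical", ["date", "close", "update_key"], "update_key")
print(sql)
```

The identifiers are interpolated directly, which is acceptable only because they come from the notebook itself. For names from an untrusted source, `psycopg2.sql.Identifier` is the safer way to compose them.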


### Stocks and FIIS dividends

In [9]:
tickers = ["PETR4", "WEGE3", "VALE3", "RAIZ4", "ITSA4", "KLBN4", "B3SA3", "ALPA3", "ELET3", "MGLU3"]


params = {
    'range': '6mo',
    'interval': '1d',
    'fundamental': 'false',
    'dividends': 'true',
    'token': TOKEN_GENERATOR,
}


URL = "https://brapi.dev/api/quote/"

In [10]:
api_div = Brapi_STOCKS_API(
    url=URL,
    tickers=tickers, 
    params=params
)

data_div = api_div.get_quotes()

In [11]:
# As with the historical quotes, the column types are enforced so we have more confidence in
# what will be ingested into the database. The rest is simple JSON handling with pandas.

div_df = process_dividends_df(data_div)


# Key column used by the update logic when pushing the data to the database.
col_aux_div = 'update_key'
div_df[col_aux_div] = div_df.paymentDate + div_df.symbol

# PostgreSQL folds unquoted column names to lowercase, so lowercase them here to avoid mismatches.
div_df.columns = div_df.columns.str.lower()

# Reorder columns
div_df = move_column(div_df, 'symbol', 1)


div_df.head()

|   | approvedon | symbol | assetissued | isincode | label | lastdateprior | paymentdate | rate | relatedto | remarks | update_key |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2024-05-03T13:00:00.000Z | PETR4 | BRPETRACNPR6 | BRPETRACNPR6 | DIVIDENDO | 2024-05-03T13:00:00.000Z | 2024-05-03T13:00:00.000Z | 1.764165 | 2º Trimestre/2024 |  | 2024-05-03T13:00:00.000ZPETR4 |
| 0 | 2024-04-26T13:00:00.000Z | PETR4 | BRPETRACNPR6 | BRPETRACNPR6 | DIVIDENDO | 2024-04-26T13:00:00.000Z | 2024-04-26T13:00:00.000Z | 1.141431 | 2º Trimestre/2024 |  | 2024-04-26T13:00:00.000ZPETR4 |
| 0 | 2023-11-22T13:00:00.000Z | PETR4 | BRPETRACNPR6 | BRPETRACNPR6 | DIVIDENDO | 2023-11-22T13:00:00.000Z | 2023-11-22T13:00:00.000Z | 1.355278 | Novembro/2023 |  | 2023-11-22T13:00:00.000ZPETR4 |
| 0 | 2023-08-22T13:00:00.000Z | PETR4 | BRPETRACNPR6 | BRPETRACNPR6 | DIVIDENDO | 2023-08-22T13:00:00.000Z | 2023-08-22T13:00:00.000Z | 1.149304 | Agosto/2023 |  | 2023-08-22T13:00:00.000ZPETR4 |
| 0 | 2023-06-13T13:00:00.000Z | PETR4 | BRPETRACNPR6 | BRPETRACNPR6 | DIVIDENDO | 2023-06-13T13:00:00.000Z | 2023-06-13T13:00:00.000Z | 1.893576 | Junho/2023 |  | 2023-06-13T13:00:00.000ZPETR4 |
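The dividends key is the payment date concatenated with the symbol. If one symbol ever has two payouts on the same payment date, both rows would get the same key and the update logic could treat them as one record. A minimal sketch of a check that could run before the load; `duplicated_keys` is a hypothetical helper and the third key is made up to show a collision:

```python
from collections import Counter


def duplicated_keys(keys):
    """Return the keys that appear more than once, in first-seen order."""
    counts = Counter(keys)
    return [key for key, n in counts.items() if n > 1]


keys = [
    "2024-05-03T13:00:00.000ZPETR4",
    "2024-04-26T13:00:00.000ZPETR4",
    "2024-05-03T13:00:00.000ZPETR4",  # hypothetical second payout on the same date
]
print(duplicated_keys(keys))
# ['2024-05-03T13:00:00.000ZPETR4']
```

With the real data, the input would be the `update_key` column converted to a list. An empty result means the key is safe to use for this batch.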


In [12]:
table_div = "dividends_table"
schema_name_rds = "firsttry"
schema_dividends = infer_postgres_schema(div_df)
schema_dividends

{'approvedon': 'VARCHAR(255)',
 'symbol': 'VARCHAR(255)',
 'assetissued': 'VARCHAR(255)',
 'isincode': 'VARCHAR(255)',
 'label': 'VARCHAR(255)',
 'lastdateprior': 'VARCHAR(255)',
 'paymentdate': 'VARCHAR(255)',
 'rate': 'FLOAT',
 'relatedto': 'VARCHAR(255)',
 'remarks': 'VARCHAR(255)',
 'update_key': 'VARCHAR(255)'}
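The output above shows only two target types: `FLOAT` for numeric rates and `VARCHAR(255)` for everything else, including the date-like columns. The real `infer_postgres_schema` is in `resources/postegres.py` and is not shown, so the sketch below is only a guess at the general idea, working on plain dictionaries instead of a DataFrame. The `BOOLEAN` and `BIGINT` branches are assumptions that do not appear in the notebook output:

```python
def pg_type_for(value):
    """Pick a PostgreSQL type name for a single Python value."""
    if isinstance(value, bool):  # bool is checked first because it is a subclass of int
        return "BOOLEAN"
    if isinstance(value, int):
        return "BIGINT"
    if isinstance(value, float):
        return "FLOAT"
    return "VARCHAR(255)"


def infer_schema_sketch(rows):
    """Map each column to a type based on its first non-null value."""
    columns = {}
    for row in rows:
        for column, value in row.items():
            if columns.get(column) is None and value is not None:
                columns[column] = pg_type_for(value)
            else:
                columns.setdefault(column, None)
    # Columns that never had a value fall back to text.
    return {column: pg_type or "VARCHAR(255)" for column, pg_type in columns.items()}


rows = [{"symbol": "PETR4", "rate": 1.764165, "remarks": None}]
print(infer_schema_sketch(rows))
# {'symbol': 'VARCHAR(255)', 'rate': 'FLOAT', 'remarks': 'VARCHAR(255)'}
```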

In [13]:
dvis_load_rds = save_db_postgress_RDS(div_df, schema_name_rds, table_div, schema_dividends, col_aux_div)

Data saved into PostgreSQL


#### *Params* (variables for Selic and inflation)

In [14]:
contry = 'brazil'
start = '01/01/2023'
end = '01/01/2024'
sortby = 'date'
sortorder = 'asc'

### Inflation

In [15]:
url_inflation = "https://brapi.dev/api/v2/inflation"
params = {
    'country': contry,
    'start': start,
    'end': end,
    'sortBy': sortby,
    'sortOrder': sortorder,
    'token': TOKEN_GENERATOR,
}

In [16]:
# Requesting inflation historical data from the Brapi API

api_inflation = Brapi_notSTOCKS__API(
    url=url_inflation,
    params=params
)

data_inflation = api_inflation.get_other_quotes()

In [17]:
df_inflation = process_inflation_hist(data_inflation['inflation'])


# Key column used by the update logic when pushing the data to the database.
col_aux_infla = 'update_key'


df_inflation[col_aux_infla] = df_inflation['date'].astype(str) + '_' + df_inflation['value'].astype(str)

# PostgreSQL folds unquoted column names to lowercase, so lowercase them here to avoid mismatches.
df_inflation.columns = df_inflation.columns.str.lower()

df_inflation.head()

|   | date | value | update_key |
|---|---|---|---|
| 0 | 2023-01-01 | 5.77 | 2023-01-01_5.77 |
| 1 | 2023-02-01 | 5.6 | 2023-02-01_5.6 |
| 2 | 2023-03-01 | 4.65 | 2023-03-01_4.65 |
| 3 | 2023-04-01 | 4.18 | 2023-04-01_4.18 |
| 4 | 2023-05-01 | 3.94 | 2023-05-01_3.94 |


### Selic

In [18]:
# Same schema and method as inflation...

url_selic = "https://brapi.dev/api/v2/prime-rate"
params = {
    'country': contry,
    'start': start,
    'end': end,
    'sortBy': sortby,
    'sortOrder': sortorder,
    'token': TOKEN_GENERATOR,
}
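The Selic parameters repeat the inflation ones field by field. A small builder could remove the duplication and catch a reversed date range before the request is sent. This is only a sketch: `build_macro_params` is a hypothetical helper, and it assumes the dates are day-first (DD/MM/YYYY), which the value `'01/01/2023'` alone does not confirm:

```python
from datetime import datetime


def build_macro_params(country, start, end, token, sort_by="date", sort_order="asc"):
    """Build the query parameters shared by the inflation and prime-rate calls."""
    # Assumption: dates are day-first (DD/MM/YYYY).
    fmt = "%d/%m/%Y"
    if datetime.strptime(start, fmt) > datetime.strptime(end, fmt):
        raise ValueError("start must not be after end")
    return {
        "country": country,
        "start": start,
        "end": end,
        "sortBy": sort_by,
        "sortOrder": sort_order,
        "token": token,
    }


example = build_macro_params("brazil", "01/01/2023", "01/01/2024", token="dummy-token")
print(example["start"], example["end"])
# 01/01/2023 01/01/2024
```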

In [19]:
api_selic = Brapi_notSTOCKS__API(
    url=url_selic,
    params=params
)

data_selic = api_selic.get_other_quotes()

In [20]:
df_selic = process_inflation_hist(data_selic['prime-rate'])
df_selic = df_selic.rename(columns={'value': 'selic_value'}).reset_index(drop=True)
df_selic.columns = df_selic.columns.str.lower()
df_selic.head()

|   | date | selic_value |
|---|---|---|
| 0 | 2023-01-01 | 13.75 |
| 1 | 2023-01-02 | 13.75 |
| 2 | 2023-01-03 | 13.75 |
| 3 | 2023-01-04 | 13.75 |
| 4 | 2023-01-05 | 13.75 |


Because the inflation and Selic tables share the same structure and are short,
they are merged to simplify the upload.

In [21]:
# Start by renaming the value column in inflation too.

df_inflation = df_inflation.rename(columns={'value': 'inflation_value'}).reset_index(drop=True)

# Merging

df_merge = pd.merge(df_inflation, df_selic, how='left', on='date')
columns = ['date', 'inflation_value', 'selic_value', 'update_key']
df_merge = df_merge[columns]

df_merge.head()

|   | date | inflation_value | selic_value | update_key |
|---|---|---|---|---|
| 0 | 2023-01-01 | 5.77 | 13.75 | 2023-01-01_5.77 |
| 1 | 2023-02-01 | 5.6 | 13.75 | 2023-02-01_5.6 |
| 2 | 2023-03-01 | 4.65 | 13.75 | 2023-03-01_4.65 |
| 3 | 2023-04-01 | 4.18 | 13.75 | 2023-04-01_4.18 |
| 4 | 2023-05-01 | 3.94 | 13.75 | 2023-05-01_3.94 |
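In the outputs above, inflation has one row per month while Selic has one row per day. A left merge on `date` therefore keeps only the inflation dates and drops the daily Selic rows that have no match. The sketch below reproduces that behaviour with plain dictionaries. `left_join_on_date` is a hypothetical helper and assumes each date appears at most once on the right side:

```python
def left_join_on_date(left_rows, right_rows, right_field):
    """Attach right_field to each left row by date; None when the date has no match."""
    lookup = {row["date"]: row[right_field] for row in right_rows}
    return [{**row, right_field: lookup.get(row["date"])} for row in left_rows]


inflation = [
    {"date": "2023-01-01", "inflation_value": 5.77},
    {"date": "2023-02-01", "inflation_value": 5.6},
]
selic = [
    {"date": "2023-01-01", "selic_value": 13.75},
    {"date": "2023-01-02", "selic_value": 13.75},  # daily row with no inflation match
    {"date": "2023-02-01", "selic_value": 13.75},
]

merged = left_join_on_date(inflation, selic, "selic_value")
print(len(merged), merged[0])
# 2 {'date': '2023-01-01', 'inflation_value': 5.77, 'selic_value': 13.75}
```

The merged table keeps the monthly granularity of inflation, which is why the existing `update_key` from the inflation frame still identifies each row.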


#### *Load to RDS*

In [22]:
table_infla_selic = "infla_selic_historical"
# schema_name_rds: same variable as for stocks
schema_infla_selic = infer_postgres_schema(df_merge)
schema_infla_selic

{'date': 'VARCHAR(255)',
 'inflation_value': 'FLOAT',
 'selic_value': 'FLOAT',
 'update_key': 'VARCHAR(255)'}

In [23]:
infla_load_rds = save_db_postgress_RDS(df_merge, schema_name_rds, table_infla_selic, schema_infla_selic, col_aux_infla)

Data saved into PostgreSQL
