In [9]:
pip install psycopg2


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.0[0m[39;49m -> [0m[32;49m24.1.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49m/Users/douglasportella/anaconda3/bin/python -m pip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [24]:
# src/extract.py

import pandas as pd
import requests
import time
from datetime import datetime
import os
# def extract_data():
#     data = pd.read_csv('usuarios.csv')
#     df = pd.DataFrame(data)
#     return df

def extract_data():
    """Download every page of posts from the JSONPlaceholder API.

    Pages are requested one at a time, starting at page 1, until the API
    returns an empty payload or an error occurs. A client-side throttle pauses
    the loop once ``limite_diferentes_consultas`` pages have been collected
    within a ``tempo_limite_minutos`` window. Whatever was collected is written
    to a timestamped ``.xlsx`` file inside a per-day directory and returned.

    Returns:
        pandas.DataFrame: rows of all pages fetched, with a fresh 0..n-1 index;
        an empty frame if nothing was collected.
    """

    def collect_date(page):
        """Request one page of posts and return the raw HTTP response."""
        url = "https://jsonplaceholder.typicode.com/posts"
        params = {"_page": page}
        # Without a timeout a stalled connection would block the loop forever.
        return requests.get(url, params=params, timeout=30)

    # Client-side throttle: at most this many pages per time window.
    limite_diferentes_consultas = 12
    tempo_limite_minutos = 15
    requisicoes_diferentes_consultas = 0
    tempo_inicio = time.time()

    today = datetime.now().date()
    # NOTE(review): machine-specific absolute path; consider making it configurable.
    directory = r"/Users/douglasportella/date"
    directory_today = os.path.join(directory, today.strftime('%d-%m-%Y'))

    # Create the per-day output directory if it does not exist yet.
    os.makedirs(directory_today, exist_ok=True)

    # Collect one frame per page and concatenate once at the end; growing a
    # DataFrame with concat inside the loop re-copies all rows on every page.
    page_tables = []
    page = 1
    while True:

        # Pause when the page budget for the current window is exhausted.
        tempo_decorrido = time.time() - tempo_inicio
        if requisicoes_diferentes_consultas >= limite_diferentes_consultas:
            if tempo_decorrido < (tempo_limite_minutos * 60):
                tempo_espera = (tempo_limite_minutos * 60) - tempo_decorrido
                print(f"Limite de requisições para consultas diferentes atingido. Esperando {tempo_espera:.1f} segundos.")
                time.sleep(tempo_espera)
            requisicoes_diferentes_consultas = 0
            tempo_inicio = time.time()

        try:
            response = collect_date(page)
            print(f"requisicao feita na pagina {page}")
        except requests.RequestException as e:
            print(f"erro ao realizar requisicao {e}")
            break

        if response.status_code == 429:
            # Rate-limited by the server: back off for an hour, then retry the same page.
            print("Requisicao bloqueada")
            time.sleep(60 * 60)
            continue
        if response.status_code != 200:
            # Stop on any other status so an error payload is never appended
            # to the collected data as if it were a page of posts.
            print("Erro ao acessar a api")
            print(f"Erro ao acessar a API: {response.status_code}")
            break
        print("Requisicao 200 bem sucedida")

        try:
            converted_file = response.json()
        except ValueError as e:
            print(f"Erro ao decodificar json")
            break
        if not converted_file:
            # An empty payload marks the end of the available pages.
            print(F"Nenum dado recebido na padina {page}")
            break

        page_tables.append(pd.DataFrame(converted_file))
        page += 1
        requisicoes_diferentes_consultas += 1

    if page_tables:
        new_table = pd.concat(page_tables, ignore_index=True)
    else:
        new_table = pd.DataFrame()

    timestamp = datetime.now().strftime('%d-%m-%Y_%H-%M-%S')
    output_path = os.path.join(directory_today, f"dados_coletados_{timestamp}.xlsx")
    new_table.to_excel(output_path, index=False)

    return new_table


In [25]:
# src/transform.py

import pandas as pd
from datetime import datetime

def transform_data(df):
    """Select and normalise the post columns in preparation for loading.

    Keeps only those of ``userId``, ``id``, ``title`` and ``body`` that are
    present in ``df`` (in that order), renames ``userId`` to ``userid``, casts
    every kept column to pandas' ``string`` dtype and adds an ``inserted_at``
    column holding the time of the call. The input frame is not modified.

    Args:
        df (pandas.DataFrame): raw extracted rows.

    Returns:
        pandas.DataFrame: a new frame with a fresh 0..n-1 index.
    """
    expected_columns = ['userId', 'id', 'title', 'body']
    # Explicit selection replaces an inner-join concat with an empty frame,
    # which triggers a FutureWarning about empty entries in recent pandas.
    present_columns = [col for col in expected_columns if col in df.columns]

    df_data = (
        df.loc[:, present_columns]
        .reset_index(drop=True)
        .rename(columns={'userId': 'userid'})
        .astype('string')
    )
    # A single timestamp is shared by every row of this batch.
    df_data['inserted_at'] = datetime.now()

    return df_data


In [26]:
from sqlalchemy import create_engine, text
from dotenv import load_dotenv
import os

load_dotenv()

def sqlConnector():
    """Build a SQLAlchemy engine for the PostgreSQL database.

    Connection settings are read from the keys ``HOST``, ``DATABASE``, ``USER``
    and ``PASSWORD``. Values found in the ``.env`` file take precedence over
    the process environment: ``USER`` is already set by the operating system
    to the login name, and ``load_dotenv()`` does not override existing
    variables by default, so reading only ``os.getenv('USER')`` silently
    connects with the OS account name instead of the configured database user.

    Returns:
        sqlalchemy.engine.Engine: engine bound to the database on port 5432.

    Raises:
        RuntimeError: if the host, database or user setting is missing.
    """
    # Local imports keep this block self-contained; both packages are already
    # dependencies of this notebook.
    from dotenv import dotenv_values
    from sqlalchemy.engine import URL

    file_settings = dotenv_values()

    def read_setting(name):
        """Return the value from the .env file, else from the environment."""
        value = file_settings.get(name)
        return value if value is not None else os.getenv(name)

    server = read_setting('HOST')
    database = read_setting('DATABASE')
    username = read_setting('USER')
    password = read_setting('PASSWORD')

    missing = [
        name
        for name, value in (('HOST', server), ('DATABASE', database), ('USER', username))
        if not value
    ]
    if missing:
        raise RuntimeError(
            'Missing database settings: ' + ', '.join(missing)
        )

    # URL.create escapes special characters (e.g. "@" or "/" in the password)
    # that would corrupt a hand-formatted connection string.
    url = URL.create(
        'postgresql+psycopg2',
        username=username,
        password=password,
        host=server,
        port=5432,
        database=database,
    )
    return create_engine(url)

def load_data(df_data):
    """Append ``df_data`` to ``usuarios.usuario`` and remove superseded rows.

    The frame is appended with multi-row INSERT statements. Afterwards, for
    each ``userid`` every row whose ``inserted_at`` differs from that user's
    most recent ``inserted_at`` is deleted. Failures in either step are
    reported with ``print`` and do not propagate; the deduplication step is
    attempted even when the insert failed.

    Args:
        df_data (pandas.DataFrame): rows to insert; its columns must match the
            target table.
    """
    table_name = 'usuario'
    schema = 'usuarios'

    engine = None
    try:
        engine = sqlConnector()
        # Cap the bound parameters per statement; never let chunksize reach 0,
        # which to_sql rejects.
        chunksize = max(1, (2100 // len(df_data.columns)) - 1)
        df_data.to_sql(name=table_name, index=False, con=engine, schema=schema, if_exists='append', method='multi',
                       chunksize=chunksize)
        print(f'Inserted into {schema}.{table_name}')
    except Exception as e:
        print('Error loading data into the database:', e)

    try:
        if engine is None:
            engine = sqlConnector()
        # engine.begin() commits when the block succeeds and rolls back on
        # error. Calling commit()/close() after a `with engine.connect()`
        # block acts on an already-closed connection.
        with engine.begin() as conn:
            conn.execute(text(f"""
                DELETE FROM {schema}.{table_name} t
                USING (
                    SELECT userid, MAX(inserted_at) AS max_data
                    FROM {schema}.{table_name}
                    GROUP BY userid
                ) cte
                WHERE t.userid = cte.userid AND t.inserted_at <> cte.max_data
            """))
        # Reported only after the transaction has been committed.
        print('Data deduplication completed successfully.')
    except Exception as e:
        print('Error deduplicating data:', e)
    finally:
        # Release pooled connections so repeated notebook runs do not leak them.
        if engine is not None:
            engine.dispose()


In [27]:
def run_pipeline():
    """Run the ETL steps in order: extract, transform, then load.

    Prints a message before the first step and after the last one.
    """
    print('Starting ETL...')

    # Extract: fetch the raw rows.
    raw_rows = extract_data()

    # Transform: shape the raw rows for the database.
    prepared_rows = transform_data(raw_rows)

    # Load: persist the prepared rows.
    load_data(prepared_rows)

    print('ETL finished')

# Run the pipeline when this code is executed directly (a notebook cell also
# runs with __name__ == "__main__"); skipped when imported as a module.
if __name__ == "__main__":
    run_pipeline()

Starting ETL...
requisicao feita na pagina 1
Requisicao 200 bem sucedida
requisicao feita na pagina 2
Requisicao 200 bem sucedida
requisicao feita na pagina 3
Requisicao 200 bem sucedida
requisicao feita na pagina 4
Requisicao 200 bem sucedida
requisicao feita na pagina 5
Requisicao 200 bem sucedida
requisicao feita na pagina 6
Requisicao 200 bem sucedida
requisicao feita na pagina 7
Requisicao 200 bem sucedida
requisicao feita na pagina 8
Requisicao 200 bem sucedida
requisicao feita na pagina 9
Requisicao 200 bem sucedida
requisicao feita na pagina 10
Requisicao 200 bem sucedida
requisicao feita na pagina 11
Requisicao 200 bem sucedida
Nenum dado recebido na padina 11
Error loading data into the database: (psycopg2.OperationalError) FATAL:  password authentication failed for user "douglasportella"

(Background on this error at: https://sqlalche.me/e/14/e3q8)
Error deduplicating data: (psycopg2.OperationalError) FATAL:  password authentication failed for user "douglasportella"

(Backgr