# ETL

The goal of this notebook is to read the data from the streaming input table, where each row holds all columns packed into a single semicolon-delimited string, and write it to the 'laps' table with the columns separated, so that we can run analyses on it.
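As a rough illustration of the idea, the sketch below splits one packed record into named fields. The helper `parse_packed_row` and the sample string are assumptions made for this example (the sample mirrors the first row displayed in the Transform step); the notebook itself does the split inline in the Extract cell.

```python
# Column names as used in the Extract step of this notebook.
COLUMNS = ['raceId', 'driverId', 'lap', 'position', 'time', 'milliseconds']


def parse_packed_row(raw):
    """Split one semicolon-delimited record into a dict keyed by column name.

    Hypothetical helper for illustration; it is not part of the notebook.
    """
    fields = raw.replace('\n', '').split(';')
    if len(fields) != len(COLUMNS):
        # Fail loudly instead of silently producing a misaligned row.
        raise ValueError(
            f"expected {len(COLUMNS)} fields, got {len(fields)}: {raw!r}"
        )
    return dict(zip(COLUMNS, fields))


# Assumed sample record; every value is still a string at this point.
sample = "1074;844;1;1;1:39.070;99070\n"
parsed = parse_packed_row(sample)
print(parsed)
```

Checking the field count up front is a design choice: a record with a missing or extra `;` raises immediately rather than surfacing later as a confusing `DataFrame` construction error.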

In [1]:
# !pip install confluent-kafka psycopg2-binary

In [2]:
import psycopg2
import pandas as pd
from sqlalchemy import create_engine

In [3]:
ano="2022"

## Extract

In [4]:
# PostgreSQL connection settings
db_config = {
    'host': 'postgres',
    'database': 'f1',
    'user': 'admin',
    'password': 'admin'
}
# Initialize both handles so the finally block is safe even if connect() fails
connection = None
cursor = None
try:
    connection = psycopg2.connect(**db_config)
    cursor = connection.cursor()
    query = f"select col_1 from f1_schema.laps_{ano}"

    cursor.execute(query)
    dados = cursor.fetchall()

    # Each row holds one semicolon-delimited string; split it into fields
    registro = []
    for linha in dados:
        registro.append(linha[0].replace('\n', '').split(';'))
    df = pd.DataFrame(registro, columns=['raceId', 'driverId', 'lap', 'position', 'time', 'milliseconds'])

except (Exception, psycopg2.Error) as error:
    print("Error fetching data from PostgreSQL:", error)

finally:
    # Close the cursor and the connection if they were opened
    if cursor:
        cursor.close()
    if connection:
        connection.close()
        print("PostgreSQL connection closed")

PostgreSQL connection closed


## Transform

In [5]:
df

index,raceId,driverId,lap,position,time,milliseconds
0,1074,844,1,1,1:39.070,99070
1,1074,830,1,2,1:40.236,100236
2,1074,832,1,3,1:41.006,101006
3,1074,1,1,4,1:41.555,101555
4,1074,825,1,5,1:42.333,102333
...,...,...,...,...,...,...
1067,1074,849,54,18,1:38.707,98707
1068,1074,854,54,12,1:38.747,98747
1069,1074,830,54,13,2:04.094,124094
1070,1074,844,55,1,1:35.427,95427


In [6]:
# checking the column types
df.dtypes

raceId          object
driverId        object
lap             object
position        object
time            object
milliseconds    object
dtype: object

In [7]:
# converting the numeric columns from strings to integers
numeric_cols = ['raceId', 'driverId', 'lap', 'position', 'milliseconds']
df[numeric_cols] = df[numeric_cols].apply(pd.to_numeric)
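`pd.to_numeric` raises an error by default when a value cannot be parsed, which stops the whole conversion on a single malformed record. The sketch below shows one possible way to locate such records first, using `errors='coerce'` on a small made-up frame. The toy data and the names `toy`, `converted` and `bad_rows` are assumptions for illustration only.

```python
import pandas as pd

# Toy frame with one malformed value ('x'); the data is invented for this sketch.
toy = pd.DataFrame({
    'lap': ['1', '2', 'x'],
    'milliseconds': ['99070', '100236', '101006'],
})

numeric_cols = ['lap', 'milliseconds']

# errors='coerce' turns unparseable strings into NaN instead of raising.
converted = toy[numeric_cols].apply(pd.to_numeric, errors='coerce')

# Rows where at least one numeric column failed to parse.
bad_rows = toy[converted.isna().any(axis=1)]
print(bad_rows)
```

Note that a column containing NaN comes back as a float column, so this is better suited to inspecting bad records than to producing the final integer columns.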

In [8]:
# checking the column types
df.dtypes

raceId           int64
driverId         int64
lap              int64
position         int64
time            object
milliseconds     int64
dtype: object
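The `time` column is still a string after the conversion, while `milliseconds` carries the same lap time as an integer. One possible sanity check is to recompute the milliseconds from the string and compare. The sketch below assumes every value follows the `M:SS.mmm` pattern seen in the rows displayed above; `lap_time_to_ms` is a hypothetical helper, not part of the notebook.

```python
def lap_time_to_ms(lap_time):
    """Convert a lap time string such as '1:39.070' to integer milliseconds.

    Hypothetical helper; assumes the 'M:SS.mmm' format with exactly
    three fractional digits.
    """
    minutes, rest = lap_time.split(':')
    seconds, millis = rest.split('.')
    return (int(minutes) * 60 + int(seconds)) * 1000 + int(millis)


# Pairs copied from the rows displayed in the Transform step.
samples = [('1:39.070', 99070), ('2:04.094', 124094), ('1:35.427', 95427)]
for text, expected in samples:
    print(text, lap_time_to_ms(text), lap_time_to_ms(text) == expected)
```

On the full frame, the same check could be written as `(df['time'].map(lap_time_to_ms) == df['milliseconds']).all()`.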

## Load

In [9]:
# writing the data to PostgreSQL (replaces the table if it already exists)
engine = create_engine('postgresql://admin:admin@postgres:5432/f1')
df.to_sql(name=f'tb_laps_{ano}', con=engine, schema='f1_schema', if_exists='replace', index=False)

72