In [160]:
#Installing libraries
!pip install pandas
!pip install pymongo
!pip install boto3
!pip install datetime
!pip install kafka



In [163]:
import os
import glob
import pandas as pd
import boto3
import botocore
from datetime import datetime
from io import StringIO
import json
from json import loads

# Data extraction and cleansing

In [9]:
# Load the states CSV into a DataFrame and display it
states = pd.read_csv(filepath_or_buffer='estados.csv')
states

Unnamed: 0,codigo_uf,uf,nome,latitude,longitude,regiao
0,101,AWARI-1,Awari Estado 1,-10.83,-63.34,Norte
1,102,AWARI-2,Awari Estado 2,-8.77,-70.55,Norte


In [10]:
# Load the cities CSV into a DataFrame and display it
cities = pd.read_csv(filepath_or_buffer='municipios.csv')
cities

Unnamed: 0,codigo_ibge,nome,latitude,longitude,capital,codigo_uf,siafi_id,ddd,fuso_horario
0,9900001,Minha cidade 1,-16.7573,-49.4412,0,52,1050,100,America/Sao_Paulo
1,9900002,Minha cidade 2,-18.4831,-47.3916,0,31,4001,101,America/Sao_Paulo


In [11]:
# Load the full (batch) states CSV into a DataFrame and display it
states_full = pd.read_csv(filepath_or_buffer='estados_batch.csv')
states_full

Unnamed: 0,codigo_uf,uf,nome,latitude,longitude,regiao
0,11,RO,Rondônia,-10.83,-63.34,Norte
1,12,AC,Acre,-8.77,-70.55,Norte
2,13,AM,Amazonas,-3.47,-65.1,Norte
3,14,RR,Roraima,1.99,-61.33,Norte
4,15,PA,Pará,-3.79,-52.48,Norte
5,16,AP,Amapá,1.41,-51.77,Norte
6,17,TO,Tocantins,-9.46,-48.26,Norte
7,21,MA,Maranhão,-5.42,-45.44,Nordeste
8,22,PI,Piauí,-6.6,-42.28,Nordeste
9,23,CE,Ceará,-5.2,-39.53,Nordeste


In [14]:
# Inner-join every state row with its cities through the shared 'codigo_uf' key.
# Column names present in both frames receive the '_state' / '_city' suffixes.
cities_by_uf = cities.set_index('codigo_uf')
df_cities = states_full.join(
    cities_by_uf,
    on='codigo_uf',
    how='inner',
    lsuffix='_state',
    rsuffix='_city',
)
df_cities

Unnamed: 0,codigo_uf,uf,nome_state,latitude_state,longitude_state,regiao,codigo_ibge,nome_city,latitude_city,longitude_city,capital,siafi_id,ddd,fuso_horario
16,31,MG,Minas Gerais,-18.1,-44.38,Sudeste,9900002,Minha cidade 2,-18.4831,-47.3916,0,4001,101,America/Sao_Paulo
25,52,GO,Goiás,-15.98,-49.86,Centro-Oeste,9900001,Minha cidade 1,-16.7573,-49.4412,0,1050,100,America/Sao_Paulo


In [27]:
# Rename the state columns so they line up with df_cities, then stack the
# new states underneath the joined state/city rows
state_column_names = {"nome":"nome_state", 'latitude':'latitude_state', 'longitude':'longitude_state'}
states = states.rename(columns=state_column_names)
df = pd.concat([df_cities, states])
df

Unnamed: 0,codigo_uf,uf,nome_state,latitude_state,longitude_state,regiao,codigo_ibge,nome_city,latitude_city,longitude_city,capital,siafi_id,ddd,fuso_horario
16,31,MG,Minas Gerais,-18.1,-44.38,Sudeste,9900002.0,Minha cidade 2,-18.4831,-47.3916,0.0,4001.0,101.0,America/Sao_Paulo
25,52,GO,Goiás,-15.98,-49.86,Centro-Oeste,9900001.0,Minha cidade 1,-16.7573,-49.4412,0.0,1050.0,100.0,America/Sao_Paulo
0,101,AWARI-1,Awari Estado 1,-10.83,-63.34,Norte,,,,,,,,
1,102,AWARI-2,Awari Estado 2,-8.77,-70.55,Norte,,,,,,,,


# Data ingestion setup

In [71]:
#Creating client to connect to MinIO
# Credentials are read from the environment instead of being hardcoded, so they
# are not leaked whenever the notebook is shared or committed. Export
# AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY before starting the kernel; a
# missing variable raises KeyError here rather than failing later on a request.
# NOTE(review): the key pair that used to be embedded in this cell should be
# treated as exposed and rotated.
# The endpoint can be overridden through MINIO_ENDPOINT_URL and otherwise keeps
# the original address.
client = boto3.client('s3',
    endpoint_url=os.environ.get('MINIO_ENDPOINT_URL', 'http://awari-minio-nginx:9000'),
    aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'],
    aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY'],
    aws_session_token=None,
    config=boto3.session.Config(signature_version='s3v4'),
    # NOTE(review): verify=False turns off TLS certificate verification; confirm
    # this is intended if the endpoint is ever switched to https.
    verify=False,
    region_name='sa-east-1'
)

In [147]:
#Creating function to save in S3/MinIO
def save_key_to_s3(data_frame, key, bucket='exercicio-aula-06', s3_client=None):
    """Upload a DataFrame as CSV to S3/MinIO and read the stored object back.

    Parameters
    ----------
    data_frame : pandas.DataFrame
        Frame to serialise. It is written as CSV without its index.
    key : str
        Object key to write inside the bucket.
    bucket : str, optional
        Target bucket name. Defaults to 'exercicio-aula-06'.
    s3_client : optional
        Object exposing ``put_object`` and ``get_object``. When omitted, the
        notebook-level ``client`` is used.

    Returns
    -------
    The ``get_object`` response for the object that was just written.
    """
    if s3_client is None:
        # Resolved at call time so the function picks up the current notebook client.
        s3_client = client
    # With no target buffer, to_csv returns the CSV text directly.
    csv_text = data_frame.to_csv(index=False)
    s3_client.put_object(Body=csv_text.encode('utf-8'), Bucket=bucket, Key=key)
    return s3_client.get_object(Bucket=bucket, Key=key)

# Data ingestion - incremental method

In [151]:
# Fetch the MG cities object from the bucket and load its body as a DataFrame
mg_cities_object = client.get_object(Bucket='exercicio-aula-06', Key='MG/cidades.csv')
x = pd.read_csv(mg_cities_object.get("Body"))
x

Unnamed: 0.1,Unnamed: 0,codigo_uf,uf,nome_state,latitude_state,longitude_state,regiao,codigo_ibge,nome_city,latitude_city,longitude_city,capital,siafi_id,ddd,fuso_horario
0,16.0,31,MG,Minas Gerais,-18.1,-44.38,Sudeste,3100104.0,Abadia dos Dourados,-18.4831,-47.3916,0.0,4001.0,34.0,America/Sao_Paulo
1,16.0,31,MG,Minas Gerais,-18.1,-44.38,Sudeste,3100203.0,Abaeté,-19.1551,-45.4444,0.0,4003.0,37.0,America/Sao_Paulo
2,16.0,31,MG,Minas Gerais,-18.1,-44.38,Sudeste,3100302.0,Abre Campo,-20.2996,-42.4743,0.0,4005.0,31.0,America/Sao_Paulo
3,16.0,31,MG,Minas Gerais,-18.1,-44.38,Sudeste,3100401.0,Acaiaca,-20.3590,-43.1439,0.0,4007.0,31.0,America/Sao_Paulo
4,16.0,31,MG,Minas Gerais,-18.1,-44.38,Sudeste,3100500.0,Açucena,-19.0671,-42.5419,0.0,4009.0,33.0,America/Sao_Paulo
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
849,16.0,31,MG,Minas Gerais,-18.1,-44.38,Sudeste,3171907.0,Virgolândia,-18.4738,-42.3067,0.0,5439.0,33.0,America/Sao_Paulo
850,16.0,31,MG,Minas Gerais,-18.1,-44.38,Sudeste,3172004.0,Visconde do Rio Branco,-21.0127,-42.8361,0.0,5441.0,32.0,America/Sao_Paulo
851,16.0,31,MG,Minas Gerais,-18.1,-44.38,Sudeste,3172103.0,Volta Grande,-21.7671,-42.5375,0.0,5443.0,32.0,America/Sao_Paulo
852,16.0,31,MG,Minas Gerais,-18.1,-44.38,Sudeste,3172202.0,Wenceslau Braz,-22.5368,-45.3626,0.0,5421.0,35.0,America/Sao_Paulo


In [148]:
#Creating a loop to validate the new data with each state file
# For every distinct 'uf' in df: read the state's cities file from the bucket
# (creating an empty one when it does not exist yet), append the rows of df for
# that state and write the result back.
# NOTE: running this cell again appends the same rows to each file again.

# Distinct 'uf' values, computed once instead of on every iteration.
state_names = pd.unique(df['uf'])

for state_name in state_names:
    cities_from_state = df[df['uf'] == state_name]
    key_cities = state_name + '/cidades.csv'
    print(state_name)

    #Checking if the file already exists or creating a new empty file
    try:
        response = client.get_object(Bucket='exercicio-aula-06', Key=key_cities)
    except botocore.exceptions.ClientError as e:
        if e.response['Error']['Code'] != "NoSuchKey":
            # Any other failure must not be swallowed: `response` would still
            # hold the previous iteration's object (or be undefined), and the
            # wrong data would be read and written under this state's key.
            raise
        #If key_cities doesn't exist
        status_df = pd.DataFrame(columns = df.columns)
        response = save_key_to_s3(status_df, key_cities)
        print('File succesfully created!')
    cities_df_minio = pd.read_csv(response.get("Body"))

    #Appending data in the datalake with the new data
    new_cities = pd.concat([cities_df_minio, cities_from_state])

    #Saving new files in the datalake
    response = save_key_to_s3(new_cities, key_cities)

    print('OK!\n')

MG
OK!

GO
OK!

AWARI-1
File succesfully created!
OK!

AWARI-2
File succesfully created!
OK!

