## Instalando as bibliotecas

In [1]:
!pip install nest_asyncio
!pip install aiohttp

Collecting nest_asyncio
  Downloading nest_asyncio-1.5.5-py3-none-any.whl (5.2 kB)
Installing collected packages: nest-asyncio
Successfully installed nest-asyncio-1.5.5
Collecting aiohttp
  Downloading aiohttp-3.8.1-cp36-cp36m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl (1.1 MB)
[K     |████████████████████████████████| 1.1 MB 6.5 MB/s eta 0:00:01
[?25hCollecting multidict<7.0,>=4.5
  Downloading multidict-5.2.0-cp36-cp36m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl (159 kB)
[K     |████████████████████████████████| 159 kB 21.8 MB/s eta 0:00:01
[?25hCollecting yarl<2.0,>=1.0
  Downloading yarl-1.7.2-cp36-cp36m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl (270 kB)
[K     |████████████████████████████████| 270 kB 9.4 MB/s eta 0:00:01
[?25hCollecting async-timeout<5.0,>=4.0.0a3
  Downloading async_timeout-4.0.2-py3-none-any.whl (5.8 kB)
Collecting typing-exten

## Lendo a tabela geolocation do HIVE

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import HiveContext

# `sc` is not defined anywhere in this notebook -- presumably the SparkContext
# injected by the PySpark kernel (TODO confirm).
hive_context = HiveContext(sc)

# Get (or create) the Hive-enabled SparkSession used by the rest of the notebook.
spark = (
    SparkSession.builder
    .appName("Teste API")
    .enableHiveSupport()
    .getOrCreate()
)

# Raw geolocation dataset, read directly from its ORC path.
geo = spark.read.orc(path='/datalake/dadosbrutos/olist_geolocation_dataset.orc')

In [3]:
# Number of distinct zip-code prefixes present in the geolocation dataset.
prefixos_distintos = geo.select('geolocation_zip_code_prefix').distinct()
prefixos_distintos.count()

19015

## Carregando os CEPs distintos em uma lista

In [6]:
# Collect the distinct zip-code prefixes to the driver as 5-character strings.
# zfill(5) restores leading zeros that str() would drop if the column is stored
# as a number (e.g. 1037 -> "01037"); it is a no-op for values that already have
# five or more characters. The lookup compares against the first five characters
# of the full CEP, so a shortened prefix could never match.
cep_array = [
    str(row.geolocation_zip_code_prefix).zfill(5)
    for row in geo.select('geolocation_zip_code_prefix').distinct().collect()
]
print(f"{len(cep_array)} CEPs distintos encontrados no olist_geolocation_dataset.orc")

19015 CEPs distintos encontrados no olist_geolocation_dataset.orc


## Consultando CEPs no site dos Correios com apenas os 5 primeiros dígitos
* Usando a API pycep, só conseguimos consultar com o CEP completo (8 dígitos)
* Consultando diretamente no site dos Correios, conseguimos pesquisar com apenas 5 dígitos e descobrir a cidade e o estado do CEP

In [4]:
import asyncio
import time
import aiohttp
import nest_asyncio
import pandas as pd
import json
from pyspark.sql import Row

# Correios lookup endpoint that get_address POSTs to.
URL = 'https://buscacepinter.correios.com.br/app/endereco/carrega-cep-endereco.php'
# Shared list of CEPs whose lookup failed; get_address appends to it.
ceps_com_erro = list()
# Fetch the first search result for a CEP given only its 5 leading digits
async def get_address(session, cep):
    """Look up ``cep`` on the Correios endpoint and record the first matching address.

    The prefix is POSTed to ``URL``. The first entry of the response's
    ``"dados"`` list whose ``"cep"`` is non-empty and whose first five
    characters equal ``cep`` is appended to the module-level ``results`` list
    as a dict with the keys ``cep``, ``uf`` and ``cidade``.

    Every prefix that does not produce a result is appended to the
    module-level ``ceps_com_erro`` list, whether the cause is a transport
    failure, a payload that cannot be parsed, or a response with no matching
    entry. This keeps ``len(results) + len(ceps_com_erro)`` equal to the
    number of lookups.

    Args:
        session: Object whose ``post(url=..., data=...)`` returns an async
            context manager yielding a response with an awaitable ``text()``
            (an ``aiohttp.ClientSession`` in this notebook).
        cep: Prefix to look up, as a string.

    Returns:
        None. The outcome is recorded in ``results`` or ``ceps_com_erro``.
    """
    selected = None
    try:
        async with session.post(url=URL, data={'endereco': cep, 'tipoCEP': 'ALL'}) as response:
            body = await response.text()
        # Parse the payload once instead of re-parsing it for every candidate.
        for item in json.loads(body)["dados"]:
            if item["cep"] != '' and item["cep"][0:5] == cep:
                selected = {
                    "cep": item["cep"],
                    "uf": item["uf"],
                    "cidade": item["localidade"]
                }
                break
    except (ValueError, KeyError, TypeError):
        # Body is not valid JSON or does not have the expected structure.
        selected = None
    except (aiohttp.ClientError, asyncio.TimeoutError):
        # One failed request must not abort the rest of the batch.
        selected = None

    if selected is None:
        ceps_com_erro.append(cep)
        return

    results.append(selected)
    print(f"{str(len(results)).zfill(6)} CEPs consultados", end="\r")

# Create the asynchronous tasks, one task per CEP
async def get_all_addresses(ceps):
    """Run ``get_address`` for every entry of ``ceps`` over one shared session.

    A single ``aiohttp.ClientSession`` is opened, one task is scheduled per
    CEP, and the coroutine returns once all of them have finished. Because
    ``return_exceptions`` is False, an exception raised by any task propagates
    to the caller.

    Args:
        ceps: Iterable of CEP prefixes to look up.
    """
    async with aiohttp.ClientSession() as session:
        pending = [asyncio.ensure_future(get_address(session, cep)) for cep in ceps]
        await asyncio.gather(*pending, return_exceptions=False)
        

# Main entry point: runs the async lookups for one batch and builds the DataFrame with the results
def consulta_lote(ceps_array):
    """Look up one batch of CEP prefixes and return the matches as a Spark DataFrame.

    Resets the module-level ``results`` list, applies ``nest_asyncio`` and
    drives ``get_all_addresses`` to completion on the current event loop.
    Whatever was collected in ``results`` is then converted to a DataFrame.
    The elapsed time, including the DataFrame creation, is printed.

    Args:
        ceps_array: Sized collection of CEP prefixes for this batch.

    Returns:
        A DataFrame with the string columns ``cep``, ``cidade`` and ``uf``,
        one row per entry in ``results``. It is empty, but still typed, when
        no prefix of the batch produced a result.
    """
    from pyspark.sql.types import StringType, StructField, StructType

    global results
    results = []
    nest_asyncio.apply()
    start_time = time.time()
    asyncio.get_event_loop().run_until_complete(get_all_addresses(ceps_array))

    # An explicit schema keeps createDataFrame working when the batch produced no
    # results (there is nothing to infer a schema from), and gives every batch the
    # same column order, which the later positional union relies on. The order
    # matches the columns shown in this notebook's recorded output.
    colunas = ("cep", "cidade", "uf")
    schema = StructType([StructField(nome, StringType(), True) for nome in colunas])
    linhas = [tuple(item[nome] for nome in colunas) for item in results]
    df = spark.createDataFrame(linhas, schema)

    duration = time.time() - start_time
    print(f"Downloaded {len(ceps_array)} ceps in {duration/60} minutes")
    return df

In [7]:
import math
import numpy as np

start_time = time.time()

tamanho_lote = 1000
# Always use at least one batch: with fewer CEPs than tamanho_lote the floor is 0,
# and np.array_split raises ValueError when asked for 0 sections.
qtd_lotes = max(1, math.floor(len(cep_array)/tamanho_lote))

# The lookups append to this shared list; empty it in place so that re-running
# this cell does not add to the failures of a previous run.
ceps_com_erro.clear()

print(f"Iniciando a consulta de {qtd_lotes} lotes com aprox. {tamanho_lote} ceps cada.")
cep_lotes = np.array_split(cep_array, qtd_lotes)

# One DataFrame per batch, keyed df_part1, df_part2, ... in processing order.
dataframes = {}
for counter, lote in enumerate(cep_lotes, start=1):
    print(f"Consultando lote {counter}")
    dataframes[f"df_part{counter}"] = consulta_lote(lote)

duration = time.time() - start_time
print(f"Tempo total da carga: {duration/60} minutos")
print(f"Total de CEPs não encontrados: {len(ceps_com_erro)}")

Iniciando a consulta de 19 lotes com aprox. 1000 ceps cada.
Consultando lote 1
Downloaded 1001 ceps in 0.20070825417836508 minutes
Consultando lote 2
Downloaded 1001 ceps in 0.19917014042536418 minutes
Consultando lote 3
Downloaded 1001 ceps in 0.20558483997980753 minutes
Consultando lote 4
Downloaded 1001 ceps in 0.18404589891433715 minutes
Consultando lote 5
Downloaded 1001 ceps in 0.18405269384384154 minutes
Consultando lote 6
Downloaded 1001 ceps in 0.1641371488571167 minutes
Consultando lote 7
Downloaded 1001 ceps in 0.1727285663286845 minutes
Consultando lote 8
Downloaded 1001 ceps in 0.1647661288579305 minutes
Consultando lote 9
Downloaded 1001 ceps in 0.16578193108240763 minutes
Consultando lote 10
Downloaded 1001 ceps in 0.208054780960083 minutes
Consultando lote 11
Downloaded 1001 ceps in 0.156492547194163 minutes
Consultando lote 12
Downloaded 1001 ceps in 0.17083444197972616 minutes
Consultando lote 13
Downloaded 1001 ceps in 0.1676690419514974 minutes
Consultando lote 14
D

## Unindo os lotes em apenas um dataframe

In [8]:
# Start from the first batch and union every other batch onto it,
# following the insertion order of the dict.
df_final = dataframes["df_part1"]
for nome_parte, df_parte in dataframes.items():
    if nome_parte != "df_part1":
        print(f"Unindo o {nome_parte} ao df principal", end = "\r")
        df_final = df_final.union(df_parte)

Unindo o df_part2 ao df principalUnindo o df_part3 ao df principalUnindo o df_part4 ao df principalUnindo o df_part5 ao df principalUnindo o df_part6 ao df principalUnindo o df_part7 ao df principalUnindo o df_part8 ao df principalUnindo o df_part9 ao df principalUnindo o df_part10 ao df principalUnindo o df_part11 ao df principalUnindo o df_part12 ao df principalUnindo o df_part13 ao df principalUnindo o df_part14 ao df principalUnindo o df_part15 ao df principalUnindo o df_part16 ao df principalUnindo o df_part17 ao df principalUnindo o df_part18 ao df principalUnindo o df_part19 ao df principal

In [10]:
# Row count of the combined DataFrame (all batches unioned together).
df_final.count()

18617

In [11]:
import pyspark.sql.functions as F
# Add a column holding the first five characters of the full CEP, then preview the result.
prefixo_cep = F.col("cep").substr(1, 5)
df_final = df_final.withColumn('cep_5_digitos', prefixo_cep)
df_final.show(truncate=False)

+--------+---------------------+---+-------------+
|cep     |cidade               |uf |cep_5_digitos|
+--------+---------------------+---+-------------+
|49630970|Siriri               |SE |49630        |
|55445000|Batateira            |PE |55445        |
|77303970|Rio da Conceição     |TO |77303        |
|83450970|Bocaiúva do Sul      |PR |83450        |
|48370970|Esplanada            |BA |48370        |
|89669970|Ipira                |SC |89669        |
|76976970|Primavera de Rondônia|RO |76976        |
|45300970|Amargosa             |BA |45300        |
|89555970|Ipoméia              |SC |89555        |
|86900970|Jandaia do Sul       |PR |86900        |
|70078900|Brasília             |DF |70078        |
|74605125|Goiânia              |GO |74605        |
|59318959|Serra Negra do Norte |RN |59318        |
|65927970|Davinópolis          |MA |65927        |
|18130649|São Roque            |SP |18130        |
|64793970|Coronel José Dias    |PI |64793        |
|16250970|Clementina           

## Escrevendo o resultado final no HDFS

In [12]:
# Persist the combined result as ORC, replacing any previous output at this path.
df_final.write.orc(
    path='/datalake/dadosbrutos/geolocation_correios.orc',
    mode='overwrite',
)