In [1]:
# import libraries

import boto3, s3fs, awswrangler as wr, pandas as pd

from time import sleep

In [2]:
# set s3 buckets

s3_bucket = 's3://datalake-igti-fabio-desafio-mod2/staging-zone/'
athena_output = 's3://datalake-igti-fabio-desafio-mod2/athena-results/'

In [3]:
# create Pandas dataframe

s3 = s3fs.S3FileSystem()

df = wr.s3.read_parquet(s3_bucket)

print(df.shape)
print(df.columns)
print(df.dtypes)
df.head()

(20996744, 32)
Index(['CNPJ_BASICO', 'CNPJ_ORDEM', 'CNPJ_DV', 'IDENTIFICADOR_MATRIZ_FILIAL',
       'NOME_FANTASIA', 'SITUACAO_CADASTRAL', 'DATA_SITUACAO_CADASTRAL',
       'MOTIVO_SITUACAO_CADASTRAL', 'NOME_DA_CIDADE_NO_EXTERIOR', 'PAIS',
       'DATA_INICIO_ATIVIDADE', 'CNAE_PRINCIPAL', 'CNAE_SECUNDARIA',
       'TIPO_LOGRADOURO', 'LOGRADOURO', 'NUMERO', 'COMPLEMENTO', 'BAIRRO',
       'CEP', 'UF', 'MUNICIPIO', 'DDD_1', 'TEL_1', 'DDD_2', 'TEL_2', 'DDD_FAX',
       'FAX', 'CORREIO_ELETRONICO', 'SITUACAO_ESPECIAL',
       'DATA_SITUACAO_ESPECIAL', 'CNAE', 'DESCRICAO_CNAE'],
      dtype='object')
CNPJ_BASICO                      Int32
CNPJ_ORDEM                       Int32
CNPJ_DV                          Int32
IDENTIFICADOR_MATRIZ_FILIAL      Int32
NOME_FANTASIA                   string
SITUACAO_CADASTRAL               Int32
DATA_SITUACAO_CADASTRAL          Int32
MOTIVO_SITUACAO_CADASTRAL        Int32
NOME_DA_CIDADE_NO_EXTERIOR      string
PAIS                             Int32
DATA_IN

Unnamed: 0,CNPJ_BASICO,CNPJ_ORDEM,CNPJ_DV,IDENTIFICADOR_MATRIZ_FILIAL,NOME_FANTASIA,SITUACAO_CADASTRAL,DATA_SITUACAO_CADASTRAL,MOTIVO_SITUACAO_CADASTRAL,NOME_DA_CIDADE_NO_EXTERIOR,PAIS,...,TEL_1,DDD_2,TEL_2,DDD_FAX,FAX,CORREIO_ELETRONICO,SITUACAO_ESPECIAL,DATA_SITUACAO_ESPECIAL,CNAE,DESCRICAO_CNAE
0,7396865,1,68,1,,8,20170210,1,,,...,33851125.0,47.0,33851125.0,47.0,33851125.0,,,,1412602,"Confec��o, sob medida, de pe�as do vestu�rio, ..."
1,64904295,18,51,2,,8,20161110,1,,,...,36491000.0,31.0,33880436.0,82.0,33118379.0,CLAUDIO.GIGLIO@CAMIL.COM.BR,,,4639701,Com�rcio atacadista de produtos aliment�cios e...
2,76016369,3,16,2,,3,20060203,21,,,...,,,,,,,,,4632001,Com�rcio atacadista de cereais e leguminosas b...
3,52302726,1,82,1,,4,20210406,63,,,...,,,,,,,,,4712100,"Com�rcio varejista de mercadorias em geral, co..."
4,7396923,1,53,1,,8,20140115,1,,,...,69658088.0,,,,,,,,4721102,Padaria e confeitaria com predomin�ncia de rev...


In [4]:
# create client to AWS Athena

client = boto3.client('athena')

# function to build query

def athena_query(sql, athena_output):

    response = client.start_query_execution(
        QueryString = sql,
        QueryExecutionContext = {'Database': 'edc_mod2_desafio_database'}, 
        ResultConfiguration = {'OutputLocation': athena_output}
    )

    query_status = []

    while not query_status == 'SUCCEEDED':
        query_status= client.get_query_execution(QueryExecutionId=response["QueryExecutionId"])
        query_status = query_status["QueryExecution"]["Status"]["State"]
        sleep(2)

    return response["QueryExecutionId"]

# function to read query results

def query_results(query_id):

    response = client.get_query_results(
        QueryExecutionId=query_id
    )

    return response

# function to convert query results to dataframe

def results_to_df(query_results):
 
    columns = [
        col['Label']
        for col in query_results['ResultSet']['ResultSetMetadata']['ColumnInfo']
    ]
 
    listed_results = []

    for res in query_results['ResultSet']['Rows'][1:]:         
        values = []

        for field in res['Data']:
            try:
                values.append(list(field.values())[0]) 
            except:
                values.append(list(' '))
 
        listed_results.append(
            dict(zip(columns, values))
        )
 
    return pd.DataFrame(listed_results)

In [5]:
# Pergunta 1: Quantos estabelecimentos existem?

sql = ('''

    SELECT 
        COUNT(*)
        
    FROM staging_zone

    ''')

results_to_df(query_results(athena_query(sql=sql, athena_output=athena_output)))

Unnamed: 0,_col0
0,20996744


In [6]:
# Pergunta 4: Quantos estabelecimentos não tem logradouro informado?

print(df.loc[df['LOGRADOURO'].isnull()].shape[0])

sql = ('''

        SELECT
            COUNT(*)
            
        FROM staging_zone

        WHERE LOGRADOURO IS NULL

    ''')

results_to_df(query_results(athena_query(sql=sql, athena_output=athena_output)))

828


Unnamed: 0,_col0
0,828


In [12]:
# Pergunta 5: Quantos logradouros são avenidas?

print(df.loc[df['LOGRADOURO'].str.upper().str.startswith('AVENIDA', na=False)].shape[0])

sql = ('''

        SELECT
            COUNT(*)
            
        FROM staging_zone

        WHERE
            UPPER(LOGRADOURO) LIKE 'AVENIDA%'

    ''')

results_to_df(query_results(athena_query(sql=sql, athena_output=athena_output)))

52587


Unnamed: 0,_col0
0,52587


In [13]:
# Pergunta 6: Quantos CEPs distintos existem entre os estabelecimentos? 

print(df.drop_duplicates(subset='CEP').shape[0])

sql = ('''

        SELECT
            COUNT(DISTINCT cep)
            
        FROM staging_zone

    ''')

results_to_df(query_results(athena_query(sql=sql, athena_output=athena_output)))

889886


Unnamed: 0,_col0
0,889885


In [15]:
# Pergunta 8: Quantos estabelecimentos possuem um CNAE relacionado a cultivo?

print(df.loc[df['DESCRICAO_CNAE'].str.upper().str.startswith('CULTIVO', na=False)].shape[0])

sql = ('''

        SELECT
            COUNT(*)
            
        FROM staging_zone

        WHERE
            UPPER(DESCRICAO_CNAE) LIKE 'CULTIVO%'

    ''')

results_to_df(query_results(athena_query(sql=sql, athena_output=athena_output)))

200243


Unnamed: 0,_col0
0,200243


In [16]:
# Pergunta 11: Quantos estabelecimentos são filiais?

print(df.loc[df['IDENTIFICADOR_MATRIZ_FILIAL'] == 2].shape[0])

sql = ('''

        SELECT
            COUNT(*)
            
        FROM staging_zone

        WHERE IDENTIFICADOR_MATRIZ_FILIAL = 2

    ''')

results_to_df(query_results(athena_query(sql=sql, athena_output=athena_output)))

1093082


Unnamed: 0,_col0
0,1093082
