In [None]:
# import libraries

import boto3, s3fs, awswrangler as wr, pandas as pd

from time import sleep

In [None]:
# set s3 buckets

s3_bucket = 's3://datalake-igti-fabio-desafio-mod3/staging-zone/'
athena_output = 's3://datalake-igti-fabio-desafio-mod3/athena-results/'

In [None]:
# create Pandas dataframe

s3 = s3fs.S3FileSystem()

df = wr.s3.read_parquet(s3_bucket)

print(df.shape)
print(df.columns)
print(df.dtypes)
df.head()

In [None]:
# create client to AWS Athena

client = boto3.client('athena')

# function to build query

def athena_query(sql, athena_output):

    response = client.start_query_execution(
        QueryString = sql,
        QueryExecutionContext = {'Database': 'edc_mod2_desafio_database'}, 
        ResultConfiguration = {'OutputLocation': athena_output}
    )

    query_status = []

    while not query_status == 'SUCCEEDED':
        query_status= client.get_query_execution(QueryExecutionId=response["QueryExecutionId"])
        query_status = query_status["QueryExecution"]["Status"]["State"]
        sleep(2)

    return response["QueryExecutionId"]

# function to read query results

def query_results(query_id):

    response = client.get_query_results(
        QueryExecutionId=query_id
    )

    return response

# function to convert query results to dataframe

def results_to_df(query_results):
 
    columns = [
        col['Label']
        for col in query_results['ResultSet']['ResultSetMetadata']['ColumnInfo']
    ]
 
    listed_results = []

    for res in query_results['ResultSet']['Rows'][1:]:         
        values = []

        for field in res['Data']:
            try:
                values.append(list(field.values())[0]) 
            except:
                values.append(list(' '))
 
        listed_results.append(
            dict(zip(columns, values))
        )
 
    return pd.DataFrame(listed_results)

In [None]:
# Pergunta 1: Quantos estabelecimentos existem?

sql = ('''

    SELECT 
        COUNT(*)
        
    FROM staging_zone

    ''')

results_to_df(query_results(athena_query(sql=sql, athena_output=athena_output)))

###

In [None]:
# Pergunta 4: Quantos estabelecimentos não tem logradouro informado?

print(df.loc[df['LOGRADOURO'].isnull()].shape[0])

sql = ('''

        SELECT
            COUNT(*)
            
        FROM staging_zone

        WHERE LOGRADOURO IS NULL

    ''')

results_to_df(query_results(athena_query(sql=sql, athena_output=athena_output)))

In [None]:
# Pergunta 5: Quantos logradouros são avenidas?

print(df.loc[df['LOGRADOURO'].str.upper().str.startswith('AVENIDA', na=False)].shape[0])

sql = ('''

        SELECT
            COUNT(*)
            
        FROM staging_zone

        WHERE
            UPPER(LOGRADOURO) LIKE 'AVENIDA%'

    ''')

results_to_df(query_results(athena_query(sql=sql, athena_output=athena_output)))

In [None]:
# Pergunta 6: Quantos CEPs distintos existem entre os estabelecimentos? 

print(df.drop_duplicates(subset='CEP').shape[0])

sql = ('''

        SELECT
            COUNT(DISTINCT cep)
            
        FROM staging_zone

    ''')

results_to_df(query_results(athena_query(sql=sql, athena_output=athena_output)))

In [None]:
# Pergunta 8: Quantos estabelecimentos possuem um CNAE relacionado a cultivo?

print(df.loc[df['DESCRICAO_CNAE'].str.upper().str.startswith('CULTIVO', na=False)].shape[0])

sql = ('''

        SELECT
            COUNT(*)
            
        FROM staging_zone

        WHERE
            UPPER(DESCRICAO_CNAE) LIKE 'CULTIVO%'

    ''')

results_to_df(query_results(athena_query(sql=sql, athena_output=athena_output)))