# GEOFUSION  Processo Seletivo

##  Engenheiro de Dados - Análise de Dados

###  Desafio Técnico

### CLAUDIO GERVASIO DE LISBOA FEV/2019

In [1]:
import pandas as pd
import numpy as np
import scipy as sp
import plotly.plotly as py

ModuleNotFoundError: No module named 'plotly'

In [2]:
# Iniciar spark session
_spark = SparkSession.builder \
                    .master("local") \
                    .appName("GeoFusion CASE") \
                    .getOrCreate()

In [7]:
# Files localization ON Windows
_bairros = 'C:\\dump\\gf\\bairros.csv'
_concorrentes = 'C:\\dump\\gf\\concorrentes.csv'
_eventos_de_fluxo = 'C:\\dump\\gf\\eventos_de_fluxo.csv'
_populacao = 'C:\\dump\\gf\\populacao.json'
_potencial = 'C:\\dump\\gf\\potencial.csv'

# Linux localization
_bairros = '/home/jovyan/work/bairros.csv'
_concorrentes = '/home/jovyan/work/concorrentes.csv'
_eventos_de_fluxo = '/home/jovyan/work/eventos_de_fluxo.csv'
_populacao = '/home/jovyan/work/populacao.json'
_potencial = '/home/jovyan/work/potencial.csv'

In [8]:
# UDF Periodo do dia
def get_periodo_dia(hour):
    return (
        "(1)manha" if 5 <= hour <= 11
        else
        "(2)tarde" if 12 <= hour <= 17
        else
        "(3)noite" if 18 <= hour <= 22
        else
        "(4)madrugada"
    )
udf_get_periodo_dia = udf(get_periodo_dia, StringType())

In [9]:
# Leitura das Fontes de Dados

df_bairros = _spark.read.option("encoding", "utf-8").csv(_bairros, header=True, sep=',')
df_concorrentes = _spark.read.option("encoding", "utf-8").csv(_concorrentes, header=True, sep=',')
df_eventos_de_fluxo = _spark.read.option("encoding", "utf-8").csv(_eventos_de_fluxo, header=True, sep=',')
df_populacao = _spark.read.option("encoding", "utf-8").json(_populacao)


df_potencial = _spark.read.option("encoding", "utf-8").csv(_potencial, header=True, sep=',')

In [10]:
# A densidade demográfica de um bairro é uma informação muito
# importante para nossos clientes e é uma informação que precisa ser
# calculada. A densidade demográfica de um bairro é o resultado da divisão
# da população do bairro pela área do bairro.
df_densidade_demografica = df_populacao.join( df_bairros,( df_populacao.codigo == df_bairros.codigo ), 'left' ) \
                                       .select( df_bairros["codigo"].alias("codigo_bairro"),
                                                df_bairros["nome"].alias("nome_bairro"),
                                                df_bairros["area"],
                                                df_populacao["populacao"],
                                                ( df_populacao["populacao"] / df_bairros["area"]).alias('dens_demografica'))

In [11]:
# Análise do Fluxo de Pessoas por concorrentes
df_fluxo_pessoas = df_eventos_de_fluxo \
                       .select('codigo_concorrente',
                               'datetime', 
                               dayofmonth("datetime").alias('dia'), 
                               hour("datetime").alias('hora'),  
                               udf_get_periodo_dia(hour("datetime").alias('hora')).alias('periodo'),                                       
                               date_format("datetime", 'u').alias('semana_id'), 
                               date_format("datetime", 'E').alias('semana_nome')) \
                        .groupBy('codigo_concorrente','semana_id','semana_nome','periodo') \
                        .agg(f.count('codigo_concorrente').alias('qtd_visitas')) \
                        .sort(col('codigo_concorrente'),col('semana_id').asc(), col('periodo').asc())

In [12]:
df_fluxo_pessoas.dtypes

[('codigo_concorrente', 'string'),
 ('semana_id', 'string'),
 ('semana_nome', 'string'),
 ('periodo', 'string'),
 ('qtd_visitas', 'bigint')]

In [13]:
# Neste case seu desafio será implementar um serviço REST que retorne
# informações dos concorrentes como código, nome, endereço, preço
# praticado, fluxo médio de pessoas por dia da semana e por período do dia,
# bairro e a população e a densidade demográfica do bairro.

df_final = df_concorrentes.join( df_densidade_demografica, 
                                (df_concorrentes.codigo_bairro == df_densidade_demografica.codigo_bairro), 'left') \
                          .join( df_fluxo_pessoas, 
                                (df_concorrentes.codigo == df_fluxo_pessoas.codigo_concorrente), 'left') \
                          .select(df_concorrentes['codigo'].alias('codigo_concorrente'),
                                  df_concorrentes['nome'],
                                  df_concorrentes['categoria'],
                                  df_concorrentes['endereco'],
                                  df_concorrentes['municipio'],
                                  df_concorrentes['uf'],
                                  df_concorrentes['codigo_bairro'],
                                  df_densidade_demografica['area'],
                                  df_densidade_demografica['populacao'],
                                  df_densidade_demografica['dens_demografica'],
                                  df_fluxo_pessoas['semana_id'],
                                  df_fluxo_pessoas['semana_nome'],
                                  df_fluxo_pessoas['periodo'],
                                  df_fluxo_pessoas['qtd_visitas'])


In [14]:
# Filtro para visualizar o resultado final do processamento

df_final.select('codigo_concorrente','endereco','qtd_visitas','semana_nome','periodo','dens_demografica') \
        .filter( df_final['qtd_visitas'].isNotNull() ) \
        .sort(col('codigo_concorrente'),col('semana_id').asc(), col('periodo').asc()) \
        .show(n=100)

+------------------+--------------------+-----------+-----------+------------+------------------+
|codigo_concorrente|            endereco|qtd_visitas|semana_nome|     periodo|  dens_demografica|
+------------------+--------------------+-----------+-----------+------------+------------------+
|  1002487856442955|R. João Carlos do...|         12|        Mon|    (1)manha|3799.5875141376205|
|  1002487856442955|R. João Carlos do...|          5|        Mon|    (3)noite|3799.5875141376205|
|  1002487856442955|R. João Carlos do...|          2|        Tue|    (1)manha|3799.5875141376205|
|  1002487856442955|R. João Carlos do...|          4|        Tue|    (2)tarde|3799.5875141376205|
|  1002487856442955|R. João Carlos do...|          3|        Wed|    (1)manha|3799.5875141376205|
|  1002487856442955|R. João Carlos do...|         10|        Wed|    (2)tarde|3799.5875141376205|
|  1002487856442955|R. João Carlos do...|          2|        Wed|    (3)noite|3799.5875141376205|
|  1002487856442955|

In [15]:
df_final.dtypes

[('codigo_concorrente', 'string'),
 ('nome', 'string'),
 ('categoria', 'string'),
 ('endereco', 'string'),
 ('municipio', 'string'),
 ('uf', 'string'),
 ('codigo_bairro', 'string'),
 ('area', 'string'),
 ('populacao', 'bigint'),
 ('dens_demografica', 'double'),
 ('semana_id', 'string'),
 ('semana_nome', 'string'),
 ('periodo', 'string'),
 ('qtd_visitas', 'bigint')]

In [17]:
# Gravacao arquivo final para disponibilização na camada de Micro Serviços

df_final.repartition(1).write.csv('hive_final', header=True, mode='overwrite', sep='|')

# MICRO SERVICO Concorrentes

## Exposicao Webservice para validar REST API