
#PUC-RIO
#Aluno: Eduardo Baptistone

##MVP - Engenharia de Dados

###Objetivo

A base de dados a ser analisada representa um sistema de controle de acesso de criança em eventos de instiruições religiosas. Nessa avaliação vamos analisar os dados da criança, do responsável e as ação de acesso no evento que fica registrado. As crianças podem ser classificadas pela idade ou pela sua condição, pois isso vai determinar o nível de cuidado que aquela criança exige. O objetivo central é observar a tendência de crescimento da presença de crianças e a proximidade geográfica que elas tem com a organização. Essas informações são variáveis no tempo, porque se tratanto de instituições religiosas, a presença podem ou não ocorrer naquele evento. As seguintes perguntas serão feitas a essa base de dados com o propósito de avaliar o objetivo principal:

**Pergunta 1**<br>
Qual a quantidade de crianças Especiais que frequentaram, em média por mês, para os eventos de domingo pela manhã, por organização, no primeiro semestre?<br><br>
**Pergunta 2**<br>
Qual a quantidade de crianças registradas por bairro na organização?<br><br>
**Pergunta 3**<br>
Qual média mensal de crianças do evento de domingo pela manhã, para classes kids, no primeiro semestre para a organização ibazs?<br>



##Coleta de Dados

Trabalho de MVP para a disciplina de Engenhraira de Dados onde devemos ser capaz de construir um pipeline de dados utilizando tecnologias na nuvem. O pipeline irá envolver a busca, coleta, modelagem, carga e análise dos dados.
O dataset utilizado é baseado em um aplicação desenvolvida por mim, que controla acesso de crianças em eventos de instituição religiosa. O app se chama My Children.
O banco principal do aplicativo é o Firebase. Como esse banco é NoSQL as informações da base são replicadas para o BigQuery. No BigQuery foram realizadas consultas para gerar os arquivos CSV utilzados no trabalho.
Os dados sensíveis foram retirados ou anonimizados antes da coleta ser inserida no Databricks.

![Coleta de dados](files/images/Coleta_bigquery.jpg)


Os aquivos CSV são carregados manualmente no file system do Databricks para transformação em tabelas. A estrutura de tabelas segue um modelo de níveis de tratamento, onde cada nível representa um database de tabelas:
LEVEL1 - tabelas com a estrutura e dados originais do CSV
LEVEL2 - tabelas que passaram por processo de limpeza e inserção de novos campos
LEVEL3 - tabelas dimensão, preparadas para alcançar o objetivo nas consultas


| **Arquivos CSV** | **LEVEL 1**              | **LEVEL 2**                 | **LEVEL 3**               |
|------------------|--------------------------|-----------------------------|---------------------------|
| tb_children.csv  | tb_children<br>1937 rows | tb_children_l2<br>1923 rows | dim_children<br>1923 rows |
| tb_events.csv    | tb_events<br>6239 rows   | tb_events_l2<br>6155 rows   | dim_events<br>6155 rows   |
| tb_parents.csv   | tb_parents<br>486 rows   | tb_parents_l2<br>478 rows   | dim_parents<br>478 rows   |
|                  |                          |                             | dim_classes<br>5 rows     |



###Descrição das tabelas
####tb_children
Registro da criança no sistema com dados pessoais

####tb_parents
Registro dos pais ou responsáveis com dados pessoais

####tb_events
Registro da presença da criança no evento

###Transformação dos arquivos CSV em Dataframes

In [0]:
# Transforma os arquivos CSV em dataframes organizando cabeçalhos e tipos dos dados
# Localização dos arquivos CSV
file_tb_children_location = "/FileStore/tables/tb_children.csv"
file_tb_events_location = "/FileStore/tables/tb_events.csv"
file_tb_parents_location = "/FileStore/tables/tb_parents.csv"
file_type = "csv"

# CSV options
delimiter = ","

# TB_CHILDREN
#  Aplicar options para formar o schema com os tipos corretos
df_children = spark.read.format(file_type) \
  .option("treatEmptyValuesAsNulls", "true") \
  .option("inferSchema", "true") \
  .option("mode","DROPMALFORMED") \
  .option("header", "true") \
  .option("sep", delimiter) \
  .load(file_tb_children_location)

df_children.printSchema()

# Count rows
row_count = df_children.count()
print(f'The df_children has {row_count} rows.')
print(f'-'*30)

# TB_EVENTS
#  Aplicar options para formar o schema com os tipos corretos
df_events = spark.read.format(file_type) \
  .option("treatEmptyValuesAsNulls", "true") \
  .option("inferSchema", "true") \
  .option("mode","DROPMALFORMED") \
  .option("header", "true") \
  .option("sep", delimiter) \
  .load(file_tb_events_location)

df_events.printSchema()

# Count rows
row_count = df_events.count()
print(f'The df_events has {row_count} rows.')
print(f'-'*30)

# TB_PARENTS 
#  Aplicar options para formar o schema com os tipos corretos
df_parents = spark.read.format(file_type) \
  .option("treatEmptyValuesAsNulls", "true") \
  .option("inferSchema", "true") \
  .option("mode","DROPMALFORMED") \
  .option("header", "true") \
  .option("sep", delimiter) \
  .load(file_tb_parents_location)

df_parents.printSchema()

# Count rows
row_count = df_parents.count()
print(f'The df_parents has {row_count} rows.')
print(f'-'*40)


root
 |-- org: string (nullable = true)
 |-- mat: integer (nullable = true)
 |-- matResp: long (nullable = true)
 |-- codigo: integer (nullable = true)
 |-- dtnasc: date (nullable = true)
 |-- dtinicio: timestamp (nullable = true)
 |-- genero: string (nullable = true)
 |-- especial: boolean (nullable = true)
 |-- restriAlimentar: boolean (nullable = true)
 |-- visitante: boolean (nullable = true)

The df_children has 1937 rows.
------------------------------
root
 |-- org: string (nullable = true)
 |-- mat: integer (nullable = true)
 |-- evento: timestamp (nullable = true)
 |-- dtEnt: date (nullable = true)
 |-- hrEnt: timestamp (nullable = true)
 |-- dtSai: date (nullable = true)
 |-- hrSai: timestamp (nullable = true)
 |-- dtCon: date (nullable = true)
 |-- hrCon: timestamp (nullable = true)

The df_events has 6239 rows.
------------------------------
root
 |-- matResp: string (nullable = true)
 |-- org: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: string 

In [0]:
#Transforma os dataframes em tabelas e armazena no file system do Databricks

#Apaga todas as tabelas do file system
dbutils.fs.rm('dbfs:/user/hive/warehouse/level1.db/tb_children', True)
dbutils.fs.rm('dbfs:/user/hive/warehouse/level1.db/tb_events', True)
dbutils.fs.rm('dbfs:/user/hive/warehouse/level1.db/tb_parents', True)


# Cria database LEVEL1 para receber as tabelas
spark.sql("CREATE DATABASE IF NOT EXISTS level1")

# Cria tabelas no database default

# TB_CHILDREN

permanent_table_name = "level1.tb_children"
query = f"DROP TABLE IF EXISTS {permanent_table_name}"
spark.sql(query)
#spark.sql(f"CREATE TABLE {permanent_table_name}")

df_children.write.format("delta").options(mode='overwrite').saveAsTable(permanent_table_name)

# TB_EVENTS
permanent_table_name = "level1.tb_events"
query = f"DROP TABLE IF EXISTS {permanent_table_name}"
spark.sql(query)

df_events.write.format("delta").options(mode='overwrite').saveAsTable(permanent_table_name)

# TB_PARENTS
permanent_table_name = "level1.tb_parents"
query = f"DROP TABLE IF EXISTS {permanent_table_name}"
spark.sql(query)

df_parents.write.format("delta").options(mode='overwrite').saveAsTable(permanent_table_name)

In [0]:
#%run "./Tabelas LEVEL 1"

In [0]:
#%run "./Tabelas LEVEL 2"

In [0]:
#%run "./Tabelas LEVEL 3"

In [0]:
#%run "./Pergunta 1"

In [0]:
#%run "./Pergunta 2"

In [0]:
#%run "./Pergunta 3 - Análise Final"