## Data Engineering Capstone Project
#### Introduction
This project works with crime records from the city of São Paulo, Brazil. The aim is to provide a solid, sanitized database for statistical and Business Intelligence analysis.

Steps to complete the project:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

### Import Libraries

In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from datetime import datetime
from sys import stdout
import os


### Create Functions

In [2]:
def normalize_cname(_df):
    """
    Standardizes column names from a dataframe (Spark)
    """
    cols = _df.columns
    df_result = _df
    for c in cols:
        df_result = df_result.withColumnRenamed(c, c.lower().replace(' ', '_'))
    
    return df_result

def clean_ncols(_df):
    """
    Replaces the placeholder strings 'NULL', 'NaN' and 'NA' with real nulls in every string column of a dataframe (Spark)
    """
    col_types = _df.dtypes
    df_result = _df
    
    for item in col_types:
        if item[1] == 'string':
            df_result = df_result.withColumn(item[0]\
                                            ,f.when(f.col(item[0]).isin('NULL','NaN', 'NA'), None)\
                                            .otherwise(f.col(item[0])) )
    return df_result
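
Because `normalize_cname` only lower-cases names and replaces spaces with underscores, two source columns could end up with the same name, and characters such as hyphens are left untouched. The sketch below mirrors that renaming rule in plain Python (no Spark session needed) so a header can be checked before the rename runs. `preview_renames`, `find_collisions` and the sample header are hypothetical and not part of the notebook.

```python
from collections import defaultdict


def preview_renames(columns):
    """Map each original name to the name normalize_cname would give it."""
    return {c: c.lower().replace(' ', '_') for c in columns}


def find_collisions(columns):
    """Return normalized names that more than one original column maps to."""
    groups = defaultdict(list)
    for original, normalized in preview_renames(columns).items():
        groups[normalized].append(original)
    return {name: originals for name, originals in groups.items()
            if len(originals) > 1}


# Hypothetical header, made up for illustration only
sample = ['NUM_BO', 'Num BO', 'lat-lon']
print(preview_renames(sample))
# {'NUM_BO': 'num_bo', 'Num BO': 'num_bo', 'lat-lon': 'lat-lon'}
print(find_collisions(sample))
# {'num_bo': ['NUM_BO', 'Num BO']}
```

In the notebook itself the same check could be fed with `df.columns`, which is already a plain Python list of names.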
    

In [3]:
spark = SparkSession\
    .builder\
    .appName('caps')\
    .master("local[*]")\
    .getOrCreate()

### Step 1: Scope the Project and Gather Data

#### Describe and Gather Data
1. **Crime Data in Brazil:** comes from [Kaggle](https://www.kaggle.com/inquisitivecrow/crime-data-in-brazil). It covers 10 years of police work in São Paulo, Brazil, with more than 16 million records and several attributes describing each criminal occurrence.
2. **Current Properati Listing Information:** comes from [Kaggle](https://www.kaggle.com/properati-data/properties). It holds property attributes for 1.5 million Latin American listings.

#### Scope the Project
The scope of this project is to create a Spark job that makes clean and reliable data available in a dimensional model.
We use AWS S3 as the repository for the dimensional model and AWS Athena as the query engine that accesses the data.
Data scientists and business intelligence analysts can use this analytical repository to identify patterns and correlations between the data sets.
For example, a real estate company running a pricing study on its properties can use the repository to check whether many crimes are reported in the surroundings of each property and how serious those crimes are.
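
To make the real estate use case concrete, the sketch below counts how many crime locations fall within a given radius of a property, using the haversine great-circle distance on latitude/longitude pairs. It is only an illustration in plain Python: the function names, the 1 km radius and all coordinates are assumptions made up for the example, and the project itself does not prescribe this method.

```python
from math import asin, cos, radians, sin, sqrt

EARTH_RADIUS_KM = 6371.0  # mean Earth radius, an approximation


def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in kilometres between two points in degrees."""
    lat1, lon1, lat2, lon2 = map(radians, (lat1, lon1, lat2, lon2))
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
    return 2 * EARTH_RADIUS_KM * asin(sqrt(a))


def crimes_within(property_location, crime_locations, radius_km):
    """Count crime points no farther than radius_km from the property."""
    p_lat, p_lon = property_location
    return sum(
        1 for lat, lon in crime_locations
        if haversine_km(p_lat, p_lon, lat, lon) <= radius_km
    )


# Made-up coordinates around São Paulo, for illustration only
property_location = (-23.5505, -46.6333)
crime_locations = [
    (-23.5510, -46.6340),  # roughly 90 m away
    (-23.6000, -46.7000),  # several kilometres away
    (-23.5490, -46.6320),  # roughly 200 m away
]
print(crimes_within(property_location, crime_locations, radius_km=1.0))  # 2
```

On the full data set a pairwise comparison like this would be too slow, so an analyst would more likely pre-filter by city or by a bounding box before computing distances.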

In [7]:
READ_PATH_CRIME_DATA = './data/crimes/*.csv'
READ_PATH_PROPERTIES = './data/properties/*.csv'

OUTPUT_CRIMES = 's3://hvale_dend/crimes'
OUTPUT_PROPERTIES = 's3://hvale_dend/properties'

#### Read the Crime Data in Brazil dataset and drop duplicate rows

In [8]:
df_crime_crude = spark.read\
    .option('mergeSchema', 'true')\
    .option('header', 'true')\
    .option('inferSchema', 'true')\
    .csv(READ_PATH_CRIME_DATA)\
    .dropDuplicates()

df_crime_crude.printSchema()

root
 |-- NUM_BO: string (nullable = true)
 |-- ANO_BO: string (nullable = true)
 |-- ID_DELEGACIA: string (nullable = true)
 |-- NOME_DEPARTAMENTO: string (nullable = true)
 |-- NOME_SECCIONAL: string (nullable = true)
 |-- DELEGACIA: string (nullable = true)
 |-- NOME_DEPARTAMENTO_CIRC: string (nullable = true)
 |-- NOME_SECCIONAL_CIRC: string (nullable = true)
 |-- NOME_DELEGACIA_CIRC: string (nullable = true)
 |-- ANO: string (nullable = true)
 |-- MES: string (nullable = true)
 |-- DATA_OCORRENCIA_BO: string (nullable = true)
 |-- HORA_OCORRENCIA_BO: string (nullable = true)
 |-- FLAG_STATUS13: string (nullable = true)
 |-- RUBRICA: string (nullable = true)
 |-- DESDOBRAMENTO: string (nullable = true)
 |-- CONDUTA: string (nullable = true)
 |-- LATITUDE: string (nullable = true)
 |-- LONGITUDE: string (nullable = true)
 |-- CIDADE: string (nullable = true)
 |-- LOGRADOURO: string (nullable = true)
 |-- NUMERO_LOGRADOURO: string (nullable = true)
 |-- FLAG_STATUS22: string (nullabl

#### Rename columns to lower case

In [9]:
df_crime_crude = normalize_cname(df_crime_crude)

In [10]:
qtd_reg_crimes = df_crime_crude.count()
print('Total records df_crime_crude:', qtd_reg_crimes)

Total records df_crime_crude: 16960893


### Step 2: Explore and Assess the Data

#### Replace the strings 'NULL', 'NaN' and 'NA' with null

In [11]:
df_crime_crude = clean_ncols(df_crime_crude)

#### Columns named `_c#` (here `_c30`, `_c31` and `_c32`) hold fragments of badly formatted strings. Records affected by this anomaly are discarded because they represent a very low percentage of the data.

In [12]:
df_crime = df_crime_crude.where('(_c30 is null and _c31 is null and _c32 is null)').drop('_c30', '_c31', '_c32')

In [32]:
df_crime.limit(5).toPandas()

| | num_bo | ano_bo | id_delegacia | nome_departamento | nome_seccional | delegacia | nome_departamento_circ | nome_seccional_circ | nome_delegacia_circ | ano | ... | logradouro | numero_logradouro | flag_status22 | descr_tipo_pessoa | cont_pessoa | sexo_pessoa | idade_pessoa | cor | descr_profissao | descr_grau_instrucao |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 3807 | 2009 | 30307 | DEMACRO | DEL.SEC.MOGI DAS CRUZES | 02º D.P. MOGI DAS CRUZES | DEMACRO | DEL.SEC.MOGI DAS CRUZES | 02º D.P. MOGI DAS CRUZES | 2009 | ... | AV ANCHIETA | 0 | C | Vítima | 1 | M | 41.0 | Preta | APOSENTADO(A) | 1 Grau incompleto |
| 1 | 2897 | 2009 | 10354 | DECAP | DEL.SEC.8º SAO MATEUS | 54º D.P. CID. TIRADENTES | DECAP | DEL.SEC.8º SAO MATEUS | 54º D.P. CID. TIRADENTES | 2009 | ... | DOUTOR GUILERME DE ABREU SODRE | 790 | C | Indiciado | 6 | M | | | | |
| 2 | 2919 | 2009 | 10344 | DECAP | DEL.SEC.6º SANTO AMARO | 80º D.P. VILA JOANIZA | DECAP | DEL.SEC.2º SUL | 35º D.P. JABAQUARA | 2009 | ... | AC AV ENG ARMANDO DE ARRUDA PEREIRA | 4678 | C | Vítima | 2 | F | 29.0 | Branca | CONSULTOR(A) | Superior completo |
| 3 | 3744 | 2009 | 10365 | DECAP | DEL.SEC.4º NORTE | 90º D.P. PQ. NOVO MUNDO | DECAP | DEL.SEC.4º NORTE | 90º D.P. PQ. NOVO MUNDO | 2009 | ... | AC MARGINAL DIREITA DO TIETE | 18 | C | Partes | 5 | M | 45.0 | Branca | ANALISTA DE SISTEMAS | Superior completo |
| 4 | 3363 | 2009 | 10308 | DECAP | DEL.SEC.5º LESTE | 52º D.P. PARQUE S.JORGE | DECAP | DEL.SEC.4º NORTE | 19º D.P. VILA MARIA | 2009 | ... | AC PONTE PRES JANIO QUADROS-AV MORVAN DI | 0 | C | Testemunha | 3 | M | 41.0 | Branca | AJUDANTE | 1 Grau incompleto |


#### Normalize sexo_pessoa: the value 'I' is replaced with 'F' (female)

In [14]:
df_crime = df_crime.withColumn('sexo_pessoa'\
                               , f.when(f.col('sexo_pessoa') == 'I', 'F')\
                               .otherwise(f.col('sexo_pessoa')))

#### Data dictionary: crime data

In [16]:
data_dict_crime = {
"num_bo": "integer",
"ano_bo": "integer",
"id_delegacia": "integer",
"nome_departamento": "string",
"nome_seccional": "string",
"delegacia": "string",
"nome_departamento_circ": "string",
"nome_seccional_circ": "string",
"nome_delegacia_circ": "string",
"ano": "integer",
"mes": "integer",
"data_ocorrencia_bo":"string",
"hora_ocorrencia_bo":"string",
"flag_status13": "string",
"rubrica": "string",
"desdobramento": "string",
"conduta": "string",
"latitude": "double",
"longitude": "double",
"cidade": "string",
"logradouro": "string",
"numero_logradouro": "string",
"flag_status22": "string",
"descr_tipo_pessoa": "string",
"cont_pessoa": "string",
"sexo_pessoa": "string",
"idade_pessoa": "string",
"cor": "string",
"descr_profissao": "string",
"descr_grau_instrucao": "string"
}

#### Convert all column types according to the data dictionary

In [17]:
for k, v in data_dict_crime.items():
    df_crime = df_crime.withColumn(k, f.col(k).cast(v))

#### Data quality check: validate types

In [18]:
df_crime.printSchema()

root
 |-- num_bo: integer (nullable = true)
 |-- ano_bo: integer (nullable = true)
 |-- id_delegacia: integer (nullable = true)
 |-- nome_departamento: string (nullable = true)
 |-- nome_seccional: string (nullable = true)
 |-- delegacia: string (nullable = true)
 |-- nome_departamento_circ: string (nullable = true)
 |-- nome_seccional_circ: string (nullable = true)
 |-- nome_delegacia_circ: string (nullable = true)
 |-- ano: integer (nullable = true)
 |-- mes: integer (nullable = true)
 |-- data_ocorrencia_bo: string (nullable = true)
 |-- hora_ocorrencia_bo: string (nullable = true)
 |-- flag_status13: string (nullable = true)
 |-- rubrica: string (nullable = true)
 |-- desdobramento: string (nullable = true)
 |-- conduta: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- cidade: string (nullable = true)
 |-- logradouro: string (nullable = true)
 |-- numero_logradouro: string (nullable = true)
 |-- flag_status22: string (nu
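
Reading the printed schema by eye works for a one-off run, but the same validation can be automated by comparing the data dictionary with the list of `(column, type)` pairs that `DataFrame.dtypes` returns. The sketch below does this in plain Python on hand-written sample values. Note that `dtypes` reports integer columns as `int`, hence the small alias table. `schema_mismatches`, `TYPE_ALIASES` and the sample data are hypothetical names introduced for this illustration.

```python
# DataFrame.dtypes uses short type names, e.g. 'int' for an integer column
TYPE_ALIASES = {'integer': 'int'}


def schema_mismatches(dtypes, data_dict):
    """Return {column: (expected, found)} for every column that disagrees.

    dtypes is a list of (name, type) tuples, the shape of DataFrame.dtypes.
    A column missing from dtypes is reported with found=None.
    """
    actual = dict(dtypes)
    problems = {}
    for column, expected in data_dict.items():
        expected = TYPE_ALIASES.get(expected, expected)
        found = actual.get(column)
        if found != expected:
            problems[column] = (expected, found)
    return problems


# Hand-written sample in the shape of df_crime.dtypes, not real output
sample_dtypes = [('num_bo', 'int'), ('latitude', 'string'), ('cidade', 'string')]
sample_dict = {
    'num_bo': 'integer',
    'latitude': 'double',
    'cidade': 'string',
    'mes': 'integer',
}
print(schema_mismatches(sample_dtypes, sample_dict))
# {'latitude': ('double', 'string'), 'mes': ('int', None)}
```

An empty result means every column in the dictionary has the expected type. In the notebook the call would presumably be `schema_mismatches(df_crime.dtypes, data_dict_crime)`.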

#### Stage the data frame as parquet files in a temporary directory to optimize later transformations
The staging step happens only at this point because the previous transformations were needed to make the data frame consumable.

In [20]:
df_crime.repartition(50).write.mode('append').parquet('./data/tmp_crime_data_br/')

In [21]:
df_crime_temp = spark.read.parquet('./data/tmp_crime_data_br/*.parquet')
df_crime_temp.where("sexo_pessoa in ('F', 'M') and (latitude is not null or longitude is not null)")\
.createOrReplaceTempView('df_crime')

#### Create dimension police station

In [22]:
df_police_station = spark.sql("\
          SELECT DISTINCT id_delegacia, nome_departamento, nome_seccional, delegacia\
          FROM df_crime\
          ")

#### Data quality check: make sure the data frame is not empty

In [23]:
df_police_station.count()

1135
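
A non-zero count shows that the dimension is not empty, but `SELECT DISTINCT` over four columns can still return the same `id_delegacia` more than once if its descriptive names vary between records. The sketch below adds a key-uniqueness check on top of the emptiness check. It assumes the dimension is small enough to be collected to the driver as a list of tuples with the key in the first position. `duplicated_keys`, `check_dimension` and the sample rows are hypothetical.

```python
from collections import Counter


def duplicated_keys(rows, key_index=0):
    """Return {key: occurrences} for keys that appear in more than one row."""
    counts = Counter(row[key_index] for row in rows)
    return {key: n for key, n in counts.items() if n > 1}


def check_dimension(rows, name):
    """Raise ValueError if the dimension is empty or has repeated keys."""
    if not rows:
        raise ValueError('{} is empty'.format(name))
    duplicates = duplicated_keys(rows)
    if duplicates:
        raise ValueError('{} has repeated keys: {}'.format(name, duplicates))
    return len(rows)


# Made-up rows shaped like (id_delegacia, nome_departamento, delegacia)
clean_rows = [(1, 'DEPT A', 'STATION X'), (2, 'DEPT A', 'STATION Y')]
dirty_rows = clean_rows + [(2, 'DEPT A', 'STATION Y (renamed)')]

print(check_dimension(clean_rows, 'police_station'))  # 2
print(duplicated_keys(dirty_rows))                    # {2: 2}
```

If repeated keys do show up, the dimension would need a rule for picking one name per station before it is written.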

#### Write the police station dimension

In [None]:
df_police_station.repartition(1).write.mode('append').parquet('{}/{}'.format(OUTPUT_CRIMES, 'police_station'))

#### Create the crimes fact table

In [25]:
df_fact = spark.sql("\
          SELECT DISTINCT \
            num_bo, \
            id_delegacia, \
            latitude, \
            longitude, \
            ano_bo, \
            nome_departamento_circ, \
            nome_seccional_circ, \
            nome_delegacia_circ, \
            ano, \
            mes, \
            data_ocorrencia_bo, \
            hora_ocorrencia_bo, \
            flag_status13, \
            rubrica, \
            desdobramento, \
            conduta, \
            cidade, \
            logradouro, \
            numero_logradouro, \
            flag_status22, \
            descr_tipo_pessoa, \
            cont_pessoa, \
            sexo_pessoa, \
            idade_pessoa, \
            cor, \
            descr_profissao, \
            descr_grau_instrucao \
          FROM df_crime\
          ")

In [None]:
df_fact.repartition(10).write.mode('append').parquet('{}/{}'.format(OUTPUT_CRIMES, 'fact_crimes'))


#### Read the Properties dataset

In [27]:
df_properties_crude = spark.read\
    .option('mergeSchema', 'true')\
    .option('header', 'true')\
    .option('quote', '"')\
    .option('escape', '"')\
    .option('inferSchema', 'true')\
    .csv(READ_PATH_PROPERTIES)
df_properties = normalize_cname(df_properties_crude)
df_properties.printSchema()

root
 |-- id: string (nullable = true)
 |-- created_on: timestamp (nullable = true)
 |-- operation: string (nullable = true)
 |-- property_type: string (nullable = true)
 |-- place_name: string (nullable = true)
 |-- place_with_parent_names: string (nullable = true)
 |-- country_name: string (nullable = true)
 |-- state_name: string (nullable = true)
 |-- geonames_id: string (nullable = true)
 |-- lat-lon: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- price: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- price_aprox_local_currency: double (nullable = true)
 |-- price_aprox_usd: double (nullable = true)
 |-- surface_total_in_m2: integer (nullable = true)
 |-- surface_covered_in_m2: integer (nullable = true)
 |-- price_usd_per_m2: double (nullable = true)
 |-- price_per_m2: double (nullable = true)
 |-- floor: integer (nullable = true)
 |-- rooms: integer (nullable = true)
 |-- expenses: integer (nullable = true

In [28]:
# Filter the normalized frame; backticks keep lat-lon from being parsed as the subtraction lat - lon
df_properties = df_properties.where("`lat-lon` is not null")

#### Data dictionary: properties data

In [29]:
data_dict_properties = {
"id": "string",
"created_on": "string",
"country_name": "string",
"lat": "double",
"lon": "double",
"property_type":"string",
"place_name":"string",
"place_with_parent_names":"string",
"price_aprox_local_currency":"double",
"price_aprox_usd":"double",
"price_usd_per_m2":"double",
"price_per_m2":"double"  
}

#### Convert all column types according to the data dictionary

In [30]:
for k, v in data_dict_properties.items():
    df_properties = df_properties.withColumn(k, f.col(k).cast(v))

In [31]:
# country_name is selected only once: duplicate column names would make the parquet write fail
df_properties_output = df_properties.select('id', \
                     'created_on', \
                     'country_name', \
                     'lat', \
                     'lon', \
                     'property_type',\
                     'place_name',\
                     'place_with_parent_names',\
                     'price_aprox_local_currency',\
                     'price_aprox_usd',\
                     'price_usd_per_m2',\
                     'price_per_m2')\
    .distinct()

In [None]:
df_properties_output.repartition(1).write.mode('append').parquet(OUTPUT_PROPERTIES)