<a href="https://colab.research.google.com/github/AndersonGabrielCalasans/Projeto-Final-Engenharia-de-Dados-SoulCode/blob/main/Tratamento_NY_PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 👮‍♀️ **Tratamento do Data Frame de Nova York com PySpark**

Link para a documentação completa do projeto: [clique aqui](https://www.notion.so/Mindful-Data-Consultoria-d495964059c34acb9c8fe3d21dbb5cf6)


## **📚 1 - Instalação e importação das bibliotecas**

In [1]:
# Instaladores 

# Google Cloud Storage
%%capture
%pip install gcsfs

# PySpark
%pip install pyspark

# Conector MySQL
%pip install mysql-connector-python
%pip install PyMySQL

In [2]:
# Conexão do Storage
from google.cloud import storage

# Importa sistema operacional
import os

# Conector do MySQL
from mysql.connector import Error
from sqlalchemy import create_engine

# Pandas
import pandas as pd

# Pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import NullType

# MongoClient
from pymongo import MongoClient
import pymongo

# setando configurações de display do pandas
pd.set_option('display.max_columns',100)

## 🔗 **2 - Conexões com o DataLake e SparkSession**

### 🔊 **Conectando com a Google Cloud Storage**

In [3]:
# Configuração da chave de segurança
serviceAccount = '/content/chave-projeto-final.json'
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = serviceAccount

### 🔊 **SparkSession**

In [4]:
# Criando a spark session com o config de conector da gcs
spark = (
    SparkSession.builder
                .master('local')
                .appName('ProjetoFinal')
                .config('spark.ui.port', '4050')
                .config("spark.jars", 'https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop2-latest.jar')
                .getOrCreate()
)

## ⚒️ **3 - Extração das bases de dados**

### 📋 **Criando Schema**

In [5]:
# Criando esquema de dados para determinar seus tipos
esquema = (
    StructType([
        StructField('CMPLNT_NUM', IntegerType()), StructField('CMPLNT_FR_DT', StringType()),
        StructField('CMPLNT_FR_TM', StringType()), StructField('CMPLNT_TO_DT', StringType()),
        StructField('CMPLNT_TO_TM', StringType()), StructField('ADDR_PCT_CD', StringType()),
        StructField('RPT_DT', StringType()), StructField('KY_CD', IntegerType()),
        StructField('OFNS_DESC', StringType()), StructField('PD_CD', FloatType()),
        StructField('PD_DESC', StringType()), StructField('CRM_ATPT_CPTD_CD', StringType()),
        StructField('LAW_CAT_CD', StringType()), StructField('BORO_NM', StringType()),
        StructField('LOC_OF_OCCUR_DESC', StringType()), StructField('PREM_TYP_DESC', StringType()),
        StructField('JURIS_DESC', StringType()), StructField('JURISDICTION_CODE', FloatType()),
        StructField('PARKS_NM', StringType()), StructField('HADEVELOPT', StringType()),
        StructField('HOUSING_PSA', StringType()), StructField('X_COORD_CD ', FloatType()),
        StructField('Y_COORD_CD', FloatType()), StructField('SUSP_AGE_GROUP', StringType()),
        StructField('SUSP_RACE', StringType()), StructField('SUSP_SEX', StringType()),
        StructField('TRANSIT_DISTRICT', FloatType()), StructField('Latitude', FloatType()),
        StructField('Longitude', FloatType()), StructField('Lat_Lon', StringType()),
        StructField('PATROL_BORO', StringType()), StructField('STATION_NAME', StringType()),
        StructField('VIC_AGE_GROUP', StringType()), StructField('VIC_RACE', StringType()),
        StructField('VIC_SEX', StringType())]))

### 📋 **Configurando Data Frame**

In [6]:
# Fazer acesso a bucket
client = storage.Client()

# Criar uma variável
bucket = client.get_bucket('projeto_final_soulcode')

# Escolher arquivo dentro da bucket
bucket.blob('Arquivos-PreTratados/ny_geral_orig.csv')

# Criar uma variável para receber o caminho do arquivo
path = 'gs://projeto_final_soulcode/Arquivos-PreTratados/ny_geral_orig.csv'

In [7]:
# Criando Data Frame
df = (
    spark.read.format('csv')
              .option('header', 'true')
              .option('inferschema', 'false')
              .option('delimiter', ',')
              .load(path, schema = esquema)
)

In [8]:
# Verificando esquema
df.printSchema()

root
 |-- CMPLNT_NUM: integer (nullable = true)
 |-- CMPLNT_FR_DT: string (nullable = true)
 |-- CMPLNT_FR_TM: string (nullable = true)
 |-- CMPLNT_TO_DT: string (nullable = true)
 |-- CMPLNT_TO_TM: string (nullable = true)
 |-- ADDR_PCT_CD: string (nullable = true)
 |-- RPT_DT: string (nullable = true)
 |-- KY_CD: integer (nullable = true)
 |-- OFNS_DESC: string (nullable = true)
 |-- PD_CD: float (nullable = true)
 |-- PD_DESC: string (nullable = true)
 |-- CRM_ATPT_CPTD_CD: string (nullable = true)
 |-- LAW_CAT_CD: string (nullable = true)
 |-- BORO_NM: string (nullable = true)
 |-- LOC_OF_OCCUR_DESC: string (nullable = true)
 |-- PREM_TYP_DESC: string (nullable = true)
 |-- JURIS_DESC: string (nullable = true)
 |-- JURISDICTION_CODE: float (nullable = true)
 |-- PARKS_NM: string (nullable = true)
 |-- HADEVELOPT: string (nullable = true)
 |-- HOUSING_PSA: string (nullable = true)
 |-- X_COORD_CD : float (nullable = true)
 |-- Y_COORD_CD: float (nullable = true)
 |-- SUSP_AGE_GROUP:

In [9]:
# Visualizando dados
df.show(truncate=False)

+----------+------------+------------+------------+------------+-----------+----------+-----+-------------------------------+-----+-------+----------------+----------+-------+-----------------+-------------+-------------------+-----------------+--------+----------+-----------+-----------+----------+--------------+------------------------+--------+----------------+---------+----------+-----------------------------+-----------+------------+-------------+------------------------+-------+
|CMPLNT_NUM|CMPLNT_FR_DT|CMPLNT_FR_TM|CMPLNT_TO_DT|CMPLNT_TO_TM|ADDR_PCT_CD|RPT_DT    |KY_CD|OFNS_DESC                      |PD_CD|PD_DESC|CRM_ATPT_CPTD_CD|LAW_CAT_CD|BORO_NM|LOC_OF_OCCUR_DESC|PREM_TYP_DESC|JURIS_DESC         |JURISDICTION_CODE|PARKS_NM|HADEVELOPT|HOUSING_PSA|X_COORD_CD |Y_COORD_CD|SUSP_AGE_GROUP|SUSP_RACE               |SUSP_SEX|TRANSIT_DISTRICT|Latitude |Longitude |Lat_Lon                      |PATROL_BORO|STATION_NAME|VIC_AGE_GROUP|VIC_RACE                |VIC_SEX|
+----------+--------

In [10]:
# Conferindo quantidade de linhas e colunas do df
print(f'Quantidade de colunas do df:', len(df.columns))
print(f'Quantidade de linhas do df:', df.count())

Quantidade de colunas do df: 35
Quantidade de linhas do df: 1560


In [11]:
# BackUp do dataframe
dfback = df

##📝 **6 - Tratamento do Data Frame**

###▶️ **Renomeando colunas**

In [12]:
# Renomeando colunas
df = (
    df.withColumnRenamed('CMPLNT_NUM', 'id_ocorrencia')
      .withColumnRenamed('CMPLNT_FR_DT', 'dt_inicio_ocorrencia')
      .withColumnRenamed('CMPLNT_FR_TM', 'hr_inicio_ocorrencia')
      .withColumnRenamed('CMPLNT_TO_DT', 'dt_final_ocorrencia')
      .withColumnRenamed('CMPLNT_TO_TM', 'hr_final_ocorrencia')
      .withColumnRenamed('ADDR_PCT_CD', 'cod_delegacia')
      .withColumnRenamed('RPT_DT', 'dt_comunicacao').withColumnRenamed('KY_CD', 'cod_class_ofensa')
      .withColumnRenamed('OFNS_DESC', 'tipo_crime').withColumnRenamed('PD_CD', 'cod_class_interna')
      .withColumnRenamed('PD_DESC', 'desc_class_interna')
      .withColumnRenamed('CRM_ATPT_CPTD_CD', 'tentado_consumado')
      .withColumnRenamed('LAW_CAT_CD', 'nivel_ofensa').withColumnRenamed('BORO_NM', 'distrito')
      .withColumnRenamed('LOC_OF_OCCUR_DESC', 'loc_esp_ocorrencia')
      .withColumnRenamed('PREM_TYP_DESC', 'desc_local')
      .withColumnRenamed('JURIS_DESC', 'desc_foro').withColumnRenamed('JURISDICTION_CODE', 'cod_foro')
      .withColumnRenamed('PARKS_NM', 'nome_parque').withColumnRenamed('HADEVELOPT', 'nome_nycha')
      .withColumnRenamed('HOUSING_PSA', 'cod_nycha').withColumnRenamed('X_COORD_CD ', 'coord_x')
      .withColumnRenamed('Y_COORD_CD', 'coord_y')
      .withColumnRenamed('SUSP_AGE_GROUP', 'faixa_etaria_suspeito')
      .withColumnRenamed('SUSP_RACE', 'cor_pele_suspeito')
      .withColumnRenamed('SUSP_SEX', 'sexo_suspeito')
      .withColumnRenamed('TRANSIT_DISTRICT', 'cod_dist_transito')
      .withColumnRenamed('Latitude', 'latitude')
      .withColumnRenamed('Longitude', 'longitude').withColumnRenamed('Lat_Lon', 'latitude_longitude')
      .withColumnRenamed('PATROL_BORO', 'distrito_patrulha')
      .withColumnRenamed('STATION_NAME', 'estacao_transporte')
      .withColumnRenamed('VIC_AGE_GROUP', 'faixa_etaria_vitima')
      .withColumnRenamed('VIC_RACE', 'cor_pele_vitima')
      .withColumnRenamed('VIC_SEX', 'sexo_vitima'))

In [13]:
# Verificando alteração
df.printSchema()

root
 |-- id_ocorrencia: integer (nullable = true)
 |-- dt_inicio_ocorrencia: string (nullable = true)
 |-- hr_inicio_ocorrencia: string (nullable = true)
 |-- dt_final_ocorrencia: string (nullable = true)
 |-- hr_final_ocorrencia: string (nullable = true)
 |-- cod_delegacia: string (nullable = true)
 |-- dt_comunicacao: string (nullable = true)
 |-- cod_class_ofensa: integer (nullable = true)
 |-- tipo_crime: string (nullable = true)
 |-- cod_class_interna: float (nullable = true)
 |-- desc_class_interna: string (nullable = true)
 |-- tentado_consumado: string (nullable = true)
 |-- nivel_ofensa: string (nullable = true)
 |-- distrito: string (nullable = true)
 |-- loc_esp_ocorrencia: string (nullable = true)
 |-- desc_local: string (nullable = true)
 |-- desc_foro: string (nullable = true)
 |-- cod_foro: float (nullable = true)
 |-- nome_parque: string (nullable = true)
 |-- nome_nycha: string (nullable = true)
 |-- cod_nycha: string (nullable = true)
 |-- coord_x: float (nullable = 

###▶️ **Conferindo se há espaços extras**

In [14]:
# Usando o trim que tira apenas os espaços do início e fim da string
df = df.select([F.trim(F.col(c)).alias(c) for c in df.columns])
df.show(5, truncate=False)

+-------------+--------------------+--------------------+-------------------+-------------------+-------------+--------------+----------------+-------------------------------+-----------------+------------------+-----------------+------------+--------+------------------+----------+----------------+--------+-----------+----------+---------+---------+--------+---------------------+-----------------+-------------+-----------------+---------+----------+-----------------------------+-----------------+------------------+-------------------+---------------+-----------+
|id_ocorrencia|dt_inicio_ocorrencia|hr_inicio_ocorrencia|dt_final_ocorrencia|hr_final_ocorrencia|cod_delegacia|dt_comunicacao|cod_class_ofensa|tipo_crime                     |cod_class_interna|desc_class_interna|tentado_consumado|nivel_ofensa|distrito|loc_esp_ocorrencia|desc_local|desc_foro       |cod_foro|nome_parque|nome_nycha|cod_nycha|coord_x  |coord_y |faixa_etaria_suspeito|cor_pele_suspeito|sexo_suspeito|cod_dist_transito

###▶️ **Conferindo se há linhas duplicadas**

In [15]:
# Contando quantas linhas duplicadas existem no df
total_linhas = df.count()
total_linhas_distintas = df.distinct().count()

print('O total de linhas duplicadas é de: ', total_linhas - total_linhas_distintas)

O total de linhas duplicadas é de:  0


###▶️ **Conferindo se há dados nulos**

In [16]:
# Função para mostrar a quantidade de dados nulos, nan e vazios de cada coluna
df.select(([F.count(F.when(F.col(c).contains('None') | 
                       F.col(c).contains('NULL') | 
                       (F.col(c) == '' ) | 
                       F.col(c).isNull() | 
                       F.isnan(c), c)).alias(c) for c in df.columns])
).show()

+-------------+--------------------+--------------------+-------------------+-------------------+-------------+--------------+----------------+----------+-----------------+------------------+-----------------+------------+--------+------------------+----------+---------+--------+-----------+----------+---------+-------+-------+---------------------+-----------------+-------------+-----------------+--------+---------+------------------+-----------------+------------------+-------------------+---------------+-----------+
|id_ocorrencia|dt_inicio_ocorrencia|hr_inicio_ocorrencia|dt_final_ocorrencia|hr_final_ocorrencia|cod_delegacia|dt_comunicacao|cod_class_ofensa|tipo_crime|cod_class_interna|desc_class_interna|tentado_consumado|nivel_ofensa|distrito|loc_esp_ocorrencia|desc_local|desc_foro|cod_foro|nome_parque|nome_nycha|cod_nycha|coord_x|coord_y|faixa_etaria_suspeito|cor_pele_suspeito|sexo_suspeito|cod_dist_transito|latitude|longitude|latitude_longitude|distrito_patrulha|estacao_transporte

In [17]:
# Análise aprofundada das colunas que tem dados nulos
df.groupBy("distrito").count().show(truncate=False)

+-------------+-----+
|distrito     |count|
+-------------+-----+
|null         |1509 |
|QUEENS       |5    |
|BROOKLYN     |16   |
|BRONX        |15   |
|MANHATTAN    |11   |
|STATEN ISLAND|4    |
+-------------+-----+



In [18]:
df.show(5, truncate=False)

+-------------+--------------------+--------------------+-------------------+-------------------+-------------+--------------+----------------+-------------------------------+-----------------+------------------+-----------------+------------+--------+------------------+----------+----------------+--------+-----------+----------+---------+---------+--------+---------------------+-----------------+-------------+-----------------+---------+----------+-----------------------------+-----------------+------------------+-------------------+---------------+-----------+
|id_ocorrencia|dt_inicio_ocorrencia|hr_inicio_ocorrencia|dt_final_ocorrencia|hr_final_ocorrencia|cod_delegacia|dt_comunicacao|cod_class_ofensa|tipo_crime                     |cod_class_interna|desc_class_interna|tentado_consumado|nivel_ofensa|distrito|loc_esp_ocorrencia|desc_local|desc_foro       |cod_foro|nome_parque|nome_nycha|cod_nycha|coord_x  |coord_y |faixa_etaria_suspeito|cor_pele_suspeito|sexo_suspeito|cod_dist_transito

###▶️ **Drop de colunas**

Decidimos dar drop nas colunas "coord_x", "coord_y", "latitude", "longitude", pois já temos os dados de localização global na coluna "latitude_longitude".

In [19]:
# Dropando colunas  
df = df.drop("coord_x","coord_y","latitude","longitude")

# Verificando alteração
df.show(5, truncate=False)

+-------------+--------------------+--------------------+-------------------+-------------------+-------------+--------------+----------------+-------------------------------+-----------------+------------------+-----------------+------------+--------+------------------+----------+----------------+--------+-----------+----------+---------+---------------------+-----------------+-------------+-----------------+-----------------------------+-----------------+------------------+-------------------+---------------+-----------+
|id_ocorrencia|dt_inicio_ocorrencia|hr_inicio_ocorrencia|dt_final_ocorrencia|hr_final_ocorrencia|cod_delegacia|dt_comunicacao|cod_class_ofensa|tipo_crime                     |cod_class_interna|desc_class_interna|tentado_consumado|nivel_ofensa|distrito|loc_esp_ocorrencia|desc_local|desc_foro       |cod_foro|nome_parque|nome_nycha|cod_nycha|faixa_etaria_suspeito|cor_pele_suspeito|sexo_suspeito|cod_dist_transito|latitude_longitude           |distrito_patrulha|estacao_tr

In [20]:
# Dropando colunas em que a grande maioria dos dados são nulos (+95%)

# Listando colunas
cols = ('dt_final_ocorrencia', 'hr_final_ocorrencia', 'cod_class_interna', 'desc_class_interna',
        'distrito', 'desc_local', 'cod_foro', 'nome_parque', 'nome_nycha', 'cod_nycha',
        'cod_dist_transito', 'distrito_patrulha', 'estacao_transporte')

# Dropando todas de vez
df = df.drop(*cols)

# Verificando alteração
df.show(5, truncate=False)

+-------------+--------------------+--------------------+-------------+--------------+----------------+-------------------------------+-----------------+------------+------------------+----------------+---------------------+-----------------+-------------+-----------------------------+-------------------+---------------+-----------+
|id_ocorrencia|dt_inicio_ocorrencia|hr_inicio_ocorrencia|cod_delegacia|dt_comunicacao|cod_class_ofensa|tipo_crime                     |tentado_consumado|nivel_ofensa|loc_esp_ocorrencia|desc_foro       |faixa_etaria_suspeito|cor_pele_suspeito|sexo_suspeito|latitude_longitude           |faixa_etaria_vitima|cor_pele_vitima|sexo_vitima|
+-------------+--------------------+--------------------+-------------+--------------+----------------+-------------------------------+-----------------+------------+------------------+----------------+---------------------+-----------------+-------------+-----------------------------+-------------------+---------------+--------

In [21]:
# Conferindo quantidade de linhas e colunas do df
print(f'Quantidade de colunas do df:', len(df.columns))
print(f'Quantidade de linhas do df:', df.count())

Quantidade de colunas do df: 18
Quantidade de linhas do df: 1560


###▶️ **Criação/junção de colunas**

In [22]:
# Criando coluna juntando os dados de data e hora
df = df.withColumn('data_ocorrencia', 
                   F.concat(F.col('dt_inicio_ocorrencia'),
                            F.lit(' '), 
                            F.col('hr_inicio_ocorrencia')))

# Verificando alteração
df.show(truncate=False)

+-------------+--------------------+--------------------+-------------+--------------+----------------+-------------------------------+-----------------+------------+------------------+-------------------+---------------------+------------------------+-------------+-----------------------------+-------------------+------------------------+-----------+-------------------+
|id_ocorrencia|dt_inicio_ocorrencia|hr_inicio_ocorrencia|cod_delegacia|dt_comunicacao|cod_class_ofensa|tipo_crime                     |tentado_consumado|nivel_ofensa|loc_esp_ocorrencia|desc_foro          |faixa_etaria_suspeito|cor_pele_suspeito       |sexo_suspeito|latitude_longitude           |faixa_etaria_vitima|cor_pele_vitima         |sexo_vitima|data_ocorrencia    |
+-------------+--------------------+--------------------+-------------+--------------+----------------+-------------------------------+-----------------+------------+------------------+-------------------+---------------------+------------------------+

In [23]:
# Criando a coluna distrito a partir do cod da delegacia
df = df.withColumn('distrito', F.when(F.col('cod_delegacia') < 35, 'Manhattan')
                          .when(F.col('cod_delegacia') < 53, 'Bronx')
                          .when(F.col('cod_delegacia') < 95, 'Brooklyn')
                          .when(F.col('cod_delegacia') < 116, 'Queens')
                          .when(F.col('cod_delegacia') < 124, 'Staten Island')
                          .otherwise('Desconhecido')
)

# Verificando alteração
df.show(5)

+-------------+--------------------+--------------------+-------------+--------------+----------------+--------------------+-----------------+------------+------------------+----------------+---------------------+-----------------+-------------+--------------------+-------------------+---------------+-----------+-------------------+---------+
|id_ocorrencia|dt_inicio_ocorrencia|hr_inicio_ocorrencia|cod_delegacia|dt_comunicacao|cod_class_ofensa|          tipo_crime|tentado_consumado|nivel_ofensa|loc_esp_ocorrencia|       desc_foro|faixa_etaria_suspeito|cor_pele_suspeito|sexo_suspeito|  latitude_longitude|faixa_etaria_vitima|cor_pele_vitima|sexo_vitima|    data_ocorrencia| distrito|
+-------------+--------------------+--------------------+-------------+--------------+----------------+--------------------+-----------------+------------+------------------+----------------+---------------------+-----------------+-------------+--------------------+-------------------+---------------+--------

In [24]:
# Criando coluna periodo_ocorrencia
df = df.withColumn('periodo_ocorrencia', F.when(F.col("hr_inicio_ocorrencia").between("06:00:00", "11:59:59"), 'Manhã')
                                    .when(F.col("hr_inicio_ocorrencia").between("12:00:00", "17:59:59"), 'Tarde')
                                    .when(F.col("hr_inicio_ocorrencia").between("18:00:00", "23:59:59"), 'Noite')
                                    .when(F.col("hr_inicio_ocorrencia").between("00:00:00", "05:59:59"), 'Madrugada')
                                    .otherwise(F.col('hr_inicio_ocorrencia'))
)

# Verificando alteração
df.show(5)

+-------------+--------------------+--------------------+-------------+--------------+----------------+--------------------+-----------------+------------+------------------+----------------+---------------------+-----------------+-------------+--------------------+-------------------+---------------+-----------+-------------------+---------+------------------+
|id_ocorrencia|dt_inicio_ocorrencia|hr_inicio_ocorrencia|cod_delegacia|dt_comunicacao|cod_class_ofensa|          tipo_crime|tentado_consumado|nivel_ofensa|loc_esp_ocorrencia|       desc_foro|faixa_etaria_suspeito|cor_pele_suspeito|sexo_suspeito|  latitude_longitude|faixa_etaria_vitima|cor_pele_vitima|sexo_vitima|    data_ocorrencia| distrito|periodo_ocorrencia|
+-------------+--------------------+--------------------+-------------+--------------+----------------+--------------------+-----------------+------------+------------------+----------------+---------------------+-----------------+-------------+--------------------+------

In [25]:
# Criando uma coluna com a informação da cidade
df = df.withColumn('cidade', F.lit('Nova York'))

# Verificando alteração
df.show(5)

+-------------+--------------------+--------------------+-------------+--------------+----------------+--------------------+-----------------+------------+------------------+----------------+---------------------+-----------------+-------------+--------------------+-------------------+---------------+-----------+-------------------+---------+------------------+---------+
|id_ocorrencia|dt_inicio_ocorrencia|hr_inicio_ocorrencia|cod_delegacia|dt_comunicacao|cod_class_ofensa|          tipo_crime|tentado_consumado|nivel_ofensa|loc_esp_ocorrencia|       desc_foro|faixa_etaria_suspeito|cor_pele_suspeito|sexo_suspeito|  latitude_longitude|faixa_etaria_vitima|cor_pele_vitima|sexo_vitima|    data_ocorrencia| distrito|periodo_ocorrencia|   cidade|
+-------------+--------------------+--------------------+-------------+--------------+----------------+--------------------+-----------------+------------+------------------+----------------+---------------------+-----------------+-------------+-------

In [26]:
# Drop de colunas que não serão necessárias para análise

# Listando colunas
cols = ('id_ocorrencia', 'dt_inicio_ocorrencia', 'hr_inicio_ocorrencia', 'cod_delegacia', 'nivel_ofensa', 'cod_class_ofensa', 'dt_comunicacao')

# Dropando todas
df = df.drop(*cols)

# Verificando alteração
df.show(5, truncate=False)

+-------------------------------+-----------------+------------------+----------------+---------------------+-----------------+-------------+-----------------------------+-------------------+---------------+-----------+-------------------+---------+------------------+---------+
|tipo_crime                     |tentado_consumado|loc_esp_ocorrencia|desc_foro       |faixa_etaria_suspeito|cor_pele_suspeito|sexo_suspeito|latitude_longitude           |faixa_etaria_vitima|cor_pele_vitima|sexo_vitima|data_ocorrencia    |distrito |periodo_ocorrencia|cidade   |
+-------------------------------+-----------------+------------------+----------------+---------------------+-----------------+-------------+-----------------------------+-------------------+---------------+-----------+-------------------+---------+------------------+---------+
|MURDER & NON-NEGL. MANSLAUGHTER|COMPLETED        |INSIDE            |OTHER           |25-44                |WHITE HISPANIC   |M            |(40.804012949, -73.878

In [27]:
# Conferindo quantidade de linhas e colunas do df
print(f'Quantidade de colunas do df:', len(df.columns))
print(f'Quantidade de linhas do df:', df.count())

Quantidade de colunas do df: 15
Quantidade de linhas do df: 1560


###▶️ **Mudando tipos de dados das colunas**

Para conseguir mudar a coluna data_ocorrencia do tipo string para tipo timestamp devemos substituir os "/" por "-".


In [28]:
# Replace dos valores
df = df.withColumn('data_ocorrencia', F.regexp_replace('data_ocorrencia', '/', '-'))

# Verificando alterações
df.show(truncate=False)

+-------------------------------+-----------------+------------------+-------------------+---------------------+------------------------+-------------+-----------------------------+-------------------+------------------------+-----------+-------------------+-------------+------------------+---------+
|tipo_crime                     |tentado_consumado|loc_esp_ocorrencia|desc_foro          |faixa_etaria_suspeito|cor_pele_suspeito       |sexo_suspeito|latitude_longitude           |faixa_etaria_vitima|cor_pele_vitima         |sexo_vitima|data_ocorrencia    |distrito     |periodo_ocorrencia|cidade   |
+-------------------------------+-----------------+------------------+-------------------+---------------------+------------------------+-------------+-----------------------------+-------------------+------------------------+-----------+-------------------+-------------+------------------+---------+
|MURDER & NON-NEGL. MANSLAUGHTER|COMPLETED        |INSIDE            |OTHER              |25-4

In [29]:
# Converter data do formato string para o formado date - yyyy-MM-dd
df = df.withColumn('data_ocorrencia', F.to_timestamp('data_ocorrencia','MM-dd-yyyy HH:mm:ss'))

# Verificando alterações
df.show(5, truncate=False)

+-------------------------------+-----------------+------------------+----------------+---------------------+-----------------+-------------+-----------------------------+-------------------+---------------+-----------+-------------------+---------+------------------+---------+
|tipo_crime                     |tentado_consumado|loc_esp_ocorrencia|desc_foro       |faixa_etaria_suspeito|cor_pele_suspeito|sexo_suspeito|latitude_longitude           |faixa_etaria_vitima|cor_pele_vitima|sexo_vitima|data_ocorrencia    |distrito |periodo_ocorrencia|cidade   |
+-------------------------------+-----------------+------------------+----------------+---------------------+-----------------+-------------+-----------------------------+-------------------+---------------+-----------+-------------------+---------+------------------+---------+
|MURDER & NON-NEGL. MANSLAUGHTER|COMPLETED        |INSIDE            |OTHER           |25-44                |WHITE HISPANIC   |M            |(40.804012949, -73.878

In [30]:
df.printSchema()

root
 |-- tipo_crime: string (nullable = true)
 |-- tentado_consumado: string (nullable = true)
 |-- loc_esp_ocorrencia: string (nullable = true)
 |-- desc_foro: string (nullable = true)
 |-- faixa_etaria_suspeito: string (nullable = true)
 |-- cor_pele_suspeito: string (nullable = true)
 |-- sexo_suspeito: string (nullable = true)
 |-- latitude_longitude: string (nullable = true)
 |-- faixa_etaria_vitima: string (nullable = true)
 |-- cor_pele_vitima: string (nullable = true)
 |-- sexo_vitima: string (nullable = true)
 |-- data_ocorrencia: timestamp (nullable = true)
 |-- distrito: string (nullable = false)
 |-- periodo_ocorrencia: string (nullable = true)
 |-- cidade: string (nullable = false)



###▶️ **Encontrando e adequando inconsistências**

#### ✏️ **tipo_crime**

In [31]:
# Inconsistências: dados em inglês
df.select('tipo_crime').distinct().show(truncate=False)

+-------------------------------+
|tipo_crime                     |
+-------------------------------+
|HOMICIDE-NEGLIGENT-VEHICLE     |
|MURDER & NON-NEGL. MANSLAUGHTER|
|HOMICIDE-NEGLIGENT,UNCLASSIFIE |
+-------------------------------+



In [32]:
# Traduzindo dados
df = (df.withColumn('tipo_crime', F.when(F.col('tipo_crime') == 'HOMICIDE-NEGLIGENT-VEHICLE', 'Homicídio Culposo') 
        .otherwise(F.col('tipo_crime')))
        .withColumn('tipo_crime', F.when(F.col('tipo_crime') == 'MURDER & NON-NEGL. MANSLAUGHTER', 'Homicídio Doloso') 
        .otherwise(F.col('tipo_crime')))
        .withColumn('tipo_crime', F.when(F.col('tipo_crime') == 'HOMICIDE-NEGLIGENT,UNCLASSIFIE', 'Homicídio Culposo') 
        .otherwise(F.col('tipo_crime')))
)

df.show(truncate=False)

+----------------+-----------------+------------------+-------------------+---------------------+------------------------+-------------+-----------------------------+-------------------+------------------------+-----------+-------------------+-------------+------------------+---------+
|tipo_crime      |tentado_consumado|loc_esp_ocorrencia|desc_foro          |faixa_etaria_suspeito|cor_pele_suspeito       |sexo_suspeito|latitude_longitude           |faixa_etaria_vitima|cor_pele_vitima         |sexo_vitima|data_ocorrencia    |distrito     |periodo_ocorrencia|cidade   |
+----------------+-----------------+------------------+-------------------+---------------------+------------------------+-------------+-----------------------------+-------------------+------------------------+-----------+-------------------+-------------+------------------+---------+
|Homicídio Doloso|COMPLETED        |INSIDE            |OTHER              |25-44                |WHITE HISPANIC          |M            |(40

In [33]:
# Verificando se mudanças foram feitas
df.select('tipo_crime').distinct().show(truncate=False)

+-----------------+
|tipo_crime       |
+-----------------+
|Homicídio Doloso |
|Homicídio Culposo|
+-----------------+



#### ✏️ **tentado_consumado**

In [34]:
# Inconsistências: dados em inglês
df.select('tentado_consumado').distinct().show(truncate=False)

+-----------------+
|tentado_consumado|
+-----------------+
|ATTEMPTED        |
|COMPLETED        |
+-----------------+



In [35]:
# Há apenas um dado em que consta como ATTEMPTED (tentado)
# para nossa análise, será relevante apenas crimes consumados
df.groupby('tentado_consumado').count().show(truncate=False)

+-----------------+-----+
|tentado_consumado|count|
+-----------------+-----+
|ATTEMPTED        |1    |
|COMPLETED        |1559 |
+-----------------+-----+



In [36]:
# Dando drop na linha em que o homicidio não foi consumado
df = df.where(F.col('tentado_consumado') == 'COMPLETED')

df.count()

1559

In [37]:
# Faremos um drop na coluna tentado_consumado, pois ela consta apenas um dado
df = df.drop('tentado_consumado')

df.show(truncate=False)

+----------------+------------------+-------------------+---------------------+------------------------+-------------+-----------------------------+-------------------+------------------------+-----------+-------------------+-------------+------------------+---------+
|tipo_crime      |loc_esp_ocorrencia|desc_foro          |faixa_etaria_suspeito|cor_pele_suspeito       |sexo_suspeito|latitude_longitude           |faixa_etaria_vitima|cor_pele_vitima         |sexo_vitima|data_ocorrencia    |distrito     |periodo_ocorrencia|cidade   |
+----------------+------------------+-------------------+---------------------+------------------------+-------------+-----------------------------+-------------------+------------------------+-----------+-------------------+-------------+------------------+---------+
|Homicídio Doloso|INSIDE            |OTHER              |25-44                |WHITE HISPANIC          |M            |(40.804012949, -73.878331833)|25-44              |BLACK                   |

#### ✏️ **loc_esp_ocorrencia**

In [38]:
# Inconsistência: dados incompletos
df.select('loc_esp_ocorrencia').distinct().show(truncate=False)

+------------------+
|loc_esp_ocorrencia|
+------------------+
|OPPOSITE OF       |
|REAR OF           |
|null              |
|INSIDE            |
|OUTSIDE           |
|FRONT OF          |
+------------------+



In [39]:
# Faremos um drop, pois estes dados sozinhos não nos trazem insights
df = df.drop('loc_esp_ocorrencia')

df.show(truncate=False)

+----------------+-------------------+---------------------+------------------------+-------------+-----------------------------+-------------------+------------------------+-----------+-------------------+-------------+------------------+---------+
|tipo_crime      |desc_foro          |faixa_etaria_suspeito|cor_pele_suspeito       |sexo_suspeito|latitude_longitude           |faixa_etaria_vitima|cor_pele_vitima         |sexo_vitima|data_ocorrencia    |distrito     |periodo_ocorrencia|cidade   |
+----------------+-------------------+---------------------+------------------------+-------------+-----------------------------+-------------------+------------------------+-----------+-------------------+-------------+------------------+---------+
|Homicídio Doloso|OTHER              |25-44                |WHITE HISPANIC          |M            |(40.804012949, -73.878331833)|25-44              |BLACK                   |M          |2018-07-09 14:25:00|Bronx        |Tarde             |Nova York|


#### ✏️ **desc_foro**

In [40]:
# Inconsistências: dados em inglês
df.select('desc_foro').distinct().show(truncate=False)

+-------------------+
|desc_foro          |
+-------------------+
|N.Y. POLICE DEPT   |
|N.Y. TRANSIT POLICE|
|N.Y. HOUSING POLICE|
|OTHER              |
+-------------------+



In [41]:
# Traduzindo os dados
df = (df.withColumn('desc_foro', F.when(F.col('desc_foro') == 'N.Y. POLICE DEPT', 'Departamento de Polícia de Nova York') 
        .otherwise(F.col('desc_foro')))
        .withColumn('desc_foro', F.when(F.col('desc_foro') == 'N.Y. TRANSIT POLICE', 'Polícia de Trânsito de Nova York') 
        .otherwise(F.col('desc_foro')))
        .withColumn('desc_foro', F.when(F.col('desc_foro') == 'N.Y. HOUSING POLICE', 'Polícia de Moradia de Nova York') 
        .otherwise(F.col('desc_foro')))
        .withColumn('desc_foro', F.when(F.col('desc_foro') == 'OTHER', 'Outro')
        .otherwise(F.col('desc_foro')))
)

df.show(truncate=False)

+----------------+------------------------------------+---------------------+------------------------+-------------+-----------------------------+-------------------+------------------------+-----------+-------------------+-------------+------------------+---------+
|tipo_crime      |desc_foro                           |faixa_etaria_suspeito|cor_pele_suspeito       |sexo_suspeito|latitude_longitude           |faixa_etaria_vitima|cor_pele_vitima         |sexo_vitima|data_ocorrencia    |distrito     |periodo_ocorrencia|cidade   |
+----------------+------------------------------------+---------------------+------------------------+-------------+-----------------------------+-------------------+------------------------+-----------+-------------------+-------------+------------------+---------+
|Homicídio Doloso|Outro                               |25-44                |WHITE HISPANIC          |M            |(40.804012949, -73.878331833)|25-44              |BLACK                   |M       

In [42]:
# Confirmando que as mudanças foram realizadas
df.select('desc_foro').distinct().show(truncate=False)

+------------------------------------+
|desc_foro                           |
+------------------------------------+
|Departamento de Polícia de Nova York|
|Polícia de Moradia de Nova York     |
|Polícia de Trânsito de Nova York    |
|Outro                               |
+------------------------------------+



#### ✏️ **faixa_etaria_suspeito**

In [43]:
# Inconsistências: 1056, null
df.select('faixa_etaria_suspeito').distinct().show(truncate=False)

+---------------------+
|faixa_etaria_suspeito|
+---------------------+
|1056                 |
|<18                  |
|25-44                |
|null                 |
|UNKNOWN              |
|65+                  |
|18-24                |
|45-64                |
+---------------------+



In [44]:
df.groupby('faixa_etaria_suspeito').count().show(truncate=False)

+---------------------+-----+
|faixa_etaria_suspeito|count|
+---------------------+-----+
|1056                 |1    |
|<18                  |51   |
|25-44                |542  |
|null                 |530  |
|UNKNOWN              |32   |
|65+                  |9    |
|18-24                |243  |
|45-64                |151  |
+---------------------+-----+



In [45]:
# Substituindo dados inconsistentes para Desconhecido
df = (df.withColumn('faixa_etaria_suspeito', F.when(F.col('faixa_etaria_suspeito') == '1056', 'Desconhecido')
             .otherwise(F.col('faixa_etaria_suspeito')))
             .withColumn('faixa_etaria_suspeito', F.when(F.isnull(F.col('faixa_etaria_suspeito')), 'Desconhecido')
             .otherwise(F.col('faixa_etaria_suspeito')))
             .withColumn('faixa_etaria_suspeito', F.when(F.col('faixa_etaria_suspeito') == 'UNKNOWN', 'Desconhecido')
             .otherwise(F.col('faixa_etaria_suspeito')))
)
         
df.show(truncate=False)

+----------------+------------------------------------+---------------------+------------------------+-------------+-----------------------------+-------------------+------------------------+-----------+-------------------+-------------+------------------+---------+
|tipo_crime      |desc_foro                           |faixa_etaria_suspeito|cor_pele_suspeito       |sexo_suspeito|latitude_longitude           |faixa_etaria_vitima|cor_pele_vitima         |sexo_vitima|data_ocorrencia    |distrito     |periodo_ocorrencia|cidade   |
+----------------+------------------------------------+---------------------+------------------------+-------------+-----------------------------+-------------------+------------------------+-----------+-------------------+-------------+------------------+---------+
|Homicídio Doloso|Outro                               |25-44                |WHITE HISPANIC          |M            |(40.804012949, -73.878331833)|25-44              |BLACK                   |M       

In [46]:
# Verificando se mudanças foram feitas
df.groupby('faixa_etaria_suspeito').count().show(truncate=False)

+---------------------+-----+
|faixa_etaria_suspeito|count|
+---------------------+-----+
|<18                  |51   |
|25-44                |542  |
|Desconhecido         |563  |
|65+                  |9    |
|18-24                |243  |
|45-64                |151  |
+---------------------+-----+



#### ✏️ **cor_pele_suspeito**


In [47]:
# Inconsistências: dados em inglês, null
df.select('cor_pele_suspeito').distinct().show(truncate=False)

+------------------------+
|cor_pele_suspeito       |
+------------------------+
|WHITE                   |
|BLACK                   |
|null                    |
|BLACK HISPANIC          |
|WHITE HISPANIC          |
|UNKNOWN                 |
|ASIAN / PACIFIC ISLANDER|
+------------------------+



In [48]:
# Traduzindo dados
df = (df.withColumn('cor_pele_suspeito', 
                    F.when(F.col('cor_pele_suspeito') == 'WHITE', 'Branca')
                    .when(F.col('cor_pele_suspeito') == 'BLACK', 'Preta')
                    .when(F.isnull(F.col('cor_pele_suspeito')), 'Desconhecido')
                    .when(F.col('cor_pele_suspeito') == 'BLACK HISPANIC', 'Preta')
                    .when(F.col('cor_pele_suspeito') == 'WHITE HISPANIC', 'Branca')
                    .when(F.col('cor_pele_suspeito') == 'UNKNOWN', 'Desconhecido')
                    .when(F.col('cor_pele_suspeito') == 'ASIAN / PACIFIC ISLANDER', 'Amarela') 
                    .otherwise(F.col('cor_pele_suspeito')))
)

df.show(truncate=False)

+----------------+------------------------------------+---------------------+-----------------+-------------+-----------------------------+-------------------+------------------------+-----------+-------------------+-------------+------------------+---------+
|tipo_crime      |desc_foro                           |faixa_etaria_suspeito|cor_pele_suspeito|sexo_suspeito|latitude_longitude           |faixa_etaria_vitima|cor_pele_vitima         |sexo_vitima|data_ocorrencia    |distrito     |periodo_ocorrencia|cidade   |
+----------------+------------------------------------+---------------------+-----------------+-------------+-----------------------------+-------------------+------------------------+-----------+-------------------+-------------+------------------+---------+
|Homicídio Doloso|Outro                               |25-44                |Branca           |M            |(40.804012949, -73.878331833)|25-44              |BLACK                   |M          |2018-07-09 14:25:00|Bron

In [49]:
# Confirmando se as mudanças foram realizadas
df.select('cor_pele_suspeito').distinct().show(truncate=False)

+-----------------+
|cor_pele_suspeito|
+-----------------+
|Preta            |
|Desconhecido     |
|Amarela          |
|Branca           |
+-----------------+



#### ✏️ **sexo_suspeito**

In [50]:
# Inconsistências: dados em inglês e abreviados, null 
df.select('sexo_suspeito').distinct().show(truncate=False)

+-------------+
|sexo_suspeito|
+-------------+
|F            |
|null         |
|M            |
|U            |
+-------------+



In [51]:
# Traduzindo dados
df = (df.withColumn('sexo_suspeito', F.when(F.col('sexo_suspeito') == 'F', 'Feminino') 
        .otherwise(F.col('sexo_suspeito')))
        .withColumn('sexo_suspeito', F.when(F.col('sexo_suspeito') == 'M', 'Masculino') 
        .otherwise(F.col('sexo_suspeito')))
        .withColumn('sexo_suspeito', F.when(F.col('sexo_suspeito') == 'U', 'Desconhecido') 
        .otherwise(F.col('sexo_suspeito')))
        .withColumn('sexo_suspeito', F.when(F.isnull(F.col('sexo_suspeito')), 'Desconhecido') 
        .otherwise(F.col('sexo_suspeito')))
)

df.show(truncate=False)

+----------------+------------------------------------+---------------------+-----------------+-------------+-----------------------------+-------------------+------------------------+-----------+-------------------+-------------+------------------+---------+
|tipo_crime      |desc_foro                           |faixa_etaria_suspeito|cor_pele_suspeito|sexo_suspeito|latitude_longitude           |faixa_etaria_vitima|cor_pele_vitima         |sexo_vitima|data_ocorrencia    |distrito     |periodo_ocorrencia|cidade   |
+----------------+------------------------------------+---------------------+-----------------+-------------+-----------------------------+-------------------+------------------------+-----------+-------------------+-------------+------------------+---------+
|Homicídio Doloso|Outro                               |25-44                |Branca           |Masculino    |(40.804012949, -73.878331833)|25-44              |BLACK                   |M          |2018-07-09 14:25:00|Bron

In [52]:
# Confirmando se as mudanças foram realizadas
df.select('sexo_suspeito').distinct().show(truncate=False)

+-------------+
|sexo_suspeito|
+-------------+
|Desconhecido |
|Feminino     |
|Masculino    |
+-------------+



#### ✏️ **latitude_longitude**

In [53]:
# Inconsistências: "(" ")" e espaço
df.select('latitude_longitude').distinct().show(truncate=False)

+----------------------------------------+
|latitude_longitude                      |
+----------------------------------------+
|(40.81950961800004, -73.95358253499995) |
|(40.71662118300002, -73.99326291399994) |
|(40.67560823700006, -73.91935098699997) |
|(40.693780803, -73.735773689)           |
|(40.597999401, -73.760016269)           |
|(40.84512030900004, -73.92249595899995) |
|(40.65921221700007, -73.90998894799998) |
|(40.640592758000025, -73.918932091)     |
|(40.67458330800008, -73.93022154099998) |
|(40.85396027400003, -73.90012087499997) |
|(40.86600470600007, -73.87043794899995) |
|(40.666231521000036, -73.92253800099996)|
|(40.66080625500007, -73.89923866699996) |
|(40.819269208, -73.89628544)            |
|(40.85359836700008, -73.90057688099995) |
|(40.68465082400007, -73.95776878599996) |
|(40.62141559900005, -74.08007850499997) |
|(40.88708918800006, -73.86778566699996) |
|(40.668503474000026, -73.92559940099994)|
|(40.82286114200008, -73.91916530199995) |
+----------

In [54]:
# Corrigindo inconsistências
df = df.withColumn('latitude_longitude', F.translate('latitude_longitude', '() ', ''))
df.show(5, truncate=False)

+----------------+------------------------------------+---------------------+-----------------+-------------+--------------------------+-------------------+---------------+-----------+-------------------+---------+------------------+---------+
|tipo_crime      |desc_foro                           |faixa_etaria_suspeito|cor_pele_suspeito|sexo_suspeito|latitude_longitude        |faixa_etaria_vitima|cor_pele_vitima|sexo_vitima|data_ocorrencia    |distrito |periodo_ocorrencia|cidade   |
+----------------+------------------------------------+---------------------+-----------------+-------------+--------------------------+-------------------+---------------+-----------+-------------------+---------+------------------+---------+
|Homicídio Doloso|Outro                               |25-44                |Branca           |Masculino    |40.804012949,-73.878331833|25-44              |BLACK          |M          |2018-07-09 14:25:00|Bronx    |Tarde             |Nova York|
|Homicídio Doloso|Depart

#### ✏️ **faixa_etaria_vitima**

In [55]:
# Inconsistências: dados em inglês
df.select('faixa_etaria_vitima').distinct().show(truncate=False)

+-------------------+
|faixa_etaria_vitima|
+-------------------+
|<18                |
|25-44              |
|UNKNOWN            |
|65+                |
|18-24              |
|45-64              |
+-------------------+



In [56]:
# Traduzindo dados
df = (df.withColumn('faixa_etaria_vitima', F.when(F.col('faixa_etaria_vitima') == 'UNKNOWN', 'Desconhecido') 
        .otherwise(F.col('faixa_etaria_vitima')))
)

df.show(truncate=False)

+----------------+------------------------------------+---------------------+-----------------+-------------+--------------------------+-------------------+------------------------+-----------+-------------------+-------------+------------------+---------+
|tipo_crime      |desc_foro                           |faixa_etaria_suspeito|cor_pele_suspeito|sexo_suspeito|latitude_longitude        |faixa_etaria_vitima|cor_pele_vitima         |sexo_vitima|data_ocorrencia    |distrito     |periodo_ocorrencia|cidade   |
+----------------+------------------------------------+---------------------+-----------------+-------------+--------------------------+-------------------+------------------------+-----------+-------------------+-------------+------------------+---------+
|Homicídio Doloso|Outro                               |25-44                |Branca           |Masculino    |40.804012949,-73.878331833|25-44              |BLACK                   |M          |2018-07-09 14:25:00|Bronx        |Ta

In [57]:
# Confirmando se as mudanças foram realizadas
df.select('faixa_etaria_vitima').distinct().show(truncate=False)

+-------------------+
|faixa_etaria_vitima|
+-------------------+
|<18                |
|25-44              |
|Desconhecido       |
|65+                |
|18-24              |
|45-64              |
+-------------------+



#### ✏️ **cor_pele_vitima**

In [58]:
# Inconsistências: dados em inglês
df.select('cor_pele_vitima').distinct().show(truncate=False)

+------------------------------+
|cor_pele_vitima               |
+------------------------------+
|WHITE                         |
|BLACK                         |
|AMERICAN INDIAN/ALASKAN NATIVE|
|BLACK HISPANIC                |
|WHITE HISPANIC                |
|UNKNOWN                       |
|ASIAN / PACIFIC ISLANDER      |
+------------------------------+



In [59]:
# Traduzindo dados
df = (df.withColumn('cor_pele_vitima', 
                    F.when(F.col('cor_pele_vitima') == 'WHITE', 'Branca')
                    .when(F.col('cor_pele_vitima') == 'BLACK', 'Preta')
                    .when(F.col('cor_pele_vitima') == 'BLACK HISPANIC', 'Preta')
                    .when(F.col('cor_pele_vitima') == 'WHITE HISPANIC', 'Branca')
                    .when(F.col('cor_pele_vitima') == 'UNKNOWN', 'Desconhecido')
                    .when(F.col('cor_pele_vitima') == 'ASIAN / PACIFIC ISLANDER', 'Amarela')
                    .when(F.col('cor_pele_vitima') == 'AMERICAN INDIAN/ALASKAN NATIVE', 'Povos originários') 
                    .otherwise(F.col('cor_pele_vitima')))
)

df.show(truncate=False)

+----------------+------------------------------------+---------------------+-----------------+-------------+--------------------------+-------------------+---------------+-----------+-------------------+-------------+------------------+---------+
|tipo_crime      |desc_foro                           |faixa_etaria_suspeito|cor_pele_suspeito|sexo_suspeito|latitude_longitude        |faixa_etaria_vitima|cor_pele_vitima|sexo_vitima|data_ocorrencia    |distrito     |periodo_ocorrencia|cidade   |
+----------------+------------------------------------+---------------------+-----------------+-------------+--------------------------+-------------------+---------------+-----------+-------------------+-------------+------------------+---------+
|Homicídio Doloso|Outro                               |25-44                |Branca           |Masculino    |40.804012949,-73.878331833|25-44              |Preta          |M          |2018-07-09 14:25:00|Bronx        |Tarde             |Nova York|
|Homicíd

In [60]:
# Confirmando se as mudanças foram realizadas
df.select('cor_pele_vitima').distinct().show(truncate=False)

+-----------------+
|cor_pele_vitima  |
+-----------------+
|Povos originários|
|Preta            |
|Desconhecido     |
|Amarela          |
|Branca           |
+-----------------+



#### ✏️ **sexo_vitima**

In [61]:
# Inconsistências: dados abreviados
df.select('sexo_vitima').distinct().show(truncate=False)

+-----------+
|sexo_vitima|
+-----------+
|F          |
|E          |
|M          |
|U          |
+-----------+



In [62]:
# Traduzindo dados
df = (df.withColumn('sexo_vitima', F.when(F.col('sexo_vitima') == 'F', 'Feminino') 
        .otherwise(F.col('sexo_vitima')))
        .withColumn('sexo_vitima', F.when(F.col('sexo_vitima') == 'M', 'Masculino') 
        .otherwise(F.col('sexo_vitima')))
        .withColumn('sexo_vitima', F.when(F.col('sexo_vitima') == 'U', 'Desconhecido') 
        .otherwise(F.col('sexo_vitima')))
        .withColumn('sexo_vitima', F.when(F.col('sexo_vitima') == 'E', 'Desconhecido') 
        .otherwise(F.col('sexo_vitima')))
)

df.show(truncate=False)

+----------------+------------------------------------+---------------------+-----------------+-------------+--------------------------+-------------------+---------------+-----------+-------------------+-------------+------------------+---------+
|tipo_crime      |desc_foro                           |faixa_etaria_suspeito|cor_pele_suspeito|sexo_suspeito|latitude_longitude        |faixa_etaria_vitima|cor_pele_vitima|sexo_vitima|data_ocorrencia    |distrito     |periodo_ocorrencia|cidade   |
+----------------+------------------------------------+---------------------+-----------------+-------------+--------------------------+-------------------+---------------+-----------+-------------------+-------------+------------------+---------+
|Homicídio Doloso|Outro                               |25-44                |Branca           |Masculino    |40.804012949,-73.878331833|25-44              |Preta          |Masculino  |2018-07-09 14:25:00|Bronx        |Tarde             |Nova York|
|Homicíd

In [63]:
# Confirmando se as mudanças foram realizadas
df.select('sexo_suspeito').distinct().show(truncate=False)

+-------------+
|sexo_suspeito|
+-------------+
|Desconhecido |
|Feminino     |
|Masculino    |
+-------------+



#### ✏️ **data_ocorrencia**

In [64]:
# Dados ok
df.select('data_ocorrencia').sort('data_ocorrencia').show(truncate=False)

+-------------------+
|data_ocorrencia    |
+-------------------+
|2018-01-01 13:09:00|
|2018-01-03 20:37:00|
|2018-01-06 21:05:00|
|2018-01-09 23:36:00|
|2018-01-10 14:30:00|
|2018-01-10 14:30:00|
|2018-01-12 20:17:00|
|2018-01-13 13:32:00|
|2018-01-15 23:39:00|
|2018-01-16 02:00:00|
|2018-01-16 21:30:00|
|2018-01-18 08:26:00|
|2018-01-18 21:58:00|
|2018-01-21 01:05:00|
|2018-01-21 02:35:00|
|2018-01-21 17:40:00|
|2018-01-27 08:43:00|
|2018-01-28 05:10:00|
|2018-01-29 12:45:00|
|2018-02-02 18:30:00|
+-------------------+
only showing top 20 rows



In [65]:
# Dados ok
df.select('data_ocorrencia').sort('data_ocorrencia', ascending=False).show(truncate=False)

+-------------------+
|data_ocorrencia    |
+-------------------+
|2021-12-31 19:23:00|
|2021-12-27 20:04:00|
|2021-12-27 15:48:00|
|2021-12-26 22:45:00|
|2021-12-26 12:20:00|
|2021-12-26 04:07:00|
|2021-12-26 04:07:00|
|2021-12-25 23:54:00|
|2021-12-25 19:15:00|
|2021-12-23 23:05:00|
|2021-12-23 04:05:00|
|2021-12-22 23:17:00|
|2021-12-22 05:35:00|
|2021-12-21 21:30:00|
|2021-12-20 18:35:00|
|2021-12-18 00:15:00|
|2021-12-17 22:13:00|
|2021-12-17 20:59:00|
|2021-12-17 06:21:00|
|2021-12-17 00:10:00|
+-------------------+
only showing top 20 rows



###▶️ **Reorganizando colunas**

In [66]:
df = df.select('cidade', 'tipo_crime', 'distrito', 'data_ocorrencia', 
               'periodo_ocorrencia', 'faixa_etaria_suspeito', 'cor_pele_suspeito',
               'sexo_suspeito', 'faixa_etaria_vitima', 'cor_pele_vitima',
               'sexo_vitima', 'latitude_longitude', 'desc_foro')

In [67]:
df.show(truncate=False)

+---------+----------------+-------------+-------------------+------------------+---------------------+-----------------+-------------+-------------------+---------------+-----------+--------------------------+------------------------------------+
|cidade   |tipo_crime      |distrito     |data_ocorrencia    |periodo_ocorrencia|faixa_etaria_suspeito|cor_pele_suspeito|sexo_suspeito|faixa_etaria_vitima|cor_pele_vitima|sexo_vitima|latitude_longitude        |desc_foro                           |
+---------+----------------+-------------+-------------------+------------------+---------------------+-----------------+-------------+-------------------+---------------+-----------+--------------------------+------------------------------------+
|Nova York|Homicídio Doloso|Bronx        |2018-07-09 14:25:00|Tarde             |25-44                |Branca           |Masculino    |25-44              |Preta          |Masculino  |40.804012949,-73.878331833|Outro                               |
|Nova Yo

##💿 **7 - Upload de arquivo final**


###⬆️ **Load do arquivo**

In [68]:
df_pd = df.toPandas()

In [69]:
df_pd.to_csv('ny_geral_tratado.csv', index=False)

In [70]:
df_tratado = pd.read_csv('/content/ny_geral_tratado.csv')

In [71]:
df_tratado

Unnamed: 0,cidade,tipo_crime,distrito,data_ocorrencia,periodo_ocorrencia,faixa_etaria_suspeito,cor_pele_suspeito,sexo_suspeito,faixa_etaria_vitima,cor_pele_vitima,sexo_vitima,latitude_longitude,desc_foro
0,Nova York,Homicídio Doloso,Bronx,2018-07-09 14:25:00,Tarde,25-44,Branca,Masculino,25-44,Preta,Masculino,"40.804012949,-73.878331833",Outro
1,Nova York,Homicídio Doloso,Brooklyn,2018-03-14 05:00:00,Madrugada,25-44,Preta,Masculino,<18,Preta,Feminino,"40.661502264,-73.907661037",Departamento de Polícia de Nova York
2,Nova York,Homicídio Doloso,Brooklyn,2018-04-18 20:40:00,Noite,Desconhecido,Desconhecido,Desconhecido,25-44,Preta,Masculino,"40.693667261,-73.94451783",Departamento de Polícia de Nova York
3,Nova York,Homicídio Doloso,Manhattan,2018-05-18 08:05:00,Manhã,45-64,Preta,Feminino,<18,Branca,Masculino,"40.755701867,-73.978221916",Departamento de Polícia de Nova York
4,Nova York,Homicídio Doloso,Bronx,2018-09-10 08:00:00,Manhã,45-64,Preta,Masculino,45-64,Preta,Feminino,"40.846391886,-73.88412771",Departamento de Polícia de Nova York
...,...,...,...,...,...,...,...,...,...,...,...,...,...
1554,Nova York,Homicídio Culposo,Queens,2018-11-22 03:43:00,Madrugada,Desconhecido,Branca,Masculino,18-24,Branca,Masculino,"40.74360542,-73.920986473",Departamento de Polícia de Nova York
1555,Nova York,Homicídio Doloso,Manhattan,2018-07-17 19:58:00,Noite,25-44,Preta,Masculino,18-24,Preta,Masculino,"40.828076123,-73.936770657",Departamento de Polícia de Nova York
1556,Nova York,Homicídio Culposo,Manhattan,2018-10-19 22:50:00,Noite,Desconhecido,Desconhecido,Desconhecido,25-44,Preta,Masculino,"40.827462608,-73.939192193",Departamento de Polícia de Nova York
1557,Nova York,Homicídio Doloso,Brooklyn,2018-03-14 05:00:00,Madrugada,25-44,Preta,Masculino,45-64,Preta,Masculino,"40.661502264,-73.907661037",Departamento de Polícia de Nova York


###⬆️ **Envio ao MongoDB**

In [72]:
# Conector do mongo atlas
uri = "mongodb+srv://cluster0.2qynihy.mongodb.net/?authSource=%24external&authMechanism=MONGODB-X509&retryWrites=true&w=majority"
client = MongoClient(uri, 
                     tls=True,
                     tlsCertificateKeyFile='/content/X509-cert-7694731329810785124.pem')

In [73]:
# Escolha/crie o database e colecao
db = client['projeto_final']
colecao = db['ny_geral_tratado']

# Verificar conexão / Qtd. documentos da coleção
colecao.count_documents({})

0

In [74]:
# Enviar o DF para colecao selecionada no mongo
colecao.drop()
df_dict = df_tratado.to_dict("records")
colecao.insert_many(df_dict)

<pymongo.results.InsertManyResult at 0x7f32a37c9280>

In [75]:
colecao.count_documents({})

1559

###⬆️ **Envio ao Google Cloud Storage**

In [76]:
# Configuração da chave de segurança
serviceAccount = '/content/chave-projeto-final.json'
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = serviceAccount

In [77]:
# Função para fazer upload de arquivo no bucket
def upload_blob(bucket, arquivo, destino):
    client = storage.Client()
    bucket = client.bucket(bucket)
    blob = bucket.blob(destino)

    blob.upload_from_filename(arquivo)

    print(
        f"Arquivo {arquivo} enviado a {destino}."
    )

In [78]:
# Upload do arquivo 
bucket = 'projeto_final_soulcode'
arquivo = '/content/ny_geral_tratado.csv'
destino = 'Arquivos_Tratados/ny_geral_tratado.csv'
upload_blob(bucket, arquivo, destino)

Arquivo /content/ny_geral_tratado.csv enviado a Arquivos_Tratados/ny_geral_tratado.csv.


In [79]:
path_tratado = 'gs://projeto_final_soulcode/Arquivos_Tratados/ny_geral_tratado.csv'

In [80]:
# Conferindo se o carregamento deu certo
df_tratado = (
    spark.read.format('csv')
              .option('header', 'true')
              .option('delimiter', ',')
              .load(path_tratado)
)

In [81]:
df_tratado.show(truncate=False)

+---------+----------------+-------------+-------------------+------------------+---------------------+-----------------+-------------+-------------------+---------------+-----------+--------------------------+------------------------------------+
|cidade   |tipo_crime      |distrito     |data_ocorrencia    |periodo_ocorrencia|faixa_etaria_suspeito|cor_pele_suspeito|sexo_suspeito|faixa_etaria_vitima|cor_pele_vitima|sexo_vitima|latitude_longitude        |desc_foro                           |
+---------+----------------+-------------+-------------------+------------------+---------------------+-----------------+-------------+-------------------+---------------+-----------+--------------------------+------------------------------------+
|Nova York|Homicídio Doloso|Bronx        |2018-07-09 14:25:00|Tarde             |25-44                |Branca           |Masculino    |25-44              |Preta          |Masculino  |40.804012949,-73.878331833|Outro                               |
|Nova Yo

In [82]:
df_tratado.printSchema()

root
 |-- cidade: string (nullable = true)
 |-- tipo_crime: string (nullable = true)
 |-- distrito: string (nullable = true)
 |-- data_ocorrencia: string (nullable = true)
 |-- periodo_ocorrencia: string (nullable = true)
 |-- faixa_etaria_suspeito: string (nullable = true)
 |-- cor_pele_suspeito: string (nullable = true)
 |-- sexo_suspeito: string (nullable = true)
 |-- faixa_etaria_vitima: string (nullable = true)
 |-- cor_pele_vitima: string (nullable = true)
 |-- sexo_vitima: string (nullable = true)
 |-- latitude_longitude: string (nullable = true)
 |-- desc_foro: string (nullable = true)



## 📈 **8 - Criando os insigths preliminares**

### 🔎 **Spark SQL**

In [83]:
# Criando database ny
spark.sql('create database ny').show()

++
||
++
++



In [84]:
# Verificando se foi criada
spark.sql('show databases').show()

+---------+
|namespace|
+---------+
|  default|
|       ny|
+---------+



In [85]:
# Escolhendo o database
spark.sql('use ny')

DataFrame[]

In [86]:
# Transformando df em table
df_tratado.write.saveAsTable('ny_tratado')

In [87]:
# Verificando se foi criada
spark.sql('show tables').show()

+---------+----------+-----------+
|namespace| tableName|isTemporary|
+---------+----------+-----------+
|       ny|ny_tratado|      false|
+---------+----------+-----------+



In [88]:
# Mostrando todos os dados da tabela
spark.sql('select * from ny_tratado').show(truncate=False)

+---------+----------------+-------------+-------------------+------------------+---------------------+-----------------+-------------+-------------------+---------------+-----------+--------------------------+------------------------------------+
|cidade   |tipo_crime      |distrito     |data_ocorrencia    |periodo_ocorrencia|faixa_etaria_suspeito|cor_pele_suspeito|sexo_suspeito|faixa_etaria_vitima|cor_pele_vitima|sexo_vitima|latitude_longitude        |desc_foro                           |
+---------+----------------+-------------+-------------------+------------------+---------------------+-----------------+-------------+-------------------+---------------+-----------+--------------------------+------------------------------------+
|Nova York|Homicídio Doloso|Bronx        |2018-07-09 14:25:00|Tarde             |25-44                |Branca           |Masculino    |25-44              |Preta          |Masculino  |40.804012949,-73.878331833|Outro                               |
|Nova Yo

In [89]:
# Distrito com mais crimes de morte
spark.sql('''
SELECT
  distrito,
  COUNT(*) AS crimes_morte
FROM
  ny_tratado
GROUP BY
  distrito
ORDER BY
  crimes_morte DESC
''').show(truncate=False)

+-------------+------------+
|distrito     |crimes_morte|
+-------------+------------+
|Brooklyn     |517         |
|Bronx        |435         |
|Queens       |280         |
|Manhattan    |263         |
|Staten Island|61          |
|Desconhecido |3           |
+-------------+------------+



In [90]:
# Sexo da vítima em crimes de morte
spark.sql('''
SELECT
  sexo_vitima,
  COUNT(*) AS crimes_morte
FROM
  ny_tratado
GROUP BY
  sexo_vitima
ORDER BY
  crimes_morte DESC
''').show(truncate=False)

+------------+------------+
|sexo_vitima |crimes_morte|
+------------+------------+
|Masculino   |1319        |
|Feminino    |238         |
|Desconhecido|2           |
+------------+------------+



In [91]:
# Sexo do suspeito em crimes de morte
spark.sql('''
SELECT
  sexo_suspeito,
  COUNT(*) AS crimes_morte
FROM
  ny_tratado
GROUP BY
  sexo_suspeito
ORDER BY
  crimes_morte DESC
''').show(truncate=False)

+-------------+------------+
|sexo_suspeito|crimes_morte|
+-------------+------------+
|Masculino    |930         |
|Desconhecido |556         |
|Feminino     |73          |
+-------------+------------+



In [92]:
# PerÍodo do dia com mais crimes de morte
spark.sql('''
SELECT
  periodo_ocorrencia,
  COUNT(*) AS crimes_morte
FROM
  ny_tratado
GROUP BY
  periodo_ocorrencia
ORDER BY
  crimes_morte DESC
''').show(truncate=False)

+------------------+------------+
|periodo_ocorrencia|crimes_morte|
+------------------+------------+
|Noite             |577         |
|Madrugada         |453         |
|Tarde             |352         |
|Manhã             |177         |
+------------------+------------+



In [93]:
# Faixa etária da vítima
spark.sql('''
SELECT
  faixa_etaria_vitima,
  COUNT(*) AS crimes_morte
FROM
  ny_tratado
WHERE
  NOT faixa_etaria_vitima = "Desconhecido"
GROUP BY
  faixa_etaria_vitima
ORDER BY
  crimes_morte DESC
''').show(truncate=False)

+-------------------+------------+
|faixa_etaria_vitima|crimes_morte|
+-------------------+------------+
|25-44              |803         |
|18-24              |299         |
|45-64              |245         |
|<18                |98          |
|65+                |86          |
+-------------------+------------+



In [94]:
# Faixa etária do suspeito
spark.sql('''
SELECT
  faixa_etaria_suspeito,
  COUNT(*) AS crimes_morte
FROM
  ny_tratado
WHERE
  NOT faixa_etaria_suspeito = "Desconhecido"
GROUP BY
  faixa_etaria_suspeito
ORDER BY
  crimes_morte DESC
''').show(truncate=False)

+---------------------+------------+
|faixa_etaria_suspeito|crimes_morte|
+---------------------+------------+
|25-44                |542         |
|18-24                |243         |
|45-64                |151         |
|<18                  |51          |
|65+                  |9           |
+---------------------+------------+



In [95]:
# Cor de pele do suspeito de crimes de morte por distrito
spark.sql('''
SELECT
  distrito,
  cor_pele_suspeito,
  COUNT(*) AS crimes_morte
FROM
  ny_tratado
WHERE
  NOT cor_pele_suspeito = "Desconhecido"
GROUP BY
  distrito,
  cor_pele_suspeito
ORDER BY
  distrito,
  crimes_morte DESC
''').show(truncate=False)

+-------------+-----------------+------------+
|distrito     |cor_pele_suspeito|crimes_morte|
+-------------+-----------------+------------+
|Bronx        |Preta            |243         |
|Bronx        |Branca           |85          |
|Bronx        |Amarela          |1           |
|Brooklyn     |Preta            |222         |
|Brooklyn     |Branca           |47          |
|Brooklyn     |Amarela          |5           |
|Manhattan    |Preta            |130         |
|Manhattan    |Branca           |50          |
|Manhattan    |Amarela          |2           |
|Queens       |Preta            |89          |
|Queens       |Branca           |56          |
|Queens       |Amarela          |23          |
|Staten Island|Preta            |21          |
|Staten Island|Branca           |16          |
|Staten Island|Amarela          |2           |
+-------------+-----------------+------------+



In [96]:
# Cor de pele da vítima de crimes de morte por distrito
spark.sql('''
SELECT
  distrito,
  cor_pele_vitima,
  COUNT(*) AS crimes_morte
FROM
  ny_tratado
WHERE
  NOT cor_pele_vitima = "Desconhecido"
GROUP BY
  distrito,
  cor_pele_vitima
ORDER BY
  distrito,
  crimes_morte DESC
''').show(truncate=False)

+-------------+-----------------+------------+
|distrito     |cor_pele_vitima  |crimes_morte|
+-------------+-----------------+------------+
|Bronx        |Preta            |300         |
|Bronx        |Branca           |124         |
|Bronx        |Amarela          |2           |
|Brooklyn     |Preta            |406         |
|Brooklyn     |Branca           |89          |
|Brooklyn     |Amarela          |17          |
|Desconhecido |Preta            |2           |
|Desconhecido |Branca           |1           |
|Manhattan    |Preta            |164         |
|Manhattan    |Branca           |84          |
|Manhattan    |Amarela          |12          |
|Queens       |Preta            |163         |
|Queens       |Branca           |91          |
|Queens       |Amarela          |24          |
|Queens       |Povos originários|1           |
|Staten Island|Preta            |36          |
|Staten Island|Branca           |23          |
|Staten Island|Amarela          |1           |
+------------

In [97]:
# Meses com mais crimes de morte
spark.sql('''
SELECT
  CASE MONTH(data_ocorrencia)
    WHEN 1 THEN 'Janeiro'
    WHEN 2 THEN 'Fevereiro'
    WHEN 3 THEN 'Março'
    WHEN 4 THEN 'Abril'
    WHEN 5 THEN 'Maio'
    WHEN 6 THEN 'Junho'
    WHEN 7 THEN 'Julho'
    WHEN 8 THEN 'Agosto'
    WHEN 9 THEN 'Setembro'
    WHEN 10 THEN 'Outubro'
    WHEN 11 THEN 'Novembro'
    WHEN 12 THEN 'Dezembro'
END
  AS mes,
  COUNT(*) AS crimes_morte
FROM
  ny_tratado
GROUP BY
  mes
ORDER BY
  crimes_morte DESC
''').show(truncate=False)

+---------+------------+
|mes      |crimes_morte|
+---------+------------+
|Agosto   |179         |
|Setembro |172         |
|Julho    |156         |
|Junho    |146         |
|Outubro  |142         |
|Maio     |129         |
|Abril    |127         |
|Dezembro |118         |
|Janeiro  |103         |
|Março    |101         |
|Novembro |100         |
|Fevereiro|86          |
+---------+------------+



In [98]:
# Dias da semana com mais crimes de morte
spark.sql('''
SELECT
  CASE DAYOFWEEK(data_ocorrencia)
    WHEN 1 THEN 'Domingo'
    WHEN 2 THEN 'Segunda'
    WHEN 3 THEN 'Terça'
    WHEN 4 THEN 'Quarta'
    WHEN 5 THEN 'Quinta'
    WHEN 6 THEN 'Sexta'
    WHEN 7 THEN 'Sábado'
END
  AS dia_semana,
  COUNT(*) AS crimes_morte
FROM
  ny_tratado
GROUP BY
  dia_semana
ORDER BY
  crimes_morte DESC
''').show(truncate=False)

+----------+------------+
|dia_semana|crimes_morte|
+----------+------------+
|Domingo   |290         |
|Sábado    |268         |
|Segunda   |212         |
|Sexta     |210         |
|Terça     |200         |
|Quarta    |200         |
|Quinta    |179         |
+----------+------------+

