# ST IT Cloud - Data and Analytics Test LV.4

Esse teste deve avaliar alguns conceitos de big data e a qualidade técnica na manipulacão de dados, otimização de performance, trabalho com arquivos grandes e tratamento de qualidade.

## Passo a passo

-Disponibilizamos aqui 2 cases para serem desenvolvidos, leia os enunciados dos problemas, desenvolver os programas, utilizando a **stack definida durante o processo seletivo**, para entregar os dados de acordo com os requisitos descritos abaixo.

**Faz parte dos critérios de avaliacão a pontualidade da entrega. Implemente até onde for possível dentro do prazo acordado.**

**Os dados de pessoas foram gerados de forma aleatória, utilizando a biblioteca FakerJS, FakerJS-BR e Faker**

LEMBRE-SE: A entrega deve conter TODOS os passos para o avaliador executar o programa (keep it simple).


# TESTE PRÁTICO

**Problema 1**: Você está recebendo o arquivo 'dados_cadastrais_fake.csv' que contem dados cadastrais de clientes, mas para que análises ou relatórios sejam feitos é necessário limpar e normalizar os dados. Além disso, existe uma coluna com o número de cpf e outra com cnpj, você precisará padronizar deixando apenas dígitos em formato string (sem caracteres especiais), implementar uma forma de verificar se tais documentos são válidos sendo que a informação deve se adicionada ao dataframe em outras duas novas colunas.

Após a normalização, gere reports que respondam as seguintes perguntas:
- Quantos clientes temos nessa base?
- Qual a média de idade dos clientes?
- Quantos clientes nessa base pertencem a cada estado?
- Quantos CPFs válidos e inválidos foram encontrados?
- Quantos CNPJs válidos e inválidos foram encontrados?

Ao final gere um arquivo no formato csv e um outro arquivo no formato parquet chamado (problema1_normalizado), eles serão destinados para pessoas distintas.


In [27]:
#!pip install pandas
#!pip install spark
#!pip install pyspark
#!pip install dask
#!pip install pyarrow

import sys
import pandas as pd
import pyspark.pandas
from datetime import datetime, date, timedelta
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import * 
from pyspark.sql.functions import *
from pyspark.sql import functions as F


In [28]:
spark = SparkSession.\
        builder.\
        appName("pyspark-notebook").getOrCreate()

sc = spark.sparkContext

In [40]:
file = 'dados_cadastrais_fake.csv'
df = spark.read.csv(file, sep=';', inferSchema=True, header=True)
df.show()

+------------------+-----+-------------------+----------------+--------------+------------------+
|             nomes|idade|             cidade|          estado|           cpf|              cnpj|
+------------------+-----+-------------------+----------------+--------------+------------------+
|    Dennis Daniels|   31|         ACRELÂNDIA|              AC|   97566536800|    06589184909526|
|       Leah Becker|   42|        ÁGUA BRANCA|              AL|425.263.807-07|25.673.336/2350-20|
|        Sally Ford|   18|           ALVARÃES|              AM|   34647754103|    26543101702989|
|    Colleen Duncan|   21|     SERRA DO NAVIO|              AP|252.531.560-03|19.062.080/5100-98|
|   Jeff Stephenson|   73|             ABAÍRA|              BA|   49668886542|    97794530015384|
|     Sydney Curtis|   85|            ABAIARA|              CE|506.202.907-49|29.476.298/0856-78|
|    Kelly Matthews|   44|           Brasília|distrito federal|   39154836808|    24709301957761|
|         Juan Ruiz|

In [41]:
# Normalização cpf e cnpj
df = df.withColumn('cpf', translate('cpf', '.', ''))
df = df.withColumn('cpf', translate('cpf','-',''))
df = df.withColumn('cnpj', translate('cnpj','.',''))
df = df.withColumn('cnpj', translate('cnpj','/',''))
df = df.withColumn('cnpj', translate('cnpj','-',''))
df.show()

+------------------+-----+-------------------+----------------+-----------+--------------+
|             nomes|idade|             cidade|          estado|        cpf|          cnpj|
+------------------+-----+-------------------+----------------+-----------+--------------+
|    Dennis Daniels|   31|         ACRELÂNDIA|              AC|97566536800|06589184909526|
|       Leah Becker|   42|        ÁGUA BRANCA|              AL|42526380707|25673336235020|
|        Sally Ford|   18|           ALVARÃES|              AM|34647754103|26543101702989|
|    Colleen Duncan|   21|     SERRA DO NAVIO|              AP|25253156003|19062080510098|
|   Jeff Stephenson|   73|             ABAÍRA|              BA|49668886542|97794530015384|
|     Sydney Curtis|   85|            ABAIARA|              CE|50620290749|29476298085678|
|    Kelly Matthews|   44|           Brasília|distrito federal|39154836808|24709301957761|
|         Juan Ruiz|   39|     AFONSO CLÁUDIO|              ES|22688119648|02420338147900|

In [42]:
# - Quantos clientes temos nessa base?
countCpf = df.select(col('cpf')).distinct().count()
countCnpj = df.select(col('cnpj')).distinct().count()
print(f"cpf: {countCpf}")
print(f"cnpj: {countCnpj}")

cpf: 10000
cnpj: 10000


In [43]:
# - Qual a média de idade dos clientes?
newDf = df.select(avg('idade').alias('avg_idade')).collect()

print(newDf[0]['avg_idade'])

53.7831


In [44]:
# - Quantos clientes nessa base pertencem a cada estado?
clientesPorEstado = df. \
    groupBy("estado"). \
    agg(count(col("cpf")).alias('qtClientes')). \
    select("estado", "qtClientes")
clientesPorEstado.show()

+------------+----------+
|      estado|qtClientes|
+------------+----------+
|          SC|       370|
|          RO|       370|
|          PI|       370|
|          AM|       371|
|  são  paulo|         3|
|          RR|       370|
|MINAS GERAIs|        17|
|          GO|       371|
|          TO|       370|
|          MT|       370|
|          SP|       364|
|          ES|       371|
|          PB|       370|
|          RS|       370|
|          MS|       370|
|          AL|       371|
| MINAS GERAI|        13|
|          MG|       340|
|          PA|       370|
|          BA|       371|
+------------+----------+
only showing top 20 rows



In [46]:
# - Quantos clientes nessa base pertencem a cada estado?
def translate_state(state):
    if len(state) > 2:
        dict_states = {
            "MINAS GERAIs": "MG",
            "MINAS GERAI": "MG",
            "são  paulo": "SP",
            "sao  paulo": "SP",
            "distrito federal": "DF",
            "rio de  janeiro ": "RJ",
            
        }
        new_state = dict_states.get(state, state)
    else:
        new_state = state

    return new_state


translate_state_udf = F.udf(lambda z: translate_state(z), StringType())

clientesPorEstado = df. \
    withColumn("estado", translate_state_udf(col("estado"))). \
    groupBy("estado"). \
    agg(count(col("cpf")).alias('qtClientes')). \
    select("qtClientes", "estado")
clientesPorEstado.show(50, False)



+----------+------+
|qtClientes|estado|
+----------+------+
|370       |SC    |
|370       |RO    |
|370       |PI    |
|371       |AM    |
|370       |RR    |
|371       |GO    |
|370       |TO    |
|370       |MT    |
|370       |SP    |
|371       |ES    |
|370       |PB    |
|370       |RS    |
|370       |MS    |
|371       |AL    |
|370       |MG    |
|370       |PA    |
|371       |BA    |
|370       |SE    |
|370       |PE    |
|371       |CE    |
|370       |RN    |
|370       |RJ    |
|371       |MA    |
|371       |AC    |
|371       |DF    |
|370       |PR    |
|371       |AP    |
+----------+------+



In [56]:
# Normalização estado
df = df.withColumn("estado", translate_state_udf(col("estado")))
df.show()

+------------------+-----+-------------------+------+-----------+--------------+
|             nomes|idade|             cidade|estado|        cpf|          cnpj|
+------------------+-----+-------------------+------+-----------+--------------+
|    Dennis Daniels|   31|         ACRELÂNDIA|    AC|97566536800|06589184909526|
|       Leah Becker|   42|        ÁGUA BRANCA|    AL|42526380707|25673336235020|
|        Sally Ford|   18|           ALVARÃES|    AM|34647754103|26543101702989|
|    Colleen Duncan|   21|     SERRA DO NAVIO|    AP|25253156003|19062080510098|
|   Jeff Stephenson|   73|             ABAÍRA|    BA|49668886542|97794530015384|
|     Sydney Curtis|   85|            ABAIARA|    CE|50620290749|29476298085678|
|    Kelly Matthews|   44|           Brasília|    DF|39154836808|24709301957761|
|         Juan Ruiz|   39|     AFONSO CLÁUDIO|    ES|22688119648|02420338147900|
|      Brian Thomas|   26|    ABADIA DE GOIÁS|    GO|47475484084|70723419110335|
|        Sara Ayers|   62|  

In [57]:

# https://dicasdeprogramacao.com.br/algoritmo-para-validar-cpf/

def valida_cpf(cpf):
    ver = 0
    for i in range(0,9):
        ver = ver + (int(cpf[i])*(10-i))

    resto = (ver*10)%11
    if resto == 10:  resto = 0

    if resto != int(cpf[9]): return False

    ver = 0
    for i in range(0,10):
        ver = ver + (int(cpf[i])*(11-i))
    
    resto = (ver*10)%11
    if resto == 10:  resto = 0

    if resto != int(cpf[10]): return False

    return True

# Testando cpf
print(valida_cpf("28180540871"))

# https://www.macoratti.net/alg_cnpj.htm

def calc_resto(val):
    resto = val%11
    if resto < 2: 
        return 0
    else: 
        return 11-resto
    

def valida_cnpj(cnpj):
    list = [5, 4, 3, 2, 9, 8, 7, 6, 5, 4, 3, 2]
    ver = 0
    
    for i in range(0, len(list)):
        ver = ver + (int(cnpj[i]) * list[i]) 
        
    resto = calc_resto(ver)
    
    if resto != int(cnpj[12]): return False

    list.insert(0,6)
    ver = 0
    for i in range(0, len(list)):
        ver = ver + (int(cnpj[i]) * list[i])
    resto = calc_resto(ver)
    if resto != int(cnpj[13]): return False
    
    return True

print(valida_cnpj("11222333000181"))

True
True


In [58]:
# - Quantos CPFs válidos e inválidos foram encontrados?
valida_udf = F.udf(lambda z: valida_cpf(z),StringType())

df_com_cpf = df.select(col("cpf"), valida_udf(col("cpf")).alias("cpf_valido") )
qt_valido = df_com_cpf.where(col('cpf_valido') == True).count()
qt_invalido = df_com_cpf.where(col('cpf_valido') == False).count()
print(f"Cpfs Validos: {qt_valido}")
print(f"Cpfs Invalidos: {qt_invalido}")

Cpfs Validos: 10000
Cpfs Invalidos: 0


In [59]:
# - Quantos CNPJs válidos e inválidos foram encontrados?
valida_udf = F.udf(lambda z: valida_cnpj(z),StringType())

df_com_cnpj = df.select(col("cnpj"), valida_udf(col("cnpj")).alias("cnpj_valido") )
qt_valido = df_com_cnpj.where(col('cnpj_valido') == True).count()
qt_invalido = df_com_cnpj.where(col('cnpj_valido') == False).count()
print(f"Cnpjs Validos: {qt_valido}")
print(f"Cnpjs Invalidos: {qt_invalido}")

Cnpjs Validos: 10000
Cnpjs Invalidos: 0


**Problema 2**: Você deverá implementar um programa, para ler, tratar e particionar os dados.

O arquivo fonte está disponível em `https://st-it-cloud-public.s3.amazonaws.com/people-v2_1E6.csv.gz`

### Data Quality

- Higienizar e homogenizar o formato da coluna `document`
- Detectar através da coluna `document` se o registro é de uma Pessoa Física ou Pessoa Jurídica, adicionando uma coluna com essa informação
- Higienizar e homogenizar o formato da coluna `birthDate`
- Existem duas colunas nesse dataset que em alguns registros estão trocadas. Quais são essas colunas? 
- Corrigir os dados com as colunas trocadas
- Além desses pontos, existem outras tratamentos para homogenizar esse dataset. Aplique todos que conseguir.

### Agregação dos dados

- Quais são as 5 PF que mais gastaram (`totalSpent`)? 
- Qual é o valor de gasto médio por estado (`state`)?
- Qual é o valor de gasto médio por `jobArea`?
- Qual é a PF que gastou menos (`totalSpent`)?
- Quantos nomes e documentos repetidos existem nesse dataset?
- Quantas linhas existem nesse dataset?

### Particionamento de dados tratados com as regras descritas em `DATA QUALITY`

- Particionar em arquivos PARQUET por estado (`state`)
- Particionar em arquivos CSV por ano/mes/dia de nascimento (`birthDate`)

In [62]:
file = 'people-v2_1E6.csv'
df = spark.read.csv(file, sep=';', inferSchema=True, header=True)
df.show()

[Stage 141:====>                                                  (1 + 11) / 12]

+------------------+--------------------+--------------------+-------------------+------------+------------------+----------------+--------------------+------------------+----------+
|          document|                name|                 job|            jobArea|     jobType|       phoneNumber|       birthDate|                city|             state|totalSpent|
+------------------+--------------------+--------------------+-------------------+------------+------------------+----------------+--------------------+------------------+----------+
|       76684148787|     Charlleny Braga|Oficial Criativo ...|       Configuração|Estrategista|    (62) 4216-9799|      20-05-1972|   Município de Iara|             Goiás|     913.8|
|       85704855733|      Newton Saraiva|Administrador Com...|Prestação de contas| Facilitador|        Aplicações|     10-Jun-1982|Município de Neid...|                RR|     57.26|
|    15664328373377|Dr. Sr. Solange M...|Designer Identida...|           Métricas|   

                                                                                

In [74]:
# functions to use as udf later

def switch_person(doc):
    if len(doc) == 11:
        return "CPF"
    elif len(doc) == 14:
        return "CNPJ"
    else:
        return "INVALIDO"


def translate_birth_date(birthDate):
    ambiguous = False
    switch_day_mon = False
    res = birthDate
    split = []
    if '/' in birthDate:
        split = birthDate.split('/')
        ambiguous = True

    if '-' in birthDate:
        split = birthDate.split('-')

    elif '.' in birthDate:
        split = birthDate.split('.')
        split[0] = split[0].split(' ')[1]
        switch_day_mon = True

    elif ',' in birthDate:
        split = [birthDate[6:8], birthDate[4:6], birthDate[0:4]]

    months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
              'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
    if not split[1].isnumeric():
        split[1] = str(months.index(split[1]) + 1)

    elif not split[0].isnumeric():
        split[0] = str(months.index(split[0]) + 1)
        switch_day_mon = True

    if len(split[0]) == 1:
        split[0] = '0' + split[0]
    if len(split[1]) == 1:
        split[1] = '0' + split[1]
    if int(split[1]) > 12 or switch_day_mon:
        month = split[0]
        split[0] = split[1]
        split[1] = month
        ambiguous = False

    res = '/'.join(split)

    if ambiguous:
        return res + '-amb' # não sei se os primeiros 2 digitos se trata de mes ou dia
        #return res
    else:
        return res


def translate_state(state):
    if len(state) > 2:
        dict_states = {
            "Acre": "AC",
            "Alagoas": "AL",
            "Amapá": "AP",
            "Amazonas": "AM",
            "Bahia": "BA",
            "Ceará": "CE",
            "Distrito Federal": "DF",
            "Espírito Santo": "ES",
            "Goiás": "GO",
            "Maranhão": "MA",
            "Mato Grosso": "MT",
            "Mato Grosso do Sul": "MS",
            "Minas Gerais": "MG",
            "Paraná": "PR",
            "Paraíba": "PB",
            "Pará": "PA",
            "Pernambuco": "PE",
            "Piauí": "PI",
            "Rio Grande do Norte": "RN",
            "Rio Grande do Sul": "RS",
            "Rio de Janeiro": "RJ",
            "Rondônia": "RO",
            "Roraima": "RR",
            "Santa Catarina": "SC",
            "Sergipe": "SE",
            "São Paulo": "SP",
            "Tocantins": "TO",
        }
        new_state = dict_states.get(state, state)
    else:
        new_state = state

    return new_state


In [75]:
# udf functions tests
# switch_person tests:
print("####################### CPF or CNPJ ###############################")
print(switch_person("76684148787"), "76684148787")
print(switch_person("85704855733"), "85704855733")
print(switch_person("15664328373377"), "15664328373377")
print(switch_person("02328238087786"), "02328238087786")
print(switch_person("30073687408740"), "30073687408740")
print(switch_person("77145788233"), "77145788233")
print(switch_person("75752983045"), "75752983045")

# translate_birth_date tests:
print("####################### NORMALIZACAO DATA ############################")
print(translate_birth_date('20-05-1972'), '20-05-1972')
print(translate_birth_date('10-Jun-1982'), '10-Jun-1982')
print(translate_birth_date('05/16/1968'), '05/16/1968')
print(translate_birth_date('19810417,'), '19810417,')
print(translate_birth_date('04/07/1980'), '04/07/1980')
print(translate_birth_date('19830618,'), '19830618,')
print(translate_birth_date('27-Oct-1980'), '27-Oct-1980')
print(translate_birth_date('09/08/1978'), '09/08/1978')
print(translate_birth_date('19771120,'), '19771120,')
print(translate_birth_date('19-01-1987'), '19-01-1987')
print(translate_birth_date('Thu, Jan.30.1975'), 'Thu, Jan.30.1975')
print(translate_birth_date('May/12/1985'), 'May/12/1985')
print(translate_birth_date('19801214,'), '19801214,')
print(translate_birth_date('Aug/02/1989'), 'Aug/02/1989')
print(translate_birth_date('Dec/02/1986'), 'Dec/02/1986')
print(translate_birth_date('08/18/1963'), '08/18/1963')
print(translate_birth_date('19-Apr-1987'), '19-Apr-1987')
print(translate_birth_date('Dec/07/1978'), 'Dec/07/1978')
print(translate_birth_date('21-03-1972'), '21-03-1972')

####################### CPF or CNPJ ###############################
CPF 76684148787
CPF 85704855733
CNPJ 15664328373377
CNPJ 02328238087786
CNPJ 30073687408740
CPF 77145788233
CPF 75752983045
####################### NORMALIZACAO DATA ############################
20/05/1972 20-05-1972
10/06/1982 10-Jun-1982
16/05/1968 05/16/1968
17/04/1981 19810417,
04/07/1980-amb 04/07/1980
18/06/1983 19830618,
27/10/1980 27-Oct-1980
09/08/1978-amb 09/08/1978
20/11/1977 19771120,
19/01/1987 19-01-1987
30/01/1975 Thu, Jan.30.1975
12/05/1985 May/12/1985
14/12/1980 19801214,
02/08/1989 Aug/02/1989
02/12/1986 Dec/02/1986
18/08/1963 08/18/1963
19/04/1987 19-Apr-1987
07/12/1978 Dec/07/1978
21/03/1972 21-03-1972


In [76]:
# Data Quality

switch_udf = F.udf(lambda z: switch_person(z),StringType())

# - Higienizar e homogenizar o formato da coluna `document`
df = df.withColumn('document', translate('document','.','')). \
    withColumn('document', translate('document','/','')). \
    withColumn('document', translate('document','-',''))

# - Detectar através da coluna `document` se o registro é de uma Pessoa Física ou Pessoa Jurídica, adicionando uma coluna com essa informação
df = df.withColumn("type_person", switch_udf(col("document")))

# - Existem duas colunas nesse dataset que em alguns registros estão trocadas. Quais são essas colunas?
# - Corrigir os dados com as colunas trocadas
# ESTAS COLUNAS NÃO FORAM TROCADAS POR HAVER POUCA CONSISTÊNCIA NOS DADOS, ONDE HÁ MUITOS QUE SIMPLESMENTE ESTÃO COM NÚMEROS DE TELEFONE EM AMBOS OS CAMPOS
# E É IMPRUDENTE INFERIR QUE OS DADOS FICARÃO CORRETOS CASO SEJAM TROCADOS

print(
    df.count() , \
    df. \
    filter(col("jobArea").contains('(')). \
    count(),
    df. \
    filter(~col("phoneNumber").contains('(')). \
    count(),
    df. \
    filter(col("jobArea").contains('(')). \
    filter(~col("phoneNumber").contains('(')). \
    count()
)







1000000 49991 49836 2484


                                                                                

In [77]:
translate_udf = F.udf(lambda z: translate_birth_date(z),StringType())
get_date_udf = F.udf(lambda z: z.split('-')[0],StringType())
is_amb_udf = F.udf(lambda z: len(z.split('-'))==2,StringType())

translate_state_udf = F.udf(lambda z: translate_state(z),StringType())

# - Higienizar e homogenizar o formato da coluna `birthDate`
# mesmo depois do tratamento, há o formato dd/mm/yyyy e mm/dd/yyyy que é ambiguo e ainda deve ser levado em conta:
# a funcão de tradução pega todos os dados que tem para inferir se a data é ou não ambigua
#    (por exemplo se o mes é maior que 12, e o mes ser escrito com letras (Jan,Feb,Mar....))
df = df.withColumn('birth_date_trans', translate_udf(col('birthDate'))). \
    withColumn('birthDate', get_date_udf(col('birth_date_trans'))). \
    withColumn('birth_date_is_ambiguous', is_amb_udf(col('birth_date_trans'))). \
    drop(col('birth_date_trans'))

# - Além desses pontos, existem outras tratamentos para homogenizar esse dataset. Aplique todos que conseguir.
df = df.withColumn('state', translate_state_udf(col('state')))



In [79]:
num_ambiguous = df.select(col('birth_date_is_ambiguous')).where(col('birth_date_is_ambiguous') == True).count()
num_unambiguous = df.select(col('birth_date_is_ambiguous')).where(col('birth_date_is_ambiguous') == False).count()
percentage_ambiguous = num_ambiguous / (num_ambiguous+num_unambiguous)
# df_test = df_4.select(col('birthDate'), col('birth_date_is_ambiguous')).where(col('birth_date_is_ambiguous') == True).show()
print(str(percentage_ambiguous*100)[0:5]+'%')



100.0%


                                                                                

In [81]:
# Agregação dos dados:
# - Quais são as 5 PF que mais gastaram (`totalSpent`)? 
phisicalPeopleWhoMostSpent = df. \
    where(col("type_person")=='CPF' ). \
    filter(col("totalSpent").isNotNull()). \
    orderBy(col("totalSpent").desc()). \
    limit(5). \
    select("document", "name","totalSpent")
phisicalPeopleWhoMostSpent.show()

[Stage 201:====>                                                  (1 + 11) / 12]

+-----------+------------------+----------+
|   document|              name|totalSpent|
+-----------+------------------+----------+
|18741688813|Sra. Rocio Martins|    1000.0|
|82190864755| Euvanderson Costa|    1000.0|
|58953147670|     Valeria Souza|    999.99|
|61454551445|    Wandir Martins|    999.99|
|17195012700|       Regina Melo|    999.99|
+-----------+------------------+----------+



                                                                                

In [82]:
# - Qual é o valor de gasto médio por estado (`state`)?
average_by_state = df. \
    orderBy("state"). \
    groupBy("state"). \
    agg(avg(col("totalSpent")).alias('avgSpent')). \
    select("state", "avgSpent")
average_by_state.show(50)

                                                                                

+-----+------------------+
|state|          avgSpent|
+-----+------------------+
|   AC| 502.4784239392661|
|   AL|500.51551942023275|
|   AM| 498.7706629805852|
|   AP|504.25913668879633|
|   BA| 498.7741403086009|
|   CE|499.30029952678933|
|   DF|  499.107365725608|
|   ES|501.63174747811667|
|   GO|501.29249260830375|
|   MA| 500.4352989228493|
|   MG| 499.5320941292197|
|   MS| 499.5097414101175|
|   MT|500.26461002714404|
|   PA|   499.23084708669|
|   PB|499.20039918879166|
|   PE|502.40577268067875|
|   PI| 499.0765211522321|
|   PR|500.64420030737665|
|   RJ| 502.4385686988432|
|   RN| 499.7560176991122|
|   RO|498.23731784934995|
|   RR|500.95639783978345|
|   RS| 500.6309834299535|
|   SC|501.34561245212547|
|   SE|498.24009925154405|
|   SP|498.69447971507907|
|   TO|501.90156334817084|
+-----+------------------+



In [83]:
# - Qual é o valor de gasto médio por `jobArea`?
average_by_job_area = df. \
    orderBy("jobArea"). \
    groupBy("jobArea", "document"). \
    agg(avg(col("totalSpent")).alias('avgSpent')). \
    select("document", "avgSpent","jobArea")
average_by_job_area.show(50)
    



+--------------+--------+---------------+
|      document|avgSpent|        jobArea|
+--------------+--------+---------------+
|   01831172127|  219.35|(00) 00435-1738|
|   10465354807|  799.86|(00) 01430-7785|
|   51826552073|  128.48|(00) 01525-7613|
|   83435277122|  125.28|(00) 01837-4277|
|   27336851126|   340.8|(00) 01870-0192|
|   18343070909|  124.98| (00) 0291-1564|
|31217228563646|   81.83| (00) 0323-6916|
|   48412552067|  833.41| (00) 0331-2091|
|   12823148582|   205.1| (00) 0430-5931|
|   38014766938|  990.49| (00) 0457-9237|
|   62087526710|  464.85| (00) 0473-0021|
|   66023765785|  155.48| (00) 0503-2738|
|   89627807206|  665.54| (00) 0505-1554|
|   82227207752|  650.87| (00) 0510-0098|
|   91693841770|  416.92|(00) 05929-6139|
|   16633537352|  758.21|(00) 06264-9122|
|28511113600879|  445.32|(00) 06265-7749|
|   18933779701|  523.35|(00) 06694-1196|
|   69380278462|   78.99|(00) 07053-8152|
|43585682015003|   161.6| (00) 0723-8012|
|   89759777223|  540.08| (00) 074

                                                                                

In [85]:
# - Qual é a PF que gastou menos (`totalSpent`)?
phisicalPersonWhoLessSpent = df. \
    where(col("type_person")=='CPF' ). \
    filter(col("totalSpent").isNotNull()). \
    orderBy(col("totalSpent")). \
    limit(1). \
    select("document", "name","totalSpent")
phisicalPersonWhoLessSpent.show()

[Stage 217:====>                                                  (1 + 11) / 12]

+-----------+--------------+----------+
|   document|          name|totalSpent|
+-----------+--------------+----------+
|58868926210|Sr. Enio Souza|       0.0|
+-----------+--------------+----------+



                                                                                

In [86]:
# - Quantos nomes e documentos repetidos existem nesse dataset?
repeated_documents = df. \
    orderBy("document"). \
    groupBy("document"). \
    count(). \
    filter("count > 1"). \
    count()
repeated_names = df. \
    orderBy("name"). \
    groupBy("name"). \
    count(). \
    filter("count > 1"). \
    count()
print("Repeated Documents:", repeated_documents)
print("Repeated Names:", repeated_names)



Repeated Documents: 430
Repeated Names: 173513


                                                                                

In [87]:
# - Quantas linhas existem nesse dataset?
df.count()

1000000

In [89]:
# ### Particionamento de dados tratados com as regras descritas em `DATA QUALITY`

# - Particionar em arquivos PARQUET por estado (`state`)
df.write.partitionBy("state").mode("overwrite").parquet("people_parquet")

22/03/29 20:00:50 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
22/03/29 20:00:50 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
22/03/29 20:00:50 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
22/03/29 20:00:50 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
22/03/29 20:00:50 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 63.33% for 12 writers
22/03/29 20:00:51 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
22/03/29 20:00:51 WARN MemoryManager: Total allocation exceeds 95.

In [None]:
# - Particionar em arquivos CSV por ano/mes/dia de nascimento (`birthDate`)
'''
df. \
    withColumn('day', substring('birthDate', 1,2)). \
    withColumn('month', substring('birthDate', 4,2)). \
    withColumn('year', substring('birthDate', 7,4)). \
    write. \
        partitionBy("year", "month", "day"). \
        mode("overwrite"). \
        format("csv"). \
        save("people_csv")

'''
df.show()