# Importando dependencias

In [None]:
import pyspark.sql.functions as sf
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType
from feature_store import FeatureStore, Catalog
from pyspark.sql import DataFrame
import time
from pyspark.sql.functions import col, when

In [None]:
from functools import reduce
import subprocess

In [None]:
#TIMESTAMP = "oculto"
#Timestamp oculto
nw_timestamp = 'oculto'
b3_timestamp = 'oculto'

# Feature Store

In [None]:
fs = FeatureStore(spark)

You are currently in the PROD environment.


In [None]:
catalog = fs.get_catalog()

# Utils

In [None]:
oculto = "gs://oculto/loss_prevention/perfil_laranja/"

In [None]:
UF_DDD = {"SP": [11, 12, 13, 14, 15, 16, 17, 18, 19],
       "RJ": [21, 22, 24],
       "ES": [27, 28],
       "MG": [31, 32, 33, 34, 35, 37, 38],
       "PR": [41, 42, 43, 44, 45, 46],
       "SC": [47, 48, 49],
       "RS": [51, 53, 54, 55],
       "DF": [61],
       "GO": [62, 64],
       "TO": [63],
       "MT": [65, 66],
       "MS": [67],
       "AC": [68],
       "RO": [69],
       "BA": [71, 73, 74, 75, 77],
       "SE": [79],
       "PE": [81, 87],
       "AL": [82],
       "PB": [83],
       "RN": [84],
       "CE": [85, 88],
       "PI": [86, 89],
       "PA": [91, 93, 94],
       "AM": [92, 97],
       "RR": [95],
       "AP": [96],
       "MA": [98, 99]
      }

In [None]:
def get_age_status(pessoa: DataFrame) -> DataFrame:

    pessoa = pessoa.withColumn(
        "flagAdolescente",
        sf.when((sf.col("idade") >= 12) & (sf.col("idade") <= 18), 1)
        .otherwise(0)
    )

    pessoa = pessoa.withColumn(
        "flagIdoso",
        sf.when(sf.col("idade") >= 60, 1)
        .otherwise(0)
    )

    return pessoa

In [None]:
def countMissingValues(dataframe: DataFrame) -> DataFrame:
    missing_values = dataframe.select([
        sf.count(sf.when(sf.col(c).isNull(), c)).alias(c) for c in dataframe.columns
    ])

    return missing_values

# Coleta dos dados

## Cadastrais

In [None]:
pessoa = oculto

In [None]:
pessoa = (
    pessoa
    .select(
        'cpf',
        'nome',
        'idade',
        'situacao',
        'indicioFalecimento',
        'dataIndicioFalecimento',
        sf.when(sf.col("falecidoConfirmado"), 1).otherwise(0).alias("flagFalecidoConfirmado"),
        sf.when(sf.col("situacao") == "REGULAR", 1).otherwise(0).alias("flagSituacaoRegular"),
        sf.when(sf.col('dataInscricao') >= sf.col('dataNascimento'),
                sf.round(sf.datediff(sf.col('dataInscricao'), sf.col('dataNascimento')) / 365, 2)
               ).otherwise(None).alias("tempoEmAnosDataNascimentoDataInscricao")
    )
    .withColumn("flagSupercentenario", sf.when(sf.col('idade') >= 110, sf.lit(1)).otherwise(sf.lit(0)))
    .filter((sf.col('idade') >= 12) & (sf.col('situacao') == "REGULAR"))
)

In [None]:
pessoa.count()

                                                                                

184050329

In [None]:
pessoa = get_age_status(pessoa)

In [None]:
pessoa_compliance_regras = (
    oculto
    .select("cpf",
            sf.col("flagEmpresario").cast("int").alias("flagEmpresario"),
            sf.col("flagAltaQualificacao").cast("int").alias("flagAltaQualificacao")
           )
)

### servidor público

In [None]:
pessoa_servidor_publico = (
    oculto
    .select(sf.col("cd_cpf").alias("cpf"))
    .dropDuplicates()
    .withColumn("flagServidorPublico", sf.lit(1))
)

### produtor rural

In [None]:
pessoa_sintegra_rural_ies = oculto

In [None]:
pessoa_produtor_rural = (
    oculto
    .select("cpf")
    .dropDuplicates()
    .withColumn("flagProdutorRural", sf.lit(1))
)

### registro ANTT

In [None]:
pessoa_registro_antt = (
    oculto
    .withColumn(
        "flagRegistroAntt",
            sf.when(
                sf.col("rntrcSituacao") == "ATIVO", 1
            ).otherwise(0)
        )
    .select("cpf",
            "flagRegistroAntt")
    .filter(sf.col("cpf").isNotNull())
)

### Salvando dados

In [None]:
datasets = [pessoa,
            pessoa_compliance_regras,
            #pessoa_profissao,
            pessoa_servidor_publico,
            pessoa_registro_antt,
            pessoa_produtor_rural
           ]

In [None]:
pessoa_cadastrais = reduce(lambda df1, df2: df1.join(df2, on="cpf", how='left'), datasets)

In [None]:
pessoa_cadastrais.count()

                                                                                

184050329

In [None]:
pessoa_cadastrais = pessoa_cadastrais.fillna(0, subset=["flagServidorPublico",
                                                        "flagEmpresario",
                                                        "flagRegistroAntt",
                                                        "flagProdutorRural",
                                                        "flagAltaQualificacao"
                                                       ])

In [None]:
DATASET_NAME = 'cadastrais.parquet'
PATH = os.path.join(oculto, 'data/')

pessoa_cadastrais.write.mode("overwrite").parquet(os.path.join(PATH, DATASET_NAME))

                                                                                

In [None]:
os.path.join(PATH, DATASET_NAME)

## Endereço

In [None]:
cols = ['cd_cpf', 'nm_logradouro', 'nu_logradouro',
        'nm_bairro', 'cd_cep', 'nm_municipio',
        'sg_uf', 'cd_latitude', 'cd_longitude']

In [None]:
endereco_simm = (
    oculto
    .select(*cols)
)

In [None]:
endereco = endereco_simm

In [None]:
endereco.createOrReplaceTempView("endereco_pessoa")

### Setor censitário

In [None]:
from sedona.spark import *
config = SedonaContext.builder().getOrCreate()
sedona = SedonaContext.create(config)

25/03/19 04:31:19 WARN UDTRegistration: Cannot register UDT for org.geotools.coverage.grid.GridCoverage2D, which is already registered.
25/03/19 04:31:19 WARN SimpleFunctionRegistry: The function rs_union_aggr replaced a previously registered function.
25/03/19 04:31:19 WARN UDTRegistration: Cannot register UDT for org.locationtech.jts.geom.Geometry, which is already registered.
25/03/19 04:31:19 WARN UDTRegistration: Cannot register UDT for org.locationtech.jts.index.SpatialIndex, which is already registered.
25/03/19 04:31:19 WARN SimpleFunctionRegistry: The function st_envelope_aggr replaced a previously registered function.
25/03/19 04:31:19 WARN SimpleFunctionRegistry: The function st_intersection_aggr replaced a previously registered function.
25/03/19 04:31:19 WARN SimpleFunctionRegistry: The function st_union_aggr replaced a previously registered function.


In [None]:
from sedona.core.formatMapper.shapefileParser import ShapefileReader
from sedona.utils.adapter import Adapter

In [None]:
endereco_geometry = spark.sql("SELECT *, ST_Point(cd_longitude, cd_latitude) AS geometry from endereco_pessoa WHERE cd_latitude IS NOT NULL")
endereco_geometry.createOrReplaceTempView("endereco_geometry")

In [None]:
path_censo = "gs://oculto/discovery/Censo2022/shape_files_censo2022"
spatialRDD = ShapefileReader.readToGeometryRDD(spark.sparkContext, path_censo)
spatial_df = Adapter.toDf(spatialRDD, spark)
spatial_df.createOrReplaceTempView("spatial_censo22")

In [None]:
df_censo = spark.sql("SELECT geometry, SITUACAO AS situacao, CD_SIT AS situacaoDetalhada, CD_TIPO AS tipoSetorCensitario FROM spatial_censo22")
df_censo.createOrReplaceTempView("censo")

In [None]:
query = """
SELECT
    e.cd_cpf AS cpf,
    e.nm_municipio,
    e.sg_uf,
    c.situacaoDetalhada,
    c.tipoSetorCensitario
FROM
    endereco_geometry e,
    censo c
WHERE ST_Contains(c.geometry, e.geometry)
"""

In [None]:
endereco_censo = spark.sql(query)

In [None]:
endereco_censo = (
    endereco_censo
    .withColumn("flagAreaUrbanaBaixaDensidadeEdificios", sf.when(sf.col("situacaoDetalhada") == 2, 1).otherwise(0))
    .withColumn("flagAglomeradoRural", sf.when(sf.col("situacaoDetalhada").isin([5, 6, 7]), 1).otherwise(0))
    .withColumn("flagAreaRural", sf.when(sf.col("situacaoDetalhada") == 8, 1).otherwise(0))
    .withColumn("flagFavelaOuComunidadeUrbana", sf.when(sf.col("tipoSetorCensitario") == 1, 1).otherwise(0))
    .drop("situacaoDetalhada", "tipoSetorCensitario")
    .join(pessoa_cadastrais, how="right", on="cpf")
    .select("cpf", "flagAreaUrbanaBaixaDensidadeEdificios",
            "flagAglomeradoRural", "flagAreaRural",
            "flagFavelaOuComunidadeUrbana"
           )
    .fillna(0, subset=["flagAreaUrbanaBaixaDensidadeEdificios",
                       "flagAglomeradoRural", "flagAreaRural",
                       "flagFavelaOuComunidadeUrbana"])
)

In [None]:
endereco_censo.printSchema()

root
 |-- cpf: string (nullable = true)
 |-- flagAreaUrbanaBaixaDensidadeEdificios: integer (nullable = true)
 |-- flagAglomeradoRural: integer (nullable = true)
 |-- flagAreaRural: integer (nullable = true)
 |-- flagFavelaOuComunidadeUrbana: integer (nullable = true)



#### Salvando dados

In [None]:
t1 = time.time()
DATASET_NAME = 'enderecos.parquet'
PATH = os.path.join(oculto, 'data/')

endereco_censo.write.mode("overwrite").parquet(os.path.join(PATH, DATASET_NAME))
print(f"Passo 1 concluído em {time.time() - t1:.2f} segundos")

25/03/19 04:31:51 WARN JoinQuery: UseIndex is true, but no index exists. Will build index on the fly.
                                                                                

Passo 1 concluído em 1429.82 segundos


In [None]:
%%time
# endereco_censo.show(5)

CPU times: user 3 μs, sys: 2 μs, total: 5 μs
Wall time: 8.82 μs


## Benefícios

### BPC

In [None]:
beneficio_bpc = (
    oculto
    .select(sf.col("cpfBeneficiario").alias("cpf"),
            "cpfRepresentanteLegal")
    .dropDuplicates(subset=['cpf'])
    .withColumn("flagBeneficiarioBPC", sf.lit(1))
    .withColumn(
        "flagVulneravelBPC",
        sf.when((sf.col("cpf") != sf.col("cpfRepresentanteLegal")), 1)
        .otherwise(0)
    )
    .join(pessoa_cadastrais.select("cpf"), how="right", on="cpf")
    .select("cpf", "flagBeneficiarioBPC", "flagVulneravelBPC")
    .fillna(0)
)

### Seguro Defeso

In [None]:
beneficio_seguro_defeso = (
    oculto
    .select("cpf")
    .dropDuplicates()
    .withColumn("flagBeneficiarioSeguroDefeso", sf.lit(1))
    .join(pessoa_cadastrais.select("cpf"), how="right", on="cpf")
    .fillna(0)
)

### Garantia Safra

In [None]:
beneficio_garantia_safra = (
    oculto
    .filter(sf.col("beneficio") == "BENEFICIARIO DE GARANTIA SAFRA")
    .select("cpf")
    .dropDuplicates()
    .withColumn("flagBeneficiarioGarantiaSafra", sf.lit(1))
    .join(pessoa_cadastrais.select("cpf"), how="right", on="cpf")
    .fillna(0)
)

### Novo Bolsa Familia

In [None]:
novo_bolsa_familia = (
    oculto
    .select("cpf")
    .dropDuplicates()
    .withColumn("flagBeneficiarioNovoBolsaFamilia", sf.lit(1))
    .join(pessoa_cadastrais, how="right", on="cpf")
    .select("cpf", "flagBeneficiarioNovoBolsaFamilia")
    .fillna(0)
)

In [None]:
novo_bolsa_familia.groupBy("flagBeneficiarioNovoBolsaFamilia").count().show()

### Salvando dados

In [None]:
beneficiary_condition = (
    (sf.col("flagBeneficiarioBPC") == 1) |
    # (sf.col("flagBeneficiarioBolsaFamilia") == 1) |
    # (sf.col("flagBeneficiarioAuxilioBrasil") == 1) |
    # (sf.col("flagBeneficiarioAuxilioEmergencial") == 1) |
    (sf.col("flagBeneficiarioSeguroDefeso") == 1) |
    (sf.col("flagBeneficiarioGarantiaSafra") == 1) |
    (sf.col("flagBeneficiarioNovoBolsaFamilia") == 1)
)

In [None]:
datasets = [beneficio_bpc, beneficio_seguro_defeso,
            beneficio_garantia_safra,
            novo_bolsa_familia
           ]

In [None]:
beneficios = (
    reduce(lambda df1, df2: df1.join(df2, on="cpf", how='inner'), datasets)
    .withColumn("flagBeneficiarioProgramasSociais", sf.when(beneficiary_condition, 1).otherwise(0))
)

In [None]:
def get_high_qualification_beneficiary_programs(pessoa_cadastrais: DataFrame, beneficios: DataFrame) -> DataFrame:
    pessoa_alta_qualificacao_beneficiaria_programas = (
        pessoa_cadastrais
        .select("cpf", "flagAltaQualificacao")
        .join(beneficios, on='cpf', how='inner')
    )

    pessoa_alta_qualificacao_beneficiaria_programas = (
        pessoa_alta_qualificacao_beneficiaria_programas
        .withColumn("flagAltaQualificacaoBeneficiarioProgramasSociais",
                    sf.when(sf.col("flagBeneficiarioProgramasSociais") == 1, 1)
                    .otherwise(0)
                   )
    )

    return pessoa_alta_qualificacao_beneficiaria_programas.select("cpf", "flagAltaQualificacaoBeneficiarioProgramasSociais")

In [None]:
pessoa_beneficios = (
    beneficios
    .join(
        get_high_qualification_beneficiary_programs(pessoa_cadastrais, beneficios),
        how="inner", on="cpf"
    ).fillna(0)
)

In [None]:
t1 = time.time()
DATASET_NAME = 'beneficios.parquet'
PATH = os.path.join(oculto, 'data/')

pessoa_beneficios.write.mode("overwrite").parquet(os.path.join(PATH, DATASET_NAME))
print(f"Passo 1 concluído em {time.time() - t1:.2f} segundos")

                                                                                

Passo 1 concluído em 941.51 segundos


## Multiplicidade de CPFs

In [None]:
multiplicidade_cpfs = (
    oculto
    .join(pessoa_cadastrais.select("cpf"), how="inner", on="cpf")
)

In [None]:
multiplicidade_cpfs.count()

                                                                                

184050329

### Salvando dados

In [None]:
DATASET_NAME = 'multiplicidade_cpf.parquet'
PATH = os.path.join(oculto, 'data/')

multiplicidade_cpfs.write.mode("overwrite").parquet(os.path.join(PATH, DATASET_NAME))

                                                                                

## Renda

In [None]:
pessoa_renda_estimada = (
    oculto
    .join(pessoa_cadastrais.select("cpf"), how="right", on="cpf")
    .select("cpf",
            sf.col("flagRendaSemInformacao").cast("int").alias("flagRendaSemInformacao"),
            sf.col("flagBaixaRenda").cast("int").alias("flagBaixaRenda"),
            sf.col("flagMediaRenda").cast("int").alias("flagMediaRenda"),
            sf.col("flagAltaRenda").cast("int").alias("flagAltaRenda"))
    .fillna(0)
)

### Salvando dados

In [None]:
DATASET_NAME = 'renda.parquet'
PATH = os.path.join(oculto, 'data/')

pessoa_renda_estimada.write.mode("overwrite").parquet(os.path.join(PATH, DATASET_NAME))

## Imposto de Renda

In [None]:
cols = ["cpf", "quantidadeRestituicaoIrpfObservadas",
        "quantidadeDeclaracoesIrpfObservadas",
        "flagAlterouBancoDeclaracaoIrpfUltimos5Anos"
       ]

In [None]:
pessoa_irpf = (
    oculto
    .select(*cols)
    .join(pessoa_cadastrais.select("cpf"), how="right", on="cpf")
    .fillna(0, subset=["quantidadeRestituicaoIrpfObservadas", "quantidadeDeclaracoesIrpfObservadas"])
    .fillna(False, subset=["flagAlterouBancoDeclaracaoIrpfUltimos5Anos"])
)

In [None]:
from pyspark.sql.functions import col

### Salvando dados

In [None]:
DATASET_NAME = 'imposto-renda.parquet'
PATH = os.path.join(oculto, 'data/')

pessoa_irpf.write.mode("overwrite").parquet(os.path.join(PATH, DATASET_NAME))

                                                                                

## Sócios

In [None]:
cols = ["cpf",
        "flagSocioDuplaId",
        "percentualEmpresasEmLocalEntregaInterna",
        "flagSocioEmpresaComSocioFalecido",
        "quantidadeMaximaEmailsIguaisEmpresasAtivas", "quantidadeMediaEmpresasMesmoCnaeAbertas12Meses",
        "quantidadeMaximaAlteracoesEnderecos12meses",
        "quantidadeMaximaAlteracoesQsa365Dias", "quantidadeMaximaAlteracoesQsa30Dias",
        # "flagSocioProgramaSocial"
       ]

compliance_socios = (
    oculto
    .select("cpf",
            sf.col("flagSocioDuplaId").cast("int").alias("flagSocioDuplaId"),
            "percentualEmpresasEmLocalEntregaInterna",
            sf.col("flagSocioEmpresaComSocioFalecido").cast("int").alias("flagSocioEmpresaComSocioFalecido"),
            "quantidadeMaximaEmailsIguaisEmpresasAtivas", "quantidadeMediaEmpresasMesmoCnaeAbertas12Meses",
            "quantidadeMaximaAlteracoesEnderecos12meses",
            "quantidadeMaximaAlteracoesQsa365Dias", "quantidadeMaximaAlteracoesQsa30Dias")
    # .join(pessoa_cadastrais.select("cpf"), how="right", on="cpf")
)

In [None]:
beneficios = pessoa_beneficios
dataset = compliance_socios

In [None]:
from pyspark.sql.functions import col

t1 = time.time()
dataset_joined = dataset.join(beneficios.select("cpf", "flagAltaQualificacaoBeneficiarioProgramasSociais"), on="cpf", how="left")

dataset_0 = dataset_joined.filter(col("flagAltaQualificacaoBeneficiarioProgramasSociais") == 0).drop("flagAltaQualificacaoBeneficiarioProgramasSociais")

dataset_1 = dataset_joined.filter(col("flagAltaQualificacaoBeneficiarioProgramasSociais") == 1).drop("flagAltaQualificacaoBeneficiarioProgramasSociais")

print(f"Registros em dataset_0: {dataset_0.count()}")
print(f"Registros em dataset_1: {dataset_1.count()}")

dataset = dataset_0

print(f"Processamento concluído em {time.time() - t1:.2f} segundos")

                                                                                

Registros em dataset_0: 156500515




Registros em dataset_1: 27549814
Processamento concluído em 43.77 segundos


                                                                                

In [None]:
from pyspark.sql.functions import col

t1 = time.time()
dataset_joined = dataset.join(beneficios.select("cpf", "flagBeneficiarioProgramasSociais"), on="cpf", how="left")

dataset_0 = dataset_joined.filter(col("flagBeneficiarioProgramasSociais") == 0).drop("flagBeneficiarioProgramasSociais")

dataset_1 = dataset_joined.filter(col("flagBeneficiarioProgramasSociais") == 1).drop("flagBeneficiarioProgramasSociais")

print(f"Registros em dataset_0: {dataset_0.count()}")
print(f"Registros em dataset_1: {dataset_1.count()}")

print(f"Processamento concluído em {time.time() - t1:.2f} segundos")

                                                                                

Registros em dataset_0: 156500515




Registros em dataset_1: 0
Processamento concluído em 56.07 segundos


                                                                                

In [None]:
dataset = dataset_0

In [None]:
from pyspark.sql.functions import col

t1 = time.time()
dataset_joined = dataset.join(beneficios.select("cpf", "flagBeneficiarioNovoBolsaFamilia"), on="cpf", how="left")

dataset_0 = dataset_joined.filter(col("flagBeneficiarioNovoBolsaFamilia") == 0).drop("flagBeneficiarioNovoBolsaFamilia")

dataset_1 = dataset_joined.filter(col("flagBeneficiarioNovoBolsaFamilia") == 1).drop("flagBeneficiarioNovoBolsaFamilia")

print(f"Registros em dataset_0: {dataset_0.count()}")
print(f"Registros em dataset_1: {dataset_1.count()}")

dataset = dataset_0

print(f"Processamento concluído em {time.time() - t1:.2f} segundos")

                                                                                

Registros em dataset_0: 156500515




Registros em dataset_1: 0
Processamento concluído em 72.36 segundos


                                                                                

In [None]:
from pyspark.sql.functions import col

t1 = time.time()
dataset_joined = dataset.join(beneficios.select("cpf", "flagBeneficiarioGarantiaSafra"), on="cpf", how="left")

dataset_0 = dataset_joined.filter(col("flagBeneficiarioGarantiaSafra") == 0).drop("flagBeneficiarioGarantiaSafra")

dataset_1 = dataset_joined.filter(col("flagBeneficiarioGarantiaSafra") == 1).drop("flagBeneficiarioGarantiaSafra")

print(f"Registros em dataset_0: {dataset_0.count()}")
print(f"Registros em dataset_1: {dataset_1.count()}")

dataset = dataset_0

print(f"Processamento concluído em {time.time() - t1:.2f} segundos")

                                                                                

Registros em dataset_0: 156500515




Registros em dataset_1: 0
Processamento concluído em 80.76 segundos


                                                                                

In [None]:
from pyspark.sql.functions import col

t1 = time.time()
dataset_joined = dataset.join(beneficios.select("cpf", "flagBeneficiarioSeguroDefeso"), on="cpf", how="left")

dataset_0 = dataset_joined.filter(col("flagBeneficiarioSeguroDefeso") == 0).drop("flagBeneficiarioSeguroDefeso")

dataset_1 = dataset_joined.filter(col("flagBeneficiarioSeguroDefeso") == 1).drop("flagBeneficiarioSeguroDefeso")

print(f"Registros em dataset_0: {dataset_0.count()}")
print(f"Registros em dataset_1: {dataset_1.count()}")

dataset = dataset_0

print(f"Processamento concluído em {time.time() - t1:.2f} segundos")

                                                                                

Registros em dataset_0: 156500515




Registros em dataset_1: 0
Processamento concluído em 103.02 segundos


                                                                                

In [None]:
from pyspark.sql.functions import col

t1 = time.time()
dataset_joined = dataset.join(beneficios.select("cpf", "flagVulneravelBPC"), on="cpf", how="left")

dataset_0 = dataset_joined.filter(col("flagVulneravelBPC") == 0).drop("flagVulneravelBPC")

dataset_1 = dataset_joined.filter(col("flagVulneravelBPC") == 1).drop("flagVulneravelBPC")

print(f"Registros em dataset_0: {dataset_0.count()}")
print(f"Registros em dataset_1: {dataset_1.count()}")

dataset = dataset_0

print(f"Processamento concluído em {time.time() - t1:.2f} segundos")

                                                                                

Registros em dataset_0: 156500515




Registros em dataset_1: 0
Processamento concluído em 116.81 segundos


                                                                                

In [None]:
from pyspark.sql.functions import col

t1 = time.time()
dataset_joined = dataset.join(beneficios.select("cpf", "flagBeneficiarioBPC"), on="cpf", how="left")

dataset_0 = dataset_joined.filter(col("flagBeneficiarioBPC") == 0).drop("flagBeneficiarioBPC")

dataset_1 = dataset_joined.filter(col("flagBeneficiarioBPC") == 1).drop("flagBeneficiarioBPC")

print(f"Registros em dataset_0: {dataset_0.count()}")
print(f"Registros em dataset_1: {dataset_1.count()}")

dataset = dataset_0

print(f"Processamento concluído em {time.time() - t1:.2f} segundos")

                                                                                

Registros em dataset_0: 156500515




Registros em dataset_1: 0
Processamento concluído em 135.23 segundos


                                                                                

In [None]:
cols = ["cpf",
        "flagSocioDuplaId",
        "percentualEmpresasEmLocalEntregaInterna",
        "flagSocioEmpresaComSocioFalecido",
        "quantidadeMaximaEmailsIguaisEmpresasAtivas", "quantidadeMediaEmpresasMesmoCnaeAbertas12Meses",
        "quantidadeMaximaAlteracoesEnderecos12meses",
        "quantidadeMaximaAlteracoesQsa365Dias", "quantidadeMaximaAlteracoesQsa30Dias",
        # "flagSocioProgramaSocial"
       ]

compliance_socios = (
    oculto
    .select("cpf",
            sf.col("flagSocioDuplaId").cast("int").alias("flagSocioDuplaId"),
            "percentualEmpresasEmLocalEntregaInterna",
            sf.col("flagSocioEmpresaComSocioFalecido").cast("int").alias("flagSocioEmpresaComSocioFalecido"),
            "quantidadeMaximaEmailsIguaisEmpresasAtivas", "quantidadeMediaEmpresasMesmoCnaeAbertas12Meses",
            "quantidadeMaximaAlteracoesEnderecos12meses",
            "quantidadeMaximaAlteracoesQsa365Dias", "quantidadeMaximaAlteracoesQsa30Dias")
    # .join(pessoa_cadastrais.select("cpf"), how="right", on="cpf")
)

In [None]:
from pyspark.sql.functions import col, lit, when

primary_keys = "cpf"

dataset_cpfs = dataset.select(primary_keys).distinct()

compliance_socios = compliance_socios.alias("cs").join(
    dataset_cpfs.alias("ds"),
    on=col("cs." + primary_keys) == col("ds." + primary_keys),
    how="left"
).select(
    col("cs.*"),
    when(col("ds." + primary_keys).isNull(), lit(1)).otherwise(lit(0)).alias("flagSocioProgramaSocial")
)

compliance_socios.printSchema()


root
 |-- cpf: string (nullable = true)
 |-- flagSocioDuplaId: integer (nullable = true)
 |-- percentualEmpresasEmLocalEntregaInterna: double (nullable = true)
 |-- flagSocioEmpresaComSocioFalecido: integer (nullable = true)
 |-- quantidadeMaximaEmailsIguaisEmpresasAtivas: long (nullable = true)
 |-- quantidadeMediaEmpresasMesmoCnaeAbertas12Meses: double (nullable = true)
 |-- quantidadeMaximaAlteracoesEnderecos12meses: long (nullable = true)
 |-- quantidadeMaximaAlteracoesQsa365Dias: long (nullable = true)
 |-- quantidadeMaximaAlteracoesQsa30Dias: long (nullable = true)
 |-- flagSocioProgramaSocial: integer (nullable = false)



In [None]:
print(f"CPFs com flagSocioProgramaSocial = 1: {compliance_socios.filter(col('flagSocioProgramaSocial') == 1).count()}")
print(f"CPFs com flagSocioProgramaSocial = 0: {compliance_socios.filter(col('flagSocioProgramaSocial') == 0).count()}")

                                                                                

CPFs com flagSocioProgramaSocial = 1: 91458764




CPFs com flagSocioProgramaSocial = 0: 156500515


                                                                                

In [None]:
DATASET_NAME = 'socios_beneficios.parquet'
PATH = os.path.join(oculto, 'data/')

compliance_socios.write.mode("overwrite").parquet(os.path.join(PATH, DATASET_NAME))

                                                                                

In [None]:
empresa = (
    oculto
    .select("cnpj", "porte")
)

In [None]:
qsa = (
    oculto
    .join(oculto)
          .withColumnRenamed("cnpj", "cnpjSocio"), on="id"
         )
)

In [None]:
empresa_fachada = (
    oculto
    .select("cnpj", "classeEmpresaFachada")
)

In [None]:
def qsa_informations(empresa: DataFrame, qsa_socios: DataFrame, pessoa_renda_estimada:DataFrame, empresa_fachada:DataFrame, beneficios: DataFrame) -> DataFrame:
    socios = (
        qsa_socios
        .join(empresa, on='cnpj', how='left')
        .join(empresa_fachada, on="cnpj", how="left")
        .select("cpf", "porte", "classeEmpresaFachada")
        .join(pessoa_renda_estimada, on='cpf', how='left')
        .join(beneficios, on='cpf', how='left')
        .withColumn("flagSocioEmpresasEPPBaixaRenda",
                    sf.when((sf.col("porte") == "EPP") & (sf.col("flagBaixaRenda") == 1), 1).otherwise(0))
        .withColumn("flagSocioEmpresasGrandePorteBaixaRenda",
                    sf.when((sf.col("porte") == "DEMAIS") & (sf.col("flagBaixaRenda") == 1), 1).otherwise(0))
        .withColumn("flagSocioBeneficiarioProgramaSocial",
                    sf.when(sf.col("flagBeneficiarioProgramasSociais") == 1, 1).otherwise(0))
        .withColumn("flagSocioClasseEmpresaFachadaALTA",
                    sf.when(sf.col("classeEmpresaFachada") == "ALTA", 1).otherwise(0))
        .withColumn("flagSocioClasseEmpresaFachadaMUITOALTA",
                    sf.when(sf.col("classeEmpresaFachada") == "MUITO ALTA", 1).otherwise(0))
        .withColumn("flagSocioClasseEmpresaFachadaMEDIO",
                    sf.when(sf.col("classeEmpresaFachada") == "MEDIO", 1).otherwise(0))
    )

    agg_socios = (
        socios
        .groupBy("cpf")
        .agg(
            sf.max("flagSocioEmpresasGrandePorteBaixaRenda").alias("flagSocioEmpresasGrandePorteBaixaRenda"),
            sf.max("flagSocioEmpresasEPPBaixaRenda").alias("flagSocioEmpresasEPPBaixaRenda"),
            sf.max("flagSocioBeneficiarioProgramaSocial").alias("flagSocioBeneficiarioProgramaSocial"),
            sf.max("flagSocioClasseEmpresaFachadaALTA").alias("flagSocioClasseEmpresaFachadaALTA"),
            sf.max("flagSocioClasseEmpresaFachadaMUITOALTA").alias("flagSocioClasseEmpresaFachadaMUITOALTA"),
            sf.max("flagSocioClasseEmpresaFachadaMEDIO").alias("flagSocioClasseEmpresaFachadaMEDIO")
        )
    )

    return agg_socios

In [None]:
socios_inf = qsa_informations(empresa, qsa, pessoa_renda_estimada, empresa_fachada, pessoa_beneficios)

In [None]:
socios = (
    compliance_socios
    .join(socios_inf, how="outer", on="cpf")
    .fillna(0, subset=["flagSocioEmpresasEPPBaixaRenda",
                       "flagSocioEmpresasGrandePorteBaixaRenda",
                       "flagSocioClasseEmpresaFachadaALTA",
                       "flagSocioClasseEmpresaFachadaMUITOALTA",
                       "flagSocioClasseEmpresaFachadaMEDIO",
                       "flagSocioBeneficiarioProgramaSocial"
                      ])
    .join(pessoa_cadastrais.select("cpf"), how="right", on="cpf")
)

In [None]:
socios.count()

                                                                                

184050329

In [None]:
socios.printSchema()

root
 |-- cpf: string (nullable = true)
 |-- flagSocioDuplaId: integer (nullable = true)
 |-- percentualEmpresasEmLocalEntregaInterna: double (nullable = true)
 |-- flagSocioEmpresaComSocioFalecido: integer (nullable = true)
 |-- quantidadeMaximaEmailsIguaisEmpresasAtivas: long (nullable = true)
 |-- quantidadeMediaEmpresasMesmoCnaeAbertas12Meses: double (nullable = true)
 |-- quantidadeMaximaAlteracoesEnderecos12meses: long (nullable = true)
 |-- quantidadeMaximaAlteracoesQsa365Dias: long (nullable = true)
 |-- quantidadeMaximaAlteracoesQsa30Dias: long (nullable = true)
 |-- flagSocioProgramaSocial: integer (nullable = true)
 |-- flagSocioEmpresasGrandePorteBaixaRenda: integer (nullable = true)
 |-- flagSocioEmpresasEPPBaixaRenda: integer (nullable = true)
 |-- flagSocioBeneficiarioProgramaSocial: integer (nullable = true)
 |-- flagSocioClasseEmpresaFachadaALTA: integer (nullable = true)
 |-- flagSocioClasseEmpresaFachadaMUITOALTA: integer (nullable = true)
 |-- flagSocioClasseEmpresa

### Salvando dados

In [None]:
DATASET_NAME = 'socios.parquet'
PATH = os.path.join(oculto, DATASET_NAME)

socios.write.mode("overwrite").parquet(PATH)

                                                                                

## Patrimônio

### Imovel

In [None]:
sum_alto = lambda col, quantile_value: sf.sum(sf.when(sf.col(col) >= quantile_value, 1).otherwise(0)).alias("qtdImoveisAltoValor")
sum_mediana = lambda col, quantile_value: sf.sum(sf.when(sf.col(col) >= quantile_value, 1).otherwise(0)).alias("qtdImoveisAcimaMediana")

In [None]:
def economic_profile_property(renda: DataFrame, beneficios:DataFrame, imoveis:DataFrame) -> DataFrame:
    quantile_value = imoveis.approxQuantile("vl_avaliacao", [0.9], 0.01)[0]
    quantile_mediana_value = imoveis.approxQuantile("vl_avaliacao", [0.5], 0.01)[0]

    beneficiary_condition = sf.col("flagBeneficiarioProgramasSociais") == 1

    pessoa_imoveis = (
        imoveis
        .groupBy("cd_cpf")
        .agg(
            sf.count("*").alias("qtdImoveis"),
            sf.sum(sf.col("vl_avaliacao")).alias("valorTotalImoveis"),
            sum_alto("vl_avaliacao", quantile_value),
            sum_mediana("vl_avaliacao", quantile_mediana_value)
        )
        .withColumnRenamed("cd_cpf", "cpf")
        .join(pessoa_renda_estimada, how="left", on="cpf")
        .withColumn("flagBaixaRendaValorImobiliarioAcimaPercentil90",
                    sf.when((sf.col("qtdImoveisAltoValor") > 0) &
                        (sf.col("flagBaixaRenda") == 1), 1
                       ).otherwise(0)
                   )
        .join(pessoa_beneficios, how="left", on="cpf")
        .withColumn("flagBeneficiarioValorImobiliarioAcimaPercentil90",
                    sf.when(beneficiary_condition &
                            (sf.col("qtdImoveisAltoValor") > 0), 1
                           ).otherwise(0)
                   )
    )

    columns = ["cpf", "qtdImoveis", "valorTotalImoveis",
               "qtdImoveisAcimaMediana", "qtdImoveisAltoValor",
               "flagBaixaRendaValorImobiliarioAcimaPercentil90",
               "flagBeneficiarioValorImobiliarioAcimaPercentil90"
              ]
    return pessoa_imoveis.select(*columns)

In [None]:
imoveis = oculto

In [None]:
imoveis.count()

                                                                                

8791398

In [None]:
imoveis.printSchema()

root
 |-- id_pk: long (nullable = true)
 |-- id_imovel: string (nullable = true)
 |-- sg_uf_registro: string (nullable = true)
 |-- cd_cpf: string (nullable = true)
 |-- cd_cnpj: string (nullable = true)
 |-- nm_proprietario: string (nullable = true)
 |-- dt_consulta: timestamp (nullable = true)
 |-- qt_area_terreno: double (nullable = true)
 |-- qt_area_construida: double (nullable = true)
 |-- nu_ano_construcao: integer (nullable = true)
 |-- vl_avaliacao: double (nullable = true)
 |-- nm_logradouro: string (nullable = true)
 |-- nu_logradouro: string (nullable = true)
 |-- de_complemento: string (nullable = true)
 |-- nm_bairro: string (nullable = true)
 |-- nm_municipio: string (nullable = true)
 |-- sg_uf: string (nullable = true)
 |-- nu_cep: string (nullable = true)
 |-- cd_latitude: double (nullable = true)
 |-- cd_longitude: double (nullable = true)
 |-- dh_processamento: timestamp (nullable = true)
 |-- dh_ultima_atualizacao: timestamp (nullable = true)
 |-- dh_geracao_entida

In [None]:
quantile_value = imoveis.approxQuantile("vl_avaliacao", [0.9], 0.01)[0]
quantile_mediana_value = imoveis.approxQuantile("vl_avaliacao", [0.5], 0.01)[0]

                                                                                

In [None]:
quantile_value

784288.17

In [None]:
pessoa_imoveis = economic_profile_property(pessoa_renda_estimada, pessoa_beneficios, imoveis)

                                                                                

In [None]:
pessoa_renda_estimada.count()

In [None]:
pessoa_beneficios.count()

In [None]:
pessoa_imoveis.count()

In [None]:
pessoa_imoveis.printSchema()

### Veiculos Pesados

In [None]:
cols = ["cpf", "quantidadeVeiculosPesados",
        "quantidadeVeiculosPesadosAnoFabricacaoEntre0E4AnosAtras",
        "quantidadeVeiculosPesadosAnoFabricacaoEntre5E9AnosAtras",
        "quantidadeVeiculosPesadosAnoFabricacaoEntre10E14AnosAtras"]

In [None]:
pessoa_veiculo_pesado = (
    oculto
    .select(*cols)
)

In [None]:
pessoa_veiculo_pesado.count()

                                                                                

1427108

In [None]:
pessoa_veiculo_pesado.printSchema()

root
 |-- cpf: string (nullable = true)
 |-- quantidadeVeiculosPesados: long (nullable = true)
 |-- quantidadeVeiculosPesadosAnoFabricacaoEntre0E4AnosAtras: long (nullable = true)
 |-- quantidadeVeiculosPesadosAnoFabricacaoEntre5E9AnosAtras: long (nullable = true)
 |-- quantidadeVeiculosPesadosAnoFabricacaoEntre10E14AnosAtras: long (nullable = true)



### Aeronaves

In [None]:
cols = ["cpf", "flagOperadorAeronave",
        "flagProprietarioAeronave",
        "quantidadeAeronaves"]

In [None]:
pessoa_aeronaves = (
    oculto
    .select("cpf",
            sf.col("flagOperadorAeronave").cast("int").alias("flagOperadorAeronave"),
            sf.col("flagProprietarioAeronave").cast("int").alias("flagProprietarioAeronave"),
            "quantidadeAeronaves")
)

In [None]:
pessoa_aeronaves.count()

                                                                                

15455

In [None]:
pessoa_aeronaves.printSchema()

root
 |-- cpf: string (nullable = true)
 |-- flagOperadorAeronave: integer (nullable = true)
 |-- flagProprietarioAeronave: integer (nullable = true)
 |-- quantidadeAeronaves: long (nullable = true)



### Propriedades Rurais

In [None]:
cols = ["cpf", "valorAreaTotalPropriedadesRurais",
        "quantidadePropriedades", "quantidadePropriedadesAtivas"
       ]

In [None]:
pessoa_propriedades_rurais = (
    oculto
    .select(*cols)
)

### dataset patrimonio

In [None]:
def get_income_asset_flags(pessoa_aeronaves: DataFrame, pessoa_veiculo_pesado: DataFrame, renda_estimada: DataFrame, beneficios: DataFrame) -> DataFrame:
    beneficiary_condition = sf.col("flagBeneficiarioProgramasSociais") == 1
    pessoa_patrimonio_beneficio = (
        pessoa_aeronaves
        .join(pessoa_veiculo_pesado, how="outer", on="cpf")
        .join(renda_estimada, how="outer", on="cpf")
        .join(beneficios, how="outer", on="cpf")
        .withColumn("flagBaixaRendaAeronave",
                    sf.when((sf.col("flagBaixaRenda") == 1) &
                            (sf.col("quantidadeAeronaves") > 0), 1
                           ).otherwise(0)
                   )
        .withColumn("flagBeneficiarioAeronave",
                    sf.when(
                        (beneficiary_condition) &
                        (sf.col("quantidadeAeronaves") > 0), 1)
                    .otherwise(0)
                   )
        .withColumn("flagBeneficiarioVeiculoPesado",
                    sf.when(
                        (beneficiary_condition) &
                        (sf.col("quantidadeVeiculosPesados") > 0), 1)
                    .otherwise(0)
                   )
    )

    return pessoa_patrimonio_beneficio.select("cpf", "flagBaixaRendaAeronave",
                                              "flagBeneficiarioAeronave",
                                              "flagBeneficiarioVeiculoPesado"
                                             )

In [None]:
pessoa_beneficiaria_patrominio = get_income_asset_flags(pessoa_aeronaves, pessoa_veiculo_pesado, pessoa_renda_estimada, pessoa_beneficios)

In [None]:
pessoas_cpf = pessoa_cadastrais.select("cpf")

In [None]:
datasets = [pessoas_cpf, pessoa_imoveis, pessoa_veiculo_pesado,
            pessoa_propriedades_rurais, pessoa_aeronaves,
            pessoa_beneficiaria_patrominio
           ]

In [None]:
patrimonio = reduce(lambda df1, df2: df1.join(df2, on="cpf", how='left'), datasets)

In [None]:
patrimonio = (
    patrimonio
    .fillna(0, subset=['qtdImoveis', 'valorTotalImoveis',
                       'qtdImoveisAcimaMediana',
                       'flagBaixaRendaValorImobiliarioAcimaPercentil90',
                       'flagBeneficiarioValorImobiliarioAcimaPercentil90',
                       'qtdImoveisAltoValor',
                       'quantidadeVeiculosPesados',
                       'quantidadeVeiculosPesadosAnoFabricacaoEntre0E4AnosAtras',
                       'quantidadeVeiculosPesadosAnoFabricacaoEntre5E9AnosAtras',
                       'quantidadeVeiculosPesadosAnoFabricacaoEntre10E14AnosAtras',
                       'valorAreaTotalPropriedadesRurais',
                       'quantidadePropriedades',
                       'quantidadePropriedadesAtivas',
                       'flagOperadorAeronave',
                       'flagProprietarioAeronave',
                       'quantidadeAeronaves'
                      ])
)

In [None]:
%%time
countMissingValues(patrimonio).show(truncate=False)

                                                                                

+---+----------+-----------------+----------------------+-------------------+----------------------------------------------+------------------------------------------------+-------------------------+-------------------------------------------------------+-------------------------------------------------------+---------------------------------------------------------+--------------------------------+----------------------+----------------------------+--------------------+------------------------+-------------------+----------------------+------------------------+-----------------------------+
|cpf|qtdImoveis|valorTotalImoveis|qtdImoveisAcimaMediana|qtdImoveisAltoValor|flagBaixaRendaValorImobiliarioAcimaPercentil90|flagBeneficiarioValorImobiliarioAcimaPercentil90|quantidadeVeiculosPesados|quantidadeVeiculosPesadosAnoFabricacaoEntre0E4AnosAtras|quantidadeVeiculosPesadosAnoFabricacaoEntre5E9AnosAtras|quantidadeVeiculosPesadosAnoFabricacaoEntre10E14AnosAtras|valorAreaTotalPropriedadesRurais

### salvando dados

In [None]:
DATASET_NAME = 'patrimonio.parquet'
PATH = os.path.join(oculto, DATASET_NAME)

patrimonio.write.mode("overwrite").parquet(PATH)

                                                                                

## Financiamento Veicular

In [None]:
catalogo_sng = (
    oculto
    .select("cpf",
            "numTotalDiaPrimeiraCompra",
            "qteVeiculoFinanciadoHistorico",
            # "numTotalDiaPrimeiraCompraFinanciado",
            "numTotalDiaUltimaCompra",
            # "numTotalDiaUltimaCompraFinanciado",
            # "numTotalDiaUltimaVenda",
            # "qteCompraAutomovelNovo",
            # "qteTotalCompraVeiculoNovo",
            # "qteCompraAutomovelSeminovo",
            # "qteTotalCompraVeiculoSeminovo",
            # "qteCompraAutomovelUsado",
            # "qteTotalCompraVeiculoUsado",
            "qteTotalCompraVeiculo",
            # "qteCompraMotocicletaNovo",
            # "qteCompraMotocicletaSeminovo",
            # "qteCompraMotocicletaUsado",
            # "qteTotalCompraMotocicleta",
            "qteFinanciadoAtivo",
            "qteFinanciadoMaximaParcela",
            "qteFinanciadoMediaParcela",
            "qteFinanciadoQuitado",
            "qteFinanciadoQuitadoAntesPrazo",
            "qteFinanciadoQuitadoDepoisPrazo",
            "qteFinanciadoQuitadoPrazo",
            "qteTotalCompraOnibus",
            "valFinanciadoAtualDebito",
            "valFinanciadoMedia",
            "valFinanciadoMediaParcela",
            "valFinanciadoSoma"
           )
)

In [None]:
indicadores_veiculares = (
    oculto
    .select("cpf", "ctaQteFinanciamento",
            "ctaFinanciadorMarcaPremium", "ctaHistoricoAtrasoFinanciamento",
            "ctaMediaParcelaFinanciamento",
            "ctaRecorrenciaFinanciamento",
           )
)

In [None]:
financiamento_veicular = catalogo_sng.join(indicadores_veiculares, on="cpf", how="outer")

In [None]:
financiamento_veicular = financiamento_veicular.withColumn("qtdFinanciamentoFaixa",
                                                           sf.when(sf.col("ctaQteFinanciamento").contains("MAIS QUE"),
                                                                   sf.split("ctaQteFinanciamento", " ")[2]
                                                                  ).otherwise(sf.split("ctaQteFinanciamento", " ")[0])
                                                           .cast("int")
                                                          )

In [None]:
financiamento_veicular = (
    financiamento_veicular.withColumn("qtdVeiculosFinanciados",
                                      sf.coalesce(sf.col("qteVeiculoFinanciadoHistorico"), sf.col("qtdFinanciamentoFaixa"))
                                     )
    .drop("qteVeiculoFinanciadoHistorico", "ctaQteFinanciamento", "qtdFinanciamentoFaixa")
)

In [None]:
def income_financing(renda: DataFrame, financiamento_veicular: DataFrame) -> DataFrame:

    financiamento_veicular_renda = (
        financiamento_veicular.join(renda, how="left", on="cpf")
        .select("cpf", "flagBaixaRenda", "ctaFinanciadorMarcaPremium", "ctaMediaParcelaFinanciamento", "ctaRecorrenciaFinanciamento")
        .withColumn("flagBaixaRendaFinanciamentoVeiculoPremium",
                    sf.when((sf.col("flagBaixaRenda") == 1) &
                            (sf.col("ctaFinanciadorMarcaPremium") == "SIM"), 1
                           ).otherwise(0)
                   )
        .withColumn("flagBaixaRendaMediaParcelaFinanciamentoAlta",
                    sf.when((sf.col("flagBaixaRenda") == 1) &
                            (sf.col("ctaMediaParcelaFinanciamento") == "MAIOR QUE 1500"), 1
                           ).otherwise(0)
                   )
        .withColumn("flagBaixaRendaRecorrenciaFinanciamentoVeicular1a2Anos",
                    sf.when((sf.col("flagBaixaRenda") == 1) &
                            (sf.col("ctaRecorrenciaFinanciamento") == ""), 1
                           ).otherwise(0)
                   )
    )

    return financiamento_veicular_renda.select("cpf",
                                               "flagBaixaRendaFinanciamentoVeiculoPremium",
                                               "flagBaixaRendaMediaParcelaFinanciamentoAlta")


In [None]:
def beneficiary_financing(beneficios: DataFrame, financiamento_veicular: DataFrame) -> DataFrame:

    beneficiary_condition = sf.col("flagBeneficiarioProgramasSociais") == 1

    financiamento_veicular_beneficiario = (
        financiamento_veicular.join(beneficios, how="left", on="cpf")
        .withColumn("flagBeneficiarioProgramasSociaisFinanciamentoMediaParcelaAlta",
                    sf.when(beneficiary_condition &
                            (sf.col("ctaMediaParcelaFinanciamento") == "MAIOR QUE 1500"), 1
                           ).otherwise(0)
                   )
    )


    return financiamento_veicular_beneficiario.select("cpf",
                                                      "flagBeneficiarioProgramasSociaisFinanciamentoMediaParcelaAlta")

In [None]:
financiamento_veicular.printSchema()

root
 |-- cpf: string (nullable = true)
 |-- numTotalDiaPrimeiraCompra: long (nullable = true)
 |-- numTotalDiaUltimaCompra: long (nullable = true)
 |-- qteTotalCompraVeiculo: long (nullable = true)
 |-- qteFinanciadoAtivo: long (nullable = true)
 |-- qteFinanciadoMaximaParcela: long (nullable = true)
 |-- qteFinanciadoMediaParcela: long (nullable = true)
 |-- qteFinanciadoQuitado: long (nullable = true)
 |-- qteFinanciadoQuitadoAntesPrazo: long (nullable = true)
 |-- qteFinanciadoQuitadoDepoisPrazo: long (nullable = true)
 |-- qteFinanciadoQuitadoPrazo: long (nullable = true)
 |-- qteTotalCompraOnibus: long (nullable = true)
 |-- valFinanciadoAtualDebito: long (nullable = true)
 |-- valFinanciadoMedia: long (nullable = true)
 |-- valFinanciadoMediaParcela: long (nullable = true)
 |-- valFinanciadoSoma: long (nullable = true)
 |-- ctaFinanciadorMarcaPremium: string (nullable = true)
 |-- ctaHistoricoAtrasoFinanciamento: string (nullable = true)
 |-- ctaMediaParcelaFinanciamento: string

In [None]:
pessoa_renda_estimada.printSchema()

root
 |-- cpf: string (nullable = true)
 |-- flagRendaSemInformacao: integer (nullable = true)
 |-- flagBaixaRenda: integer (nullable = true)
 |-- flagMediaRenda: integer (nullable = true)
 |-- flagAltaRenda: integer (nullable = true)



In [None]:
financiamento_veicular_renda = income_financing(pessoa_renda_estimada, financiamento_veicular)

In [None]:
financiamento_veicular_beneficiario = beneficiary_financing(pessoa_beneficios, financiamento_veicular)

In [None]:
financiamento_veicular = (
    financiamento_veicular
    .join(financiamento_veicular_renda, how="left", on="cpf")
    .join(financiamento_veicular_beneficiario, how="left", on="cpf")
    .join(pessoa_cadastrais, how="right", on="cpf")
    .withColumn("anosMaioridade", sf.col("idade") - sf.lit(18))
    .withColumn("RecorrenciaFinanciamentoMaioridade",
                sf.when((sf.col("idade") >= 18) &
                        (sf.col("qtdVeiculosFinanciados") > 0),
                        sf.round(sf.col("anosMaioridade")/sf.col("qtdVeiculosFinanciados"), 2)
                       ).otherwise(0)
               )
    .select("cpf",
            "qtdVeiculosFinanciados",
            sf.col("qteFinanciadoQuitado").alias("qtdFinanciamentosVeicularesQuitados"),
            "flagBaixaRendaFinanciamentoVeiculoPremium",
            sf.col("ctaHistoricoAtrasoFinanciamento").alias("historicoAtrasoFinanciamentoVeicular"),
            "flagBaixaRendaMediaParcelaFinanciamentoAlta",
            sf.col("ctaRecorrenciaFinanciamento").alias("recorrenciaFinanciamentoVeicular"),
            "flagBeneficiarioProgramasSociaisFinanciamentoMediaParcelaAlta",
            "RecorrenciaFinanciamentoMaioridade"
           )
    .fillna(0, subset=["qtdVeiculosFinanciados", "qtdFinanciamentosVeicularesQuitados", "flagBaixaRendaFinanciamentoVeiculoPremium",
                       "flagBaixaRendaMediaParcelaFinanciamentoAlta", "flagBeneficiarioProgramasSociaisFinanciamentoMediaParcelaAlta"])
    .fillna("SEM INFORMACAO", subset=["historicoAtrasoFinanciamentoVeicular", "recorrenciaFinanciamentoVeicular"])
)

In [None]:
financiamento_veicular.cache().count()

### Salvando dados

In [None]:
DATASET_NAME = 'financiamento-veicular.parquet'
PATH = os.path.join(oculto, DATASET_NAME)

financiamento_veicular.write.mode("overwrite").parquet(PATH)

## Financiamento Imobiliário

In [None]:
financiamento_imobiliario = oculto

In [None]:
total_size = financiamento_imobiliario.count()
window = Window.orderBy(sf.desc("proportion"))
(
    financiamento_imobiliario.groupBy('descFaixaFinanciamentoMaisAtualAtivo')
    .agg(sf.count('*').alias('count'))
    .withColumn('proportion', sf.round((sf.col('count') / sf.lit(total_size))*100, 5))
    .withColumn('cumulative', sf.round(sf.sum("proportion").over(window), 5))
    .sort("proportion", ascending=False)
).show()

25/03/19 15:55:44 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/03/19 15:55:44 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/03/19 15:55:44 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/03/19 15:55:48 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/03/19 15:55:48 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/03/19 15:55:48 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/03/19 1

+------------------------------------+-------+----------+----------+
|descFaixaFinanciamentoMaisAtualAtivo|  count|proportion|cumulative|
+------------------------------------+-------+----------+----------+
|                          0 A 200000|3593753|  66.43532|  66.43532|
|                     200001 A 400000|1049517|  19.40172|  85.83704|
|                                  NA| 332666|   6.14978|  91.98682|
|                     400001 A 600000| 237011|   4.38146|  96.36828|
|                     600001 A 800000|  90287|   1.66908|  98.03736|
|                   1000001 A 2000000|  50551|    0.9345|  98.97186|
|                    800001 A 1000000|  44752|    0.8273|  99.79916|
|                   2000001 A 3000000|   6882|   0.12722|  99.92638|
|                   3000001 A 4000000|   1832|   0.03387|  99.96025|
|                   4000001 A 5000000|    853|   0.01577|  99.97602|
|                   5000001 A 6000000|    430|   0.00795|  99.98397|
|                 10000001 A 25000

                                                                                

In [None]:
def economic_profile_property_financing(renda: DataFrame, beneficios:DataFrame, imoveis:DataFrame) -> DataFrame:

    beneficiary_condition = sf.col("flagBeneficiarioProgramasSociais") == 1

    #TOP 1%
    FINANCIAMENTO_ALTO_FAIXAS = ['800001 A 1000000', '1000001 A 2000000', '2000001 A 3000000',
                                 '3000001 A 4000000', '4000001 A 5000000', '5000001 A 6000000',
                                 '6000001 A 7000000', '7000001 A 8000000', '8000001 A 9000000',
                                 '9000001 A 10000000', '10000001 A 25000000', 'ACIMA DE 25000000']

    pessoa_financiamento_imobiliario = (
        imoveis
        .join(renda, how="left", on="cpf")
        .join(beneficios, how="left", on="cpf")
        .withColumn("flagFinanciamentoImobiliarioAltoValorBeneficiaria",
                    sf.when(beneficiary_condition &
                            (sf.col("descFaixaFinanciamentoMaisAtualAtivo").isin(FINANCIAMENTO_ALTO_FAIXAS)), 1
                           ).otherwise(0)
                   )
        .withColumn("flagFinanciamentoImobiliarioAltoValorBaixaRenda",
                    sf.when((sf.col("flagBaixaRenda") == 1) &
                            (sf.col("descFaixaFinanciamentoMaisAtualAtivo").isin(FINANCIAMENTO_ALTO_FAIXAS)), 1
                           ).otherwise(0)
                   )
    )


    return pessoa_financiamento_imobiliario.select("cpf", "flagFinanciamentoImobiliarioAltoValorBeneficiaria",
                                                   "flagFinanciamentoImobiliarioAltoValorBaixaRenda")

In [None]:
pessoa_financiamento_imobiliario = (
    economic_profile_property_financing(pessoa_renda_estimada, pessoa_beneficios, financiamento_imobiliario)
    .join(pessoa_cadastrais.select("cpf"), on="cpf", how="right")
    .fillna(0)
)

In [None]:
pessoa_financiamento_imobiliario.count()

                                                                                

184050329

### Salvando dados

In [None]:
DATASET_NAME = 'financiamento_imobiliario.parquet'
PATH = os.path.join(oculto, DATASET_NAME)

pessoa_financiamento_imobiliario.write.mode("overwrite").parquet(PATH)

                                                                                

## Investimento

In [None]:
investimento = oculto

In [None]:
def get_investiment_beneficiary_programs(investimento: DataFrame, beneficios: DataFrame) -> DataFrame:

    beneficiary_condition = sf.col("flagBeneficiarioProgramasSociais") == 1

    pessoa_investimento_beneficiaria_programas = (
        investimento
        .join(beneficios, on="cpf", how="left")
        .withColumn("flagBancarizacaoABeneficiarioProgramasSociais",
                    sf.when(
                        (beneficiary_condition)  &
                        (sf.col("bancarizacao") == "A"), 1)
                    .otherwise(0))
        .withColumn("flagPatrimonioABeneficiarioProgramasSociais",
                    sf.when(
                        (beneficiary_condition) &
                        (sf.col("patrimonio") == "A"), 1)
                    .otherwise(0))
        .withColumn("flagAtivosComCotaABeneficiarioProgramasSociais",
                   sf.when(
                       (beneficiary_condition) &
                        (sf.col("ativosComCota") == "A"), 1)
                    .otherwise(0)
                   )
        .withColumn("flagAtivosSemCotaABeneficiarioProgramasSociais",
                   sf.when(
                       (beneficiary_condition) &
                        (sf.col("ativosSemCota") == "A"), 1)
                    .otherwise(0)
                   )

    )

    return pessoa_investimento_beneficiaria_programas.select("cpf", "flagBancarizacaoABeneficiarioProgramasSociais",
                                                             "flagPatrimonioABeneficiarioProgramasSociais",
                                                             "flagAtivosComCotaABeneficiarioProgramasSociais",
                                                             "flagAtivosSemCotaABeneficiarioProgramasSociais"
                                                            )

In [None]:
def get_investiment_income(investimento: DataFrame, renda: DataFrame) -> DataFrame:


    pessoa_investimento_baixa_renda = (
        investimento
        .join(renda, on="cpf", how="left")
        .withColumn("flagBancarizacaoABaixaRenda",
                    sf.when(
                        (sf.col("bancarizacao") == "A") &
                        (sf.col("flagBaixaRenda") == 1), 1)
                    .otherwise(0)
                   )
        .withColumn("flagPatrimonioABaixaRenda",
                    sf.when(
                        (sf.col("patrimonio") == "A") &
                        (sf.col("flagBaixaRenda") == 1), 1)
                    .otherwise(0)
                   )
        .withColumn("flagAtivosComCotaABaixaRenda",
                    sf.when(
                        (sf.col("ativosComCota") == "A") &
                        (sf.col("flagBaixaRenda") == 1), 1)
                    .otherwise(0)
                   )
        .withColumn("flagAtivosSemCotaABaixaRenda",
                    sf.when(
                        (sf.col("ativosSemCota") == "A") &
                        (sf.col("flagBaixaRenda") == 1), 1)
                    .otherwise(0)
                   )
    )


    return pessoa_investimento_baixa_renda.select("cpf", "flagBancarizacaoABaixaRenda",
                                                  "flagPatrimonioABaixaRenda", "flagAtivosComCotaABaixaRenda",
                                                  "flagAtivosSemCotaABaixaRenda"
                                                 )

In [None]:
investimento_beneficiaria = get_investiment_beneficiary_programs(investimento, pessoa_beneficios)

In [None]:
investimento_baixa_renda = get_investiment_income(investimento, pessoa_renda_estimada)

In [None]:
pessoa_investimento = (
    investimento_beneficiaria
    .join(investimento_baixa_renda, how="outer", on="cpf")
    .join(pessoa_cadastrais.select("cpf"), on="cpf", how="right")
    .fillna(0)
)

In [None]:
pessoa_investimento.cache().count()

                                                                                

184050329

### Salvando dados

In [None]:
DATASET_NAME = 'investimento.parquet'
PATH = os.path.join(oculto, DATASET_NAME)

pessoa_investimento.write.mode("overwrite").parquet(PATH)

                                                                                

## Juridico

In [None]:
cols = ["cpf", "flagSocioProcessoJudicialLavagemDinheiro",
        "flagProcessoJudicialCorrupcao", "flagProcessoJudicialLavagem",
        "flagProcessoJudicialFraude", "flagProcessoJudicialRouboFurtos",
        "flagSocioProcessoJudicialFraude", "flagProcessoJudicialCobranca",
        "flagSocioProcessoJudicialCorrupcao", "flagSocioProcessoJudicialTributario"]

In [None]:
juridico = (
    oculto
    .join(pessoa_cadastrais, how='right', on='cpf')
    .select("cpf",
            sf.col("flagSocioProcessoJudicialLavagemDinheiro").cast("int").alias("flagSocioProcessoJudicialLavagemDinheiro"),
            sf.col("flagProcessoJudicialCorrupcao").cast("int").alias("flagProcessoJudicialCorrupcao"),
            sf.col("flagProcessoJudicialLavagem").cast("int").alias("flagProcessoJudicialLavagem"),
            sf.col("flagProcessoJudicialFraude").cast("int").alias("flagProcessoJudicialFraude"),
            sf.col("flagProcessoJudicialRouboFurtos").cast("int").alias("flagProcessoJudicialRouboFurtos"),
            sf.col("flagSocioProcessoJudicialFraude").cast("int").alias("flagSocioProcessoJudicialFraude"),
            sf.col("flagProcessoJudicialCobranca").cast("int").alias("flagProcessoJudicialCobranca"),
            sf.col("flagSocioProcessoJudicialCorrupcao").cast("int").alias("flagSocioProcessoJudicialCorrupcao"),
            sf.col("flagSocioProcessoJudicialTributario").cast("int").alias("flagSocioProcessoJudicialTributario")
           )
)

In [None]:
juridico.count()

                                                                                

184050329

In [None]:
juridico = juridico.fillna(0)

### Salvando dados

In [None]:
DATASET_NAME = 'juridico.parquet'
PATH = os.path.join(oculto, DATASET_NAME)

juridico.write.mode("overwrite").parquet(PATH)

                                                                                

## Vinculos Familiares

In [None]:
relacionamento = (
    oculto
    .withColumnRenamed("cd_cpf", "cpf")
)

In [None]:
relacionamento = (
    pessoa_cadastrais.join(relacionamento,on="cpf", how="inner")
    .select('cpf',
            'nm_pessoa',
            'cd_cpf_relacionado',
            'nm_relacionamento')
)

In [None]:
relacionamento_juridico = (
    oculto
    .select("cpf",
            "flagProcessoJudicialCorrupcao", "flagProcessoJudicialLavagem",
            "flagProcessoJudicialFraude", "flagProcessoJudicialRouboFurtos",
            "flagProcessoJudicialCobranca")
)

In [None]:
relacionamento_juridico = (
    relacionamento
    .join(relacionamento_juridico, how="left",
          on=relacionamento_juridico.cpf==relacionamento.cd_cpf_relacionado
    )
    .drop(relacionamento_juridico.cpf)
    .fillna(False, subset=["flagProcessoJudicialCorrupcao", "flagProcessoJudicialLavagem",
                           "flagProcessoJudicialFraude", "flagProcessoJudicialRouboFurtos",
                           "flagProcessoJudicialCobranca"
                          ])
    .select(
        "cpf",
        sf.col("cd_cpf_relacionado").alias("cpfRelacionado"),
        sf.col("nm_relacionamento").alias("nomeRelacionamento"),
        sf.col("flagProcessoJudicialCorrupcao").cast("int").alias("flagProcessoJudicialCorrupcao"),
        sf.col("flagProcessoJudicialLavagem").cast("int").alias("flagProcessoJudicialLavagem"),
        sf.col("flagProcessoJudicialFraude").cast("int").alias("flagProcessoJudicialFraude"),
        sf.col("flagProcessoJudicialRouboFurtos").cast("int").alias("flagProcessoJudicialRouboFurtos"),
        sf.col("flagProcessoJudicialCobranca").cast("int").alias("flagProcessoJudicialCobranca")
    )
)

In [None]:
parentre_primeiro_grau = ["MAE", "FILHO", "PAI"]
parente_segundo_grau = ["IRMAO", "AVO", "NETO"]

In [None]:
processo_primeiro_grau = lambda col_processo, col_relacionamento: sf.when(
    (sf.col(col_relacionamento).isin(parentre_primeiro_grau)) & (sf.col(col_processo) == 1), 1
).otherwise(0)

processo_segundo_grau = lambda col_processo, col_relacionamento: sf.when(
    (sf.col(col_relacionamento).isin(parente_segundo_grau)) & (sf.col(col_processo) == 1), 1
).otherwise(0)



relacionamentos = (
    relacionamento_juridico
    .withColumn("primeiroGrauProcessoJudicialCorrupcao", processo_primeiro_grau("flagProcessoJudicialCorrupcao", "nomeRelacionamento"))
    .withColumn("primeiroGrauProcessoJudicialLavagem", processo_primeiro_grau("flagProcessoJudicialLavagem", "nomeRelacionamento"))
    .withColumn("primeiroGrauProcessoJudicialFraude", processo_primeiro_grau("flagProcessoJudicialFraude", "nomeRelacionamento"))
    .withColumn("primeiroGrauProcessoJudicialRouboFurtos", processo_primeiro_grau("flagProcessoJudicialRouboFurtos", "nomeRelacionamento"))
    .withColumn("primeiroGrauProcessoJudicialCobranca", processo_primeiro_grau("flagProcessoJudicialCobranca", "nomeRelacionamento"))
    .withColumn("segundoGrauProcessoJudicialCorrupcao", processo_segundo_grau("flagProcessoJudicialCorrupcao", "nomeRelacionamento"))
    .withColumn("segundoGrauProcessoJudicialLavagem", processo_segundo_grau("flagProcessoJudicialLavagem", "nomeRelacionamento"))
    .withColumn("segundoGrauProcessoJudicialFraude", processo_segundo_grau("flagProcessoJudicialFraude", "nomeRelacionamento"))
    .withColumn("segundoGrauProcessoJudicialRouboFurtos", processo_segundo_grau("flagProcessoJudicialRouboFurtos", "nomeRelacionamento"))
    .withColumn("segundoGrauProcessoJudicialCobranca", processo_segundo_grau("flagProcessoJudicialCobranca", "nomeRelacionamento"))
    .groupBy("cpf")
    .agg(
        sf.count("*").alias("quantidadeRelacionamentos"),
        sf.max("primeiroGrauProcessoJudicialCorrupcao").alias("flagPrimeiroGrauProcessoJudicialCorrupcao"),
        sf.max("primeiroGrauProcessoJudicialLavagem").alias("flagPrimeiroGrauProcessoJudicialLavagem"),
        sf.max("primeiroGrauProcessoJudicialFraude").alias("flagPrimeiroGrauProcessoJudicialFraude"),
        sf.max("primeiroGrauProcessoJudicialRouboFurtos").alias("flagPrimeiroGrauProcessoJudicialRouboFurtos"),
        sf.max("primeiroGrauProcessoJudicialCobranca").alias("flagPrimeiroGrauProcessoJudicialCobranca"),
        sf.max("segundoGrauProcessoJudicialCorrupcao").alias("flagSegundoGrauProcessoJudicialCorrupcao"),
        sf.max("segundoGrauProcessoJudicialLavagem").alias("flagSegundoGrauProcessoJudicialLavagem"),
        sf.max("segundoGrauProcessoJudicialFraude").alias("flagSegundoGrauProcessoJudicialFraude"),
        sf.max("segundoGrauProcessoJudicialRouboFurtos").alias("flagSegundoGrauProcessoJudicialRouboFurtos"),
        sf.max("segundoGrauProcessoJudicialCobranca").alias("flagSegundoGrauProcessoJudicialCobranca")

    )
    .join(pessoa_cadastrais.select('cpf'), how='right', on='cpf')
    .fillna(0)
)

In [None]:
countMissingValues(relacionamentos).toPandas()

                                                                                

Unnamed: 0,cpf,quantidadeRelacionamentos,flagPrimeiroGrauProcessoJudicialCorrupcao,flagPrimeiroGrauProcessoJudicialLavagem,flagPrimeiroGrauProcessoJudicialFraude,flagPrimeiroGrauProcessoJudicialRouboFurtos,flagPrimeiroGrauProcessoJudicialCobranca,flagSegundoGrauProcessoJudicialCorrupcao,flagSegundoGrauProcessoJudicialLavagem,flagSegundoGrauProcessoJudicialFraude,flagSegundoGrauProcessoJudicialRouboFurtos,flagSegundoGrauProcessoJudicialCobranca
0,0,0,0,0,0,0,0,0,0,0,0,0


### Salvando dados

In [None]:
DATASET_NAME = 'relacionamento.parquet'
PATH = os.path.join(oculto, DATASET_NAME)

relacionamentos.write.mode("overwrite").parquet(PATH)

                                                                                

## Endividamento

In [None]:
endividamento = (
    oculto
    .join(pessoa_cadastrais, on='cpf', how='right')
    .select("cpf",
            sf.col("flagDividaDauCresceu180Dias").cast("int").alias("flagDividaDauCresceu180Dias"),
            "valorTotalDividasDau"
           )
)

In [None]:
endividamento = endividamento.fillna(0)

In [None]:
countMissingValues(endividamento).toPandas()

                                                                                

Unnamed: 0,cpf,flagDividaDauCresceu180Dias,valorTotalDividasDau
0,0,0,0


### Salvando dados

In [None]:
DATASET_NAME = 'endividamento.parquet'
PATH = os.path.join(oculto, DATASET_NAME)

endividamento.write.mode("overwrite").parquet(PATH)

                                                                                

## Doação Política

In [None]:
cols = ["cpf", "quantidadeCandidatosApoiados",
        "quantidadeEleicoesComoDoador",
        "valorMedioDoado", "valorTotalDoado",
        "quantidadeDoacoesDescricaoNaoEspecificada",
        "valorMaximoDoado",
        "quantidadeDoacoesPoliticas",
        "flagContratoPublicoBeneficiarioProgramasSociais",
        "flagLaranja"]

In [None]:
doacao_politica = (
    oculto
    .join(pessoa_cadastrais, how='right', on='cpf')
    .select("cpf", "quantidadeCandidatosApoiados",
            "quantidadeEleicoesComoDoador",
            "valorMedioDoado", "valorTotalDoado",
            "quantidadeDoacoesDescricaoNaoEspecificada",
            "valorMaximoDoado", "quantidadeDoacoesPoliticas",
            sf.col("flagContratoPublicoBeneficiarioProgramasSociais").cast("int").alias("flagContratoPublicoBeneficiarioProgramasSociais"),
            sf.col("flagLaranja").cast("int").alias("flagLaranja")
           )
)

In [None]:
countMissingValues(doacao_politica).toPandas()

                                                                                

Unnamed: 0,cpf,quantidadeCandidatosApoiados,quantidadeEleicoesComoDoador,valorMedioDoado,valorTotalDoado,quantidadeDoacoesDescricaoNaoEspecificada,valorMaximoDoado,quantidadeDoacoesPoliticas,flagContratoPublicoBeneficiarioProgramasSociais,flagLaranja
0,0,114713310,114713310,114713310,114713310,114713310,114713310,114713310,114713310,114713310


In [None]:
doacao_politica = doacao_politica.fillna(0)

### Salvando dados

In [None]:
DATASET_NAME = 'doacao_politica.parquet'
PATH = os.path.join(oculto, DATASET_NAME)

doacao_politica.write.mode("overwrite").parquet(PATH)

                                                                                

## Telefone

In [None]:
pessoa_telefone = (
    oculto
    .join(oculto, how='left', on='id')
    .select("cpf", "numero")
)

In [None]:
dddAddr = lambda ddd, uf: 1 if ddd in UF_DDD.get(uf, []) else 0

In [None]:
pessoa_telefone.printSchema()

root
 |-- cpf: string (nullable = true)
 |-- numero: string (nullable = true)



In [None]:
endereco_simm.printSchema()

root
 |-- cd_cpf: string (nullable = true)
 |-- nm_logradouro: string (nullable = true)
 |-- nu_logradouro: string (nullable = true)
 |-- nm_bairro: string (nullable = true)
 |-- cd_cep: string (nullable = true)
 |-- nm_municipio: string (nullable = true)
 |-- sg_uf: string (nullable = true)
 |-- cd_latitude: double (nullable = true)
 |-- cd_longitude: double (nullable = true)



In [None]:
telefone = (
    pessoa_telefone
    .join(
        endereco_simm.select(
            sf.col("cd_cpf").alias("cpf"),
            "sg_uf"
        ), on="cpf", how="inner")
    .withColumn("ddd", sf.col("numero").substr(1, 2).cast("int"))
    .withColumn("flagDDDConsistenteEndereco", sf.udf(dddAddr, IntegerType())("ddd", "sg_uf"))
    .groupBy("cpf")
    .agg(
        sf.count("*").alias("quantidadeTelefones"),
        sf.sum("flagDDDConsistenteEndereco").alias("quantidadeTelefonesEnderecoConsistente")
    )
    .join(pessoa_cadastrais.select('cpf'), how='right', on='cpf')
    .fillna(0)
)

In [None]:
countMissingValues(telefone).toPandas()

                                                                                ]]

Unnamed: 0,cpf,quantidadeTelefones,quantidadeTelefonesEnderecoConsistente
0,0,0,0


In [None]:
telefone.printSchema()

root
 |-- cpf: string (nullable = true)
 |-- quantidadeTelefones: long (nullable = true)
 |-- quantidadeTelefonesEnderecoConsistente: long (nullable = true)



### Salvando dados

In [None]:
DATASET_NAME = 'telefone.parquet'
PATH = os.path.join(oculto, 'data/')

telefone.write.mode("overwrite").parquet(os.path.join(PATH, DATASET_NAME))

                                                                                ]]

## Email

In [None]:
COMMON_PROVIDERS = ["GMAIL", "OUTLOOK", "UOL", "YAHOO", "HOTMAIL", "ICLOUD"]

In [None]:
pessoa_email = (
    oculto
    .join(oculto, how='inner', on='id')
    .join(pessoa_cadastrais, how='inner', on='cpf')
    .select("cpf", "nome", "email")
    .withColumn("provedorEmail", sf.regexp_extract("email", r"@([\w]+)\.", 1))
    # .withColumn("flagEmailCorporativo", sf.when(sf.col("provedorEmail").isin(COMMON_PROVIDERS), 0).otherwise(1))
    .withColumn("flagEmailPadraoSuspeito",
                sf.when(sf.col("email").rlike(r".*(@.*@).*"), 1) # mais de um "@"
                .when(sf.col("email").rlike(r".*@(fake_domain|temp-mail|unknown).*"), 1) # dominios invalidos ou genericos
                .when(sf.col("email").rlike(r".*[0-9]{4,}.*"), 1) #n ome do usuario com 4 ou mais numeros
                .when(sf.col("email").rlike(r".*[^a-zA-Z0-9._%+-]{2,}.*"), 1)  # muitos caracteres especiais
                .when(sf.col("email").rlike(r".*@.*\.(xyz|fake|test|null|unknown)$"), 1)  # extensões suspeitas
                .when(sf.col("email").rlike(r"^[^@]{1,2}@.*"), 1)  # usuário muito curto
                .when(sf.col("email").rlike(r".*@.{1,2}\..*"), 1)  # domínio muito curto
                .when(sf.col("email").rlike(r".*@((gmail|yahoo|outlook|hotmail)[-].*|.*\.biz)$"), 1)  # público imitando corporativo
                .when(sf.col("email").rlike(r".*([a-zA-Z0-9])\1{2,}.*"), 1)  # repetição de caracteres
                .when(sf.col("email").rlike(r".*\d{5,}.*"), 1)  # sequência numérica longa
                .when(sf.col("email").rlike(r".*@.*[^a-zA-Z0-9.-].*\..*"), 1)  # caracteres inválidos no domínio
                .when(sf.col("email").rlike(r"^(admin|support|contact|info|help)@.*"), 1)  # genéricos suspeitos
                .when(sf.col("email").rlike(r".*@((temp-mail|10minutemail).*)"), 1)  # emails temporários
                .when(sf.col("email").rlike(r".*@mail\..*"), 1)  # Subdomínios genéricos
                .otherwise(0)
               )
    .withColumn("usuario", sf.split(sf.col("email"), '@')[0])
    .withColumn("levenshteinScore",
                sf.expr("""1 - (levenshtein(nome, usuario) / greatest(length(nome), length(usuario)))""")
               )
    .withColumn("flagEmailNaoConsistenteNome",
                sf.when(
                    sf.expr("""
                    1 - (levenshtein(nome, usuario) / greatest(length(nome), length(usuario)))""") < 0.3, 1
                ).otherwise(0)
               )
)

In [None]:
email = (
    pessoa_email
    .groupBy("cpf")
    .agg(
        sf.count("*").alias("quantidadeEmails"),
        sf.count_distinct(sf.col("provedorEmail")).alias("quantidadeDiferentesprovedoresEmail"),
        # sf.sum("flagEmailCorporativo").alias("quantidadeEmailCorporativo"),
        sf.sum("flagEmailPadraoSuspeito").alias("quantidadeEmailPadraoSuspeito"),
        sf.sum("flagEmailNaoConsistenteNome").alias("quantidadeEmailNaoConsistenteNomeTitular")
    )
    .join(pessoa_cadastrais.select('cpf'), how='right', on='cpf')
    .fillna(0)
)

In [None]:
pessoa_email.filter(sf.col("flagEmailNaoConsistenteNome") == 1).limit(5).toPandas()

In [None]:
email.cache().count()

### Salvando dados

In [None]:
DATASET_NAME = 'email.parquet'
PATH = os.path.join(oculto, DATASET_NAME)

email.write.mode("overwrite").parquet(PATH)

                                                                                

In [None]:
%%time
oculto_RAW = oculto + "RAW/"
DATASET_NAME = 'PERFIL-RAW.parquet'
PATH = os.path.join(oculto_RAW, DATASET_NAME)

df_laranja.write.mode("overwrite").parquet(PATH)

                                                                                

CPU times: user 722 ms, sys: 210 ms, total: 932 ms
Wall time: 9min 11s
