In [0]:
#Algoritmo baseado em pyspark - Diego Palomo - 10/06/2022
import pyspark

##################################################### IMPORTAÇÃO #########################################################################################
#Importa arquivo do repositório github
url = "https://raw.githubusercontent.com/diegopalomo89/talent-data-analyst-lv4/main/dados_cadastrais_fake.csv"
from pyspark import SparkFiles
spark.sparkContext.addFile(url)
df_cadastro = spark.read.csv("file://"+SparkFiles.get("dados_cadastrais_fake.csv"), header=True, inferSchema= True, sep=';')

#Verifica se colunas foram importadas corretamente e o tipo delas
df_cadastro.printSchema()

#Seleciona linhas de exemplo pra checar se a importação ocorreu com êxito
df_cadastro.select("nomes", "idade", "cidade", "estado", "cpf", "cnpj").show(20)


##################################################### HIGIENIZAÇÃO #######################################################################################

#Higieniza os nomes dos estados
from pyspark.sql.functions import when
df_cadastro=df_cadastro.withColumn('estado', 
    when(df_cadastro.estado==('são  paulo'),regexp_replace(df_cadastro.estado,'são  paulo','SP')) \
   .when(df_cadastro.estado==('MINAS GERAIs'),regexp_replace(df_cadastro.estado,'MINAS GERAIs','MG')) \
   .when(df_cadastro.estado==('MINAS GERAI'),regexp_replace(df_cadastro.estado,'MINAS GERAI','MG')) \
   .when(df_cadastro.estado==('sao  paulo'),regexp_replace(df_cadastro.estado,'sao  paulo','SP')) \
   .when(df_cadastro.estado==('rio de  janeiro '),regexp_replace(df_cadastro.estado,'rio de  janeiro ','RJ')) \
   .when(df_cadastro.estado==('distrito federal'),regexp_replace(df_cadastro.estado,'distrito federal','DF')) \
   .otherwise(df_cadastro.estado)) 

#Deixa todas as cidades em caixa alta, seguindo o padrão da maioria do arquivo
from pyspark.sql.functions import upper, col
df_cadastro=df_cadastro.withColumn('cidade', upper(col('cidade')))


#Realiza o tratamento das colunas de CPF e CNPJ, criando novas colunas com os valores higienizados (removidos caracteres especiais, espaços e letras)
from pyspark.sql.functions import regexp_replace, col
df_cadastro=df_cadastro.withColumn("cpf_final",regexp_replace(col("cpf"), "[\-.(\s+)(\D+)]", ""))
df_cadastro=df_cadastro.withColumn("cnpj_final",regexp_replace(col("cnpj"), "[\-.(\s+)(\D+)]", ""))
df_cadastro.printSchema()

#Seleciona linhas de exemplo comparando o dado bruto com o dado higienizado
df_cadastro.select("cpf","cpf_final").show(20)
df_cadastro.select("cnpj","cnpj_final").show(20)


#################################### VALIDAÇÃO CPF/CNPJ #################################################################################################

#Validação do CPF. Para checar se um CPF é válido, nós primeiramos validamos o primeiro dígito verificador. Realizamos essa verificação fazendo a multiplicação dos nove digítos por uma sequência decrescente de 10 a 2, então somando os resultados e dividindo por 11. O resultado do resto da divisão deve bater com o primeiro dígito verificador. Então fazemos o mesmo cálculo mas considerando também o primeiro dígito verificador na soma para checar o segundo dígito verificador, conforme consta no site da Receita Federal.

from pyspark.sql.functions import substring
df_cadastro=df_cadastro.withColumn("dig1",substring(df_cadastro.cpf_final,1,1))\
                       .withColumn("dig2",substring(df_cadastro.cpf_final,2,1))\
                       .withColumn("dig3",substring(df_cadastro.cpf_final,3,1))\
                       .withColumn("dig4",substring(df_cadastro.cpf_final,4,1))\
                       .withColumn("dig5",substring(df_cadastro.cpf_final,5,1))\
                       .withColumn("dig6",substring(df_cadastro.cpf_final,6,1))\
                       .withColumn("dig7",substring(df_cadastro.cpf_final,7,1))\
                       .withColumn("dig8",substring(df_cadastro.cpf_final,8,1))\
                       .withColumn("dig9",substring(df_cadastro.cpf_final,9,1))\
                       .withColumn("dig10",substring(df_cadastro.cpf_final,10,1))\
                       .withColumn("dig11",substring(df_cadastro.cpf_final,11,1))

#Verifica se dígitos posicionais foram extraídos corretamente
df_cadastro.select("cpf_final", "dig1","dig2","dig3","dig4","dig5","dig6","dig7","dig8","dig9","dig10","dig11").show(10)

#Realiza o cálculo para validar o primeiro dígito verificador
df_cadastro=df_cadastro.withColumn("calcDigVerif1",((((df_cadastro.dig1*10)+(df_cadastro.dig2*9)+(df_cadastro.dig3*8)+(df_cadastro.dig4*7)+(df_cadastro.dig5*6)+(df_cadastro.dig6*5)+(df_cadastro.dig7*4)+(df_cadastro.dig8*3)+(df_cadastro.dig9*2))*10)%11))

#Realiza o cálculo para validar o segundo dígito verificador
df_cadastro=df_cadastro.withColumn("calcDigVerif2",((((df_cadastro.dig1*11)+(df_cadastro.dig2*10)+(df_cadastro.dig3*9)+(df_cadastro.dig4*8)+(df_cadastro.dig5*7)+(df_cadastro.dig6*6)+(df_cadastro.dig7*5)+(df_cadastro.dig8*4)+(df_cadastro.dig9*3)+(df_cadastro.dig10*2))*10)%11))

#Converte pra integer
from pyspark.sql.functions import col
df_cadastro=df_cadastro.withColumn(("calcDigVerif1"), col("calcDigVerif1").cast("integer"))
df_cadastro=df_cadastro.withColumn(("calcDigVerif2"), col("calcDigVerif2").cast("integer"))

#Valida o resultado dos cálculos
df_cadastro.select("cpf_final", "dig1","dig2","dig3","dig4","dig5","dig6","dig7","dig8","dig9","dig10","dig11","calcDigVerif1", "calcDigVerif2").show(10)

#Gera coluna com o resultado da validação dos CPFs, considerando os dígitos verificadores para checar se o CPF é válido ou não
import pyspark.sql.functions as F
df_cadastro = df_cadastro.withColumn('cpfValido',
   F.when((df_cadastro.dig10==df_cadastro.calcDigVerif1) & (df_cadastro.dig11==df_cadastro.calcDigVerif2), 'Válido').otherwise('Inválido'))

#Mostra o resultado da validação dos CPFs
df_cadastro.select("cpf_final", "dig10","dig11","calcDigVerif1", "calcDigVerif2", "cpfValido").show(10)


#Inicia validação dos CNPJs
#O digito verificador do CNPJ é calculado multiplicando os 12 primeiros dígitos pela sequência posicional 5,4,3,2,9,8,7,6,5,4,3,2
from pyspark.sql.functions import substring
df_cadastro=df_cadastro.withColumn("dig1",substring(df_cadastro.cnpj_final,1,1))\
                       .withColumn("dig2",substring(df_cadastro.cnpj_final,2,1))\
                       .withColumn("dig3",substring(df_cadastro.cnpj_final,3,1))\
                       .withColumn("dig4",substring(df_cadastro.cnpj_final,4,1))\
                       .withColumn("dig5",substring(df_cadastro.cnpj_final,5,1))\
                       .withColumn("dig6",substring(df_cadastro.cnpj_final,6,1))\
                       .withColumn("dig7",substring(df_cadastro.cnpj_final,7,1))\
                       .withColumn("dig8",substring(df_cadastro.cnpj_final,8,1))\
                       .withColumn("dig9",substring(df_cadastro.cnpj_final,9,1))\
                       .withColumn("dig10",substring(df_cadastro.cnpj_final,10,1))\
                       .withColumn("dig11",substring(df_cadastro.cnpj_final,11,1))\
                       .withColumn("dig12",substring(df_cadastro.cnpj_final,12,1))\
                       .withColumn("dig13",substring(df_cadastro.cnpj_final,13,1))\
                       .withColumn("dig14",substring(df_cadastro.cnpj_final,14,1))


#Verifica se dígitos posicionais foram extraídos corretamente
df_cadastro.select("cnpj_final", "dig1","dig2","dig3","dig4","dig5","dig6","dig7","dig8","dig9","dig10","dig11","dig12","dig13","dig14").show(10)

#Realiza o cálculo para validar o primeiro dígito verificador. Subtraímos 11 pelo resto da divisão. Se o resto for menor que 2, gravamos 0.
df_cadastro=df_cadastro.withColumn("calcCnpjResto",(((df_cadastro.dig1*5)+(df_cadastro.dig2*4)+(df_cadastro.dig3*3)+(df_cadastro.dig4*2)+(df_cadastro.dig5*9)+(df_cadastro.dig6*8)+(df_cadastro.dig7*7)+(df_cadastro.dig8*6)+(df_cadastro.dig9*5)+(df_cadastro.dig10*4)+(df_cadastro.dig11*3)+(df_cadastro.dig12*2))%11))

import pyspark.sql.functions as F
df_cadastro = df_cadastro.withColumn('calcDigVerif1',
   F.when((df_cadastro.calcCnpjResto>1), 11-df_cadastro.calcCnpjResto).otherwise(0))



#Realiza o cálculo para validar o segundo dígito verificador.

df_cadastro=df_cadastro.withColumn("calcCnpjResto",(((df_cadastro.dig1*6)+(df_cadastro.dig2*5)+(df_cadastro.dig3*4)+(df_cadastro.dig4*3)+(df_cadastro.dig5*2)+(df_cadastro.dig6*9)+(df_cadastro.dig7*8)+(df_cadastro.dig8*7)+(df_cadastro.dig9*6)+(df_cadastro.dig10*5)+(df_cadastro.dig11*4)+(df_cadastro.dig12*3)+(df_cadastro.dig13*2))%11))

import pyspark.sql.functions as F
df_cadastro = df_cadastro.withColumn('calcDigVerif2',
   F.when((df_cadastro.calcCnpjResto>1), 11-df_cadastro.calcCnpjResto).otherwise(0))

#Converte pra integer
from pyspark.sql.functions import col
df_cadastro=df_cadastro.withColumn(("calcDigVerif1"), col("calcDigVerif1").cast("integer"))
df_cadastro=df_cadastro.withColumn(("calcDigVerif2"), col("calcDigVerif2").cast("integer"))

#Valida o resultado dos cálculos
df_cadastro.select("cnpj_final", "dig1","dig2","dig3","dig4","dig5","dig6","dig7","dig8","dig9","dig10","dig11","dig12","dig13","dig14","calcDigVerif1", "calcDigVerif2").show(10)

#Gera coluna com o resultado da validação dos CPFs, considerando os dígitos verificadores para checar se o CPF é válido ou não
import pyspark.sql.functions as F
df_cadastro = df_cadastro.withColumn('cnpjValido',
   F.when((df_cadastro.dig13==df_cadastro.calcDigVerif1) & (df_cadastro.dig14==df_cadastro.calcDigVerif2), 'Válido').otherwise('Inválido'))

#Mostra o resultado da validação dos CPFs
df_cadastro.select("cnpj_final", "dig13","dig14","calcDigVerif1", "calcDigVerif2", "cnpjValido").show(10)

#Elimina eventuais duplicidades
dfCadastroFinal = df_cadastro.distinct()



#################################################################### REPORTS #############################################################################

#Gera o report de média de idade
import pyspark.sql.functions as F
dfMediaIdade=dfCadastroFinal.select(F.mean("idade"))

#Gera o report de total de clientes
import pyspark.sql.functions as F
dfTotalClientes=dfCadastroFinal.select(F.countDistinct("nomes", "cpf_final", "cnpj_final"))

#Gera o report de clientes por estado
dfEstadoClientes=dfCadastroFinal.groupBy("estado").count()

#Gera o report de CPFs válidos vs inválidos
dfCpfsValidos=dfCadastroFinal.groupBy("cpfValido").count()
dfCpfsValidos.show(10)

#Gera o report de CNJPs válidos vs inválidos
dfCnpjsValidos=dfCadastroFinal.groupBy("cnpjValido").count()
dfCnpjsValidos.show(10)

dfCadastroFinal.printSchema()

dfCadastroFinal=dfCadastroFinal.drop("dig1")\
                               .drop("dig2")\
                               .drop("dig3")\
                               .drop("dig4")\
                               .drop("dig5")\
                               .drop("dig6")\
                               .drop("dig7")\
                               .drop("dig8")\
                               .drop("dig9")\
                               .drop("dig10")\
                               .drop("dig11")\
                               .drop("dig12")\
                               .drop("dig13")\
                               .drop("dig14")\
                               .drop("calcDigVerif1")\
                               .drop("calcDigVerif2")\
                               .drop("calcCnpjResto")\
                               .drop("cpf")\
                               .drop("cnpj")

dfCadastroFinal=dfCadastroFinal.withColumnRenamed("cpf_final","cpf")\
                               .withColumnRenamed("cnpj_final","cnpj")



######################################################## GERAÇÃO DOS ARQUIVOS ###########################################################################

dfCadastroFinal.write.mode('overwrite').parquet("/temp/spark_output/problema1_normalizado")
dfCadastroFinal.write.format('csv').option('header',True).mode('overwrite').option('sep',';').save('file:///home/tangr/output.csv')