# Validação de CPFs - Experimento de validação paralelizada com Spark

https://colab.research.google.com/drive/1qotmO1ZczgMRiWDlnMTxgh1nvfqjDyWC?usp=sharing

# Preparo dos dados

## Geração de números aleatórios

In [1]:
import numpy as np

In [2]:
# Geração de números inteiros com até 9 dígitos
# para posterior cálculo de DV usando algoritmo
# de validação de CPFs
np.random.seed(42)

# quantidade de números a serem gerados
#num_amostras = 200
#num_amostras = int(10e6)
num_amostras = int(100e6)

# Geração das amostras (números inteiros com até 11 dígitos)
amostras = np.random.randint(99999999999, size=num_amostras)

# exibir informações do array de amostra
print('Amostra de valores:\n', amostras)
print('Quantidade de itens:', len(amostras))
print('Tamanho em memória: %.1f MB' % (amostras.size * amostras.itemsize / 1024 / 1024))


Amostra de valores:
 [29190929843 46298420295 27684640889 ... 39837153915 56931352828
 23285346913]
Quantidade de itens: 100000000
Tamanho em memória: 762.9 MB


## Geração de múltiplos arquivos

In [3]:
# quantidade de partições (arquivos a serem gerados)
qtd_particoes = 20

# dividir o vetor nas diversas partições
amostras_particoes = np.array_split(amostras, qtd_particoes)
amostras_particoes[0][:10]

array([29190929843, 46298420295, 27684640889, 77738800342, 44922131914,
       13498510183, 90436599092,  7395928407, 51275693469, 21478181249])

In [4]:
!rm -rf arquivos && mkdir arquivos

In [5]:
# gerar um arquivo de texto para cada partição
for i in range(len(amostras_particoes)):
  arq = "arquivos/%03d.txt" % (i+1)
  print(arq)
  np.savetxt(arq, amostras_particoes[i], fmt='%d')

arquivos/001.txt
arquivos/002.txt
arquivos/003.txt
arquivos/004.txt
arquivos/005.txt
arquivos/006.txt
arquivos/007.txt
arquivos/008.txt
arquivos/009.txt
arquivos/010.txt
arquivos/011.txt
arquivos/012.txt
arquivos/013.txt
arquivos/014.txt
arquivos/015.txt
arquivos/016.txt
arquivos/017.txt
arquivos/018.txt
arquivos/019.txt
arquivos/020.txt


In [4]:
!ls -lah arquivos/*

-rw-r--r-- 1 hjort paranoa 57M fev 23 21:10 arquivos/001.txt
-rw-r--r-- 1 hjort paranoa 57M fev 23 21:10 arquivos/002.txt
-rw-r--r-- 1 hjort paranoa 57M fev 23 21:10 arquivos/003.txt
-rw-r--r-- 1 hjort paranoa 57M fev 23 21:10 arquivos/004.txt
-rw-r--r-- 1 hjort paranoa 57M fev 23 21:10 arquivos/005.txt
-rw-r--r-- 1 hjort paranoa 57M fev 23 21:10 arquivos/006.txt
-rw-r--r-- 1 hjort paranoa 57M fev 23 21:10 arquivos/007.txt
-rw-r--r-- 1 hjort paranoa 57M fev 23 21:10 arquivos/008.txt
-rw-r--r-- 1 hjort paranoa 57M fev 23 21:10 arquivos/009.txt
-rw-r--r-- 1 hjort paranoa 57M fev 23 21:10 arquivos/010.txt
-rw-r--r-- 1 hjort paranoa 57M fev 23 21:10 arquivos/011.txt
-rw-r--r-- 1 hjort paranoa 57M fev 23 21:11 arquivos/012.txt
-rw-r--r-- 1 hjort paranoa 57M fev 23 21:11 arquivos/013.txt
-rw-r--r-- 1 hjort paranoa 57M fev 23 21:11 arquivos/014.txt
-rw-r--r-- 1 hjort paranoa 57M fev 23 21:11 arquivos/015.txt
-rw-r--r-- 1 hjort paranoa 57M fev 23 21:11 arquivos/016.txt
-rw-r--r

In [5]:
del amostras_particoes

# Criação da função de validação

## Função de validação de dígito verificador de CPF

> Retorna True caso o número de CPF seja válido, False caso contrário.

In [6]:
# Baseado no algoritmo em Linguagem C:
# https://github.com/EscovandoBits/cpf/blob/main/cpf.c

# verifica se um número de CPF é válido
def cpf_valido(n):
  #print('cpf_valido(%011d)' % n)

  # extrair dígitos verificadores
  dv = n % 100
  d10 = dv // 10
  d11 = dv % 10

  # calcular penúltimo dígito
  v1 = 0
  r = n // 100
  i = 9
  while True:
    d = r % 10
    r = r // 10
    v1 += i * d
    i = i - 1
    if not (r > 0 and i > 0):
      break
  v1 = (v1 % 11) % 10
  if (v1 != d10):
    return False

  # calcular último dígito
  v2 = 0
  r = n // 100
  i = 8
  while True:
    d = r % 10
    r = r // 10
    v2 += i * d
    i = i - 1
    if not (r > 0 and i > 0):
      break
  v2 += 9 * v1
  v2 = (v2 % 11) % 10
  if (v2 != d11):
    return False

  return True

In [7]:
# teste de execução da função
for num in [11111111111, 11111111112, 22222222222, 22222222221, 123]:
  print('cpf_valido(%011d)? %s' % (num, cpf_valido(num)))

cpf_valido(11111111111)? True
cpf_valido(11111111112)? False
cpf_valido(22222222222)? True
cpf_valido(22222222221)? False
cpf_valido(00000000123)? False


# Processamento parelizado

## Instalação do PySpark

In [8]:
#!pip install -q pyspark

## Inicialização da sessão Spark

In [9]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local[*]') \
    .config("spark.driver.memory", "10g") \
    .appName('my-cool-app') \
    .getOrCreate()
#spark = SparkSession.builder.getOrCreate()
spark

Referências:
- https://spark.apache.org/docs/latest/rdd-programming-guide.html
- https://stackoverflow.com/questions/32336915/pyspark-java-lang-outofmemoryerror-java-heap-space

## Criação dos dados

### Via vetor na memória

In [19]:
%%time

# criação do vetor de forma distribuída a partir da memória
rdd = spark.sparkContext.parallelize(amostras)
#rdd = spark.sparkContext.parallelize(amostras, 10)
rdd

CPU times: user 4min 34s, sys: 1.36 s, total: 4min 36s
Wall time: 4min 40s


ParallelCollectionRDD[9] at readRDDFromFile at PythonRDD.scala:274

In [20]:
rdd.getNumPartitions()

12

In [21]:
rdd.take(10)
#rdd.sample(withReplacement=False, fraction=0.1).take(10)

[29190929843,
 46298420295,
 27684640889,
 77738800342,
 44922131914,
 13498510183,
 90436599092,
 7395928407,
 51275693469,
 21478181249]

### Via arquivos de texto

In [10]:
# criação do vetor a partir dos arquivos de forma preguiçosa
rdd = spark.sparkContext.textFile("arquivos/*.txt").map(lambda x: int(x))
#rdd = spark.read.text("arquivos/*.txt").rdd
rdd

PythonRDD[2] at RDD at PythonRDD.scala:53

In [11]:
rdd.getNumPartitions()

40

In [12]:
rdd.take(10)

[29190929843,
 46298420295,
 27684640889,
 77738800342,
 44922131914,
 13498510183,
 90436599092,
 7395928407,
 51275693469,
 21478181249]

## Filtragem com _filter_

In [22]:
rdd.filter(lambda n: n % 7 == 0).take(5)

[77738800342, 90436599092, 7395928407, 51275693469, 21478181249]

In [23]:
rdd.filter(lambda x: cpf_valido(x)).take(5)

[7395928407, 67890174292, 39542408234, 64972932643, 29879232283]

In [24]:
#del amostras

In [25]:
%%time
cpfs_validos = rdd.filter(lambda x: cpf_valido(x)).collect()

CPU times: user 265 ms, sys: 28.3 ms, total: 293 ms
Wall time: 3min 45s


## Resultado

In [17]:
# exibição do resultado
cpfs_validos[:10]

[7395928407,
 67890174292,
 39542408234,
 64972932643,
 29879232283,
 20298534452,
 40277561426,
 91162110228,
 15646246172,
 21763352609]

In [18]:
print("Do total de %d números da amostra, apenas %d são CPFs válidos (%.2f%%)." % (
    rdd.count(), len(cpfs_validos),
    len(cpfs_validos) / rdd.count() * 100))

Do total de 100000000 números da amostra, apenas 1001198 são CPFs válidos (1.00%).


## Finalização da sessão Spark

In [60]:
spark.stop()