#### Criando Catalogos, Schemas e Volumes Necessários para o Estudo

In [0]:
%sql


CREATE CATALOG IF NOT EXISTS costumers_registrations
COMMENT "Catalogo de cadastros e pedidos";

CREATE SCHEMA IF NOT EXISTS costumers_registrations.raw
COMMENT "Base de dados de clientes";

CREATE VOLUME IF NOT EXISTS costumers_registrations.raw.registrations
COMMENT "Volume de dados dos clientes cadastrados";

CREATE VOLUME IF NOT EXISTS costumers_registrations.raw.orders
COMMENT "Volume de dados dos pedidos dos clientes"



#### Importando Dependências

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import col
from pyspark.sql.functions import rand

from faker import Faker

import pandas as pd
import random
import uuid

#### Inferindo os Schemas
-----

Essa abordagem torna o script mais eficiênte, devido a não necessidade de inferir os schemas automáticamente

In [0]:

schema_cadastro = StructType([
    StructField('id', StringType(), True),
    StructField('nome', StringType(), True),
    StructField('data_nascimento', DateType(), True),
    StructField('cpf', StringType(),True),
    StructField('cep', StringType(), True),
    StructField('cidade', StringType(), True),
    StructField('estado', StringType(), True),
    StructField('pais', StringType(), True),
    StructField('genero', StringType(), True),
    StructField('telefone', StringType(), True),
    StructField('email', StringType(), True),
    StructField('data_cadastro', DateType(), True)
])

schema_pedido = StructType([
    StructField('id_pedido', StringType(), True),
    StructField('cpf', StringType(), True),
    StructField('valor_pedido', DoubleType(), True),
    StructField('valor_frete', DoubleType(), True),
    StructField('valor_desconto', DoubleType(), True),
    StructField('cupom', StringType(), True),
    StructField('endereco_entrega_logradouro', StringType(), True),
    StructField('endereco_entrega_numero', IntegerType(), True),
    StructField('endereco_entrega_bairro', StringType(), True),
    StructField('endereco_entrega_cidade', StringType(), True),
    StructField('endereco_entrega_estado', StringType(), True),
    StructField('endereco_entrega_pais', StringType(), True),
    StructField('status_pedido', StringType(), True),
    StructField('data_pedido', DateType(), True)
])

#### Função de Construção da Tabela Cadastross

In [0]:

"""
Método Utilizando Apenas Python Puro + Driver

Esse método é menos performático, pois o único momento em que estamos usando spark + cluster é na linha 

return spark.createDataFame(data, schema=schema_cadastro)

"""


# fake = Faker('pt_BR') # Configura Faker para gerar dados em português

# def gerar_dados_cadastro_driver(n_linhas:100000):
#     # Gera dados de um "dia" para a tabela de cadastros utilizando o Driver + Python Puro

#     data = []
#     for _ in range(n_linhas):
#         data.append({
#             'id': str(uuid.uuid4()), # Gera ID único
#             'nome': fake.name(),
#             'data_nascimento': fake.date_of_birth(minimum_age=18, maximum_age=90),
#             'cpf': fake.cpf(),
#             'cep': fake.postcode(),
#             'cidade': fake.city(),
#             'estado': fake.state(),
#             'pais': fake.current_country(),
#             'genero': fake.random_element(elements=['M', 'F']),
#             'telefone': fake.phone_number(),
#             'email': fake.email(),
#             'data_cadastro': fake.date_between(start_date='-2y', end_date='today')
#         })
#     return spark.createDataFrame(data, schema=schema_cadastro)

In [0]:
"""
Método Utilizando Spark + Cluster

Esse método é mais performático, pois estamos usando spark + cluster para gerar os dados
"""

def gerar_dados_cadastro_cluster(n_linhas: int = 100000):
    # Gera dados de um "dia" para a tabela de cadastros utilizando o Cluster + Spark

    def gerar_particao(iterator):
        fake = Faker('pt_BR') # Instância por executor, cria dados em português

        for pdf in iterator:
            tamanho = len(pdf)

            data = {
                'id': [str(uuid.uuid4()) for _ in range(tamanho)],
                'nome': [fake.name() for _ in range(tamanho)],
                'data_nascimento': [
                    fake.date_of_birth(minimum_age=18, maximum_age=90)
                    for _ in range(tamanho)
                ],
                'cpf': [fake.cpf() for _ in range(tamanho)],
                'cep': [fake.postcode() for _ in range(tamanho)],
                'cidade': [fake.city() for _ in range(tamanho)],
                'estado': [fake.state() for _ in range(tamanho)],
                'pais': [fake.current_country() for _ in range(tamanho)],
                'genero': [
                    fake.random_element(elements=('M', 'F'))
                    for _ in range(tamanho)
                    ],
                'telefone': [fake.phone_number() for _ in range(tamanho)],
                'email': [fake.email() for _ in range(tamanho)],
                'data_cadastro': [
                    fake.date_between(start_date='-2y', end_date='today')
                    for _ in range(tamanho)
                ]
            }

            yield pd.DataFrame(data)

    return (
        spark
        .range(n_linhas)
        .mapInPandas(gerar_particao, schema=schema_cadastro)
    )

In [0]:
def gerar_pedidos_cluster(DataFrame_cadastros, n_pedidos: int = 500000):
    # Gera dados de um "dia" para a tabela de pedidos utilizando o Cluster + Spark

    cpf_cadastros = DataFrame_cadastros.select('cpf').distinct()

    # Criando base distribuída de pedidos
    pedidos_base = (
        spark.range(n_pedidos)
        .withColumn('rand_key', rand())
    )

    # Criando chave aleatória também nos CPFs
    cpfs_random = cpf_cadastros.withColumn('rand_key', rand())

    # Join aleatório
    pedidos_com_cpf = (
        pedidos_base
        .join(cpfs_random, on='rand_key', how='left')
        .drop('rand_key')
    )

    def gerar_particao(iterador):
        fake = Faker('pt_BR') # Instância por executor, cria dados em português

        for pdf in iterador:
            tamanho = len(pdf)

            valor_pedido = [round(random.uniform(50, 1000), 2) for _ in range(tamanho)]
            valor_frete = [round(random.uniform(5, 100), 2) for _ in range(tamanho)]
            valor_desconto = [
                random.choice([0, round(random.uniform(5, 100), 2)])
                for _ in range(tamanho)
            ]

            status_list = random.choices(
                ['faturado', 'aguardando pagamento', 'cancelado'],
                weights=[80, 15, 5],
                k=tamanho
            )

            data = {
                'id_pedido': [str(uuid.uuid4()) for _ in range(tamanho)],
                'cpf': pdf['cpf'],
                'valor_pedido': valor_pedido,
                'valor_frete': valor_frete,
                'valor_desconto': valor_desconto,
                'cupom': [
                    fake.word() if valor_desconto[i] > 0 else None
                    for i in range(tamanho)
                ],
                'endereco_entrega_logradouro': [fake.street_name() for _ in range(tamanho)],
                'endereco_entrega_numero': [fake.building_number() for _ in range(tamanho)],
                'endereco_entrega_bairro': [fake.neighborhood() for _ in range(tamanho)],
                'endereco_entrega_cidade': [fake.city() for _ in range(tamanho)],
                'endereco_entrega_estado': [fake.state() for _ in range(tamanho)],
                'endereco_entrega_pais': [fake.current_country() for _ in range(tamanho)],
                'status_pedido': status_list,
                'data_pedido': [fake.date_between(start_date='-2y', end_date='today')
                                for _ in range(tamanho)]
            }

            yield pd.DataFrame(data)

    return (
        pedidos_com_cpf.mapInPandas(gerar_particao, schema=schema_pedido)
    )
    





In [0]:
# dbutils.fs.rm('/Volumes/dataset_cadastros/clientes/cadastros', recurse=True)
# dbutils.fs.rm('/Volumes/dataset_cadastros/clientes/pedidos', recurse=True)

df_cad = gerar_dados_cadastro_cluster(1_000_000)
df_ped = gerar_pedidos_cluster(df_cad, 5_000_000)

df_cad.write.mode("overwrite").parquet("/Volumes/costumers_registrations/raw/registrations")
df_ped.write.mode("overwrite").parquet("/Volumes/costumers_registrations/raw/orders")
