## Criando Dados Fakes

In [1]:
from datetime import datetime, timedelta
from pyspark.sql.window import Window
from pyspark.sql import Row, functions as f, DataFrame as SparkDataFrame
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType
import random, os
# Spark connection
from pyspark.sql import SparkSession
from pyspark.context import SparkContext

# Local Spark session on all available cores with a 4 GB driver.
spark = (
    SparkSession.builder
    .appName("local")
    .master("local[*]")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

# The synthetic datasets are small, so a single post-shuffle partition
# (joins / aggregations) avoids scheduling many near-empty tasks.
spark.conf.set("spark.sql.shuffle.partitions", "1")

# The session owns the active SparkContext; reuse it and hide WARN/INFO noise.
sc = spark.sparkContext
sc.setLogLevel("ERROR")

def generate_cpf_from_int(seed: int) -> str:
    """Return a deterministic 11-digit CPF string with valid check digits.

    The nine base digits are drawn from ``random.Random(seed)``. The two
    verification digits follow the CPF mod-11 rule: weights 10..2 for the
    first digit and 11..2 for the second, where a remainder of 0 or 1 maps to 0.
    """
    generator = random.Random(seed)
    digits = [generator.randint(0, 9) for _ in range(9)]

    # First pass weights the 9 base digits by 10..2, second pass weights the
    # 10 digits (base + first check digit) by 11..2.
    for top_weight in (10, 11):
        weighted = sum(w * d for w, d in zip(range(top_weight, 1, -1), digits))
        remainder = weighted % 11
        digits.append(0 if remainder < 2 else 11 - remainder)

    return "".join(str(d) for d in digits)

def generate_cnpj_from_int(seed: int) -> str:
    """Return a deterministic 14-digit CNPJ string with valid check digits.

    Twelve base digits come from ``random.Random(seed)``. The two verification
    digits use the CNPJ mod-11 weight tables; a remainder of 0 or 1 maps to 0.
    """
    generator = random.Random(seed)
    digits = [generator.randint(0, 9) for _ in range(12)]

    weight_tables = (
        (5, 4, 3, 2, 9, 8, 7, 6, 5, 4, 3, 2),        # first check digit (12 digits)
        (6, 5, 4, 3, 2, 9, 8, 7, 6, 5, 4, 3, 2),     # second check digit (13 digits)
    )
    for weights in weight_tables:
        remainder = sum(w * d for w, d in zip(weights, digits)) % 11
        digits.append(0 if remainder < 2 else 11 - remainder)

    return "".join(map(str, digits))

def _draw_unique_account(rng: random.Random, used_accounts: set, bank: int) -> tuple:
    """Draw an (office, account) pair not yet issued at ``bank``, reserve it and return it.

    Each attempt draws the office in [1000, 4999] and then the account in
    [100000, 499999] from ``rng``, re-drawing until the (office, account, bank)
    key is not in ``used_accounts``.
    """
    while True:
        nr_office = str(rng.randint(1000, 4999))
        nr_account = str(rng.randint(100000, 499999))
        key = (nr_office, nr_account, bank)
        if key not in used_accounts:
            used_accounts.add(key)
            return nr_office, nr_account


def make_personas_table(qtd_f: int, qtd_j: int) -> SparkDataFrame:
    """Build a synthetic account table for ``qtd_f`` individuals and ``qtd_j`` companies.

    Each output row is one bank account with columns ``nr_cpf_cnpj``,
    ``cd_tipo_doc`` ('F' for CPF, 'J' for CNPJ), ``cd_bank``, ``nr_office`` and
    ``nr_account``. The row layout is:

    * Individuals hold 1 account (50%), 2 accounts (45%) or 3-10 accounts (5%,
      exponentially decaying). 80% of them are guaranteed an account at bank 1.
    * Companies hold 1-10 accounts (uniform) at randomly chosen banks.
    * (office, account, bank) triples are unique across the whole table.
    * Documents are unique across persons. A CPF/CNPJ that collides with an
      earlier one is re-drawn from a seed outside the primary ``0..n-1`` range.

    Output is deterministic: account data comes from ``random.Random(42)`` and
    documents from seeded generators.
    """
    from itertools import count

    rng = random.Random(42)
    banks = [1, 2, 3, 4, 5]

    data = []
    used_accounts = set()
    used_docs = set()
    # CPF bases span only 10**9 values, so with tens of thousands of seeds two
    # seeds can produce the same CPF and two persons would silently merge.
    # Collisions are re-drawn from seeds never used as primary seeds. This does
    # not touch `rng`, so every non-colliding row is generated exactly as before.
    fallback_seeds = count(max(qtd_f, qtd_j, 0))

    # Individuals (CPF)
    for i in range(qtd_f):
        cpf = generate_cpf_from_int(i)
        while cpf in used_docs:
            cpf = generate_cpf_from_int(next(fallback_seeds))
        used_docs.add(cpf)

        p = rng.random()
        if p < 0.5:
            num_accounts = 1
        elif p < 0.95:
            num_accounts = 2
        else:
            # Exponentially decaying tail for 3 to 10 accounts.
            num_accounts = min(10, int(rng.expovariate(0.8)) + 3)

        banks_for_person = []
        if rng.random() < 0.8:
            banks_for_person.append(1)

        # There are only 5 banks but up to 10 accounts. When num_accounts
        # exceeds the distinct banks, progress relies on the 5% "repeat bank"
        # branch, so the loop still terminates (with probability 1).
        while len(banks_for_person) < num_accounts:
            b = rng.choice(banks)
            # rng.random() is always drawn first to keep the RNG sequence stable.
            if rng.random() < 0.05 or b not in banks_for_person:
                banks_for_person.append(b)

        for b in banks_for_person:
            nr_office, nr_account = _draw_unique_account(rng, used_accounts, b)
            data.append((cpf, 'F', str(b), nr_office, nr_account))

    # Companies (CNPJ)
    for i in range(qtd_j):
        cnpj = generate_cnpj_from_int(i)
        while cnpj in used_docs:
            cnpj = generate_cnpj_from_int(next(fallback_seeds))
        used_docs.add(cnpj)

        num_accounts = rng.randint(1, 10)
        for _ in range(num_accounts):
            b = rng.choice(banks)
            nr_office, nr_account = _draw_unique_account(rng, used_accounts, b)
            data.append((cnpj, 'J', str(b), nr_office, nr_account))

    schema = StructType([
        StructField("nr_cpf_cnpj", StringType(), False),
        StructField("cd_tipo_doc", StringType(), True),
        StructField("cd_bank", StringType(), True),
        StructField("nr_office", StringType(), True),
        StructField("nr_account", StringType(), True)
    ])

    return spark.createDataFrame(data, schema=schema)
   
def make_transactional_tables(personas_df, start_date: str, end_date: str):
    """Simulate daily PIX/TED/DOC transfers between the accounts in ``personas_df``.

    For each day from ``start_date`` to ``end_date`` (inclusive, ``%Y-%m-%d``):

    * A random 10%-70% share of the individual ('F') accounts is sampled as the
      day's senders.
    * The TED/DOC budget is drawn between a quarter and a half of the sender
      count, and the PIX budget is twice that. The type of each transfer is
      drawn independently: PIX with probability pix / (pix + ted_doc), else TED
      or DOC evenly. The 2:1 split therefore holds only in expectation.
    * The receiver is a company account 60% of the time (when any exist),
      otherwise an individual account. If it equals the sender it is re-drawn
      once from all accounts.
    * The amount is exponential with mean 300, capped at 10000 and rounded to
      2 decimals.

    Returns:
        ``(df_pix, df_ted, df_doc)``: one DataFrame per transfer type with the
        sender/receiver account columns, ``vl_pgto`` and ``dat_ref_carga``.
    """
    rng = random.Random(123)

    fields = ("nr_cpf_cnpj", "cd_tipo_doc", "cd_bank", "nr_office", "nr_account")
    accounts = [{name: row[name] for name in fields} for row in personas_df.collect()]

    individual_accounts = [acc for acc in accounts if acc["cd_tipo_doc"] == 'F']
    company_accounts = [acc for acc in accounts if acc["cd_tipo_doc"] == 'J']
    every_account = individual_accounts + company_accounts

    first_day = datetime.strptime(start_date, "%Y-%m-%d")
    last_day = datetime.strptime(end_date, "%Y-%m-%d")
    n_days = (last_day - first_day).days + 1

    def account_columns(acc):
        # Column order used by the schema: document, account, office, bank.
        return (acc["nr_cpf_cnpj"], acc["nr_account"], acc["nr_office"], acc["cd_bank"])

    rows = []
    for offset in range(n_days):
        day_str = (first_day + timedelta(days=offset)).strftime('%Y-%m-%d')

        share = rng.uniform(0.1, 0.7)
        n_senders = int(share * len(individual_accounts))
        senders = rng.sample(individual_accounts, n_senders)

        n_ted_doc = rng.randint(n_senders // 4, n_senders // 2)
        n_pix = 2 * n_ted_doc

        # The PIX ratio is computed inside the loop. The loop body never runs
        # when both budgets are 0, so there is no division by zero.
        for _ in range(n_pix + n_ted_doc):
            sender = rng.choice(senders)

            if rng.random() < 0.6 and company_accounts:
                receiver = rng.choice(company_accounts)
            else:
                receiver = rng.choice(individual_accounts)

            if receiver == sender:
                receiver = rng.choice(every_account)

            label = "PIX" if rng.random() < n_pix / (n_pix + n_ted_doc) else rng.choice(["TED", "DOC"])

            amount = round(min(rng.expovariate(1/300), 10000), 2)

            rows.append(account_columns(sender) + account_columns(receiver) + (amount, day_str, label))

    schema = StructType([
        StructField("nr_cpf_cnpj_sender", StringType(), True),
        StructField("nr_account_sender", StringType(), True),
        StructField("nr_office_sender", StringType(), True),
        StructField("cd_bank_sender", StringType(), True),

        StructField("nr_cpf_cnpj_receiver", StringType(), True),
        StructField("nr_account_receiver", StringType(), True),
        StructField("nr_office_receiver", StringType(), True),
        StructField("cd_bank_receiver", StringType(), True),

        StructField("vl_pgto", DoubleType(), True),
        StructField("dat_ref_carga", StringType(), True),
        StructField("tp_transacao", StringType(), True)
    ])

    df = spark.createDataFrame(rows, schema=schema)

    # Drop the trailing tp_transacao column from each per-type output.
    payload_columns = df.columns[:-1]
    df_pix, df_ted, df_doc = (
        df.filter(f.col("tp_transacao") == tx_type).select(payload_columns)
        for tx_type in ("PIX", "TED", "DOC")
    )
    return df_pix, df_ted, df_doc

def make_campanha_table(personas_df, end_date: str):
    """Build the campaign table with one row per distinct individual (CPF).

    Every individual (``cd_tipo_doc == 'F'``) in ``personas_df`` gets
    ``camp_disparo = 0`` and ``camp_conversao = 0``, stamped with ``end_date``
    as ``dat_ref_carga``.

    As produced by ``make_personas_table``, ``personas_df`` has one row per
    bank account, so an individual with several accounts appears several times.
    CPFs are de-duplicated here (keeping first-seen order) so the table is keyed
    by person rather than by account.
    """
    cpf_rows = (
        personas_df.filter(f.col('cd_tipo_doc') == 'F')
        .select('nr_cpf_cnpj')
        .collect()
    )
    # dict.fromkeys de-duplicates while preserving the collected order.
    unique_cpfs = dict.fromkeys(row['nr_cpf_cnpj'] for row in cpf_rows)

    data = [(cpf, 0, 0, end_date) for cpf in unique_cpfs]

    schema = StructType([
            StructField("nr_cpf_cnpj", StringType(), True),
            StructField("camp_disparo", IntegerType(), True),
            StructField("camp_conversao", IntegerType(), True),
            StructField("dat_ref_carga", StringType(), True)
        ])

    return spark.createDataFrame(data, schema=schema)

def make_inadimplencia_table(
    personas_df,
    transacoes_df,
    *,
    cartola_rank_start: int = 875,
    cartola_rank_end: int = 900,
    random_inad_rate: float = 0.005,
    seed: int = 42,
):
    """Flag "cartola" companies and build the blocked-individual (risk) table.

    1. Companies (``cd_tipo_doc == 'J'``) are ranked by the total ``vl_pgto``
       they received. Rank 1 is the highest total; ties are broken by document.
       Ranks in ``[cartola_rank_start, cartola_rank_end)`` get ``fl_cartola = 1``.
    2. Each individual who sent money to a flagged company is blocked as of
       the date of their last such transfer.
    3. Additionally, a random ``random_inad_rate`` share (at least one) of the
       distinct individuals is blocked on a random transaction date. For
       individuals already covered by step 2, the step-2 date wins.

    Args:
        personas_df: Account table (``nr_cpf_cnpj``, ``cd_tipo_doc``, ...). It
            may contain several rows per document (one per account).
        transacoes_df: Transfers with ``nr_cpf_cnpj_sender``,
            ``nr_cpf_cnpj_receiver``, ``vl_pgto`` and ``dat_ref_carga``.
        cartola_rank_start / cartola_rank_end: Half-open rank band of flagged
            companies (defaults reproduce the original 875..899).
        random_inad_rate: Share of individuals blocked at random.
        seed: Seed for the random blocking draws.

    Returns:
        ``(cnpj_cartola_df, riscos_df)``. ``riscos_df`` has ``nr_cpf_cnpj``,
        ``dat_ref_carga`` and ``flag_bloqueio`` (always 1).
    """
    rng = random.Random(seed)

    # personas_df can hold one row per account. Joining on it without
    # de-duplicating would repeat each transfer once per account of the
    # receiving company and inflate its total.
    company_docs = (
        personas_df.filter(f.col("cd_tipo_doc") == "J")
        .select(f.col("nr_cpf_cnpj").alias("nr_cpf_cnpj_receiver"))
        .distinct()
    )

    cnpj_agg = (
        transacoes_df.join(company_docs, on=["nr_cpf_cnpj_receiver"], how="inner")
        .groupBy("nr_cpf_cnpj_receiver")
        .agg(f.sum("vl_pgto").alias("vl_total_recebido"))
    )

    # Secondary key makes row_number deterministic when totals tie.
    window_cnpj = Window.orderBy(
        f.col("vl_total_recebido").desc(), f.col("nr_cpf_cnpj_receiver")
    )

    cnpj_cartola_df = (
        cnpj_agg.withColumn("rank", f.row_number().over(window_cnpj))
        .filter((f.col("rank") >= cartola_rank_start) & (f.col("rank") < cartola_rank_end))
        .select(
            f.col("nr_cpf_cnpj_receiver").alias("nr_cpf_cnpj"),
            f.lit(1).alias("fl_cartola"),
        )
    )

    # Last transfer date from each sender to any flagged company. max() on the
    # date string assumes a sortable format such as YYYY-MM-DD (the format
    # produced by make_transactional_tables); confirm for other inputs.
    last_date_trans = (
        transacoes_df.join(
            cnpj_cartola_df.select(f.col("nr_cpf_cnpj").alias("nr_cpf_cnpj_receiver")),
            on=["nr_cpf_cnpj_receiver"],
            how="inner",
        )
        .groupBy("nr_cpf_cnpj_sender")
        .agg(f.max("dat_ref_carga").alias("data_ulti_trans_cartola"))
        .withColumnRenamed("nr_cpf_cnpj_sender", "nr_cpf_cnpj")
    )

    individual_docs = (
        personas_df.filter(f.col("cd_tipo_doc") == "F").select("nr_cpf_cnpj").distinct()
    )

    riscos_df = individual_docs.join(last_date_trans, on=["nr_cpf_cnpj"], how="left")

    # Sorting makes the seeded draws independent of Spark's collect() order,
    # which is not guaranteed after distinct().
    cpfs = sorted(
        row["nr_cpf_cnpj"] for row in individual_docs.collect()
        if row["nr_cpf_cnpj"] is not None
    )
    dates = sorted(
        row["dat_ref_carga"]
        for row in transacoes_df.select("dat_ref_carga").distinct().collect()
        if row["dat_ref_carga"] is not None
    )

    num_random_inad = min(len(cpfs), max(1, int(len(cpfs) * random_inad_rate)))

    # Nothing to sample from: skip the random blocks instead of failing in
    # rng.sample / rng.choice.
    if num_random_inad > 0 and dates:
        random_dates_dict = {
            cpf: rng.choice(dates) for cpf in rng.sample(cpfs, num_random_inad)
        }

        random_inads_df = spark.createDataFrame(
            [Row(nr_cpf_cnpj=k, data_ulti_trans_cartola_1=v) for k, v in random_dates_dict.items()]
        )

        riscos_df = riscos_df.join(
            random_inads_df, on="nr_cpf_cnpj", how="left"
        ).withColumn(
            "data_ulti_trans_cartola",
            # Keep the cartola-derived date when present; else use the random one.
            f.coalesce(
                f.col("data_ulti_trans_cartola"),
                f.col("data_ulti_trans_cartola_1")
            )
        ).drop("data_ulti_trans_cartola_1")

    riscos_df = riscos_df.filter(f.col("data_ulti_trans_cartola").isNotNull()).select(
        f.col('nr_cpf_cnpj'),
        f.col('data_ulti_trans_cartola').alias('dat_ref_carga'),
        f.lit(1).alias("flag_bloqueio")
    )

    return cnpj_cartola_df, riscos_df


# start_date = "2024-01-01"
# end_date = "2024-06-30"

# personas_path = "../data/02_intermediate/personas"
# if os.path.exists(personas_path):
#     df_personas = spark.read.parquet(personas_path)
# else:
#     df_personas = make_personas_table(qtd_f=50000, qtd_j=1000)
#     df_personas.write.mode("overwrite").parquet(personas_path)


# path_pix = "../data/01_raw/raw_pix"
# path_ted = "../data/01_raw/raw_ted"
# path_doc = "../data/01_raw/raw_doc"
# if os.path.exists(path_pix):
#     df_pix = spark.read.parquet(path_pix)
#     df_ted = spark.read.parquet(path_ted)
#     df_doc = spark.read.parquet(path_doc)
# else:
#     df_pix, df_ted, df_doc = make_transactional_tables(df_personas, start_date, end_date)
#     df_pix.write.mode("overwrite").parquet(path_pix)
#     df_ted.write.mode("overwrite").parquet(path_ted)
#     df_doc.write.mode("overwrite").parquet(path_doc)


# path_campanhas = "../data/02_intermediate/campanhas"

# if os.path.exists(path_campanhas):
#     df_campanhas = spark.read.parquet(path_campanhas)
# else:
#     df_campanhas = make_campanha_table(df_personas, end_date=end_date)
#     df_campanhas.write.mode("overwrite").parquet(path_campanhas)

# path_inadimplentes = "../data/02_intermediate/riscos"

# path_cartola = "../data/02_intermediate/cartola"
# path_riscos = "../data/02_intermediate/riscos"

# if os.path.exists(path_cartola):
#     df_cartola = spark.read.parquet(path_cartola)
#     df_riscos = spark.read.parquet(path_riscos)
# else:
#     df_cartola, df_riscos = make_inadimplencia_table(df_personas, df_pix.union(df_ted).union(df_doc))
#     df_cartola.write.mode("overwrite").parquet(path_cartola)
#     df_riscos.write.mode("overwrite").parquet(path_riscos)

25/06/17 15:10:55 WARN Utils: Your hostname, maccpcn.local resolves to a loopback address: 127.0.0.1; using 192.168.15.106 instead (on interface en0)
25/06/17 15:10:55 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/17 15:10:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/06/17 15:10:56 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


## Validação de balanceamento

In [4]:
# Class balance of the target label in the model input table.
master_table = spark.read.parquet("../data/04_model_input/master_table")
label_counts = master_table.groupBy("flag_bloqueio").count()
label_counts.show()

+-------------+-----+
|flag_bloqueio|count|
+-------------+-----+
|            1|24993|
|            0|25006|
+-------------+-----+



## Resultado da Escoragem

In [None]:
# Load the scored output and print a preview.
# NOTE(review): this rebinds `master_table` to the score table, replacing the
# model input table loaded in the balance-check cell.
score_path = "../data/05_score/score_table"
master_table = spark.read.parquet(score_path)
master_table.show()