In [None]:
import os

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, rand
from pyspark.sql.functions import round as ps_round
from pyspark.sql.types import StructField, StringType, StructType

In [None]:
conf = SparkConf().setAppName("App").setMaster("local[*]")

# Enable Arrow-based pandas conversion and eager evaluation (rich DataFrame display
# in the notebook, truncating cell values at 100 characters).
conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
conf.set("spark.sql.repl.eagerEval.enabled", "true")
conf.set("spark.sql.repl.eagerEval.truncate", "100")  # SparkConf values are strings
# NOTE(review): this key does not look like a documented Spark SQL config; pandas-on-Spark
# reads the PYARROW_IGNORE_TIMEZONE environment variable instead — confirm it has any effect.
conf.set("spark.sql.execution.arrow.pyspark.ignore_timezone", "true")

# S3-compatible object storage connection. Credentials come from the environment so they
# are never committed with the notebook; empty defaults keep the previous placeholders.
AWS_ENDPOINT_URL = os.environ.get("AWS_ENDPOINT_URL", "")
AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY_ID", "")
AWS_SECRET_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY", "")
# NOTE(review): AWS_REGION is not passed to any Spark/Hadoop setting in this cell.
AWS_REGION = os.environ.get("AWS_REGION", "bhs")

conf.set("spark.jars", "/home/shared/drivers/postgresql-42.7.2.jar")
conf.set("spark.hadoop.fs.s3a.access.key", AWS_ACCESS_KEY)
conf.set("spark.hadoop.fs.s3a.secret.key", AWS_SECRET_KEY)
conf.set("spark.hadoop.fs.s3a.endpoint", AWS_ENDPOINT_URL)
conf.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
# NOTE(review): hadoop-aws should match the Hadoop version bundled with the Spark install;
# delta-spark 3.1.0 targets Spark 3.5 (usually shipped with Hadoop 3.3.x) — confirm 3.2.2.
conf.set("spark.jars.packages", "io.delta:delta-spark_2.12:3.1.0,org.apache.hadoop:hadoop-aws:3.2.2")
conf.set("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")

# Delta Lake catalog and SQL extensions.
conf.set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
conf.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")

# Legacy time parsing and legacy datetime rebasing when writing Parquet.
conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
conf.set("spark.sql.parquet.datetimeRebaseModeInWrite", "LEGACY")

# Memory settings. With master "local[*]" the driver JVM also runs the tasks.
# NOTE(review): heap + off-heap + Python worker limits add up to well over 120 GB on a
# single host — confirm the machine has that capacity.
conf.set("spark.driver.memory", "60g")
conf.set("spark.executor.memory", "60g")
conf.set("spark.executor.pyspark.memory", "60g")
conf.set("spark.memory.offHeap.enabled", "true")
conf.set("spark.memory.offHeap.size", "60g")

# Build the session from the configuration above. getOrCreate() returns an existing
# session if one is already running in this kernel, in which case static settings
# (jars, memory) set above are not re-applied — restart the kernel to change them.
spark = SparkSession.builder.config(conf=conf).getOrCreate()

print("Spark session configurada com sucesso!")

In [None]:
DB_HOST = "driva-db.driva.io"
DB_PORT = 5432
DB_NAME = "postgres"
# Despite the names, these are schema-qualified table names ("schema.table").
DB_SITES_SCHEMA = "sites.vinculados"
DB_EB_DRIVA = "empresas_do_brasil.simplificado"
# Credentials come from the environment so they are never committed with the notebook.
DB_USER = os.environ.get("DB_USER", "")
DB_PASSWORD = os.environ.get("DB_PASSWORD", "")


def read_postgres_table(table):
    """Return a Spark DataFrame for a Postgres table read over JDBC.

    Args:
        table: Table name passed as the JDBC ``dbtable`` option, e.g. "schema.table".

    Returns:
        The DataFrame produced by ``spark.read.format("jdbc").load()`` using the
        connection settings defined in this cell.
    """
    return (
        spark.read.format("jdbc")
        .option("url", f"jdbc:postgresql://{DB_HOST}:{DB_PORT}/{DB_NAME}")
        .option("dbtable", table)
        .option("user", DB_USER)
        .option("password", DB_PASSWORD)
        .option("driver", "org.postgresql.Driver")
        .load()
    )


sites_df = read_postgres_table(DB_SITES_SCHEMA)
eb_df = read_postgres_table(DB_EB_DRIVA)

In [None]:
# Inspect the column names/types Spark resolved for the sites table over JDBC.
sites_df.printSchema()

In [None]:
# Inspect the column names/types Spark resolved for the companies table over JDBC.
eb_df.printSchema()

In [None]:
# Keep only site/company links whose match score (score_vinculo) is at least 0.7,
# then show how many remain.
sites_df = sites_df.where(col("score_vinculo") >= 0.7)
sites_df.count()

In [None]:
# Keep only companies in the target segments:
#   CNAE division 85 -> education, division 86 -> health,
#   CNAE classes 62023 / 62031 -> software / SaaS.
eb_df = eb_df.where(
    col("cnae_principal_divisao").isin(85, 86)
    | col("cnae_principal_classe").isin(62023, 62031)
)
eb_df.count()

In [None]:
# Attach the company/CNAE attributes to every linked site (inner join on the CNPJ
# root) and keep only the columns used downstream.
merged_df = sites_df.join(eb_df, on="raiz_cnpj", how="inner").select(
    [
        "dominio",
        "raiz_cnpj",
        "cnpj",
        "cnae_principal_divisao",
        "cnae_principal_secao",
        "cnae_principal_grupo",
        "cnae_principal_classe",
        "cnae_principal_subclasse",
        "segmento",
    ]
)
merged_df.count()

In [None]:
cnae_divisions = [85, 86, 62]
num_samples = 10000
seed = 42

# Row filter defining each segment. Divisions 85 and 86 use the whole division;
# division 62 is narrowed to the two software/SaaS classes kept by the eb_df filter.
segment_filters = {
    85: col("cnae_principal_divisao") == 85,  # Education
    86: col("cnae_principal_divisao") == 86,  # Health
    62: col("cnae_principal_classe").isin(62023, 62031),  # Software / SaaS
}


def sample_segment(df, condition, num, sample_seed):
    """Draw a reproducible random sample of the rows of `df` matching `condition`.

    Args:
        df: Spark DataFrame to sample from.
        condition: Column expression selecting the segment's rows.
        num: Maximum number of rows to draw without replacement; fewer are returned
            when the segment has fewer rows.
        sample_seed: Seed passed to ``RDD.takeSample``.

    Returns:
        A Spark DataFrame with ``df``'s schema holding the sampled rows. ``takeSample``
        collects the rows to the driver, so ``num`` should stay modest.
    """
    rows = df.filter(condition).rdd.takeSample(
        withReplacement=False,
        num=num,
        seed=sample_seed,
    )
    return spark.createDataFrame(rows, schema=df.schema)


# Start from an empty frame with the target schema so each segment's sample can be
# unioned in; a CNAE missing from `segment_filters` raises KeyError instead of being
# silently sampled with the SaaS filter.
new_df = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema=merged_df.schema)
for cnae in cnae_divisions:
    new_df = new_df.union(
        sample_segment(merged_df, segment_filters[cnae], num_samples, seed)
    )

new_df.printSchema()

In [None]:
# Total sampled rows: at most `num_samples` per entry in `cnae_divisions`.
new_df.count()

In [None]:
# Export of the sampled rows to the data lake, currently disabled.
# With no explicit format, DataFrameWriter.save uses spark.sql.sources.default
# (parquet unless overridden), and it fails if the target path already exists.
# NOTE(review): the pandas section below reads "../data/iugu_with_saas2.parquet", not this
# object — confirm how that local file was produced.
# new_df.write.save("s3a://drivalake/raw/sites/iugu/iugu_with_saas.parquet")

In [None]:
# Shut down the Spark session; the remaining cells only use pandas.
spark.stop()

In [None]:
import pandas as pd

# Load the pages whose url/html columns are selected after the merge below.
# NOTE: the last cell of this notebook writes its result back to this same path, so on
# a re-run this file already contains the label columns as well.
df = pd.read_parquet(path="../data/new_iugu_saas2_with_html.parquet", engine="pyarrow")
df.head(n=5)

In [None]:
# Labeled sample (domain plus company/CNAE attributes). `dominio` is renamed to `host`
# so it can be joined with the scraped pages on that key.
label = pd.read_parquet("../data/iugu_with_saas2.parquet", engine="pyarrow").rename(
    columns={"dominio": "host"}
)
label

In [None]:
# Join pages with their labels on `host`. On a re-run, `df` was re-read from the file the
# last cell overwrote, so it already carries the label columns; drop them first so merge
# does not produce `_x`/`_y` suffixed duplicates (which made the selection below fail).
# NOTE(review): if `label` has several rows per host (e.g. several CNPJs per domain), the
# inner merge repeats that page once per row — confirm this is intended.
_label_only_cols = [c for c in label.columns if c != "host" and c in df.columns]
df = df.drop(columns=_label_only_cols).merge(label, how="inner", on="host")
df = df[
    [
        "url",
        "host",
        "html",
        "raiz_cnpj",
        "cnpj",
        "cnae_principal_divisao",
        "cnae_principal_secao",
        "cnae_principal_grupo",
        "cnae_principal_classe",
        "cnae_principal_subclasse",
        "segmento",
    ]
]
df.head()

In [None]:
# CNAE division -> iugu segment label. Division 62 presumably only contains the SaaS
# classes (62023 / 62031) thanks to the Spark-side filter — confirm for this file.
mapping = {
    85: "Educação",
    86: "Saúde",
    62: "Saas",
}

df["Segmento iugu"] = df["cnae_principal_divisao"].map(mapping)

# Fail with the offending values instead of a bare KeyError if a division has no label
# (e.g. if the column was loaded as strings or contains missing values).
_unmapped = df.loc[df["Segmento iugu"].isna(), "cnae_principal_divisao"].unique()
assert len(_unmapped) == 0, f"CNAE divisions without a segment label: {_unmapped!r}"

# Show a preview only; the full frame includes the raw HTML column.
df.head()

In [None]:
# Number of labeled pages per iugu segment.
df["Segmento iugu"].value_counts()

In [None]:
# Persist the labeled pages.
# NOTE(review): this is the same path the first pandas cell reads, so the original scraped
# file is overwritten and a re-run starts from this output — consider a separate output path.
df.to_parquet("../data/new_iugu_saas2_with_html.parquet", engine="pyarrow")