In [None]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
!tar xf spark-3.5.1-bin-hadoop3.tgz
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.1-bin-hadoop3"

In [None]:
import findspark
findspark.init()

In [None]:
import os
import random
from datetime import datetime, timedelta
import logging

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')

def generate_large_csv(filename="vendas.csv", num_rows=1_000_000):
    """Gera um arquivo CSV grande com dados de vendas simulados."""
    products = ["Smartphone", "Notebook", "Televisao", "Fone de ouvido", "Smartwatch", "Tablet", "Monitor", "Webcam"]
    start_date = datetime(2023, 1, 1)

    logger.info(f"Gerando arquivo CSV grande: {filename} com {num_rows} linhas")
    with open(filename, 'w') as f:
        f.write("id_venda,produto,quantidade,preco_unitario,data_venda\n")
        for i in range(1, num_rows + 1):
            product = random.choice(products)
            quantity = random.randint(1, 50)
            price = round(random.uniform(50.00, 5000.00), 2)

            random_days = random.randint(0, 365 * 2)
            sale_date = (start_date + timedelta(days=random_days)).strftime("%Y-%m-%d")

            f.write(f"{i},{product},{quantity},{price},{sale_date}\n")
    logger.info("Geração do CSV concluída!")

generate_large_csv()

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder \
    .appName("ETLFunctions") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

logger.info("SparkSession inicializada com sucesso!")

In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, DateType


def extract(spark_session: SparkSession, input_path: str):
    logger.info(f"\n--- Iniciando Extração de dados de: {input_path} ---")
    try:
        schema = StructType([
            StructField("id_venda", IntegerType(), True),
            StructField("produto", StringType(), True),
            StructField("quantidade", IntegerType(), True),
            StructField("preco_unitario", DoubleType(), True),
            StructField("data_venda", DateType(), True)
        ])
        df = spark_session.read \
            .option("header", "true") \
            .schema(schema) \
            .csv(input_path)

        logger.info(f"Dados extraídos. Número de linhas: {df.count()}")
        logger.info("Schema dos dados extraídos:")
        df.printSchema()
        logger.info("Primeiras 5 linhas dos dados extraídos:")
        df.show(5)
        return df
    except Exception as e:
        logger.info(f"Erro na fase de Extração: {e}")
        return None

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col

def transform(df_input):
    logger.info("\n--- Iniciando Transformação de dados ---")
    if df_input is None:
        logger.error("DataFrame de entrada é nulo")
        raise Exception("DataFrame de entrada é nulo")

    df_transformed = df_input.withColumn(
        "valor_total_venda",
        col("quantidade") * col("preco_unitario")
    )

    logger.info("Coluna 'valor_total_venda' adicionada.")
    logger.info("Schema do DataFrame transformado:")
    df_transformed.printSchema()
    logger.info("Primeiras 5 linhas do DataFrame transformado:")
    df_transformed.show(5)

    df_sales_by_product = df_transformed.groupBy("produto") \
        .agg(F.sum("valor_total_venda").alias("total_vendas_produto"))

    logger.info("\nVendas totais por produto (amostra):")
    df_sales_by_product.show(5, truncate=False)

    return df_transformed, df_sales_by_product


In [None]:
import shutil

def load(df_to_load, output_path: str):
    logger.info(f"\n--- Iniciando Carga de dados para: {output_path} ---")

    if df_to_load is None:
        logger.error("DataFrame principal para carregar é nulo, pulando carga")
        raise Exception("DataFrame principal para carregar é nulo, pulando carga")

    if os.path.exists(output_path):
        shutil.rmtree(output_path)
        logger.info(f"Diretório '{output_path}' removido.")

    try:
        df_to_load.write \
            .mode("overwrite") \
            .parquet(output_path)
        logger.info(f"Dados salvos com sucesso em '{output_path}'")

        logger.info(f"\nVerificando algumas linhas dos dados salvos em '{output_path}':")
        df_verificacao = spark.read.parquet(output_path)
        df_verificacao.show(5)
        logger.info(f"Número total de linhas no Parquet salvo: {df_verificacao.count()}")

    except Exception as e:
        logger.error(f"Erro na fase de Carga do DataFrame principal: {e}")
        raise Exception(f"Erro na fase de Carga do DataFrame principal: {e}")

In [None]:
input_csv_path = "vendas.csv"
output_main_parquet_path = "vendas_processadas_principais.parquet"
output_agg_parquet_path = "vendas_agregadas_por_produto.parquet"

df_raw = extract(spark, input_csv_path)

if df_raw is not None:
    df_processed, df_aggregated = transform(df_raw)

    if df_processed is not None:
        load(df_processed, output_main_parquet_path)
        load(df_aggregated, output_agg_parquet_path)
    else:
        logger.info("Transformação não gerou DataFrame, pulando carga.")
else:
    logger.info("Extração não gerou DataFrame, pulando transformação e carga.")

spark.stop()
logger.info("\nSparkSession encerrada.")