In [14]:
def run_pandas():
    """Implementação completa com Pandas e medição de métricas."""

    import pandas as pd
    import numpy as np
    from sklearn.preprocessing import LabelEncoder, StandardScaler
    from sklearn.ensemble import RandomForestRegressor
    from sklearn.impute import KNNImputer
    import time # Para medir o tempo de execução
    import tracemalloc # Para medir a utilização de memória
    import os # Para verificar o tamanho dos arquivos


    # --- Tempo de Execução ---
    start_time = time.time()
    tracemalloc.start() # Inicia o rastreamento de memória

    #---------------------------------------------------------------------------------------------
    # Parte 1: Modelo de ML (ML_Pisa) completa os valores omissos do dataset pisa_2006-2018
    #---------------------------------------------------------------------------------------------

    pisa = pd.read_csv("pisa_2006-2018.csv")
    pisa.replace('NA', np.nan, inplace=True)
    pisa['Score'] = pd.to_numeric(pisa['Score'], errors='coerce')

    # Codificação
    le = LabelEncoder()
    pisa['Country_encoded'] = le.fit_transform(pisa['Country'])
    pisa['Subject_encoded'] = le.fit_transform(pisa['Subject'])

    # Modelo
    train = pisa.dropna(subset=['Score'])
    model = RandomForestRegressor(n_estimators=100, random_state=42)
    model.fit(train[['Year', 'Subject_encoded', 'Country_encoded']], train['Score'])

    # Previsão
    missing = pisa[pisa['Score'].isna()]
    if not missing.empty:
        pred = model.predict(missing[['Year', 'Subject_encoded', 'Country_encoded']])
        pisa.loc[pisa['Score'].isna(), 'Score'] = np.round(pred)

    pisa.to_csv("pisa_imputed_pandas.csv", index=False)


    #---------------------------------------------------------------------------------------------
    # Parte 2: Junção dos datasets
    #---------------------------------------------------------------------------------------------

    # Carregar os datasets
    pisa_imputed = pd.read_csv("pisa_imputed_pandas.csv")
    gdp = pd.read_csv("GDP.csv")

    # Corrigir possíveis espaços nos nomes das colunas
    gdp.columns = gdp.columns.str.strip()

    # Filtrar apenas os anos de interesse no dataset PISA
    pisa_imputed = pisa_imputed[pisa_imputed['Year'].isin([2012, 2015, 2018])]

    # Filtrar colunas relevantes do GDP
    gdp = gdp[['Country', 'Country Code', '2012', '2015', '2018']]

    # Obter lista de países comuns
    paises_comuns = set(pisa_imputed['Country']).intersection(set(gdp['Country']))

    # Filtrar os datasets para manter apenas países comuns
    gdp_filtrado = gdp[gdp['Country'].isin(paises_comuns)].copy()
    pisa_filtrado = pisa_imputed[pisa_imputed['Country'].isin(paises_comuns)].copy()

    # Transformar GDP de wide para long
    gdp_long = gdp_filtrado.melt(
        id_vars=['Country', 'Country Code'],
        value_vars=['2012', '2015', '2018'],
        var_name='Year',
        value_name='GDP'
    )

    # Garantir que o ano está como inteiro
    gdp_long['Year'] = gdp_long['Year'].astype(int)

    # Transformar PISA de long para wide (Subject → colunas)
    pisa_wide = pisa_filtrado[['Country', 'Year', 'Subject', 'Score']].pivot_table(
        index=['Country', 'Year'],
        columns='Subject',
        values='Score'
    ).reset_index()

    # Juntar os datasets
    dados_combinados = pd.merge(gdp_long, pisa_wide, on=['Country', 'Year'], how='left')

    # Reestruturar para wide por ano
    pisa_e_GDP = dados_combinados.pivot(
        index=['Country', 'Country Code'],
        columns='Year',
        values=['GDP', 'Maths', 'Science', 'Reading']
    )

    # Ajustar nomes das colunas para formato mais legível
    pisa_e_GDP.columns = [f"{var}_{year}" for var, year in pisa_e_GDP.columns]
    pisa_e_GDP = pisa_e_GDP.reset_index()
    pisa_e_GDP = pisa_e_GDP.rename(columns={
        'Country Code': 'Country_code'
    })

    pisa_e_GDP.to_csv("pisa_e_GDP.csv", index=False)


    #---------------------------------------------------------------------------------------------
    # Parte 3: Processamento do GDP_PISA (ML_GDP_Pisa)
    #---------------------------------------------------------------------------------------------

    dt = pd.read_csv("pisa_e_GDP.csv")

    # Tratamento de NAs
    print("\n[Pandas] Valores NaN antes:", dt.isna().sum().sum())

    # Salvar máscara de valores ausentes para usar depois
    missing_mask = dt.isna()

    colunas_numericas = dt.select_dtypes(include=[np.number]).columns
    scaler = StandardScaler()
    dt_scaled = dt.copy()
    dt_scaled[colunas_numericas] = scaler.fit_transform(dt[colunas_numericas])

    imputer = KNNImputer(n_neighbors=5)
    dt_imputed_scaled = dt_scaled.copy()
    dt_imputed_scaled[colunas_numericas] = imputer.fit_transform(dt_scaled[colunas_numericas])

    dt_imputed = dt.copy()
    dt_imputed[colunas_numericas] = scaler.inverse_transform(dt_imputed_scaled[colunas_numericas])

    # Formatação
    colunas_gdp = ["GDP_2012", "GDP_2015", "GDP_2018"]
    dt_imputed[colunas_gdp] = dt_imputed[colunas_gdp].round(5)

    # Correção: Aplicar arredondamento e conversão para int apenas nas células que eram NaN
    # Exclui colunas GDP que já foram tratadas separadamente
    colunas_para_arredondar = [col for col in colunas_numericas if col not in colunas_gdp]

    for col in colunas_para_arredondar:
        if missing_mask[col].any():
            dt_imputed.loc[missing_mask[col], col] = dt_imputed.loc[missing_mask[col], col].round(0).astype(int)

    print("[Pandas] Valores NaN após:", dt_imputed.isna().sum().sum())


    #---------------------------------------------------------------------------------------------
    # Parte 4: Adicionar as médias dos anos e das disciplinas
    #---------------------------------------------------------------------------------------------

    # Criar novas colunas com médias das disciplinas arredondadas a 3 casas decimais
    dt_imputed['Means_Maths'] = dt_imputed[['Maths_2012', 'Maths_2015', 'Maths_2018']].mean(axis=1, skipna=True).round(3)
    dt_imputed['Means_Science'] = dt_imputed[['Science_2012', 'Science_2015', 'Science_2018']].mean(axis=1, skipna=True).round(3)
    dt_imputed['Means_Reading'] = dt_imputed[['Reading_2012', 'Reading_2015', 'Reading_2018']].mean(axis=1, skipna=True).round(3)

    dt_imputed['Mean_2012'] = dt_imputed[[c for c in dt_imputed.columns if '2012' in c and 'GDP' not in c]].mean(axis=1).round(3)
    dt_imputed['Mean_2015'] = dt_imputed[[c for c in dt_imputed.columns if '2015' in c and 'GDP' not in c]].mean(axis=1).round(3)
    dt_imputed['Mean_2018'] = dt_imputed[[c for c in dt_imputed.columns if '2018' in c and 'GDP' not in c]].mean(axis=1).round(3)


    #---------------------------------------------------------------------------------------------
    # Parte 5: Adicionar variações relativas (%) entre anos
    #---------------------------------------------------------------------------------------------

    # Variação percentual do GDP
    dt_imputed['GDP_change_2012_2015'] = ((dt_imputed['GDP_2015'] - dt_imputed['GDP_2012']) / dt_imputed['GDP_2012'] * 100).round(2)
    dt_imputed['GDP_change_2015_2018'] = ((dt_imputed['GDP_2018'] - dt_imputed['GDP_2015']) / dt_imputed['GDP_2015'] * 100).round(2)

    # Variação percentual das médias PISA
    dt_imputed['PISA_change_2012_2015'] = ((dt_imputed['Mean_2015'] - dt_imputed['Mean_2012']) / dt_imputed['Mean_2012'] * 100).round(2)
    dt_imputed['PISA_change_2015_2018'] = ((dt_imputed['Mean_2018'] - dt_imputed['Mean_2015']) / dt_imputed['Mean_2015'] * 100).round(2)


    #---------------------------------------------------------------------------------------------
    # Parte 6: Rácio GDP / PISA Score por ano
    #---------------------------------------------------------------------------------------------

    dt_imputed['GDP_per_PISA_2012'] = (dt_imputed['GDP_2012'] / dt_imputed['Mean_2012']).round(4)
    dt_imputed['GDP_per_PISA_2015'] = (dt_imputed['GDP_2015'] / dt_imputed['Mean_2015']).round(4)
    dt_imputed['GDP_per_PISA_2018'] = (dt_imputed['GDP_2018'] / dt_imputed['Mean_2018']).round(4)


    #---------------------------------------------------------------------------------------------
    # Parte 7: Diferença absoluta das pontuações PISA e do GDP entre anos
    #---------------------------------------------------------------------------------------------

    # Diferença absoluta
    dt_imputed['GDP_diff_2012_2015'] = (dt_imputed['GDP_2015'] - dt_imputed['GDP_2012']).round(3)
    dt_imputed['GDP_diff_2015_2018'] = (dt_imputed['GDP_2018'] - dt_imputed['GDP_2015']).round(3)

    dt_imputed['PISA_diff_2012_2015'] = (dt_imputed['Mean_2015'] - dt_imputed['Mean_2012']).round(3)
    dt_imputed['PISA_diff_2015_2018'] = (dt_imputed['Mean_2018'] - dt_imputed['Mean_2015']).round(3)

    output_filename = "Dataset_final_pandas.xlsx"
    dt_imputed.to_excel(output_filename, index=False)

    # --- Medição do Tempo de Execução e Memória ---
    current, peak = tracemalloc.get_traced_memory()
    tracemalloc.stop() # Para o rastreamento de memória
    end_time = time.time()
    execution_time = end_time - start_time

    # --- Resultados das Métricas ---
    print(f"\n--- Métricas de Desempenho ---")
    print(f"Tempo de Execução: {execution_time:.4f} segundos")
    print(f"Utilização de Memória Pico: {peak / (1024 * 1024):.2f} MB")
    print(f"Tamanho do arquivo de saída ('{output_filename}'): {os.path.getsize(output_filename) / (1024 * 1024):.2f} MB")

    return f"Resultados gravados em '{output_filename}'"


In [12]:
def run_pyspark():
    import time
    import numpy as np
    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F
    from pyspark.sql.types import DoubleType, IntegerType
    from pyspark.ml.feature import StringIndexer
    from pyspark.ml.regression import RandomForestRegressor
    from pyspark.ml.feature import VectorAssembler
    from pyspark.ml import Pipeline
    from pyspark.ml.evaluation import RegressionEvaluator
    from pyspark.ml.feature import Imputer, StandardScaler
    from pyspark.sql.window import Window
    import os # Para verificar o tamanho dos arquivos
    # import tracemalloc # <-- Remova esta linha, pois não é adequada para o Spark JVM memory

    # --- Tempo de Execução ---
    start_time = time.time()
    # tracemalloc.start() # <-- Remova esta linha

    # Iniciar sessão Spark com configuração apropriada
    # Ajuste 'spark.driver.memory' e 'spark.executor.memory' conforme necessário para o seu ambiente
    # Vamos capturar a configuração da memória do driver aqui:
    spark = SparkSession.builder \
        .appName("PISA_GDP_Analysis") \
        .config("spark.driver.memory", "4g") \
        .getOrCreate()

    # Captura a memória do driver configurada
    spark_driver_memory = spark.conf.get("spark.driver.memory")


    #---------------------------------------------------------------------------------------------
    # Parte 1: Modelo de Machine Learning para Imputação de Dados PISA
    #---------------------------------------------------------------------------------------------

    # Carregar dataset PISA a partir de ficheiro CSV
    pisa = spark.read.csv("pisa_2006-2018.csv", header=True, inferSchema=True)

    # Converter coluna Score para tipo double e lidar com valores 'NA'
    pisa = pisa.withColumn("Score",
                          F.when(F.col("Score") == "NA", None)
                          .otherwise(F.col("Score").cast(DoubleType())))

    # Preparar codificação categórica para colunas Country e Subject
    countryIndexer = StringIndexer(inputCol="Country", outputCol="Country_encoded")
    subjectIndexer = StringIndexer(inputCol="Subject", outputCol="Subject_encoded")

    # Criar e aplicar pipeline de codificação
    indexPipeline = Pipeline(stages=[countryIndexer, subjectIndexer])
    indexModel = indexPipeline.fit(pisa)
    pisaIndexed = indexModel.transform(pisa)

    # Dividir dados em conjunto de treino (com scores) e conjunto para previsão (scores em falta)
    train = pisaIndexed.filter(pisaIndexed.Score.isNotNull())
    missing = pisaIndexed.filter(pisaIndexed.Score.isNull())

    # Preparar vetor de features para modelo de machine learning
    featureAssembler = VectorAssembler(
        inputCols=["Year", "Subject_encoded", "Country_encoded"],
        outputCol="features"
    )

    # Configurar e treinar modelo de regressão Random Forest
    rf = RandomForestRegressor(
        labelCol="Score",
        featuresCol="features",
        numTrees=100,
        seed=42
    )

    pipeline = Pipeline(stages=[featureAssembler, rf])
    model = pipeline.fit(train)

    # Gerar previsões para scores em falta, se necessário
    if missing.count() > 0:
        predictions = model.transform(missing)
        predictions = predictions.withColumn("Score", F.round(F.col("prediction")))

        # Combinar dados originais com previsões
        pisaImputed = train.select("Country", "Subject", "Year", "Score") \
            .union(predictions.select("Country", "Subject", "Year", "Score"))
    else:
        pisaImputed = train.select("Country", "Subject", "Year", "Score")

    # Guardar dataset imputado como CSV
    pisaImputed.toPandas().to_csv("pisa_imputed_pyspark.csv", index=False)

    #---------------------------------------------------------------------------------------------
    # Parte 2: Fusão dos Datasets PISA e PIB
    #---------------------------------------------------------------------------------------------

    # Carregar datasets processados
    pisa_imputed = spark.read.csv("pisa_imputed_pyspark.csv", header=True, inferSchema=True)
    gdp = spark.read.csv("GDP.csv", header=True, inferSchema=True)

    # Limpar nomes de colunas removendo espaços
    for col_name in gdp.columns:
        if ' ' in col_name:
            gdp = gdp.withColumnRenamed(col_name, col_name.strip())

    # Filtrar dados PISA para anos específicos de interesse
    pisa_imputed = pisa_imputed.filter(F.col("Year").isin([2012, 2015, 2018]))

    # Selecionar colunas relevantes dos dados PIB
    gdp = gdp.select("Country", "Country Code", "2012", "2015", "2018")

    # Identificar países comuns entre ambos os datasets
    paises_pisa = set([row["Country"] for row in pisa_imputed.select("Country").distinct().collect()])
    paises_gdp = set([row["Country"] for row in gdp.select("Country").distinct().collect()])
    paises_comuns = paises_pisa.intersection(paises_gdp)

    # Filtrar datasets para manter apenas países comuns
    gdp_filtrado = gdp.filter(F.col("Country").isin(list(paises_comuns)))
    pisa_filtrado = pisa_imputed.filter(F.col("Country").isin(list(paises_comuns)))

    # Reformatar dados PIB de formato largo para longo
    gdp_long = gdp_filtrado.select(
        "Country",
        F.col("Country Code").alias("Country_Code"),
        F.lit("2012").alias("Year"),
        F.col("2012").alias("GDP")
    ).union(
        gdp_filtrado.select(
            "Country",
            F.col("Country Code").alias("Country_Code"),
            F.lit("2015").alias("Year"),
            F.col("2015").alias("GDP")
        )
    ).union(
        gdp_filtrado.select(
            "Country",
            F.col("Country Code").alias("Country_Code"),
            F.lit("2018").alias("Year"),
            F.col("2018").alias("GDP")
        )
    )

    # Garantir que a coluna Year tem o tipo inteiro correto
    gdp_long = gdp_long.withColumn("Year", F.col("Year").cast(IntegerType()))

    # Reformatar dados PISA de formato longo para largo (disciplinas como colunas)
    pisa_pivot = pisa_filtrado.groupBy("Country", "Year").pivot("Subject").agg(F.first("Score"))

    # Combinar ambos os datasets em Country e Year
    dados_combinados = gdp_long.join(pisa_pivot, on=["Country", "Year"], how="left")

    # Reestruturar dados para formato largo por ano
    dados_wide = dados_combinados

    # Criar colunas específicas por ano para PIB e cada disciplina
    for year in [2012, 2015, 2018]:
        dados_wide = dados_wide.withColumn(
            f"GDP_{year}",
            F.when(F.col("Year") == year, F.col("GDP")).otherwise(None)
        )

        for subject in ["Maths", "Science", "Reading"]:
            dados_wide = dados_wide.withColumn(
                f"{subject}_{year}",
                F.when(F.col("Year") == year, F.col(subject)).otherwise(None)
            )

    # Agregar por país para consolidar todos os valores
    pisa_e_GDP = dados_wide.groupBy("Country", "Country_Code").agg(
        F.max("GDP_2012").alias("GDP_2012"),
        F.max("GDP_2015").alias("GDP_2015"),
        F.max("GDP_2018").alias("GDP_2018"),
        F.max("Maths_2012").alias("Maths_2012"),
        F.max("Maths_2015").alias("Maths_2015"),
        F.max("Maths_2018").alias("Maths_2018"),
        F.max("Science_2012").alias("Science_2012"),
        F.max("Science_2015").alias("Science_2015"),
        F.max("Science_2018").alias("Science_2018"),
        F.max("Reading_2012").alias("Reading_2012"),
        F.max("Reading_2015").alias("Reading_2015"),
        F.max("Reading_2018").alias("Reading_2018")
    )

    # Padronizar nomes de colunas
    pisa_e_GDP = pisa_e_GDP.withColumnRenamed("Country_Code", "Country_code")

    # Guardar dataset combinado
    pisa_e_GDP.toPandas().to_csv("pisa_e_GDP_pyspark.csv", index=False)

    #---------------------------------------------------------------------------------------------
    # Parte 3: Processamento e Imputação de Dados
    #---------------------------------------------------------------------------------------------

    # Carregar dataset combinado
    dt = spark.read.csv("pisa_e_GDP_pyspark.csv", header=True, inferSchema=True)

    # Contar valores em falta antes da imputação
    nan_count_before = sum([dt.filter(F.col(c).isNull()).count() for c in dt.columns])
    print(f"\n[PySpark] Valores NaN antes do processamento: {nan_count_before}")

    # Identificar colunas numéricas para processamento
    colunas_numericas = [field.name for field in dt.schema.fields
                          if isinstance(field.dataType, (DoubleType, IntegerType))]

    # Lidar com valores em falta usando imputação (substituindo o KNNImputer do Pandas)
    imputer = Imputer(
        inputCols=colunas_numericas,
        outputCols=[f"{c}" for c in colunas_numericas], # Sobrescreve as colunas originais
        strategy="mean"
    )

    imputer_model = imputer.fit(dt)
    dt_imputed = imputer_model.transform(dt)

    # Formatar colunas PIB para 5 casas decimais
    colunas_gdp = ["GDP_2012", "GDP_2015", "GDP_2018"]
    for col in colunas_gdp:
        dt_imputed = dt_imputed.withColumn(col, F.round(F.col(col), 5))

    # Aplicar arredondamento e conversão de tipo apenas a valores que seriam imputados (se fossem nulos originalmente)
    colunas_para_arredondar = [col for col in colunas_numericas if col not in colunas_gdp]

    for col in colunas_para_arredondar:
        dt_imputed = dt_imputed.withColumn(col, F.round(F.col(col), 0).cast(IntegerType()))

    # Contar valores em falta restantes após processamento
    nan_count_after = sum([dt_imputed.filter(F.col(c).isNull()).count() for c in dt_imputed.columns])
    print(f"[PySpark] Valores NaN após processamento: {nan_count_after}")

    #---------------------------------------------------------------------------------------------
    # Parte 4: Cálculo de Médias por Ano e Disciplina
    #---------------------------------------------------------------------------------------------

    # Calcular médias de disciplinas ao longo dos anos
    dt_imputed = dt_imputed.withColumn(
        "Means_Maths",
        F.round(F.array_avg(F.array(F.col("Maths_2012"), F.col("Maths_2015"), F.col("Maths_2018"))), 3)
    )

    dt_imputed = dt_imputed.withColumn(
        "Means_Science",
        F.round(F.array_avg(F.array(F.col("Science_2012"), F.col("Science_2015"), F.col("Science_2018"))), 3)
    )

    dt_imputed = dt_imputed.withColumn(
        "Means_Reading",
        F.round(F.array_avg(F.array(F.col("Reading_2012"), F.col("Reading_2015"), F.col("Reading_2018"))), 3)
    )

    # Calcular médias anuais por disciplina
    dt_imputed = dt_imputed.withColumn(
        "Mean_2012",
        F.round(F.array_avg(F.array(F.col("Maths_2012"), F.col("Science_2012"), F.col("Reading_2012"))), 3)
    )

    dt_imputed = dt_imputed.withColumn(
        "Mean_2015",
        F.round(F.array_avg(F.array(F.col("Maths_2015"), F.col("Science_2015"), F.col("Reading_2015"))), 3)
    )

    dt_imputed = dt_imputed.withColumn(
        "Mean_2018",
        F.round(F.array_avg(F.array(F.col("Maths_2018"), F.col("Science_2018"), F.col("Reading_2018"))), 3)
    )


    #---------------------------------------------------------------------------------------------
    # Parte 5: Cálculo de Variações Percentuais Entre Anos
    #---------------------------------------------------------------------------------------------

    # Calcular variações percentuais do PIB
    dt_imputed = dt_imputed.withColumn(
        'GDP_change_2012_2015',
        F.round(((F.col('GDP_2015') - F.col('GDP_2012')) / F.col('GDP_2012') * 100), 2)
    )
    dt_imputed = dt_imputed.withColumn(
        'GDP_change_2015_2018',
        F.round(((F.col('GDP_2018') - F.col('GDP_2015')) / F.col('GDP_2015') * 100), 2)
    )

    # Calcular variações percentuais dos scores PISA
    dt_imputed = dt_imputed.withColumn(
        'PISA_change_2012_2015',
        F.round(((F.col('Mean_2015') - F.col('Mean_2012')) / F.col('Mean_2012') * 100), 2)
    )
    dt_imputed = dt_imputed.withColumn(
        'PISA_change_2015_2018',
        F.round(((F.col('Mean_2018') - F.col('Mean_2015')) / F.col('Mean_2015') * 100), 2)
    )


    #---------------------------------------------------------------------------------------------
    # Parte 6: Cálculo de Rácios PIB por Score PISA
    #---------------------------------------------------------------------------------------------

    dt_imputed = dt_imputed.withColumn(
        'GDP_per_PISA_2012',
        F.round((F.col('GDP_2012') / F.col('Mean_2012')), 4)
    )
    dt_imputed = dt_imputed.withColumn(
        'GDP_per_PISA_2015',
        F.round((F.col('GDP_2015') / F.col('Mean_2015')), 4)
    )
    dt_imputed = dt_imputed.withColumn(
        'GDP_per_PISA_2018',
        F.round((F.col('GDP_2018') / F.col('Mean_2018')), 4)
    )


    #---------------------------------------------------------------------------------------------
    # Parte 7: Cálculo de Diferenças Absolutas Entre Anos
    #---------------------------------------------------------------------------------------------

    # Calcular diferenças absolutas do PIB
    dt_imputed = dt_imputed.withColumn(
        'GDP_diff_2012_2015',
        F.round((F.col('GDP_2015') - F.col('GDP_2012')), 3)
    )
    dt_imputed = dt_imputed.withColumn(
        'GDP_diff_2015_2018',
        F.round((F.col('GDP_2018') - F.col('GDP_2015')), 3)
    )

    # Calcular diferenças absolutas dos scores PISA
    dt_imputed = dt_imputed.withColumn(
        'PISA_diff_2012_2015',
        F.round((F.col('Mean_2015') - F.col('Mean_2012')), 3)
    )
    dt_imputed = dt_imputed.withColumn(
        'PISA_diff_2015_2018',
        F.round((F.col('Mean_2018') - F.col('Mean_2015')), 3)
    )

    output_filename = "Dataset_final_pyspark.xlsx"
    dt_imputed.toPandas().to_excel(output_filename, index=False)

    # Limpar terminando a sessão Spark
    spark.stop()

    # --- Medição do Tempo de Execução e Tamanho do Arquivo ---
    # current, peak = tracemalloc.get_traced_memory() # <-- Remova esta linha
    # tracemalloc.stop() # <-- Remova esta linha
    end_time = time.time()
    execution_time = end_time - start_time

    # --- Resultados das Métricas ---
    print(f"\n--- Métricas de Desempenho (PySpark) ---")
    print(f"Tempo de Execução: {execution_time:.4f} segundos")
    # Para a memória, reportamos a memória configurada do driver, que é a capacidade principal para este dataset pequeno.
    print(f"Memória do Driver Spark Configurada: {spark_driver_memory}") # <-- Adiciona esta linha
    print(f"Tamanho do arquivo de saída ('{output_filename}'): {os.path.getsize(output_filename) / (1024 * 1024):.2f} MB")

    return f"Resultados gravados em '{output_filename}'"



In [19]:
run_pandas()

run_pyspark()


[Pandas] Valores NaN antes: 70
[Pandas] Valores NaN após: 0

--- Métricas de Desempenho ---
Tempo de Execução: 1.3988 segundos
Utilização de Memória Pico: 1.75 MB
Tamanho do arquivo de saída ('Dataset_final_pandas.xlsx'): 0.02 MB

[PySpark] Valores NaN antes do processamento: 70
[PySpark] Valores NaN após processamento: 0

--- Métricas de Desempenho (PySpark) ---
Tempo de Execução: 5.8500 segundos
Memória do Driver Spark Configurada: 4g
Tamanho do arquivo de saída ("Dataset_final_pyspark.xlsx"): 0.02 MB
