In [31]:
import ast
import traceback
import pandas as pd
from pyspark.sql import SparkSession


class ParquetPathRewriter(ast.NodeTransformer):
    def __init__(self, var_name):
        self.var_name = var_name

    def visit_Assign(self, node):
        if (
            isinstance(node.targets[0], ast.Name)
            and node.targets[0].id == self.var_name
            and isinstance(node.value, ast.Call)
            and isinstance(node.value.func, ast.Attribute)
            and node.value.func.attr in {"parquet", "table"}
        ):
            node.value.args = [ast.Constant(f"{self.var_name}.parquet")]
        return node


def rewrite_code_with_ast(code_str: str, var_name: str) -> object:
    tree = ast.parse(code_str)
    rewriter = ParquetPathRewriter(var_name)
    new_tree = rewriter.visit(tree)
    ast.fix_missing_locations(new_tree)
    return compile(new_tree, filename="<ast>", mode="exec")


def create_source_from_synthetic_data(spark, name, fmt, synthetic_data, exec_context):
    if name not in synthetic_data:
        raise ValueError(f"O DataFrame synthetic_data['{name}'] não foi fornecido.")

    pdf = synthetic_data[name]

    match fmt.upper():
        case "PARQUET":
            pdf.to_parquet(f"{name}.parquet", index=False)
        case "TABLE":
            df = spark.createDataFrame(pdf)
            df.write.mode("overwrite").saveAsTable(name)
        case "VIEW":
            df = spark.createDataFrame(pdf)
            df.createOrReplaceTempView(name)
        case "DATAFRAME":
            exec_context[name] = spark.createDataFrame(pdf)
        case _:
            raise ValueError(f"Formato de entrada '{fmt}' não suportado.")


def normalize_dataframe_types(df: pd.DataFrame) -> pd.DataFrame:
    for col in df.columns:
        try:
            df[col] = pd.to_numeric(df[col], errors='ignore')
        except Exception:
            pass
    return df


def compare_dataframes(df1: pd.DataFrame, df2: pd.DataFrame) -> list:
    logs = []
    try:
        if df1.shape != df2.shape:
            logs.append(f"[FAIL] Tamanho diferente: esperado {df2.shape}, obtido {df1.shape}")
        else:
            logs.append(f"[OK] Tamanho: {df1.shape[0]} linhas, {df1.shape[1]} colunas")

        cols1 = list(df1.columns)
        cols2 = list(df2.columns)
        if cols1 != cols2:
            logs.append(f"[FAIL] Colunas diferentes: esperado {cols2}, obtido {cols1}")
        else:
            logs.append(f"[OK] Colunas: {cols1}")

        for col in df2.columns:
            expected_type = df2[col].dtype
            result_type = df1[col].dtype if col in df1.columns else None
            if expected_type != result_type:
                logs.append(f"[FAIL] Tipo da coluna '{col}' difere: esperado {expected_type}, obtido {result_type}")
            else:
                logs.append(f"[OK] Tipo da coluna '{col}': {expected_type}")

        if not df1.equals(df2):
            diff = (df1 != df2).sum().sum()
            logs.append(f"[FAIL] Valores diferentes: {diff} célula(s) divergentes")
        else:
            logs.append(f"[OK] Os valores das células são idênticos")

    except Exception as e:
        logs.append(f"[ERROR] Erro ao comparar DataFrames: {e}")
    return logs


def validate_syntax(code: str) -> str:
    try:
        ast.parse(code)
        return "[OK] Código PySpark válido (sintaxe AST verificada)."
    except SyntaxError as e:
        return f"[ERROR] Código PySpark inválido: {e}"


def execute_pyspark_block(payload: dict, synthetic_data: dict, expected_result: pd.DataFrame = None):
    spark = SparkSession.builder.appName("Executor").getOrCreate()
    exec_context = {"spark": spark}
    logs = []
    block_id = payload.get("block_id", "unknown")
    input_data = payload.get("input_data", [])
    output_data = payload.get("output_data", [])
    metadata_code = payload.get("metadata", {}).get("pyspark", {}).get("code", "")

    try:
        if not input_data:
            raise ValueError("Payload precisa conter ao menos um 'input_data'.")
        if not output_data:
            raise ValueError("Payload precisa conter ao menos um 'output_data'.")

        for entry in input_data:
            info = entry.get("pyspark_data", {})
            name = info.get("name")
            fmt = info.get("format", "PARQUET")
            create_source_from_synthetic_data(spark, name, fmt, synthetic_data, exec_context)
            logs.append(f"[OK] Fonte '{name}' criada com formato '{fmt}'.")

        for entry in input_data:
            info = entry["pyspark_data"]
            name = info["name"]
            code = entry.get("pyspark_code", f"{name} = spark.read.parquet('{name}.parquet')")
            logs.append(validate_syntax(code))
            compiled = rewrite_code_with_ast(code, name)
            exec(compiled, exec_context)
            logs.append(f"[OK] Código executado para '{name}'.")

        if metadata_code:
            logs.append(validate_syntax(metadata_code))
            exec(metadata_code, exec_context)
            logs.append(f"[OK] Código do metadata.pyspark.code executado.")

        for entry in output_data:
            info = entry["pyspark_data"]
            name = info["name"]
            fmt = info.get("format", "VIEW").upper()

            if name not in exec_context:
                raise ValueError(f"DataFrame '{name}' não encontrado após execução.")

            df = exec_context[name]

            match fmt:
                case "VIEW":
                    df.createOrReplaceTempView(name)
                    logs.append(f"[OK] DataFrame '{name}' registrado como VIEW.")
                case "TABLE":
                    df.write.mode("overwrite").saveAsTable(name)
                    logs.append(f"[OK] DataFrame '{name}' salvo como TABELA.")
                case "DATAFRAME":
                    logs.append(f"[OK] DataFrame '{name}' será retornado como pandas.DataFrame.")
                case "PARQUET":
                    df.toPandas().to_parquet(f"{name}_output.parquet", index=False)
                    logs.append(f"[OK] DataFrame '{name}' salvo como arquivo Parquet.")
                case _:
                    raise ValueError(f"Formato de saída '{fmt}' não é suportado.")

            try:
                info["result"] = df.toPandas().to_dict(orient="records")
                logs.append(f"[OK] Resultado de '{name}' convertido para pandas.DataFrame.")
            except Exception as e:
                logs.append(f"[ERROR] Conversão para pandas falhou para '{name}': {e}")
                info["result"] = f"Erro: {e}"

        metadata_result = None
        if expected_result is not None:
            for entry in output_data:
                output_name = entry["pyspark_data"]["name"]
                if output_name in exec_context:
                    result_df = exec_context[output_name]
                    if hasattr(result_df, "toPandas"):
                        logs.append(f"[OK] Resultado final capturado do contexto como '{output_name}'.")
                        pandas_result = normalize_dataframe_types(result_df.toPandas())
                        expected_result = normalize_dataframe_types(expected_result)
                        comparison_logs = compare_dataframes(pandas_result, expected_result)
                        logs.extend(comparison_logs)
                        metadata_result = pandas_result.to_dict(orient="records")
                        break
            if metadata_result is None:
                logs.append("[FAIL] Nenhum resultado correspondente ao output_data encontrado no contexto para comparar.")

    except Exception as e:
        logs.append(f"[ERROR] Falha no bloco '{block_id}': {e}")
        logs.append(traceback.format_exc())
        metadata_result = None

    return {
        "block_id": block_id,
        "logs": logs,
        "output_data": output_data,
        "metadata_result": metadata_result
    }


def execute_multiple_blocks(payloads: list, synthetic_data_map: dict, expected_results: dict):
    all_results = []
    for payload in payloads:
        block_id = payload.get("block_id")
        synthetic_data = synthetic_data_map.get(block_id, {})
        expected = expected_results.get(block_id)
        result = execute_pyspark_block(payload, synthetic_data, expected)
        all_results.append(result)
    return all_results


if __name__ == "__main__":
    payloads = [
        {
            "block_id": "1",
            "metadata": {
                "pyspark": {
                    "code": "df_saida = spark.sql('SELECT * FROM input_name WHERE CAST(VLVENINC AS DOUBLE) > 200000')"
                }
            },
            "input_data": [
                {
                    "pyspark_data": {
                        "name": "input_name",
                        "format": "VIEW"
                    }
                }
            ],
            "output_data": [
                {
                    "pyspark_data": {
                        "name": "df_saida",
                        "format": "VIEW"
                    }
                }
            ]
        }
    ]

    synthetic_data_map = {
        "1": {
            "input_name": pd.DataFrame([
                ["12345", "100000.50"],
                ["67890", "250000.00"],
                ["11123", "300000.75"],
                ["44455", "150000.00"],
                ["78901", "500000.25"]
            ], columns=["CONTRATO", "VLVENINC"])
        }
    }

    expected_results = {
        "1": pd.DataFrame([
            ["67890", 250000.00],
            ["11123", 300000.75],
            ["78901", 500000.25]
        ], columns=["CONTRATO", "VLVENINC"])
    }
    expected_results["1"]["CONTRATO"] = expected_results["1"]["CONTRATO"].astype(str)
    expected_results["1"]["VLVENINC"] = expected_results["1"]["VLVENINC"].astype(float)

    results = execute_multiple_blocks(payloads, synthetic_data_map, expected_results)

    for result in results:
        print(f"\n🔹 Resultados do bloco {result['block_id']}")
        for log in result["logs"]:
            print(log)
        print("\n📊 Resultado final:")
        print(result["metadata_result"])



🔹 Resultados do bloco 1
[OK] Fonte 'input_name' criada com formato 'VIEW'.
[OK] Código PySpark válido (sintaxe AST verificada).
[OK] Código executado para 'input_name'.
[OK] Código PySpark válido (sintaxe AST verificada).
[OK] Código do metadata.pyspark.code executado.
[OK] DataFrame 'df_saida' registrado como VIEW.
[OK] Resultado de 'df_saida' convertido para pandas.DataFrame.
[OK] Resultado final capturado do contexto como 'df_saida'.
[OK] Tamanho: 3 linhas, 2 colunas
[OK] Colunas: ['CONTRATO', 'VLVENINC']
[OK] Tipo da coluna 'CONTRATO': int64
[OK] Tipo da coluna 'VLVENINC': float64
[OK] Os valores das células são idênticos

📊 Resultado final:
[{'CONTRATO': 67890, 'VLVENINC': 250000.0}, {'CONTRATO': 11123, 'VLVENINC': 300000.75}, {'CONTRATO': 78901, 'VLVENINC': 500000.25}]


  df[col] = pd.to_numeric(df[col], errors='ignore')
