In [None]:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
import pandas as pd
import shutil
import glob

# Inicialize a sessão do Spark
spark = SparkSession.builder.appName("PlanilhaAprimorada").getOrCreate()

# Caminho do arquivo original
input_path = '/content/saida_organizada.csv'

# Carregue o arquivo CSV, pulando as 3 primeiras linhas que contêm cabeçalhos manuais
original_data = pd.read_csv(input_path, sep=";", skiprows=3, header=None)

# Verifique o número de colunas para definir o esquema
num_columns = len(original_data.columns)
schema = StructType([StructField(f"col_{i}", StringType(), True) for i in range(num_columns)])

# Convertendo o dataframe do Pandas para PySpark
df_spark = spark.createDataFrame(original_data.values.tolist(), schema=schema)

# Padronizar e renomear colunas relevantes
colunas_renomeadas = ["col_0", "col_1", "col_2", "col_3", "col_4", "col_5", "col_6", "col_7"]
renomeacoes = {
    "col_0": "Data Base",
    "col_1": "Documento",
    "col_2": "CNPJ",
    "col_3": "Agência",
    "col_4": "Nome da Instituição",
    "col_5": "Banco",
    "col_6": "Conta",
    "col_7": "Saldo"
}

for coluna_antiga, coluna_nova in renomeacoes.items():
    if coluna_antiga in df_spark.columns:
        df_spark = df_spark.withColumnRenamed(coluna_antiga, coluna_nova)

# Aplicar as melhorias de formatação para as colunas específicas
from pyspark.sql.functions import col, when, format_number, lpad

# Padronizar CNPJ e Agência com zeros à esquerda
df_spark = df_spark.withColumn("CNPJ", when(col("CNPJ") == "", "Não Informado").otherwise(lpad(col("CNPJ"), 8, '0')))
df_spark = df_spark.withColumn("Agência", when(col("Agência") == "", "Não Informado").otherwise(lpad(col("Agência"), 4, '0')))

# Formatando o campo Saldo
df_spark = df_spark.withColumn("Saldo", when(col("Saldo") == "", "Não Informado").otherwise(format_number(col("Saldo").cast("double"), 2)))

# Preenchendo colunas vazias com "Não Informado"
for coluna in ["Documento", "Nome da Instituição", "Banco", "Conta"]:
    if coluna in df_spark.columns:
        df_spark = df_spark.withColumn(coluna, when(col(coluna) == "", "Não Informado").otherwise(col(coluna)))

# Caminho de saída temporário do novo arquivo
temp_output_path = "/content/saida_organizada_aprimorada_temp.csv"

# Cabeçalho a ser adicionado ao arquivo
header = "Balancete/Balanco Geral (Documentos: 4010 - 4016 - 4020 - 4026)\nData de geracao dos dados: 2024-11-01\nFonte: Instituicoes financeiras\n#Data Base;Documento;CNPJ;Agência;Nome da Instituição;Banco;Conta;Saldo"

# Salvar o DataFrame como CSV (sem o cabeçalho do DataFrame, para adicionar o cabeçalho manual)
df_spark.coalesce(1).write.option("header", "false").option("delimiter", ";").csv(temp_output_path)

# Encontrar o arquivo CSV gerado no diretório temporário
temp_file = glob.glob(temp_output_path + "/*.csv")[0]

# Lê o conteúdo do arquivo gerado
with open(temp_file, 'r') as file:
    data = file.read()

# Caminho final para salvar o arquivo com cabeçalhos
final_output_path = "/content/saida_organizada_aprimorada.csv"

# Escreva o cabeçalho e os dados no arquivo final
with open(final_output_path, 'w') as final_file:
    final_file.write(header + "\n" + data)

print("Arquivo aprimorado gerado com sucesso! Clique no link abaixo para baixá-lo:")

# Forneça o link para download
from google.colab import files
files.download(final_output_path)
