# Convert parquet to delta 

### Setup

In [0]:
# import libs
import os
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Definição de catalog e Schema
catalog_name = "nome_do_seu_catalog"
schema_name = "nome_do_seu_schema"

# Definição do path 
base_path = f"/Volumes/{catalog_name}/{schema_name}/raw"

# Carregamento de Catalog e Schema para o Notebook
try:
    spark.sql(f"USE CATALOG {catalog_name}")
    spark.sql(f"USE SCHEMA {schema_name}")
    print(f"Catálogo '{catalog_name}' e schema '{schema_name}' carregados com sucesso.")
    
except Exception as e:
    print(f"Erro ao carregar catálogo ou/e schema: {e}")

### Criação das tabelas em formato delta

In [0]:
# Loop no diretório para obter o caminho relativo dos arquivos parquet
for path in os.listdir(base_path):
    full_path = os.path.join(base_path, path)

    for parquet in os.listdir(full_path):
        parquet_file = os.path.join(full_path, parquet)

        try:
            # Leitura e escrita dos dados como tabela delta
            spark.read.format("parquet").load(parquet_file)\
                .write.format("delta").option("delta.columnMapping.mode", "name")\
                .mode("overwrite").saveAsTable(f"delta_raw_{str(parquet).replace('-', '_')}_{path}") 

            print(f"Tabela delta criada com sucesso para o arquivo: {parquet_file}")

        except Exception as e:
            print(f"Erro ao criar tabela delta com o arquivo {parquet_file}: {e}")