# Import libraries and create spark session

In [2]:
# Garantindo a importação da biblioteca spark.
import sys
if "/home/user" not in sys.path:
    sys.path.append("/home/user")
from src.spark_session import create_spark_session
spark = create_spark_session()

# Importação das outras bibliotecas
from datetime import datetime
from minio import Minio
import tempfile
import time
import zipfile
import io
import os

In [3]:
def DataFrameGenerator(zip_path: str, bucket: str, project_name: str):
    start = time.time()
    
    client = Minio(
        "minio:9000",
        access_key="minio",
        secret_key="minio123",
        secure=False
        )
    
    response = client.get_object("raw", zip_path)
    # Ler bytes do stream
    data = response.read()

    # Fechar conexão do objeto minio
    response.close()
    response.release_conn()

    # Abrir ZIP em memória
    zip_buffer = io.BytesIO(data)
    with tempfile.TemporaryDirectory() as temp_dir:
        with zipfile.ZipFile(zip_buffer) as zip_ref:
            zip_ref.extractall(temp_dir)  # Extrai tudo para temp_dir
            
            files = os.listdir(temp_dir)
            print(f"📁 Diretório local de busca: {temp_dir}")
            files_return = [file for file in files if file.endswith('14.csv')]
            print(f"📄 Um total de {len(files_return)} arquivos foram encontrados para os filtros aplicados.")
            
            # Save delta section 
            print(f"Iniciando processo de salvar dados em Delta.")
            delta_save_path = f"s3a://{bucket}/{project_name}/{datetime.now().month}_{datetime.now().year}"
            for csv_file in files_return:
                print(f"⬆️ Enviando {csv_file} no formato Delta para MinIO...")
                path_csv = f"{temp_dir}/{csv_file}"
                folder_nome_for_csv = csv_file.split('-')[-1].replace('14.csv','')
                delta_save_path = f"s3a://{bucket}/{project_name}/{datetime.now().month}_{datetime.now().year}/{folder_nome_for_csv}"
                spark.read.csv(path_csv,header=True,inferSchema=True)\
                .write.format("parquet").mode("append").save(delta_save_path)

            print(f"✅ Arquivos salvos no MinIO")
            end = time.time()
            print(f"⏱️ Processou levou um total de {end - start:.2f} segundos.")
        


In [4]:
bucket='bronze'
zip_path = "raw_uber_dataset/uber-pickups-in-new-york-city.zip"
project_name="uber_dataset"

DataFrameGenerator(zip_path,bucket,project_name)

📁 Diretório local de busca: /tmp/tmp18rto6k5
📄 Um total de 6 arquivos foram encontrados para os filtros aplicados.
Iniciando processo de salvar dados em Delta.
⬆️ Enviando uber-raw-data-jul14.csv no formato Delta para MinIO...
⬆️ Enviando uber-raw-data-aug14.csv no formato Delta para MinIO...
⬆️ Enviando uber-raw-data-apr14.csv no formato Delta para MinIO...
⬆️ Enviando uber-raw-data-may14.csv no formato Delta para MinIO...
⬆️ Enviando uber-raw-data-sep14.csv no formato Delta para MinIO...
⬆️ Enviando uber-raw-data-jun14.csv no formato Delta para MinIO...
✅ Arquivos salvos no MinIO
⏱️ Processou levou um total de 28.34 segundos.
