In [2]:
#versione read_sas7bdat

from concurrent.futures import ThreadPoolExecutor, as_completed
import pyreadstat
from datetime import datetime
import sys
import pyarrow as pa
import pyarrow.parquet as pq
import shutil
import time
import os
import gc
import glob


sas_file = "45cols_202510.sas7bdat"
file_prefix = sas_file.replace(".sas7bdat","") 
target_extension = ".parquet"

table_folder = file_prefix
output_folder = os.path.join("output_folder_zstd", table_folder)


chunksize = 1000000
offset = 0
chunk_idx = 0
max_workers = 1

#provacommit


def timenow():
    timenow = datetime.now().strftime('%H:%M:%S')
    return timenow

#Operator
def process_chunk(output_folder, parquet_file, extension, chunk_idx, chunksize):

    if chunk_idx == 0: print("Chunk Processing: Start")
    filepath = os.path.join(output_folder, parquet_file + "#"  + str(chunk_idx) + extension)

    chunk_start = time.time()
    offset = chunk_idx * chunksize
    
    """"
     #Ha senso solo se ti carichi tanti GB in una volta
    chunk, meta = pyreadstat.read_file_multiprocessing(
        pyreadstat.read_sas7bdat,
        sas_file,
        num_processes = 4,
        row_offset=offset,
        row_limit=chunksize,
        output_format = "polars",
        encoding = "windows-1252",
        disable_datetime_conversion = True
    )
    
    """
    try:
        chunk, _ = pyreadstat.read_sas7bdat(
            sas_file,
            row_offset= offset,
            row_limit= chunksize,
            output_format = "dict",
            encoding = "windows-1252",
            usecols = [
                "libname", "memname", "memtype", "dbms_memtype",
                "memlabel",
                "typemem", "crdate", "modate", "nobs", "obslen",
                "nvar", "protect", "compress", "encrypt", "npage",
                "filesize", "pcompress", "reuse", "bufsize", "delobs",
                "nlobs", "maxvar", "maxlabel", "maxgen", "gen", "attr",
                "indxtype", "datarep", "sortname", "sorttype", "sortchar",
                "datarepname", "encoding", "audit", "audit_before", "audit_admin",
                "audit_error", "audit_data", "num_character", "num_numeric"
                ]
            )
    except Exception as e:
        print(f"{chunk_idx}: Riga{chunk_idx + 1}: Errore critico in lettura: {e}")
    gc.collect()

    table = pa.table(chunk)
    del chunk, _

    pq.write_table(table, filepath, compression = 'zstd', compression_level = 1)
    del table

    gc.collect()
    chunk_end = time.time()
    chunk_elab_time = round((chunk_end - chunk_start),0)

    print(f"Chunk {chunk_idx}: Completed ‚úÖ | ElabTime: {chunk_elab_time} s | {timenow()}")



#Main with Iteration
def main():

    print("A-1")
    if os.path.exists(output_folder):
            shutil.rmtree(output_folder)
    os.mkdir(output_folder)

    #Controllo_Iniziale_File
    print("A0")
    if os.path.exists(sas_file):
        _,meta = pyreadstat.read_sas7bdat(sas_file, metadataonly = True)
    else:
        print(f"File_Check: File {sas_file} non trovato")
        sys.exit(1)

    print("A1")
    rows = meta.number_rows
    num_chunks = (rows + chunksize - 1)//chunksize
    del _
    del meta

    print(f"File Trovato: #Rows: {rows}, chunksize: {chunksize}, #Chunks:{num_chunks}") 

    with ThreadPoolExecutor(max_workers = max_workers) as executor:
            futures = {
                executor.submit(
                    process_chunk,
                    output_folder,
                    file_prefix,
                    target_extension, 
                    chunk_idx,
                    chunksize
                    ): chunk_idx for chunk_idx in range(num_chunks)
            }

    for future in as_completed(futures):
        idx = futures[future]
        try:
            future.result()
        except Exception as e:
            print(f"[Chunk {idx}] Producer error: {e}")

    print("Consolidazione Parquet Finale")
    consolidation_start = time.time()
    #Operazione di unificazione Parquet in Streaming, consuma meno RAM del process_chunk, ci permette di mandare su GCS un unico file (molto pi√π leggero), semplificare lo spark, e fare meno insert su iceberg, mantenedo un versioning dell'iceberg ottimale.
    writer = None
    for file in glob.glob(f"{output_folder}/*.parquet"):
        table = pq.read_table(file)
        if writer is None:
            writer = pq.ParquetWriter(f"{output_folder}/{file_prefix}.parquet", table.schema, compression="zstd",compression_level = 1)
        writer.write_table(table)
        #os.remove(file)
    consolidation_end = time.time()
    consolidation_elab_time = round((consolidation_end - consolidation_start),0)
    print(f"Consolidation: Terminated | ElabTime: {consolidation_elab_time}")

    if writer:
        writer.close()

main()

A-1
A0
A1
File Trovato: #Rows: 8979525, chunksize: 1000000, #Chunks:9
Chunk Processing: Start
Chunk 0: Completed ‚úÖ | ElabTime: 24.0 s | 00:38:46
Chunk 1: Completed ‚úÖ | ElabTime: 24.0 s | 00:39:10
Chunk 2: Completed ‚úÖ | ElabTime: 25.0 s | 00:39:35
Chunk 3: Completed ‚úÖ | ElabTime: 25.0 s | 00:40:00
Chunk 4: Completed ‚úÖ | ElabTime: 28.0 s | 00:40:28
Chunk 5: Completed ‚úÖ | ElabTime: 28.0 s | 00:40:55
Chunk 6: Completed ‚úÖ | ElabTime: 35.0 s | 00:41:30
Chunk 7: Completed ‚úÖ | ElabTime: 29.0 s | 00:41:59
Chunk 8: Completed ‚úÖ | ElabTime: 29.0 s | 00:42:27
Consolidazione Parquet Finale
Consolidation: Terminated | ElabTime: 23.0


In [33]:
import time
import pyarrow.dataset as ds

input_folder = "output_folder_zstd"

start_time = time.time()

# Leggi tutti i file Parquet nella cartella come dataset
dataset = ds.dataset(input_folder, format="parquet")

# Carica l'intero dataset in memoria (solo per test)
table = dataset.to_table()

end_time = time.time()
elapsed = end_time - start_time



# Stampa informazioni
print("üìÑ Schema del dataset:")
print(f"\nüìä Numero di righe: {table.num_rows}")
print(f"üì¶ Numero di colonne: {table.num_columns}")
print("\n‚è±Ô∏è Tempo di lettura: {:.3f} secondi".format(elapsed))

del table


üìÑ Schema del dataset:

üìä Numero di righe: 8560000
üì¶ Numero di colonne: 15

‚è±Ô∏è Tempo di lettura: 0.861 secondi
