In [17]:
import json
import os
import time

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

In [24]:
# Load the semicolon-delimited dataset into a DataFrame.
df = pd.read_csv('data.csv', delimiter=';')

# Rich preview of the first five rows; the bare last expression then shows
# the (rows, columns) shape as the cell output.
display(df.head(5))

df.shape

Unnamed: 0,Date,Service,Gare de départ,Gare d'arrivée,Durée moyenne du trajet,Nombre de circulations prévues,Nombre de trains annulés,Commentaire annulations,Nombre de trains en retard au départ,Retard moyen des trains en retard au départ,...,Nombre trains en retard > 15min,Retard moyen trains en retard > 15 (si liaison concurrencée par vol),Nombre trains en retard > 30min,Nombre trains en retard > 60min,Prct retard pour causes externes,Prct retard pour cause infrastructure,Prct retard pour cause gestion trafic,Prct retard pour cause matériel roulant,Prct retard pour cause gestion en gare et réutilisation de matériel,"Prct retard pour cause prise en compte voyageurs (affluence, gestions PSH, correspondances)"
0,2018-01,National,GRENOBLE,PARIS LYON,183,245,0,,37,8.027027,...,25,6.123741,13,6,17.647059,52.941176,0.0,23.529412,5.882353,0.0
1,2018-01,International,PARIS LYON,ITALIE,394,94,0,,27,11.261728,...,22,11.601064,15,6,33.333333,19.047619,23.809524,14.285714,9.52381,0.0
2,2018-01,National,MARSEILLE ST CHARLES,LYON PART DIEU,106,557,7,,133,6.978195,...,40,5.195333,19,5,23.076923,23.076923,19.230769,23.076923,3.846154,7.692308
3,2018-01,National,PARIS NORD,DUNKERQUE,116,271,3,,46,11.236594,...,18,3.738806,9,4,35.714286,28.571429,7.142857,25.0,3.571429,0.0
4,2018-01,National,ANNECY,PARIS LYON,224,198,0,,12,8.070833,...,38,8.552525,14,5,23.809524,42.857143,9.52381,14.285714,4.761905,4.761905


(10687, 26)

In [36]:
def csv_to_parquet_optimized(csv_file_path, parquet_file_path, index_col=None,
                             compression='snappy', chunk_size=None, separator=','):
    """
    Converts a CSV file to Parquet format with optimizations using PyArrow.

    Progress and a size/time summary are printed to stdout. Read, conversion
    and write errors are caught, reported with a printed message, and turned
    into a False return value instead of propagating.

    Args:
        csv_file_path (str): Path to the input CSV file.
        parquet_file_path (str): Path where the output Parquet file will be saved.
        index_col (str, optional): Name of the column to use as index (None by default).
            When given, the index is written to the Parquet file rather than dropped.
        compression (str, optional): Compression algorithm to use ('snappy', 'gzip', 'brotli', 'zstd').
        chunk_size (int, optional): Number of rows to read at a time (None reads all at once).
            The chunks are concatenated into a single DataFrame before conversion,
            so the complete dataset is still held in memory.
        separator (str, optional): Field delimiter of the CSV file (',' by default).

    Returns:
        bool: True if conversion succeeded, False otherwise.
    """
    start_time = time.time()

    print(f"Starting CSV file reading: {csv_file_path}")

    # pandas treats index_col=None and index_col=False as "no index column";
    # any other value moves a CSV column into the DataFrame index, which then
    # has to be kept when converting to Arrow or that column would be lost.
    keep_index = index_col is not None and index_col is not False

    try:
        # Read CSV with Pandas
        # Using low_memory=False to prevent dtype warnings on large files
        if chunk_size is None:
            df = pd.read_csv(csv_file_path, index_col=index_col, low_memory=False, sep=separator)
        else:
            # With chunksize set, read_csv returns an iterator of DataFrames,
            # not a DataFrame, so the chunks must be assembled explicitly.
            reader = pd.read_csv(csv_file_path, index_col=index_col, low_memory=False,
                                 chunksize=chunk_size, sep=separator)
            try:
                df = pd.concat(list(reader), ignore_index=not keep_index)
            finally:
                reader.close()
    except FileNotFoundError:
        print(f"Error: CSV file not found at specified location: {csv_file_path}")
        return False
    except Exception as e:
        print(f"Error reading CSV file: {e}")
        return False

    read_time = time.time()
    print(f"CSV reading completed in {read_time - start_time:.2f} seconds.")
    print(f"Rows read: {len(df):,}")
    print(f"Columns: {len(df.columns)}")
    print(f"Memory usage: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")

    # Schema Optimization
    # Convert DataFrame to PyArrow Table
    # This step infers PyArrow schema from Pandas types
    print("Converting to PyArrow table...")
    try:
        table = pa.Table.from_pandas(df, preserve_index=keep_index)
    except Exception as e:
        print(f"Error converting to PyArrow table: {e}")
        return False

    # Add Custom Metadata
    # Metadata is stored in the Parquet file footer
    metadata = {
        'creation_tool': 'csv_to_parquet_optimized.py',
        'conversion_timestamp': pd.Timestamp.now().isoformat(),
        'source_file': str(csv_file_path),  # str() so path-like inputs serialize too
        'original_row_count': str(len(df)),
        'original_column_count': str(len(df.columns)),
        'compression_algorithm': compression
    }

    # Integrate metadata into schema
    # PyArrow stores metadata at the schema level; the payload is JSON so that
    # any reader can parse it back (a Python dict repr is not a portable format).
    existing_metadata = table.schema.metadata or {}
    existing_metadata[b'custom_metadata'] = json.dumps(
        metadata, ensure_ascii=False, default=str
    ).encode('utf8')
    table = table.replace_schema_metadata(existing_metadata)

    print(f"Writing Parquet file with '{compression}' compression...")

    # Write Parquet file
    # PyArrow provides efficient Parquet writing
    try:
        pq.write_table(
            table,
            parquet_file_path,
            compression=compression,
            use_dictionary=True,      # Efficient for categorical columns
            write_statistics=True,     # Enable statistics for better query performance
            row_group_size=100000,     # Optimize row group size for balance between memory and I/O
            version='2.6'              # Use newer Parquet format version for better features
        )
    except Exception as e:
        print(f"Error writing Parquet file: {e}")
        return False

    end_time = time.time()

    # Display results
    csv_size = os.path.getsize(csv_file_path) / 1024**2
    parquet_size = os.path.getsize(parquet_file_path) / 1024**2
    compression_ratio = (1 - parquet_size / csv_size) * 100

    print(f"\nConversion successful!")
    print(f"Parquet file saved to: {parquet_file_path}")
    print(f"Original CSV size: {csv_size:.2f} MB")
    print(f"Parquet file size: {parquet_size:.2f} MB")
    print(f"Compression ratio: {compression_ratio:.1f}%")
    print(f"Total duration: {end_time - start_time:.2f} seconds")

    return True


def verify_parquet_file(parquet_file_path, num_rows_preview=5):
    """
    Verifies and displays information about a Parquet file.

    Prints the Parquet schema, the number of row groups and rows, the
    'custom_metadata' footer entry when one is present, and a preview of the
    first rows. Any error raised along the way is caught and printed, so the
    function never raises and always returns None.

    Args:
        parquet_file_path (str): Path to the Parquet file to verify.
        num_rows_preview (int): Number of rows to preview (default: 5).
    """
    print(f"\nVerifying Parquet file: {parquet_file_path}")

    try:
        parquet_file = pq.ParquetFile(parquet_file_path)

        print(f"\nParquet Schema:")
        print(parquet_file.schema)

        print(f"\nFile metadata:")
        print(f"Number of row groups: {parquet_file.num_row_groups}")
        print(f"Total rows: {parquet_file.metadata.num_rows:,}")

        # Read custom metadata
        # The key/value metadata is None (not an empty dict) when the footer
        # carries no entries; fall back to {} so such files still get a preview.
        key_value_metadata = parquet_file.metadata.metadata or {}
        metadata_bytes = key_value_metadata.get(b'custom_metadata')
        if metadata_bytes:
            print(f"\nCustom metadata:")
            print(metadata_bytes.decode('utf8'))

        # Sample first few rows using pandas read_parquet
        print(f"\nFirst {num_rows_preview} rows preview:")
        df_sample = pd.read_parquet(parquet_file_path, engine='pyarrow').head(num_rows_preview)
        display(df_sample)

    except Exception as e:
        print(f"Error verifying Parquet file: {e}")

In [38]:
# Input and output locations -- replace with your actual file paths
input_csv = 'data.csv'
output_parquet = 'data.parquet'

# Run the conversion; compression may be 'snappy', 'gzip', 'brotli' or 'zstd'
success = csv_to_parquet_optimized(
    csv_file_path=input_csv,
    parquet_file_path=output_parquet,
    compression='snappy',
    separator=";",
)

# Inspect the written file only when the conversion reported success
if success:
    verify_parquet_file(output_parquet)

Starting CSV file reading: data.csv
CSV reading completed in 0.04 seconds.
Rows read: 10,687
Columns: 26
Memory usage: 4.99 MB
Converting to PyArrow table...
Writing Parquet file with 'snappy' compression...

Conversion successful!
Parquet file saved to: data.parquet
Original CSV size: 2.32 MB
Parquet file size: 0.82 MB
Compression ratio: 64.6%
Total duration: 0.07 seconds

Verifying Parquet file: data.parquet

Parquet Schema:
<pyarrow._parquet.ParquetSchema object at 0x0000024D06E07600>
required group field_id=-1 schema {
  optional binary field_id=-1 Date (String);
  optional binary field_id=-1 Service (String);
  optional binary field_id=-1 Gare de départ (String);
  optional binary field_id=-1 Gare d'arrivée (String);
  optional int64 field_id=-1 Durée moyenne du trajet;
  optional int64 field_id=-1 Nombre de circulations prévues;
  optional int64 field_id=-1 Nombre de trains annulés;
  optional double field_id=-1 Commentaire annulations;
  optional int64 field_id=-1 Nombre de trai

Unnamed: 0,Date,Service,Gare de départ,Gare d'arrivée,Durée moyenne du trajet,Nombre de circulations prévues,Nombre de trains annulés,Commentaire annulations,Nombre de trains en retard au départ,Retard moyen des trains en retard au départ,...,Nombre trains en retard > 15min,Retard moyen trains en retard > 15 (si liaison concurrencée par vol),Nombre trains en retard > 30min,Nombre trains en retard > 60min,Prct retard pour causes externes,Prct retard pour cause infrastructure,Prct retard pour cause gestion trafic,Prct retard pour cause matériel roulant,Prct retard pour cause gestion en gare et réutilisation de matériel,"Prct retard pour cause prise en compte voyageurs (affluence, gestions PSH, correspondances)"
0,2018-01,National,GRENOBLE,PARIS LYON,183,245,0,,37,8.027027,...,25,6.123741,13,6,17.647059,52.941176,0.0,23.529412,5.882353,0.0
1,2018-01,International,PARIS LYON,ITALIE,394,94,0,,27,11.261728,...,22,11.601064,15,6,33.333333,19.047619,23.809524,14.285714,9.52381,0.0
2,2018-01,National,MARSEILLE ST CHARLES,LYON PART DIEU,106,557,7,,133,6.978195,...,40,5.195333,19,5,23.076923,23.076923,19.230769,23.076923,3.846154,7.692308
3,2018-01,National,PARIS NORD,DUNKERQUE,116,271,3,,46,11.236594,...,18,3.738806,9,4,35.714286,28.571429,7.142857,25.0,3.571429,0.0
4,2018-01,National,ANNECY,PARIS LYON,224,198,0,,12,8.070833,...,38,8.552525,14,5,23.809524,42.857143,9.52381,14.285714,4.761905,4.761905


In [29]:
import duckdb

# Open (or create) an on-disk database
con = duckdb.connect("ma_base.duckdb")

# Load the Parquet file into a DuckDB table; IF NOT EXISTS leaves an
# already-present table untouched when the cell is run again
create_table_sql = """
    CREATE TABLE IF NOT EXISTS ma_table AS
    SELECT * FROM 'data.parquet';
"""
con.execute(create_table_sql)

# Sanity check: print the number of rows now in the table
row_count = con.execute("SELECT COUNT(*) FROM ma_table").fetchall()
print(row_count)

[(10687,)]


In [30]:
# Close the DuckDB connection held in `con` once the queries are done
con.close()