In [4]:
# Version optimisée pour TRÈS GROS fichiers JSONL vers Parquet
import json
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from pathlib import Path
import os
import gc
from tqdm import tqdm
import psutil

def get_memory_usage():
    """Retourne l'usage mémoire actuel"""
    process = psutil.Process(os.getpid())
    return process.memory_info().rss / 1024 / 1024 / 1024  # GB

def convert_large_jsonl_to_parquet_optimized(input_file, output_file, chunk_size=100000):
    """
    Convertit un très gros fichier JSONL en Parquet par chunks
    optimisé pour la mémoire
    """
    
    # Créer le dossier de sortie
    os.makedirs(os.path.dirname(output_file), exist_ok=True)
    
    # Compter le nombre total de lignes pour la barre de progression
    print("🔄 Comptage des lignes...")
    total_lines = 0
    with open(input_file, 'r', encoding='utf-8') as f:
        for _ in f:
            total_lines += 1
    print(f"📊 Total de lignes: {total_lines:,}")
    
    # Variables de traitement
    chunk_data = []
    chunk_files = []
    processed_records = 0
    chunk_num = 0
    errors = 0
    
    print(f"🔄 Traitement par chunks de {chunk_size:,} enregistrements...")
    print(f"💾 Mémoire initiale: {get_memory_usage():.2f} GB")
    
    # Traitement avec barre de progression
    with open(input_file, 'r', encoding='utf-8') as f:
        with tqdm(total=total_lines, desc="Processing", unit=" lines") as pbar:
            
            for line_num, line in enumerate(f, 1):
                pbar.update(1)
                
                line = line.strip()
                if not line:
                    continue
                
                try:
                    data_point = json.loads(line)
                    chunk_data.append(data_point)
                    processed_records += 1
                    
                except json.JSONDecodeError:
                    errors += 1
                    continue
                
                # Sauvegarder le chunk quand il est plein
                if len(chunk_data) >= chunk_size:
                    chunk_file = f"{output_file}.chunk_{chunk_num:04d}.parquet"
                    
                    # Créer DataFrame et sauvegarder
                    chunk_df = pd.DataFrame(chunk_data)
                    chunk_df.to_parquet(
                        chunk_file,
                        compression='snappy',
                        index=False,
                        engine='pyarrow'
                    )
                    
                    chunk_files.append(chunk_file)
                    
                    # Libérer la mémoire
                    del chunk_df
                    chunk_data = []
                    gc.collect()  # Force garbage collection
                    
                    chunk_num += 1
                    
                    # Afficher le progrès
                    memory_usage = get_memory_usage()
                    pbar.set_postfix({
                        'chunks': chunk_num,
                        'records': f"{processed_records:,}",
                        'mem': f"{memory_usage:.2f}GB",
                        'errors': errors
                    })
    
    # Traiter le dernier chunk
    if chunk_data:
        chunk_file = f"{output_file}.chunk_{chunk_num:04d}.parquet"
        chunk_df = pd.DataFrame(chunk_data)
        chunk_df.to_parquet(chunk_file, compression='snappy', index=False)
        chunk_files.append(chunk_file)
        del chunk_df
        chunk_num += 1
    
    print(f"\n✅ Traitement terminé!")
    print(f"📊 {processed_records:,} enregistrements traités")
    print(f"⚠️ {errors} erreurs de parsing")
    print(f"📁 {len(chunk_files)} fichiers chunks créés")
    
    # Combiner tous les chunks en un seul fichier Parquet
    print(f"\n🔄 Fusion des chunks en un seul fichier...")
    
    # Lire et combiner par batches pour éviter la saturation mémoire
    parquet_writer = None
    schema = None
    
    for i, chunk_file in enumerate(tqdm(chunk_files, desc="Merging chunks")):
        # Lire le chunk
        chunk_df = pd.read_parquet(chunk_file)
        
        # Convertir en PyArrow Table
        table = pa.Table.from_pandas(chunk_df)
        
        if parquet_writer is None:
            # Premier chunk : initialiser le writer
            schema = table.schema
            parquet_writer = pq.ParquetWriter(
                output_file,
                schema,
                compression='snappy'
            )
        
        # Écrire le chunk
        parquet_writer.write_table(table)
        
        # Libérer la mémoire
        del chunk_df, table
        gc.collect()
        
        # Supprimer le fichier chunk temporaire
        os.remove(chunk_file)
    
    # Fermer le writer
    if parquet_writer:
        parquet_writer.close()
    
    return processed_records, errors

# ================================
# UTILISATION OPTIMISÉE
# ================================

# Chemins des fichiers
input_file = "../data/raw/Clothing_Shoes_and_Jewelry.jsonl"
output_file = "../data/processed/Clothing_Shoes_and_Jewelry.parquet"

# Paramètres optimisés pour 25GB
CHUNK_SIZE = 50000  # Ajustez selon votre RAM (plus petit = moins de RAM)

print(f"🚀 CONVERSION OPTIMISÉE POUR GROS FICHIERS")
print(f"📁 Fichier source: {input_file}")
print(f"📁 Fichier cible: {output_file}")
print(f"⚙️ Taille des chunks: {CHUNK_SIZE:,}")
print(f"💾 RAM disponible: {psutil.virtual_memory().available / 1024**3:.1f} GB")

try:
    processed, errors = convert_large_jsonl_to_parquet_optimized(
        input_file, 
        output_file, 
        chunk_size=CHUNK_SIZE
    )
    
    # Statistiques finales
    original_size = os.path.getsize(input_file) / (1024**3)  # GB
    parquet_size = os.path.getsize(output_file) / (1024**3)  # GB
    compression_ratio = original_size / parquet_size
    
    print(f"\n🎉 CONVERSION TERMINÉE!")
    print(f"📊 {processed:,} enregistrements convertis")
    print(f"⚠️ {errors} erreurs")
    print(f"📁 Taille JSONL: {original_size:.2f} GB")
    print(f"📁 Taille Parquet: {parquet_size:.2f} GB") 
    print(f"🗜️ Compression: {compression_ratio:.1f}x")
    print(f"💾 Mémoire finale: {get_memory_usage():.2f} GB")
    
    # Test rapide de lecture
    print(f"\n🔄 Test de lecture...")
    df_sample = pd.read_parquet(output_file, engine='pyarrow').head(1000)
    print(f"✅ Lecture réussie! Colonnes: {list(df_sample.columns)}")
    
except Exception as e:
    print(f"❌ Erreur: {e}")
    import traceback
    traceback.print_exc()

🚀 CONVERSION OPTIMISÉE POUR GROS FICHIERS
📁 Fichier source: ../data/raw/Clothing_Shoes_and_Jewelry.jsonl
📁 Fichier cible: ../data/processed/Clothing_Shoes_and_Jewelry.parquet
⚙️ Taille des chunks: 50,000
💾 RAM disponible: 14.7 GB
🔄 Comptage des lignes...
📊 Total de lignes: 66,033,346
🔄 Traitement par chunks de 50,000 enregistrements...
💾 Mémoire initiale: 36.72 GB


Processing:   4%|▍         | 2800000/66033346 [2:07:44<48:04:46, 365.33 lines/s, chunks=55, records=2,750,000, mem=36.95GB, errors=0] 


KeyboardInterrupt: 

In [8]:
!pip install cudf

Collecting cudf
  Downloading cudf-0.6.1.post1.tar.gz (1.1 kB)
  Installing build dependencies: started
  Installing build dependencies: finished with status 'done'
  Getting requirements to build wheel: started
  Getting requirements to build wheel: finished with status 'done'
  Preparing metadata (pyproject.toml): started
  Preparing metadata (pyproject.toml): finished with status 'done'
Building wheels for collected packages: cudf
  Building wheel for cudf (pyproject.toml): started
  Building wheel for cudf (pyproject.toml): finished with status 'error'
Failed to build cudf


  error: subprocess-exited-with-error
  
  × Building wheel for cudf (pyproject.toml) did not run successfully.
  │ exit code: 1
  ╰─> [84 lines of output]
      !!
      
              ********************************************************************************
              Please consider removing the following classifiers in favor of a SPDX license expression:
      
              License :: OSI Approved :: Apache Software License
      
              See https://packaging.python.org/en/latest/guides/writing-pyproject-toml/#license for details.
              ********************************************************************************
      
      !!
        self._finalize_license_expression()
      running bdist_wheel
      running build
      installing to build\bdist.win-amd64\wheel
      running install
      Traceback (most recent call last):
        File [35m"C:\Users\Yann\Desktop\DEV\School\ml_m1\NLP\CustomGPT\scraper_env\Lib\site-packages\pip\_vendor\pyproject_hooks

In [None]:
# Version ultra-optimisée pour RTX 4080 + Windows (SANS cuDF)
import json
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import numpy as np
import os
import gc
from tqdm import tqdm
import psutil
from concurrent.futures import ThreadPoolExecutor
import multiprocessing as mp

# CuPy pour GPU (que vous avez déjà)
try:
    import cupy as cp
    GPU_AVAILABLE = cp.cuda.is_available()
    if GPU_AVAILABLE:
        device = cp.cuda.Device()
        print(f"🚀 GPU DÉTECTÉ: {device.name}")
        print(f"💾 VRAM: {device.mem_info[1] / 1024**3:.1f} GB")
        # Limiter l'usage GPU à 80% pour éviter les crashes
        cp.cuda.MemoryPool().set_limit(int(device.mem_info[1] * 0.8))
    else:
        print("⚠️ GPU non disponible")
except ImportError:
    GPU_AVAILABLE = False
    print("📦 CuPy non installé - Mode CPU uniquement")

def get_system_info():
    """Affiche les infos système pour optimiser"""
    cpu_count = mp.cpu_count()
    ram_gb = psutil.virtual_memory().total / 1024**3
    
    print(f"💻 CPU Cores: {cpu_count}")
    print(f"💾 RAM: {ram_gb:.1f} GB")
    
    if GPU_AVAILABLE:
        gpu_memory = cp.cuda.Device().mem_info[1] / 1024**3
        print(f"🎮 VRAM: {gpu_memory:.1f} GB")
        return cpu_count, ram_gb, gpu_memory
    
    return cpu_count, ram_gb, 0

def calculate_optimal_params():
    """Calcule les paramètres optimaux selon votre matériel"""
    cpu_count, ram_gb, gpu_memory = get_system_info()
    
    # Taille des chunks adaptative
    if GPU_AVAILABLE and gpu_memory > 10:
        chunk_size = 300000  # RTX 4080 peut gérer de gros chunks
        workers = min(cpu_count, 8)
    elif ram_gb > 16:
        chunk_size = 200000
        workers = min(cpu_count, 6)
    elif ram_gb > 8:
        chunk_size = 100000
        workers = min(cpu_count, 4)
    else:
        chunk_size = 50000
        workers = 2
    
    print(f"⚙️ Chunk size: {chunk_size:,}")
    print(f"🧵 Workers: {workers}")
    
    return chunk_size, workers

def parse_json_chunk_parallel(lines_chunk, worker_id=0):
    """Parse JSON en parallèle avec gestion d'erreurs"""
    parsed_data = []
    errors = 0
    
    for line in lines_chunk:
        line = line.strip()
        if line:
            try:
                parsed_data.append(json.loads(line))
            except json.JSONDecodeError:
                errors += 1
                continue
    
    return parsed_data, errors

def optimize_dataframe_gpu(df):
    """Optimise le DataFrame avec GPU si disponible"""
    if not GPU_AVAILABLE:
        return df
    
    try:
        # Optimisations GPU pour colonnes numériques
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        
        for col in numeric_cols:
            if col in df.columns and not df[col].isna().all():
                # Transférer vers GPU
                values = df[col].fillna(0).values.astype(np.float32)
                gpu_array = cp.asarray(values)
                
                # Opérations GPU rapides
                if col == 'user_rating':
                    # Normaliser les ratings
                    mean_val = cp.mean(gpu_array)
                    std_val = cp.std(gpu_array)
                    df[f'{col}_normalized'] = cp.asnumpy((gpu_array - mean_val) / (std_val + 1e-8))
                
                # Nettoyer GPU
                del gpu_array
        
        # Optimisations pour le texte
        if 'review_text' in df.columns:
            # Calculer longueurs sur GPU
            text_lengths = df['review_text'].str.len().fillna(0).values
            if len(text_lengths) > 0:
                gpu_lengths = cp.asarray(text_lengths)
                
                # Stats rapides
                mean_length = float(cp.mean(gpu_lengths))
                max_length = float(cp.max(gpu_lengths))
                
                df['text_length'] = cp.asnumpy(gpu_lengths)
                
                del gpu_lengths
        
        # Forcer le nettoyage GPU
        cp.get_default_memory_pool().free_all_blocks()
        
    except Exception as e:
        print(f"⚠️ GPU optimization failed: {e}")
    
    return df

def convert_large_jsonl_final(input_file, output_file, chunk_size=None, num_workers=None):
    """
    VERSION FINALE - Optimisée pour RTX 4080 + Windows
    SANS cuDF mais avec toutes les autres optimisations
    """
    
    print("🚀 CONVERSION ULTRA-OPTIMISÉE RTX 4080 + WINDOWS")
    print("=" * 60)
    
    # Paramètres adaptatifs
    if chunk_size is None or num_workers is None:
        chunk_size, num_workers = calculate_optimal_params()
    
    # Créer dossier de sortie
    os.makedirs(os.path.dirname(output_file), exist_ok=True)
    
    # Compter les lignes pour la progression
    print("🔄 Analyse du fichier...")
    with open(input_file, 'r', encoding='utf-8') as f:
        total_lines = sum(1 for _ in f)
    
    file_size_gb = os.path.getsize(input_file) / 1024**3
    print(f"📊 {total_lines:,} lignes ({file_size_gb:.2f} GB)")
    
    # Variables de traitement
    chunk_files = []
    processed_records = 0
    total_errors = 0
    chunk_num = 0
    
    # Buffer pour lecture par chunks
    lines_buffer = []
    
    print(f"\n🔄 Traitement en cours...")
    
    with open(input_file, 'r', encoding='utf-8') as f:
        with tqdm(total=total_lines, desc="📊 Processing", unit=" lines", 
                  bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}]") as pbar:
            
            for line_num, line in enumerate(f, 1):
                pbar.update(1)
                lines_buffer.append(line)
                
                # Traiter quand le buffer est plein
                if len(lines_buffer) >= chunk_size:
                    
                    # Diviser en sous-chunks pour traitement parallèle
                    sub_chunk_size = len(lines_buffer) // num_workers
                    if sub_chunk_size == 0:
                        sub_chunk_size = len(lines_buffer)
                    
                    sub_chunks = [
                        lines_buffer[i:i + sub_chunk_size]
                        for i in range(0, len(lines_buffer), sub_chunk_size)
                    ]
                    
                    # Parse JSON en parallèle
                    all_parsed_data = []
                    chunk_errors = 0
                    
                    with ThreadPoolExecutor(max_workers=num_workers) as executor:
                        future_to_chunk = {
                            executor.submit(parse_json_chunk_parallel, chunk, i): i 
                            for i, chunk in enumerate(sub_chunks)
                        }
                        
                        for future in future_to_chunk:
                            try:
                                parsed_data, errors = future.result()
                                all_parsed_data.extend(parsed_data)
                                chunk_errors += errors
                            except Exception as e:
                                print(f"⚠️ Worker error: {e}")
                                chunk_errors += 1
                    
                    # Créer DataFrame si on a des données
                    if all_parsed_data:
                        df = pd.DataFrame(all_parsed_data)
                        
                        # 🚀 OPTIMISATIONS GPU
                        df = optimize_dataframe_gpu(df)
                        
                        # Sauvegarder le chunk
                        chunk_file = f"{output_file}.chunk_{chunk_num:04d}.parquet"
                        df.to_parquet(
                            chunk_file,
                            compression='snappy',
                            index=False,
                            engine='pyarrow',
                            use_dictionary=True  # Optimisation supplémentaire
                        )
                        
                        chunk_files.append(chunk_file)
                        processed_records += len(all_parsed_data)
                        total_errors += chunk_errors
                        
                        # Nettoyer la mémoire
                        del df, all_parsed_data
                        gc.collect()
                        
                        chunk_num += 1
                    
                    # Vider le buffer
                    lines_buffer = []
                    
                    # Mise à jour de la barre de progression
                    memory_usage = psutil.Process().memory_info().rss / 1024**3
                    
                    postfix = {
                        'chunks': chunk_num,
                        'records': f"{processed_records:,}",
                        'RAM': f"{memory_usage:.1f}GB",
                        'errors': total_errors
                    }
                    
                    if GPU_AVAILABLE:
                        gpu_memory = cp.get_default_memory_pool().used_bytes() / 1024**3
                        postfix['GPU'] = f"{gpu_memory:.1f}GB"
                    
                    pbar.set_postfix(postfix)
    
    # Traiter le dernier buffer
    if lines_buffer:
        print("🔄 Traitement du dernier chunk...")
        parsed_data, errors = parse_json_chunk_parallel(lines_buffer)
        if parsed_data:
            df = pd.DataFrame(parsed_data)
            df = optimize_dataframe_gpu(df)
            
            chunk_file = f"{output_file}.chunk_{chunk_num:04d}.parquet"
            df.to_parquet(chunk_file, compression='snappy', index=False)
            chunk_files.append(chunk_file)
            processed_records += len(parsed_data)
            total_errors += errors
            del df
    
    print(f"\n✅ Parsing terminé!")
    print(f"📊 {processed_records:,} enregistrements traités")
    print(f"⚠️ {total_errors} erreurs de parsing")
    print(f"📁 {len(chunk_files)} chunks créés")
    
    # 🔄 FUSION FINALE ULTRA-RAPIDE
    print(f"\n🔄 Fusion finale des chunks...")
    merge_chunks_ultra_fast(chunk_files, output_file)
    
    return processed_records, total_errors

def merge_chunks_ultra_fast(chunk_files, output_file):
    """Fusion ultra-rapide avec PyArrow pur - VERSION CORRIGÉE"""
    
    print(f"🚀 Fusion de {len(chunk_files)} chunks avec PyArrow...")
    
    # Lire tous les chunks comme tables PyArrow
    tables = []
    
    for chunk_file in tqdm(chunk_files, desc="📖 Loading", unit=" chunks"):
        try:
            table = pq.read_table(chunk_file)
            tables.append(table)
        except Exception as e:
            print(f"⚠️ Erreur lecture {chunk_file}: {e}")
    
    if not tables:
        print("❌ Aucun chunk valide trouvé!")
        return
    
    # Concaténation PyArrow (ultra-rapide)
    print("🔄 Concaténation...")
    combined_table = pa.concat_tables(tables)
    
    # Écriture avec optimisations COMPATIBLES
    print("💾 Écriture finale...")
    try:
        # Essayer avec toutes les optimisations
        pq.write_table(
            combined_table,
            output_file,
            compression='snappy',
            use_dictionary=True,
            write_statistics=True,
            row_group_size=100000,
            # use_byte_stream_split=True  # ❌ SUPPRIMÉ - cause l'erreur
        )
        print("✅ Écriture avec optimisations complètes")
    except Exception as e:
        print(f"⚠️ Erreur avec optimisations: {e}")
        print("🔄 Fallback vers écriture basique...")
        
        # Fallback vers écriture simple
        try:
            pq.write_table(
                combined_table,
                output_file,
                compression='snappy',
                use_dictionary=False,  # Désactiver si problème
                write_statistics=False
            )
            print("✅ Écriture basique réussie")
        except Exception as e2:
            print(f"❌ Erreur critique: {e2}")
            # Dernier recours avec pandas
            print("🔄 Dernier recours avec pandas...")
            df = combined_table.to_pandas()
            df.to_parquet(output_file, compression='snappy', index=False)
            print("✅ Sauvegarde pandas réussie")
    
    # Nettoyage
    print("🧹 Nettoyage...")
    for chunk_file in chunk_files:
        try:
            os.remove(chunk_file)
        except:
            pass
    
    # Libérer mémoire
    del tables, combined_table
    gc.collect()
    
    print("✅ Fusion terminée!")

# ================================
# UTILISATION SIMPLE
# ================================

def main():
    """Fonction principale - lancez ça!"""
    
    # Vos fichiers
    input_file = "../data/raw/Clothing_Shoes_and_Jewelry.jsonl"
    output_file = "../data/raw/Clothing_Shoes_and_Jewelry.parquet"
    
    print("🎯 TRAITEMENT DE VOTRE FICHIER 25GB")
    print(f"📁 Source: {input_file}")
    print(f"📁 Cible: {output_file}")
    
    # Vérifier que le fichier existe
    if not os.path.exists(input_file):
        print(f"❌ Fichier non trouvé: {input_file}")
        return
    
    try:
        start_time = time.time()
        
        processed, errors = convert_large_jsonl_final(input_file, output_file)
        
        end_time = time.time()
        processing_time = end_time - start_time
        
        # Statistiques finales
        if os.path.exists(output_file):
            original_size = os.path.getsize(input_file) / (1024**3)
            parquet_size = os.path.getsize(output_file) / (1024**3)
            compression_ratio = original_size / parquet_size
            speed = processed / processing_time
            
            print(f"\n🎉 CONVERSION TERMINÉE!")
            print(f"⏱️ Temps: {processing_time/60:.1f} minutes")
            print(f"📊 {processed:,} enregistrements")
            print(f"⚠️ {errors} erreurs ({errors/processed*100:.2f}%)")
            print(f"📁 {original_size:.2f} GB → {parquet_size:.2f} GB")
            print(f"🗜️ Compression: {compression_ratio:.1f}x")
            print(f"🚀 Vitesse: {speed:,.0f} records/sec")
            
            # Test de lecture
            print(f"\n🔍 Test de lecture...")
            sample = pd.read_parquet(output_file).head(5)
            print(f"✅ Lecture OK! Shape: {sample.shape}")
            print(f"📋 Colonnes: {list(sample.columns)}")
            
        else:
            print("❌ Fichier de sortie non créé")
    
    except Exception as e:
        print(f"❌ Erreur: {e}")
        import traceback
        traceback.print_exc()

if __name__ == "__main__":
    import time
    main()

📦 CuPy non installé - Mode CPU uniquement
🎯 TRAITEMENT DE VOTRE FICHIER 25GB
📁 Source: ../data/raw/Clothing_Shoes_and_Jewelry.jsonl
📁 Cible: ../data/raw/Clothing_Shoes_and_Jewelry.parquet
🚀 CONVERSION ULTRA-OPTIMISÉE RTX 4080 + WINDOWS
💻 CPU Cores: 24
💾 RAM: 63.8 GB
⚙️ Chunk size: 200,000
🧵 Workers: 6
🔄 Analyse du fichier...
📊 66,033,346 lignes (25.90 GB)

🔄 Traitement en cours...


📊 Processing: 100%|██████████| 66033346/66033346 [1:09:29<00:00]  


🔄 Traitement du dernier chunk...

✅ Parsing terminé!
📊 66,033,346 enregistrements traités
⚠️ 0 erreurs de parsing
📁 331 chunks créés

🔄 Fusion finale des chunks...
🚀 Fusion de 331 chunks avec PyArrow...


📖 Loading: 100%|██████████| 331/331 [01:31<00:00,  3.62 chunks/s]


🔄 Concaténation...
💾 Écriture finale...
❌ Erreur: BYTE_STREAM_SPLIT only supports FLOAT, DOUBLE, INT32, INT64 and FIXED_LEN_BYTE_ARRAY


Traceback (most recent call last):
  File "C:\Users\Yann\AppData\Local\Temp\ipykernel_25564\113660458.py", line 351, in main
    processed, errors = convert_large_jsonl_final(input_file, output_file)
                        ~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Yann\AppData\Local\Temp\ipykernel_25564\113660458.py", line 275, in convert_large_jsonl_final
    merge_chunks_ultra_fast(chunk_files, output_file)
    ~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Yann\AppData\Local\Temp\ipykernel_25564\113660458.py", line 304, in merge_chunks_ultra_fast
    pq.write_table(
    ~~~~~~~~~~~~~~^
        combined_table,
        ^^^^^^^^^^^^^^^
    ...<5 lines>...
        use_byte_stream_split=True  # Compression améliorée
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    )
    ^
  File "c:\Users\Yann\Desktop\DEV\School\ml_m1\NLP\CustomGPT\scraper_env\Lib\site-packages\pyarrow\parquet\core.py", line 1909, in write_table
    writer.wri

In [5]:
import os
import gc
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd
from tqdm import tqdm

def merge_parquet_chunks(chunk_files, output_file):
    """
    Fusionne une liste de fichiers Parquet en un seul.
    
    Paramètres :
        chunk_files (list of str): chemins des fichiers Parquet à fusionner
        output_file (str): chemin du fichier Parquet final
    """
    
    print(f"🔄 Fusion de {len(chunk_files)} fichiers en {output_file}")
    
    # S'assurer que le dossier de sortie existe
    os.makedirs(os.path.dirname(output_file), exist_ok=True)
    
    parquet_writer = None
    schema = None

    for chunk_path in tqdm(chunk_files, desc="🔗 Fusion des chunks"):
        try:
            # Lire chunk
            chunk_df = pd.read_parquet(chunk_path)
            table = pa.Table.from_pandas(chunk_df)

            if parquet_writer is None:
                # Initialisation du writer avec le schéma du premier chunk
                schema = table.schema
                parquet_writer = pq.ParquetWriter(
                    output_file,
                    schema=schema,
                    compression="snappy"
                )

            # Écriture du chunk
            parquet_writer.write_table(table)

        except Exception as e:
            print(f"❌ Erreur sur {chunk_path}: {e}")
        finally:
            # Libération mémoire
            del chunk_df, table
            gc.collect()
    
    # Finaliser
    if parquet_writer:
        parquet_writer.close()

    print(f"\n✅ Fusion terminée : {output_file}")

# =========================
# Exemple d'utilisation
# =========================
if __name__ == "__main__":
    # Exemple de détection automatique des fichiers chunkés
    import glob

    # Chemin de base (adapter selon votre structure) Clothing_Shoes_and_Jewelry.parquet.chunk_0330.parquet
    output_file = "../data/processed/Clothing_Shoes_and_Jewelry.parquet"
    chunk_pattern = os.path.join(
        os.path.dirname(output_file),
        "Clothing_Shoes_and_Jewelry.parquet.chunk_*.parquet"
    )


    chunk_files = sorted(glob.glob(chunk_pattern))
    
    if not chunk_files:
        print("⚠️ Aucun fichier chunk trouvé.")
    else:
        merge_parquet_chunks(chunk_files, output_file)


🔄 Fusion de 56 fichiers en ../data/processed/Clothing_Shoes_and_Jewelry.parquet


🔗 Fusion des chunks: 100%|██████████| 56/56 [00:13<00:00,  4.19it/s]


✅ Fusion terminée : ../data/processed/Clothing_Shoes_and_Jewelry.parquet





In [7]:
import os
import gc
import glob
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from tqdm import tqdm

def merge_parquet_chunks(chunk_files, output_file):
    """
    Fusionne une liste de fichiers Parquet en un seul.
    
    Args:
        chunk_files (list of str): chemins des fichiers Parquet à fusionner
        output_file (str): chemin du fichier Parquet final
    """
    print(f"🔄 Fusion de {len(chunk_files)} fichiers en {output_file}")
    
    # Créer le dossier de sortie si nécessaire
    os.makedirs(os.path.dirname(output_file), exist_ok=True)
    
    parquet_writer = None

    for chunk_path in tqdm(chunk_files, desc="🔗 Fusion des chunks"):
        try:
            chunk_df = pd.read_parquet(chunk_path)
            table = pa.Table.from_pandas(chunk_df)

            if parquet_writer is None:
                parquet_writer = pq.ParquetWriter(
                    output_file,
                    table.schema,
                    compression="snappy"
                )

            parquet_writer.write_table(table)

        except Exception as e:
            print(f"❌ Erreur avec {chunk_path} : {e}")
        finally:
            del chunk_df, table
            gc.collect()
    
    if parquet_writer:
        parquet_writer.close()

    print(f"\n✅ Fusion terminée : {output_file}")

# =========================
# Exemple d'utilisation
# =========================
if __name__ == "__main__":
    # Dossier contenant les fichiers chunkés
    chunk_dir = "../data/raw"  # <- Change ici si nécessaire
    output_file = "../data/processed/Clothing_Shoes_and_Jewelry.parquet"

    # Motif pour trouver tous les chunks
    chunk_pattern = os.path.join(chunk_dir, "Clothing_Shoes_and_Jewelry.parquet.chunk_*.parquet")

    # Lire tous les chunks
    chunk_files = sorted(glob.glob(chunk_pattern))

    print(f"🔍 {len(chunk_files)} fichiers trouvés avec le motif : {chunk_pattern}")
    
    if not chunk_files:
        print("⚠️ Aucun fichier chunk trouvé.")
    else:
        merge_parquet_chunks(chunk_files, output_file)


🔍 331 fichiers trouvés avec le motif : ../data/raw\Clothing_Shoes_and_Jewelry.parquet.chunk_*.parquet
🔄 Fusion de 331 fichiers en ../data/processed/Clothing_Shoes_and_Jewelry.parquet


🔗 Fusion des chunks: 100%|██████████| 331/331 [04:08<00:00,  1.33it/s]


✅ Fusion terminée : ../data/processed/Clothing_Shoes_and_Jewelry.parquet





In [None]:
import pandas as pd

# Chemin vers le fichier Parquet fusionné
parquet_file = "../data/processed/Clothing_Shoes_and_Jewelry.parquet"

# Lire le fichier complet (attention à la taille en RAM !)
df = pd.read_parquet(parquet_file, engine="pyarrow")

# Afficher les premières lignes
print("✅ Aperçu du DataFrame fusionné :")
print(df.head())

# Afficher quelques infos utiles
print("\n📊 Infos générales :")
print(df.info())

print("\n📏 Dimensions :")
print(f"Lignes : {df.shape[0]:,}")
print(f"Colonnes : {df.shape[1]}")


In [2]:
import pyarrow.dataset as ds
import pandas as pd

parquet_file = "../data/processed/merged/Clothing_Shoes_and_Jewelry.parquet"
dataset = ds.dataset(parquet_file, format="parquet")

batch_reader = dataset.to_batches()


rows_collected = 0
max_rows = 3_000_000
frames = []

for batch in batch_reader:
    batch_df = batch.to_pandas()
    batch_len = len(batch_df)
    
    if rows_collected + batch_len >= max_rows:
        # Prendre uniquement le reste
        needed = max_rows - rows_collected
        frames.append(batch_df.iloc[:needed])
        break
    else:
        frames.append(batch_df)
        rows_collected += batch_len

df = pd.concat(frames, ignore_index=True)
print(f"✅ {len(df):,} lignes lues")
print(df.head())
print("\n📊 Infos générales :")
print(df.info())



✅ 3,000,000 lignes lues
   rating                                    title  \
0     3.0  Arrived Damaged : liquid in hub locker!   
1     3.0                Useless under 40 degrees.   
2     4.0   Not waterproof, but a very comfy shoe.   
3     4.0        Lovely, but QA issues with sewing   
4     2.0                                  Just ok   

                                                text  \
0  Unfortunately Amazon in their wisdom (cough, c...   
1  Useless under 40 degrees unless you’re just ru...   
2  I purchased these bc they are supposed to be w...   
3  I’ll start by saying I love this robe!  I trul...   
4  Don't be fooled by the description. I was free...   

                                              images        asin parent_asin  \
0  [{'attachment_type': 'IMAGE', 'large_image_url...  B096S6LZV4  B09NSZ5QMF   
1                                                 []  B09KMDBDCN  B08NGL3X17   
2                                                 []  B096N5WK8Q  B07RGM3D

In [3]:

df["date"] = pd.to_datetime(df["timestamp"], unit="ms")
df["date"]

0         2023-03-04 14:06:07.351
1         2023-02-22 16:36:59.242
2         2023-02-04 15:21:38.918
3         2018-12-18 06:29:37.507
4         2022-02-18 22:29:32.746
                    ...          
2999995   2022-09-01 18:22:35.935
2999996   2020-12-31 04:24:51.163
2999997   2020-08-11 23:57:21.001
2999998   2020-07-31 18:10:50.754
2999999   2018-01-15 19:04:41.829
Name: date, Length: 3000000, dtype: datetime64[ns]

#### Conversion timestamp

In [None]:
import os
import gc
import pandas as pd
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq
from tqdm import tqdm

def process_parquet_in_batches(input_file, output_file, batch_size=100_000):
    """
    Lit un gros fichier Parquet par batchs, ajoute une colonne date convertie depuis timestamp,
    sauvegarde chaque batch en chunk parquet, puis fusionne tous les chunks.
    """
    # Dossiers de travail
    output_dir = os.path.dirname(output_file)
    os.makedirs(output_dir, exist_ok=True)

    temp_chunks_dir = os.path.join(output_dir, "tmp_chunks")
    os.makedirs(temp_chunks_dir, exist_ok=True)

    print("📥 Lecture du dataset parquet en batchs...")
    dataset = ds.dataset(input_file, format="parquet")
    batch_reader = dataset.to_batches(batch_size=batch_size)

    chunk_paths = []
    total_rows = 0

    for i, batch in enumerate(tqdm(batch_reader, desc="🧪 Traitement batchs")):
        df = batch.to_pandas()

        # Ajout de la colonne date (à partir de timestamp)
        df["date"] = pd.to_datetime(df["timestamp"], unit="ms")

        # Sauvegarde du batch traité
        chunk_path = os.path.join(temp_chunks_dir, f"processed_chunk_{i:04d}.parquet")
        df.to_parquet(chunk_path, index=False, engine="pyarrow", compression="snappy")
        chunk_paths.append(chunk_path)

        total_rows += len(df)

        # Nettoyage mémoire
        del df, batch
        gc.collect()

    print(f"\n✅ {len(chunk_paths)} chunks traités. Fusion en un seul fichier final...")

    # Fusion des chunks
    parquet_writer = None
    for chunk_file in tqdm(chunk_paths, desc="🔗 Fusion finale"):
        chunk_df = pd.read_parquet(chunk_file)
        table = pa.Table.from_pandas(chunk_df)

        if parquet_writer is None:
            parquet_writer = pq.ParquetWriter(output_file, table.schema, compression="snappy")

        parquet_writer.write_table(table)

        # Nettoyage
        del chunk_df, table
        gc.collect()

    if parquet_writer:
        parquet_writer.close()

    print(f"\n🎉 Fichier final traité enregistré à : {output_file}")
    print(f"📊 Lignes totales : {total_rows:,}")
    print("🧹 Suppression des fichiers temporaires...")
    
    # Suppression des chunks temporaires
    for f in chunk_paths:
        try:
            os.remove(f)
        except Exception as e:
            print(f"⚠️ Erreur suppression {f}: {e}")
    os.rmdir(temp_chunks_dir)

# ===============================
# Exemple d'utilisation
# ===============================
if __name__ == "__main__":
    input_parquet = "../data/processed/merged/Clothing_Shoes_and_Jewelry.parquet"
    output_parquet = "../data/processed/final/Clothing_Shoes_and_Jewelry_timestamped.parquet"

    process_parquet_in_batches(input_parquet, output_parquet, batch_size=100_000)


📥 Lecture du dataset parquet en batchs...


🧪 Traitement batchs: 661it [03:38,  3.03it/s]



✅ 661 chunks traités. Fusion en un seul fichier final...


🔗 Fusion finale:   2%|▏         | 16/661 [00:05<03:23,  3.18it/s]

#### partitionnement

In [None]:
import pandas as pd
import pyarrow.parquet as pq
import math
import os
import sys
import traceback

# === PARAMÈTRES ===
input_file = "../data/processed/final/Clothing_Shoes_and_Jewelry_processed.parquet"
partition_dir = "../data/processed/partitions"
partition_size = 6_000_000  # lignes par partition
overwrite_existing = False  # Mettre à True pour réécraser les partitions existantes

# === CRÉATION DU DOSSIER DE SORTIE ===
try:
    os.makedirs(partition_dir, exist_ok=True)
except Exception as e:
    print(f"❌ Impossible de créer le dossier {partition_dir} : {e}")
    sys.exit(1)

# === CHARGEMENT DU DATAFRAME ===
try:
    print("📥 Chargement du DataFrame depuis le Parquet...")
    df = pd.read_parquet(input_file, engine="pyarrow")
    total_rows = len(df)
except FileNotFoundError:
    print(f"❌ Fichier introuvable : {input_file}")
    sys.exit(1)
except Exception:
    print("❌ Erreur inattendue lors du chargement du Parquet :")
    traceback.print_exc()
    sys.exit(1)

# === CALCUL DES PARTITIONS ===
num_partitions = math.ceil(total_rows / partition_size)
print(f"📊 Total lignes: {total_rows:,} → {num_partitions} partitions de {partition_size:,} lignes")

# === TRAITEMENT PAR PARTITION ===
for i in range(num_partitions):
    start_idx = i * partition_size
    end_idx = min(start_idx + partition_size, total_rows)
    part_path = os.path.join(partition_dir, f"partition_{i:02d}.parquet")
    
    if os.path.exists(part_path) and not overwrite_existing:
        print(f"⚠️ Partition déjà existante, ignorée : {part_path}")
        continue

    try:
        df_part = df.iloc[start_idx:end_idx]
        df_part.to_parquet(part_path, index=False, compression="snappy")
        print(f"✅ Partition {i+1}/{num_partitions} sauvegardée : {part_path} ({len(df_part):,} lignes)")
    except Exception as e:
        print(f"❌ Erreur lors de l’écriture de la partition {i} : {e}")
        traceback.print_exc()
    finally:
        del df_part

print("\n🎉 Partitionnement terminé avec succès.")
