## Prerequisites

- Python 3.9+
- pandas
- pyarrow (for reading Parquet files)

> ⚠️ **Important**: This notebook only applies to datasets that were chunked **without overlap**. If your chunks overlap (e.g., consecutive chunks share 50 tokens), reconstructing documents by simple concatenation will produce **duplicated text** in the output. Overlapping chunks require a more sophisticated deduplication strategy.

In [None]:
# Install dependencies if needed
# %pip install pandas pyarrow

## Input Data Structure

Input Parquet files typically contain the following columns:

| Column | Description |
|--------|-------------|
| `chunk_id` | Unique identifier for the chunk |
| `doc_id` | Original document identifier |
| `chunk_index` | Chunk index within the document (for ordering) |
| `text` | Text content of the chunk |
| `chunk_text` | *(Obsolete)* Formatted chunk text |
| `embeddings_bge-m3` | *(Obsolete)* Chunk embedding vector |
| ... | Other corpus-specific metadata |

## Step 1: Import Required Libraries

In [None]:
import pandas as pd
from pathlib import Path
import glob

## Step 2: Load Parquet Files

In [None]:
def load_parquet_files(folder_path: str) -> pd.DataFrame:
    """
    Load all Parquet files from a folder and concatenate them.

    Files are read in sorted file-name order so that the row order of the
    result is deterministic across runs and platforms. The folder path is
    taken literally: characters such as ``[``, ``]`` or ``*`` in the folder
    name are not interpreted as glob wildcards.

    Args:
        folder_path: Path to the folder containing Parquet files. Only
            ``*.parquet`` files located directly inside it are read.

    Returns:
        DataFrame containing all rows from the Parquet files, re-indexed
        from 0 (``ignore_index=True``).

    Raises:
        FileNotFoundError: If no ``*.parquet`` file is found in the folder.
    """
    # Escape the folder part so only the "*.parquet" pattern is expanded;
    # sort because glob's result order depends on the filesystem.
    pattern = f"{glob.escape(str(folder_path))}/*.parquet"
    parquet_files = sorted(glob.glob(pattern))

    if not parquet_files:
        raise FileNotFoundError(f"No Parquet files found in {folder_path}")

    print(f"{len(parquet_files)} Parquet file(s) found")

    dfs = []
    for file_path in parquet_files:
        df = pd.read_parquet(file_path)
        print(f"{Path(file_path).name}: {len(df)} rows")
        dfs.append(df)

    combined_df = pd.concat(dfs, ignore_index=True)
    print(f"\nTotal: {len(combined_df)} chunks loaded")

    return combined_df

In [None]:
# Folder that holds the chunked Parquet files — adjust to your environment
parquet_folder = "../data/parquet/travail_emploi"

# Read and concatenate every Parquet file found in that folder
df_chunks = load_parquet_files(parquet_folder)

# Show the first rows (the notebook renders the cell's last expression)
df_chunks.head()

In [None]:
# List every column present in the loaded chunks, one per line
print("Available columns:")
for column_name in df_chunks.columns:
    print(f"  - {column_name}")

## Step 3: Remove Obsolete Columns

In [None]:
def remove_obsolete_columns(
    df: pd.DataFrame,
    obsolete_columns: tuple[str, ...] = ("chunk_text", "embeddings_bge-m3"),
) -> pd.DataFrame:
    """
    Remove obsolete columns from the DataFrame.

    By default this drops ``chunk_text`` (derived from ``text``) and
    ``embeddings_bge-m3`` (computed from ``chunk_text``). Neither is needed
    once documents are rebuilt from ``text``. Columns that are not present
    are silently ignored.

    Args:
        df: DataFrame containing chunked data. It is not modified in place.
        obsolete_columns: Names of the columns to drop if present.

    Returns:
        DataFrame without the obsolete columns. This is the original object
        when none of them are present.
    """
    # Only drop what actually exists, so the function works on any corpus layout
    columns_to_drop = [col for col in obsolete_columns if col in df.columns]

    if columns_to_drop:
        df = df.drop(columns=columns_to_drop)
        print(f"Columns removed: {columns_to_drop}")
    else:
        print("No obsolete columns found")

    return df

In [None]:
# Drop the redundant derived columns before rebuilding documents
df_cleaned = remove_obsolete_columns(df_chunks)

remaining_columns = list(df_cleaned.columns)
print(f"\nRemaining columns: {remaining_columns}")

## Step 4: Reconstruct Original Documents

In [None]:
def reconstruct_documents(
    df: pd.DataFrame,
    text_column: str = "text",
    separator: str = "\n",
) -> pd.DataFrame:
    """
    Reconstruct original documents by concatenating chunks.

    Each document is reconstructed by ordering its chunks by ``chunk_index``
    and joining their texts with ``separator``. Chunks whose text is missing
    (None/NaN/NA) are skipped rather than rendered as the strings "None" or
    "nan". For every other (metadata) column, the first non-null value of
    the document's chunks (in chunk order) is kept. Per-chunk columns
    (``chunk_id``, ``chunk_index``, ``chunk_xxh64``) are dropped.

    Args:
        df: DataFrame containing chunks.
        text_column: Name of the column containing text to concatenate.
        separator: String inserted between consecutive chunk texts.

    Returns:
        DataFrame with one complete document per ``doc_id``, sorted by
        ``doc_id``. Columns are ``doc_id``, then metadata, then text.
        Rows whose ``doc_id`` is null are dropped (pandas groupby default).

    Raises:
        ValueError: If ``doc_id``, ``chunk_index`` or ``text_column`` is missing.
    """
    # Check that required columns exist
    required_columns = ["doc_id", "chunk_index", text_column]
    missing_columns = [col for col in required_columns if col not in df.columns]
    if missing_columns:
        raise ValueError(f"Missing columns: {missing_columns}")

    print(f"Reconstructing {df['doc_id'].nunique()} documents...")

    # Sort by doc_id and chunk_index so each group's chunks are in reading order
    df_sorted = df.sort_values(by=["doc_id", "chunk_index"])

    # Metadata = everything except per-chunk columns, the text and the groupby key
    chunk_specific_cols = {"chunk_id", "chunk_index", "chunk_xxh64", text_column, "doc_id"}
    metadata_cols = [col for col in df.columns if col not in chunk_specific_cols]

    def _join_chunks(texts: pd.Series) -> str:
        # Drop missing chunks first: astype(str) would turn them into "None"/"nan"
        return separator.join(texts.dropna().astype(str))

    # Aggregation: concatenate text, keep first non-null metadata value per document
    agg_dict = {text_column: _join_chunks}
    for col in metadata_cols:
        agg_dict[col] = "first"

    df_reconstructed = df_sorted.groupby("doc_id", as_index=False).agg(agg_dict)

    # Reorder columns: doc_id first, then metadata, then text
    final_columns = ["doc_id"] + metadata_cols + [text_column]
    df_reconstructed = df_reconstructed[[col for col in final_columns if col in df_reconstructed.columns]]

    print(f"{len(df_reconstructed)} documents reconstructed")

    return df_reconstructed

In [None]:
# Rebuild one row per document from its ordered chunks
df_documents = reconstruct_documents(df_cleaned)

# Preview the first rows (rendered as the cell's last expression)
df_documents.head(5)

In [None]:
# Spot-check: print the first 1000 characters of the first reconstructed document
sample_doc_id = df_documents['doc_id'].iloc[0]
is_sample = df_documents['doc_id'] == sample_doc_id
sample_text = df_documents.loc[is_sample, 'text'].iloc[0]

print(f"Sample reconstructed document (doc_id: {sample_doc_id})")
print("=" * 60)
print(sample_text[:1000])
print("...")

## Step 5: Save the Result

In [None]:
def save_reconstructed_documents(
    df: pd.DataFrame, 
    output_path: str, 
    format: str = "parquet",
    rows_per_file: int = 50000,
    compression: str = "zstd"
) -> None:
    """
    Save reconstructed documents, splitting into multiple files if needed.

    Output layout:
      - "parquet": ``output_path`` is treated as a folder. Rows are written to
        ``<folder>/<folder name>_part_<i>.parquet`` files of at most
        ``rows_per_file`` rows each. When everything fits, a single
        ``_part_0`` file is written. Part files left over from a previous,
        larger export are NOT removed; a later ``*.parquet`` glob on the
        folder would still pick them up.
      - "csv" / "json": a single file. The ``.csv`` / ``.json`` extension is
        appended to ``output_path`` unless it already ends with it.

    Args:
        df: DataFrame containing reconstructed documents.
        output_path: Output path (folder for parquet, file path for csv/json).
        format: Output format ("parquet", "csv", or "json").
        rows_per_file: Maximum number of rows per parquet file (default: 50000);
            ignored for csv/json.
        compression: Compression algorithm for parquet ("zstd", "snappy",
            "gzip", or None); ignored for csv/json.

    Raises:
        ValueError: If ``format`` is unsupported, or ``rows_per_file`` < 1 for
            a parquet export. Both checks run before anything is created on disk.
    """
    # Validate up front so a bad call leaves no stray directories behind
    if format not in ("parquet", "csv", "json"):
        raise ValueError(f"Unsupported format: {format}")
    if format == "parquet" and rows_per_file < 1:
        raise ValueError(f"rows_per_file must be a positive integer, got {rows_per_file}")

    Path(output_path).parent.mkdir(parents=True, exist_ok=True)

    if format == "parquet":
        output_folder = Path(output_path)
        output_folder.mkdir(parents=True, exist_ok=True)

        total_rows = len(df)
        # Ceil division; an empty DataFrame still yields one (empty) part file
        num_files = max(1, (total_rows + rows_per_file - 1) // rows_per_file)

        for i in range(num_files):
            df_batch = df.iloc[i * rows_per_file:(i + 1) * rows_per_file]
            output_file = output_folder / f"{output_folder.name}_part_{i}.parquet"
            df_batch.to_parquet(
                output_file,
                index=False,
                compression=compression,
                engine="pyarrow",
            )
            if num_files > 1:
                print(f"  Part {i}: {output_file.name} ({len(df_batch)} rows)")

        if num_files == 1:
            print(f"Documents saved to: {output_file} ({total_rows} rows)")
        else:
            print(f"\nTotal: {total_rows} rows saved in {num_files} file(s) to {output_folder}/")
        return

    # Single-file text formats: add the extension only if it is not already there
    suffix = f".{format}"
    output_str = str(output_path)
    output_file = output_str if output_str.endswith(suffix) else f"{output_str}{suffix}"

    if format == "csv":
        df.to_csv(output_file, index=False)
    else:
        df.to_json(output_file, orient="records", force_ascii=False, indent=2)
    print(f"Documents saved to: {output_file}")

In [None]:
# Destination: a folder for Parquet output, or a base file name for CSV/JSON
# (the matching extension is added by the save function) — edit as needed
output_path = "../data/output/reconstructed_documents"

# Parquet export: ZSTD-compressed, split into files of at most 50k rows.
# Recommended for large datasets.
parquet_options = dict(format="parquet", rows_per_file=50000, compression="zstd")
save_reconstructed_documents(df_documents, output_path, **parquet_options)

# Alternative: export everything to a single JSON file
# save_reconstructed_documents(df_documents, output_path, format="json")