In [None]:
# Subset (config) names that later cells pass to `load_dataset`; the spelling of
# every entry is significant because it is used verbatim as the config name.
data_sources = [
    "ArXiv", "BookCorpus2", "Books3", "DM Mathematics",
    "Enron Emails", "EuroParl", "FreeLaw", "Github",
    "Gutenberg (PG-19)", "HackerNews", "NIH ExPorter", "OpenSubtitles",
    "OpenWebText2", "PhilPapers", "Pile-CC", "PubMed Abstracts",
    "PubMed Central", "StackExchange", "UPSTO Backgrounds", "Ubuntu IRC",
    "Wikipedia (en)", "YoutubeSubtitles",
]


In [None]:
import gc
from datasets import load_dataset, concatenate_datasets
import pyarrow.parquet as pq
import pyarrow as pa
import os
from tqdm import tqdm

# Restrict this cell's run to three subsets; this rebinds (shadows) any
# `data_sources` list defined by an earlier cell.
data_sources = ["NIH ExPorter", "PhilPapers", "Enron Emails"]

# Root output directory; every subset gets its own sub-folder of Parquet shards.
os.makedirs("parquet_files", exist_ok=True)

for subset_of_interest in tqdm(data_sources, desc="Data Sources"):
    print(f"Processing {subset_of_interest}...")

    # Subset names may contain spaces; use underscores for the folder name.
    folder_name = subset_of_interest.replace(" ", "_")

    dataset = load_dataset("ArmelR/the-pile-splitted", subset_of_interest, num_proc=8)

    # Export the train and test splits together as one sequence of rows.
    concatenated_dataset = concatenate_datasets([dataset['train'], dataset['test']])

    os.makedirs(f"parquet_files/{folder_name}", exist_ok=True)

    total_rows = len(concatenated_dataset)
    # Size of the backing Arrow table, so the per-file target below is measured
    # in Arrow bytes rather than in bytes of the written Parquet file.
    total_size_bytes = concatenated_dataset.data.nbytes
    size_per_file = 1_000_000_000
    if total_size_bytes > 0:
        # Clamp to >= 1: when the average row is larger than `size_per_file`
        # the int() truncates to 0 and the loop below would never advance.
        rows_per_file = max(1, int((total_rows / total_size_bytes) * size_per_file))
    else:
        # No size information (avoids ZeroDivisionError): write a single file.
        rows_per_file = max(1, total_rows)

    start_idx = 0
    file_idx = 0
    pbar = tqdm(total=total_rows, desc=f"Saving {subset_of_interest}")
    while start_idx < total_rows:
        end_idx = min(start_idx + rows_per_file, total_rows)

        shard_path = f"parquet_files/{folder_name}/dataset_{file_idx}.parquet"
        # Write the slice through the Dataset API instead of
        # `.data.to_pandas()` + `pa.Table.from_pandas`: this exports exactly the
        # selected rows, keeps the dataset's own schema for every shard, and
        # avoids materialising each shard as a pandas DataFrame.
        concatenated_dataset.select(range(start_idx, end_idx)).to_parquet(shard_path)

        pbar.update(end_idx - start_idx)

        start_idx = end_idx
        file_idx += 1

    pbar.close()
    print(f"Exported {subset_of_interest} to {file_idx} Parquet files.")

    # Drop the references before loading the next subset. No per-shard names
    # are bound above, so nothing here can raise NameError for an empty subset.
    del dataset
    del concatenated_dataset

    gc.collect()


In [None]:
import gc
from datasets import load_dataset, concatenate_datasets
import pyarrow.parquet as pq
import pyarrow as pa
import os
import subprocess
from threading import Thread
from tqdm import tqdm

def upload_to_s3(folder_name, s3_bucket="your-s3-bucket-name", local_root="parquet_pile"):
    """Sync one exported subset folder to S3, then delete the local copy on success.

    Args:
        folder_name: Sub-folder of ``local_root`` to upload; also used as the
            key prefix inside the bucket.
        s3_bucket: Destination bucket name. The default is a placeholder and
            has to be replaced with a real bucket before running.
        local_root: Local directory that contains ``folder_name``.

    Returns:
        True if ``aws s3 sync`` exited with status 0 and the local folder was
        removed; False if the sync failed, in which case the local files are
        kept so the upload can be retried.
    """
    import shutil  # local import so the function does not depend on other cells

    local_dir = f"{local_root}/{folder_name}"
    sync_result = subprocess.run(
        ["aws", "s3", "sync", local_dir, f"s3://{s3_bucket}/{folder_name}"]
    )
    if sync_result.returncode != 0:
        # Never delete data that may not have reached S3.
        print(
            f"aws s3 sync failed for {local_dir} "
            f"(exit code {sync_result.returncode}); keeping local files."
        )
        return False

    # Like the previous `rm -r`, a missing folder is not treated as fatal.
    shutil.rmtree(local_dir, ignore_errors=True)
    return True

# Every subset to export and upload; each entry is passed verbatim to
# `load_dataset` as the config name, so the spelling must not be altered.
data_sources = [
    "ArXiv", "BookCorpus2", "Books3",
    "DM Mathematics", "Enron Emails", "EuroParl",
    "FreeLaw", "Github", "Gutenberg (PG-19)",
    "HackerNews", "NIH ExPorter", "OpenSubtitles",
    "OpenWebText2", "PhilPapers", "Pile-CC",
    "PubMed Abstracts", "PubMed Central", "StackExchange",
    "UPSTO Backgrounds", "Ubuntu IRC", "Wikipedia (en)",
    "YoutubeSubtitles",
]


# Create a parent directory to store all Parquet files
os.makedirs("parquet_pile", exist_ok=True)

# Handles of the background uploads, joined once every subset has been exported.
upload_threads = []

for subset in tqdm(data_sources, desc="Data Sources"):
    print(f"Processing {subset}...")

    # Subset names may contain spaces; use underscores for the folder name.
    folder_name = subset.replace(" ", "_")

    dataset = load_dataset("ArmelR/the-pile-splitted", subset, num_proc=8)

    # Export the train and test splits together as one sequence of rows.
    concatenated_dataset = concatenate_datasets([dataset['train'], dataset['test']])

    os.makedirs(f"parquet_pile/{folder_name}", exist_ok=True)

    total_rows = len(concatenated_dataset)
    # Size of the backing Arrow table, so the per-file target below is measured
    # in Arrow bytes rather than in bytes of the written Parquet file.
    total_size_bytes = concatenated_dataset.data.nbytes
    size_per_file = 1_000_000_000
    if total_size_bytes > 0:
        # Clamp to >= 1: when the average row is larger than `size_per_file`
        # the int() truncates to 0 and the loop below would never advance.
        rows_per_file = max(1, int((total_rows / total_size_bytes) * size_per_file))
    else:
        # No size information (avoids ZeroDivisionError): write a single file.
        rows_per_file = max(1, total_rows)

    start_idx = 0
    file_idx = 0
    pbar = tqdm(total=total_rows, desc=f"Saving {subset}")
    while start_idx < total_rows:
        end_idx = min(start_idx + rows_per_file, total_rows)
        shard_path = f"parquet_pile/{folder_name}/dataset_{file_idx}.parquet"
        # Write the slice through the Dataset API instead of
        # `.data.to_pandas()` + `pa.Table.from_pandas`: this exports exactly the
        # selected rows, keeps the dataset's own schema for every shard, and
        # avoids materialising each shard as a pandas DataFrame.
        concatenated_dataset.select(range(start_idx, end_idx)).to_parquet(shard_path)
        pbar.update(end_idx - start_idx)
        start_idx = end_idx
        file_idx += 1

    pbar.close()

    # Start a new thread to upload this dataset to S3 while the next subset is
    # being exported; the handle is kept so the upload can be awaited below.
    upload_thread = Thread(target=upload_to_s3, args=(folder_name,))
    upload_thread.start()
    upload_threads.append(upload_thread)

    print(f"Exported {subset} to {file_idx} Parquet files.")

    # Drop the references before loading the next subset. No per-shard names
    # are bound above, so nothing here can raise NameError for an empty subset.
    del dataset
    del concatenated_dataset
    gc.collect()

# Block until every background upload has finished, so the cell does not
# complete while uploads (and the local clean-up they perform) are still running.
for upload_thread in upload_threads:
    upload_thread.join()

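# # Example: load the uploaded Parquet files in streaming mode.
# # NOTE: `fs` is not defined in this notebook; it must be created first
# # (presumably an fsspec/s3fs filesystem object — confirm before use).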
# dataset = load_dataset(
#     "parquet", data_files=["s3://<bucket name>/<data folder>/data-parquet"],
#     storage_options=fs.storage_options, streaming=True)

# i = 0
# for e in dataset['train']:
#   if i == 5:
#     break
#   i+=1
#   print(e)
