diff --git a/litgpt/data/prepare_slimpajama.py b/litgpt/data/prepare_slimpajama.py
deleted file mode 100644
index 9076ad32b0..0000000000
--- a/litgpt/data/prepare_slimpajama.py
+++ /dev/null
@@ -1,60 +0,0 @@
-# Copyright Lightning AI. Licensed under the Apache License 2.0, see LICENSE file.
-
-import json
-import os
-import time
-from pathlib import Path
-
-from litgpt import Tokenizer
-from litgpt.data.prepare_starcoder import DataChunkRecipe
-from litgpt.utils import CLI
-
-
-class SlimPajamaDataRecipe(DataChunkRecipe):
-    def __init__(self, tokenizer: Tokenizer, chunk_size: int):
-        super().__init__(chunk_size)
-        self.tokenizer = tokenizer
-
-    def prepare_structure(self, input_dir):
-        files = Path(input_dir).rglob("*.zst")
-        return [str(file) for file in files]
-
-    def prepare_item(self, filepath):
-        import zstandard as zstd
-
-        with zstd.open(open(filepath, "rb"), "rt", encoding="utf-8") as f:
-            for row in f:
-                text = json.loads(row)["text"]
-                if json.loads(row)["meta"]["redpajama_set_name"] == "RedPajamaGithub":
-                    continue  # exclude the GitHub data since it overlaps with starcoder
-                text_ids = self.tokenizer.encode(text, bos=False, eos=True)
-                yield text_ids
-
-
-def prepare(
-    input_dir: Path = Path("data/SlimPajama-627B/train"),
-    output_dir: Path = Path("data/slimpajama/train"),
-    tokenizer_path: Path = Path("checkpoints/Llama-2-7b-hf/"),
-    chunk_size: int = (2049 * 16384),
-    fast_dev_run: bool = False,
-) -> None:
-    from litdata.processing.data_processor import DataProcessor
-
-    tokenizer = Tokenizer(tokenizer_path)
-    data_recipe = SlimPajamaDataRecipe(tokenizer=tokenizer, chunk_size=chunk_size)
-    data_processor = DataProcessor(
-        input_dir=str(input_dir),
-        output_dir=str(output_dir),
-        fast_dev_run=fast_dev_run,
-        num_workers=os.cpu_count(),
-        num_downloaders=1,
-    )
-
-    start_time = time.time()
-    data_processor.run(data_recipe)
-    elapsed_time = time.time() - start_time
-    print(f"Time taken: {elapsed_time:.2f} seconds")
-
-
-if __name__ == "__main__":
-    CLI(prepare)
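Reviewer note: both the deleted recipe above and the `prepare_slimpajama` helper that replaces it (in `litgpt/data/tinyllama.py` below) rely on the same streaming read, decoding zstd-compressed JSONL shards row by row rather than decompressing whole files. A minimal sketch of that pattern, not part of this diff, assuming a hypothetical local `example.jsonl.zst` with the SlimPajama schema (`text` plus `meta.redpajama_set_name`):

```python
import json

import zstandard as zstd

# Stream-decode one shard; rows never need to fit in memory all at once.
with zstd.open(open("example.jsonl.zst", "rb"), "rt", encoding="utf-8") as f:
    for row in f:
        record = json.loads(row)
        if record["meta"]["redpajama_set_name"] == "RedPajamaGithub":
            continue  # GitHub text overlaps with the Starcoder dataset
        print(len(record["text"]))  # stand-in for tokenizer.encode(...)
```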
diff --git a/litgpt/data/prepare_starcoder.py b/litgpt/data/prepare_starcoder.py
deleted file mode 100644
index 1a93f2e10f..0000000000
--- a/litgpt/data/prepare_starcoder.py
+++ /dev/null
@@ -1,78 +0,0 @@
-# Copyright Lightning AI. Licensed under the Apache License 2.0, see LICENSE file.
-
-import os
-import time
-import traceback
-from pathlib import Path
-
-from lightning_utilities.core.imports import RequirementCache
-
-from litgpt import Tokenizer
-from litgpt.utils import CLI
-
-_LITDATA_AVAILABLE = RequirementCache("litdata")
-if _LITDATA_AVAILABLE:
-    from litdata.processing.data_processor import DataChunkRecipe
-else:
-    DataChunkRecipe = object
-
-
-class StarcoderDataRecipe(DataChunkRecipe):
-    def __init__(self, tokenizer: Tokenizer, chunk_size: int):
-        super().__init__(chunk_size)
-        self.tokenizer = tokenizer
-
-    def prepare_structure(self, input_dir):
-        files = Path(input_dir).rglob("*.parquet")
-        return [str(file) for file in files]
-
-    def prepare_item(self, item_metadata):
-        import pyarrow.parquet as pq
-
-        filepath = item_metadata
-        start = time.time()
-
-        try:
-            parquet_file = pq.ParquetFile(filepath)
-            # reduce RAM usage
-            for batch in parquet_file.iter_batches(batch_size=8192, columns=["content"]):
-                for text in batch.to_pandas()["content"]:
-                    yield self.tokenizer.encode(text, bos=False, eos=True)
-
-        except Exception:
-            print(traceback.format_exc())
-            print(f"Error reading {filepath}")
-            return
-
-        parquet_file.close()
-        end = time.time()
-        print(f"Took {end - start:.2f} seconds total", filepath)
-
-
-def prepare(
-    input_dir: Path = Path("data/starcoderdata"),
-    output_dir: Path = Path("data/starcoder"),
-    tokenizer_path: Path = Path("checkpoints/Llama-2-7b-hf/"),
-    chunk_size: int = (2049 * 8192),
-    fast_dev_run: bool = False,
-) -> None:
-    from litdata.processing.data_processor import DataProcessor
-
-    tokenizer = Tokenizer(tokenizer_path)
-    data_recipe = StarcoderDataRecipe(tokenizer=tokenizer, chunk_size=chunk_size)
-    data_processor = DataProcessor(
-        input_dir=str(input_dir),
-        output_dir=str(output_dir),
-        fast_dev_run=fast_dev_run,
-        num_workers=os.cpu_count(),
-        num_downloaders=1,
-    )
-
-    start_time = time.time()
-    data_processor.run(data_recipe)
-    elapsed_time = time.time() - start_time
-    print(f"Time taken: {elapsed_time:.2f} seconds")
-
-
-if __name__ == "__main__":
-    CLI(prepare)
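Reviewer note: the Starcoder path, both in the deleted recipe above and in the `prepare_starcoder` helper below, bounds memory with pyarrow's batched Parquet iterator. A minimal sketch of that read pattern, not part of this diff, assuming a hypothetical `example.parquet` with a string `content` column:

```python
import pyarrow.parquet as pq

# iter_batches streams record batches instead of materializing the whole
# table, so peak RAM is roughly one 8192-row slice of the "content" column.
parquet_file = pq.ParquetFile("example.parquet")
for batch in parquet_file.iter_batches(batch_size=8192, columns=["content"]):
    for text in batch.to_pandas()["content"]:
        print(len(text))  # stand-in for tokenizer.encode(...)
parquet_file.close()
```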
diff --git a/litgpt/data/tinyllama.py b/litgpt/data/tinyllama.py
index 0f32507aa2..ace5f93ed2 100644
--- a/litgpt/data/tinyllama.py
+++ b/litgpt/data/tinyllama.py
@@ -1,7 +1,11 @@
 # Copyright Lightning AI. Licensed under the Apache License 2.0, see LICENSE file.
 
+import json
+import os
+import time
+import traceback
 from dataclasses import dataclass, field
 from pathlib import Path
-from typing import Optional, Union
+from typing import Generator, Optional, Union
 
 from torch.utils.data import DataLoader
 
@@ -25,29 +29,48 @@ class TinyLlama(DataModule):
     num_workers: int = 8
     """How many DataLoader processes to use for loading."""
 
+    tokenizer: Optional[Tokenizer] = field(init=False, repr=False, default=None)
     batch_size: int = field(init=False, repr=False, default=1)
     seq_length: int = field(init=False, repr=False, default=2048)
 
     def __post_init__(self):
         # Could be a remote path (s3://) or a local path
-        self.slimpajama_train = str(self.data_path).rstrip("/") + "/slimpajama/train"
-        self.slimpajama_val = str(self.data_path).rstrip("/") + "/slimpajama/val"
-        self.starcoder_train = str(self.data_path).rstrip("/") + "/starcoder"
+        self.slimpajama_train = os.path.join(str(self.data_path), "slimpajama", "train")
+        self.slimpajama_val = os.path.join(str(self.data_path), "slimpajama", "val")
+        self.starcoder_train = os.path.join(str(self.data_path), "starcoder")
 
     def connect(
         self, tokenizer: Optional[Tokenizer] = None, batch_size: int = 1, max_seq_length: Optional[int] = None
     ) -> None:
+        self.tokenizer = tokenizer
         self.batch_size = batch_size
-        self.seq_length = max_seq_length + 1  # Increase by one because we need the next token as well
+        if max_seq_length:
+            self.seq_length = max_seq_length + 1  # Increase by one because we need the next token as well
 
     def prepare_data(self) -> None:
-        for path in (self.slimpajama_train, self.slimpajama_val, self.starcoder_train):
-            if not path.startswith("s3://") and not Path(path).is_dir():
-                raise FileNotFoundError(
-                    "The data path for TinyLlama is expected to be the directory containing these subdirectories:"
-                    f" `slimpajama/train`, `slimpajama/val`, `starcoder`. The directory {path} does not exist."
-                    " Set it via `--data.data_path=...`"
-                )
+        # for path in (self.slimpajama_train, self.slimpajama_val, self.starcoder_train):
+        #     if not path.startswith("s3://") and not Path(path).is_dir():
+        #         raise FileNotFoundError(
+        #             "The data path for TinyLlama is expected to be the directory containing these subdirectories:"
+        #             f" `slimpajama/train`, `slimpajama/val`, `starcoder`. The directory {path} does not exist."
+        #             " Set it via `--data.data_path=...`"
+        #         )
+
+        prepare_slimpajama(
+            input_dir=os.path.join(self.data_path, "slimpajama-raw/train"),
+            output_dir=self.slimpajama_train,
+            tokenizer=self.tokenizer,
+        )
+        prepare_slimpajama(
+            input_dir=os.path.join(self.data_path, "slimpajama-raw/validation"),
+            output_dir=self.slimpajama_val,
+            tokenizer=self.tokenizer,
+        )
+        prepare_starcoder(
+            input_dir=os.path.join(self.data_path, "starcoderdata-raw"),
+            output_dir=self.starcoder_train,
+            tokenizer=self.tokenizer,
+        )
 
     def train_dataloader(self) -> DataLoader:
         from litdata.streaming import CombinedStreamingDataset, StreamingDataLoader, StreamingDataset, TokensLoader
@@ -89,3 +112,55 @@ def val_dataloader(self) -> DataLoader:
         val_dataset, batch_size=self.batch_size, pin_memory=True, num_workers=self.num_workers, drop_last=True
     )
     return val_dataloader
+
+
+def prepare_slimpajama(input_dir: str, output_dir: str, tokenizer: Tokenizer) -> None:
+    from litdata import optimize
+    import zstandard as zstd
+
+    def process(filepath: str) -> Generator:
+        with zstd.open(open(filepath, "rb"), "rt", encoding="utf-8") as f:
+            for row in f:
+                data = json.loads(row)  # parse each row once
+                if data["meta"]["redpajama_set_name"] == "RedPajamaGithub":
+                    continue  # exclude the GitHub data since it overlaps with starcoder
+                text_ids = tokenizer.encode(data["text"], bos=False, eos=True)
+                yield text_ids
+
+    optimize(
+        fn=process,
+        inputs=[str(file) for file in Path(input_dir).rglob("*.zst")],
+        output_dir=output_dir,
+        chunk_bytes="100MB",  # TODO: find a good value, chunk_size = (2049 * 16384)
+        num_workers=os.cpu_count(),
+        num_downloaders=1,
+        fast_dev_run=False,
+    )
+
+
+def prepare_starcoder(input_dir: str, output_dir: str, tokenizer: Tokenizer) -> None:
+    from litdata import optimize
+    import pyarrow.parquet as pq
+
+    def process(filepath: str) -> Generator:
+        try:
+            parquet_file = pq.ParquetFile(filepath)
+            # Reduce RAM usage
+            for batch in parquet_file.iter_batches(batch_size=8192, columns=["content"]):
+                for text in batch.to_pandas()["content"]:
+                    yield tokenizer.encode(text, bos=False, eos=True)
+        except Exception:
+            print(traceback.format_exc())
+            print(f"Error reading {filepath}")
+            return
+        parquet_file.close()
+
+    optimize(
+        fn=process,
+        inputs=[str(file) for file in Path(input_dir).rglob("*.parquet")],
+        output_dir=output_dir,
+        chunk_bytes="100MB",  # TODO: find a good value, chunk_size = (2049 * 8192)
+        num_workers=os.cpu_count(),
+        num_downloaders=1,
+        fast_dev_run=False,
+    )
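Reviewer note: once `optimize` has written the chunked output, `train_dataloader()` above reads it back through litdata's streaming classes. A hedged sketch of that read path, not part of this diff, using hypothetical local paths and the module's defaults (`seq_length + 1 = 2049`):

```python
from litdata.streaming import StreamingDataLoader, StreamingDataset, TokensLoader

# block_size is seq_length + 1: each item also carries the next token as the label.
dataset = StreamingDataset(
    input_dir="data/slimpajama/train",  # hypothetical local output of optimize()
    item_loader=TokensLoader(block_size=2049),
    shuffle=True,
    drop_last=True,
)
loader = StreamingDataLoader(
    dataset, batch_size=1, pin_memory=True, num_workers=8, drop_last=True
)
for tokens in loader:
    break  # tokens has shape (batch_size, 2049)
```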
+ # " Set it via `--data.data_path=...`" + # ) + + prepare_slimpajama( + input_dir=os.path.join(self.data_path, "slimpajama-raw/train"), + output_dir=self.slimpajama_train, + tokenizer=self.tokenizer, + ) + prepare_slimpajama( + input_dir=os.path.join(self.data_path, "slimpajama-raw/validation"), + output_dir=self.slimpajama_val, + tokenizer=self.tokenizer, + ) + prepare_starcoder( + input_dir=os.path.join(self.data_path, "starcoderdata-raw"), + output_dir=self.starcoder_train, + tokenizer=self.tokenizer, + ) def train_dataloader(self) -> DataLoader: from litdata.streaming import CombinedStreamingDataset, StreamingDataLoader, StreamingDataset, TokensLoader @@ -89,3 +112,55 @@ def val_dataloader(self) -> DataLoader: val_dataset, batch_size=self.batch_size, pin_memory=True, num_workers=self.num_workers, drop_last=True ) return val_dataloader + + +def prepare_slimpajama(input_dir: str, output_dir: str, tokenizer: Tokenizer) -> None: + from litdata import optimize + import zstandard as zstd + + def process(filepath: str) -> Generator: + with zstd.open(open(filepath, "rb"), "rt", encoding="utf-8") as f: + for row in f: + text = json.loads(row)["text"] + if json.loads(row)["meta"]["redpajama_set_name"] == "RedPajamaGithub": + continue # exclude the GitHub data since it overlaps with starcoder + text_ids = tokenizer.encode(text, bos=False, eos=True) + yield text_ids + + optimize( + fn=process, + inputs=[str(file) for file in Path(input_dir).rglob("*.zst")], + output_dir=output_dir, + chunk_bytes="100MB", # TODO: find a good value, chunk_size = (2049 * 16384), + num_workers=os.cpu_count(), + num_downloaders=1, + fast_dev_run=False, + ) + + +def prepare_starcoder(input_dir: str, output_dir: str, tokenizer: Tokenizer) -> None: + from litdata import optimize + import pyarrow.parquet as pq + + def process(filepath: str) -> Generator: + try: + parquet_file = pq.ParquetFile(filepath) + # Reduce RAM usage + for batch in parquet_file.iter_batches(batch_size=8192, columns=["content"]): + for text in batch.to_pandas()["content"]: + yield tokenizer.encode(text, bos=False, eos=True) + except: + print(traceback.format_exc()) + print(f"Error reading {filepath}") + return + parquet_file.close() + + optimize( + fn=process, + inputs=[str(file) for file in Path(input_dir).rglob("*.parquet")], + output_dir=output_dir, + chunk_bytes="100MB", # TODO: find a good value, chunk_size = (2049 * 8192), + num_workers=os.cpu_count(), + num_downloaders=1, + fast_dev_run=False, + ) diff --git a/litgpt/scripts/prepare.py b/litgpt/scripts/prepare.py new file mode 100644 index 0000000000..e8174dbfb1 --- /dev/null +++ b/litgpt/scripts/prepare.py @@ -0,0 +1,28 @@ +# Copyright Lightning AI. Licensed under the Apache License 2.0, see LICENSE file. +from pathlib import Path +from typing import Optional + +from lightning_utilities import is_overridden +from litgpt import Tokenizer +from litgpt.data import LitDataModule +from litgpt.utils import CLI + + +def prepare( + data: LitDataModule, + tokenizer_dir: Optional[Path], + max_seq_length: Optional[int] = None +) -> None: + + if not is_overridden("prepare_data", data, LitDataModule): + raise ValueError( + f"The {type(data).__name__} data module does not support preparing the data in advance." 
diff --git a/tutorials/pretrain_tinyllama.md b/tutorials/pretrain_tinyllama.md
index f4976ee097..2c91187839 100644
--- a/tutorials/pretrain_tinyllama.md
+++ b/tutorials/pretrain_tinyllama.md
@@ -52,7 +52,7 @@ In order to start pretraining litgpt on it, you need to read, tokenize, and write
 First, install additional dependencies for preprocessing:
 
 ```bash
-pip install '.[all]'
+pip install 'litgpt[all]'
 ```
 
 You will need to have the tokenizer config available:
@@ -64,38 +64,16 @@ litgpt download \
   --tokenizer_only true
 ```
 
-Then, run the preprocessing script for each dataset and split.
-You will require **1.1 TB** of disk space for Starcoder and **2.5** TB of space for the SlimPajama dataset.
-
-**Starcoder:**
-
-```bash
-python litgpt/data/prepare_starcoder.py \
-  --input_dir data/starcoderdata-raw \
-  --output_dir data/starcoder \
-  --tokenizer_path checkpoints/meta-llama/Llama-2-7b-hf
-```
-
-**SlimPajama:**
+Then, run the preprocessing command by pointing to the directory where the data was downloaded.
+You will require an additional **1.1 TB** of disk space for Starcoder and **2.5 TB** of space for the SlimPajama dataset.
 
 ```bash
-python litgpt/data/prepare_slimpajama.py \
-  --input_dir data/slimpajama-raw/validation \
-  --output_dir data/slimpajama/val \
-  --tokenizer_path checkpoints/meta-llama/Llama-2-7b-hf
-
-python litgpt/data/prepare_slimpajama.py \
-  --input_dir data/slimpajama-raw/test \
-  --output_dir data/slimpajama/test \
-  --tokenizer_path checkpoints/meta-llama/Llama-2-7b-hf
-
-python litgpt/data/prepare_slimpajama.py \
-  --input_dir data/slimpajama-raw/train \
-  --output_dir data/slimpajama/train \
+litgpt prepare \
+  --data TinyLlama \
+  --data.data_path data \
   --tokenizer_path checkpoints/meta-llama/Llama-2-7b-hf
 ```
 
-If you want to run on a small slice of the datasets first, pass the flag `--fast_dev_run=true` to the commands above.
 In the above we are assuming that you will be using the same tokenizer as used in LlaMA/TinyLlama, but any trained [SentencePiece](https://github.com/google/sentencepiece) tokenizer with a 32000 vocabulary size will do here.
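Reviewer note: for the tokenizer requirement in the tutorial's last paragraph, a quick sanity check is possible through litgpt's `Tokenizer` wrapper. A sketch, not part of this diff, assuming the checkpoint was downloaded as shown above and that the wrapper exposes `vocab_size`:

```python
from pathlib import Path

from litgpt import Tokenizer

# Any trained SentencePiece tokenizer with a 32000-token vocabulary should work.
tokenizer = Tokenizer(Path("checkpoints/meta-llama/Llama-2-7b-hf"))
print(tokenizer.vocab_size)  # expect 32000 for Llama 2 / TinyLlama
```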