In [1]:
!pip install datasets



In [2]:
from datasets import load_dataset
import pandas as pd
from typing import List, Dict, Optional, Callable, Union
import json

class DatasetProcessor:
    """Load Hugging Face datasets and normalize them into instruction records.

    The built-in transformers produce dicts with the keys ``"instruction"``,
    ``"input"`` and ``"output"``. Records from every processed dataset are
    accumulated in ``self.processed_datasets`` so they can be saved together.
    """

    def __init__(self):
        # Records accumulated across all load_and_process_dataset() calls.
        self.processed_datasets = []
        # Built-in transformers, selected through ``transform_type``.
        self.transformers = {
            "conversation": self._transform_conversation,
            "basic": self._transform_basic
        }

    def _transform_conversation(self, item: Union[Dict, List, str]) -> Optional[Dict]:
        """Convert conversation-style data into a single instruction record.

        Supported shapes:
        - dict with a ``"messages"`` list: ``{"messages": [...]}``
        - any other dict, treated as one message
        - list of messages: ``[{"role": "user", "content": ...}, ...]``
        - plain string, used directly as the instruction

        Args:
            item: The conversation data in one of the shapes above.

        Returns:
            A record whose instruction is the first user message, whose input
            is the remaining user messages joined by spaces (``None`` if there
            are none) and whose output is all assistant messages joined by
            spaces. A plain string yields a record with ``input`` and
            ``output`` set to ``None``. Returns ``None`` for unsupported types,
            when either the user or the assistant side is missing, or when an
            error occurs (the error is printed).
        """
        try:
            # A plain string becomes the instruction; there is nothing to pair it with.
            if isinstance(item, str):
                return {
                    "instruction": item,
                    "input": None,
                    "output": None
                }

            if isinstance(item, list):
                messages = item
            elif isinstance(item, dict):
                # Without a "messages" key the dict itself is treated as one message.
                messages = item["messages"] if "messages" in item else [item]
            else:
                return None

            # Split message contents by speaker; non-dict entries and other roles are ignored.
            user_messages = []
            assistant_messages = []

            for msg in messages:
                if not isinstance(msg, dict):
                    continue
                role = msg.get("role", "")
                content = msg.get("content", "")

                if role == "user":
                    user_messages.append(content)
                elif role == "assistant":
                    assistant_messages.append(content)

            if not user_messages or not assistant_messages:
                return None

            return {
                "instruction": user_messages[0],
                "input": " ".join(user_messages[1:]) if len(user_messages) > 1 else None,
                "output": " ".join(assistant_messages)
            }
        except Exception as e:
            # Best effort: a malformed item is reported and skipped, not fatal.
            print(f"Error in transform_conversation: {str(e)}")
            return None

    def _transform_basic(self, item: Dict, instruction_column: str, output_column: str, input_column: Optional[str] = None) -> Optional[Dict]:
        """Convert a flat row into an instruction record.

        Args:
            item: The dataset row.
            instruction_column: Key holding the instruction text.
            output_column: Key holding the output text.
            input_column: Optional key holding additional input text.

        Returns:
            The record, or ``None`` when the instruction or output is missing
            or ``None``, or when an error occurs (the error is printed).
        """
        try:
            instruction = item.get(instruction_column)
            output = item.get(output_column)
            input_text = item.get(input_column) if input_column else None

            if instruction is None or output is None:
                return None

            return {
                "instruction": instruction,
                "output": output,
                "input": input_text
            }
        except Exception as e:
            print(f"Error in transform_basic: {str(e)}")
            return None

    def load_and_process_dataset(
        self,
        dataset_name: str,
        instruction_column: str,
        output_column: str,
        input_column: Optional[str] = None,
        split: str = "train",
        transform_type: str = "basic",
        custom_transformer: Optional[Callable] = None
    ) -> None:
        """Load a Hugging Face dataset and append its transformed records.

        Args:
            dataset_name: Dataset identifier passed to ``load_dataset``.
            instruction_column: Column with the instruction. For non-"basic"
                transform types, the column whose value is handed to the
                transformer; if empty, the whole row is handed over.
            output_column: Column with the output (used by "basic" only).
            input_column: Optional column with extra input (used by "basic" only).
            split: Dataset split to load.
            transform_type: Name of a built-in transformer ("basic" or
                "conversation"). It also selects the calling convention.
            custom_transformer: Optional callable used instead of the built-in
                transformer.

        Note:
            When ``transform_type`` is "basic" the transformer, including a
            custom one, is called as
            ``transformer(item, instruction_column, output_column, input_column)``.
            Otherwise it is called with a single argument. Pass a
            ``transform_type`` other than "basic" together with a
            one-argument ``custom_transformer``.

        Errors are printed rather than raised. Items that fail or yield a
        falsy result are skipped.
        """
        try:
            # Validate the transformer first so a bad transform_type fails
            # before the dataset is downloaded.
            transformer = custom_transformer if custom_transformer else self.transformers.get(transform_type)
            if not transformer:
                raise ValueError(f"Unknown transform_type: {transform_type}")

            dataset = load_dataset(dataset_name, split=split)
            processed_data = []
            use_basic_signature = transform_type == "basic"

            for item in dataset:
                try:
                    if use_basic_signature:
                        processed_item = transformer(
                            item, instruction_column, output_column, input_column
                        )
                    else:
                        target_data = item[instruction_column] if instruction_column else item
                        processed_item = transformer(target_data)

                    if processed_item:
                        processed_data.append(processed_item)
                except Exception as e:
                    # One bad row must not abort the whole dataset.
                    print(f"Error processing item in {dataset_name}: {str(e)}")
                    continue

            self.processed_datasets.extend(processed_data)
            print(f"Successfully processed {dataset_name}: {len(processed_data)} items")

        except Exception as e:
            print(f"Error processing dataset {dataset_name}: {str(e)}")

    def save_combined_dataset(
        self,
        output_path: str,
        format: str = "json"
    ) -> None:
        """Save all accumulated records to ``output_path``.

        Args:
            output_path: Destination file path.
            format: "json" or "csv" (case-insensitive).

        Nothing is written when there are no records. An unsupported format
        or an I/O failure is reported with a printed error instead of a
        success message.
        """
        if not self.processed_datasets:
            print("No datasets to save")
            return

        try:
            fmt = format.lower()
            if fmt == "json":
                with open(output_path, 'w', encoding='utf-8') as f:
                    json.dump(self.processed_datasets, f, ensure_ascii=False, indent=2)
            elif fmt == "csv":
                df = pd.DataFrame(self.processed_datasets)
                df.to_csv(output_path, index=False)
            else:
                # Previously an unknown format fell through and reported
                # success without writing anything.
                raise ValueError(f"Unsupported format: {format}")

            print(f"Successfully saved combined dataset to {output_path}")
            print(f"Total number of items: {len(self.processed_datasets)}")

        except Exception as e:
            print(f"Error saving dataset: {str(e)}")

    @staticmethod
    def _field_length(value) -> int:
        """Return the length of a record field, counting ``None`` as 0."""
        if value is None:
            return 0
        try:
            return len(value)
        except TypeError:
            # Unsized values (e.g. numbers) are measured by their text form.
            return len(str(value))

    def get_stats(self) -> Dict:
        """Return summary statistics for the accumulated records.

        Returns:
            ``{"total_items": 0}`` when nothing has been processed. Otherwise
            a dict with the total count, the number of records with and
            without an input, and the average instruction and output lengths.
            Missing or ``None`` fields count as "no input" and as length 0.
        """
        total = len(self.processed_datasets)
        if not total:
            return {"total_items": 0}

        # .get() is used because custom transformers may omit keys.
        with_input = sum(
            1 for item in self.processed_datasets if item.get("input") is not None
        )
        instruction_chars = sum(
            self._field_length(item.get("instruction")) for item in self.processed_datasets
        )
        output_chars = sum(
            self._field_length(item.get("output")) for item in self.processed_datasets
        )

        stats = {
            "total_items": total,
            "items_with_input": with_input,
            "items_without_input": total - with_input,
            "average_instruction_length": instruction_chars / total,
            "average_output_length": output_chars / total
        }
        return stats

In [6]:
# カスタム変換関数を使用する場合
def custom_transformer_aya(item):
    """Convert one preference-style row into an instruction record.

    Args:
        item: A row with a ``"prompt"`` list of message dicts and a
            ``"chosen"`` response.

    Returns:
        A record with the same ``instruction`` / ``input`` / ``output`` keys
        as the built-in transformers, so all records share one schema.
    """
    return {
        # Uses the second prompt message; presumably the first is a system
        # message. TODO confirm against the dataset.
        "instruction": item["prompt"][1]["content"],
        # These rows carry no separate input text.
        "input": None,
        "output": item["chosen"]
    }

In [None]:
# Create the processor that accumulates records from every dataset below.
processor = DatasetProcessor()

# Keyword arguments for each load_and_process_dataset() call, in run order.
for dataset_kwargs in (
    {
        "dataset_name": "GENIAC-Team-Ozaki/Hachi-Alpaca_newans",
        "instruction_column": "instruction",
        "output_column": "output",
        "input_column": "input",
        "transform_type": "basic",
    },
    {
        "dataset_name": "llm-jp/magpie-sft-v1.0",
        "instruction_column": "conversations",
        "output_column": "",
        "transform_type": "conversation",
    },
):
    processor.load_and_process_dataset(**dataset_kwargs)

# Save the combined dataset.
processor.save_combined_dataset("combined_dataset.json")

Successfully processed GENIAC-Team-Ozaki/Hachi-Alpaca_newans: 27805 items
Successfully processed GENIAC-Team-Ozaki/chatbot-arena-ja-karakuri-lm-8x7b-chat-v0.1-awq: 12474 items
Successfully processed GENIAC-Team-Ozaki/WikiHowNFQA-ja_cleaned: 6545 items
Successfully processed GENIAC-Team-Ozaki/Evol-Alpaca-gen3-500_cleaned: 507 items
Successfully processed GENIAC-Team-Ozaki/oasst2-33k-ja_reformatted: 21560 items
Successfully processed Aratako/SFT-Dataset-For-Self-Taught-Evaluators-iter1: 15640 items
Successfully processed GENIAC-Team-Ozaki/debate_argument_instruction_dataset_ja: 304 items


README.md:   0%|          | 0.00/628 [00:00<?, ?B/s]

(…)-00000-of-00001-157934b4864eb8e0.parquet:   0%|          | 0.00/18.4M [00:00<?, ?B/s]

Generating train split:   0%|          | 0/49332 [00:00<?, ? examples/s]

Successfully processed fujiki/japanese_hh-rlhf-49k: 49332 items
Successfully processed GENIAC-Team-Ozaki/JaGovFaqs-22k: 22794 items
Successfully processed GENIAC-Team-Ozaki/Evol-hh-rlhf-gen3-1k_cleaned: 507 items
Successfully processed DeL-TaiseiOzaki/magpie-qwen2.5-32b-reasoning-100k: 125000 items
Successfully processed DeL-TaiseiOzaki/reasoning-finetuning-ja: 0 items
Successfully processed DeL-TaiseiOzaki/magpie-llm-jp-3-13b-20k: 20000 items
Successfully processed llm-jp/magpie-sft-v1.0: 132476 items


README.md:   0%|          | 0.00/8.79k [00:00<?, ?B/s]

aya-ja-nemotron-dpo-masked_train.jsonl:   0%|          | 0.00/15.1M [00:00<?, ?B/s]

Generating train split: 0 examples [00:00, ? examples/s]

Successfully processed weblab-GENIAC/aya-ja-nemotron-dpo-masked: 5651 items


README.md:   0%|          | 0.00/19.3k [00:00<?, ?B/s]

train-00000-of-00001.parquet:   0%|          | 0.00/12.9M [00:00<?, ?B/s]

Generating train split:   0%|          | 0/13881 [00:00<?, ? examples/s]

Successfully processed weblab-GENIAC/Open-Platypus-Japanese-masked: 13881 items
Successfully saved combined dataset to combined_dataset.json
Total number of items: 454476
