## Setup

In [None]:
!pip install -qq datasets lightning tables
!pip install -qq torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cpu

In [None]:
import json
import pickle
from pathlib import Path
from typing import NamedTuple, Literal
import pandas as pd
from pandas.core.groupby import DataFrameGroupBy

import numpy as np
from tqdm import tqdm
import torch
from torch.utils.data import DataLoader
from datasets import (
    Dataset,
    Features,
    Value,
    Array2D,
    DatasetInfo,
    load_dataset,
    ClassLabel,
    Sequence,
)
from lightning.pytorch import LightningDataModule


data_dir = Path() / "sqa_data"  # relative to the notebook's working directory

## Loading and assembling the data

In [None]:
def dataset_split(pd_data, test_list):
    """Return a boolean row mask selecting rows whose ``context_source_file`` is in ``test_list``.

    Also prints the share and the number of selected rows and how many distinct
    ``context_index`` scenes they cover. The printed labels always say "Testing",
    even when the mask is used for the training pool.

    :param pd_data: DataFrame with ``context_source_file`` and ``context_index`` columns.
    :param test_list: source-file names to select.
    :return: boolean Series aligned with ``pd_data``.
    """
    # "000" is kept because the selection has always been seeded with `== "000"`.
    selected = pd_data["context_source_file"].isin(["000", *test_list])
    n_selected = int(selected.sum())

    print("Testing data percentage: ", n_selected / pd_data.shape[0])
    print("Testing data number: ", n_selected)
    print(
        "Testing data unique scene: ",
        pd_data.loc[selected, "context_index"].nunique(dropna=False),
    )

    return selected

In [None]:
# Load the generated question/answer table and the sensor context windows it refers to.
win_len = 1800
base_file_name = f"s1234_{win_len}_600"  # stride 600 is the one from the paper
pd_data = pd.read_pickle(data_dir / f"{base_file_name}_balanced.pkl")
# NOTE(review): unpickling can execute arbitrary code; only load locally generated, trusted files.
with open(data_dir / f"{base_file_name}_context.pkl", "rb") as f:
    sensory_data = pickle.load(f)

# The OppQA data is split into a training set and a testing set. The
# training set contains SQA data generated on the first two Activity-
# of-Daily-Living (ADL) runs and a drill run of users 1-4, and the
# rest of the runs are used to generate testing data.
#
# NOTE(review): the lists below do not follow that description. The training pool
# uses runs ADL4, ADL5 and Drill, and runs ADL1-ADL3 are held out (`valid_list` is
# treated as the test set further down). Confirm which split is intended.

### splitting method 1: based on context (subjects S1-S4)
valid_list = [
    f"S{subject}-{run}.dat" for run in ("ADL1", "ADL3", "ADL2") for subject in range(1, 5)
]
train_list = [
    f"S{subject}-{run}.dat" for run in ("ADL4", "ADL5", "Drill") for subject in range(1, 5)
]

# ============ split train/valid based on non-overlapping context ============
train_ind = dataset_split(pd_data, train_list)
valid_ind = dataset_split(pd_data, valid_list)


### splitting method 2: total random (unused, kept for reference)
# random_ind = np.random.rand(pd_data.shape[0])
# train_ind = random_ind >= 0.8  # NOTE(review): puts only ~20% in train; `< 0.8` was probably meant
# valid_ind = ~train_ind

### splitting method 3: based on question structure (unused, kept for reference)
# uniq_struct = pd_data.question_structure.unique()
# print('Total unique Q structure num: ', len(uniq_struct))
# # split the unique Q-structures roughly 80%-20%
# rd_num = np.random.rand(len(uniq_struct))
# train_ind_struct = rd_num < 0.8
# test_ind_struct = rd_num >= 0.8
#
# train_qstruct = uniq_struct[train_ind_struct]
# # valid_qstruct = uniq_struct[valid_ind]
# test_qstruct = uniq_struct[test_ind_struct]
# train_ind = pd_data.question_structure.isin(train_qstruct)
# valid_ind = pd_data.question_structure.isin(test_qstruct)
#
# print('Train/test split:  %d / %d' % (sum(train_ind), sum(valid_ind)))

In [None]:
# Boolean mask over the rows of pd_data selecting the training-pool source files.
train_ind

In [None]:
# The paper's "validation" files are really the test set, i.e. the hold-out for final evaluation.
# A proper validation set is instead drawn at random from the training-pool rows.
# Both masks are recomputed from the file lists here instead of renaming/mutating the previous
# cell's variables. Otherwise, re-running this cell would turn the random validation sample into
# the "test" set and shrink the training set further on every run.
percent_valid = 0.1
SPLIT_SEED = 42  # fixed seed so the validation sample is reproducible
rng = np.random.default_rng(SPLIT_SEED)

test_ind = dataset_split(pd_data, valid_list)
train_pool = dataset_split(pd_data, train_list)

# Pick percent_valid of the training-pool rows (by position) for validation, without replacement.
pool_positions = np.flatnonzero(train_pool.to_numpy())
valid_positions = rng.choice(
    pool_positions, int(percent_valid * len(pool_positions)), replace=False
)
valid_ind = pd.Series(False, index=pd_data.index)
valid_ind.iloc[valid_positions] = True
train_ind = train_pool & ~valid_ind

assert not (train_ind & test_ind).any(), "training and test files overlap"

train_ind.sum(), valid_ind.sum(), test_ind.sum()

In [None]:
# Column names, dtypes and non-null counts of the question table.
pd_data.info()

In [None]:
pd_data.answer.value_counts()  # answer distribution over all questions

In [None]:
# Number of distinct answers expected for the "multi" task (should equal len(multi_options) — confirm).
num_multiclass = 17

In [None]:
pd_data.iloc[15_000]  # spot-check one arbitrary question row

In [None]:
# Top-level entries of the context pickle (the "raw" windows are used below).
sensory_data.keys()

In [None]:
np.shape(sensory_data["raw"]["S1-ADL1.dat_0"])  # shape of one raw context window

In [None]:
# Key of each question's sensor context window ("<source file>_<start point>"). This is the key
# format of sensory_data["raw"], e.g. "S1-ADL1.dat_0". The keys are positionally aligned with pd_data.
context_key_list = (
    pd_data["context_source_file"] + "_" + pd_data["context_start_point"].astype(str)
)
# Row i holds the context window of pd_data row i. Rows whose key has no window stay all-zero.
# NOTE(review): 77 is assumed to be the per-time-step channel count of the raw windows — confirm.
sensory_matrix = np.zeros((len(pd_data), win_len, 77), dtype="float32")

# Collect the row positions of every key in a single pass. The old approach compared the whole
# key column against each window key, i.e. rows x keys string comparisons.
rows_by_key: dict[str, list[int]] = {}
for position, key in enumerate(context_key_list.to_numpy()):
    rows_by_key.setdefault(key, []).append(position)

for key, values in tqdm(sensory_data["raw"].items()):
    rows = rows_by_key.get(key)
    if rows is not None:
        sensory_matrix[rows] = values  # broadcast the window into every row using it

In [None]:
class Sample(NamedTuple):
    """One question/answer pair together with its sensor context window (built by get_for)."""

    # sample_id: int
    question_id: int  # from pd_data's "question_index" column
    trajectory: np.ndarray  # sensor window, a row of the numpy array sensory_matrix
    # textual_description: str
    question_type: str  # question family index, stringified
    question: str
    answer_type: str  # "binary", "count" or "multi", as returned by answer_type()
    answer: str  # raw answer text
    # options: dict[str, str | bool] | None
    # correct_option: str


def answer_type(answer: str) -> str:
    """Classify a raw answer string into the task it belongs to.

    :param answer: answer text from the ``answer`` column.
    :return: ``"binary"`` for "Yes"/"No", ``"count"`` for anything ``int()`` accepts,
        ``"multi"`` for everything else.
    """
    if answer == "Yes" or answer == "No":
        return "binary"
    try:
        int(answer)
    except ValueError:
        return "multi"
    # Counts range from 0 to 3, each inclusive; they could also be treated as "open".
    return "count"


def get_for(data: pd.DataFrame) -> list[Sample]:
    """Convert rows of ``pd_data`` into ``Sample`` tuples.

    ``data`` must be a selection of rows from the global ``pd_data`` (e.g. ``pd_data[mask]``
    or ``pd_data.iloc[a:b]``). Each row's trajectory is looked up in ``sensory_matrix``,
    whose rows are positionally aligned with ``pd_data``.

    :param data: rows of ``pd_data`` to convert.
    :return: one ``Sample`` per row, in the order of ``data``.
    :raises ValueError: if ``pd_data`` has a non-unique index, or ``data`` has index
        labels that are not in ``pd_data``.
    """
    if not pd_data.index.is_unique:
        raise ValueError(
            "pd_data needs a unique index to align rows with sensory_matrix; "
            "call pd_data.reset_index(drop=True) after loading it"
        )
    # Position of each row of `data` within pd_data, i.e. its row in sensory_matrix.
    positions = pd_data.index.get_indexer(data.index)
    if (positions < 0).any():
        raise ValueError("get_for expects rows selected from pd_data")

    return [
        Sample(
            question_id=row["question_index"],
            # The row's own context window (previously every sample got sensory_matrix[0]).
            trajectory=sensory_matrix[position],
            question_type=str(row["question_family_index"]),
            question=row["question"],
            answer_type=answer_type(row["answer"]),
            answer=row["answer"],
        )
        for position, (_, row) in zip(positions, data.iterrows())
    ]


get_for(pd_data.head(2))  # preview the first two samples

In [None]:
# pd_data

In [None]:
# All samples as a frame, plus a combined "question answer" text column for uniqueness checks.
df_all = pd.DataFrame(get_for(pd_data))
df_all = df_all.assign(**{"question-answer": df_all["question"] + " " + df_all["answer"]})
df_all

In [None]:
df_all.shape[0]  # number of samples

In [None]:
df_all["question"].isnull().any()  # any missing question text?

In [None]:
df_all.answer_type.value_counts()  # samples per task

In [None]:
# Distinct values per column. Trajectories are compared by a hash of their raw bytes, not by
# str(): numpy abbreviates large arrays with "..." when printing, so different windows could
# produce the same string and be undercounted. df_all itself is left unchanged.
trajectory_hashes = df_all["trajectory"].map(lambda trajectory: hash(trajectory.tobytes()))
df_all[
    [
        "question_type",
        "answer_type",
        "answer",
        "question",
        "question-answer",
    ]
].assign(**{"trajectory-hash": trajectory_hashes}).nunique()

In [None]:
# Shape of a single trajectory, used for the Array2D feature in make_info.
trajectory_shape = get_for(pd_data.iloc[:1])[0].trajectory.shape
trajectory_shape

In [None]:
# Answer vocabulary of the "multi" task, sorted for a deterministic label order.
# Answers are selected with answer_type(), the same rule that assigns samples to tasks. This keeps
# the vocabulary matched to the answers in the "multi" split; a hard-coded list of non-multi
# answers could drift from that rule (e.g. for a count above 3).
is_multi_answer = pd_data["answer"].map(answer_type) == "multi"
multi_options = pd.Series(pd_data.loc[is_multi_answer, "answer"].unique()).sort_values()
multi_option_to_int = {answer: index for index, answer in enumerate(multi_options)}
multi_option_to_int

In [None]:
def make_info(task: Literal["binary", "open", "multi", "count"]) -> DatasetInfo:
    base_features = {
        # "sample_id": Value("int32"),
        "question_id": Value("int32"),
        "trajectory": Array2D(dtype="float32", shape=trajectory_shape),
        # "textual_description": Value("string"),
        "question_type": Value("string"),
        "question": Value("string"),
        "answer_type": Value("string"),
        # "answer": Value("string"),
        # "options": Value("string"),  # JSON encoded
        # "correct_option": Value("string"),
    }

    match task:
        case "binary":
            answer_features = {
                "answer": ClassLabel(names=["true", "false"], num_classes=2)
            }
        case "multi":
            answer_features = {
                "answer": ClassLabel(
                    names=multi_options.to_list(), num_classes=num_multiclass
                ),
                "options": Sequence(Value("string")),
            }
        case "count":
            answer_features = {"answer": Value("uint8")}
        case "open":
            answer_features = {"answer": Value("string")}
        case _:
            raise ValueError(f"Invalid task '{task}'")

    return DatasetInfo(features=Features(base_features | answer_features))

## Persisting the dataset

In [None]:
def push_grouped_df_to_hub(
    df_group: DataFrameGroupBy,
    split: Literal["test", "val", "train"],
    limit_task: list[Literal["open", "multi", "binary"]] | None = None,
    token: str = None,
):
    """
    Takes a grouped DataFrame, feature types, dataset info, and a Hugging Face authentication token,
    then pushes each group to the Hugging Face Hub under specified configurations.

    :param df_group: Grouped Pandas DataFrame object.
    :param feat_type: Feature type for the dataset.
    :param info: Information about the dataset.
    :param token: Hugging Face authentication token.
    """
    for name, group in df_group:
        if limit_task and name not in limit_task:
            continue
        print(f"Group Name: {name} of split: {split}")

        match name:
            case "binary":
                group["answer"] = (group["answer"] == "Yes").astype(int)

            case "multi":
                # group["answer_index"] = [
                #     multi_option_to_int[ans] for ans in group["answer"]
                # ]
                group["options"] = group["answer"].apply(
                    lambda _: multi_options.to_list()
                )

            case "count":
                group["answer"] = group["answer"].astype(int)

            case _:
                raise ValueError(f"Invalid task name: {name}")

        def gen_it():
            yield from group.to_dict(orient="records")

        info = make_info(name)

        match win_len:
            case 500:
                repo_name = "dasyd/OppQA-500"
            case 1800:
                repo_name = "dasyd/OppQA"
            case _:
                raise ValueError(
                    f"This window length has no dataset attached: {win_len}"
                )

        # Create a dataset from the list of dictionaries and push it to the hub
        dataset = Dataset.from_generator(
            gen_it,
            info=info,  # , gen_kwargs=dict(split=split)
        )
        dataset.push_to_hub(
            repo_name,
            config_name=name,
            token=token,
            split=split,
            #     commit_message=f"[Version Revision] Restructured item shape of {split} split of {name} task dataset",
        )
        print(f"Pushed {name} of split: {split} to Hub {repo_name}")

In [None]:
splits = dict(train=train_ind, val=valid_ind, test=test_ind)

# Upload every split, one Hub config per answer type.
for split_name, split_mask in splits.items():
    split_frame = pd.DataFrame(
        sample._asdict() for sample in get_for(pd_data[split_mask])
    )
    # Pass limit_task=["binary"] (for example) to upload only some tasks.
    push_grouped_df_to_hub(split_frame.groupby("answer_type"), split=split_name)

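Before running the upload cell above, log in to the Hugging Face Hub (alternatively, pass a `token` to `push_grouped_df_to_hub`):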

```shell
huggingface-cli login
```

## Verify the upload (this re-downloads the dataset from the Hub)

In [None]:
from datasets import VerificationMode


class TimeQADataModule(LightningDataModule):
    KEY = "dasyd/OppQA"

    def __init__(
        self,
        batch_size: int = 32,
        task: Literal["binary", "multi", "open"] = "multi",
    ):
        super().__init__()

        self.batch_size = batch_size
        self.task = task

    def _load_dataset_split(self, splits: list[str]):
        """Workaround to overcome the missing hf implementation of only dowloading the split shards"""

        return load_dataset(
            TimeQADataModule.KEY,
            self.task,
            data_dir=self.task,
            data_files={split: f"{split}-*" for split in splits},
            verification_mode=VerificationMode.NO_CHECKS,
            num_proc=len(splits),
        )

    def prepare_data(self) -> None:
        self._load_dataset_split(["val", "train"])

    def setup(self, stage: str) -> None:
        if stage == "fit":
            self.dataset = self._load_dataset_split(["train", "val"])
        elif stage == "test":
            self.dataset = self._load_dataset_split(["test"])

        self.dataset = self.dataset.with_format("torch")

    def train_dataloader(self) -> DataLoader:
        return DataLoader(
            self.dataset["train"], batch_size=self.batch_size, shuffle=True
        )

    def val_dataloader(self) -> DataLoader:
        return DataLoader(self.dataset["val"], batch_size=self.batch_size)

    def test_dataloader(self) -> DataLoader:
        return DataLoader(self.dataset["test"], batch_size=self.batch_size)


# Smoke test: download the train/val splits of the "multi" task and set them up for fitting.
module = TimeQADataModule(batch_size=4, task="multi")
module.prepare_data()
module.setup("fit")
# module.setup("fit")
# module.setup("test")  # loads the held-out test split instead

In [None]:
# Fetch one training batch and check the batched trajectory shape
# (presumably (4, win_len, 77) for batch_size=4 — confirm).
loader = module.train_dataloader()
batch = next(iter(loader))

# list(batch.keys())
batch["trajectory"].shape