## Preprocessing

### Imports

In [8]:
import json
import os
from enum import Enum, auto, unique
from numbers import Number
from typing import (Any, Dict, Iterable, List, Mapping, Optional, Sequence,
                    Tuple, TypeVar, Union)

import torch
import zlib
from contextlib import contextmanager
from dataclasses import dataclass
from itertools import islice, zip_longest
from typing import (Any, Dict, Iterable, List, Optional, Sequence, Tuple,
                    TypeVar, Union)
from transformers import AutoTokenizer, PreTrainedTokenizer, BertTokenizer

from datastructures import (
    IndexedDict,
    IndexedSet,
    Trim,
)

from segments import (
    Segment,
)

 ### Special Tokens
 
 Special tokens identify parts of the narrative, such as characters and actions. In particular, we can sandwich each card between special tokens that are unique to its type. 

In [9]:
@unique
class SpecialToken(str, Enum):
    """
    An enumeration of special tokens

    Every member is also a ``str``. Its value is generated from the member
    name as ``<|NAME|>`` (the upper-cased name wrapped in ``<|`` and ``|>``),
    and ``str(member)`` returns that value.
    """

    # This method must be defined before calling "auto()"
    # pylint:disable=unused-argument,no-self-argument
    def _generate_next_value_(name, start, count, last_values):
        """
        Automatically create the enumeration value from the member name
        """
        return f"<|{name.upper()}|>"

    # pylint:enable=unused-argument,no-self-argument

    @classmethod
    def from_string(cls, name: str) -> "SpecialToken":
        """
        Get the associated SpecialToken from the passed in string

        Args:
            name: the member name in any letter case, e.g. ``"goal"``. The
                string ``"name"`` is mapped to the ``name_field`` member.

        Returns:
            The member whose value is ``<|NAME|>`` for the given name.

        Raises:
            ValueError: if no member has the corresponding value.
        """
        if name == "name":
            # Handle this specially since Enum defines "name" as a string, but
            # we want to use it to extract the field from the data
            name = "name_field"

        # Look the member up by value; this is a pure lookup and must not
        # write anything to stdout.
        return cls(f"<|{name.upper()}|>")

    missing = auto()  # denotes missing information
    separator = auto()  # separator token at the beginning of each Segment
    character = auto()  # a character's biography

    # All the possible card types from <CardNamespace> in the Storium export
    # format: https://storium.com/help/export/json/0.9.2
    chartype = auto()
    goal = auto()
    person = auto()
    place = auto()
    thing = auto()
    strength = auto()
    weakness = auto()
    obstacle = auto()
    subplot = auto()

    # generic attributes for cards, characters, etc
    name_field = auto()  # cannot call it "name", as Enum defines it as well
    description = auto()

    # Contextual card attributes
    failure_stakes = auto()
    success_stakes = auto()

    # Information denoting entry type
    move = auto()
    establishment = auto()
    addition = auto()
    conclusion = auto()

    # some notions of ordering
    previous = auto()  # can stack, e.g. previous + previous => timestep t-2

    # some notions of authorship
    narrator = auto()  # can stack,  e.g. previous + narrator
    same_character = auto()  # can stack,  e.g. previous + same_character
    diff_character = auto()  #  can stack,  e.g. previous + diff_character

    def __str__(self):
        """
        Override the default string method to return the enumeration value,
        which is a string
        """
        return self.value


In [10]:
def extract_string(
    field: str, mapping: Dict[str, Any], default: str = SpecialToken.missing.value
) -> str:
    """
    Look up the given string field in the mapping, substituting the default
    when the field is absent or holds a falsy value (e.g. None or "")
    """
    value = mapping.get(field)
    if value:
        return value
    return default

### Dataclasses 

The code defines several dataclasses. Once the raw data has been processed, everything is stored in instances of these dataclasses. 

1. **ProcessedStory** - The entire story is stored in this dataclass. Besides the game id, it contains three things: 
    - *characters*
    - *entries*
    - *establishment_entries* 
- These serve as our main cards. The entries and establishment entries are stored as **EntryInfo** objects, and the characters are stored as **CharacterInfo** objects. Notice that each of the three is an IndexedDict.

2. *EntryInfo* - Container for entries and establishment entries. 
3. *CharacterInfo* - Container for characters.  

In [11]:
@dataclass
class CharacterInfo:
    """
    The processed character info

    One instance is created per character by process_story, plus one
    synthetic instance for the narrator.
    """

    # summary: Segment
    # Name and description as extracted from the raw character data
    # (process_story uses "narrator" and "" for the narrator)
    name: str
    description: str
    # "narrator", or "character:<character_seq_id>" for regular characters
    character_id: str
    # Checksum from checksum_character, compared by process_story to decide
    # whether a previously processed character can be reused (0 for narrator)
    checksum: int

    # This is a sorted list of entry ids written by the character to
    # allow easily looking up the previous entries for the character
    entry_ids: IndexedSet

In [12]:
@dataclass
class EntryInfo:
    """
    The processed entry info

    Built by process_entry from a single raw entry of a scene.
    """

    # The raw entry's "seq_id"
    entry_id: str
    # The raw entry's "role"; process_story uses it as the key into the
    # character mapping
    character_id: str
    # Id of the establishment entry this entry is associated with, as passed
    # in by process_story
    establishment_id: str
    # Checksum from checksum_entry, used to detect changed entries
    checksum: int
    # The raw entry's "description", or "" when it is missing or None
    text: str
    # Cards attached to the entry, keyed by "target_challenge_card",
    # "cards_played_on_challenge", "challenge_cards" and "place_card"
    imp_cards: Dict[str, Any]
    # The raw entry's "format", e.g. "move" or "establishment"
    # NOTE(review): process_entry stores entry.get("format"), so this can be
    # None when the raw entry has no format - confirm whether that occurs
    format: str
    # text: Segment
    # summary: Segment


In [13]:
@dataclass
class ProcessedStory:
    """
    This defines the structure of a story after processing

    Instances are produced by process_story.
    """
    
    # Taken from the raw story's "game_pid" field
    game_id: str

    # A mapping of character id to character info
    # (includes the synthetic "narrator" character)
    characters: IndexedDict[CharacterInfo]

    # A mapping of entry id to entry info
    entries: IndexedDict[EntryInfo]

    # A mapping of entry id to establishment's entry info
    # (the subset of entries whose format is "establishment")
    establishment_entries: IndexedDict[EntryInfo]

### Checksums 

A checksum is computed for every character and every entry, covering its id, its text, and the cards associated with it. This allows for selective reprocessing of data: only the data that has changed (as indicated by a changed checksum) needs to be reprocessed and re-encoded for training. 

In [14]:
def checksum_card(card: Optional[Dict[str, Any]], checksum: int = 1) -> int:
    """
    Fold the text fields of a card into a running Adler-32 checksum.

    A missing or empty card leaves the incoming checksum untouched.
    """
    running = checksum
    if card:
        # The field order is part of the checksum, so it must stay fixed
        for key in ("name", "description", "success_stakes", "failure_stakes"):
            payload = extract_string(key, card).encode("utf-8")
            running = zlib.adler32(payload, running)

    return running

In [15]:
def checksum_cards(
    cards: Optional[List[Dict[str, Any]]], checksum: int = 1
) -> int:
    """
    Checksum a list of cards.

    Each card is folded into the running checksum, in list order, by
    delegating to checksum_card.

    Args:
        cards: the cards to checksum. None is treated like an empty list,
            since ``mapping.get(key, [])`` still yields None when the key is
            present with a null value.
        checksum: the running checksum to continue from.

    Returns:
        The updated checksum; equal to the incoming checksum when there are
        no cards.
    """
    for card in cards or ():
        checksum = checksum_card(card, checksum)

    return checksum

In [16]:
def checksum_character(character: Dict[str, Any], character_id: str) -> int:
    """
    Derive an Adler-32 checksum for a character from its id, then its name,
    then its description
    """
    from_id = zlib.adler32(character_id.encode("utf-8"))
    from_name = zlib.adler32(
        extract_string("name", character).encode("utf-8"), from_id
    )
    return zlib.adler32(
        extract_string("description", character).encode("utf-8"), from_name
    )

In [17]:
def checksum_entry(entry: Dict[str, Any], entry_id: str) -> int:
    """
    Compute a checksum of an entry

    The checksum covers the entry id, the cards relevant to the entry's
    format, and finally the entry's description text.

    Args:
        entry: the raw entry. Its "format" selects which cards are included:
            "move" uses "target_challenge_card" and
            "cards_played_on_challenge", "establishment" uses "place_card",
            and "addition" uses "challenge_cards". Any other (or missing)
            format contributes no cards.
        entry_id: the id of the entry, used to seed the checksum.

    Returns:
        The Adler-32 checksum of the entry.
    """
    checksum = zlib.adler32(entry_id.encode("utf-8"))

    # Use .get() so an entry without a format is checksummed rather than
    # raising KeyError, matching how process_entry reads the same field.
    entry_type = entry.get("format")
    if entry_type == "move":
        checksum = checksum_card(entry.get("target_challenge_card"), checksum)
        # "or []" also covers a key that is present but explicitly null
        checksum = checksum_cards(
            entry.get("cards_played_on_challenge") or [], checksum
        )
    elif entry_type == "establishment":
        checksum = checksum_card(entry.get("place_card"), checksum)
    elif entry_type == "addition":
        checksum = checksum_cards(entry.get("challenge_cards") or [], checksum)

    return zlib.adler32(
        extract_string("description", entry, "").encode("utf-8"), checksum
    )

# The story details 

We now consider the main story processing. This is done by the **process_story** function. The workflow of this function is as follows: 

1. **Extract Scenes and Characters:** It starts by extracting scenes and characters from the story dictionary. If these are not present or not in the correct format, the function returns the processed object if it exists, effectively skipping processing.
2. **Initialize Character List:**  A list of characters is initialized, starting with a default narrator character entry, which is always present in Storium stories but has no detailed information (it has a checksum of 0, an empty entry_ids set, and an empty description; the Segment summary is currently commented out in the code).

3. **Process Characters:** Iterate over each character in the characters list. Generate a character_id from the character_seq_id by prefixing it with `character:`. Characters without a character_seq_id are skipped.

4. **Process Scenes and Entries:** Iterate over each scene in scenes, and within each, iterate over its entries. For each entry, compute its checksum and determine if it needs processing based on whether it has changed from the previously processed version. Process the entry using process_entry, which extracts the entry's text and cards, and associates it with the relevant character and scene information.

5. **Construct ProcessedStory Object:** Compile the processed data into a ProcessedStory object, containing the structured data for the entire story, including mappings of characters and entries.

The function relies on a helper called **process_entry**. The process_entry function processes a single entry of the story, such as a character's move or a narrator's scene entry, and encapsulates the processed data in an EntryInfo object.


In [18]:
def process_entry(
    # tokenizer,
    entry: Dict[str, Any],
    establishment_id: str,
    checksum: int,
    add_eos: bool = True,
    force: bool = False,
) -> Optional[EntryInfo]:
    """
    Turn a single raw entry into an EntryInfo, or return None when the entry
    carries no text and is neither forced nor an establishment entry
    """
    text = extract_string("description", entry, "")
    entry_format = entry.get("format")

    # Only modeling moves with written text, though make a special
    # exception for establishment entries. While they are currently
    # required to have text, it seems at some point there were games that
    # didn't have any text for the establishment entry, though it would still
    # have place cards.
    if not (text or force or entry_format == "establishment"):
        return None

    imp_cards = {
        # cards used by character moves
        "target_challenge_card": [entry.get("target_challenge_card")],
        "cards_played_on_challenge": entry.get("cards_played_on_challenge"),
        # cards used when the narrator continues a scene
        "challenge_cards": entry.get("challenge_cards"),
        # place card of a scene opening (might be null)
        "place_card": [entry.get("place_card")],
    }

    return EntryInfo(
        entry_id=entry["seq_id"],
        character_id=entry["role"],
        establishment_id=establishment_id,
        checksum=checksum,
        text=text,
        imp_cards=imp_cards,
        format=entry_format,
    )

In [19]:
def process_story(story: Dict[str, Any], processed: Optional[ProcessedStory] = None) -> Optional[ProcessedStory]:
    """
    Convert a raw story dictionary into a ProcessedStory.

    Args:
        story: the raw story. Reads its "scenes", "characters" and
            "game_pid" fields.
        processed: the result of an earlier run over the same story, if any.
            Characters and entries whose checksum is unchanged are taken
            from it instead of being processed again.

    Returns:
        A new ProcessedStory, or ``processed`` as-is (possibly None) when the
        story has no scenes, no characters, or scenes is not a Sequence.

    Raises:
        KeyError: if the story dict has no "game_pid", or an entry dict that
            is kept has no "role".

    NOTE(review): a CharacterInfo reused from ``processed`` keeps the
    entry_ids object it already had, and entry ids are inserted into it
    again here; whether that is harmless depends on IndexedSet - confirm.
    """

    scenes = story.get("scenes")
    characters = story.get("characters")

    # If either scenes or characters are missing, or scenes is not a proper sequence,
    # we return previously processed data if available
    if not scenes or not characters or not isinstance(scenes, Sequence):
        return processed
    
    # Seed the character list with a synthetic narrator character, so that
    # entries whose role is "narrator" can be attributed to it below.

    character_list = [
        (
            "narrator",
            CharacterInfo(
                name="narrator",
                description="",
                checksum=0, # setting narrator's checksum to 0
                entry_ids=IndexedSet(),
                character_id="narrator", 
                # summary=Segment(),
            ),
        )
    ]
    
    # =============================================================================== # 
    #                           Processing Character Entries 
    # ================================================================================#
    
    # We now process each character in the story. We obtain the following:
    # - Their id (prefixed with "character:") and their checksum
    # - Their name and description
    # - Finally, we encapsulate all of it in the dataclass CharacterInfo. 
    for character in characters:
        character_id = character.get("character_seq_id")
        if not character_id:
            # Characters without an id cannot be referenced, so skip them
            continue

        character_id = f"character:{character_id}"

        character_info = (
            processed.characters.get(character_id, None) if processed else None
        )
        

        # Compute the checksum for the character
        checksum = checksum_character(character, character_id)
        if not character_info or character_info.checksum != checksum:
            # Haven't processed this character before (or it has changed),
            # so process it now
            character_info = CharacterInfo(
                name=extract_string("name", character),
                description=extract_string("description", character),
                checksum=checksum,
                entry_ids=IndexedSet(),
                character_id=character_id,
                # summary=summarize_character(character,tokenizer),
            )

        character_list.append(
            (
                character_id,
                character_info,
            )
        )
        
    all_characters = IndexedDict(character_list)
    
    # =============================================================================== # 
    #                           Processing Scene Entries 
    # ================================================================================#    
    
    # Same as characters: obtain the id and checksum of every entry, and
    # encapsulate the entry in the dataclass EntryInfo via process_entry.
    
    entry_list: List[Tuple[str, EntryInfo]] = []
    establishment_list: List[Tuple[str, EntryInfo]] = []
    
    for scene in scenes:
        entries = scene.get("entries", [])
        if not entries or not isinstance(entries, Sequence):
            continue

        for entry in entries:
            entry_id = entry.get("seq_id", None)
            if entry_id is None:
                # Entries without an id cannot be indexed, so skip them
                continue
                
            checksum = checksum_entry(entry, entry_id)
            
            entry_info = (
                processed.entries.get(entry_id, None) if processed else None
            )
            if not entry_info or entry_info.checksum != checksum:
                # Haven't processed this entry before, so process it now.
                # The establishment id passed in is that of the most recent
                # establishment entry collected so far, or the entry's own id
                # when none has been collected yet.
                entry_info = process_entry(
                    # tokenizer,
                    entry,
                    establishment_list[-1][0] if establishment_list else entry_id,
                    checksum,
                )
            if not entry_info:
                # process_entry returned None, i.e. the entry is not modeled
                continue

            entry_list.append((entry_id, entry_info))
            entry_format = entry.get("format")
            if entry_format == "establishment":
                establishment_list.append((entry_id, entry_info))

            # NOTE(review): presumably this lookup fails when the role names a
            # character that was skipped above or is absent - confirm against
            # IndexedDict.
            character_info = (
                all_characters[  # pylint:disable=unsubscriptable-object
                    entry["role"]
                ]
            )

            # Record the entry against the character that wrote it
            character_info.entry_ids.insert(entry_id)

    return ProcessedStory(
        game_id=story["game_pid"],
        entries=IndexedDict(entry_list), 
        characters=all_characters,
        establishment_entries=IndexedDict(establishment_list),
    )

### Reorganize data for Models

In [21]:
def _place_summary(place_card):
    """Keep only the id, name and description of a place card."""
    return {
        "card_id": place_card["card_id"],
        "name": place_card["name"],
        "description": place_card["description"],
    }


def prepare_data(processed_story):
    """
    Reorganize data for model and return it

    Walks the entries in order, remembering the place card of the most
    recent establishment entry that has one, and emits one sample per "move"
    entry that has at least one card played on its challenge.

    Input: Processed Story (its ``entries``, ``establishment_entries`` and
        ``characters`` mappings are read)
    Output: Reformatted data for use - a list of dicts with the keys
        "total_context" (establishment text followed by the move text),
        "event" (the first card played on the challenge), "character" and
        "place", where the last three each hold "id", "name", "description"

    Raises:
        IndexError: if the story has no establishment entries.
        TypeError: if the first establishment entry has no place card.
    """
    # The location starts out as the place card of the first establishment
    # entry, so that moves always have a place to refer to.
    establishments = list(processed_story.establishment_entries.values())
    last_known_location = _place_summary(
        establishments[0].imp_cards["place_card"][0]
    )
    bert_data = []

    for entry_info in processed_story.entries.values():
        if entry_info.format == "establishment":
            scene_location = entry_info.imp_cards["place_card"][0]
            # An establishment without a place card keeps the previous one
            if scene_location is not None:
                last_known_location = _place_summary(scene_location)
            continue

        # Scene conclusion and addition entries by the narrator are ignored
        # (since there is no event/character options to them for BERT)
        if entry_info.format != "move":
            continue

        establishment = processed_story.establishment_entries[
            entry_info.establishment_id
        ]
        total_context = establishment.text + entry_info.text
        character = processed_story.characters[entry_info.character_id]

        try:
            event = entry_info.imp_cards["cards_played_on_challenge"][0]
        except (TypeError, IndexError, KeyError):
            # The card list is None, empty or absent: there is no event to
            # learn from, so skip this move. Only these lookup failures are
            # swallowed, so interrupts and unrelated errors still propagate.
            continue

        # Making the data using the move entries
        bert_data.append({
            "total_context": total_context,
            "event": {
                "id": event["card_id"],
                "name": event["name"],
                "description": event["description"],
            },
            "character": {
                "id": character.character_id,
                "name": character.name,
                "description": character.description,
            },
            "place": {
                "id": last_known_location["card_id"],
                "name": last_known_location["name"],
                "description": last_known_location["description"],
            },
        })

    return bert_data

### Make Training Data 

This creates a folder called "bert_train_dataset" which gives us all training samples for BERT. These were slightly modified to follow the format required for finetuning GPT-3.5 (the storyline guidance model). 

In [None]:
import json

# Reads every story listed in train_filenames.txt, converts it with
# process_story / prepare_data, and writes one "<name>_bert_data.json" file
# per usable story into the output folder.
data_folder = "./storium_dataset"  
bert_data_folder_path = "./bert_train_dataset"

# One relative story path per line; blank lines are ignored so that they do
# not turn into attempts to open the data folder itself.
with open(f"{data_folder}/train_filenames.txt", "r") as filenames_file:
    file_paths = [line.strip() for line in filenames_file if line.strip()]

# The tokenizer does not depend on the story, so load it once instead of
# once per file.
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

for file_path in file_paths:
    with open(f"{data_folder}/{file_path}", 'r', encoding='utf-8') as file:
        story_data = json.load(file)
    
    processed_story = process_story(story_data)
    if processed_story is None:
        # process_story returns None when the story has no usable scenes or
        # characters; there is nothing to export for such a story.
        print(f"{file_path} has no usable scenes or characters, skipping")
        continue
    
    # Skip stories with less than 5 characters (one is narrator)
    num_chars = len(processed_story.characters)
    if num_chars < 5:
        continue  

    try:
        bert_data = prepare_data(processed_story)
    except Exception as e:
        print(f"{file_path} has the issue: {e}")
        continue  # Skip this story if there's an issue
    
    if not os.path.exists(bert_data_folder_path):
        os.makedirs(bert_data_folder_path)
        print("Directory created successfully.")
    
    filename = file_path.split('/')[-1].rsplit('.', 1)[0] 
    output_path = f"{bert_data_folder_path}/{filename}_bert_data.json"

    try:
        # Write the bert data to a JSON file
        with open(output_path, "w", encoding="utf-8") as json_file:
            json.dump(bert_data, json_file, indent=4)
    except (OSError, TypeError, ValueError) as err:
        # Report the actual cause (I/O failure or unserializable data) and
        # carry on with the next story.
        print(f"{output_path} not opening: {err}")
    else:
        print(f"Data written to {output_path}")