In [24]:
import polars as pl
import os
from tqdm import tqdm
import chardet
import re
from dataclasses import dataclass
from typing import List, Optional

In [None]:
# Directory that holds the raw screenplay ``.txt`` files to load.
texts_dir = os.path.expanduser("~/Downloads/raw_texts")
# Sorted so the rows come out in a reproducible order; os.listdir() makes no
# ordering guarantee.
all_text_files = sorted(
    os.path.join(texts_dir, f) for f in os.listdir(texts_dir) if f.endswith(".txt")
)

# Column-oriented accumulator: one entry per file in each list, kept in step.
script_items = {
    "filename": [],
    "contents": [],
}

for filename in tqdm(all_text_files):
    # Read the raw bytes once so the encoding can be sniffed from them.
    with open(filename, "rb") as f:
        raw_data = f.read()

    # chardet reports encoding=None when it cannot make a guess (e.g. for an
    # empty file); fall back to UTF-8 instead of the locale-dependent default.
    encoding = chardet.detect(raw_data)["encoding"] or "utf-8"

    # errors="replace": the detection is only a guess, and one badly guessed
    # file should not abort the whole run with a UnicodeDecodeError.
    with open(filename, "r", encoding=encoding, errors="replace") as f:
        contents = f.read()

    # Append both columns only after the read succeeded so the two lists can
    # never end up with different lengths.
    script_items["filename"].append(filename)
    script_items["contents"].append(contents)

In [128]:

@dataclass
class DialogueLine:
    """Speech of a single character, as produced by ScreenplayParser for a dialogue block."""
    character: str  # Stripped text of the character cue line that opens the block
    content: str  # Spoken lines of the block joined with single spaces
    parenthetical: Optional[str] = None  # Parenthetical line including its parentheses; None if the block has none
    line_number: int = 0  # 0-based index of the character cue line in the split source text

@dataclass
class SceneElement:
    """One classified piece of a screenplay: a heading, action text, a dialogue block or a transition."""
    type: str  # One of "scene_heading", "action", "dialogue", "transition"
    content: str  # Stripped text; consecutive lines merged by the parser are joined with spaces
    line_number: int  # 0-based index of the element's first line (the character cue for dialogue)
    dialogue_data: Optional[DialogueLine] = None  # Set only for "dialogue" elements; None otherwise

@dataclass
class Scene:
    """A scene: its heading plus the elements that follow it up to the next heading."""
    heading: SceneElement  # Element of type "scene_heading" that opens the scene
    elements: List[SceneElement]  # Scene body in source order, after consecutive-line merging

class ScreenplayParser:
    """Heuristic, line-oriented parser that splits screenplay text into scenes.

    Every line is stripped and classified as follows:

    * scene heading  - starts with ``INT.`` or ``EXT.`` followed by more text
    * transition     - starts with ``FADE``, ``DISSOLVE`` or ``CUT`` followed
      by more text
    * character name - non-empty, upper-case (``str.isupper``), does not start
      with ``(`` and is neither a scene heading nor a transition
    * parenthetical  - starts with ``(`` and ends with ``)``
    * action         - anything else inside a scene

    Line numbers stored on the produced elements are 0-based indices into
    ``self.lines``. The parser is a single-pass cursor (``self.current_line``)
    over those lines; create a new instance for each text.
    """

    # Compiled once; the patterns are applied to every line, often repeatedly.
    _SCENE_HEADING_RE = re.compile(r'^(INT\.|EXT\.).+')
    _TRANSITION_RE = re.compile(r'^(FADE|DISSOLVE|CUT).+')

    def __init__(self, text: str):
        """Split *text* into lines and reset the parsing cursor."""
        self.lines = text.split('\n')
        self.current_line = 0
        self.total_lines = len(self.lines)
        self.scenes: List[Scene] = []

    def parse(self) -> List[Scene]:
        """Parse the entire screenplay and return the list of scenes.

        Everything before the first scene heading (blank lines, title page
        content) is skipped.
        """
        while self.current_line < self.total_lines:
            if self._is_scene_heading(self.current_line):
                # _parse_scene() returns with the cursor ON the next scene
                # heading (or at end of input). Advancing here as well would
                # step over that heading and silently drop every other scene.
                self._parse_scene()
            else:
                self.current_line += 1
        return self.scenes

    def _is_scene_heading(self, line_num: int) -> bool:
        """Return True if line *line_num* is a scene heading (INT./EXT.)."""
        line = self.lines[line_num].strip()
        return bool(self._SCENE_HEADING_RE.match(line))

    def _is_character_name(self, line_num: int) -> bool:
        """Return True if line *line_num* looks like a character cue.

        This is a heuristic: any upper-case line that is not a parenthetical
        opener, scene heading or transition qualifies.
        """
        line = self.lines[line_num].strip()
        return bool(line and line.isupper() and not line.startswith('(')
                    and not self._is_scene_heading(line_num)
                    and not self._is_transition(line_num))

    def _is_parenthetical(self, line_num: int) -> bool:
        """Return True if line *line_num* is wrapped in parentheses."""
        line = self.lines[line_num].strip()
        return line.startswith('(') and line.endswith(')')

    def _is_transition(self, line_num: int) -> bool:
        """Return True if line *line_num* is a transition (FADE/DISSOLVE/CUT)."""
        line = self.lines[line_num].strip()
        return bool(self._TRANSITION_RE.match(line))

    def _merge_consecutive_elements(self, elements: List[SceneElement]) -> List[SceneElement]:
        """Merge same-type elements that sit on directly consecutive lines.

        Two elements are merged only when the second one starts on the line
        immediately after the run collected so far, i.e. there is no blank
        line between them. Merged contents are joined with single spaces; the
        merged element keeps the line number and ``dialogue_data`` of the
        first element of the run. New SceneElement objects are returned; the
        input elements are not modified.
        """
        if not elements:
            return elements

        merged = []
        current = elements[0]
        current_lines = [current.content]
        current_start_line = current.line_number

        for next_elem in elements[1:]:
            if (next_elem.type == current.type and
                    next_elem.line_number == current_start_line + len(current_lines)):
                current_lines.append(next_elem.content)
            else:
                merged.append(SceneElement(
                    type=current.type,
                    content=" ".join(current_lines),
                    line_number=current_start_line,
                    dialogue_data=current.dialogue_data
                ))
                current = next_elem
                current_lines = [current.content]
                current_start_line = current.line_number

        # Flush the final run, which the loop above never emits.
        merged.append(SceneElement(
            type=current.type,
            content=" ".join(current_lines),
            line_number=current_start_line,
            dialogue_data=current.dialogue_data
        ))

        return merged

    def _parse_dialogue_block(self) -> SceneElement:
        """Parse a dialogue block starting at the character cue under the cursor.

        Consumes the cue plus the following parenthetical/speech lines and
        leaves the cursor on the first line that is not part of the block
        (blank line, scene heading, next character cue, or end of input).
        If several parentheticals occur, the last one is kept.

        Returns:
            A "dialogue" SceneElement whose ``dialogue_data`` holds the
            character name, the space-joined speech and the parenthetical.
        """
        character_line = self.current_line
        character = self.lines[character_line].strip()
        self.current_line += 1

        parenthetical = None
        dialogue_lines = []

        while self.current_line < self.total_lines:
            line = self.lines[self.current_line].strip()

            # A blank line or a scene heading always ends the block. Checking
            # the heading here (also for the very first line) keeps a heading
            # that directly follows an upper-case line from being swallowed
            # as speech, which would lose the whole scene.
            if not line or self._is_scene_heading(self.current_line):
                break

            # After at least one consumed line, an upper-case line is treated
            # as the next character cue. The first line is exempt so that
            # shouted speech ("NO!") stays with its speaker.
            past_first_line = self.current_line > character_line + 1
            if past_first_line and self._is_character_name(self.current_line):
                break

            if self._is_parenthetical(self.current_line):
                parenthetical = line
            else:
                dialogue_lines.append(line)

            self.current_line += 1

        dialogue_content = " ".join(dialogue_lines)

        return SceneElement(
            type="dialogue",
            content=dialogue_content,
            line_number=character_line,
            dialogue_data=DialogueLine(
                character=character,
                content=dialogue_content,
                parenthetical=parenthetical,
                line_number=character_line
            )
        )

    def _parse_scene(self):
        """Parse one scene starting at the scene heading under the cursor.

        Appends the resulting Scene to ``self.scenes`` and leaves the cursor
        on the next scene heading, or at end of input if there is none.
        """
        heading = SceneElement(
            type="scene_heading",
            content=self.lines[self.current_line].strip(),
            line_number=self.current_line
        )

        elements = []
        self.current_line += 1

        # Collect scene content until the next scene heading or end of script.
        while self.current_line < self.total_lines:
            if self._is_scene_heading(self.current_line):
                break

            line = self.lines[self.current_line].strip()

            # Blank lines carry no content; they only separate elements.
            if not line:
                self.current_line += 1
                continue

            if self._is_character_name(self.current_line):
                # The dialogue parser advances the cursor itself.
                elements.append(self._parse_dialogue_block())
            elif self._is_transition(self.current_line):
                elements.append(SceneElement(
                    type="transition",
                    content=line,
                    line_number=self.current_line
                ))
                self.current_line += 1
            else:
                # Anything unclassified is assumed to be action description.
                elements.append(SceneElement(
                    type="action",
                    content=line,
                    line_number=self.current_line
                ))
                self.current_line += 1

        # Collapse multi-line action/transition runs into single elements.
        elements = self._merge_consecutive_elements(elements)
        self.scenes.append(Scene(heading=heading, elements=elements))

In [1]:
# Jinja chat template source, stored as one Python string (escaped \n / \" are
# decoded by Python before the text reaches Jinja). What the template does:
#   * an optional leading "system" message is split off into `system_message`;
#     the remaining messages become `loop_messages`
#   * raises unless the non-tool messages alternate user/assistant
#   * emits `bos_token`, then per message:
#       - user: "[INST]...[/INST]"; before the last user message the tools (if
#         any) are listed inside "[AVAILABLE_TOOLS]...[/AVAILABLE_TOOLS]"; the
#         system message is prepended only when the user message is the last
#         message of the loop
#       - message with tool_calls: "[TOOL_CALLS][...]" + eos_token; every call
#         id must have length 9
#       - assistant: content + eos_token
#       - tool / tool_results: "[TOOL_RESULTS]{...}[/TOOL_RESULTS]"; the
#         tool_call_id must have length 9
#       - any other role raises
# NOTE(review): the literal also contains raw line breaks (not only escaped
# \n), one of them inside the ", " separator emitted between tools -- confirm
# these are intentional and not an artifact of line wrapping.
template = """{%- if messages[0][\"role\"] == \"system\" %}\n    {%- set system_message = messages[0][\"content\"] %}\n    {%- set loop_messages = messages[1:] %}\n{%- else %}\n    {%- set loop_messages = messages %}\n{%- endif %}\n{%- if not tools is defined %}\n    {%- set tools = none %}\n{%- endif %}\n{%- set user_messages = loop_messages | selectattr(\"role\", \"equalto\", \"user\") | list %}\n\n{#- This block checks for alternating user/assistant messages, skipping tool calling messages #}\n{%- set ns = namespace() %}\n{%- set ns.index = 0 %}\n{%- for message in loop_messages %}\n    {%- if not (message.role == \"tool\" or message.role == \"tool_results\" or (message.tool_calls is defined and message.tool_calls is not none)) %}\n        {%- if (message[\"role\"] == \"user\") != (ns.index % 2 == 0) %}\n            {{- raise_exception(\"After the optional system message, conversation roles must alternate user/assistant/user/assistant/...\") }}\n        {%- endif %}\n        {%- set ns.index = ns.index + 1 %}\n    {%- endif %}\n{%- endfor %}\n\n{{- bos_token }}\n{%- for message in loop_messages %}\n    {%- if message[\"role\"] == \"user\" %}\n        {%- if tools is not none and (message == user_messages[-1]) %}\n            {{- \"[AVAILABLE_TOOLS][\" }}\n            {%- for tool in tools %}\n                {%- set tool = tool.function %}\n                {{- '{\"type\": \"function\", \"function\": {' }}\n                {%- for key, val in tool.items() if key != \"return\" %}\n                    {%- if val is string %}\n                        {{- '\"' + key + '\": \"' + val + '\"' }}\n                    {%- else %}\n                        {{- '\"' + key + '\": ' + val|tojson }}\n                    {%- endif %}\n                    {%- if not loop.last %}\n                        {{- \", \" }}\n                    {%- endif %}\n                {%- endfor %}\n                {{- \"}}\" }}\n                {%- if not loop.last %}\n                    {{- \", 
\" }}\n                {%- else %}\n                    {{- \"]\" }}\n                {%- endif %}\n            {%- endfor %}\n            {{- \"[/AVAILABLE_TOOLS]\" }}\n            {%- endif %}\n        {%- if loop.last and system_message is defined %}\n            {{- \"[INST]\" + system_message + \"\\n\\n\" + message[\"content\"] + \"[/INST]\" }}\n        {%- else %}\n            {{- \"[INST]\" + message[\"content\"] + \"[/INST]\" }}\n        {%- endif %}\n    {%- elif (message.tool_calls is defined and message.tool_calls is not none) %}\n        {{- \"[TOOL_CALLS][\" }}\n        {%- for tool_call in message.tool_calls %}\n            {%- set out = tool_call.function|tojson %}\n            {{- out[:-1] }}\n            {%- if not tool_call.id is defined or tool_call.id|length != 9 %}\n                {{- raise_exception(\"Tool call IDs should be alphanumeric strings with length 9!\") }}\n            {%- endif %}\n            {{- ', \"id\": \"' + tool_call.id + '\"}' }}\n            {%- if not loop.last %}\n                {{- \", \" }}\n            {%- else %}\n                {{- \"]\" + eos_token }}\n            {%- endif %}\n        {%- endfor %}\n    {%- elif message[\"role\"] == \"assistant\" %}\n        {{- message[\"content\"] + eos_token}}\n    {%- elif message[\"role\"] == \"tool_results\" or message[\"role\"] == \"tool\" %}\n        {%- if message.content is defined and message.content.content is defined %}\n            {%- set content = message.content.content %}\n        {%- else %}\n            {%- set content = message.content %}\n        {%- endif %}\n        {{- '[TOOL_RESULTS]{\"content\": ' + content|string + \", \" }}\n        {%- if not message.tool_call_id is defined or message.tool_call_id|length != 9 %}\n            {{- raise_exception(\"Tool call IDs should be alphanumeric strings with length 9!\") }}\n        {%- endif %}\n        {{- '\"call_id\": \"' + message.tool_call_id + '\"}[/TOOL_RESULTS]' }}\n    {%- else %}\n        {{- 
raise_exception(\"Only user and assistant roles are supported, with the exception of an initial optional system message!\") }}\n    {%- endif %}\n{%- endfor %}\n
"""

# Split the template into its lines; `template` stays bound to this list of
# str afterwards, and joining with "\n" below reproduces the original text.
template = template.split("\n")
# Explicit encoding so the written file does not depend on the platform's
# locale default.
with open("template.jinja", "w", encoding="utf-8") as f:
    f.write("\n".join(template))