In [None]:
import pandas as pd
import os
import re
import spacy
import logging
from charset_normalizer import from_path
from gensim.utils import simple_preprocess
from gensim.corpora.dictionary import Dictionary
from gensim.models import LdaModel
from gensim.parsing.preprocessing import STOPWORDS
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np

In [None]:
# configure root logging: INFO level, records formatted as "time - level - message"
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# load spaCy's small English pipeline; `nlp` is called on scene text during preprocessing
nlp = spacy.load('en_core_web_sm')

In [None]:
# function to read screenplay
def read_screenplay(file_path):
    """Read a screenplay file and return its text without the first line.

    The encoding is detected with charset_normalizer's ``from_path`` and the
    file is then opened in text mode with that encoding. The first line of the
    file is dropped; everything after it is returned unchanged.

    Args:
        file_path: Path of the screenplay text file.

    Returns:
        The file content from the second line onward, or None when the
        encoding cannot be detected or the file cannot be read. The reason is
        logged in both cases.
    """
    try:
        best_match = from_path(file_path).best()
        # best() gives None when no plausible encoding was found; report that
        # explicitly instead of failing on `None.encoding` further down
        if best_match is None:
            logging.error(f"Error reading file {file_path}: could not detect text encoding")
            return None
        with open(file_path, 'r', encoding=best_match.encoding) as file:
            # consume and discard the first line, then read the remainder
            next(file, None)
            content = file.read()
        logging.info(f"Successfully read file {file_path}")
        return content
    except Exception as e:
        logging.error(f"Error reading file {file_path}: {str(e)}")
        return None

In [None]:
# function to separate scenes
def separate_scenes(text):
    """Split screenplay text into a list of scene bodies.

    Scenes are delimited by a run of 50 '=' characters. Chunks that are blank
    after stripping are skipped. For every remaining chunk the first line is
    dropped and the rest is stripped of surrounding whitespace.

    Args:
        text: Full screenplay text.

    Returns:
        List of scene strings. An empty list is returned (and the error is
        logged) if anything goes wrong while splitting.
    """
    delimiter = '=' * 50
    try:
        scenes = []
        for chunk in text.split(delimiter):
            chunk = chunk.strip()
            if not chunk:
                continue
            # keep only what follows the first newline, i.e. the chunk minus its first line
            _first_line, _newline, remainder = chunk.partition('\n')
            scenes.append(remainder.strip())
        logging.info(f"Separated text into {len(scenes)} scenes")
        return scenes
    except Exception as e:
        logging.error(f"Error separating scenes: {str(e)}")
        return []

In [None]:
# function to merge scenes with less than 100 words with following scene
def merge_short_scenes(scenes, min_words=100):
    """Merge scenes that are shorter than ``min_words`` with the scenes that follow.

    Scenes are accumulated in order until the accumulated text holds at least
    ``min_words`` whitespace-separated words; the accumulated scenes are then
    joined with a single space and emitted as one merged scene. A short scene
    is therefore always combined with the following scene(s), even when the
    following scene is long on its own.

    Args:
        scenes: List of scene strings in screenplay order.
        min_words: Minimum number of words a merged scene should contain.

    Returns:
        List of merged scene strings. A trailing remainder that never reaches
        ``min_words`` has no following scene to merge with and is emitted as
        its own (short) scene. Remainders without any words are dropped.
    """
    merged_scenes = []
    pending_scenes = []
    pending_word_count = 0
    for scene in scenes:
        pending_scenes.append(scene)
        pending_word_count += len(scene.split())
        if pending_word_count >= min_words:
            merged_scenes.append(' '.join(pending_scenes).strip())
            pending_scenes = []
            pending_word_count = 0
    # leftover scenes at the end of the screenplay that stayed below min_words
    if pending_word_count:
        merged_scenes.append(' '.join(pending_scenes).strip())
    logging.info(f"Merged scenes into {len(merged_scenes)} longer scenes")

    return merged_scenes

In [None]:
# function to identify character names with regex (character names before dialogue lines are in uppercase in most screenplays)
def identify_character_names(text):
    """Find candidate character names in screenplay text.

    A candidate is a run of at least two characters that starts with an
    uppercase ASCII letter, consists only of uppercase ASCII letters and
    whitespace, and is enclosed by newlines (optionally padded with
    whitespace). Trailing whitespace is removed from every candidate.

    Args:
        text: Screenplay or scene text.

    Returns:
        List of candidate names in order of appearance (duplicates included).
    """
    name_regex = re.compile(r'\n\s*([A-Z][A-Z\s]+)\s*\n')
    names = []
    for match in name_regex.finditer(text):
        names.append(match.group(1).rstrip())

    return names

In [None]:
# function to remove character names, preprocess text
def preprocess_text(text):
    """Remove character names from a scene and normalize it to alphabetic tokens.

    Steps: strip the uppercase character names found by
    ``identify_character_names``, lowercase the text, collapse whitespace,
    replace non-word characters with spaces, tokenize with the spaCy pipeline
    ``nlp`` and keep only alphabetic tokens.

    Args:
        text: Raw scene text.

    Returns:
        The remaining alphabetic tokens joined by single spaces.
    """
    # unique names, longest first: with 'JOHN' and 'JOHNNY' both present the
    # longer name must win, otherwise a stray 'NY' would be left behind
    character_names = sorted(
        {name for name in identify_character_names(text) if name},
        key=lambda name: (-len(name), name),
    )
    if character_names:
        # word boundaries keep a short name (e.g. 'ED') from being cut out of
        # a longer uppercase word (e.g. 'EDGE')
        name_pattern = re.compile(
            r'\b(?:' + '|'.join(re.escape(name) for name in character_names) + r')\b'
        )
        text = name_pattern.sub('', text)
    text = text.lower()
    # replace multiple consecutive whitespace characters with a space
    text = re.sub(r'\s+', ' ', text)
    # replace all non-word characters with a space
    text = re.sub(r'\W', ' ', text)
    # process cleaned text using spaCy
    doc = nlp(text)
    # extract tokens from processed text, but only include alphabetic tokens
    tokens = [token.text for token in doc if token.is_alpha]

    # join tokens back into single string
    return ' '.join(tokens)

In [None]:
# function to remove stopwords
def remove_stopwords(texts):
    """Tokenize each document and drop stop words.

    Every document is converted to ``str``, tokenized with gensim's
    ``simple_preprocess`` and filtered against gensim's ``STOPWORDS`` set.

    Args:
        texts: Iterable of documents.

    Returns:
        List with one list of remaining tokens per document.
    """
    filtered_docs = []
    for doc in texts:
        kept_words = []
        for word in simple_preprocess(str(doc)):
            if word not in STOPWORDS:
                kept_words.append(word)
        filtered_docs.append(kept_words)
    return filtered_docs

In [None]:
# function to train LDA model
def train_lda_model(texts, num_topics=10, passes=10, random_state=42, no_below=5, no_above=0.5):
    """Train a gensim LDA model on tokenized documents.

    Args:
        texts: List of documents, each a list of tokens.
        num_topics: Number of topics to fit.
        passes: Number of training passes over the corpus.
        random_state: Seed handed to ``LdaModel`` so that repeated runs on the
            same input produce the same topics. Pass None for an unseeded run.
        no_below: Drop words that appear in fewer than this many documents.
        no_above: Drop words that appear in more than this fraction of documents.

    Returns:
        Tuple ``(lda_model, dictionary, corpus)`` where ``corpus`` holds the
        bag-of-words representation of every document.

    Raises:
        ValueError: If no words survive the dictionary filtering, since a model
            cannot be trained on an empty vocabulary.
    """
    # create dictionary from texts
    dictionary = Dictionary(texts)
    # filter out words that are too rare or too common across documents
    dictionary.filter_extremes(no_below=no_below, no_above=no_above)
    if len(dictionary) == 0:
        raise ValueError(
            "No terms left after filtering the dictionary; cannot train an LDA model"
        )
    # convert texts into bag-of-words format (list of (word_id, word_count) for each document)
    corpus = [dictionary.doc2bow(text) for text in texts]
    # train lda model on corpus with specified number of topics and passes
    lda_model = LdaModel(
        corpus=corpus,
        num_topics=num_topics,
        id2word=dictionary,
        passes=passes,
        random_state=random_state,
    )

    return lda_model, dictionary, corpus

In [None]:
# function to get dominant topic
def get_dominant_topic(lda_model, text, dictionary):
    """Return the index of the most probable topic for one tokenized document.

    Args:
        lda_model: Trained model exposing ``get_document_topics``.
        text: Document as a list of tokens.
        dictionary: Dictionary exposing ``doc2bow`` to build the bag-of-words.

    Returns:
        Index of the topic with the highest probability for the document.
    """
    # convert text into bag-of-words
    bow = dictionary.doc2bow(text)
    # request every topic (minimum_probability=0.0) so the model's default
    # probability cut-off cannot leave an empty list for max() to fail on
    topic_distribution = lda_model.get_document_topics(bow, minimum_probability=0.0)
    # find dominant topic and return its index
    dominant_topic = max(topic_distribution, key=lambda pair: pair[1])[0]

    return dominant_topic

In [None]:
# function to find dominant topic in each document
def assign_dominant_topics(texts, lda_model, dictionary):
    """Determine the dominant topic of every document.

    Args:
        texts: List of tokenized documents.
        lda_model: Trained LDA model passed through to ``get_dominant_topic``.
        dictionary: Dictionary passed through to ``get_dominant_topic``.

    Returns:
        List with one dominant-topic index per document, in input order.
    """
    topic_per_document = []
    for document in texts:
        topic_per_document.append(get_dominant_topic(lda_model, document, dictionary))

    return topic_per_document

In [None]:
# function to compute topic coherence scores
def compute_topic_coherence(dominant_topics, lda_model):
    """Average pairwise cosine similarity between the scenes' dominant-topic vectors.

    Every scene is represented by the term vector of its dominant topic (a row
    of ``lda_model.get_topics()``). The score is the mean cosine similarity
    over all ordered pairs of distinct scenes.

    Args:
        dominant_topics: Sequence with one topic index per scene.
        lda_model: Model exposing ``get_topics()`` with one row per topic.

    Returns:
        The average similarity as a float, or NaN when fewer than two scenes
        are given (no pair of scenes exists to compare).
    """
    scene_topics = np.asarray(dominant_topics, dtype=int)
    num_scenes = scene_topics.size
    # a pairwise average needs at least one pair; avoid dividing by zero
    if num_scenes < 2:
        return float('nan')

    topic_vectors = np.asarray(lda_model.get_topics(), dtype=float)
    # scale rows to unit length so their dot products are cosine similarities
    unit_vectors = topic_vectors / np.linalg.norm(topic_vectors, axis=1, keepdims=True)
    topic_similarity = unit_vectors @ unit_vectors.T

    # scenes with the same dominant topic share one vector, so the sum over all
    # ordered scene pairs can be taken at topic level, weighted by scene counts
    scenes_per_topic = np.bincount(scene_topics, minlength=topic_vectors.shape[0])
    sum_all_pairs = scenes_per_topic @ topic_similarity @ scenes_per_topic
    # remove each scene's similarity with itself (the diagonal of the scene-level matrix)
    sum_self_pairs = scenes_per_topic @ np.diag(topic_similarity)
    avg_similarity = (sum_all_pairs - sum_self_pairs) / (num_scenes * (num_scenes - 1))

    return float(avg_similarity)

In [None]:
def process_screenplay(filename, screenplay_folder):
    """Compute the topic-coherence score for a single screenplay file.

    The imdbid is taken from the trailing ``_<digits>.txt`` part of the
    filename (leading zeros are dropped). The file is read, split into scenes,
    short scenes are merged, every scene is preprocessed and stop-word
    filtered, an LDA model is trained on the scenes, and the coherence of the
    scenes' dominant topics is computed.

    Args:
        filename: Name of the screenplay file inside ``screenplay_folder``.
        screenplay_folder: Folder that contains the screenplay files.

    Returns:
        Tuple ``(imdbid, overall_coherence)`` with ``imdbid`` as int, or
        ``(None, None)`` when the filename carries no imdbid, the file cannot
        be read, or any processing step raises (the error is logged).
    """
    try:
        # construct full file path for screenplay
        file_path = os.path.join(screenplay_folder, filename)
        # define regex pattern to extract imdbid from filename
        pattern = re.compile(r'_0*(\d+)\.txt$')
        # apply regex pattern to filename to find imdbid
        match = pattern.search(filename)
        if not match:
            # log warning if imdbid can't be extracted and return None
            logging.warning(f"Could not extract imdbid from filename: {filename}")
            return None, None

        # extract imdbid from filename
        imdbid = match.group(1)

        # read screenplay content from file
        text = read_screenplay(file_path)
        if text is None:
            return None, None

        # split screenplay into scenes
        scenes = separate_scenes(text)
        # merge scenes shorter than 100 words
        merged_scenes = merge_short_scenes(scenes, min_words=100)
        # preprocess each scene (e.g., lowercasing, removing non-alpha characters)
        preprocessed_scenes = [preprocess_text(scene) for scene in merged_scenes]
        # remove stopwords from preprocessed scenes
        preprocessed_scenes = remove_stopwords(preprocessed_scenes)
        # train lda model on preprocessed scenes
        lda_model, dictionary, corpus = train_lda_model(preprocessed_scenes, num_topics=10, passes=10)
        # assign dominant topic to each scene
        dominant_topics = assign_dominant_topics(preprocessed_scenes, lda_model, dictionary)
        # calculate topic coherence score based on dominant topics
        overall_coherence = compute_topic_coherence(dominant_topics, lda_model)
        # log imdbid and coherence score
        logging.info(f"Processed {filename}: imdbid={imdbid}, coherence={overall_coherence}")

        # return imdbid and overall coherence score
        return int(imdbid), overall_coherence

    except Exception as e:
        # log error message if an exception occurs during processing
        logging.error(f"Error processing {filename}: {str(e)}")
        return None, None

In [None]:
if __name__ == "__main__":
    # Driver: score every screenplay in the folder, save the scores, and merge
    # them into the movie metadata table. Any failure is logged, not raised.
    try:
        # load metadata df
        df = pd.read_csv('data/movie_metadata_final.csv')

        # folder containing screenplay files
        screenplay_folder = 'data/screenplay_data/data/scene_separated_texts'

        # list to store (imdbid, overall_coherence) results
        results = []

        # os.listdir returns names in arbitrary order; sort them so every run
        # processes the files (and writes the rows) in the same order
        for filename in sorted(os.listdir(screenplay_folder)):
            if not filename.endswith('.txt'):
                continue
            result = process_screenplay(filename, screenplay_folder)
            if result[0] is not None and result[1] is not None:
                results.append(result)

        # create new df to store imdbid and overall_coherence,
        # dropping rows where either value is missing (e.g. NaN coherence)
        coherence_df = pd.DataFrame(results, columns=['imdbid', 'overall_coherence']).dropna()

        # save results in df
        coherence_df.to_csv('data/movie_coherence_scores.csv', index=False)

        # an imdbid that occurs more than once would duplicate the matching
        # metadata rows in the left merge, so keep only its first score
        is_repeated = coherence_df['imdbid'].duplicated()
        if is_repeated.any():
            logging.warning(
                f"Ignoring {int(is_repeated.sum())} repeated imdbid rows when merging coherence scores"
            )
        coherence_unique = coherence_df.loc[~is_repeated]

        # merge coherence scores back into original metadata df
        df = df.merge(coherence_unique, on='imdbid', how='left')

        # save updated df
        df.to_csv('data/movie_metadata_with_coherence.csv', index=False)

        logging.info("Finished processing all screenplays and saved results.")
    except Exception as e:
        logging.error(f"Error in main execution: {str(e)}")