# Imports

In [None]:
# !sudo apt-get install graphviz libgraphviz-dev
# !pip install transformers tqdm chart_studio pygraphviz --upgrade --quiet

In [3]:
# IMPORTS TO RUN NLP CLASSIFIERS
from transformers import pipeline, AutoModelForSequenceClassification, AutoTokenizer
from textblob import TextBlob

# # IMPORTS
import pandas as pd
import networkx as nx
import numpy as np
import matplotlib.pyplot as plt
import os
from tqdm import tqdm
tqdm.pandas()
from itertools import permutations, product
import requests
from bs4 import BeautifulSoup
import plotly.express as px
import chart_studio.plotly as py

# # CONNECT TO YOUR GOOGLE DRIVE TO SAVE DATA
# Mounts Google Drive at /content/drive. The path globals defined further down
# in this cell all point inside that mount, so this has to succeed first.
# NOTE(review): google.colab is presumably only importable on a Colab runtime.
from google.colab import drive
drive.mount('/content/drive')


# # GLOBAL VARIABLES
# Folders inside the mounted Google Drive: raw scripts, scored CSVs and
# normalised results.
movie_scripts_path = "/content/drive/MyDrive/honours/movie scripts/"
movie_results_path = "/content/drive/MyDrive/honours/results/"
movie_norm_results_path = "/content/drive/MyDrive/honours/norm_results/"

# Score columns processed by the post-processing helpers: four single-score
# columns followed by 28 emotion labels.
# NOTE(review): the 28 labels presumably follow the output order of the
# go_emotions model - confirm before relying on positional alignment.
EMOTION_COLUMNS = [
    'Sentiment', 'Intimacy', 'Irony', 'Hate',
    'admiration', 'amusement', 'anger', 'annoyance',
    'approval', 'caring', 'confusion', 'curiosity',
    'desire', 'disappointment', 'disapproval', 'disgust',
    'embarrassment', 'excitement', 'fear', 'gratitude',
    'grief', 'joy', 'love', 'nervousness',
    'optimism', 'pride', 'realization', 'relief',
    'remorse', 'sadness', 'surprise', 'neutral',
]

Mounted at /content/drive


In [None]:
# load models for different emotions [once!]
# Every checkpoint listed in MODELS is loaded with from_pretrained and then
# written back with save_pretrained into a directory named after the model id,
# relative to the current working directory.
# NOTE(review): irony_sentiment / hate_sentiment read from '/content/<model id>',
# so this cell presumably has to run with /content as the working directory - confirm.

# add any models here
MODELS = ['cardiffnlp/twitter-roberta-base-hate',
          'cardiffnlp/twitter-roberta-base-irony',
          'cardiffnlp/twitter-roberta-base-intimacy-latest',
          'SamLowe/roberta-base-go_emotions']

for MODEL in MODELS:
  # load model
  tokenizer = AutoTokenizer.from_pretrained(MODEL, truncation=True, max_length=512)
  model = AutoModelForSequenceClassification.from_pretrained(MODEL)

  # save model (once)
  tokenizer.save_pretrained(MODEL)
  model.save_pretrained(MODEL)

# Util functions

In [5]:
def invalid_name(word_before_colon):
  """Return True when the text before a colon cannot be a character name.

  A candidate is rejected when it is empty or when its first character is
  '[' or '('; anything else is accepted.
  """
  return len(word_before_colon) == 0 or word_before_colon[0] in ('[', '(')

def extract_names(script, threshold=10):
  """Count dialogue lines per speaker in a colon-delimited script.

  Args:
    script: iterable of strings, each expected to look like "NAME: dialogue".
    threshold: a speaker is kept only when they have strictly more than this
      many lines.

  Returns:
    (character_list, lines_per_character): the speakers above the threshold,
    and a dict mapping every accepted speaker to their number of lines.
  """
  lines_per_character = {}

  # loop through every line
  for line in script:
    word_before_colon, colon, _ = line.partition(':')
    # A line without a colon has no speaker prefix, so it is not dialogue.
    if not colon: continue

    word_before_colon = word_before_colon.strip()
    if invalid_name(word_before_colon): continue

    # count number of lines for every character; the first line counts as 1
    # (it used to be stored as 0, leaving every total one short)
    lines_per_character[word_before_colon] = lines_per_character.get(word_before_colon, 0) + 1

  # filter for characters with a certain number of lines
  character_list = [name for name, count in lines_per_character.items() if count > threshold]

  return character_list, lines_per_character

def score_emotion(tokenizer, model, text):
  """Run `text` through a tokenizer/model pair and return the scores as NumPy.

  The text is tokenized to tensors ('pt') and truncated to at most 512 tokens.
  The first row of the model's first output is detached and converted with
  .numpy(); no normalisation is applied here.
  """
  model_inputs = tokenizer(text, return_tensors='pt', truncation=True, max_length=512)
  first_output = model(**model_inputs)[0]
  return first_output[0].detach().numpy()

# Loaded (tokenizer, model) pairs keyed by checkpoint path. The scoring
# functions below are called once per script line, so reloading the checkpoint
# on every call dominated the runtime; each checkpoint is now loaded once.
_CLASSIFIER_CACHE = {}

def _load_classifier(path):
  """Return the (tokenizer, model) pair for `path`, loading it on first use."""
  if path not in _CLASSIFIER_CACHE:
    _CLASSIFIER_CACHE[path] = (
        AutoTokenizer.from_pretrained(path),
        AutoModelForSequenceClassification.from_pretrained(path),
    )
  return _CLASSIFIER_CACHE[path]

def irony_sentiment(text='Hello World'):
  """Score `text` with the locally saved irony checkpoint.

  Returns the NumPy array produced by score_emotion.
  """
  tokenizer, model = _load_classifier('/content/cardiffnlp/twitter-roberta-base-irony')
  return score_emotion(tokenizer, model, text)

def hate_sentiment(text="Good night 😊"):
  """Score `text` with the locally saved hate checkpoint.

  Returns the NumPy array produced by score_emotion.
  """
  tokenizer, model = _load_classifier('/content/cardiffnlp/twitter-roberta-base-hate')
  return score_emotion(tokenizer, model, text)

def intimacy_sentiment(text="The cookie taste delicious"):
  """Score `text` with the intimacy checkpoint and return a single float.

  The result comes from logits.item(), so the model must emit exactly one value.
  """
  tokenizer, model = _load_classifier("cardiffnlp/twitter-roberta-base-intimacy-latest")
  # Truncate like score_emotion does, so long lines cannot exceed the 512-token input limit.
  encoded_input = tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
  return model(**encoded_input).logits.item()

def emotion_25_sentiment(text="I am not having a good day"):
  """Score `text` with the go_emotions checkpoint and return the logits tensor.

  Callers index the first row of the result to obtain the per-label scores.
  """
  tokenizer, model = _load_classifier("SamLowe/roberta-base-go_emotions")
  # Truncate like score_emotion does, so long lines cannot exceed the 512-token input limit.
  encoded_input = tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
  return model(**encoded_input).logits

def sentiment_magic(choice, line, debug=False):
    """Score one line of dialogue with the classifier selected by `choice`.

    Args:
      choice: one of 'sentiment', 'hate', 'irony', 'intimacy', '25 emotions'.
      line: the text to score.
      debug: accepted for backward compatibility; it has no effect.

    Returns:
      'sentiment'   -> TextBlob polarity of the line.
      'hate'        -> result of hate_sentiment(line).
      'irony'       -> result of irony_sentiment(line).
      'intimacy'    -> result of intimacy_sentiment(line).
      '25 emotions' -> pandas Series built from the first row of
                       emotion_25_sentiment(line).

    Raises:
      ValueError: if `choice` is not one of the names above. Previously this
        returned None, which silently filled the target column with nulls.
    """
    if choice == 'sentiment':
        return TextBlob(line).sentiment.polarity

    if choice == 'hate':
        return hate_sentiment(line)

    if choice == 'irony':
        return irony_sentiment(line)

    if choice == 'intimacy':
        return intimacy_sentiment(line)

    if choice == '25 emotions':
        return pd.Series(emotion_25_sentiment(line)[0].tolist())

    raise ValueError(
        f"Unknown choice {choice!r}; expected 'sentiment', 'hate', 'irony', "
        "'intimacy' or '25 emotions'"
    )

# Script data extract and load to Google drive

In [6]:
def data_extracter(path, method, threshold=10):
  """Load a movie script into a DataFrame with 'Name' and 'Line' columns.

  Args:
    path: CSV path ('Harry-Potter-Parsing'), text file path
      ('Colon-based-parsing') or script URL ('IMSDB-parser').
    method: which parser to use (one of the three names above).
    threshold: filters out minor / obviously wrong names. Its meaning depends
      on the parser: Harry Potter keeps the `threshold` most frequent
      characters; colon-based keeps names accepted by extract_names with this
      threshold; IMSDB keeps names that occur more than `threshold` times.

  Returns:
    DataFrame with columns 'Name' and 'Line', with rows containing nulls removed.

  Raises:
    ValueError: for an unknown `method`, or when the IMSDB page has no <pre> block.
    requests.HTTPError: when the IMSDB request returns an error status.
  """
  if method == 'Harry-Potter-Parsing':
    # Open Harry Potter data from Kaggle
    df = pd.read_csv(path)
    df = df[['character', 'dialog']]

    # Rename columns to match my conventions
    df = df.rename(columns={'character': 'Name', 'dialog': 'Line'})

    # Filter the top k characters
    grouped_counts = df.groupby('Name').size().reset_index(name='Count')
    top_k_counts = grouped_counts.sort_values(by='Count', ascending=False).head(threshold)
    df = df[df['Name'].isin(top_k_counts['Name'])]

  elif method == 'Colon-based-parsing':
    # Open txt file from path
    with open(path, 'r', encoding='utf-8') as file:
      script = [line.strip() for line in file]

    # Assume all names are before the colon
    names, _ = extract_names(script, threshold=threshold)

    # Assume the line is everything after the FIRST colon, so dialogue that
    # itself contains a colon is kept whole. Lines without a colon carry no
    # speaker and are skipped instead of raising IndexError.
    rows = []
    for line in script:
      name, colon, text = line.partition(':')
      if colon:
        rows.append({'Name': name.strip(), 'Line': text.strip()})
    df = pd.DataFrame(rows, columns=['Name', 'Line'])

    # Filter for the top k names. Removes obviously wrong names.
    df = df[df['Name'].isin(names)]

  elif method == 'IMSDB-parser':
    # Fetch data; fail loudly on HTTP errors instead of parsing an error page
    response = requests.get(path, timeout=30)
    response.raise_for_status()
    soup = BeautifulSoup(response.text, 'html.parser')
    pre_blocks = soup.find_all('pre')
    if not pre_blocks:
      raise ValueError(f"No <pre> block found at {path}")
    contents = pre_blocks[-1].contents # content of the last 'pre' on the page

    # A <b> element holds the speaker; the text node right after it is the line
    rows = []
    for index, element in enumerate(contents):
      if getattr(element, 'name', None) != 'b':
        continue
      name = element.get_text(strip=True)
      following = contents[index + 1] if index + 1 < len(contents) else None
      # Only plain text nodes are dialogue; a missing or non-text sibling
      # gives an empty line, which is filtered out below.
      line = following.strip() if isinstance(following, str) else ""
      rows.append({'Name': name, 'Line': line})

    # Save result as a dataframe (built once; also works when no rows were found)
    df = pd.DataFrame(rows, columns=['Name', 'Line'])

    # Apply a small threshold to remove very obviously wrong names
    df = df[df.groupby('Name')['Name'].transform('count') > threshold]
    df = df[(df['Name'] != '') & (df['Line'] != '')]

  else:
    raise ValueError(
        f"Unknown method {method!r}; expected 'Harry-Potter-Parsing', "
        "'Colon-based-parsing' or 'IMSDB-parser'"
    )

  # Final check to remove nulls
  return df.dropna()

def delete_files_in_dir(path):
  """Delete every file directly inside `path`, printing one line per entry.

  Entries that cannot be removed (for example sub-directories) are reported
  and skipped, so one failure does not stop the clean-up.
  """
  for file_name in os.listdir(path):
      # Join with the directory that was listed. This used to join with the
      # global movie_results_path, so any other `path` targeted the wrong folder.
      file_path = os.path.join(path, file_name)
      try:
          os.remove(file_path)
          print(f"Deleted: {file_path}")
      except OSError as e:
          print(f"Error deleting {file_path}: {e}")

def update_dataframe(path, col_name, emotion):
  """Read a script CSV and add the score column(s) for one classifier.

  Rows whose 'Line' is null are dropped, then sentiment_magic(emotion, line)
  is applied to every remaining line (with a progress bar) and the result is
  stored under `col_name`. The frame is returned, not written back to disk.
  """
  loaded = pd.read_csv(path)
  df = loaded[loaded['Line'].notnull()]
  scorer = lambda text: sentiment_magic(emotion, text)
  df[col_name] = df['Line'].progress_apply(scorer)
  return df

## EXAMPLE - extract script to dataframe
# path = movie_scripts_path + 'civil_war.txt'
# df = data_extracter(path, method='Colon-based-parsing')
# df.to_csv(movie_results_path + 'civil_war.csv', index=False)

## EXAMPLE - extract script from IMSDB website
# path = 'https://imsdb.com/scripts/Joker.html'
# data_extracter(path, 'IMSDB-parser')

## EXAMPLE - clear files in the movie results directory
# delete_files_in_dir(movie_results_path)

## EXAMPLE - update dataframes with new sentiment column
# path = movie_results_path + 'civil_war.csv'
# df = update_dataframe(path, 'Intimacy', 'intimacy')
# df.to_csv(path, index=False) #uncomment to overwrite the saved file

## EXAMPLE - update dataframe with 25 emotions
# path = movie_results_path+'civil_war.csv'
# df = pd.read_csv(path)
# df = update_dataframe(path, ['admiration',...,'neutral'], '25 emotions')
# df.to_csv(path, index=False) #uncomment to overwrite the saved file

# Analysis
- filter top k characters by number of lines
- keep only lines whose length is greater than a threshold
- concat/average concurrent same speaker
- calculate average sentiment for every pair of consecutive speakers
- visualize sentiment over time for all characters + host online
- average weights of incoming/outgoing edges per node
- my_min_cut_decomposition + visualization
- community algorithms + visualization

In [93]:
# filter top k characters by number of lines
def filter_top_k_characters(df, threshold=16):
  """Keep only the rows spoken by the `threshold` most frequent names.

  Frequency is the number of rows per value in the 'Name' column.
  """
  top_k_names = df['Name'].value_counts().head(threshold).index.tolist()
  is_top_speaker = df['Name'].isin(top_k_names)
  return df[is_top_speaker]

# filter length of line to greater than threshold
def filter_length_line(df, threshold=2):
  """Keep only rows whose 'Line' has more than `threshold` characters."""
  long_enough = df['Line'].apply(len) > threshold
  return df[long_enough]

def calculate_average_sentiment(pair_sentiment, pair_count):
  """Turn per-pair sentiment totals and counts into a table of averages.

  Args:
    pair_sentiment: dict mapping a character pair to its summed sentiment.
    pair_count: dict mapping the same pairs to the number of summed values.

  Returns:
    DataFrame with columns 'Character Pair' and 'Average Sentiment', one row
    per key of `pair_sentiment`; a pair whose count is not positive gets 0.
  """
  pairs = list(pair_sentiment)
  averages = [
      pair_sentiment[pair] / pair_count[pair] if pair_count[pair] > 0 else 0
      for pair in pairs
  ]
  return pd.DataFrame({'Character Pair': pairs, 'Average Sentiment': averages})

# Calculate the average sentiment for each pair of consecutive characters
def calculate_average_sentiment_per_consecutive_pair(df, annotated, emotion='Sentiment'):
  """Average one score column over every (speaker, addressee) pair.

  Args:
    df: frame with a 'Name' column, the `emotion` column and, when
      `annotated` is True, a 'Spoken' column naming the addressee.
    annotated: if True the addressee is read from 'Spoken'; otherwise the
      speaker of the next row is used as the addressee.
    emotion: name of the score column to average.

  Returns:
    The DataFrame produced by calculate_average_sentiment for the pairs found.
  """
  speakers = df['Name'].tolist()
  # Without annotations the last row has no following speaker. zip() stops at
  # the shorter list, so that row is left out instead of indexing past the end
  # of the frame, which used to raise IndexError.
  addressees = df['Spoken'].tolist() if annotated else speakers[1:]
  sentiments = df[emotion].tolist()

  pair_sentiment = {}
  pair_count = {}

  # Loop through and count sentiments
  for current_char, next_char, sentiment in zip(speakers, addressees, sentiments):
    pair = (current_char, next_char)
    pair_sentiment[pair] = pair_sentiment.get(pair, 0) + sentiment
    pair_count[pair] = pair_count.get(pair, 0) + 1

  return calculate_average_sentiment(pair_sentiment, pair_count)


def create_graph_average_sentiment_per_consecutive_pair(df):
  """Build a directed graph with one edge per row of an average-sentiment table.

  Each row's 'Character Pair' gives (source, target). The row's
  'Average Sentiment' is stored as both `weight` and `capacity`; `color` is
  'green' for values >= 0 and 'red' otherwise; `width` is the absolute value.
  """
  G = nx.DiGraph()

  for char_pair, avg_sentiment in zip(df['Character Pair'], df['Average Sentiment']):
    G.add_edge(
        char_pair[0],
        char_pair[1],
        weight=avg_sentiment,
        capacity=avg_sentiment,
        color='green' if avg_sentiment >= 0 else 'red',
        width=abs(avg_sentiment),  # width is scaled to magnitude
    )

  return G

def plot_average_sentiment_per_consecutive_pair(G, weight_scale=3):
  """Draw G on a circular layout using each edge's stored colour and width.

  Edge widths are the stored 'width' attribute multiplied by `weight_scale`.
  """
  edge_colors = []
  edge_widths = []
  for u, v in G.edges():
    attributes = G[u][v]
    edge_colors.append(attributes['color'])
    edge_widths.append(attributes['width'] * weight_scale)

  layout = nx.circular_layout(G)
  nx.draw(G, layout, with_labels=True, edge_color=edge_colors, width=edge_widths)
  plt.show()

def my_min_cut(G):
  """Find the smallest minimum cut over every ordered (source, sink) pair.

  Returns:
    (value, partition) of the best cut found by nx.minimum_cut, or
    (inf, ()) when the graph has no pair of distinct nodes.
  """
  best_value = float('inf')
  best_partition = ()

  # Ordered pairs of distinct nodes, in the same order as two nested loops.
  for source, sink in permutations(list(G.nodes()), 2):
    cut_value, partition = nx.minimum_cut(G, source, sink)
    if cut_value < best_value:
      best_value, best_partition = cut_value, partition

  return best_value, best_partition


def _draw_signed_subgraph(subgraph, title, weight_scale):
  """Draw one subgraph on the current axes; negative-weight edges are red, others green."""
  # Imported here so the function does not depend on a later notebook cell
  # having been run first.
  from networkx.drawing.nx_agraph import graphviz_layout

  # Graphviz "neato" layout for better positioning without overlaps
  pos = graphviz_layout(subgraph, prog="neato", root=None)
  edge_attributes = [d for u, v, d in subgraph.edges(data=True)]

  nx.draw(subgraph, pos, with_labels=True,
          edge_color=['red' if d['weight'] < 0 else 'green' for d in edge_attributes],
          width=[abs(d['weight']) * weight_scale for d in edge_attributes],
          node_color='lightgreen', node_size=500, font_weight='bold')
  plt.title(title)


def my_min_cut_decomposition(G, num_iterations=7, weight_scale=3):
  """Repeatedly split G along its minimum cut and plot both sides.

  After each split the larger side (side 1 on ties) becomes the graph for the
  next iteration.

  Args:
    G: graph accepted by my_min_cut; edges need a 'weight' attribute for drawing.
    num_iterations: maximum number of splits to perform.
    weight_scale: multiplier applied to |weight| to get the drawn edge width.
  """
  for step in range(num_iterations): # number of iterations
    # A graph with fewer than two nodes cannot be cut: my_min_cut would return
    # an empty partition and the indexing below would raise IndexError.
    if len(G) < 2:
      break

    # Create subgraphs based on the partition obtained
    _, partition = my_min_cut(G)
    subgraph1 = G.subgraph(partition[0])
    subgraph2 = G.subgraph(partition[1])

    # Display subgraphs
    plt.figure(figsize=(10, 4))

    plt.subplot(1, 2, 1)
    _draw_signed_subgraph(subgraph1, f"Subgraph 1 (Step {step+1})", weight_scale)

    plt.subplot(1, 2, 2)
    _draw_signed_subgraph(subgraph2, f"Subgraph 2 - Step {step+1}", weight_scale)

    plt.tight_layout()
    plt.show()

    # Replace G with the larger subgraph
    if len(subgraph1) >= len(subgraph2):
      G = subgraph1.copy()
    else:
      G = subgraph2.copy()

def average_incoming_outgoing_weights_of_every_node(G):
  """Tabulate each node's mean incoming and mean outgoing edge weight.

  Returns:
    DataFrame with columns 'Character', 'Average_Incoming_Weight' and
    'Average_Outgoing_Weight', one row per node in G.nodes() order. A node
    with no edges in a given direction gets 0 for that direction.
  """
  def mean_or_zero(weights):
    return sum(weights) / len(weights) if weights else 0

  characters = list(G.nodes())
  incoming_means = []
  outgoing_means = []
  for node in characters:
    incoming_means.append(
        mean_or_zero([G[predecessor][node]['weight'] for predecessor in G.predecessors(node)]))
    outgoing_means.append(
        mean_or_zero([G[node][successor]['weight'] for successor in G.successors(node)]))

  return pd.DataFrame({
      'Character': characters,
      'Average_Incoming_Weight': incoming_means,
      'Average_Outgoing_Weight': outgoing_means,
  })


In [8]:
# # EXAMPLE - how to plot the average sentiment
# df = pd.read_csv(movie_results_path+'civil_war.csv')
# avg_df = calculate_average_sentiment_per_consecutive_pair(df, annotated=False)
# G = create_graph_average_sentiment_per_consecutive_pair(avg_df)
# plot_average_sentiment_per_consecutive_pair(G)

# # EXAMPLE - how to run my_min_cut_decomposition
# df = pd.read_csv(movie_results_path+'civil_war.csv')
# avg_df = calculate_average_sentiment_per_consecutive_pair(df, annotated=False)
# G = create_graph_average_sentiment_per_consecutive_pair(avg_df)
# my_min_cut_decomposition(G)

# Visualize data. online and filterable!

In [9]:
## Create a plotly figure that has every line spoken/character/emotion plotted over the course of the movie
## Example: https://chart-studio.plotly.com/~coffeeboost/1

# fig = px.line(df, x=df.index, y='Sentiment', color='Name', markers=True, line_dash='Name', title='Line Plot for Each Unique Name',hover_data={'Name': True, 'Sentiment': True, 'Line': True})
# fig.update_layout(xaxis_title='Index', yaxis_title='Value', legend_title='Name')
# py.sign_in(username='USERNAME', api_key='API_KEY')
# chart_studio_plot = py.plot(fig, filename='interactive_plot', auto_open=False)

In [114]:
# function to create node colour list
def create_community_node_colors(graph, communities):
    """Return one colour per node of `graph`, chosen by the node's community.

    Args:
      graph: iterable of (hashable) nodes, e.g. a networkx graph.
      communities: sequence of node collections. A node takes the colour of
        the first community that contains it.

    Returns:
      List of hex colour strings aligned with the iteration order of `graph`.
      The palette repeats after 30 communities. A node that belongs to no
      community gets a neutral grey, so the list always has one entry per node.
    """
    colors = [
    "#FFD1DC", "#ADD8E6", "#98FB98", "#D8BFD8", "#FFDAB9",
    "#F0E68C", "#FFB6C1", "#87CEFA", "#DDA0DD", "#00FF7F",
    "#FFC0CB", "#AFEEEE", "#FFE4B5", "#E0FFFF", "#FAFAD2",
    "#F5FFFA", "#FFA07A", "#AFEEEE", "#FFE4E1", "#F0FFF0",
    "#FFF5EE", "#E6E6FA", "#FFFAF0", "#F0F8FF", "#FFF8DC",
    "#FFEBCD", "#FFEFD5", "#F5F5F5", "#F0F0F0", "#FFFFFF"]
    unassigned_color = "#D3D3D3"

    # Map every node to its first community once, instead of scanning all
    # communities again for every node.
    community_index = {}
    for index, community in enumerate(communities):
        for member in community:
            community_index.setdefault(member, index)

    node_colors = []
    for node in graph:
        if node in community_index:
            # Wrap around the palette rather than raising IndexError.
            node_colors.append(colors[community_index[node] % len(colors)])
        else:
            node_colors.append(unassigned_color)
    return node_colors


from networkx.drawing.nx_agraph import graphviz_layout
# function to plot graph with node colouring based on communities
def visualize_communities(graph, communities, i):
    """Draw `graph` in row `i` of a 2x1 subplot grid, coloured by community.

    Args:
      graph: graph to draw.
      communities: collection of node groups; its length appears in the title.
      i: 1-based subplot position passed to plt.subplot(2, 1, i).
    """
    node_colors = create_community_node_colors(graph, communities)
    title = f"Community Visualization of {len(communities)} communities"

    # Graphviz "neato" layout for better positioning without overlaps.
    # A spring layout used to be computed first and then discarded.
    pos = graphviz_layout(graph, prog="neato", root=None)

    plt.subplot(2, 1, i)
    plt.title(title)
    nx.draw(
        graph,
        pos=pos,
        node_size=1000,
        node_color=node_colors,
        with_labels=True,
        font_size=10,
        font_color="black",
    )

def find_communities(G, num_communities=3):
  """Run Girvan-Newman on G and plot the level at index `num_communities - 2`.

  NOTE(review): that level presumably holds exactly `num_communities`
  communities only when G is connected - confirm for disconnected graphs.

  Raises:
    ValueError: if `num_communities` is below 2 (this used to wrap around to a
      negative index and silently plot a different level), or if the
      algorithm yields fewer levels than requested.
  """
  from itertools import islice

  if num_communities < 2:
    raise ValueError("num_communities must be at least 2")

  # girvan_newman is a lazy generator; take only the level that is needed
  # instead of materialising every level of the hierarchy.
  levels = nx.community.girvan_newman(G)
  communities = next(islice(levels, num_communities - 2, None), None)
  if communities is None:
    raise ValueError(
        f"Girvan-Newman produced fewer than {num_communities - 1} levels for this graph")

  fig, ax = plt.subplots(1, figsize=(10, 15))
  visualize_communities(G, communities, 1)

## EXAMPLE - Find communities using Girvan-Newman
# find_communities(G)

# Post Processing
- convert string to float
- scale numbers to the range [-1, 1]
- remove consecutive characters
- scale down pairs with a low pair count
- zero scaling based on the median

In [11]:
# convert string to float
def parse(string):
  """Extract the first and last number from a bracketed, space-separated string.

  The first and last characters (the brackets) are dropped, surrounding
  whitespace is stripped, and the text is split on single spaces.

  Returns:
    (first, last) as floats.
  """
  tokens = string[1:-1].strip().split(' ')
  return float(tokens[0]), float(tokens[-1])

# format string to float
def format_string_to_float(df):
  """Replace the stringified 'Irony' and 'Hate' scores with single floats.

  'Irony' keeps the first number returned by parse, 'Hate' keeps the last.
  The frame is modified in place and also returned.
  """
  # (column, index into the tuple returned by parse)
  for column, position in (('Irony', 0), ('Hate', 1)):
    df[column] = df[column].apply(lambda raw, position=position: parse(raw)[position])
  return df

# do min max scaling [-1,1]
def scale_to_minus_1_and_1(df, columns=None):
  """Min-max scale score columns to [-1, 1] using one shared min and max.

  The minimum and maximum are taken over all selected columns together, so
  the relative scale between columns is preserved.

  Args:
    df: frame to scale; it is modified in place and also returned.
    columns: columns to scale; defaults to EMOTION_COLUMNS.

  Returns:
    The same frame with the selected columns rescaled. When every selected
    value is identical there is no range to scale, so the columns are set to
    0.0 instead of dividing by zero.
  """
  selected = list(EMOTION_COLUMNS if columns is None else columns)

  # Named lowest/highest so the built-in min/max are not shadowed.
  lowest = df[selected].min().min()
  highest = df[selected].max().max()
  value_range = highest - lowest

  if value_range == 0:
    df[selected] = 0.0
  else:
    df[selected] = 2 * (df[selected] - lowest) / value_range - 1
  return df

# scale down pairs with a low pair count
# Count how often each pair of consecutive characters occurs
def get_pair_count(df, annotated):
  """Count occurrences of every (speaker, addressee) pair.

  With `annotated` the addressee comes from the 'Spoken' column of the same
  row; otherwise it is the 'Name' of the following row, so the last row
  contributes no pair.

  Returns:
    dict mapping (speaker, addressee) tuples to their number of occurrences.
  """
  speakers = df['Name'].tolist()
  addressees = df['Spoken'].tolist() if annotated else speakers[1:]

  pair_count = {}
  for pair in zip(speakers, addressees):
    pair_count[pair] = pair_count.get(pair, 0) + 1
  return pair_count

def scale_down_by_count(df, annotated):
  """Shrink every emotion score by a factor that grows with the pair's frequency.

  Each row is multiplied by 1 - 1/(count + 1), where count is how often the
  row's (speaker, addressee) pair occurs according to get_pair_count. Without
  annotations the last row has no pair and is left unchanged. The frame is
  modified in place and also returned.
  """
  pair_count = get_pair_count(df, annotated)

  speakers = df['Name'].tolist()
  addressees = df['Spoken'].tolist() if annotated else speakers[1:]
  emotion_positions = [df.columns.get_loc(column) for column in EMOTION_COLUMNS]

  for row, pair in enumerate(zip(speakers, addressees)):
    count = pair_count[pair]
    for position in emotion_positions:
      df.iloc[row, position] *= 1-1/(count+1)

  return df

def scale_down_by_log(df):
  """Replace every emotion column with its natural logarithm (in place).

  NOTE(review): np.log yields NaN for negative inputs and -inf for zero, so
  this presumably expects strictly positive scores - confirm before applying
  it after a scaling step that can produce values <= 0.
  """
  for emotion in EMOTION_COLUMNS:
    column_values = df[emotion]
    df[emotion] = np.log(column_values)
  return df

def zero_scale_to_median(df):
  """Shift every emotion column so that its median becomes zero (in place).

  Each column is centred on its own median; the frame is also returned.
  """
  for emotion in EMOTION_COLUMNS:
    df[emotion] = df[emotion] - df[emotion].median()
  return df

# Results of analysis


In [102]:
# Load the scored script. The steps below read the 'Name', 'Irony' and 'Hate'
# columns plus every column listed in EMOTION_COLUMNS.
df = pd.read_csv(movie_results_path+'Wizard-of-Oz.csv')
# prepare data for analysis
# 1) turn the stringified Irony/Hate scores into single floats
df = format_string_to_float(df)
# 2) min-max scale all emotion columns with one shared min and max
df = scale_to_minus_1_and_1(df)
# 3) shrink the scores of rows whose consecutive-speaker pair is rare
df = scale_down_by_count(df, annotated=False)
# 4) centre every emotion column on its own median
df = zero_scale_to_median(df)
# prepare graph for analysis
# The averages use the default emotion column ('Sentiment'); each row of
# avg_df becomes one directed edge of G.
avg_df = calculate_average_sentiment_per_consecutive_pair(df, annotated=False)
G = create_graph_average_sentiment_per_consecutive_pair(avg_df)

In [None]:
# # run barrage of algorithms
# plot_average_sentiment_per_consecutive_pair(G)
# avg_df = calculate_average_sentiment_per_consecutive_pair(df, annotated=False, emotion='nervousness')
# G = create_graph_average_sentiment_per_consecutive_pair(avg_df)
# my_min_cut_decomposition(G, num_iterations=20)
# find_communities(G, num_communities=16)