Import Packages

In [1]:
#import packages
import re
from ftfy import fix_text
import spacy
import json
from unidecode import unidecode
from langdetect import detect, DetectorFactory
import datetime
import csv
from collections import Counter
import difflib
from nltk.metrics import distance


File retrieval

In [2]:
# Ask for the dataset location. Every output file written later reuses this
# path, minus a trailing '.json', as its name prefix.
fpath = input("Enter input JSON file path: ")
fname = fpath[:-len('.json')] if fpath.endswith('.json') else fpath

# Parse the whole tweet dump into memory
with open(fpath, mode='r', encoding='utf-8') as file:
    data = json.loads(file.read())

Pre-processing

In [3]:
spacy_model = spacy.load('en_core_web_sm')

# Clean tweets and store RT totals.
# rt_totals maps (lowercased screen name, retweeted text) to a dict holding
# how many times that text was seen retweeted and the smallest timestamp_ms
# among the tweets that carried it.
rt_totals = {}
for data_text in data:

    # The following cleaning must be done to combine tweets properly
    # Remove URLs. The alternation is grouped so a URL at the very start of
    # the tweet is removed too; ungrouped (r'\A|\shttps?://\S+') the \A branch
    # only matched the empty string and a leading URL was left in place.
    data_text['text'] = re.sub(r'(?:\A|\s)https?://\S+', '', data_text['text'])
    # Fix encoding issues
    data_text['text'] = fix_text(data_text['text'])
    # Transliterate non-ASCII characters to an ASCII approximation
    data_text['text'] = unidecode(data_text['text'])
    # Collapse every run of whitespace (tabs and newlines included) into one
    # space and trim both ends. No second pass over repeated spaces is needed:
    # the join cannot produce two spaces in a row.
    data_text['text'] = " ".join(data_text['text'].split())
    # NOTE: case is preserved (lowercasing is disabled), so matching on this
    # text has to be case-insensitive.
    # Remove the '#' of hashtags, keeping the tag word itself
    data_text['text'] = re.sub(r'#', '', data_text['text'])

    # Extract RT data from text: each match is a (username, retweeted text)
    # pair introduced by 'rt @user' / 'rt+N @user' or a quoted '"@user'.
    retweets = re.findall(
        r'\b(?:rt(?:\+\d+)? |\")@(\w+):? (.*?)(?= // |\"\Z|\Z|\b rt @\w+:)',
        data_text['text'], re.IGNORECASE)

    # Update retweet totals
    for rt_user, rt_text in retweets:
        rt = (rt_user.lower(), rt_text)
        if rt not in rt_totals:
            rt_totals[rt] = {'retweets': 1, 'timestamp_ms': data_text['timestamp_ms']}
        else:
            rt_totals[rt]['retweets'] += 1
            # Keep the earliest time this text was seen
            rt_totals[rt]['timestamp_ms'] = \
                min(rt_totals[rt]['timestamp_ms'], data_text['timestamp_ms'])

# Attach a retweet count to every tweet already in the dataset. A matching
# entry is removed from rt_totals once it has been claimed, so a later tweet
# with the same (user, text) key gets a count of 0.
for data_text in data:
    tweet_key = (data_text['user']['screen_name'].lower(), data_text['text'])
    claimed = rt_totals.pop(tweet_key, None)
    data_text['retweets'] = claimed['retweets'] if claimed is not None else 0

# Whatever is left in rt_totals was only ever seen as a retweet: add it to the
# dataset as a tweet of its own, attributed to the retweeted user.
for (screen_name, retweeted_text), totals in rt_totals.items():
    data.append({
        'text': retweeted_text,
        'user': {'screen_name': screen_name},
        'timestamp_ms': totals['timestamp_ms'],
        'retweets': totals['retweets'],
    })

preprocessed_data = []
# user_metadata maps a lowercased screen name to its tweet count, summed
# retweets and running retweet average.
user_metadata = {}
# Remove retweets and finalize preprocessed json data
for data_text in data:
    original_text = data_text['text']

    # Update user weights based on retweet value
    user_name = data_text['user']['screen_name'].lower()
    stats = user_metadata.setdefault(
        user_name, {'num_tweets': 0, 'rt_total': 0, 'rt_average': 0})
    stats['num_tweets'] += 1
    stats['rt_total'] += data_text['retweets']
    stats['rt_average'] = stats['rt_total'] / stats['num_tweets']

    # Remove retweets: drop everything from an 'rt @user' / '"@user' marker to
    # the end of the text, plus a trailing quote and ' // ' separators.
    # count/flags are passed by keyword; passing them positionally is
    # deprecated since Python 3.13.
    remove_retweets = re.sub(
        r'(?:\A| )(?:(?:rt(?:\+\d+)? )|\")@\w+:?.*?\Z|(?:\"\Z)|(?: // )',
        '', original_text, count=0, flags=re.IGNORECASE)
    # Remove empty tweets
    if remove_retweets == '':
        continue
    # After pre-processing, the cleaned text goes back into the original record
    data_text['text'] = remove_retweets
    preprocessed_data.append(data_text)

# Save data: user metadata first, then the preprocessed tweets, each to
# '<fname>_<suffix>' as UTF-8 JSON.
for suffix, payload in (('user_metadata.json', user_metadata),
                        ('preprocessed.json', preprocessed_data)):
    with open('_'.join([fname, suffix]), 'w', encoding='utf-8') as output_file:
        json.dump(payload, output_file, indent=6)

Parsing

In [4]:
# from nltk.tokenize import sent_tokenize, word_tokenize
def load_processed_tweets_from_json(file_path):
    """Load and return the JSON content stored at *file_path*.

    The file is read as UTF-8, the encoding every JSON file in this pipeline
    is written with, instead of the platform's default encoding.
    """
    with open(file_path, 'r', encoding='utf-8') as file:
        return json.load(file)

# Reload the tweets written by the pre-processing step
data = load_processed_tweets_from_json("_".join((fname, "preprocessed.json")))
def clean_retweet_text(text):
    """Remove a leading 'rt @username:' prefix, leaving only the main content.

    The prefix is recognised regardless of case ('rt @', 'RT @', ...), because
    tweet text is not lowercased before it reaches this function. When the
    text starts with the prefix, everything up to and including the first
    colon is dropped. Text without the prefix, or with the prefix but no
    colon, is returned unchanged apart from stripping surrounding whitespace.
    """
    if text[:4].lower() == "rt @":
        _, colon, remainder = text.partition(":")
        if colon:
            return remainder.strip()
    return text.strip()

# Words that mark the segment after a separator as still being part of the
# award name (e.g. "Best Actor - Drama") rather than the winner.
award_name_allowlist = ["drama", "musical", "comedy", "animated", "animation", "motion", "television", "series", "award", "best"]

def extract_award_name(sentence):
    """Extract an award name and winner from a '<award>: <winner>' style sentence.

    The sentence is cut at every '-' and ':'. The first segment that contains
    the word 'best' or 'award' (any case) starts the award name. If the
    following segment contains an allowlisted word it is appended to the award
    name and the segment after that becomes the winner; otherwise the
    following segment is the winner. The winner is truncated at its first '.'.

    Returns:
        A two-item list [award_name, winner]. Either item may be None; both
        are None when the sentence has no separator, does not mention 'best',
        or no segment qualifies.
    """
    split_positions = [i for i, char in enumerate(sentence) if char in "-:"]
    if not split_positions or "best" not in sentence.lower():
        return [None, None]

    award_name = None
    winner = None
    last_index = len(split_positions) - 1

    for index, pos in enumerate(split_positions):
        # Segment between the previous separator (or the start) and this one
        segment_start = split_positions[index - 1] + 1 if index >= 1 else 0
        left_part = sentence[segment_start:pos].strip()
        right_part = sentence[pos + 1:].strip()

        # Nothing follows this separator, so there is nothing left to extract
        if not right_part:
            return [award_name, winner]

        # Compare lowercased words: tweet text keeps its original case, so
        # "Best"/"Award" must match as well.
        left_words = left_part.lower().split()
        if 'best' not in left_words and 'award' not in left_words:
            continue

        # The segment right after this separator
        if index < last_index:
            next_segment = sentence[pos + 1:split_positions[index + 1]].strip()
        else:
            next_segment = right_part

        if any(word.lower() in next_segment.lower() for word in award_name_allowlist):
            award_name = f"{left_part} - {next_segment}".strip()
            # The winner is whatever follows the next separator, up to the
            # separator after it or, when none remains, the end of the sentence.
            if index < last_index:
                winner_start = split_positions[index + 1] + 1
                if index + 2 <= last_index:
                    winner_end = split_positions[index + 2]
                else:
                    winner_end = len(sentence)
                winner = sentence[winner_start:winner_end].strip() or None
        else:
            award_name = left_part
            winner = next_segment
        break

    if winner and '.' in winner:
        winner = winner.split('.', 1)[0].strip()

    return [award_name, winner]
def extract_winner_info(text):
    """Find the first sentence shaped like '<winner> wins/receives ... best ...'.

    Returns [winner_part, award_part] for the first matching sentence, where
    award_part is the text after 'wins'/'receives' with its pieces joined by
    single spaces, or None when no sentence matches.
    """
    text = clean_retweet_text(text)

    pattern = r'^(.*?)(wins|receives)(.*?)(best)(.*?)$'

    # Sentences are cut at '.', '!' and '?', and each is tested on its own
    for sentence in re.split(r'[.!?]', text):
        match = re.search(pattern, sentence.strip(), re.IGNORECASE)
        if match is None:
            continue

        before_verb, _verb, between, best_word, remainder = \
            (group.strip() for group in match.groups())
        return [before_verb, " ".join([between, best_word, remainder])]

    return None

def extract_award_sequences(input_str):
    """Return progressively longer candidate award names found in *input_str*.

    A leading 'golden globes (for)' / 'award for' before 'best' is dropped and
    the text is trimmed to start at 'best'/'award' and stop before a trailing
    'golden globes'. The result is then cut at ' - ' and ' for '; the returned
    list holds the first piece, the first two pieces rejoined, and so on.
    """
    leading = re.search(
        r"(?i)^(?:golden globes for|golden globes|award for)\s+(best)\s+(.+)",
        input_str)
    if leading:
        input_str = " ".join(leading.groups())

    core = re.search(
        r"(?i)(best|award)\s+(.+?)(?:\s+golden globes|goldenglobes|$)",
        input_str)
    if core:
        keyword, tail = core.groups()
        input_str = f"{keyword} {tail.strip()}"

    # The separator is captured, so the split alternates text, separator, text
    head, *rest = re.split(r'(\s*-\s*|\s+for\s+)', input_str)

    running = head.strip()
    sequences = [running]
    for separator, part in zip(rest[0::2], rest[1::2]):
        running = f"{running} {separator.strip()} {part.strip()}"
        sequences.append(running)

    return sequences
# Tag each tweet that names an award winner with a 'win_resolutions' entry
for index, tweet in enumerate(data):
    tweet_text = clean_retweet_text(tweet['text'])

    # First try the "<winner> wins/receives ... best ..." phrasing
    award_name = extract_winner_info(tweet_text)
    if award_name:
        winner_text, award_text = award_name[0], award_name[1]
        tweet["win_resolutions"] = {
            "award": extract_award_sequences(award_text),
            "winner": [winner_text],
            "confidence": 0.7,
        }
        continue

    # Otherwise fall back to the "<award>: <winner>" / "<award> - <winner>" layout
    award_name = extract_award_name(tweet_text)
    if not award_name[0]:
        continue
    tweet["win_resolutions"] = {
        "award": extract_award_sequences(award_name[0]),
        "winner": [award_name[1]],
        "confidence": 0.8,
    }

In [None]:
nlp = spacy_model
# Seed langdetect so repeated runs classify the same text the same way
DetectorFactory.seed = 0

def find_people_from_text(text):
    """Collect the PERSON entities named in an English tweet.

    Returns:
        An empty dict when the text is not detected as English or language
        detection fails. Otherwise {'hosts': [...], 'confidence': c}, where
        'hosts' lists the text of every PERSON entity and c is 0.7 if some
        sentence has the verb 'host' as its root, else 0.5.
    """
    # Skip if not English
    try:
        if detect(text) != 'en':
            return {}
    except Exception:
        # Best effort: text that langdetect cannot classify is skipped.
        # Catching Exception rather than using a bare except lets
        # KeyboardInterrupt and SystemExit propagate.
        return {}

    doc = nlp(text)

    # Find the people mentioned in the tweet
    hosts = [entity.text for entity in doc.ents if entity.label_ == "PERSON"]

    confidence = 0.5
    for sentence in doc.sents:
        # Only the first ROOT token of each sentence is examined
        root = next((token for token in sentence if token.dep_ == "ROOT"), None)
        if root is not None and root.lemma_ == "host":
            confidence = 0.7
            break

    return {'hosts': hosts, 'confidence': confidence}
# Indices of the tweets that received a 'host_resolutions' entry
have_host_resolutions = []
for index, tweet in enumerate(data):
    tweet_text = tweet['text']

    # Only tweets that mention hosting, and are not about "next" year, are analysed
    mentions_host = re.search(r'(?:\bhost)|(?:host(?:s|ed|ing)?\b)',
                              tweet_text, re.IGNORECASE) is not None
    if mentions_host and re.search(r'\bnext\b', tweet_text, re.IGNORECASE) is None:
        host_resolutions = find_people_from_text(tweet_text)
    else:
        host_resolutions = {}

    if not host_resolutions:
        continue
    have_host_resolutions.append(index)
    tweet["host_resolutions"] = host_resolutions
def split_string_on_and(text: str):
    """Split *text* on the word 'and' or on '&' and return the non-empty, trimmed pieces."""
    pieces = (piece.strip() for piece in re.split(r'\s*(?:\band\b|&)\s*', text))
    return [piece for piece in pieces if piece]

def parse_presenters(tweet: str):
    """Find the first sentence shaped like '<presenters> present(s/ing) ... best ...'.

    Returns [award, presenters] for that sentence, where award starts at
    'best' and is cut before a following 'at', 'and', 'for' or 'to', and
    presenters is the text before the verb split on 'and'/'&'. Returns
    [None, None] when no sentence matches.
    """
    pattern = r"(.*?)(?<!re)(?:\bpresent\b|\bpresents\b|\bpresenting\b|\bpresenting for\b)\s+(.*?(?:best).*)"

    # Sentences are cut at '.', '!' and '?', and each is tested on its own
    for sentence in re.split(r'[.!?]', tweet):
        match = re.search(pattern, sentence.strip(), re.IGNORECASE)
        if not match:
            continue

        presenters, award = (group.strip() for group in match.groups())

        # Drop whatever precedes "best"
        award = re.sub(r".*?\bbest\b", "best", award, flags=re.IGNORECASE)
        # Drop "at" / "and" / "for" / "to" and everything after it
        award = re.sub(r"\s+\b(at|and|for|to)\b.*", "", award, flags=re.IGNORECASE)

        return [award.strip(), split_string_on_and(presenters)]

    return [None, None]
# Tag each tweet that names award presenters with a 'present_resolutions' entry
for index, tweet in enumerate(data):
    tweet_text = clean_retweet_text(tweet['text'])

    presented_award, presenter_names = parse_presenters(tweet_text)
    if not presented_award:
        continue
    tweet["present_resolutions"] = {
        "award": presented_award,
        "presenters": presenter_names,
        "confidence": 0.7,
    }


def get_nominees(text: str):
    """Find the first sentence shaped like '<nominee> nominee/nominated/nominees ... best ...'.

    Sentences containing a negation or a presenting word (not, should,
    wasn't, introduce, present, ...) are skipped. Returns
    [nominee_text, award_name] with award_name starting at 'best', or None
    when no sentence qualifies.
    """
    nominee_word = r"\b(?:nominee|nominated|nominees)\b"
    pattern = r"(.*)\b(?:nominee|nominated|nominees)\b(.*?\bbest\b.*)"
    exclusions = r"\b(?:not|should've|should have|wasn't|introduce|present|should)\b"

    # Sentences are cut at '.', '!' and '?', and each is tested on its own
    for sentence in re.split(r'[.!?]', text):
        if re.search(exclusions, sentence, re.IGNORECASE):
            continue

        match = re.search(pattern, sentence, re.IGNORECASE)
        if not match:
            continue

        before_keyword, after_keyword = match.groups()
        # Strip any further nominee keywords from the nominee side
        before_nominee = re.sub(nominee_word, "", before_keyword, flags=re.IGNORECASE).strip()
        # Start the award name at "best"
        award_name = re.sub(r".*?\bbest\b", "best", after_keyword, flags=re.IGNORECASE).strip()
        return [before_nominee, award_name]

    return None
def get_nominees2(text: str) :
    """Find a nominee in a sentence saying someone did not / should have won a 'best' award.

    Looks for '<nominee> <negation> ... win/won ... best ...' in each
    sentence. Returns [nominee, award_name], where nominee is the text before
    the negation with anything up to a 'that'/'if' removed and award_name
    starts at 'best', or None when no sentence matches.
    """
    pattern = r"(.*?)(\bdoes not\b|\bnot\b|\bshould have\b|\bshould've\b|\bshould\b|\bdidn't\b|\bdid not\b)(.*?\b(?:win|won)\b.*?\bbest\b.*)"

    # Sentences are cut at '.', ':', '!' and '?', and each is tested on its own
    for sentence in re.split(r'[.:!?]', text):
        match = re.search(pattern, sentence.strip(), re.IGNORECASE)
        if not match:
            continue

        before_negation, _negation, after_negation = match.groups()
        # Keep only what follows the last "that" / "if"
        nominee = re.sub(r".*\b(that|if)\b", "", before_negation.strip(),
                         flags=re.IGNORECASE).strip()
        # Start the award name at "best"
        award_name = re.sub(r".*?\bbest\b", "best", after_negation,
                            flags=re.IGNORECASE).strip()
        return [nominee, award_name]

    return None
# Tag each tweet that names a nominee with a 'nominee_resolutions' entry.
# Both extractors run on every tweet; when both find something, the result of
# get_nominees2 replaces the one from get_nominees.
for index, tweet in enumerate(data):
    tweet_text = clean_retweet_text(tweet['text'])

    for extractor in (get_nominees, get_nominees2):
        possible_nominee = extractor(tweet_text)
        if possible_nominee:
            tweet["nominee_resolutions"] = {
                "award": possible_nominee[1],
                "nominee": possible_nominee[0],
                "confidence": 0.5,
            }




filename = '_'.join([fname, 'parsed_data.json'])

# Write data to JSON file. The encoding is set explicitly, like every other
# writer in this notebook, because the file is read back as UTF-8.
with open(filename, 'w', encoding='utf-8') as json_file:
    json.dump(data, json_file, indent=4)

Aggregation

In [None]:
# Load the tweets annotated by the parsing step
with open('_'.join([fname, 'parsed_data.json']), 'r', encoding='utf-8') as extracted_file:
    extracted_data = json.loads(extracted_file.read())
# Load the per-user retweet statistics
with open('_'.join([fname, 'user_metadata.json']), 'r', encoding='utf-8') as metadata_file:
    user_metadata = json.loads(metadata_file.read())

# The ceremony year is the UTC year of the first tweet's timestamp
# (timestamp_ms is divided by 1000 to get seconds)
dt = datetime.datetime.fromtimestamp(
    extracted_data[0]['timestamp_ms'] / 1000.0,
    tz=datetime.timezone.utc,
)
year = dt.year

# Import IMDB data relevant to the ceremony year
def get_imdb_title_basics(path='./imdb/title.basics.tsv', ceremony_year=None):
    """Collect IMDb titles relevant to the ceremony year, grouped by title type.

    A title is kept when it is not a TV episode or TV special, its start year
    is known, and one of the following holds:
      * it has no end year and started in the ceremony year or the year before;
      * it is a TV series with no end year that started before the ceremony year;
      * it has an end year, started no later than the ceremony year and ended
        no earlier than the year before it.

    Args:
        path: Location of the tab-separated IMDb title basics file.
        ceremony_year: Year to filter on; defaults to the module-level `year`.

    Returns:
        A dict mapping the lowercased titleType to a list of primaryTitle
        values, in file order.
    """
    if ceremony_year is None:
        ceremony_year = year

    excluded_types = ('tvEpisode', 'tvSpecial')
    title_basics = {}

    # newline='' is what the csv module expects of the file object. Quoting is
    # disabled so a '"' inside a title is kept as ordinary text instead of
    # being treated as a field quote.
    with open(path, encoding='utf-8', newline='') as imdb_file:
        reader = csv.DictReader(imdb_file, delimiter='\t', quoting=csv.QUOTE_NONE)
        for row in reader:
            # Skip titles with unknown start years, episodes and specials
            if row['startYear'] == "\\N" or row['titleType'] in excluded_types:
                continue

            start_year = int(row['startYear'])
            if row['endYear'] == "\\N":
                relevant = (
                    start_year in (ceremony_year, ceremony_year - 1)
                    or (row['titleType'] == "tvSeries" and start_year < ceremony_year)
                )
            else:
                relevant = (
                    start_year <= ceremony_year
                    and int(row['endYear']) >= ceremony_year - 1
                )

            if relevant:
                category = row['titleType'].lower()
                title_basics.setdefault(category, []).append(row['primaryTitle'])

    return title_basics

title_basics = get_imdb_title_basics()
# IMDB genre tags, minus 'Short' and 'Adult'
genres = [
    'Comedy', 'Music', 'Crime', 'Drama', 'Game-Show', 'Talk-Show', 'Family',
    'Mystery', 'Sport', 'Horror', 'Western', 'Adventure', 'News', 'Action',
    'Documentary', 'Reality-TV', 'Sci-Fi', 'Thriller', 'Animation', 'War',
    'Musical', 'Romance', 'Fantasy', 'Biography', 'History',
]
# Final answers: 'hosts' plus one entry per detected award
results = dict()

# Ranking: accumulate a weighted score for every host and award name mentioned
possible_hosts = Counter()
possible_awards = Counter()
for tweet in extracted_data:
    # A tweet's weight is its retweet count plus its author's average retweet
    # count, each offset by one
    user = tweet['user']['screen_name'].lower()
    tweet['weight'] = (tweet['retweets'] + 1) \
        + (user_metadata[user]['rt_average'] + 1)
    weight = tweet['weight']

    # Host
    if 'host_resolutions' in tweet:
        hr = tweet['host_resolutions']
        for host in hr['hosts']:
            possible_hosts[host.lower()] += weight * hr['confidence']

    # Award: a winner tweet can carry several candidate award names
    if 'win_resolutions' in tweet:
        wr = tweet['win_resolutions']
        for award in wr['award']:
            possible_awards[award.lower()] += weight * wr['confidence']
    # Nominee and presenter tweets carry a single award name each
    for resolution_key in ('nominee_resolutions', 'present_resolutions'):
        if resolution_key in tweet:
            resolution = tweet[resolution_key]
            possible_awards[resolution['award'].lower()] += \
                weight * resolution['confidence']


# Find closest match by averaging distance calculations
def closest_match(names, possible_names, diff):
    """Return the candidate in *possible_names* most similar to any of *names*.

    Similarity is the mean of difflib's SequenceMatcher ratio and a
    length-normalised edit-distance score, computed on lowercased strings.
    The lowercased best candidate is returned when its score is strictly
    greater than *diff*; otherwise an empty list is returned. *names* may be a
    single string or a list of strings.
    """
    queries = names if isinstance(names, list) else [names]

    best_name = None
    best_score = None
    for query in queries:
        target = query.lower()
        matcher = difflib.SequenceMatcher(b=target)
        for option in possible_names:
            candidate = option.lower()
            longest = max(len(target), len(candidate))
            edit_score = 1 - distance.edit_distance(target, candidate) / longest
            matcher.set_seq1(candidate)
            score = (matcher.ratio() + edit_score) / 2.0
            # Strictly-greater keeps the earliest candidate on ties
            if best_score is None or score > best_score:
                best_name, best_score = candidate, score

    if best_score is not None and best_score > diff:
        return best_name
    return []

# Combine like name categories
def combine_names(name, possible_names, orig_vals, len_penalty, diff):
    """Merge *name* with a similar entry of *possible_names*, in place.

    *possible_names* maps names to scores and is modified; *orig_vals* holds
    the scores from before any merging and is only read. If *name* contains or
    is contained in another entry, the two are merged: the longer string is
    kept unless the shorter one is sufficiently more popular, as decided with
    *len_penalty*. Otherwise the closest spelling above *diff* (if any) is
    merged, keeping whichever had the higher original score. A name with no
    similar entry is left as it was. Returns None.
    """
    if name not in possible_names:
        return

    # Take the current name out so it is never compared with itself
    own_score = possible_names.pop(name)

    # Direct substrings
    for other_name in possible_names:
        name_inside_other = name in other_name
        other_inside_name = other_name in name
        if not (name_inside_other or other_inside_name):
            continue

        absorbed_by_other = (
            (name_inside_other
             and orig_vals[name] * len_penalty < orig_vals[other_name])
            or (other_inside_name
                and orig_vals[other_name] * len_penalty >= orig_vals[name])
        )
        if absorbed_by_other:
            possible_names[other_name] += own_score
        else:
            possible_names[name] = own_score + possible_names[other_name]
            del possible_names[other_name]
        # The dict has changed, so iteration stops here
        return

    # Not a substring of anything: look for a misspelling
    closest = closest_match(name, possible_names.keys(), diff)
    if not closest:
        # Distinct name: put it back untouched
        possible_names[name] = own_score
        return

    if orig_vals[name] > orig_vals[closest]:
        possible_names[name] = own_score + possible_names[closest]
        del possible_names[closest]
    else:
        possible_names[closest] += own_score

# Retrieve the most likely combination of names from a given set
def get_names(possible_names, threshold):
    """Return the leading names whose score reaches the cut-off.

    *possible_names* is a sequence of (name, score) pairs, highest score
    first. A *threshold* of at most 1 is taken as a fraction of the top score;
    a larger one is used as an absolute minimum score. Collection stops at the
    first pair below the cut-off.
    """
    if not possible_names:
        return []

    if threshold <= 1:
        cutoff = threshold * possible_names[0][1]
    else:
        cutoff = threshold

    selected = []
    for candidate, score in possible_names:
        if score < cutoff:
            break
        selected.append(candidate)
    return selected

# Aggregate results for a category
def aggregate_results(possible_options, len_penalty, diff, threshold):
    """Merge similar names in *possible_options* and return the top ones.

    Names are visited from lowest to highest original score and merged in
    place with combine_names; the merged counter is then cut with get_names
    using *threshold*.
    """
    # Snapshot of the original scores, least popular first
    orig_vals = dict(reversed(possible_options.most_common()))
    for name in orig_vals:
        combine_names(name, possible_options, orig_vals, len_penalty, diff)
    return get_names(possible_options.most_common(), threshold)

## Aggregation

# Hosts: keep every name scoring at least 80% of the top host
results['hosts'] = aggregate_results(possible_hosts, 0.8, 0.8, 0.8)

# Award names: keep every award whose merged score is at least 10
awards = aggregate_results(possible_awards, 0.8, 0.7, 10)
results.update({award: {} for award in awards})

# Per-award ranking: one score counter per award for each role
possible_winners = {award: Counter() for award in awards}
possible_nominees = {award: Counter() for award in awards}
possible_presenters = {award: Counter() for award in awards}

# (resolution key on the tweet, key holding the people, counters to credit),
# processed in this order for every tweet
resolution_targets = (
    ('win_resolutions', 'winner', possible_winners),
    ('nominee_resolutions', 'nominee', possible_nominees),
    ('present_resolutions', 'presenters', possible_presenters),
)

for tweet in extracted_data:
    for resolution_key, people_key, counters in resolution_targets:
        if resolution_key not in tweet:
            continue
        resolution = tweet[resolution_key]

        # Map the tweet's award text onto one of the aggregated award names
        award = closest_match(resolution['award'], awards, 0.8)
        if not award:
            continue

        # Winners and presenters come as lists, a nominee as a single string
        people = resolution[people_key]
        if not isinstance(people, list):
            people = [people]
        for person in people:
            if person:
                counters[award][person.lower()] += \
                    tweet['weight'] * resolution['confidence']

# Per-award aggregation
win_penalty = 0.1
for award in awards:
    award_results = results[award]

    winner = aggregate_results(possible_winners[award], 0.8, 0.8, 1)
    if not winner:
        award_results['winner'] = ''
    else:
        top_winner = winner[0]
        award_results['winner'] = top_winner
        # Insert the award winner into the possible nominees, at a fraction of
        # its winner score
        possible_nominees[award][top_winner] = \
            win_penalty * possible_winners[award][top_winner]

    award_results['nominees'] = \
        aggregate_results(possible_nominees[award], 0.8, 0.8, 0.2)
    award_results['presenters'] = \
        aggregate_results(possible_presenters[award], 0.8, 0.8, 0.4)

###### Humor: additional goal
humor = ['guffaw', 'fun', 'smirk', 'yelp', 'wag', 'lol', 'funny', 'wry', 'sneer', 'joke']
# (screen name, text) of every tweet whose text contains one of the humor
# words as a substring; each tweet is listed at most once
humor_list = [
    (t['user']['screen_name'], t['text'])
    for t in extracted_data
    if any(x in t['text'] for x in humor)
]



# Output to JSON
with open('_'.join([fname, 'results.json']), 'w', encoding='utf-8') as output_file:
    json.dump(results, output_file, indent=6)
# Output human-readable to TXT: hosts, then one section per award, then the
# humor tweets
with open('_'.join([fname, 'results.txt']), 'w', encoding='utf-8') as output_file:
    print('Hosts:', (', '.join(results['hosts']).title()), file=output_file)
    for award in awards:
        print("\nAward: ", award.title(), sep='', file=output_file)
        for category in results[award]:
            value = results[award][category]
            # List-valued categories are shown comma-separated
            if isinstance(value, list):
                value = ', '.join(value)
            print(category.title(), ": ", value.title(), sep='',
                  file=output_file)
    # The blank separator line belongs in the report; a bare print() would
    # send it to stdout instead.
    print(file=output_file)
    for y in humor_list:
        print(f"Humor_Who: {y[0]}, Best Jokes: {y[1]}", file=output_file)