In [1]:
import copy
import csv
import os
import re
from external_functions import *

#### **Text Extraction**

In [2]:
# Speaker label followed by a colon at the start of a line, e.g. "MR. MESEREAU:", "A JUROR:", "THE COURT:"
uppercase_colon_pattern = re.compile(r"^(?P<speaker>([QA]\. BY )?(M[Rr]?[Ss]?\.? )?(A )?(([A-Z]{2,}[\- ]?)+)|(JUROR NO\. [0-9]+)):(\s)*")

# A line consisting only of an upper-case heading or name. Examples of lines to be matched:
# DIRECT EXAMINATION, CROSS-EXAMINATION (Continued), SHAWN O'GRADY, CAROLE McCOY, VICTOR M. ALVAREZ,
# PHILLIP LeMARQUE, HERIBERTO MARTINEZ, JR., ADRIAN MARIE McMANUS, JAMES F. VAN NORMAN
uppercase_pattern = re.compile(r"^(M[Rr]?[Ss]?\.? )?((Mc|Le)?[A-Z]{2,}[\- ]?)+(([A-Z](\. |[’']))([A-Z]{2,}[\- ]?)*)?(, (JR|SR)\.)?(\([A-Za-z]+\))?$")

# Upper-case "label:" matches that must NOT be treated as a speaker
exception_pattern = re.compile(r"BY: *$|DATED: *$|APPEARANCES OF COUNSEL: *$")
# Leading "Q." or "A." marker of a question/answer line
qa_pattern = re.compile(r"(?P<qa>^[QA]\.)\s*")
proceedings_pattern = re.compile(r"^[\(]?The following proceedings were held")


def new_speaker_identifier(text, previous_status):
    """Helper for read_from_file(): decide whether a line of text starts a new speaker.

    Parameters:
        text: one line of extracted transcript text.
        previous_status: the reader status before this line ("not_started", "reading" or "recess").

    Returns:
        A tuple (speaker, speaker_text, status) when the line starts a new speaker or message, where
        speaker_text is the line without the speaker label and status is one of "reading", "recess",
        "reading_message" or "end_of_day". Returns None when the line does not start a new speaker.
    """

    # Nothing is read until the start-of-day line has been seen
    if previous_status == "not_started":
        if re.search(r"Santa Maria, California$", text):
            return ("Start of Day Message", text, "reading_message")
        return None

    # Status changes that can only happen while reading
    if previous_status == "reading":
        if re.search(r"Recess taken", text):
            return ("Recess Message", text, "recess")
        if re.search(r"\(The proceedings adjourned[\S ]+\)", text):
            return ("Adjourn Message", text, "end_of_day")

    # Standard case: labels like "MR. AUCHINCLOSS:"
    if previous_status in ["reading", "recess"]:
        match = uppercase_colon_pattern.search(text)
        if match:
            if exception_pattern.search(match.group(0)):
                return None
            # "THE COURT" -> "The Court"; a "BY " inside the label is dropped
            speaker = match.group("speaker").replace("BY ", "").title()
            # The pattern is anchored at the start of the line, so slicing removes exactly the
            # label. (str.replace would also delete any later repetition of the label in the text.)
            speaker_text = text[match.end():]
            return (speaker, speaker_text, "reading")

    # While reading, three additional kinds of lines start a new segment
    if previous_status == "reading":

        # A whole line in upper case such as "MR. JESUS SALAS" or "DIRECT EXAMINATION (Continued)"
        match = uppercase_pattern.search(text)
        if match and len(match.group(0)) >= 5:
            return ("Message", match.group(0), "reading_message")

        # Lines starting with "Q. " or "A. "
        match = qa_pattern.search(text)
        if match:
            return (match.group("qa"), qa_pattern.sub("", text), "reading")

        if proceedings_pattern.search(text):
            return ("Message", text, "reading_message")

    return None

text_input = "A. This has the D.A. and then it has her. It"
new_speaker_identifier(text_input, "reading")

('A.', 'This has the D.A. and then it has her. It', 'reading')

In [3]:
def create_speaking_segment(speaker, speaker_lines):
    """Build a speaking segment from a speaker and the lines attributed to that speaker.

    Parameters:
        speaker: the speaker name (or a message label such as "Message").
        speaker_lines: list of text items; they are joined with single spaces.

    Returns:
        A dictionary with the keys "text" and "speaker".
    """
    # Raw strings keep the regex escapes from being read as (invalid) string escapes
    segment_text = re.sub(r"okay\. --$", "okay --.", " ".join(speaker_lines))
    if speaker in ["Message", "Recess Message", "Adjourn Message"]:
        segment_text = re.sub(r"[\(\)]", "", segment_text)  # Removes any parentheses around message text
    return {"text": segment_text, "speaker": speaker}

In [4]:
# The named group main_content returns the text of each line, if any. For example,
# "7 THE PEOPLE OF THE STATE OF )" returns "THE PEOPLE OF THE STATE OF )" while "6" returns no match.
# Note that the pattern requires one whitespace character after the content.
content_pattern = re.compile(r"(\s*\d+\s+)(?P<main_content>[\S ]+)(\s)")

# Clean-ups applied to every extracted line, in this order (the order matters because later
# patterns operate on the result of earlier ones).
replacements = {
    re.compile(r"[ ]{2,}"): " ",
    re.compile(r"‘|’"): "'",
    re.compile(r"\.'"): "'.",
    re.compile(r",'"): "',",
    re.compile(r"\/{2,}"): "",
    re.compile(r" \. "): ". ",
    re.compile(r" \.$"): ".",
    re.compile(r" \? "): "? ",
    re.compile(r" \?$"): "?",
    re.compile(r" ,"): ",",
    re.compile(r",\”"): "”,",
    re.compile(r" ;"): ";",
    re.compile(r" :"): ":",
    re.compile(r"Aja"): "Azja",
    re.compile(r"MR\. MR\."): "MR.",
    re.compile(r"MR "): "MR. ",
    re.compile(r"Mr "): "Mr. ",
    re.compile(r"MESSEREAU"): "MESEREAU",
    re.compile(r"Davallin"): "Davellin",
    re.compile(r"ANTHONY A\. CANTU"): "ANTONIO A. CANTU",
    re.compile(r"Robeson"): "Robson",
    re.compile(r" 's"): "'s",
    re.compile(r" 't"): "'t",
    re.compile(r" 'd"): "'d",
    re.compile(r" 've"): "'ve",
    re.compile(r" 're"): "'re",
    re.compile(r" 'll"): "'ll",
    re.compile(r"“”"): "“",
    re.compile(r" ”"): "”",
    re.compile(r"”\.”|\.”"): "”.",
    re.compile(r"”,”|,”"): "”,",
    re.compile(r" \.”"): "”.",
    re.compile(r" “\?"): "”?",
    re.compile(r" \.\)"): ".)",
    re.compile(r"'\.s"): ".'s", # D.A'.s => D.A.'s
    re.compile(r"Lamir"): "LaMere"
    }

def extract_line_text(line):
    """Return the cleaned text of one numbered transcript line.

    Parameters:
        line: a raw line (string) from a transcript file, starting with its line number.

    Returns:
        The text after the line number with all `replacements` applied, or None if the line
        has no text (no match of `content_pattern`, or nothing left after cleaning).
    """
    match = content_pattern.search(line)
    if match is None:  # e.g. a line holding only a line number
        return None

    line_text = match.group("main_content")
    for regex, repl in replacements.items():  # Making replacements as needed
        line_text = regex.sub(repl, line_text)

    # Lines starting with 28 are the last line of each page: strip the trailing page number
    if re.search(r"^\s*28", line):
        line_text = re.sub(r" ?[0-9]+\s*$", "", line_text)

    return line_text or None

line = "18 BY MR. SNEDDON: "
extract_line_text(line)

'BY MR. SNEDDON:'

In [5]:
def read_from_file(file):
    """Read a single transcript file and return its list of speaking segments.

    Parameters:
        file: an open text file (any iterable of lines) holding one transcript.

    Returns:
        A list of speaking segments as produced by create_speaking_segment().
    """

    transcript = []
    speaker = None # Necessary in order not to read formalia at the beginning of each file
    speaker_lines = []
    status = "not_started"

    for line in file: # Iterating over the file line by line

        # Extracting the text line by line if there is any text to be extracted
        line_text = extract_line_text(line)
        if not line_text: # If there is no text to extract, continue to the next line of text
            continue

        # This makes sure that a message always corresponds to at least two lines of text,
        # for example "DIRECT EXAMINATION (Continued)" followed by "BY MR. AUCHINCLOSS:"
        if status == "reading_message":
            status = "reading"
            if line_text[:2] == "BY":
                speaker_lines.append(line_text)
                continue

        identified = new_speaker_identifier(line_text, status)

        # No new speaker on this line (status is left unchanged in this case)
        if identified is None:
            # Same speaker as the previous line: append the text. Lines seen while there is
            # no current speaker are not read at all.
            if speaker is not None:
                speaker_lines.append(line_text)
            continue

        new_speaker, speaker_text, status = identified

        # A new speaker closes the segment of the previous speaker, if there was one
        if speaker:
            transcript.append(create_speaking_segment(speaker, speaker_lines))

        # Start collecting lines for the new speaker
        speaker_lines = [speaker_text]
        speaker = new_speaker

        # Recess/adjourn lines are emitted immediately as their own segment
        if status in ["recess", "end_of_day"]:
            transcript.append(create_speaking_segment(speaker, speaker_lines))
            speaker = None
            if status == "end_of_day":
                break # No need to do further iterations

    # NOTE(review): a segment that is still being collected when the file ends without an
    # adjourn message is not emitted (unchanged from the previous behaviour) - confirm intended.
    return transcript

In [6]:
def iterating_over_files(specified_file = None):
    """Read one file or all files in the "Transcripts" directory.

    Parameters:
        specified_file: optional file name (e.g. "2005-03-01.txt"); when given, only that
            file is read.

    Returns:
        A dictionary mapping each file name (without its ".txt" suffix) to the list of
        speaking segments returned by read_from_file(). "Metadata.txt" is never read.
    """
    import codecs  # local import: only needed here to probe which codecs are available

    directory_path = "Transcripts" # Name of folder containing all transcripts

    # The "ansi" codec only exists on Windows; elsewhere open() raises LookupError for it.
    # Fall back to Windows-1252 there.
    # NOTE(review): assumes the transcripts were saved with the Western Windows code page - confirm.
    try:
        codecs.lookup("ansi")
        encoding = "ansi"
    except LookupError:
        encoding = "cp1252"

    transcripts = {} # Dictionary to store all text and information that is read in
    for file_name in sorted(os.listdir(directory_path)): # sorted: os.listdir order is arbitrary

        # In case a file name has been provided, non-matching file names are skipped
        if specified_file is not None and file_name != specified_file:
            continue

        if file_name == "Metadata.txt": # Don't read the metadata
            continue

        file_path = os.path.join(directory_path, file_name)
        if not os.path.isfile(file_path): # Skip sub-directories such as notebook checkpoints
            continue

        with open(file_path, encoding=encoding) as file:
            transcript = read_from_file(file)

        # Strip only a trailing ".txt" from the dictionary key
        key = file_name[:-len(".txt")] if file_name.endswith(".txt") else file_name
        transcripts[key] = transcript

    return transcripts

# First-pass read of every transcript file; pass a file name (e.g. "2005-03-01.txt") to read a single file.
transcripts_v0 = iterating_over_files()
# Preview: transcripts_v0["2005-03-21"][:10]

In [7]:
def get_changed_speaker_names():
    """Load the speaker renaming table from changed_speaker_names.csv.

    Returns:
        A dictionary mapping each value of the "original_name" column to the value of the
        "new_name" column in the same row.
    """
    with open("changed_speaker_names.csv") as csv_file:
        return {
            row["original_name"]: row["new_name"]
            for row in csv.DictReader(csv_file, delimiter=",")
        }

# Speaker renaming table, loaded once from changed_speaker_names.csv; read by post_read_in_edits()
changed_speaker_names = get_changed_speaker_names()

In [8]:
def get_read_back_offsets():
    """Load read_back_offsets.csv as a dictionary of lists.

    Each row holds a court date followed by one entry per "(Record read.)" occurrence on that
    date. For example '2005-03-04': ['-7', '-2'] means that for the court date 2005-03-04 the
    text corresponding to the first "(Record read.)" is 7 speaking segments prior, and for the
    second occurrence it is 2 segments prior.

    Returns:
        A dictionary mapping each court date (string) to its list of offset entries (strings).
        Empty fields and blank rows are ignored.
    """
    read_back_offsets = {}
    with open("read_back_offsets.csv") as csv_file:
        for row in csv_file:
            # Strip only the line terminator: slicing off the last character would corrupt
            # the final offset when the file does not end with a newline.
            fields = [field for field in row.rstrip("\r\n").split(",") if field != ""]
            if not fields: # Blank row
                continue
            court_date, *offsets = fields
            read_back_offsets[court_date] = offsets
    return read_back_offsets

In [9]:
def get_read_back_text(transcript, segment_number, court_date, record_reading):
    """Return the text that was read back for one "(Record read.)" occurrence.

    Parameters:
        transcript: list of speaking segments for the court date.
        segment_number: index of the segment containing "(Record read.)".
        court_date: key used to look up the offsets from read_back_offsets.csv.
        record_reading: zero-based count of "(Record read.)" occurrences seen so far on that date.

    Returns:
        The texts of the segments at segment_number + offset, joined with spaces. The entry for
        this occurrence may hold several offsets separated by ";".
    """
    entry = get_read_back_offsets()[court_date][record_reading]
    offsets = list(map(int, entry.split(";")))
    return " ".join(transcript[segment_number + offset]["text"] for offset in offsets)

In [10]:
# "<NAME> Having been (previously) sworn, ..." / "<NAME> Having so affirmed ..."
declared_sworn_pattern = re.compile(r"^(?P<name>[\S ]+[\S]*) (?P<text_after_name>Having (been (previously )?sworn|so affirmed)[\S ]+)")
# Example: "DIRECT EXAMINATION (Continued) BY MR. AUCHINCLOSS:"
examination_pattern = re.compile(r"(?P<examination_type>[\S ]*EXAMINATION (\(?[A-Za-z]+\)? )?BY) (?P<person_questioning>[\S ]+):$")
# Speaker labels of the form "Q. <questioner>" (the dot is matched literally)
q_name_questioner_pattern = re.compile(r"^Q\. (?P<questioner>[\S ]+)")
record_read_pattern = re.compile(r" \(Record read\.\)")

def post_read_in_edits(transcripts):
    """Second pass over the output of iterating_over_files().

    Iterates over all speaking segments of all court dates in order to name every "Q." and "A."
    speaker explicitly, tag each segment with a "type" ("message", "question", "answer",
    "normal" or "read-back"), apply the renames in the module-level `changed_speaker_names`,
    and insert a "The Court Reporter" segment after every segment containing "(Record read.)".

    Parameters:
        transcripts: dictionary mapping court dates to lists of speaking segments.

    Returns:
        A new dictionary of the same shape; the argument itself is left unmodified.
    """

    transcripts = copy.deepcopy(transcripts) # Necessary to preserve the old version of transcripts, transcripts_v0

    # Spelling fixes for names that str.title() lower-cases incorrectly
    name_case_fixes = (
        ("Mcmanus", "McManus"),
        ("Mccoy", "McCoy"),
        ("Legrand", "LeGrand"),
        ("Lemarque", "LeMarque"),
        ("Mccullough", "McCullough"),
    )

    # Initialised so that a "Q."/"A." segment seen before any sworn-in/examination message
    # reaches the "error" print below instead of raising UnboundLocalError. The values are
    # deliberately carried over from one court date to the next, as before.
    sworn_in_person = None
    person_questioning = None

    for court_date in transcripts.keys():

        transcript = transcripts[court_date]
        segment_number = 0
        record_reading = 0 # Keeps track of the number of times (Record read.) appears in the text of each transcript

        # len() is re-evaluated on every pass because read-back segments are inserted below;
        # an empty transcript is simply skipped.
        while segment_number < len(transcript):
            speaking_segment = transcript[segment_number]
            speaker = speaking_segment["speaker"]
            text = speaking_segment["text"]

            # Removing "(Record read.)" from the text of the speaking segment
            record_read = bool(record_read_pattern.search(text))
            if record_read:
                text = record_read_pattern.sub("", text)
                speaking_segment["text"] = text

            # Captures "regular" messages as well as "Adjourn Message" and "Start of Day Message"
            if re.search("[Mm]essage", speaker):
                speaking_segment["type"] = "message"

                # Checking only regular messages
                if speaker == "Message":
                    # Checking for the name of the person being sworn in
                    pattern_match = declared_sworn_pattern.search(text)
                    if pattern_match:
                        sworn_in_person = pattern_match.group("name").title()
                        for wrong, right in name_case_fixes:
                            sworn_in_person = sworn_in_person.replace(wrong, right)
                    # Retrieving the new questioning person from "... EXAMINATION BY xxx:"
                    else:
                        pattern_match = examination_pattern.search(text)
                        if pattern_match:
                            person_questioning = pattern_match.group("person_questioning").title()

            # Specifying the answering person for all answers
            elif speaker == "A.":
                speaking_segment["type"] = "answer"
                if sworn_in_person: # Should always evaluate to True
                    speaker = sworn_in_person
                    speaking_segment["speaker"] = speaker
                else:
                    print(text, "error") # Just making sure there are no errors

            # Specifying the questioning person for all questions
            elif speaker == "Q.":
                speaking_segment["type"] = "question"
                if person_questioning: # Should always evaluate to True
                    speaker = person_questioning
                    speaking_segment["speaker"] = person_questioning
                else:
                    print(text, "error") # Just making sure there are no errors

            # Handles speakers like "Q. Mr. Auchincloss" (from "Q. BY MR. AUCHINCLOSS:")
            elif q_name_questioner_pattern.search(speaker):
                speaker = q_name_questioner_pattern.search(speaker).group("questioner")
                speaking_segment["speaker"] = speaker
                speaking_segment["type"] = "question"

            else:
                speaking_segment["type"] = "normal"
                if speaker == "The Witness":
                    if sworn_in_person: # Should always evaluate to True
                        speaker = sworn_in_person
                        speaking_segment["speaker"] = speaker
                    else:
                        print(text, "error") # Just making sure there are no errors

            # Changing the speaker name for select speakers
            if speaker in changed_speaker_names:
                speaking_segment["speaker"] = changed_speaker_names[speaker]

            # If The Court Reporter reads from the record, an extra segment is inserted
            # directly after the current one (and skipped by the loop)
            if record_read:
                read_back_text = get_read_back_text(transcript, segment_number, court_date, record_reading)
                record_reading += 1
                if read_back_text: # Given how the transcripts are formatted, this check is not really necessary
                    segment_number += 1
                    transcript.insert(segment_number, {
                        "text": read_back_text,
                        "speaker": "The Court Reporter",
                        "type": "read-back"
                        })

            segment_number += 1

    return transcripts

# Second pass: names the "Q."/"A." speakers and tags segment types; transcripts_v0 is left untouched
transcripts = post_read_in_edits(transcripts_v0)
# Preview: transcripts["2005-03-21"][:10]

In [11]:
def transcripts_to_files(transcripts, prefix = False):
    """Write each formatted court transcript to a text file in the folder "Formatted Transcripts".

    One file is written per court date, named after the date without its year (for example
    "02-28.txt" for "2005-02-28"), with one line "<speaker>: <text>" per speaking segment.

    Parameters:
        transcripts: dictionary mapping court dates to lists of speaking segments.
        prefix: when True, lines of segments of type "question"/"answer" start with "Q. "/"A. ".
    """
    folder = "Formatted Transcripts"
    os.makedirs(folder, exist_ok=True) # The output folder may not exist yet

    # Evaluated once: the flag must not be overwritten by the per-segment prefix string,
    # otherwise only the very first segment would ever be prefixed.
    use_prefix = (prefix == True)
    type_prefixes = {"question": "Q. ", "answer": "A. "}

    for court_date, transcript in transcripts.items():
        short_date = "-".join(court_date.split("-")[1:]) # Example: Extracts "02-28" from "2005-02-28"
        path = os.path.join(folder, short_date + ".txt")
        with open(path, "w", encoding="utf-8") as file:
            for speaking_segment in transcript:
                speaker = speaking_segment["speaker"]
                text = speaking_segment["text"]

                if use_prefix:
                    segment_prefix = type_prefixes.get(speaking_segment["type"], "")
                else:
                    segment_prefix = ""

                file.write(f"""{segment_prefix}{speaker}: {text}\n""")

# Write one "<speaker>: <text>" file per court date, without "Q. "/"A. " prefixes
transcripts_to_files(transcripts, prefix=False)

#### **Output Validation**

In [12]:
def speakers(specified_court_date = None, sort_by="values"):
    """Count the speaking segments of every distinct speaker in the module-level `transcripts`.

    Parameters:
        specified_court_date: optional court date; when given, only that date is counted.
        sort_by: "values" (default) sorts by number of segments in descending order, then by
            name; any other value sorts alphabetically by speaker name.

    Returns:
        A dictionary mapping speaker names to segment counts, in the requested order. If no
        speaker is found, a message is printed and an empty dictionary is returned.
    """
    segment_counts = {}
    for court_date, transcript in transcripts.items():
        if specified_court_date is None or court_date == specified_court_date:
            for speaking_segment in transcript:
                name = speaking_segment["speaker"]
                segment_counts[name] = segment_counts.get(name, 0) + 1

    if not segment_counts:
        print(f"""Typo error. Couldn't retrieve speakers for "{specified_court_date}".""")
        return segment_counts

    if sort_by == "values":
        # Descending by count, ties broken by name
        def ordering(item):
            return (-item[1], item[0])
    else:
        # Ascending by speaker name
        def ordering(item):
            return item[0]

    return dict(sorted(segment_counts.items(), key=ordering))

# NOTE(review): court_date is assigned here but not passed to speakers() below, which is
# called for all court dates (specified_court_date = None).
court_date = "2005-05-26"
# Segment counts per speaker over all court dates, most frequent speaker first
speaker_dict = speakers(specified_court_date = None, sort_by="values")
for key, value in speaker_dict.items():
    print(f"{key}: {value}")

Mr. Mesereau: 30035
Mr. Zonen: 13683
Mr. Sneddon: 13088
Mr. Sanger: 12908
The Court: 10345
Mr. Auchincloss: 8966
Janet Arvizo: 5629
Gavin Arvizo: 3302
Star Arvizo: 3120
Davellin Arvizo: 3031
Mr. Nicola: 2814
Rudy Provencio: 1736
Craig Bonner: 1644
Blanca Francia: 1519
Hamid Moslehi: 1415
Jesus Salas: 1342
Ann Kite: 1340
Brian Barron: 1289
Adrian McManus: 1162
June Chandler: 1140
David LeGrand: 1127
Mark Geragos: 1090
Joseph Marcus: 1085
Jay Jackson: 1056
Jason Francia: 990
Violet Silva: 946
Azja Pryor: 900
Kiki Fournier: 897
Message: 872
Irene Peters: 871
Ralph Chacon: 855
Steve Robel: 843
Chris Tucker: 807
Robert Spinner: 787
Louise Palanker: 762
Jamie Masada: 761
Cynthia Montgomery: 677
Debbie Rowe: 673
Christian Robinson: 660
William Dickerman: 627
Joy Robson: 600
Kassim Abdool: 594
Paul Zelis: 583
Larry Feldman: 579
Lisa Hemman: 567
Wade Robson: 564
Jeff Klapakis: 535
Victor Alvarez: 532
Cynthia Ann Bell: 520
Conn Abel: 511
George Lopez: 443
Stan Katz: 443
Duross O'Bryan: 428
Karen

In [13]:
# Report every speaker (message pseudo-speakers excluded) that has no audio profile,
# together with the number of speaking segments
audio_profiles = get_audio_profiles()
for speaker, segment_count in speaker_dict.items():
    if speaker in audio_profiles.keys() or re.search("[Mm]essage", speaker):
        continue
    print(f"{speaker}: {segment_count}")

In [14]:
# Speakers known to have no picture: first column of speakers_without_pictures.csv
with open("speakers_without_pictures.csv", newline="") as file: # newline="" as recommended for the csv module
    csv_reader = csv.reader(file)
    # csv.reader yields an empty list for a blank line; skip those instead of failing on row[0]
    speakers_without_pictures = [row[0] for row in csv_reader if row]

# Printing all speakers without images that are not already listed as having none
for speaker in speaker_dict:
    image_path = os.path.join("pictures", speaker + ".jpg")
    if os.path.isfile(image_path):
        continue
    if speaker in speakers_without_pictures or re.search("[Mm]essage", speaker):
        continue
    print(speaker)

In [15]:
# Printing stale entries of speakers_without_pictures: entries that do have a picture and/or
# are not a speaker in the transcripts. Each entry is printed once, with the reason(s).
for speaker in speakers_without_pictures:
    image_path = os.path.join("pictures", speaker + ".jpg")
    reasons = []
    if os.path.isfile(image_path):
        reasons.append("picture exists")
    if speaker not in speaker_dict:
        reasons.append("not a speaker in the transcripts")
    if reasons:
        print(f"{speaker} ({', '.join(reasons)})")

In [18]:
# Printing picture files that do not belong to a speaker, or whose speaker is listed
# in speakers_without_pictures
for picture_file in os.listdir("pictures"):
    speaker = picture_file[:-4] # Drops the last four characters (an extension such as ".jpg")
    if speaker in speaker_dict and speaker not in speakers_without_pictures:
        continue
    print(speaker)

Janet Arvizo2


In [30]:
# Collecting the distinct texts of all segments tagged as messages, over all court dates
messages = set()

for transcript in transcripts.values():
    for speaking_segment in transcript:
        if speaking_segment["type"] != "message":
            continue
        messages.add(speaking_segment["text"])

messages

{'ADRIAN MARIE McMANUS Having been previously sworn, resumed the stand and testified further as follows:',
 'ADRIAN MARIE McMANUS Having been sworn, testified as follows:',
 'ALBERT LAFFERTY Having been previously sworn, resumed the stand and testified further as follows:',
 'ALBERT LAFFERTY Having been sworn, testified as follows:',
 'ALICIA ROMERO Having been previously sworn, resumed the stand and testified further as follows:',
 'ALICIA ROMERO Having been sworn, testified as follows:',
 'ANDREW R. DIETZ Having been sworn, testified as follows:',
 'ANGEL VIVANCO Having been previously sworn, resumed the stand and testified further as follows:',
 'ANGEL VIVANCO Having been sworn, testified as follows:',
 'ANN KITE Having been sworn, testified as follows:',
 'ANN MARIE KITE Having been previously sworn, resumed the stand and testified further as follows:',
 'ANN SERRANO LOPEZ Having been sworn, testified as follows:',
 'ANNE MARIE SIMS Having been sworn, testified as follows:',
 'ANTH