# CAAT AmAus Data Processing

This notebook processes the plain AmAus Word documents from the Corpus of Americans and Australians Talking (CAAT), i.e. the versions without any conversation analysis markup.

This notebook requires a single DOCX input file, and outputs a transformed CSV.

Some cells in this notebook require you to update variables to match the document, and others require you to review the cell output. These are labelled **User Input** and **User Review** respectively.

In [None]:
%%capture
# Before we begin, let's make sure that we install all the requirements that we need
import sys
!{sys.executable} -m pip install -r requirements.txt

## Import libraries

In [None]:
from docx import Document
from pathlib import Path
import re
import pandas as pd
from io import StringIO
import numpy as np
import unicodedata

## **User Input**: Specify document location

Edit the values in quotes to specify the folder and the name of the Word document. If the folder containing the document sits in the same location as this notebook, keep the default `AmAus01-31 transcripts_June2025 version`; otherwise update it to the correct path. Do not include `.docx` at the end of the document name.

In [None]:
document_directory = "AmAus01-31 transcripts_June2025 version"
document_name = "AmAus07_transcript_plain"

## Load document as text

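The document is loaded as plain text. The following fixes are also applied:
- spaces and tabs at the very start of a line are collapsed to a single tab
- space and tab combinations directly after the speaker code are converted to a single tab
- a colon is added after the speaker code if missing
- a space followed by a tab later in the line (not dividing the speaker and text columns) is converted to a single space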
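
As a rough illustration of the speaker-code fixes, the sketch below applies the same kind of substitutions to a made-up three-line sample. The sample text and the `normalise_speaker_lines` helper name are hypothetical and only serve to show the effect.

```python
import re

def normalise_speaker_lines(text):
    """Hypothetical helper: tidy the speaker column of a plain transcript."""
    # "A:  text" or "A: <TAB>text" -> "A:<TAB>text"
    text = re.sub(r'^([A-Z]):[\t ]+', r'\1:\t', text, flags=re.MULTILINE)
    # "B  text" (colon missing) -> "B:<TAB>text"
    text = re.sub(r'^([A-Z])[\t ]+', r'\1:\t', text, flags=re.MULTILINE)
    # " <TAB>" later in a line that already has a tab -> single space
    text = re.sub(r'(\t.*) \t', r'\1 ', text, flags=re.MULTILINE)
    return text

sample = "A: \thello there\nB  how are you\nA:\tfine \tthanks"
normalised = normalise_speaker_lines(sample)
print(normalised)
```

After this, every sample line has the shape `code:<TAB>text`, which is what the later tab split relies on.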

In [None]:
# Load the document
doc_path = Path(document_directory) / f"{document_name}.docx"
doc = Document(doc_path)

# Extract text from all paragraphs
full_text = []
for paragraph in doc.paragraphs:
    full_text.append(paragraph.text)

# Join all paragraphs into a single string
text_content = '\n'.join(full_text)

# Collapse any spaces/tabs at the start of a line into a single tab
text_content = re.sub(r'^[\t ]+', r'\t', text_content, flags=re.MULTILINE)
# Speaker code with a colon: replace the spaces/tabs after it with a single tab
text_content = re.sub(r'^([A-Z]):[\t ]+', r'\1:\t', text_content, flags=re.MULTILINE)
# Speaker code without a colon: add the colon, then a single tab.
# Note: this also matches lines that begin with a one-letter word such as "I" or "A"
text_content = re.sub(r'^([A-Z])[\t ]+', r'\1:\t', text_content, flags=re.MULTILINE)

# Replace " \t" with a single space when the line already contains an earlier tab
text_content = re.sub(r'(\t.*) \t', r'\1 ', text_content, flags=re.MULTILINE)

print(text_content)

## **User Review**: Restructure data if speaker tags are missing

Any line without a speaker code is treated as a continuation of the speaker line above it and is appended to the end of that line, with a single space between the two parts. If an 'END OF TRANSCRIPT' line occurs, its text is also removed.

This step also writes a `changes_log.txt` file to the folder the notebook is run from, so that all applied changes can be reviewed.
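
The merging idea can be sketched on a small made-up sample as follows. This is a simplified, single-pattern version rather than the exact rules used in the cell below; the `merge_continuations` helper and the sample text are hypothetical.

```python
import re

# A speaker line followed by a line that does not start with a speaker code
CONTINUATION = re.compile(r'^([A-Z]:\t[^\n]*)\n(?![A-Z]:)([^\n]+)$', re.MULTILINE)

def merge_continuations(text):
    """Hypothetical helper: fold untagged lines into the preceding speaker line."""
    while True:
        # Join with exactly one space, whatever whitespace surrounded the break
        text, n = CONTINUATION.subn(
            lambda m: f"{m.group(1).rstrip(' ')} {m.group(2).lstrip()}", text
        )
        if n == 0:  # nothing left to merge
            return text

sample = "A:\tso I went to the \n\tshops yesterday\nand bought milk\nB:\toh nice"
merged = merge_continuations(sample)
print(merged)
```

The loop repeats because each pass can only attach one continuation line per speaker line, so a turn spread over three lines needs two passes.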

In [None]:
# Blank out the END OF TRANSCRIPT line (the now-empty line stays in place)
text_content = re.sub(r'^.*END OF TRANSCRIPT.*$', r'', text_content, flags=re.MULTILINE)

# Each pattern captures (1) the speaker code, (2) the tab and text so far,
# and (3) a following line that has no speaker code
rules = [
    re.compile(r'^([A-Z]:)(\t.* $)\n\t(.*)$', re.MULTILINE),
    re.compile(r'^([A-Z]:)(\t.*[^ ]$)\n\t(.*)$', re.MULTILINE),
    re.compile(r'^([A-Z]:)(\t.*?)(?:\s*)\n(?![A-Z]:)([^\n]+)$', re.MULTILINE),
]

with open("changes_log.txt", "w", encoding="utf-8") as log:

    def replacer(match):
        before = match.group(0)

        speaker = match.group(1)
        # Drop trailing spaces so exactly one space joins the two parts
        col2 = match.group(2).rstrip(" ")
        continuation = match.group(3).lstrip()

        after = f"{speaker}{col2} {continuation}"

        log.write("CHANGED:\n")
        log.write(before)
        log.write("\n→\n")
        log.write(after)
        log.write("\n" + "-" * 40 + "\n")

        return after

    # Repeat until a full pass makes no changes, so that runs of several
    # untagged lines are all folded into the preceding speaker line
    while True:
        total_changes = 0

        for pat in rules:
            text_content, n = pat.subn(replacer, text_content)
            total_changes += n

        if total_changes == 0:
            break

## Convert text to a Pandas DataFrame

Tab-separated text is converted to columns.
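
To show what the conversion produces, here is a minimal sketch on a made-up three-line sample (the sample text and the `df_demo` name are hypothetical). Lines without a tab yield a single cell, and pandas pads the missing second cell with a null value.

```python
import pandas as pd

sample = "Transcript: demo\nA:\thello there\nB:\thi"

# One row per line; one cell per tab-separated field
rows = [line.split("\t") for line in sample.splitlines()]
df_demo = pd.DataFrame(rows)
print(df_demo)
```

These padded cells are why header and footer lines show up in the empty-cell check that follows.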

In [None]:
# Split columns only when tabs exist and make each line a row
rows = [line.split("\t") for line in text_content.splitlines()]
df = pd.DataFrame(rows)
df

## **User Review**: Check rows with empty cells

The output here should contain only rows from the header and footer sections. Any other rows indicate there may be missing or incorrect data in the Word document.
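
As a small sketch of how the check works, the made-up frame below (hypothetical `df_demo`) has one row with a null cell and one with an empty string; both are flagged, while the complete row is not.

```python
import pandas as pd

df_demo = pd.DataFrame([
    ["Transcript: demo", None],   # header-style line with no second column
    ["A:", "hello there"],        # complete row
    ["B:", ""],                   # speaker code with no text
])

# A cell counts as empty if it is null or an empty string
is_empty = df_demo.isna() | df_demo.eq("")
flagged = df_demo[is_empty.any(axis=1)]
print(flagged)
```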

In [None]:
empty_cells = df[(df.isna() | (df == "")).any(axis=1)]
print(empty_cells)

## **User Input**: Identify header and footer sections to remove

Edit the numbers to specify how many rows to remove from the start and the end of the document, so that only the speaker and transcript rows are left. Use the index in the previous cell's output to find the header and footer. Because the index starts from zero, the number of header rows is the last header index plus 1. Note that blank rows (paragraph breaks) are counted too. If there is no footer, set `footer_rows` to `0`.

```
0    Transcript: xxx
1    Recording date: xxx
2    Length of audio recording: xxx
3    Length of video recording: xxx
4    Transcriber: xxx
5   
6    Speakers:
7    A:	xxx
8    B:	xxx
9    C:	xxx
10   
11   
```
The last row of the header has index 11, so the header is 12 rows.

...
```
430  END OF TRANSCRIPT
```
The footer is 1 row.
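
The counting rule can be sketched on a made-up five-row frame (the `df_demo` and `body` names are hypothetical). Slicing with an explicit end position also behaves sensibly when `footer_rows` is `0`.

```python
import pandas as pd

df_demo = pd.DataFrame({"line": [
    "Transcript: demo",   # index 0 (header)
    "",                   # index 1 (header, blank paragraph)
    "A:\thello",          # index 2
    "B:\thi",             # index 3
    "END OF TRANSCRIPT",  # index 4 (footer)
]})

header_rows = 2  # last header index is 1, so 1 + 1 = 2 rows
footer_rows = 1  # use 0 when there is no footer

end = len(df_demo) - footer_rows
body = df_demo.iloc[header_rows:end].reset_index(drop=True)
print(body)
```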

In [None]:
header_rows = 11  # last header index + 1 (this would be 12 for the example above)
footer_rows = 1   # set to 0 if there is no footer

## Remove header and footer sections

In [None]:
# Remove header section
df = df.iloc[header_rows:].reset_index(drop=True)

# Remove footer section
df.drop(df.tail(footer_rows).index, inplace=True)
df

## Remove any rows where the entire line is blank

In [None]:
# Find rows where all cells are NaN or empty string
rows_to_delete = df[(df.isna() | df.eq("")).all(axis=1)]

# Print rows being removed
if rows_to_delete.empty:
    print("No empty rows")
else:
    for idx in rows_to_delete.index:
        print(f"Removing empty row at index: {idx}")

# Remove the rows
df = df[~((df.isna() | df.eq("")).all(axis=1))]
df = df.reset_index(drop=True)

## **User Review**: Check for any rows that contain empty cells

If the output is Empty DataFrame, there are no empty cells.

In [None]:
empty_cells = df[(df.isna() | (df == "")).any(axis=1)]
print(empty_cells)

## Add column names

In [None]:
df.columns = ['speakerID', 'text']

## **User Input**: Specify speaker and RA codes

Update the sections in quotes with the code in the `speakerID` column, the name of the speaker, and the code in the RO-Crate metadata spreadsheet (these are found in the `@id` column in the People tab). Scroll up to the output of 'Load document as text' if you need to see the speaker list again.

At least one American speaker, Australian speaker and research assistant (RA) will be required for all documents, but if there are more speakers involved, add these in the additional participants section.
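
As an alternative way to picture the mapping, the sketch below keeps the codes and names in one dictionary and applies them in a single step. It is not how the cells below are written; the `speakers` dictionary and `df_demo` frame are hypothetical, reusing the placeholder values from this notebook, and the extra `X:` row stands in for a speaker code that was not listed.

```python
import pandas as pd

# Hypothetical mapping: original code -> (RO-Crate @id, speaker name)
speakers = {
    "D:": ("#AmM03", "John Smith"),
    "E:": ("#AusF05", "Jane Smith"),
    "L:": ("#AusF02", "Joan Smith"),
}

df_demo = pd.DataFrame({
    "speakerID": ["D:", "E:", "L:", "X:"],
    "text": ["hi", "hello", "ok", "?"],
})

id_map = {orig: new for orig, (new, _) in speakers.items()}
name_map = {new: name for new, name in speakers.values()}

df_demo["speakerID"] = df_demo["speakerID"].replace(id_map)
df_demo.insert(1, "name", df_demo["speakerID"].map(name_map).fillna(""))

# Rows whose code was not in the mapping still lack the "#" prefix
unmapped = df_demo[~df_demo["speakerID"].str.contains("#", na=False)]
print(unmapped)
```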

In [None]:
# American speaker
am_speaker_orig = "D:"
am_speaker_name = "John Smith"
am_speaker_new = "#AmM03"

# Australian speaker
aus_speaker_orig = "E:"
aus_speaker_name = "Jane Smith"
aus_speaker_new = "#AusF05"

# Research Assistant
ra_orig = "L:"
ra_name = "Joan Smith"
ra_new = "#AusF02"

## Additional participants if needed
extra_speaker1_orig = "G:"
extra_speaker1_name = "John Smith"
extra_speaker1_new = "#AusF04"

extra_speaker2_orig = "Z:"
extra_speaker2_name = "Jane Smith"
extra_speaker2_new = "#AusF04"

## Update speakerIDs

In [None]:
df["speakerID"] = df["speakerID"].replace(to_replace=am_speaker_orig, value=am_speaker_new)
df["speakerID"] = df["speakerID"].replace(to_replace=aus_speaker_orig, value=aus_speaker_new)
df["speakerID"] = df["speakerID"].replace(to_replace=ra_orig, value=ra_new)
df["speakerID"] = df["speakerID"].replace(to_replace=extra_speaker1_orig, value=extra_speaker1_new)
df["speakerID"] = df["speakerID"].replace(to_replace=extra_speaker2_orig, value=extra_speaker2_new)
df

## **User Review**: Check for any rows where the speakerID wasn't updated

If the output is Empty DataFrame, there are no errors.

In [None]:
filtered = df[~df["speakerID"].str.contains("#", na=False)]
print(filtered)

## Add 'name' column

In [None]:
df.insert(1, "name", "")
df.loc[df["speakerID"] == am_speaker_new, "name"] = am_speaker_name
df.loc[df["speakerID"] == aus_speaker_new, "name"] = aus_speaker_name
df.loc[df["speakerID"] == ra_new, "name"] = ra_name
df.loc[df["speakerID"] == extra_speaker1_new, "name"] = extra_speaker1_name
df.loc[df["speakerID"] == extra_speaker2_new, "name"] = extra_speaker2_name
df

## Add 'section' column

In [None]:
# add section column
df.insert(loc=3, column='section', value='MAIN')
df

## Remove any time codes

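This step removes any time codes in formats like `(~0:33)` and `(~10:00)` from the text. Every changed line is recorded in a `timecodes_log.txt` file in the folder the notebook is run from, for review.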
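
The matching rule can be sketched with a single pattern, as below. This is a simplified version that also collapses leftover double spaces, which is an assumption about the desired tidy-up; the `strip_timecodes` helper and sample strings are hypothetical.

```python
import re

# Parenthesised group starting with "~" or a digit, e.g. (~0:33) or (10:00)
TIMECODE = re.compile(r'\([~0-9][~0-9:.]*\)')

def strip_timecodes(text):
    """Hypothetical helper: drop time codes, then tidy the leftover spacing."""
    without = TIMECODE.sub(' ', text)
    return re.sub(r' {2,}', ' ', without).strip()

cleaned = strip_timecodes("(~0:33) so anyway (~10:00) we left")
untouched = strip_timecodes("(laughs) that was at 5:30")
print(cleaned)
print(untouched)
```

Parenthesised comments such as `(laughs)` and bare times outside parentheses are left alone, because the pattern requires an opening parenthesis followed directly by `~` or a digit.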

In [None]:
before = df["text"].copy()

df["text"] = df["text"].str.replace(r'^\([\~0-9][\~0-9:\.]*\) ', '', regex=True)
df["text"] = df["text"].str.replace(r' \([\~0-9][\~0-9:\.]*\) ', ' ', regex=True)
df["text"] = df["text"].str.replace(r' \([\~0-9][\~0-9:\.]*\)$', ' ', regex=True)
df["text"] = df["text"].str.replace(r'^\([\~0-9][\~0-9:\.]*\)', '', regex=True)
df["text"] = df["text"].str.replace(r'\([\~0-9][\~0-9:\.]*\)', ' ', regex=True)
df["text"] = df["text"].str.replace(r'\([\~0-9][\~0-9:\.]*\)$', ' ', regex=True)

with open("timecodes_log.txt", "w", encoding="utf-8") as log:
    for b, a in zip(before, df["text"]):
        if b != a:
            log.write("CHANGED:\n")
            log.write(str(b) + "\n")
            log.write("→\n")
            log.write(str(a) + "\n")
            log.write("-" * 40 + "\n")

## Clean characters in DataFrame

Strips leading and trailing whitespace and replaces curly quotation marks and en/em dashes with their plain ASCII equivalents, for consistency.
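
For reference, the same character mapping can be expressed with `str.translate`, as in the sketch below. This is only an equivalent illustration, not the code used in this notebook; the `to_ascii_punctuation` helper name is hypothetical.

```python
# Map each typographic character to its ASCII replacement
ASCII_PUNCTUATION = str.maketrans({
    "\u201c": '"',  # left double quotation mark
    "\u201d": '"',  # right double quotation mark
    "\u2018": "'",  # left single quotation mark
    "\u2019": "'",  # right single quotation mark
    "\u2014": "-",  # em dash
    "\u2013": "-",  # en dash
})

def to_ascii_punctuation(value):
    """Hypothetical helper: strip and normalise strings, pass other values through."""
    if isinstance(value, str):
        return value.strip().translate(ASCII_PUNCTUATION)
    return value

cleaned = to_ascii_punctuation(" \u201cyeah\u201d \u2013 it\u2019s fine ")
print(cleaned)
```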

In [None]:
replacements = {
    '“': '"',
    '”': '"',
    '‘': "'",
    '’': "'",
    '—': '-',
    '–': '-',
}

def clean_text(val):
    if isinstance(val, str):
        val = val.strip()
        for old, new in replacements.items():
            val = val.replace(old, new)
    return val

# DataFrame.map needs pandas 2.1 or later; on older versions use df.applymap(clean_text)
df_clean = df.map(clean_text)
df_clean

## **User Review**: Review characters in cleaned DataFrame

A list of all the characters in the dataframe is printed below. Scan through the list to check these are all expected.
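
To give a feel for what an unexpected entry looks like in that list, the sketch below runs the same kind of inventory over two made-up strings. The `describe_characters` helper and its inputs are hypothetical.

```python
import unicodedata

def describe_characters(texts):
    """Hypothetical helper: list each distinct character with code point and name."""
    chars = sorted(set("".join(texts)))
    # Control characters such as tab have no Unicode name, so supply a default
    return [(c, f"U+{ord(c):04X}", unicodedata.name(c, "<NO NAME>")) for c in chars]

inventory = describe_characters(["caf\u00e9", "a\tb"])
for row in inventory:
    print(row)
```

Entries without a name (for example a stray tab) or with an unfamiliar name are the ones worth tracing back to the Word document.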

In [None]:
chars = set("".join(df_clean.astype(str).stack()))
char_list = sorted(chars)

for c in char_list:
    code = f"U+{ord(c):04X}"
    try:
        name = unicodedata.name(c)
    except ValueError:
        name = "<NO NAME>"
    print(f"{c!r}  {code}  {name}")

## **User Input**: Identify preamble rows

The 'section' column is pre-populated by default with MAIN in each row. Run the cell below to view the first 20 rows of the document. If you can't see the final line of the preamble, increase `20` to a higher number and re-run.

In [None]:
df_clean.head(20)

## **User Input**: Update preamble range

Edit the numbers below to the first and last rows of the preamble, using the index column numbers. Both the start and end rows are included in the range.

In [None]:
pre_row_start = 0
pre_row_end = 3

## **User Input**: Identify postamble rows

The 'section' column is pre-populated by default with MAIN in each row. Run the cell below to view the last 20 rows of the document. If you can't see the first line of the postamble, increase `20` to a higher number and re-run.

In [None]:
df_clean.tail(20)

## **User Input**: Update postamble range

Edit the numbers below to the first and last rows of the postamble, using the index column numbers. Both the start and end rows are included in the range.

In [None]:
post_row_start = 564
post_row_end = 566

## Update DataFrame with PRE and POST sections

This will use the ranges specified previously to update MAIN to PRE and POST where applicable.
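
Label-based slicing with `.loc` includes both the start and the end row, unlike ordinary Python slices. A minimal sketch on a made-up six-row frame (hypothetical `df_demo`):

```python
import pandas as pd

df_demo = pd.DataFrame({"text": list("abcdef")})
df_demo["section"] = "MAIN"

pre_row_start, pre_row_end = 0, 1
post_row_start, post_row_end = 4, 5

# .loc slices by index label, and the end label is included
df_demo.loc[pre_row_start:pre_row_end, "section"] = "PRE"
df_demo.loc[post_row_start:post_row_end, "section"] = "POST"

sections = df_demo["section"].tolist()
print(sections)
```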

In [None]:
df_clean.loc[pre_row_start:pre_row_end, "section"] = "PRE"
df_clean.loc[post_row_start:post_row_end, "section"] = "POST"
df_clean

## Export final version to CSV

This creates a CSV version of the finalised document and saves it in the same location as this notebook. The file takes its name from the input Word document, e.g. `AmAus01_transcript_plain.csv`.
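
If you want to confirm that text containing commas or quotation marks survives the export, a round trip like the sketch below can help. It writes to an in-memory buffer rather than a file; the `df_demo` frame and its values are made up for illustration.

```python
from io import StringIO

import pandas as pd

df_demo = pd.DataFrame({
    "speakerID": ["#AmM03", "#AusF05"],
    "name": ["John Smith", "Jane Smith"],
    "text": ['well, "yeah"', "it's fine"],
    "section": ["MAIN", "MAIN"],
})

# Write the CSV to memory, then read it straight back as text columns
buffer = StringIO()
df_demo.to_csv(buffer, index=False)
buffer.seek(0)
df_back = pd.read_csv(buffer, dtype=str, keep_default_na=False)
print(df_back)
```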

In [None]:
df_clean.to_csv(f"{document_name}.csv", index=False)