# Imports and Functions

In [1]:
import os
import re
import pandas as pd
from tqdm import tqdm
import csv
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from scipy.spatial.distance import pdist, squareform
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
import plotly.express as px
import plotly.io as pio

In [2]:
# text processing functions
def upload_corpus_csv(path):
    """
    Function to upload CSV files containing the texts and converting them into dataframes.
    Every entry of the directory is handed to pandas; entries that cannot be read are
    reported on stdout and skipped, so a single bad file does not abort the upload.
    :param path: path of the directory that holds the CSV files.
    :return: a list of dataframes (missing values replaced by empty strings),
             ordered by file name.
    """
    corpus = []
    # os.listdir returns entries in arbitrary, platform-dependent order;
    # sorting keeps the corpus order reproducible between runs and machines.
    for text in sorted(os.listdir(path)):
        try:
            df = pd.read_csv(os.path.join(path, text), encoding="utf-8").fillna("")
        except Exception as e:
            # deliberate best effort: report the problem and carry on with the next file
            print(f"can't process {text}:\n{e}")
        else:
            corpus.append(df)
    return corpus

def split_df_by_column_value(df, column):
    """
    Function to split a text dataframe into one dataframe per distinct value of a column.
    Used to separate a text into lines.
    :param df: dataframe containing one word in each row.
    :param column: the column by which to split the dataframe, preferably `text` or `line`.
    :return: a list of dataframes, one per distinct value of `column`,
             in the order in which the values first appear.
    """
    # one boolean selection per distinct value, in order of first appearance
    return [df[df[column] == value] for value in df[column].unique()]

def df2str(df, column, break_perc=1, mask=True, segmentation=True):
    """
    Function to convert the values from the text dataframe to a string of text with line breaks.
    The dataframe passed in is never modified; all masking is done on a private copy.
    :param df: the text dataframe (one word per row; the `ref`, `line` and `break_perc` columns are read,
               `pos` is read when present, `text` is only used in the warning message).
    :param column: the chosen column from the dataframe to construct the text from (preferably Unicode, cf, or lemma)
    :param break_perc: a parameter which dictates whether to include broken words depending on the percentage of how broken they are. Compares this value to the `break_perc` column in the dataframe. Words above the threshold are replaced by `X`. Parameter is set to 1 (i.e. all words, whether broken or not, are included); can be any float between 0 and 1.
    :param mask: boolean whether to mask named entities and numbers or not; set to True.
    :param segmentation: boolean; when False all white space (word segmentation and line breaks) is removed from the returned text; set to True.
    :return: a tuple `(text, text_length_full, text_length_partial)`:
             `text` is the string built from the chosen column, one dataframe line per text line;
             `text_length_full` is the number of rows (words) that went into the text;
             `text_length_partial` is that number minus the `UNK`, `X` and `x` tokens.
             If the column does not exist in the dataframe, `("", 0, 0)` is returned.
    """
    # check if column exists in dataframe. If not, return empty text.
    if column not in df.columns:
        return ("", 0, 0)

    # remove rows that include duplicate values for compound words
    if column not in ["norm", "cf", "sense", "pos"]:
        df = df.drop_duplicates("ref")
    # always work on a copy: the assignments below must not leak into the caller's dataframe
    df = df.copy()

    # if column entry is empty string, replace with UNK (can happen with normalization or lemmatization)
    mask_empty = df[column] == ""
    df[column] = df[column].where(~mask_empty, other="UNK")
    # mask proper nouns
    if mask and "pos" in df.columns:
        mask_bool = df["pos"].isin(["PN", "RN", "DN", "GN", "MN", "SN", "n"])
        df[column] = df[column].where(~mask_bool, other=df["pos"])
    if mask:
        # change number masking from `n` to `NUM`
        mask_num = df[column] == "n"
        df[column] = df[column].where(~mask_num, other="NUM")
        # add masking to male PNs based on determinative
        has_determinative = df[column].str[0] == "𒁹"
        df[column] = df[column].where(~has_determinative, other="PN")
    # remove rows without break_perc
    if "" in df["break_perc"].unique().tolist():
        df = df[df["break_perc"] != ""].copy()
    # filter according to break_perc
    mask_break = df["break_perc"] <= break_perc
    df[column] = df[column].where(mask_break, other="X")
    # calculate text length with and without UNK and x tokens
    text_length_full = df.shape[0]
    mask_partial = df[column].isin(["UNK", "X", "x"])
    text_length_partial = text_length_full - int(mask_partial.sum())

    # create text lines; sort=False keeps the lines in order of first appearance
    lines = []
    for _, line in df.groupby("line", sort=False):
        word_list = list(filter(None, line[column].to_list()))
        if word_list:
            # note: every lower-case "x" in the line is upper-cased, not only stand-alone x tokens
            lines.append(" ".join(map(str, word_list)).replace("x", "X").strip() + "\n")
    text = "".join(lines)

    # double check length of words in text matches number of rows.
    # This has to happen while the text is still segmented, otherwise it can never match.
    len_text = len([word for word in text.replace("\n", " ").split(" ") if word != ""])
    if len_text != text_length_full:
        # row label 0 may have been filtered out above, so look the text id up by position
        text_id = df["text"].iloc[0] if "text" in df.columns and not df.empty else "unknown"
        print(f"Number of words in text ({len_text}) does not match number of rows in dataframe ({text_length_full})!\nSee {text_id}")

    if segmentation == False:  # noqa: E712 - kept as an equality test so non-bool arguments behave as before
        # remove all white spaces (word segmentation and line breaks)
        text = re.sub(r"[\s\u00A0]+", "", text)

    return (text, text_length_full, text_length_partial)
    
def get_segmented_unicode_texts(corpus, break_perc=1, mask=True):
    """
    Function to convert the dataframes into strings of segmented unicode texts.
    :param corpus: a list of dataframes
    :param break_perc: threshold passed on to `df2str`; words whose `break_perc` value is above it are treated as broken. Set to 1 (i.e. all words, whether broken or not, are included); can be any float between 0 and 1.
    :param mask: boolean whether to mask named entities or not; set to True.
    :return: a dictionary where the keys are the text IDs and the values are tuples of
             (segmented unicode text, full text length, text length without unknown/broken tokens)
    """
    # the text ID is read from the "text" column of the row labelled 0
    return {
        df.loc[0, "text"]: df2str(df, "unicode_word", break_perc, mask)
        for df in corpus
    }

def add_texts_dict_to_dataframe(df, text_dict):
    """
    Function to attach the processed texts to the metadata dataframe.
    :param df: metadata dataframe whose index holds the text IDs.
    :param text_dict: dictionary mapping text IDs to tuples of (text, full length, partial length).
    :return: a new dataframe: `df` joined on its index with the columns
             `Text`, `Text_length_full` and `Text_length_partial`.
    """
    # dictionary keys become columns first, so transpose to get one row per text ID
    per_text = pd.DataFrame(text_dict).transpose()
    column_labels = {0: "Text", 1: "Text_length_full", 2: "Text_length_partial"}
    return df.join(per_text.rename(columns=column_labels))

In [3]:
# vectorization and tsne functions
# vectorizing texts

def vectorize(vectorizer, corpus, analyzer="word", ngram_range=(1,1), max_df=1.0, min_df=1, max_features=None, stop_words=["UNK", "X"]):
    """
    Converts a list of texts into a term-document matrix, based either on TF-IDF scores or on binary term presence.
    For full documentation of the variables of TfidfVectorizer from sklearn, see:
    https://scikit-learn.org/stable/modules/generated/sklearn.feature_extraction.text.TfidfVectorizer.html#sklearn.feature_extraction.text.TfidfVectorizer
    :param vectorizer: `"tfidf"` for a TfidfVectorizer or `"count"` for a CountVectorizer with `binary=True`.
                       An object that already provides `fit_transform` is used as it is
                       (the remaining keyword parameters are then not applied to it).
    :param corpus: a dataframe in which the texts are in a `"Text"` column and the dataframe's index is the text ids.
    :param analyzer: whether the feature should be made of word or character n-grams.
                     use `"word"` for word features, `"char_wb"` for character n-grams within word boundaries,
                     or `"char"` for character n-grams without word boundaries.
    :param ngram_range: the lower and upper boundary of the range of n-values for different n-grams to be extracted.
    :param max_df: threshold to ignore terms that have a document frequency above a certain value.
                   If the threshold is a float, it represent a proportion of the documents.
                   If the threshold is an integer, it represents absolute counts of number of documents in which the terms appears.
    :param min_df: threshold to ignore terms that have a document frequency below a certain value.
                   If the threshold is a float, it represent a proportion of the documents.
                   If the threshold is an integer, it represents absolute counts of number of documents in which the terms appears.
    :param max_features: if not `None`, build a vocabulary that only considers the top max_features ordered by term frequency across the corpus.
    :param stop_words: if `None`, no stop words are used. Otherwise, can be a list with words to be removed from resulting tokens.
                       The default list is only read, never modified.
    :return: `counts` the dense matrix produced by the vectorizer,
             `counts_df` a dataframe of the same values where the index is the text ids and the columns are the tokens,
             `stop_words` the fitted vectorizer's `stop_words_` attribute (per the sklearn documentation, the terms
             that were ignored because of `max_df`, `min_df` or `max_features`).
    :raises ValueError: if `vectorizer` is neither `"tfidf"`, `"count"` nor an object with a `fit_transform` method.
    """
    # settings shared by both vectorizer types
    shared_settings = dict(input="content", lowercase=False, analyzer=analyzer,
                           token_pattern=r"(?u)\b\w+\b", ngram_range=ngram_range,
                           max_df=max_df, min_df=min_df, max_features=max_features,
                           stop_words=stop_words)

    if vectorizer == "tfidf":
        vectorizer = TfidfVectorizer(**shared_settings)
    elif vectorizer == "count":
        vectorizer = CountVectorizer(binary=True, **shared_settings)
    elif not hasattr(vectorizer, "fit_transform"):
        # fail early with a clear message instead of an AttributeError on a string further down
        raise ValueError(f"unknown vectorizer {vectorizer!r}; use 'tfidf' or 'count'")

    counts = vectorizer.fit_transform(corpus["Text"].tolist()).toarray()
    stop_words = vectorizer.stop_words_

    # the vocabulary maps each token to its column index in `counts`;
    # ordering the tokens by that index gives the column names for easier viewing.
    vocab = vectorizer.vocabulary_
    column_names = sorted(vocab, key=vocab.get)

    counts_df = pd.DataFrame(counts, index=corpus.index, columns=column_names)

    return (counts, counts_df, stop_words)

# calculating distances between vectorized documents

def distance_calculator(counts, metric, text_ids):
    """
    Converts a term-document matrix to a square matrix of pairwise distances between texts.
    :param counts: the raw counts from the `vectorize` function (one row per text).
    :param metric: the metric by which to calculate the distances between the texts in the corpus,
                   passed on to scipy's `pdist`. Valid metrics are:
                   ‘braycurtis’, ‘canberra’, ‘chebyshev’, ‘cityblock’, ‘correlation’, ‘cosine’,
                   ‘dice’, ‘euclidean’, ‘hamming’, ‘jaccard’, ‘jensenshannon’, ‘kulczynski1’,
                   ‘mahalanobis’, ‘matching’, ‘minkowski’, ‘rogerstanimoto’, ‘russellrao’,
                   ‘seuclidean’, ‘sokalmichener’, ‘sokalsneath’, ‘sqeuclidean’, ‘yule’.
    :param text_ids: list of unique text_ids, in the same order as the rows of `counts`.
    :return: a dataframe matrix of distance between texts, labelled by text id on both axes.
    """
    # pdist returns the condensed (upper-triangle) form; squareform expands it to a full matrix
    condensed = pdist(counts, metric=metric)
    full_matrix = squareform(condensed)
    return pd.DataFrame(full_matrix, index=text_ids, columns=text_ids)

# reducing dimensions with pca or tsne

def reduce_dimensions_pca(df, metadata):
    """
    Reduces multidimensional data into two dimensions using PCA.
    :param df: dataframe holding the dimensions to reduce. All columns should include numerical values only.
               The dataframe's index should hold the unique text ids.
    :param metadata: the rest of the metadata in the corpus, to help visualize the resulting clusters in meaningful ways.
                     The metadata's index should hold the unique text ids.
    :return: the metadata dataframe with two added columns, `component 1` and `component 2`,
             holding the coordinates of the two remaining dimensions.
    """
    coordinates = PCA(n_components=2).fit_transform(df)
    # label the coordinates with the text ids so they can be joined onto the metadata
    projected = pd.DataFrame(coordinates, index=df.index, columns=["component 1", "component 2"])
    return metadata.join(projected)

def reduce_dimensions_tsne(df, perplexity, n_iter, metric, metadata):
    """
    Reduces multidimensional data into two dimensions using TSNE.
    See full documentation of sklearn's TSNE on:
    https://scikit-learn.org/stable/modules/generated/sklearn.manifold.TSNE.html
    :param df: dataframe holding the dimensions to reduce. All columns should include numerical values only.
               The dataframe's index should hold the unique text ids.
    :param perplexity: perplexity is a measure that weighs the importance of nearby versus distant points when creating a lower-dimension mapping.
                       t-SNE first converts the distances between points into conditional probabilities that represent similarities,
                       using Gaussian probability distributions.
                       The perplexity parameter influences the variance used to compute these probabilities.
                       A higher perplexity leads to a broader Gaussian that considers a larger number of neighbors when assessing similarity.
                       Lower perplexity puts more focus on the local structure and considers fewer neighbors.
                       A good perplexity depends greatly on dataset size and density.
                       The documentation recommends a value between 5 and 50.
                       We recommend to start with the square root of the length of the corpus.
    :param n_iter: maximum number of iterations for optimization. Passed to TSNE as `max_iter` when the
                   installed scikit-learn knows that keyword, and as `n_iter` otherwise.
    :param metric: the metric to be used when calculating distances between vectors.
                   Valid metrics are:
                   ‘braycurtis’, ‘canberra’, ‘chebyshev’, ‘cityblock’, ‘correlation’, ‘cosine’,
                   ‘dice’, ‘euclidean’, ‘hamming’, ‘jaccard’, ‘jensenshannon’, ‘kulczynski1’,
                   ‘mahalanobis’, ‘matching’, ‘minkowski’, ‘rogerstanimoto’, ‘russellrao’,
                   ‘seuclidean’, ‘sokalmichener’, ‘sokalsneath’, ‘sqeuclidean’, ‘yule’.
    :param metadata: the rest of the metadata in the corpus, to help visualize the resulting clusters in meaningful ways.
                     The metadata's index should hold the unique text ids.
    :return: the metadata dataframe with two added columns, `component 1` and `component 2`,
             holding the coordinates of the two remaining dimensions.
    """
    import inspect

    # scikit-learn renamed TSNE's `n_iter` to `max_iter` and later dropped the old name;
    # use whichever keyword the installed version accepts so the function runs on both.
    tsne_parameters = inspect.signature(TSNE.__init__).parameters
    iteration_keyword = "max_iter" if "max_iter" in tsne_parameters else "n_iter"

    # fixed random_state keeps the embedding reproducible between runs
    tsne = TSNE(n_components=2, perplexity=perplexity, metric=metric, init="pca", random_state=42,
                **{iteration_keyword: n_iter})
    reduced_data = tsne.fit_transform(df)
    reduced_df = pd.DataFrame(data=reduced_data, index=df.index, columns=["component 1", "component 2"])
    reduced_df_metadata = metadata.join(reduced_df)
    return reduced_df_metadata

# Main

In [4]:
# upload the text CSVs of all three sale-document directories into one corpus list
corpus = []
for corpus_dir in ("land_sale_annotated", "slave_sale_annotated", "unknown_sale_annotated"):
    corpus.extend(upload_corpus_csv(corpus_dir))

In [6]:
# create metadata file with a new column for visualizing the type of sale document
# subgenre labels that count as land sales, slave sales and unspecified sales
land_sale = [
    "lease document",
    "lease document (field)",
    "sales document (field)",
    "sales document (field, person)",
    "sales document (garden)",
    "sales document (house)",
    "sales document (inheritance)",
    "sales document (plot)",
]
slave_sale = [
    "sales document (duplicate)",
    "sales document (person)",
    "sales document (person, marriage?)",
    "sales document (person, restricted)",
]
unknown_sale = ["sales document"]

df = pd.read_csv(
    "NA_archival_texts_metadata_050724.csv",
    encoding="utf-8",
    index_col="ID",
).fillna("")

# start from the subgenre and overwrite it with the sale type wherever the subgenre
# belongs to one of the three groups; any other subgenre is kept as it is
df["sale_type"] = df["subgenre"]
for sale_label, subgenres in (("Land Sale", land_sale),
                              ("Slave Sale", slave_sale),
                              ("Unknown Sale", unknown_sale)):
    df["sale_type"] = df["sale_type"].where(~df["subgenre"].isin(subgenres), other=sale_label)

# add processed texts to metadata (words whose break_perc is above 0.25 are masked)
text_dict = get_segmented_unicode_texts(corpus, break_perc=.25, mask=True)
df = add_texts_dict_to_dataframe(df, text_dict)

In [7]:
# vectorize and reduce dimensions
# only texts with at least three tokens that are not UNK/X take part
final_df = df.loc[df["Text_length_partial"] >= 3]
counts, counts_df, stop_words = vectorize("tfidf", final_df)
matrix = distance_calculator(counts, metric="cosine", text_ids=final_df.index)
# perplexity: square root of the number of texts
reduced_tsne = reduce_dimensions_tsne(
    df=matrix,
    perplexity=matrix.shape[0] ** 0.5,
    n_iter=5000,
    metric="euclidean",
    metadata=final_df,
)



In [12]:
# create figure: t-SNE scatter plot coloured by provenience

import matplotlib.colors as mcolors
import matplotlib.pyplot as plt

# hex codes of every entry (cmap.N of them) in matplotlib's "tab20" colormap
# NOTE: this list is replaced by the hand-picked palette further down
cmap = plt.get_cmap("tab20")
colors = list(map(mcolors.rgb2hex, map(cmap, range(cmap.N))))

# fixed order of the proveniences; sorting by the ordered categorical
# puts the rows of the dataframe in this order
region_order = [
    "Kuyunjik (Nineveh)",
    "Nimrud (Kalhu)",
    "Qalat Sherqat (Assur)",
    "Balawat (Imgur-Enlil)",
    "Dur-Katlimmu",
    "Western periphery",
]
reduced_tsne["region-tsne"] = pd.Categorical(
    reduced_tsne["region-tsne"], categories=region_order, ordered=True
)
reduced_tsne = reduced_tsne.sort_values(by="region-tsne")

# one hand-picked colour per entry of region_order
colors = ["#A25FAC", "#5F8AEE", "#64D380", "#FFF56A", "#FF8D00", "#E50000"]

# marker size follows the number of usable tokens in each text;
# symbol="sale_type" is currently switched off
fig = px.scatter(
    reduced_tsne,
    x="component 1",
    y="component 2",
    color="region-tsne",
    color_discrete_sequence=colors,
    size=reduced_tsne["Text_length_partial"].astype(int),
    title="NA Sale Document Corpus",
    hover_data=[
        "subgenre",
        "project",
        "date_for_viz",
        reduced_tsne.index,
        "Text_length_partial",
        "Text_length_full",
    ],
)
# thin black outline around every marker
fig.update_traces(marker={"line": {"width": 1, "color": "black"}})

fig.update_layout(
    legend_title_text="Provenience",  # legend heading
    xaxis_title="",  # no x-axis label
    yaxis_title="",  # no y-axis label
)

fig.show()