We will implement a text tiling algorithm to segment the text into coherent sections. The algorithm is based on the following steps:
1. Split the text in sentences
2. Clean the sentences by removing stopwords and punctuation
3. Find frequency table for the words in the sentences
    - The x-axis is the sentence number
    - The y-axis is each word
    - Each cell is the frequency of the word in the sentence
4. Initialize blocks of text with a fixed size
5. Iterate until no change in the blocks
    - Calculate the intra-group cohesion for each block
    - Find blocks of text where the topic changes (the cohesion drops in these blocks)
    - Move block boundaries to the topic change points
6. Return the blocks of text

In [66]:
import nltk
from nltk.corpus import stopwords
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
from collections import Counter
from typing import List
from string import punctuation


In [67]:
class Sentence:
    def __init__(self, text: str, words: List[str]):
        self.text = text
        self.words = words
        
    def __eq__(self, other):
        return self.text == other.text
        
    def __repr__(self):
        return self.text
    
class Block:
    def __init__(self, sentences: List[Sentence]):
        self.sentences = sentences
        
    def __len__(self):
        return len(self.sentences)
    
    def __eq__(self, other):
        if len(self.sentences) != len(other.sentences):
            return False
        # true if all sentences equal
        return all([self.sentences[i] == other.sentences[i] for i in range(len(self.sentences))])
        
    def __repr__(self):
        return " ".join([sentence.text for sentence in self.sentences])
    
    def get_string_sentences(self) -> List[str]:
        return [" ".join(sentence.words) for sentence in self.sentences]

In [68]:
# retrieve text from res/mixed_texts.txt
with open('res/mixed_texts.txt', 'r') as file:
    raw_text = file.read()

In [82]:
def preprocess_text(text: str) -> List[Sentence]:
    """
    Preprocess the text by splitting it into sentences and removing unimportant words.
    :param text: the text to be preprocessed
    :return: a list of relevant words extracted from text
    """
    # get the list of stopwords
    stopwords_list = stopwords.words('english')
    
    sentence_list: List[Sentence] = []

    # split the text into sentences
    sentences = nltk.sent_tokenize(text)
    
    # remove stopwords and punctuation
    for sentence in sentences:
        relevant_words = [word.lower() for word in nltk.word_tokenize(sentence) if word.lower() not in stopwords_list and word not in punctuation]
        sentence_list.append(Sentence(sentence, relevant_words))
        
    return sentence_list

preprocess_text(raw_text)[:5]

[All the powers of old Europe have entered into a holy alliance to exorcise this spectre: Pope and Tsar, Metternich and Guizot, French Radicals and German police-spies.,
 Where is the party in opposition that has not been decried as communistic by its opponents in power?,
 Where is the opposition that has not hurled back the branding reproach of communism, against the more advanced opposition parties, as well as against its reactionary adversaries?,
 Two things result from this fact: Communism is already acknowledged by all European powers to be itself a power, It is high time that Communists should openly, in the face of the whole world, publish their views, their aims, their tendencies, and meet this nursery tale of the  spectre of communism with a manifesto of the party itself.,
 To this end, Communists of various nationalities have assembled in London and sketched the following manifesto, to be published in the English, French, German, Italian, Flemish and Danish languages.]

In [70]:
# # create a frequency table for the words in the sentences
# frequency_table = []
# 
# for sentence in cleaned_sentences:
#     # get counts of word frequency in the sentence
#     frequency = Counter(sentence)
#     frequency_table.append(frequency)
#     
# # print the frequency table for the first 5 sentences
# frequency_table[:5]

In [71]:
def initialize_blocks(sentences: List[Sentence], num_blocks: int) -> List[Block]:
    """
    Initialize blocks of text with a fixed size. Each block will contain a subset of the text.
    :param sentences: the list of sentences to be split into blocks
    :param num_blocks: the number of blocks to initialize
    :return: a list of blocks of text
    """
    blocks = []
    block_size = len(sentences) // num_blocks
    
    for i in range(num_blocks):
        start = i * block_size
        end = (i + 1) * block_size
        block = Block(sentences[start:end])
        blocks.append(block)
        
    return blocks

In [72]:
# Example usage
# sentences = preprocess_text(raw_text)
# blocks = initialize_blocks(sentences, 3)
# for i, block in enumerate(blocks):
#     print(f"Block {i+1}:")
#     for sentence in block:
#         print(sentence)
#     print()

In [73]:
def calculate_intra_group_cohesion(sentences: List[Sentence]) -> List[float]:

    # TF-IDF vectorization
    vectorizer = TfidfVectorizer()
    tfidf_matrix = vectorizer.fit_transform([" ".join(sentence.words) for sentence in sentences])

    # Calculate cosine similarity between adjacent sentences
    cohesion_scores = []
    for i in range(len(sentences) - 1):
        sim_score = cosine_similarity(tfidf_matrix[i], tfidf_matrix[i+1])
        cohesion_scores.append(sim_score[0][0])
    
    return cohesion_scores

In [74]:
# Example usage
text = "This is a sample text. It demonstrates how to calculate intra-document cohesion between sentences. Intra-document cohesion is important for understanding the coherence of a document."
sentences = preprocess_text(text)
cohesion_score = calculate_intra_group_cohesion(sentences)
print("Intra-document cohesion score:", cohesion_score)

Intra-document cohesion score: [0.0, 0.4179779031144455]


In [75]:
def find_topic_change_points(cohesion_scores: List[float], num_splits: int) -> List[int]:
    """
    Identify the indices for topic change points based on the largest drops in cohesion scores.
    :param cohesion_scores: List of cohesion scores between sentences.
    :param num_splits: Number of splits (topic changes) to make.
    :return: Indices of sentences before which there's a potential topic change.
    """
    # Calculate drops in cohesion scores
    drops = [cohesion_scores[i-1] - cohesion_scores[i] + cohesion_scores[i+1] for i in range(1, len(cohesion_scores)-1)]
    
    # drop indexes sorted in ascending order
    drops_indexes_sorted_asc = np.argsort(drops)
    # deepest num_splits drop indexes, sorted in ascending order
    deepest_drops_indexes_sorted_asc = drops_indexes_sorted_asc[-num_splits:]
    # sort the deepest drops indexes in ascending order
    change_points = np.sort(deepest_drops_indexes_sorted_asc)
    
    return [cp + 1 for cp in change_points]  # Adjust indices to point to sentences after the drops.

In [76]:
def move_block_boundaries(sentences: List[Sentence], change_points: List[int]) -> List[Block]:
    """
    Create new blocks based on the identified topic change points.
    :param sentences: List of all sentences.
    :param change_points: Indices of sentences indicating topic changes.
    :return: New list of blocks adjusted for topic changes.
    """
    blocks = []
    # Index of the first sentence in the currently iterated block
    start_index = 0

    for cp in change_points:
        # Calculate temporary block size
        blocks.append(Block(sentences[start_index:cp]))
        start_index = cp
    
    return blocks

In [None]:
def text_tiling(text: str, num_blocks: int, max_block_size: int) -> List[Block]:
    """
    Segment the text into coherent sections using the text tiling algorithm.
    1. Preprocess the text so that we have a list of Sentence objects
    2. Initialize Blocks with a fixed size
    3. Iterate until no change in the Blocks
        - Calculate the intra-block cohesion for each Sentence
        - Find Sentences where the topic changes (the cohesion is lower than the one of the adjacent Sentences)
        - Move Block boundaries to the topic change points
    4. Return the Blocks
    :param text: the text to be segmented
    :param num_blocks: the number of Blocks to split the text into
    :param max_block_size: the maximum number of Sentences in each Block
    :return: 
    """
    
    # Preprocess the text
    sentences: List[Sentence] = preprocess_text(text)
    
    # Initialize blocks of text
    blocks: List[Block] = initialize_blocks(sentences, num_blocks)
    
    n_times_no_change = 0
    remaining_iterations = 100
    # Iterate until no change in the blocks
    while n_times_no_change < 3 or remaining_iterations > 0:
        # Calculate the intra-group cohesion for each block
        
        cohesion_scores: List[float] = calculate_intra_group_cohesion(sentences)
        # print(cohesion_scores)
        
        # for block in blocks:
        #     # Calculate the cohesion score for the block
        #     block_cohesion_scores = calculate_intra_group_cohesion(block)
        #     print(block_cohesion_scores)
        #     cohesion_scores = cohesion_scores + block_cohesion_scores
        
        # Find blocks of text where the topic changes
        change_points = find_topic_change_points(cohesion_scores, num_blocks)
        
        # Move block boundaries to the topic change points
        new_blocks = move_block_boundaries(sentences, change_points)
        
        # Check if the blocks have changed
        if new_blocks == blocks:
            n_times_no_change += 1
        else:
            n_times_no_change = 0
            blocks = new_blocks
            
        remaining_iterations -= 1
        
    print("Number of remaining iterations:", remaining_iterations)
    return blocks

In [0]:

# test
test_text = "Sentence 1 talks about sentences and talks about about. While the penguin penguin while, sings the penguin while while while. A sardinian sardin Sardinian."
blocks = text_tiling(test_text, 3, 5)

for i, block in enumerate(blocks):
    print(f"Block {i+1}:")
    for sentence in block.sentences:
        print(sentence)
    print()