## Data preparation pipeline
- Load file to prepare
- Clean the file
- Create an index
- Concatenate



## Functions

In [1]:
from bs4 import BeautifulSoup
import os
import unicodedata
import json
from pprint import pprint as pp
import re

In [2]:

def is_complex(file):
    '''
    Evaluate whether the text structure of a JSON file is simple or complex.

    A file is considered complex when its top-level 'text' entry is a
    dictionary of named nodes instead of a list.
    More about the Sefaria text structures:
    https://developers.sefaria.org/docs/the-index-schema

    Args:
        file: path of the JSON file to inspect.

    Returns:
        True if the 'text' key exists and holds a dictionary, False otherwise.
    '''
    # Read explicitly as UTF-8: the default encoding of open() depends on the
    # platform locale and can fail on non-ASCII (e.g. Hebrew) content.
    with open(file, 'r', encoding='utf-8') as f:
        data = json.load(f)

    return 'text' in data and isinstance(data['text'], dict)

In [3]:
def browse_simple_structure_and_show_unique_chars(filename):
    '''
    Read a simple structured file and return the set of unique characters
    found in its text.
    The aim is to quickly identify characters that should be removed from the text.

    Args:
        filename: path of a JSON file whose 'text' entry is a list.

    Returns:
        A set holding every distinct character found in the verses
        (and sub-verses, when a verse is itself a list).
    '''
    # Read explicitly as UTF-8 so the result does not depend on the
    # platform's locale encoding.
    with open(filename, "r", encoding="utf-8") as f:
        data = json.load(f)

    character_set = set()

    for chapter in data["text"]:
        # When a chapter is a plain string, iterating it yields its
        # characters one by one, which are added like any other string.
        for verse in chapter:
            if isinstance(verse, str):
                character_set.update(verse)
            elif isinstance(verse, list):
                for sub_verse in verse:
                    character_set.update(sub_verse)

    return character_set

In [4]:
def browse_complex_structure_and_show_unique_chars(filename):
    '''
    Read a complex structured file and return the set of unique characters
    found in its text.
    The aim is to quickly identify characters that should be removed from the text.

    Args:
        filename: path of a JSON file whose 'text' entry is a dictionary of
            nodes; each node value is either a list of text or another
            dictionary of nodes.

    Returns:
        A set holding every distinct character found in the strings of the
        file, whatever the nesting depth of the lists that contain them.
    '''
    character_set = set()

    def collect_from_list(items):
        '''Add the characters of every string found in a (nested) list.'''
        for item in items:
            if isinstance(item, str):
                character_set.update(item)
            elif isinstance(item, list):
                # Recurse instead of hard-coding a maximum depth, so deeply
                # nested lists contribute characters and not whole strings.
                collect_from_list(item)
            # Any other value (e.g. null) carries no text and is ignored.

    def browse_and_show_unique_chars(nodes):
        '''Recursively browse the nodes of a dictionary.'''
        for value in nodes.values():
            if isinstance(value, list):
                collect_from_list(value)
            elif isinstance(value, dict):
                browse_and_show_unique_chars(value)

    # Read explicitly as UTF-8 so the result does not depend on the
    # platform's locale encoding.
    with open(filename, "r", encoding="utf-8") as f:
        data = json.load(f)

    browse_and_show_unique_chars(data['text'])

    return character_set


In [5]:
def browse_file_and_show_unique_chars(filename):
    '''
    Return the unique characters of a file, whatever its structure.

    The structure (simple or complex) is detected first, then the matching
    browsing function is applied to the file.
    '''
    browse = (
        browse_complex_structure_and_show_unique_chars
        if is_complex(filename)
        else browse_simple_structure_and_show_unique_chars
    )
    return browse(filename)

In [6]:
def create_index_from_simple_structured_files(filename):
    '''
    Browse a simple structured file and create its index entries.
    With the index, we can then quickly identify the reference of a text chunk,
    knowing the start and end character position of the text chunk in the book.

    The index is saved as "indexes/index_<title>.json" (spaces of the title
    replaced by underscores), relative to the current working directory.
    It holds the 'title', 'categories' and 'sectionNames' of the source file
    and a 'lines' list with one entry per smallest text unit:
    'uid', 'chapter_number', optionally 'verse_number' and 'sub_verse_number',
    'start_char', 'end_char' (1-based, inclusive), 'length' and 'text'.

    Args:
        filename: path of a JSON file whose 'text' entry is a list.

    Returns:
        None. The index is written to disk.
    '''
    def remove_hebrew_diacritics(text):
        # Decompose the text, then drop every combining mark.
        normalized_text = unicodedata.normalize('NFKD', text)
        return ''.join(c for c in normalized_text if not unicodedata.combining(c))

    # Read explicitly as UTF-8 so loading does not depend on the platform's
    # locale encoding.
    with open(filename, "r", encoding="utf-8") as f:
        data = json.load(f)

    index = {
        "title": data["title"],
        "categories": data["categories"],
        "sectionNames": data["sectionNames"],
        "lines": [],
    }
    uid_prefix = index["title"].replace(' ', '_')

    # Position (1-based) of the first character of the next indexed unit.
    next_start_char = 1

    def add_line(raw_text, **location):
        '''Append one index entry and advance the character counter.'''
        nonlocal next_start_char
        # Diacritics are removed before counting the characters.
        text = remove_hebrew_diacritics(raw_text)
        end_char = next_start_char + len(text) - 1
        index["lines"].append({
            # Entries are numbered sequentially, so the UID is unique.
            "uid": f"{uid_prefix}_{len(index['lines']) + 1}",
            **location,
            "start_char": next_start_char,
            "end_char": end_char,
            "length": len(text),
            "text": text,
        })
        next_start_char = end_char + 1

    for chapter_number, chapter in enumerate(data["text"], start=1):
        if isinstance(chapter, str):
            # One-level text: the chapter itself is the smallest unit.
            # Index it as a whole instead of character by character.
            add_line(chapter, chapter_number=chapter_number)
            continue

        verse_number = 1
        for verse in chapter:
            if isinstance(verse, str):
                add_line(
                    verse,
                    chapter_number=chapter_number,
                    verse_number=verse_number,
                )
            elif isinstance(verse, list):
                for sub_verse_number, sub_verse in enumerate(verse, start=1):
                    add_line(
                        sub_verse,
                        chapter_number=chapter_number,
                        verse_number=verse_number,
                        sub_verse_number=sub_verse_number,
                    )
            else:
                # Neither text nor a list of text: not counted as a verse.
                continue
            verse_number += 1

    # Create the "indexes" directory if it doesn't exist
    index_dir = "indexes"
    os.makedirs(index_dir, exist_ok=True)

    # Save the index as UTF-8: it is dumped with ensure_ascii=False, so the
    # file contains raw non-ASCII characters.
    index_filename = os.path.join(index_dir, "index_" + uid_prefix + ".json")
    with open(index_filename, "w", encoding="utf-8") as f:
        json.dump(index, f, indent=2, ensure_ascii=False)


In [7]:
def create_index_from_complex_structured_files(filename):
    '''
    Browse a complex structured file and create its index entries.
    With the index, we can then quickly identify the reference of a text chunk,
    knowing the start and end character position of the text chunk in the book.

    The index is saved as "indexes/index_<title>.json" (spaces of the title
    replaced by underscores), relative to the current working directory.
    It holds the 'title' and 'categories' of the source file and a 'lines'
    list with one entry per text unit: 'uid', 'parent_titles' (keys of the
    nodes leading to the unit), 'item_lvl_1_number', optionally
    'item_lvl_2_number' and 'item_lvl_3_number', 'start_char', 'end_char'
    (1-based, inclusive), 'length' and 'text'.

    UIDs and character positions run continuously over the whole book, in the
    order the nodes are browsed. Lists nested deeper than three levels are
    reported with a message and not indexed.

    Args:
        filename: path of a JSON file whose 'text' entry is a dictionary.

    Returns:
        None. The index is written to disk.
    '''
    def remove_hebrew_diacritics(text):
        # Decompose the text, then drop every combining mark.
        normalized_text = unicodedata.normalize('NFKD', text)
        return ''.join(c for c in normalized_text if not unicodedata.combining(c))

    # Read explicitly as UTF-8 so loading does not depend on the platform's
    # locale encoding.
    with open(filename, "r", encoding="utf-8") as f:
        data = json.load(f)

    index = {
        "title": data["title"],
        "categories": data["categories"],
        "lines": [],
    }
    uid_prefix = index["title"].replace(' ', '_')

    # Position (1-based) of the first character of the next indexed unit.
    # Kept outside the recursive function so that it is shared by all nodes:
    # nested nodes must continue the count, not restart it.
    next_start_char = 1

    def add_line(raw_text, parent_titles, **numbers):
        '''Append one index entry and advance the character counter.'''
        nonlocal next_start_char
        # Diacritics are removed before counting the characters.
        text = remove_hebrew_diacritics(raw_text)
        end_char = next_start_char + len(text) - 1
        index["lines"].append({
            # Entries are numbered sequentially over the whole book,
            # so the UID is unique.
            "uid": f"{uid_prefix}_{len(index['lines']) + 1}",
            "parent_titles": parent_titles,
            **numbers,
            "start_char": next_start_char,
            "end_char": end_char,
            "length": len(text),
            "text": text,
        })
        next_start_char = end_char + 1

    def browse_nodes_and_count_verses_chars(nodes, parent_keys=()):
        for key, value in nodes.items():
            # Keys of all the nodes leading to the current value
            current_keys = list(parent_keys) + [key]

            if isinstance(value, dict):
                # Sub-nodes: browse them recursively
                browse_nodes_and_count_verses_chars(value, current_keys)
                continue
            if not isinstance(value, list):
                continue

            item_lvl_1_number = 1
            for item_lvl_1 in value:
                if isinstance(item_lvl_1, str):
                    # 1st lvl: a string is a single unit
                    add_line(
                        item_lvl_1,
                        current_keys,
                        item_lvl_1_number=item_lvl_1_number,
                    )
                elif isinstance(item_lvl_1, list):
                    item_lvl_2_number = 1
                    for item_lvl_2 in item_lvl_1:
                        if isinstance(item_lvl_2, str):
                            # 2nd lvl: a string is a single unit
                            add_line(
                                item_lvl_2,
                                current_keys,
                                item_lvl_1_number=item_lvl_1_number,
                                item_lvl_2_number=item_lvl_2_number,
                            )
                            item_lvl_2_number += 1
                        elif isinstance(item_lvl_2, list):
                            item_lvl_3_number = 1
                            for item_lvl_3 in item_lvl_2:
                                if isinstance(item_lvl_3, str):
                                    # 3rd lvl: a string is a single unit
                                    add_line(
                                        item_lvl_3,
                                        current_keys,
                                        item_lvl_1_number=item_lvl_1_number,
                                        item_lvl_2_number=item_lvl_2_number,
                                        item_lvl_3_number=item_lvl_3_number,
                                    )
                                    item_lvl_3_number += 1
                                elif isinstance(item_lvl_3, list):
                                    print("nested verses detected and not handled !")
                            item_lvl_2_number += 1
                else:
                    # Neither text nor a list of text: not numbered.
                    continue
                # Numbered for both string and list items, so consecutive
                # strings do not all share the number 1.
                item_lvl_1_number += 1

    browse_nodes_and_count_verses_chars(data['text'])

    # Create the "indexes" directory if it doesn't exist
    index_dir = "indexes"
    os.makedirs(index_dir, exist_ok=True)

    # Save the index as UTF-8: it is dumped with ensure_ascii=False, so the
    # file contains raw non-ASCII characters.
    index_filename = os.path.join(index_dir, "index_" + uid_prefix + ".json")
    with open(index_filename, "w", encoding="utf-8") as f:
        json.dump(index, f, indent=2, ensure_ascii=False)


## Functions for the pipeline

In [8]:
def create_index(filename):
    '''
    Build the index of a structured file.
    Each index entry records the start and end character position of a text
    chunk in the book.
    Works for simple as well as complex structured files: the structure is
    detected first and the matching index builder is called.
    '''
    if not is_complex(filename):
        create_index_from_simple_structured_files(filename)
        return

    create_index_from_complex_structured_files(filename)

In [9]:
def browse_and_clean(filename):
    '''
    Remove unwanted characters from the text (e.g. numbers, special unicodes, html tags, etc.)
    Handles both simple and complex structured files.

    Every string found in the 'text' entry is cleaned in place, whatever the
    nesting depth of the lists that contain it (including strings located
    directly at the top level of a simple structure). The cleaned data is
    saved as "cleaned/<name>_clean.json", relative to the current working
    directory, where <name> is the base name of the source file without its
    extension and with spaces replaced by underscores.

    Args:
        filename: path of the JSON file to clean.

    Returns:
        None. The cleaned file is written to disk.
    '''

    def remove_special_unicodes(text):
        chars_to_remove = ('\n', '\u2003', '\u200d', '\u200e', '\u202c', '\u2009', '\xa0', '\xad')
        for char in chars_to_remove:
            text = text.replace(char, '')
        return text

    def remove_numbers(text):
        return ''.join(char for char in text if not char.isdigit())

    def clean_html(text):
        # NOTE: BeautifulSoup can emit MarkupResemblesLocatorWarning on some
        # inputs; the warning is not captured here.
        if text.strip():  # Checks if text is not empty after removing spaces
            soup = BeautifulSoup(text, 'html.parser')

            # Unwrap <b> and <strong> tags, keeping their content
            for bold_tag in soup.find_all(['b', 'strong']):
                bold_tag.unwrap()

            return soup.get_text()
        else:
            return text  # Returns text unchanged if empty

    def remove_braces(text):
        return re.sub(r'\{.*?\}', '', text)

    def clean_text(text):
        text = clean_html(text)
        text = remove_special_unicodes(text)
        text = remove_numbers(text)
        text = remove_braces(text)

        return text

    def clean_list(items):
        '''Clean in place every string of a (nested) list.'''
        for position, item in enumerate(items):
            if isinstance(item, str):
                # Write the result back into the list: rebinding the loop
                # variable alone would leave the data unchanged.
                items[position] = clean_text(item)
            elif isinstance(item, list):
                clean_list(item)
            # Any other value (e.g. null) is left untouched.

    def clean_nodes(nodes):
        '''Recursively clean the lists held by a dictionary of nodes.'''
        for value in nodes.values():
            if isinstance(value, list):
                clean_list(value)
            elif isinstance(value, dict):
                clean_nodes(value)

    # Read explicitly as UTF-8 so loading does not depend on the platform's
    # locale encoding.
    with open(filename, "r", encoding="utf-8") as f:
        data = json.load(f)

    if isinstance(data['text'], list):
        clean_list(data['text'])
    elif isinstance(data['text'], dict):
        clean_nodes(data['text'])

    # Create the "cleaned" directory if it doesn't exist
    os.makedirs('cleaned', exist_ok=True)

    # Define the cleaned filename with spaces replaced by underscores.
    # Only the extension is stripped, so names containing other dots stay
    # distinct from one another.
    base_name = os.path.splitext(os.path.basename(filename))[0]
    cleaned_filename = os.path.join('cleaned', base_name.replace(' ', '_') + "_clean.json")

    # Save as UTF-8: the data is dumped with ensure_ascii=False, so the file
    # contains raw non-ASCII characters.
    with open(cleaned_filename, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2, ensure_ascii=False)

In [10]:
def concatenate_verses(filename):
    '''
    Concatenate the verses of a book into a single text file.
    Handles both simple and complex structured files.

    The strings are joined in the order they are browsed, diacritics are
    removed from the result, and the text is saved as
    "concatenated/<name>_concatenated.txt", relative to the current working
    directory, where <name> is the base name of the source file without its
    extension.

    Args:
        filename: path of the JSON file to concatenate.

    Returns:
        None. The concatenated text is written to disk.
    '''
    # Read explicitly as UTF-8 so loading does not depend on the platform's
    # locale encoding.
    with open(filename, "r", encoding="utf-8") as f:
        data = json.load(f)

    # Text pieces are collected in a list and joined once at the end.
    parts = []

    def remove_hebrew_diacritics(text):
        # Decompose the text, then drop every combining mark.
        normalized_text = unicodedata.normalize('NFKD', text)
        return ''.join(c for c in normalized_text if not unicodedata.combining(c))

    def gather(items, remaining_levels, warn_on_overflow):
        '''
        Collect the strings of a list, following nested lists for at most
        `remaining_levels` more levels. Deeper lists are skipped, with a
        message when `warn_on_overflow` is set.
        '''
        for item in items:
            if isinstance(item, str):
                parts.append(item)
            elif isinstance(item, list):
                if remaining_levels > 0:
                    gather(item, remaining_levels - 1, warn_on_overflow)
                elif warn_on_overflow:
                    print("Nested verses detected and not handled!")

    def browse_concat_complex(nodes):
        '''Recursively collect the text of a dictionary of nodes.'''
        for value in nodes.values():
            if isinstance(value, list):
                # Up to three levels of lists are followed in a node
                gather(value, 2, True)
            elif isinstance(value, dict):
                browse_concat_complex(value)

    if isinstance(data['text'], list):
        for chapter in data["text"]:
            # Verses, and sub-verses when a verse is itself a list
            gather(chapter, 1, False)

    elif isinstance(data['text'], dict):
        browse_concat_complex(data['text'])

    concatenated_text = "".join(parts)

    # Create the "concatenated" directory if it doesn't exist
    concatenated_dir = 'concatenated'
    os.makedirs(concatenated_dir, exist_ok=True)

    # Save the concatenated text as UTF-8 so that writing non-ASCII
    # characters does not depend on the platform's locale encoding.
    concatenated_filename = os.path.join(concatenated_dir, os.path.splitext(os.path.basename(filename))[0] + "_concatenated.txt")
    with open(concatenated_filename, "w", encoding="utf-8") as f:
        f.write(remove_hebrew_diacritics(concatenated_text))


## Pipeline

In [11]:
# Run the whole pipeline on the corpus directory:
# clean every JSON file, index the cleaned files, then concatenate them.
import os
path = "../../Corpuses_from_Sefaria/Bavli"

# Becomes True as soon as one step fails on one file
error_occurred = False

# Step 1: clean the source texts (results go to the 'cleaned' directory)
for filename in os.listdir(path):
    if not filename.endswith(".json"):
        continue
    try:
        browse_and_clean(os.path.join(path, filename))
    except Exception as e:
        error_occurred = True
        print(f"An error occurred while processing file {filename}: {str(e)}")

# Step 2: build an index for each cleaned file
for filename in os.listdir("cleaned"):
    if not filename.endswith(".json"):
        continue
    try:
        create_index(os.path.join("cleaned", filename))
    except Exception as e:
        error_occurred = True
        print(f"An error occurred while creating index for file {filename}: {str(e)}")

# Step 3: concatenate the verses of each cleaned file
for filename in os.listdir("cleaned"):
    if not filename.endswith(".json"):
        continue
    try:
        concatenate_verses(os.path.join("cleaned", filename))
    except Exception as e:
        error_occurred = True
        print(f"An error occurred while concatenating verses for file {filename}: {str(e)}")

# Report success only when no step raised an error
if not error_occurred:
    print("The process was completed without error.")

The process was completed without error.


In [12]:
# import os

# path = "../../Corpuses_from_Sefaria/Liturgy"

# # Variable to track whether an error has occurred
# error_occurred = False

# # Clean the texts in the directory
# for filename in os.listdir(path):
#     if filename.endswith(".json"):
#         browse_and_clean(os.path.join(path, filename))

# # create an index
# for filename in os.listdir("cleaned"):
#     if filename.endswith(".json"):
#         create_index(os.path.join("cleaned", filename))

# # Concatenate the verses in the 'cleaned' directory
# for filename in os.listdir("cleaned"):
#     if filename.endswith(".json"):
#         concatenate_verses(os.path.join("cleaned", filename))

# # Print a message indicating that the process completed without error
# if not error_occurred:
#     print("The process was completed without error.")
