Preprocessing steps:
The preprocessing stage in natural language processing (NLP) involves transforming raw text data into a format that can be more effectively analyzed and used by machine learning models.
This stage is crucial for enhancing the quality of the data and ensuring that the models can learn effectively from it. Here's an overview of what happens during the preprocessing stage and why each step is important:

Preprocessing: Involves initial steps to clean and prepare raw data for further processing. This includes actions like removing duplicates, filtering out irrelevant or small files, and general data cleaning.

Remove:
        - Duplicate files
        - Non-Dutch files
        - Swear words
        - Smaller files

Why this order: Removing duplicate files first eliminates many files up front and does not affect the other processing steps. Special characters are handled next, which makes the following items easier to find. Then non-Dutch files are removed; this needs to be done before the swear-word filter because some words on the blacklist also exist in English. The order of the last steps is not important.


In [None]:
import os
import shutil
import hashlib
import string
from langdetect import detect
import openpyxl
import re
from bs4 import BeautifulSoup
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from sentence_transformers import SentenceTransformer
import matplotlib.pyplot as plt

In [None]:
# Input folders handed to the preprocessing functions defined in this notebook.
# NOTE(review): 'Part1' is capitalised while the other entries are lowercase —
# confirm this matches the directory name on disk (case-sensitive filesystems).
folders = [
    '../input/data_folder/Part1',
    '../input/data_folder/part2',
    '../input/data_folder/part3',
    '../input/data_folder/part4',
    '../input/data_folder/part5',
    '../input/data_folder/part6'
]

In [None]:
# function calculates and returns the MD5 hash of the contents of a specified file
def hash_file(filepath, chunk_size=1024 * 1024):
    """Return the MD5 hex digest of a file's contents.

    The file is read in fixed-size chunks so that large files are hashed
    without loading them into memory in one piece.

    Args:
        filepath: Path of the file to hash.
        chunk_size: Number of bytes read per iteration; must be positive
            (default 1 MiB).

    Returns:
        The hexadecimal MD5 digest as a string, or None if the file could
        not be hashed (the error is printed).
    """
    try:
        hasher = hashlib.md5()
        with open(filepath, 'rb') as file:
            # iter() with a sentinel stops on the empty bytes object that
            # read() returns at end of file.
            for chunk in iter(lambda: file.read(chunk_size), b''):
                hasher.update(chunk)
        return hasher.hexdigest()
    except Exception as e:
        # Deliberately best-effort: callers treat None as "skip this file".
        print(f"Could not hash file {filepath}: {e}")
        return None

# Function to find and move duplicate files
def Duplicates(folders, DuplicateFolder):
    """Move byte-identical duplicate ``.txt`` files out of ``folders``.

    Every ``.txt`` file found while walking ``folders`` is hashed with
    ``hash_file``. The first file seen with a given hash stays in place;
    each later file with the same hash is moved into ``DuplicateFolder``
    under its path relative to the input folder it was found in.

    A file that cannot be hashed is skipped. A file that cannot be moved is
    reported and skipped, so one failure no longer aborts the whole run or
    suppresses the summary.

    Args:
        folders: Iterable of root folders to scan recursively.
        DuplicateFolder: Folder that receives the duplicates; created if
            it does not exist.

    Returns:
        None. A summary line is printed.
    """
    os.makedirs(DuplicateFolder, exist_ok=True)
    first_seen = {}  # content hash -> path of the first file with that content
    moved_count = 0

    for folder in folders:
        for root, _, files in os.walk(folder):
            for filename in files:
                if not filename.endswith('.txt'):
                    continue
                filepath = os.path.join(root, filename)
                file_hash = hash_file(filepath)
                if file_hash is None:
                    continue
                if file_hash not in first_seen:
                    first_seen[file_hash] = filepath
                    continue

                # NOTE(review): the target is relative to the current input
                # folder, so equal relative paths from different input folders
                # map to the same target — confirm this cannot happen here.
                duplicate_target = os.path.join(DuplicateFolder, os.path.relpath(filepath, folder))
                try:
                    os.makedirs(os.path.dirname(duplicate_target), exist_ok=True)
                    shutil.move(filepath, duplicate_target)
                except OSError as e:
                    print(f"An error occurred while moving {filepath}: {e}")
                    continue
                moved_count += 1

    if moved_count:
        print(f"\nTotal number of duplicate files moved: {moved_count}")
    else:
        print("No duplicate text files found.")

# Function to move small files
def MoveSmallFiles(folders, small_files_folder, min_size=3 * 1024):
    """Move every file smaller than ``min_size`` bytes out of ``folders``.

    Each small file is moved into ``small_files_folder`` under its path
    relative to the input folder it was found in. Files that cannot be
    inspected or moved are reported and skipped.

    Args:
        folders: Iterable of root folders to scan recursively.
        small_files_folder: Folder that receives the small files; created
            if it does not exist.
        min_size: Size threshold in bytes; files strictly smaller than this
            are moved (default 3 KB).

    Returns:
        None.
    """
    os.makedirs(small_files_folder, exist_ok=True)
    for folder in folders:
        for root, _, files in os.walk(folder):
            for file in files:
                # Built outside the try block so the error message below can
                # always name the file.
                file_path = os.path.join(root, file)
                try:
                    if os.path.getsize(file_path) >= min_size:
                        continue
                    relative_path = os.path.relpath(file_path, folder)
                    target_path = os.path.join(small_files_folder, relative_path)
                    os.makedirs(os.path.dirname(target_path), exist_ok=True)
                    shutil.move(file_path, target_path)
                except OSError as e:
                    print(f"Failed to move {file_path}: {e}")

# Function to detect the language of the file's content
def detect_language(text):
    """Return the language code that ``detect`` reports for ``text``.

    Args:
        text: The text whose language should be detected.

    Returns:
        Whatever ``detect(text)`` returns, or None if ``detect`` raises.
    """
    try:
        return detect(text)
    except Exception:
        # Best-effort: any detection error means "language unknown". Unlike a
        # bare ``except:``, this lets KeyboardInterrupt and SystemExit
        # propagate so a long run can still be interrupted.
        return None

# Function to move non-Dutch files
def DutchFiles(folders, non_dutch_txt):
    """Move every file whose detected language is not 'nl' out of ``folders``.

    Files are read as UTF-8 and classified with ``detect_language``. Any
    result other than 'nl' (including None) causes the file to be moved into
    ``non_dutch_txt`` under its path relative to the input folder. Files
    that cannot be read or moved are reported and left in place.

    Args:
        folders: Iterable of root folders to scan recursively.
        non_dutch_txt: Folder that receives the non-Dutch files; created if
            it does not exist.
    """
    if not os.path.exists(non_dutch_txt):
        os.makedirs(non_dutch_txt)

    for folder in folders:
        for current_dir, _subdirs, names in os.walk(folder):
            for name in names:
                try:
                    file_path = os.path.join(current_dir, name)
                    with open(file_path, 'r', encoding='utf-8') as handle:
                        text = handle.read()

                    # Dutch files stay where they are.
                    if detect_language(text) == 'nl':
                        continue

                    destination = os.path.join(
                        non_dutch_txt, os.path.relpath(file_path, folder)
                    )
                    os.makedirs(os.path.dirname(destination), exist_ok=True)
                    shutil.move(file_path, destination)
                except Exception as e:
                    print(f"Could not process file {file_path}: {e}")

# Function to load bad words from an Excel file
def load_bad_words(BadWords):
    """Load the bad-word list from the active sheet of an Excel workbook.

    Every text cell on the active sheet contributes one entry, stripped of
    surrounding whitespace and lower-cased. Non-text cells and cells that
    are empty after stripping are ignored.

    Args:
        BadWords: Path of the Excel workbook.

    Returns:
        A set of lower-case strings. The set is empty if the workbook could
        not be loaded (the error is printed).
    """
    bad_words = set()
    try:
        workbook = openpyxl.load_workbook(BadWords)
        try:
            sheet = workbook.active
            for row in sheet.iter_rows():
                for cell in row:
                    value = cell.value
                    # Non-text cells have no .strip(); skipping them keeps one
                    # stray cell from aborting the whole load.
                    if not isinstance(value, str):
                        continue
                    word = value.strip().lower()
                    if word:
                        bad_words.add(word)
        finally:
            # Close the workbook even when iterating the sheet fails.
            workbook.close()
    except Exception as e:
        print("load_bad_words:", e)
    return bad_words

# Function to check if a file contains bad words
def file_contains_bad_words(file_path, BadWords, min_bad_words=3):
    """Report whether a file contains at least ``min_bad_words`` bad words.

    The file is read as UTF-8, lower-cased and split on whitespace. A token
    counts as a bad word if it is in ``BadWords`` either as written or after
    stripping leading and trailing ASCII punctuation, so "word," and "word!"
    match the entry "word".

    Args:
        file_path: Path of the text file to inspect.
        BadWords: Collection of lower-case bad words (a set is expected).
        min_bad_words: Number of bad-word occurrences from which the file is
            flagged (default 3).

    Returns:
        True if the count reaches ``min_bad_words``; False otherwise, and
        also False if the file could not be processed (the error is printed).
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read().lower()

        bad_word_count = 0
        for token in content.split():
            if token in BadWords:
                bad_word_count += 1
                continue
            # Punctuation glued to a word would otherwise hide it from the
            # lookup. The emptiness check keeps pure-punctuation tokens from
            # matching an accidental '' entry in the list.
            core = token.strip(string.punctuation)
            if core and core in BadWords:
                bad_word_count += 1
        return bad_word_count >= min_bad_words
    except Exception as e:
        print("file_contains_bad_words:", e)
        return False

# Function to move files containing bad words
def move_files_with_bad_words(folders, BadWords, destination_folder):
    """Move every file flagged by ``file_contains_bad_words`` out of ``folders``.

    The bad-word list is loaded once with ``load_bad_words``. Each flagged
    file is moved into ``destination_folder`` under its path relative to the
    input folder. A file that cannot be moved is reported and skipped, so
    one failure does not stop the remaining files in that folder.

    Args:
        folders: Iterable of root folders to scan recursively.
        BadWords: Path of the Excel workbook holding the bad-word list.
        destination_folder: Folder that receives the flagged files; created
            if it does not exist.

    Returns:
        None. Returns early with a message when no bad words were loaded.
    """
    # Separate name so the path argument is not rebound to the loaded set.
    bad_words = load_bad_words(BadWords)
    if not bad_words:
        print("No bad words loaded. Exiting function.")
        return

    os.makedirs(destination_folder, exist_ok=True)
    for folder in folders:
        for root, _, files in os.walk(folder):
            for file in files:
                file_path = os.path.join(root, file)
                if not file_contains_bad_words(file_path, bad_words):
                    continue
                relative_path = os.path.relpath(file_path, folder)
                target_path = os.path.join(destination_folder, relative_path)
                try:
                    os.makedirs(os.path.dirname(target_path), exist_ok=True)
                    shutil.move(file_path, target_path)
                except OSError as e:
                    print(f"move_files_with_bad_words: {file_path}: {e}")



# Run the four clean-up steps over `folders`. Each step moves the files it
# rejects into its own folder (relative to the current working directory).
# NOTE(review): the markdown above lists small-file removal last, but here it
# runs second — confirm which order is intended.

# Duplicate files
DuplicateFolder = 'duplicates'
Duplicates(folders, DuplicateFolder)

# Small files
small_files_folder = 'SmallFiles'
MoveSmallFiles(folders, small_files_folder)

# Non-Dutch files
non_dutch_txt = 'NonDutch1'
DutchFiles(folders, non_dutch_txt)

# Files containing bad words
# `BadWords` is the path of the Excel workbook that holds the bad-word list.
BadWords = 'Badwords.xlsx'
destination_folder = 'BadWords_files'
move_files_with_bad_words(folders, BadWords, destination_folder)


Normalization
Remove emoji and emoticons
Remove URLs and HTML tags
Remove accents

Remove special characters: Only keep alphanumeric characters, dots, dashes, and underscores

In [None]:
import os

# Lower-case accented letters and their unaccented replacements.
_ACCENT_REPLACEMENTS = {
    'ë': 'e', 'ï': 'i', 'é': 'e', 'è': 'e', 'ö': 'o', 'ê': 'e',
    'ü': 'u', 'ç': 'c', 'à': 'a', 'û': 'u', 'î': 'i', 'ñ': 'n',
    'ä': 'a', 'ô': 'o',
}

# Translation table covering the letters above plus their upper-case forms,
# so that e.g. 'É' becomes 'E' instead of passing through unchanged.
_ACCENT_TABLE = str.maketrans({
    **_ACCENT_REPLACEMENTS,
    **{accented.upper(): plain.upper() for accented, plain in _ACCENT_REPLACEMENTS.items()},
})


def replace_accented_characters(text):
    """Replace the supported accented letters in ``text`` with plain letters.

    Handles ë ï é è ö ê ü ç à û î ñ ä ô and their upper-case forms. All
    other characters are left untouched.

    Args:
        text: The string to normalise.

    Returns:
        A new string with the supported accented letters replaced.
    """
    # One pass over the text instead of one str.replace call per letter.
    return text.translate(_ACCENT_TABLE)

def process_files(folders, output_folder):
    """Write an accent-normalised copy of every file in ``folders``.

    Each file is read as UTF-8, passed through
    ``replace_accented_characters`` and written to ``output_folder`` under
    the directory path relative to the input folder it was found in. The
    input files are not modified. A file that cannot be read, processed or
    written is reported and skipped.

    Args:
        folders: Iterable of root folders to scan recursively.
        output_folder: Folder that receives the processed copies; created
            if it does not exist.

    Returns:
        None.
    """
    try:
        os.makedirs(output_folder, exist_ok=True)
    except OSError as e:
        print(f"Error creating output folder {output_folder}: {e}")
        return

    for folder in folders:
        for root, _, files in os.walk(folder):
            for file in files:
                input_file_path = os.path.join(root, file)
                try:
                    with open(input_file_path, 'r', encoding='utf-8') as f:
                        text = f.read()
                except FileNotFoundError as e:
                    print(f"File not found: {input_file_path}: {e}")
                    continue
                except (OSError, UnicodeDecodeError) as e:
                    # UnicodeDecodeError is not an OSError; without it a single
                    # non-UTF-8 file would abort the whole run.
                    print(f"Error reading file {input_file_path}: {e}")
                    continue

                try:
                    processed_text = replace_accented_characters(text)
                except Exception as e:
                    print(f"Error processing file {input_file_path}: {e}")
                    continue

                # NOTE(review): the output path is relative to each input
                # folder, so equal relative paths from different input folders
                # write to the same output file — confirm this cannot happen.
                relative_path = os.path.relpath(root, folder)
                output_file_folder = os.path.join(output_folder, relative_path)
                try:
                    os.makedirs(output_file_folder, exist_ok=True)
                except OSError as e:
                    print(f"Error creating directory {output_file_folder}: {e}")
                    continue

                output_file_path = os.path.join(output_file_folder, file)
                try:
                    with open(output_file_path, 'w', encoding='utf-8') as f:
                        f.write(processed_text)
                except OSError as e:
                    print(f"Error writing file {output_file_path}: {e}")
                    continue


# Write accent-normalised copies of all input files into 'Processed'
# (relative to the current working directory).
output_folder = 'Processed'

process_files(folders, output_folder)


Remove HTML tags.

In [None]:
# Function to remove HTML tags
def remove_html_tags(text):
    """Return ``text`` with every ``<...>`` span removed.

    The match is non-greedy and does not cross line breaks, so a tag that
    spans several lines is left as it is.
    """
    return re.sub(r'<.*?>', r'', text)

# Function to process files in a folder
def process_folder(folder_path):
    """Strip HTML tags, in place, from every file directly inside ``folder_path``.

    Only regular files at the top level of the folder are handled;
    sub-directories are skipped. Each file is read as UTF-8, cleaned with
    ``remove_html_tags`` and overwritten. Errors are printed, never raised.
    """
    try:
        for entry in os.listdir(folder_path):
            target = os.path.join(folder_path, entry)
            if not os.path.isfile(target):
                continue
            try:
                with open(target, 'r', encoding='utf-8') as source:
                    raw_text = source.read()
                stripped_text = remove_html_tags(raw_text)
                with open(target, 'w', encoding='utf-8') as sink:
                    sink.write(stripped_text)
            except Exception as e:
                print(f"Error processing file {entry}: {e}")
    except Exception as e:
        print(f"Error accessing folder {folder_path}: {e}")


# Strip HTML tags in place from the files directly inside the output folder
# (process_folder uses os.listdir, so sub-folders are not visited).
folder_path = output_folder
process_folder(folder_path)


Remove emoticons and emojis.

In [None]:
# Function to remove emojis from text
# Compiled once at import time instead of on every call.
# NOTE(review): the range U+24C2–U+1F251 is very wide and also removes many
# non-emoji characters (for example CJK text) — kept from the original;
# confirm this is acceptable for the corpus.
_EMOJI_PATTERN = re.compile(
    "["
    "\U0001F600-\U0001F64F"  # emoticons
    "\U0001F300-\U0001F5FF"  # symbols & pictographs
    "\U0001F680-\U0001F6FF"  # transport & map symbols
    "\U0001F1E0-\U0001F1FF"  # flags (iOS)
    "\U00002702-\U000027B0"
    "\U000024C2-\U0001F251"
    "\U0001F900-\U0001F9FF"  # supplemental symbols & pictographs
    "\U0001FA70-\U0001FAFF"  # symbols & pictographs extended-A
    "]+",
    flags=re.UNICODE,
)


def remove_emoji(text):
    """Return ``text`` with all characters in the emoji ranges removed.

    In addition to the original ranges this covers U+1F900–U+1F9FF and
    U+1FA70–U+1FAFF, which contain many commonly used emoji that were
    previously left in the text.

    Args:
        text: The string to clean.

    Returns:
        The cleaned string, or ``text`` unchanged if the substitution fails
        (the error is printed).
    """
    try:
        return _EMOJI_PATTERN.sub(r'', text)
    except Exception as e:
        print(f"Error removing emojis: {e}")
        return text

# Function to remove emoticons from text
# Same emoticons as before — :) :-) :( :-( :D :-D :P :-P XD xD — but the ones
# ending in a letter are only matched when not glued to a following word
# character, and XD/xD must also not follow one. This keeps ordinary text
# such as "TAXDIENST" or "op:Dit" intact.
_EMOTICON_PATTERN = re.compile(
    r":-?[)(]"                  # :) :-) :( :-(
    r"|:-?[DP](?!\w)"           # :D :-D :P :-P
    r"|(?<!\w)[Xx]D(?!\w)"      # XD xD
)


def remove_emoticons(text):
    """Return ``text`` with a fixed set of well-known text emoticons removed.

    The set is hand-picked and not exhaustive.

    Args:
        text: The string to clean.

    Returns:
        The cleaned string, or ``text`` unchanged if the substitution fails
        (the error is printed).
    """
    try:
        return _EMOTICON_PATTERN.sub(r'', text)
    except Exception as e:
        print(f"Error removing emoticons: {e}")
        return text

def process_files_in_folder(folder_path):
    """Remove emojis and emoticons, in place, from the files in ``folder_path``.

    Only regular files directly inside the folder are rewritten; every other
    entry is reported and skipped. Each file is read as UTF-8, cleaned with
    ``remove_emoji`` followed by ``remove_emoticons`` and overwritten.
    Errors are printed, never raised.
    """
    try:
        for entry in os.listdir(folder_path):
            file_path = os.path.join(folder_path, entry)

            if not os.path.isfile(file_path):
                print(f"Skipping non-file entry: {file_path}")
                continue

            try:
                with open(file_path, 'r', encoding='utf-8') as reader:
                    original_text = reader.read()

                cleaned_text = remove_emoticons(remove_emoji(original_text))

                with open(file_path, 'w', encoding='utf-8') as writer:
                    writer.write(cleaned_text)
            except Exception as e:
                print(f"Error processing file {file_path}: {e}")
    except Exception as e:
        print(f"Error iterating through folder {folder_path}: {e}")

# NOTE(review): this self-assignment is a no-op; the cell relies on
# `folder_path` having been set by an earlier cell and raises NameError
# otherwise.
folder_path = folder_path
process_files_in_folder(folder_path)

Remove HTML tags with BeautifulSoup (a regex-based method is available as an alternative).

In [None]:
# Function to remove HTML tags using BeautifulSoup
def remove_html_tags_bs4(text):
    """Parse ``text`` with BeautifulSoup's "html.parser" and return its text content."""
    return BeautifulSoup(text, "html.parser").get_text()

# Function to remove HTML tags using regex
def remove_html_tags_regex(text):
    """Return ``text`` with every ``<...>`` span deleted.

    The match is non-greedy and limited to a single line, so a tag that
    spans several lines is not removed.
    """
    tag_regex = r'<.*?>'
    return re.sub(tag_regex, r'', text)

# Function to remove HTML tags from files in the given folder
def RemoveHTMLTags(folder, method='bs4'):
    """Strip HTML tags, in place, from every file under ``folder``.

    The folder is walked recursively. Each file is read as UTF-8, cleaned
    with the selected method and overwritten. Files that cannot be processed
    are reported and counted, and the count is printed at the end.

    Args:
        folder: Root folder to process.
        method: 'bs4' to use ``remove_html_tags_bs4`` or 'regex' to use
            ``remove_html_tags_regex``.

    Returns:
        None. An invalid ``method`` is reported once and no file is touched.
    """
    # Validate once up front instead of failing separately on every file.
    if method not in ('bs4', 'regex'):
        print("Invalid method for removing HTML tags. Choose 'bs4' or 'regex'.")
        return
    strip_tags = remove_html_tags_bs4 if method == 'bs4' else remove_html_tags_regex

    failed = 0
    try:
        for root, _, files in os.walk(folder):
            for file in files:
                old_file_path = os.path.join(root, file)
                try:
                    with open(old_file_path, 'r', encoding='utf-8') as f:
                        content = f.read()

                    content = strip_tags(content)

                    # Write the modified content back to the same file.
                    with open(old_file_path, 'w', encoding='utf-8') as f:
                        f.write(content)
                except Exception as file_error:
                    # Single handler: the extra clauses that used to follow
                    # this one could never run, because Exception already
                    # matches everything they named.
                    failed += 1
                    print(f"Error processing file {old_file_path}: {file_error}\n")

        print("Number of files that could not be processed:", failed)

    except Exception as main_error:
        print(f"An error occurred while accessing the folder: {main_error}")

# Run the BeautifulSoup-based tag removal over `folder_path`; RemoveHTMLTags
# walks the folder recursively and rewrites the files in place.
folder = folder_path
RemoveHTMLTags(folder, method='bs4')

Text chunks: split each text into smaller text files by grouping neighbouring sentences.

In [None]:
#This function combines each sentence in a list with its neighboring sentences based on a specified buffer size
#and appends the combined result as a new entry in each sentence's dictionary.
def combine_sentences(sentences, buffer_size=1):
    """Attach a context window to every sentence dictionary.

    For each entry, the 'sentence' values of up to ``buffer_size`` entries
    before it, the entry itself and up to ``buffer_size`` entries after it
    are joined with single spaces and stored under 'combined_sentence'.

    Args:
        sentences: List of dicts, each with a 'sentence' key. Modified in place.
        buffer_size: Number of neighbours taken on each side.

    Returns:
        The same ``sentences`` list.
    """
    # A negative buffer selects no neighbours at all, exactly like zero.
    reach = max(buffer_size, 0)
    total = len(sentences)
    for position, entry in enumerate(sentences):
        window_start = max(position - reach, 0)
        window_end = min(position + reach + 1, total)
        window = sentences[window_start:window_end]
        entry['combined_sentence'] = ' '.join(item['sentence'] for item in window)
    return sentences

#This function calculates the cosine distance between the embeddings of consecutive combined sentences in a list
#appends each distance to the list and updates each sentence's dictionary with the distance to the next sentence.
def calculate_cosine_distance(sentences):
    """Compute the cosine distance between each pair of consecutive entries.

    For every entry and its successor, the distance is 1 minus the cosine
    similarity of their 'combined_sentence_embedding' values. The distance
    is stored on the earlier entry under 'distance_to_next'. If either
    embedding is missing, 0.0 is recorded in the list and the entry is left
    unchanged.

    Args:
        sentences: List of sentence dicts. Modified in place.

    Returns:
        A tuple ``(distances, sentences)`` where ``distances`` has one item
        per consecutive pair.
    """
    distances = []
    for current, following in zip(sentences, sentences[1:]):
        vector_a = current.get('combined_sentence_embedding', None)
        vector_b = following.get('combined_sentence_embedding', None)

        if vector_a is None or vector_b is None:
            distances.append(0.0)
            continue

        gap = 1 - cosine_similarity([vector_a], [vector_b])[0][0]
        distances.append(gap)
        current['distance_to_next'] = gap

    return distances, sentences

# Loaded SentenceTransformer models keyed by model name, so the model is
# loaded once per session instead of once per input file.
_SENTENCE_MODEL_CACHE = {}


def _get_sentence_model(model_name='all-MiniLM-L6-v2'):
    """Return the SentenceTransformer for ``model_name``, loading it on first use."""
    model = _SENTENCE_MODEL_CACHE.get(model_name)
    if model is None:
        model = SentenceTransformer(model_name)
        _SENTENCE_MODEL_CACHE[model_name] = model
    return model


def _write_chunk(output_folder, input_file_path, chunk_number, text):
    """Write one chunk as UTF-8 to ``<input file name>_chunk_<n>.txt`` in ``output_folder``."""
    chunk_name = f'{os.path.basename(input_file_path)}_chunk_{chunk_number}.txt'
    # Explicit encoding: the platform default may be unable to encode the text.
    with open(os.path.join(output_folder, chunk_name), 'w', encoding='utf-8') as file:
        file.write(text)


def split_into_chunks(input_file_path, output_folder, percentile=95, min_chunk_bytes=2000):
    """Split a text file into chunks at large jumps in sentence similarity.

    The text is split into sentences after '.', '?' or '!' followed by
    whitespace. Each sentence is combined with its neighbours
    (``combine_sentences``), embedded, and the cosine distance between
    consecutive embeddings is computed (``calculate_cosine_distance``).
    A chunk boundary is placed wherever a distance exceeds the given
    percentile of all distances in the file. Only chunks whose UTF-8 size is
    larger than ``min_chunk_bytes`` are written; smaller ones are dropped.

    The distances and the threshold are also drawn on the current matplotlib
    figure.

    Args:
        input_file_path: Path of the UTF-8 text file to split.
        output_folder: Folder that receives the chunk files; created if it
            does not exist.
        percentile: Percentile of the distances used as split threshold
            (default 95).
        min_chunk_bytes: A chunk is written only if its UTF-8 size exceeds
            this many bytes (default 2000).

    Returns:
        None. Any error is printed together with the input path.
    """
    try:
        with open(input_file_path, 'r', encoding='utf-8') as f:
            content = f.read()

        single_sentences_list = re.split(r'(?<=[.?!])\s+', content)
        sentences = [{"sentence": x, "index": i} for i, x in enumerate(single_sentences_list)]

        sentences = combine_sentences(sentences)

        model = _get_sentence_model()
        embeddings = model.encode([sentence['combined_sentence'] for sentence in sentences])
        for sentence, embedding in zip(sentences, embeddings):
            sentence['combined_sentence_embedding'] = embedding

        distances, sentences = calculate_cosine_distance(sentences)

        os.makedirs(output_folder, exist_ok=True)

        start = 0
        chunk_counter = 0

        # A file with a single sentence has no distances; skip the percentile
        # (which fails on empty input) and fall through to the remainder.
        if distances:
            threshold = np.percentile(distances, percentile)

            plt.plot(distances)
            y_upper_bound = 0.65
            plt.ylim(0, y_upper_bound)
            plt.xlim(0, len(distances))
            plt.axhline(y=threshold, color='r', linestyle='-')
            #plt.show()

            for i, dist in enumerate(distances):
                if dist > threshold:
                    # NOTE(review): the slice ends at i + 2 while the next chunk
                    # starts at i + 1, so sentence i + 1 lands in both chunks —
                    # kept as in the original; confirm the overlap is intended.
                    chunk_sentences = sentences[start:i + 2]
                    combined_txt = ' '.join(sent['sentence'] for sent in chunk_sentences)

                    if len(combined_txt.encode('utf-8')) > min_chunk_bytes:
                        chunk_counter += 1
                        _write_chunk(output_folder, input_file_path, chunk_counter, combined_txt)

                    start = i + 1

        # Handle any remaining sentences
        if start < len(sentences):
            combined_txt = " ".join(d["sentence"] for d in sentences[start:])

            if len(combined_txt.encode('utf-8')) > min_chunk_bytes:
                chunk_counter += 1
                _write_chunk(output_folder, input_file_path, chunk_counter, combined_txt)

    except Exception as e:
        print(f"An error occurred while processing {input_file_path}: {e}")

def process_folder(input_folder, output_folder):
    """Run ``split_into_chunks`` on every ``.txt`` file directly inside ``input_folder``.

    Sub-directories and entries that do not end in '.txt' are ignored.
    Chunks are written to ``output_folder``.
    """
    # NOTE(review): an earlier cell defines a one-argument `process_folder`;
    # this definition replaces it once the cell has run.
    candidate_paths = (os.path.join(input_folder, name) for name in os.listdir(input_folder))
    for input_file_path in candidate_paths:
        if not (os.path.isfile(input_file_path) and input_file_path.endswith('.txt')):
            continue
        print(f"Processing {input_file_path}...")
        split_into_chunks(input_file_path, output_folder)


# Chunk every .txt file directly inside `folder_path` and write the chunks
# to ../input/textfiles.
# NOTE(review): this rebinds `output_folder`, which an earlier cell uses to
# derive `folder_path` — re-running that cell afterwards would pick up the
# new value; confirm the intended run order.
input_folder = folder_path
output_folder = "../input/textfiles"
process_folder(input_folder, output_folder)
