# **Reading the datasets**

In [None]:
# Notebook configuration: replace every placeholder before running the cells below.
# dataPath is loaded with sc.textFile(...); dictionary_path is loaded with
# spark.read.csv(..., header=True), so it must point to a CSV file with a header row.
dataPath = "ENTER YOUR DATA PATH"
dictionary_path = 'ENTER YOUR Dictionary PATH'
# Passed to OpenAI(api_key=...) when diacritics are requested from the model.
# NOTE(review): avoid saving a real key in the notebook; consider reading it from
# an environment variable or getpass instead.
API_KEY = 'ENTER YOUR API KEY'

In [None]:
!wget https://raw.githubusercontent.com/mohataher/arabic-stop-words/master/list.txt -O arabic_stopwords.txt



In [None]:
from google.colab import drive
# Mount Google Drive (follow the link and enter the authorization code)
drive.mount('/content/drive')

In [None]:
!pip install pyspark



In [None]:
import os
# prompt: read test.txt in pyspark after installing it
# Set PYTHONHASHSEED environment variable to '0' before importing PySpark
os.environ['PYTHONHASHSEED'] = '0'

# Now you can import PySpark and continue with your application
from pyspark import SparkContext
# Your PySpark application code here

import pyspark
# Reuse the running SparkContext when this cell is executed again: calling the
# constructor a second time in the same kernel raises a ValueError because only
# one SparkContext may be active at a time.
sc = pyspark.SparkContext.getOrCreate()
# Load the corpus as an RDD of text lines (evaluated lazily by Spark).
test_data = sc.textFile(dataPath)




In [None]:
!pip install pyspark findspark


# **Creating a dataset**


In [None]:
from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession.builder.appName("excel_to_rdd").getOrCreate()

# Read your CSV file into a DataFrame
df = spark.read.csv(dictionary_path, header=True, inferSchema=True, encoding="UTF-8")
rdd = df.rdd


In [None]:
rdd.take(10)

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import re

def strip_arabic_diacritics(word):
    """Return ``word`` without the Arabic combining marks U+064B..U+065F.

    Falsy values (``None``, ``""``) and values that are not ``str`` are handed
    back unchanged, so the function is safe to apply to nullable columns.
    """
    if not word or not isinstance(word, str):
        return word
    marks = r'[\u064B-\u065F]'
    return re.sub(marks, '', word)

strip_arabic_diacritics_udf = udf(strip_arabic_diacritics, StringType())



In [None]:
# Assuming your original DataFrame is 'df'
df_with_stripped = df.withColumn("word_stripped", strip_arabic_diacritics_udf(df["word"]))


# Convert DataFrame to RDD
rdd = df_with_stripped.rdd


In [None]:
def map_function(row):
    """Build the ``(key, value)`` pair for a row: keyed by ``row.word_stripped``, valued by the row itself."""
    key = row.word_stripped
    return key, row
def reduce_function(value1, value2):
    """Merge two ``reduceByKey`` operands into one flat list of values.

    ``reduceByKey`` may call this with raw values, with lists produced by an
    earlier call, or with a mix of both on either side (partial results from
    different partitions are merged with the same function). Each operand is
    therefore normalised to a list before concatenation, which keeps the
    result flat regardless of the order in which Spark combines values.

    Args:
        value1: A single value or a list of previously merged values.
        value2: A single value or a list of previously merged values.

    Returns:
        A new list containing the values of ``value1`` followed by those of
        ``value2``. Only ``list`` instances are flattened; tuples (including
        Spark ``Row`` objects) are treated as single values.
    """
    left = value1 if isinstance(value1, list) else [value1]
    right = value2 if isinstance(value2, list) else [value2]
    return left + right

dictionary = rdd.map(map_function).reduceByKey(reduce_function)


In [None]:
dictionary.collect()



In [None]:
from pyspark.sql.functions import collect_list

# Group by the stripped word and collect the original words into a list
grouped_df = df_with_stripped.groupBy("word_stripped").agg(collect_list("word").alias("original_words"))

# Show the result
grouped_df.show(truncate=False)


# **Preprocessing for the stemmer**

In [None]:
!wget https://raw.githubusercontent.com/mohataher/arabic-stop-words/master/list.txt -O arabic_stopwords.txt


In [None]:
import re

# Assuming 'test_data' is your dataset that you want to process

with open('arabic_stopwords.txt', 'r', encoding='utf-8') as file:
    stopwords = set(file.read().splitlines())

# Assuming 'test_data' is a collection of sentences
def remove_diacritics(sentence):
    """Split ``sentence`` on whitespace and strip diacritics from every token.

    Returns a ``(tokens, changes)`` pair. ``tokens`` is a list of
    ``(stripped_word, position)`` tuples, where ``position`` is the token's
    index in the whitespace-split input; ``changes`` is the one-element log
    ``['remove_diacritics']`` that later pipeline stages append to.
    """
    tokens = []
    for position, token in enumerate(sentence.split()):
        bare = re.sub('[ًٌٍَُِّْ]', '', token)
        tokens.append((bare, position))
    return tokens, ['remove_diacritics']

def remove_numbers(sentence):
    """Delete every run of digits from each token of a pipeline record.

    ``sentence`` is a ``(tokens, changes)`` pair as produced by the previous
    stage. Token positions are kept as they are; a token made only of digits
    becomes the empty string. ``'remove_numbers'`` is appended to a copy of
    the change log.
    """
    tokens, history = sentence
    digitless = []
    for token, position in tokens:
        digitless.append((re.sub(r'\d+', '', token), position))
    return digitless, history + ['remove_numbers']

def remove_punctuation(sentence):
    """Drop every character that is neither ``\\w`` nor ``\\s`` from each token.

    ``sentence`` is a ``(tokens, changes)`` pair; token positions are left
    untouched and ``'remove_punctuation'`` is appended to a copy of the
    change log.
    """
    tokens, history = sentence
    cleaned = []
    for token, position in tokens:
        cleaned.append((re.sub(r'[^\w\s]', '', token), position))
    return cleaned, history + ['remove_punctuation']

def remove_stopwords(sentence, stopwords):
    """Blank out every token that appears in ``stopwords``.

    ``sentence`` is a ``(tokens, changes)`` pair. A stop word is replaced by
    the empty string rather than removed, so every token keeps its position.
    ``'remove_stopwords'`` is appended to a copy of the change log.
    """
    tokens, history = sentence
    kept = []
    for token, position in tokens:
        if token in stopwords:
            kept.append(('', position))
        else:
            kept.append((token, position))
    return kept, history + ['remove_stopwords']

def remove_extra_spaces_and_reconstruct(sentence):
    """Discard blank tokens and rebuild the sentence text.

    ``sentence`` is a ``(tokens, changes)`` pair. Tokens whose text is empty
    or whitespace-only are dropped; the survivors keep the position values
    they already carried (they are not renumbered).

    Returns a 3-tuple ``(text, tokens, changes)``: the surviving words joined
    by single spaces, the surviving ``(word, position)`` tuples, and the
    change log extended with ``'remove_extra_spaces'``.
    """
    tokens, history = sentence
    surviving = [pair for pair in tokens if pair[0].strip() != '']
    text = ' '.join(word for word, _ in surviving)
    return text, surviving, history + ['remove_extra_spaces']

# Apply transformations.
# Each stage consumes the previous stage's output: remove_diacritics turns a raw
# line into (tokens, changes); the next two stages rewrite the token texts; the
# final stage drops blank tokens and yields (sentence, tokens, changes).
test_data_transformed = test_data.map(remove_diacritics) \
                                   .map(remove_punctuation) \
                                   .map(remove_numbers) \
                                   .map(remove_extra_spaces_and_reconstruct)
# Stop-word removal is currently disabled; the line below is kept for reference.
# It expects a (tokens, changes) record, so it would have to run before the
# reconstruct stage, which returns a 3-tuple.
#                                  .map(lambda s: remove_stopwords(s, stopwords)) \

# Collect results to the driver for inspection
# (collect() materialises every transformed record in driver memory).
results = test_data_transformed.collect()



In [None]:
# Print results for inspection, limited to 10 iterations
for i, (original, (transformed_sentence, word_mappings, changes)) in enumerate(zip(test_data.collect(), results)):
    if i >= 10:  # Stop after 10 iterations
        break

    print(f"Original: {original}")
    print(f"Transformed: {transformed_sentence}")
    # We're no longer printing the tokenized version or the changes
    print()


In [None]:
transformed_sentences = [transformed_sentence for transformed_sentence, _, _ in results]

# Now, `transformed_sentences` contains all the full sentences after processing.
# You can pass this list to your stemming tool or further processing steps.


In [None]:
transformed_sentences

# **Stemmer**

In [None]:
pip install farasapy


In [None]:
from farasa.pos import FarasaPOSTagger
from farasa.ner import FarasaNamedEntityRecognizer
from farasa.diacratizer import FarasaDiacritizer
from farasa.segmenter import FarasaSegmenter
from farasa.stemmer import FarasaStemmer

stemmer = FarasaStemmer()


In [None]:
!pip install tqdm

# **Diacritics Generation**

In [None]:
!pip install openai
import openai

## **Single-item test**

In [None]:
index= 0
#test_data.take(50)[index],
#test = remove_extra_spaces_and_reconstruct(remove_diacritics(test_data.take(50)[index]))[0], results[index][0], results[index][1]
#test

In [None]:
from farasa.stemmer import FarasaStemmer
import tqdm

# Initialize the Farasa stemmer
stemmer = FarasaStemmer()


def stem_sentence(sentence, filtered_sentence, wordlist):
  """Stem ``sentence`` and attach a stem to every entry of ``wordlist``.

  Args:
      sentence: Text handed to the module-level ``stemmer``.
      filtered_sentence: Passed through unchanged as the second element of
          the result.
      wordlist: Iterable of ``(word, word_index)`` pairs. ``word_index`` is
          used as a position in the whitespace-split stemmer output.
          NOTE(review): this assumes ``word_index`` counts the tokens of
          ``sentence`` itself; verify against the caller.

  Returns:
      ``(sentence, filtered_sentence, tuples)`` where every tuple has the
      layout ``(word_index, stemmed_word, word)``. ``stemmed_word`` is
      ``None`` when ``word_index`` lies past the end of the stemmed output.
  """
  # Stem the whole sentence once, then split it on whitespace.
  stemmed_words = stemmer.stem(sentence).split()

  new_tuples = []
  for word, word_index in tqdm.tqdm(wordlist):
      if word_index < len(stemmed_words):
          stemmed_word = stemmed_words[word_index]
      else:
          # Out of bounds: keep the same (index, stem, word) layout as the
          # in-range case so callers can unpack every tuple the same way.
          stemmed_word = None
      new_tuples.append((word_index, stemmed_word, word))
  return sentence, filtered_sentence, new_tuples

# Now, `new_tuples` contains tuples of the form (word index, stemmed word, original word)
#test = test_data.take(500)[index], remove_extra_spaces_and_reconstruct(remove_diacritics(test_data.take(50)[index]))[0], results[index][0], new_tuples

#test
#stem_sentence(remove_extra_spaces_and_reconstruct(remove_diacritics(test_data.take(50)[index]))[0], results[index][0],  results[index][1])

In [None]:
dictionary.lookup("أول")

In [None]:
dictionary.take(1)

In [None]:
#test[3][15][2]

In [None]:
from openai import OpenAI

# Build the module-level client from the notebook's API_KEY setting rather than
# a separate hard-coded placeholder, so the key is configured in one place only.
client = OpenAI(api_key=API_KEY)

def generate_oneWord_diacritics(word,stemmed_word, sentence, dictionary):
    """Ask the chat model for the diacritized form of ``word`` inside ``sentence``.

    The request is a fixed six-message conversation (alternating system/user
    turns) that first asks for the meaning of the word in context, then for
    the diacritics of its stem, and finally for the fully diacritized word.

    Args:
        word: Undiacritized word to vocalize.
        stemmed_word: Stem of ``word``; quoted in the stem-related prompts.
        sentence: Sentence that supplies the context.
        dictionary: Kept for interface compatibility. It is not consulted:
            no dictionary entry is part of the messages sent to the model, so
            looking one up for every word would only add cost.

    Returns:
        The text content of the first completion choice.
    """
    client = OpenAI(api_key=API_KEY)
    # Sending the request to the OpenAI API with a structured conversation
    response = client.chat.completions.create(
        model="gpt-4",
        messages = [
            {"role": "system", "content": "أنت خبير لغوي، يرجى تقديم توضيح للمعنى وتشكيل الكلمة."},
            {"role": "user", "content": "في سياق الجملة '{}', ما المعنى الدقيق لكلمة '{}'؟".format(sentence, word)},
            {"role": "system", "content": "استنادًا إلى المعنى المفسر أعلاه، قم بتوقع التشكيل الصحيح لجذر كلمة '{}' في جملة '{}'.".format(stemmed_word, sentence)},
            {"role": "user", "content": "كلمة '{}' مشتقة من '{}'. بناءً على سياق الجملة '{}', قم بتوقع التشكيل لجذر كلمة '{}'.".format(stemmed_word, word, sentence, word)},
            {"role": "system", "content": "الآن، بناءً على التشكيل المقترح للجذر والسياق المحدد، قم بإضافة التشكيلات اللازمة للكلمة بأكملها."},
            {"role": "user", "content": "أضف التشكيلات الصوتية لكلمة '{}' في الجملة '{}' لتوضيح معناها المقصود.".format(word, sentence)}
        ],
        temperature=0.2,
        top_p=1,
        frequency_penalty=0,
        presence_penalty=0
    )
    return response.choices[0].message.content

# Example usage (make sure to define 'test' and 'dictionary' appropriately)
#x = generate_oneWord_diacritics(test[3][15], test[1], dictionary)


In [None]:
import string


def replace_word(word,sentence,word_index):
    """Return ``sentence`` with the whitespace token at ``word_index`` swapped for ``word``.

    The sentence is split on whitespace and re-joined with single spaces, so
    runs of whitespace in the input are collapsed. An index outside the token
    list raises ``IndexError``.
    """
    pieces = sentence.split()
    pieces[word_index] = word
    rebuilt = " ".join(pieces)
    return rebuilt

def update_diacritcs(LLM_output, word, sentence, word_index):
  """Insert the model's diacritized spelling of ``word`` into ``sentence``.

  The model answer is scanned token by token. The first token that equals
  ``word`` once its diacritics are removed, and that actually carries
  diacritics (its length differs from the bare form), replaces the token at
  ``word_index`` in ``sentence``.

  Args:
      LLM_output: Raw text returned by the model (may be empty or ``None``).
      word: Undiacritized word to look for.
      sentence: Sentence to update.
      word_index: Whitespace-token position of ``word`` in ``sentence``.

  Returns:
      The updated sentence, or ``sentence`` unchanged when no suitable token
      is found.
  """
  if not LLM_output:
    return sentence
  # string.punctuation covers ASCII only; the prompts ask for an Arabic answer,
  # so also drop the Arabic comma, semicolon and question mark that would
  # otherwise stay glued to a token and prevent the match below.
  unwanted = string.punctuation + "\u060C\u061B\u061F"
  cleaned = LLM_output.translate(str.maketrans('', '', unwanted))
  for candidate in cleaned.split():
    bare = remove_diacritics(candidate)[0][0][0]
    if bare == word and len(bare) != len(candidate):
      return replace_word(candidate, sentence, word_index)
  return sentence

def create_diacritics(inputSentence, dictionary):
  """Diacritize a sentence one word at a time.

  ``inputSentence`` is indexed as ``[0]`` for the starting sentence text and
  ``[2]`` for an iterable of ``(word_index, stemmed_word, word)`` tuples.
  For each tuple the model is queried with the sentence as updated so far,
  and the answer is merged back in before the next word is processed.

  Returns the sentence after all words have been handled.
  """
  current = inputSentence[0]
  entries = inputSentence[2]
  for position, stem, token in entries:
    suggestion = generate_oneWord_diacritics(token, stem, current, dictionary)
    current = update_diacritcs(suggestion, token, current, position)
  return current



In [None]:
#create_diacritics(test[1:], dictionary)

In [None]:
#test[1:][2][0][2]

In [None]:
#test[3][15][0]

In [None]:
#remove_punctuation(remove_diacritics(diacriticized_word[4]))[0][0][0] == test[3][15][2]


## **Bulk test**

In [None]:
#test = test_data.take(500)[index], remove_extra_spaces_and_reconstruct(remove_diacritics(test_data.take(500)[index]))[0], results[index][0], new_tuples
#stem_sentence(remove_extra_spaces_and_reconstruct(remove_diacritics(test_data.take(50)[index]))[0], results[index][0],  results[index][1])


def save_checkpoint(data, filename):
    """Append every string in ``data`` to ``filename``, one per line.

    The file is opened as UTF-8 explicitly so that Arabic text is written the
    same way regardless of the platform's default encoding.

    Args:
        data: Iterable of strings; a newline is added after each one.
        filename: Path of the file to append to (created if missing).
    """
    with open(filename, 'a', encoding='utf-8') as file:  # 'a' mode to append to the file
        file.writelines(line + '\n' for line in data)

# Initialize a list to store the outputs
outputs = []
# Fetch the first 501 lines to the driver; the loop below only uses indices 30..48.
data = test_data.take(501)
for index in range(30,49):
    target = data[index]
    # NOTE(review): filtered_text is rebuilt without the punctuation/number stages,
    # while results[index] went through the full pipeline; the word positions in
    # results[index][1] may therefore not line up with filtered_text — confirm.
    filtered_text = remove_extra_spaces_and_reconstruct(remove_diacritics(target))[0]
    sample = stem_sentence(filtered_text, results[index][0], results[index][1])
    generated_text = create_diacritics(sample, dictionary)

    # Collect the current iteration's output
    # (one record per line, in the format read_checkpoint_files parses back).
    outputs.append(f"Target Text: {target}, Filtered Text: {filtered_text}, Generated Text: {generated_text}")

    # Checkpoint whenever index + 1 is a multiple of 10 (with this range: only
    # after index 39); the file number is index // 10.
    if (index + 1) % 10 == 0:
        save_checkpoint(outputs, f'/content/drive/MyDrive/BulkTestGPT4NoDictionary/outputs_checkpoint_{index // 10}.txt')
        outputs = []  # Reset the outputs list for the next batch

# Save any remaining outputs after the final iteration
if outputs:
    save_checkpoint(outputs, f'/content/drive/MyDrive/BulkTestGPT4NoDictionary/outputs_checkpoint_final.txt')




# **Evaluation**

In [None]:
pip install Levenshtein

In [None]:
import pandas as pd
import os
import glob

def read_checkpoint_files(folder_path):
    """Load every ``outputs_checkpoint_*.txt`` file in ``folder_path`` into a DataFrame.

    Each non-blank line is expected to look like
    ``Target Text: <t>, Filtered Text: <f>, Generated Text: <g>``.
    The three fields are located by their labels rather than by splitting on
    ``', '``, so field values may themselves contain commas.

    Args:
        folder_path: Directory that holds the checkpoint files.

    Returns:
        A DataFrame with the columns ``'Target Text'``, ``'Filtered Text'`` and
        ``'Generated Text'`` (present even when no record was read). Lines
        that do not match the expected format are reported and skipped.
    """
    target_tag = 'Target Text: '
    filtered_tag = ', Filtered Text: '
    generated_tag = ', Generated Text: '
    columns = ['Target Text', 'Filtered Text', 'Generated Text']

    # Pattern to match all checkpoint files; sorted so the row order does not
    # depend on the order in which the file system lists them.
    file_pattern = os.path.join(folder_path, 'outputs_checkpoint_*.txt')
    file_list = sorted(glob.glob(file_pattern))

    rows = []
    for file in file_list:
        # The checkpoints contain Arabic text, so decode them as UTF-8 explicitly.
        with open(file, 'r', encoding='utf-8') as f:
            for line in f:
                record = line.rstrip('\n')
                if not record.strip():
                    continue

                # Locate the three labels in order; -1 propagates on a miss.
                target_at = record.find(target_tag)
                filtered_at = -1
                if target_at != -1:
                    filtered_at = record.find(filtered_tag, target_at + len(target_tag))
                generated_at = -1
                if filtered_at != -1:
                    generated_at = record.find(generated_tag, filtered_at + len(filtered_tag))

                if generated_at == -1:
                    # Handle lines that do not match the expected format
                    print(f"Skipping line due to unexpected format: {line}")
                    continue

                rows.append({
                    'Target Text': record[target_at + len(target_tag):filtered_at],
                    'Filtered Text': record[filtered_at + len(filtered_tag):generated_at],
                    'Generated Text': record[generated_at + len(generated_tag):].strip(),
                })

    # Convert the list of dictionaries to a DataFrame
    return pd.DataFrame(rows, columns=columns)

# Specify the folder path where your files are saved
folder_path = '/content/drive/MyDrive/BulkTestGPT4NoDictionary'
df = read_checkpoint_files(folder_path)

# Now `df` contains all your data
print(df.head())  # Print the first few rows to check


In [None]:
df

In [None]:
def clean_text(text):
    """Reduce ``text`` to Arabic-script characters separated by single spaces.

    Three passes are applied in order: every character outside the listed
    Arabic Unicode ranges (and not whitespace) is removed, then the listed
    Arabic punctuation marks are removed, and finally runs of whitespace are
    collapsed to one space and the ends are trimmed.
    """
    # Anything that is not in these Arabic ranges and is not whitespace.
    non_arabic = r'[^\u0600-\u06FF\u0750-\u077F\u08A0-\u08FF\uFB50-\uFDFF\uFE70-\uFEFF\u064B-\u065F\u0670\u08D3-\u08E1\s]'
    # Punctuation that lives inside the Arabic ranges and so survives the first pass.
    arabic_marks = r'[،؛؟٪٫٬٭؉۔]'

    stripped = text
    for unwanted in (non_arabic, arabic_marks):
        stripped = re.sub(unwanted, '', stripped)

    return re.sub(r'\s+', ' ', stripped).strip()


def align_text(original_text, generated_text):
    """Align two Arabic strings so that diacritics occupy matching positions.

    Both inputs are first passed through ``clean_text``. They are then walked
    in parallel. When both current characters are diacritics, or both are
    not, they are emitted as a pair. When only one side has a diacritic, that
    diacritic is emitted and the other side receives a ``'|'`` placeholder.
    Once one string is exhausted, the remaining characters of the other are
    emitted against ``'|'`` placeholders, so the walk always terminates.

    Args:
        original_text: Reference text.
        generated_text: Text to compare against the reference.

    Returns:
        ``(aligned_original, aligned_generated)``: two strings of equal length.
    """
    # Characters treated as diacritics for alignment (plus the placeholder itself).
    marks = "ًٌٍَُِّْ|"

    original_text = clean_text(original_text)
    generated_text = clean_text(generated_text)

    aligned_original = []
    aligned_generated = []
    original_index = 0
    generated_index = 0

    # Walk only while BOTH strings still have characters. Testing an empty
    # "end of string" character with `in marks` is always True, which made the
    # combined loop spin forever or emit unpaired characters.
    while original_index < len(original_text) and generated_index < len(generated_text):
        original_char = original_text[original_index]
        generated_char = generated_text[generated_index]
        original_is_mark = original_char in marks
        generated_is_mark = generated_char in marks

        if original_is_mark == generated_is_mark:
            aligned_original.append(original_char)
            aligned_generated.append(generated_char)
            original_index += 1
            generated_index += 1
        elif generated_is_mark:
            # Diacritic present only in the generated text.
            aligned_original.append('|')
            aligned_generated.append(generated_char)
            generated_index += 1
        else:
            # Diacritic present only in the original text.
            aligned_original.append(original_char)
            aligned_generated.append('|')
            original_index += 1

    # Pad whichever side ran out first so both results keep the same length.
    for leftover in original_text[original_index:]:
        aligned_original.append(leftover)
        aligned_generated.append('|')
    for leftover in generated_text[generated_index:]:
        aligned_original.append('|')
        aligned_generated.append(leftover)

    aligned_original_text = ''.join(aligned_original)
    aligned_generated_text = ''.join(aligned_generated)

    return aligned_original_text, aligned_generated_text


# Sample original text and generated text
original_text = "123()الكَتابُ جَيدٌ جداً"
generated_text = "123)الكِتابُ جيِّدٌ جداً"

# Align the texts
aligned_original_text, aligned_generated_text = align_text(original_text, generated_text)

# Print the aligned texts
print(f"Original Text: {aligned_original_text}")
print(f"Generated Text: {aligned_generated_text}")


# Align every row of the results table. Loop-local names are used on purpose:
# reusing `aligned_generated_text` here would overwrite the sample alignment
# computed above, which the metrics demo further down still reads.
for index in range(df.shape[0]):
    row_aligned_target, row_aligned_generated = align_text(df.loc[index,"Target Text"], df.loc[index,"Generated Text"])
    df.at[index, 'Aligned Target Text'] = row_aligned_target
    df.at[index, 'Aligned Generated Text'] = row_aligned_generated



In [None]:
import re


# NOTE(review): this is a second definition of clean_text with the same body as
# the one in an earlier cell; running this cell simply rebinds the name.
# Consider keeping a single definition.
def clean_text(text):
    """Reduce ``text`` to Arabic-script characters separated by single spaces.

    Characters outside the listed Arabic Unicode ranges (other than
    whitespace) are removed, then the listed Arabic punctuation marks are
    removed, and finally whitespace runs are collapsed and the ends trimmed.
    """
    # Regular expression to keep Arabic letters, diacritics, and spaces, excluding common Arabic punctuation
    pattern = r'[^\u0600-\u06FF\u0750-\u077F\u08A0-\u08FF\uFB50-\uFDFF\uFE70-\uFEFF\u064B-\u065F\u0670\u08D3-\u08E1\s]'
    # These marks fall inside the ranges kept above, so they need a separate pass.
    arabic_punctuations = r'[،؛؟٪٫٬٭؉۔]'

    # Remove characters not matched by the pattern
    cleaned_text = re.sub(pattern, '', text)
    # Remove Arabic punctuations
    cleaned_text = re.sub(arabic_punctuations, '', cleaned_text)
    # Remove extra spaces
    cleaned_text = re.sub(r'\s+', ' ', cleaned_text).strip()

    return cleaned_text

# Sample text that includes Arabic characters, diacritics, and some unwanted characters
text = "مرحبًا بالعالم! 123 هذا اختبار... [هل أنت جاهز؟] {نعم، أنا كذلك!}، وأيضًا؛"

# Clean the text
cleaned_text = clean_text(text)

print("Original Text:", text)
print("Cleaned Text:", cleaned_text)


In [None]:
df.loc[4,"Target Text"]

In [None]:
df

In [None]:
def remove_non_diacritics(text):
    """Return only the diacritics, spaces and ``'|'`` placeholders found in ``text``.

    Every other character (for example base letters) is dropped; the relative
    order of the kept characters is preserved.
    """
    keep = "ًٌٍَُِّْ |"
    return ''.join(filter(lambda char: char in keep, text))

# Example usage:
text = "الكِتاب جيِّدٌ جداً"
diacritic_text = remove_non_diacritics(text)
print(diacritic_text)


for index in range(df.shape[0]):
    filtered_target_text, filtered_generated_text = remove_non_diacritics(df.loc[index,"Aligned Target Text"]),(remove_non_diacritics(df.loc[index,"Aligned Generated Text"]))
    df.at[index, 'Filtered Target Text'] = filtered_target_text
    df.at[index, 'Filtered Generated Text'] = filtered_generated_text

In [None]:
def remove_corresponding_indices(target_text, generated_text):
    """Drop every position at which either string holds a ``'|'`` placeholder.

    The two strings are compared position by position up to the length of the
    shorter one. A position is kept only when neither side has ``'|'`` there.

    Returns ``(new_target_text, new_generated_text)`` built from the kept
    positions, in their original order.
    """
    kept_pairs = [
        (target_char, generated_char)
        for target_char, generated_char in zip(target_text, generated_text)
        if target_char != '|' and generated_char != '|'
    ]
    new_target_text = ''.join(target_char for target_char, _ in kept_pairs)
    new_generated_text = ''.join(generated_char for _, generated_char in kept_pairs)
    return new_target_text,  new_generated_text




def word_accuracy(reference, generated):
    """Fraction of reference words matched by the generated word at the same position.

    Both strings are split on whitespace and compared pairwise; words beyond
    the length of the shorter list count as mismatches.

    Args:
        reference: Reference text.
        generated: Text to score.

    Returns:
        ``correct / len(reference_words)``, or ``0`` when the reference has no
        words (the same convention ``character_accuracy`` uses for an empty
        reference) instead of raising ``ZeroDivisionError``.
    """
    reference_words = reference.split()
    generated_words = generated.split()

    total_words = len(reference_words)
    if total_words == 0:
        return 0

    correct_words = sum(1 for ref, gen in zip(reference_words, generated_words) if ref == gen)
    return correct_words / total_words
def character_accuracy(reference, generated):
    """Share of reference characters matched by ``generated`` at the same position.

    Characters are compared pairwise up to the shorter length and the number
    of matches is divided by ``len(reference)``. An empty reference yields
    ``0``.
    """
    total_chars = len(reference)
    if not total_chars:
        return 0  # undefined accuracy is reported as 0

    matches = 0
    for ref_char, gen_char in zip(reference, generated):
        if ref_char == gen_char:
            matches += 1
    return matches / total_chars
import Levenshtein

def levenshtein_distance(reference, generated):
    """Return ``Levenshtein.distance(reference, generated)`` for the two strings."""
    return Levenshtein.distance(reference, generated)

# Sample reference text and generated text (with diacritics)
# NOTE(review): both names below must still hold the sample alignment produced
# by the align_text demo; if any cell in between rebinds one of them, this
# comparison mixes unrelated texts — confirm before trusting the printed numbers.
reference_text = remove_non_diacritics(aligned_original_text)
generated_text = remove_non_diacritics(aligned_generated_text)

# Calculate metrics
# The inputs contain only diacritics, spaces and '|' placeholders, so a "word"
# here is the run of such characters between two spaces.
word_acc = word_accuracy(reference_text, generated_text)
char_acc = character_accuracy(reference_text, generated_text)
levenshtein_dist = levenshtein_distance(reference_text, generated_text)
# Diacritics accuracy ignores every position where either side has a '|'.
new_target, new_generated = remove_corresponding_indices(reference_text, generated_text)
diacritics_acc = character_accuracy(new_target, new_generated)

# Print results
print(f"Reference Text: {reference_text}")
print(f"Generated Text: {generated_text}")
print(f"Word Accuracy: {word_acc * 100:.2f}%")
print(f"Character Accuracy: {char_acc * 100:.2f}%")
print(f"Levenshtein Distance: {levenshtein_dist}")
print(f"Diacritics Accuracy: {diacritics_acc * 100:.2f}%")


# Compute the same four metrics for every row of the results table and store
# them in new columns. The loop reuses the scalar names from the sample above.
for index in range(df.shape[0]):
    word_acc = word_accuracy(df.loc[index,"Filtered Target Text"], df.loc[index,"Filtered Generated Text"])
    char_acc = character_accuracy(df.loc[index,"Filtered Target Text"], df.loc[index,"Filtered Generated Text"])
    levenshtein_dist = levenshtein_distance(df.loc[index,"Filtered Target Text"], df.loc[index,"Filtered Generated Text"])
    new_target, new_generated = remove_corresponding_indices(df.loc[index,"Filtered Target Text"], df.loc[index,"Filtered Generated Text"])
    diacritics_acc = character_accuracy(new_target, new_generated)
    df.at[index, 'Word Accuracy'] = word_acc
    df.at[index, 'Character Accuracy'] = char_acc
    df.at[index, 'Levenshtein Distance'] = levenshtein_dist
    df.at[index, 'Diacritics Accuracy'] = diacritics_acc



In [None]:
import pandas as pd

# Assuming 'df' is your DataFrame

# Specify the directory where you want to save the file
directory_path = '/content/drive/MyDrive/BulkTestGPT4NoDictionary'

# Specify the filename
filename = 'GPT4Results.csv'

# Full path
full_path = f'{directory_path}/{filename}'

# Save the DataFrame to CSV
df.to_csv(full_path, index=False)

print(f'DataFrame saved to {full_path}')


In [None]:
df.describe()