# <center>ARABIC DIALECT CLASSIFICATION

## <center>Using Machine Learning and Deep Learning

### <center>By: Zahaf Boualem & Rabiai Mehdi Ayoub

# Datasets

### IADD Dataset

In [None]:
import pandas as pd
import json

# Read the raw IADD corpus from its JSON file
with open('Data/country/raw_data/IADD_dataset/IADD.json', 'r', encoding='utf-8') as file:
    data = json.load(file)

# Pull the three fields of interest out of every record
tweets = [entry['Sentence'] for entry in data]
regions = [entry['Region'] for entry in data]
countries = [entry['Country'] for entry in data]

# Assemble the extracted columns into a DataFrame
IADD = pd.DataFrame({'tweets': tweets, 'region': regions, 'country': countries})

# Discard rows whose country is the literal string 'NA'
IADD = IADD.loc[IADD['country'] != 'NA']

# Number of samples available for each country
counts_by_country = IADD['country'].value_counts()

# Show the per-country counts
print(counts_by_country)


In [None]:
# Save the DataFrame to a CSV file.
# Forward slashes are used throughout: the previous backslash-separated literal relied on
# invalid escape sequences ("\c", "\I") and only resolved to a nested path on Windows.
IADD.to_csv('Data/country/csv_data/IADD_dataset/IADD_data.csv', index=False)

### Madar dataset

In [None]:
import os
import pandas as pd

# Build a single MADAR CSV from the per-city "MADAR.corpus.<City>.tsv" files:
#   1. read every city file and group the rows by country,
#   2. write one intermediate CSV per country,
#   3. read those CSVs back, tag each with its country, and write the combined file.

# Folder containing the raw per-city TSV files
folder_path = "Data/country/raw_data/MADAR_dataset"

# country name -> list of per-city DataFrames
country_data = {}

# City (as it appears in the file name) -> country label
city_to_country = {
    "Aleppo": "Syria",
    "Alexandria": "Egypt",
    "Algiers": "Algeria",
    "Amman": "Jordan",
    "Aswan": "Egypt",
    "Baghdad": "Iraq",
    "Basra": "Iraq",
    "Beirut": "Lebanon",
    "Benghazi": "Libya",
    "Cairo": "Egypt",
    "Damascus": "Syria",
    "Doha": "Qatar",
    "Fes": "Morocco",
    "Jeddah": "Saudi Arabia",
    "Jerusalem": "Palestine",
    "Khartoum": "Sudan",
    "Mosul": "Iraq",
    "MSA": "MSA",  # Modern Standard Arabic
    "Muscat": "Oman",
    "Rabat": "Morocco",
    "Riyadh": "Saudi Arabia",
    "Salt": "Jordan",
    "Sfax": "Tunisia",
    "Tripoli": "Libya",
    "Tunis": "Tunisia",
}

# sorted() makes the row order independent of the order in which the filesystem lists
# the files, so the seeded sampling done later in the notebook is reproducible.
for filename in sorted(os.listdir(folder_path)):
    # Only handle files named MADAR.corpus.<City>.tsv
    if not (filename.startswith("MADAR.corpus.") and filename.endswith(".tsv")):
        continue

    # The city is whatever sits between the fixed prefix and the extension
    city = filename.replace("MADAR.corpus.", "").replace(".tsv", "")

    # Cities missing from the table are grouped under "Unknown"
    country = city_to_country.get(city, "Unknown")

    # Read the tab-separated file
    file_path = os.path.join(folder_path, filename)
    city_df = pd.read_csv(file_path, delimiter='\t')

    # Keep only the sentence and its label, under the notebook's column names
    city_df = city_df.rename(columns={"sent": "tweets", "lang": "province"})
    city_df = city_df[["tweets", "province"]]

    country_data.setdefault(country, []).append(city_df)

# Folder receiving the per-country CSV files and the combined file
output_folder = "Data/country/csv_data/MADAR_dataset"
os.makedirs(output_folder, exist_ok=True)

# Write one CSV per country
for country, data_list in country_data.items():
    country_df = pd.concat(data_list, ignore_index=True)
    output_filename = os.path.join(output_folder, f"{country}_data.csv")
    country_df.to_csv(output_filename, index=False)

print("CSV files for each country have been successfully created.")

# Now combine the country-specific CSV files into the final DataFrame
folder_path = "Data/country/csv_data/MADAR_dataset"
combined_name = "MADAR_data.csv"

dfs = []

for filename in sorted(os.listdir(folder_path)):
    # The combined file from an earlier run also ends with "_data.csv"; skip it,
    # otherwise re-running this cell would ingest it as a bogus country named "MADAR".
    if not filename.endswith("_data.csv") or filename == combined_name:
        continue

    # The country is the file name without the "_data.csv" suffix
    country = filename.replace("_data.csv", "")

    file_path = os.path.join(folder_path, filename)
    country_df = pd.read_csv(file_path)

    # Tag every row with the country the file belongs to
    country_df["country"] = country

    dfs.append(country_df)

# Concatenate all per-country DataFrames into a single one
MADAR = pd.concat(dfs, ignore_index=True)

# Fix the column order
MADAR = MADAR[["tweets", "country", "province"]]

# Save the combined DataFrame
output_filename = os.path.join(folder_path, combined_name)
MADAR.to_csv(output_filename, index=False)

print(f"The combined CSV file has been successfully created: {output_filename}")


In [None]:
import os

# Folder holding the per-country CSV files and the combined MADAR file
folder_path = "Data/country/csv_data/MADAR_dataset"

# Delete every "<country>_data.csv" file, keeping only the combined "MADAR_data.csv"
for filename in os.listdir(folder_path):
    if filename == "MADAR_data.csv" or not filename.endswith("_data.csv"):
        continue
    file_path = os.path.join(folder_path, filename)
    os.remove(file_path)
    print(f"File deleted: {filename}")

print("Deletion of individual country CSV files completed.")


### Organized data

In [None]:
import pandas as pd

# Bring the three datasets to a common two-column schema: "text" and "dialect".

# IADD: rename columns and drop the 'region' column
IADD = IADD.rename(columns={"tweets": "text", "country": "dialect"}).drop("region", axis=1)

# MADAR: rename columns and drop the 'province' column
MADAR = MADAR.rename(columns={"tweets": "text", "country": "dialect"}).drop("province", axis=1)

# Load the MyData dataset from a CSV file.
# Forward slashes keep the path portable; the previous backslash-separated literal relied
# on invalid escape sequences and only resolved to a nested path on Windows.
MyData = pd.read_csv("Data/country/csv_data/MyData/MyData.csv")

# Full country name -> short dialect code.
# NOTE(review): "PL" is used for Palestine here (the ISO 3166 code is "PS"); it is left
# unchanged because MyData.csv may already use it — confirm.
country_mapping = {
    "United Arab Emirates": "AE",
    "Bahrain": "BH",
    "Algeria": "DZ",
    "Egypt": "EG",
    "Iraq": "IQ",
    "Jordan": "JO",
    "Kuwait": "KW",
    "Lebanon": "LB",
    "Libya": "LY",
    "Morocco": "MA",
    "Modern Standard Arabic": "MSA",
    # The MADAR cell labels Modern Standard Arabic rows "MSA" (see city_to_country);
    # without this entry .map() would turn those labels into NaN.
    "MSA": "MSA",
    "Oman": "OM",
    "Palestine": "PL",
    "Qatar": "QA",
    "Saudi Arabia": "SA",
    "Sudan": "SD",
    "Syria": "SY",
    "Tunisia": "TN",
    "Yemen": "YE"
}

# Map the country names to their codes in the IADD dataset.
# Values that are not keys of country_mapping become NaN.
IADD["dialect"] = IADD["dialect"].map(country_mapping)

# Save the updated IADD dataset
output1_path = "Data/country/csv_data/IADD_dataset/IADD_data.csv"
IADD.to_csv(output1_path, index=False)

# Map the country names to their codes in the MADAR dataset
MADAR["dialect"] = MADAR["dialect"].map(country_mapping)

# Save the updated MADAR dataset
output2_path = "Data/country/csv_data/MADAR_dataset/MADAR_data.csv"
MADAR.to_csv(output2_path, index=False)

# Count the number of samples per dialect in each dataset
IADD_count = IADD.groupby("dialect")["text"].count().reset_index()
MADAR_count = MADAR.groupby("dialect")["text"].count().reset_index()
MyData_count = MyData.groupby("dialect")["text"].count().reset_index()

# Display the results
print("Count for IADD Dataset:\n", IADD_count)
print("\nCount for MADAR Dataset:\n", MADAR_count)
print("\nCount for MyData Dataset:\n", MyData_count)


In [None]:
# Stack the three corpora into one DataFrame with a fresh index
dfs = [IADD, MADAR, MyData]
all_data = pd.concat(dfs, ignore_index=True)

# Leave out the 'YE' (Yemen) samples
all_data = all_data.loc[all_data['dialect'].ne('YE')]

# Number of text samples per dialect
samples_per_country = all_data.groupby("dialect")["text"].count()

# Report the count for each dialect
for country, sample_count in samples_per_country.items():
    print(f"There are {sample_count} sample(s) for the country {country}.")


### Data balancing

In [None]:
# Upper bound on the number of samples kept per dialect
desired_sample_count = 21000

# dialect -> down-sampled DataFrame for that dialect
balanced_data = {}

for country in all_data['dialect'].unique():
    rows = all_data.loc[all_data['dialect'] == country]

    # Take at most desired_sample_count rows; smaller dialects are kept whole.
    # The fixed seed makes the draw repeatable.
    n_keep = min(len(rows), desired_sample_count)
    balanced_data[country] = rows.sample(n_keep, random_state=42)

# Reassemble the per-dialect samples into one DataFrame with a fresh index
all_data = pd.concat([*balanced_data.values()], ignore_index=True)

# Show the resulting class sizes
print("Number of samples per country after balancing:")
print(all_data['dialect'].value_counts())


In [None]:
# Dialects whose rows are taken exclusively from MyData
dialects_to_remove = ['TN', 'BH', 'AE', 'KW']

# Drop every row of those dialects from 'all_data' in one pass
all_data = all_data[~all_data['dialect'].isin(dialects_to_remove)]

# Append the MyData rows for the same dialects, in the order listed above
replacements = [MyData[MyData['dialect'] == code] for code in dialects_to_remove]
all_data = pd.concat([all_data, *replacements], ignore_index=True)


# Preprocessing and Normalization

## Preprocessing

In [None]:
import re
import string
import pyarabic.araby as araby
import nltk
from nltk.corpus import stopwords
import textblob
from sklearn import preprocessing

# Characters deleted by text_cleaning: a hand-picked set of Arabic and typographic
# punctuation marks, followed by every ASCII punctuation character.
arabic_punctuations = '''`÷×؛<>_()*&^%][ـ،/:"؟.,'{}~¦+|!”…“–ـ'''
english_punctuations = string.punctuation
punctuations_list = arabic_punctuations + english_punctuations

# One character class matching runs of emoji and other symbol characters.
# The adjacent string literals are concatenated into a single "[...]+" pattern.
emoji_pattern = re.compile("["
                           u"\U0001F600-\U0001F64F"  # emoticons
                           u"\U0001F300-\U0001F5FF"  # symbols & pictographs
                           u"\U0001F680-\U0001F6FF"  # transport & map symbols
                           u"\U0001F1E0-\U0001F1FF"  # flags (iOS)
                           u"\U00002500-\U00002BEF"  # box drawing through misc. symbols and arrows (not Chinese characters)
                           u"\U00002702-\U000027B0"  # dingbats
                           u"\U00002702-\U000027B0"  # duplicate of the previous range (no effect)
                           u"\U000024C2-\U0001F251"  # NOTE(review): very wide range; it also spans CJK and the Arabic presentation forms (U+FB50-U+FEFF)
                           u"\U0001f926-\U0001f937"
                           u"\U00010000-\U0010ffff"  # every code point above the Basic Multilingual Plane
                           u"\u2640-\u2642"
                           u"\u2600-\u2B55"
                           u"\u200d"  # zero-width joiner
                           u"\u23cf"
                           u"\u23e9"
                           u"\u231a"
                           u"\ufe0f"  # variation selector-16
                           u"\u3030"
                           "]+", flags=re.UNICODE)

# Alternation of the Arabic diacritic marks plus the tatweel (kashida) character.
# Compiled with re.VERBOSE, so the whitespace and the "# ..." labels inside the
# pattern string are ignored by the regex engine.
arabic_diacritics = re.compile("""
                             ّ    | # Tashdid
                             َ    | # Fatha
                             ً    | # Tanwin Fath
                             ُ    | # Damma
                             ٌ    | # Tanwin Damm
                             ِ    | # Kasra
                             ٍ    | # Tanwin Kasr
                             ْ    | # Sukun
                             ـ     # Tatwil/Kashida
                         """, re.VERBOSE)


def text_cleaning(text):
    """Return ``text`` with punctuation, symbols, and emoji removed.

    Every character listed in ``punctuations_list`` is deleted first, then every
    match of ``emoji_pattern`` is deleted from the result.
    """
    # Deleting characters via a translation table handles all of them in one pass
    without_punctuation = text.translate(str.maketrans('', '', punctuations_list))
    return emoji_pattern.sub('', without_punctuation)


def text_normalization(text):
    """Normalize Arabic text for classification.

    Steps, in order:
      1. Remove diacritics and tatweel (``arabic_diacritics`` regex, then the
         pyarabic strip helpers).
      2. Replace punctuation, Latin letters, digits, underscores, escaped
         unicode sequences, and finally every character that is neither
         whitespace nor in U+0600-U+06FF with a space.
      3. Trim leading and trailing whitespace.
      4. Unify letter variants: alef forms to "ا", "ى" to "ي", "ؤ" and "ئ" to "ء",
         "ة" to "ه".
      5. Collapse every run of one repeated character to a single occurrence.

    Args:
        text: The string to normalize.

    Returns:
        The normalized string.
    """
    # Remove diacritics
    text = re.sub(arabic_diacritics, '', text)
    text = araby.strip_diacritics(text)
    text = araby.strip_shadda(text)
    text = araby.strip_tashkeel(text)

    # Replace punctuation with spaces. The patterns are raw strings so that "\]", "\w",
    # "\s" and "\S" reach the regex engine without invalid-escape warnings.
    text = re.sub('[%s]' % re.escape(
        r"""!"#$%&'()*+,،-./:;<=>؟?@[\]^_`{|}~"""), ' ', text)
    # Replace mentions, Latin letters, digits, tatweel runs, leftover symbols and URLs
    text = re.sub(r'([@A-Za-z0-9_ـــــــــــــ]+)|[^\w\s]|#|http\S+', ' ', text)
    # Replace literal "\uXXXX"-style escape sequences left in the text
    text = re.sub(r'\\u[A-Za-z0-9\\]+', ' ', text)
    # Replace every remaining non-Arabic, non-whitespace character
    text = re.sub(r'[^\u0600-\u06FF\s]', ' ', text)
    text = text.strip()

    # Unify letter variants
    text = re.sub("[إأٱآا]", "ا", text)
    text = re.sub("ى", "ي", text)
    text = re.sub("ؤ", "ء", text)
    text = re.sub("ئ", "ء", text)
    text = re.sub("ة", "ه", text)

    # Remove repeated letters: any run of the same character (spaces included) becomes one.
    # NOTE(review): this also shortens legitimately doubled letters — confirm intended.
    text = re.sub(r'(.)\1+', r'\1', text)
    return text


def text_preprocessing(dataFrame, text_column):
    """Run the full text pipeline on one column of ``dataFrame``, in place.

    The column is cast to ``str``, cleaned with ``text_cleaning``, stripped of NLTK's
    Arabic stop words, normalized with ``text_normalization``, and finally passed
    token by token through textblob's lemmatizer.

    Args:
        dataFrame: DataFrame holding the text; ``dataFrame[text_column]`` is overwritten.
        text_column: Name of the column to process.

    Returns:
        The same ``dataFrame`` object that was passed in.
    """
    print("[INFO] Starting of pre-processing on the text")
    dataFrame[text_column] = dataFrame[text_column].astype(str)
    print("[INFO] Starting with text cleaning on the text")
    dataFrame[text_column] = dataFrame[text_column].apply(text_cleaning)
    print("[INFO] Finishing with text cleaning on the text")

    # Remove stop words. They are removed before normalization, i.e. while the tokens
    # still carry their original spelling.
    nltk.download('stopwords')
    # A set gives O(1) membership tests; the list was scanned once per token.
    stop = set(stopwords.words('arabic'))
    dataFrame[text_column] = dataFrame[text_column].apply(
        lambda sentence: " ".join(token for token in sentence.split() if token not in stop))

    print("[INFO] Starting with text normalization on the text")
    dataFrame[text_column] = dataFrame[text_column].apply(text_normalization)
    print("[INFO] Finishing with text normalization on the text")

    # Lemmatization
    # NOTE(review): textblob's lemmatizer relies on the WordNet corpus downloaded here;
    # confirm that it has any effect on Arabic tokens.
    nltk.download('wordnet')
    dataFrame[text_column] = dataFrame[text_column].apply(
        lambda sentence: " ".join([textblob.Word(word).lemmatize() for word in sentence.split()]))

    print("[INFO] Finished pre-processing on the text")
    print("[INFO] Last step is encoding the class lables")

    return dataFrame


def inference_cleaning(text):
    """Prepare one raw string for prediction: clean it, then normalize it."""
    return text_normalization(text_cleaning(text))

def supprimer_lignes_courtes(df):
    """Drop short rows: keep only rows whose 'text' value is at least 4 characters long.

    The length is measured on the string form of each value. The original index
    labels of the surviving rows are preserved.
    """
    long_enough = df['text'].map(str).map(len).ge(4)
    return df[long_enough]



if __name__ == '__main__':
    # Pre-process the combined dataset, drop rows whose text is too short,
    # and store the result for the training steps.
    print(all_data.info())
    processed_dataFrame = text_preprocessing(all_data, 'text')
    processed_dataFrame = supprimer_lignes_courtes(processed_dataFrame)
    print(processed_dataFrame.info())
    # Save the pre-processed DataFrame to CSV. Forward slashes keep the path portable;
    # the previous literal mixed in backslashes that formed invalid escape sequences.
    processed_dataFrame.to_csv('Data/country/preprocessed_data/dataset.csv', index=False)

## Visualization

In [None]:
# Show how many samples each dialect has in the pre-processed data
print("Number of samples per country after balancing:",
      processed_dataFrame['dialect'].value_counts(), sep="\n")


# Training and Prediction

## Splitting the data

In [None]:
import pandas as pd
import random

# Hold out a fixed number of texts per dialect as the test set; the rest is the training set.
test_samples_per_dialect = 1800

# Per-dialect test samples, concatenated once after the loop
# (growing a DataFrame with concat inside the loop copies it on every iteration).
test_parts = []

for dialect in processed_dataFrame['dialect'].unique():
    # Seeded like every other sampling step in the notebook, so the split is reproducible.
    # sample() raises ValueError if the dialect has fewer rows than requested.
    selected_texts = processed_dataFrame[processed_dataFrame['dialect'] == dialect].sample(
        test_samples_per_dialect, random_state=42)
    test_parts.append(selected_texts)

# Fall back to an empty frame with the expected columns when there is nothing to sample
test = pd.concat(test_parts) if test_parts else pd.DataFrame(columns=["text", "dialect"])

# Remove the held-out rows from the original DataFrame (matched by index label)
processed_dataFrame = processed_dataFrame.drop(test.index)

# The remaining data becomes the training set
train = processed_dataFrame

# Display the number of samples per dialect in the test DataFrame
print(test['dialect'].value_counts())
# Display the number of samples per dialect in the training DataFrame
print(train['dialect'].value_counts())


## NB Classifier (Naive Bayes)

### Train and test data

In [None]:
# Features are the texts, targets are the dialect labels
X_train, y_train = train['text'], train['dialect']
X_test, y_test = test['text'], test['dialect']

### Vectorization

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer

corpus = X_train
# TF-IDF over unigrams and bigrams (ngram_range=(1, 2)), fitted on the training texts
vectorizer_tfidf = TfidfVectorizer(ngram_range=(1,2))
vectorizer_tfidf = vectorizer_tfidf.fit(corpus)

### Training

In [None]:
from sklearn.naive_bayes import MultinomialNB
from sklearn.pipeline import Pipeline
from sklearn.model_selection import cross_val_score


# Multinomial Naive Bayes on top of the TF-IDF features
classifier_tfidf_NB = MultinomialNB()

# Vectorizer and classifier chained as one estimator
model_tfidf_NB = Pipeline(steps=[
    ("vectorizer", vectorizer_tfidf),
    ("classifier", classifier_tfidf_NB),
])

# 6-fold cross-validation on the training set
cross_val_scores = cross_val_score(estimator=model_tfidf_NB, X=X_train, y=y_train, cv=6)

# Per-fold scores, then their mean
print("Cross-validation scores for each fold:", cross_val_scores)
print("Mean cross-validation score:", cross_val_scores.mean())

# Final fit on the full training set
model_tfidf_NB.fit(X_train, y_train)

### Testing

In [None]:
from sklearn.metrics import accuracy_score


# Score the fitted Naive Bayes pipeline on the held-out test texts
predicted_test_nb = model_tfidf_NB.predict(X_test)
accuracy_test_nb = accuracy_score(y_true=y_test, y_pred=predicted_test_nb)
print(f'Accuracy Test data for NB model: {accuracy_test_nb:.1%}')



### Confusion matrix

In [None]:
from sklearn.metrics import confusion_matrix
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt

import os

# Use the union of true and predicted labels for both the matrix and its axes, so the
# DataFrame always has the same shape as the matrix even if a class is absent from y_test.
labels = np.union1d(y_test, predicted_test_nb)
data = confusion_matrix(y_test, predicted_test_nb, labels=labels)
df_cm = pd.DataFrame(data, columns=labels, index=labels)
df_cm.index.name = 'Actual'
df_cm.columns.name = 'Predicted'


f, ax = plt.subplots(figsize=(12, 12))
cmap = sns.cubehelix_palette(light=1, as_cmap=True)

sns.heatmap(df_cm, cbar=False, annot=True, cmap=cmap, square=True, fmt='.0f',
            annot_kws={'size': 18})
plt.title('Actuals vs Predicted')

# Save the plot as a PNG file; savefig does not create missing folders, so do it first
output_path = 'Result/country/MachineLearning/NB/confusion_matrix.png'
os.makedirs(os.path.dirname(output_path), exist_ok=True)
plt.savefig(output_path)

plt.show()

## Random Forest Classifier (RFC)

### Train and test split data

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split

# Per-dialect sample sizes for the reduced training and test sets
samples_for_train = 9000
samples_for_test = 700

# Draw a fixed-size, seeded sample of every dialect from the existing train/test split
dialect_labels = train['dialect'].unique()
train_RF = pd.concat(
    [pd.DataFrame()]
    + [train[train['dialect'] == label].sample(samples_for_train, random_state=42)
       for label in dialect_labels]
)
test_RF = pd.concat(
    [pd.DataFrame()]
    + [test[test['dialect'] == label].sample(samples_for_test, random_state=42)
       for label in dialect_labels]
)

# Shuffle the rows and renumber them from 0
train_RF = train_RF.sample(frac=1, random_state=42).reset_index(drop=True)
test_RF = test_RF.sample(frac=1, random_state=42).reset_index(drop=True)

# Report the sizes of the new DataFrames
print("train_RF shape:", train_RF.shape)
print("test_RF shape:", test_RF.shape)


### Splitting the data

In [None]:
# Features are the texts, targets are the dialect labels
X_train_RF, y_train_RF = train_RF['text'], train_RF['dialect']
X_test_RF, y_test_RF = test_RF['text'], test_RF['dialect']

### Vectorization (TF-IDF)

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer

corpus = X_train_RF
# TF-IDF over unigrams, bigrams and trigrams (ngram_range=(1, 3)), fitted on the training texts.
# NOTE(review): the Random Forest pipeline in the training cell builds its own
# TfidfVectorizer(), so this fitted vectorizer is not used there — confirm intended.
vectorizer_tfidf = TfidfVectorizer(ngram_range=(1,3))
vectorizer_tfidf = vectorizer_tfidf.fit(corpus)

### Training

In [None]:
from tqdm import tqdm
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import cross_val_score

# Random Forest classifier. random_state pins the forest so repeated runs give the same
# scores, in line with the seeded sampling used throughout the notebook; verbose=1 makes
# scikit-learn report fitting progress.
classifier_tfidf_RF = RandomForestClassifier(n_estimators=25, random_state=42, verbose=1)

# NOTE(review): this pipeline uses a default TfidfVectorizer() (unigrams only), not the
# ngram_range=(1,3) vectorizer fitted in the previous cell — confirm this is intended.
model_tfidf_RF = Pipeline([("vectorizer", TfidfVectorizer()), ("classifier", classifier_tfidf_RF)])

# 6-fold cross-validation on the training set
cross_val_scores = cross_val_score(model_tfidf_RF, X_train_RF, y_train_RF, cv=6)

# Display the cross-validation scores for each fold
print("Cross-validation scores for each fold:", cross_val_scores)

# Display the average cross-validation score
print("Average cross-validation score:", cross_val_scores.mean())

# Train the model on the complete training set
model_tfidf_RF.fit(X_train_RF, y_train_RF)


### Testing

In [None]:
from sklearn.metrics import accuracy_score

# Score the fitted Random Forest pipeline on the held-out test texts
predicted_test_tfidf_RF = model_tfidf_RF.predict(X_test_RF)
accuracy_test_tfidf_RF = accuracy_score(y_true=y_test_RF, y_pred=predicted_test_tfidf_RF)
print(f'Accuracy Test data for RF model: {accuracy_test_tfidf_RF:.1%}')

### Confusion matrix

In [None]:
from sklearn.metrics import confusion_matrix
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt

import os

# Use the union of true and predicted labels for both the matrix and its axes, so the
# DataFrame always has the same shape as the matrix even if a class is absent from y_test_RF.
labels = np.union1d(y_test_RF, predicted_test_tfidf_RF)
data = confusion_matrix(y_test_RF, predicted_test_tfidf_RF, labels=labels)
df_cm = pd.DataFrame(data, columns=labels, index=labels)
df_cm.index.name = 'Actual'
df_cm.columns.name = 'Predicted'


f, ax = plt.subplots(figsize=(12, 12))
cmap = sns.cubehelix_palette(light=1, as_cmap=True)

sns.heatmap(df_cm, cbar=False, annot=True, cmap=cmap, square=True, fmt='.0f',
            annot_kws={'size': 18})
plt.title('Actuals vs Predicted')

# Save the plot as a PNG file; savefig does not create missing folders, so do it first
output_path = 'Result/country/MachineLearning/RandomForest/confusion_matrix.png'
os.makedirs(os.path.dirname(output_path), exist_ok=True)
plt.savefig(output_path)

plt.show()

## Logistic Regression Classifier (LR)

### Train and test split data

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split

# Per-dialect sample sizes for the reduced training and test sets
samples_for_train = 9000
samples_for_test = 700

# Draw a fixed-size, seeded sample of every dialect from the existing train/test split
dialect_labels = train['dialect'].unique()
train_LR = pd.concat(
    [pd.DataFrame()]
    + [train[train['dialect'] == label].sample(samples_for_train, random_state=42)
       for label in dialect_labels]
)
test_LR = pd.concat(
    [pd.DataFrame()]
    + [test[test['dialect'] == label].sample(samples_for_test, random_state=42)
       for label in dialect_labels]
)

# Shuffle the rows and renumber them from 0
train_LR = train_LR.sample(frac=1, random_state=42).reset_index(drop=True)
test_LR = test_LR.sample(frac=1, random_state=42).reset_index(drop=True)

# Report the sizes of the new DataFrames
print("train_LR shape:", train_LR.shape)
print("test_LR shape:", test_LR.shape)


### Splitting the data

In [None]:
# Features are the texts, targets are the dialect labels
X_train_LR, y_train_LR = train_LR['text'], train_LR['dialect']
X_test_LR, y_test_LR = test_LR['text'], test_LR['dialect']

### Vectorization (TF-IDF)

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer

corpus = X_train_LR
# TF-IDF over unigrams, bigrams and trigrams (ngram_range=(1, 3)), fitted on the training texts.
# NOTE(review): the Logistic Regression pipeline in the training cell builds its own
# TfidfVectorizer(), so this fitted vectorizer is not used there — confirm intended.
vectorizer_tfidf = TfidfVectorizer(ngram_range=(1,3))
vectorizer_tfidf = vectorizer_tfidf.fit(corpus)

### Training

In [None]:
from tqdm import tqdm
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import cross_val_score

# Logistic Regression classifier; verbose=1 makes the solver report its progress
classifier_tfidf_LR = LogisticRegression(verbose=1)

# Chain a default TF-IDF vectorizer with the classifier
model_tfidf_LR = Pipeline(steps=[
    ("vectorizer", TfidfVectorizer()),
    ("classifier", classifier_tfidf_LR),
])

# 6-fold cross-validation on the training set
cross_val_scores_LR = cross_val_score(estimator=model_tfidf_LR, X=X_train_LR, y=y_train_LR, cv=6)

# Per-fold scores, then their average
print("Cross-validation scores for each fold:", cross_val_scores_LR)
print("Average cross-validation score:", cross_val_scores_LR.mean())

# Final fit on the complete training set
model_tfidf_LR.fit(X_train_LR, y_train_LR)


### Testing

In [None]:
from sklearn.metrics import accuracy_score

# Score the fitted Logistic Regression pipeline on the held-out test texts
predicted_test_tfidf_LR = model_tfidf_LR.predict(X_test_LR)
accuracy_test_tfidf_LR = accuracy_score(y_true=y_test_LR, y_pred=predicted_test_tfidf_LR)
print(f'Accuracy Test data for LR model: {accuracy_test_tfidf_LR:.1%}')

### Confusion matrix

In [None]:
from sklearn.metrics import confusion_matrix
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt

import os

# Use the union of true and predicted labels for both the matrix and its axes, so the
# DataFrame always has the same shape as the matrix even if a class is absent from y_test_LR.
labels = np.union1d(y_test_LR, predicted_test_tfidf_LR)
data = confusion_matrix(y_test_LR, predicted_test_tfidf_LR, labels=labels)
df_cm = pd.DataFrame(data, columns=labels, index=labels)
df_cm.index.name = 'Actual'
df_cm.columns.name = 'Predicted'


f, ax = plt.subplots(figsize=(12, 12))
cmap = sns.cubehelix_palette(light=1, as_cmap=True)

sns.heatmap(df_cm, cbar=False, annot=True, cmap=cmap, square=True, fmt='.0f',
            annot_kws={'size': 18})
plt.title('Actuals vs Predicted')

# Save the plot as a PNG file; savefig does not create missing folders, so do it first
output_path = 'Result/country/MachineLearning/LogisticRegression/confusion_matrix.png'
os.makedirs(os.path.dirname(output_path), exist_ok=True)
plt.savefig(output_path)

plt.show()

## K-Nearest Neighbors Classifier (KNN)

### Train and test split data

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split

# Per-dialect sample sizes for the reduced training and test sets
samples_for_train = 9000
samples_for_test = 700

# Draw a fixed-size, seeded sample of every dialect from the existing train/test split
dialect_labels = train['dialect'].unique()
train_knn = pd.concat(
    [pd.DataFrame()]
    + [train[train['dialect'] == label].sample(samples_for_train, random_state=42)
       for label in dialect_labels]
)
test_knn = pd.concat(
    [pd.DataFrame()]
    + [test[test['dialect'] == label].sample(samples_for_test, random_state=42)
       for label in dialect_labels]
)

# Shuffle the rows and renumber them from 0
train_knn = train_knn.sample(frac=1, random_state=42).reset_index(drop=True)
test_knn = test_knn.sample(frac=1, random_state=42).reset_index(drop=True)

# Report the sizes of the new DataFrames
print("train_knn shape:", train_knn.shape)
print("test_knn shape:", test_knn.shape)


### Train and test split data

In [None]:
# Features are the texts, targets are the dialect labels
X_train_knn, y_train_knn = train_knn['text'], train_knn['dialect']
X_test_knn, y_test_knn = test_knn['text'], test_knn['dialect']

### vectorization (TF-IDF)

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer

corpus = X_train_knn
# TF-IDF over unigrams, bigrams and trigrams (ngram_range=(1, 3)), fitted on the training texts
vectorizer_tfidf_knn = TfidfVectorizer(ngram_range=(1,3))
vectorizer_tfidf_knn = vectorizer_tfidf_knn.fit(corpus)

### Training

In [None]:
from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import GridSearchCV

# K-Nearest Neighbors classifier voting over the 18 closest training samples
classifier_tfidf_knn = KNeighborsClassifier(n_neighbors=18)

# Create the pipeline for KNN.
# The former "verbose = 1" assignment on the classifier was removed: KNeighborsClassifier
# has no verbose parameter, so it printed nothing and was dropped whenever
# cross_val_score cloned the estimator.
model_tfidf_knn = Pipeline([("vectorizer", vectorizer_tfidf_knn), ("classifier", classifier_tfidf_knn)])

# 6-fold cross-validation for KNN
cross_val_scores_knn = cross_val_score(model_tfidf_knn, X_train_knn, y_train_knn, cv=6)

# Display cross-validation scores
print("Cross-validation scores for each fold:", cross_val_scores_knn)

# Display the average score
print("Average cross-validation score:", cross_val_scores_knn.mean())

# Train the KNN model on the entire training set
model_tfidf_knn.fit(X_train_knn, y_train_knn)

### Testing

In [None]:

# Score the fitted KNN pipeline on the held-out test texts
predicted_test_tfidf_knn = model_tfidf_knn.predict(X_test_knn)
accuracy_test_tfidf_knn = accuracy_score(y_true=y_test_knn, y_pred=predicted_test_tfidf_knn)
print(f'Accuracy on test data: {accuracy_test_tfidf_knn:.1%}')


### Confusion matrix

In [None]:
import os

# Confusion matrix. The union of true and predicted labels is used for both the matrix and
# its axes, so the DataFrame always matches the matrix shape even if a class is absent
# from y_test_knn.
labels_knn = np.union1d(y_test_knn, predicted_test_tfidf_knn)
data_knn = confusion_matrix(y_test_knn, predicted_test_tfidf_knn, labels=labels_knn)
df_cm_knn = pd.DataFrame(data_knn, columns=labels_knn, index=labels_knn)
df_cm_knn.index.name = 'Actual'
df_cm_knn.columns.name = 'Predicted'

# Plot the confusion matrix
f_knn, ax_knn = plt.subplots(figsize=(12, 12))
cmap_knn = sns.cubehelix_palette(light=1, as_cmap=True)

sns.heatmap(df_cm_knn, cbar=False, annot=True, cmap=cmap_knn, square=True, fmt='.0f',
            annot_kws={'size': 18})
plt.title('Actuals vs Predicted (KNN)')

# Save the plot as a PNG file; savefig does not create missing folders, so do it first
output_path_knn = 'Result/country/MachineLearning/KNN/confusion_matrix.png'
os.makedirs(os.path.dirname(output_path_knn), exist_ok=True)
plt.savefig(output_path_knn)

plt.show()

## Support Vector Machine Classifier (SVM)

### Train and test split data

In [None]:
import pandas as pd

# Per-dialect sample sizes for the reduced training and test sets
samples_for_train = 9000
samples_for_test = 700

# Draw a fixed-size, seeded sample of every dialect from the existing train/test split
dialect_labels = train['dialect'].unique()
train_svm = pd.concat(
    [pd.DataFrame()]
    + [train[train['dialect'] == label].sample(samples_for_train, random_state=42)
       for label in dialect_labels]
)
test_svm = pd.concat(
    [pd.DataFrame()]
    + [test[test['dialect'] == label].sample(samples_for_test, random_state=42)
       for label in dialect_labels]
)

# Shuffle the rows and renumber them from 0
train_svm = train_svm.sample(frac=1, random_state=42).reset_index(drop=True)
test_svm = test_svm.sample(frac=1, random_state=42).reset_index(drop=True)

# Report the sizes of the new DataFrames
print("train_svm shape:", train_svm.shape)
print("test_svm shape:", test_svm.shape)

### Train and test split data

In [None]:
# Features are the texts, targets are the dialect labels
X_train_svm, y_train_svm = train_svm['text'], train_svm['dialect']
X_test_svm, y_test_svm = test_svm['text'], test_svm['dialect']

### Vectorization (TF-IDF)

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer

corpus = X_train_svm
# TF-IDF over unigrams, bigrams and trigrams (ngram_range=(1, 3)), fitted on the training texts
vectorizer_tfidf_svm = TfidfVectorizer(ngram_range=(1,3))
vectorizer_tfidf_svm = vectorizer_tfidf_svm.fit(corpus)

### Training

In [None]:
from sklearn.svm import SVC
from sklearn.pipeline import Pipeline
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import cross_val_score

# Linear-kernel SVM; verbose=1 turns on the solver's progress output
classifier_tfidf_svm = SVC(kernel='linear', verbose=1)

# Chain the fitted TF-IDF vectorizer with the classifier
model_tfidf_svm = Pipeline(steps=[
    ("vectorizer", vectorizer_tfidf_svm),
    ("classifier", classifier_tfidf_svm),
])

# 6-fold cross-validation on the training set
cross_val_scores_svm = cross_val_score(estimator=model_tfidf_svm, X=X_train_svm, y=y_train_svm, cv=6)

# Per-fold scores, then their mean
print("Cross-validation scores for each fold:", cross_val_scores_svm)
print("Mean cross-validation score:", cross_val_scores_svm.mean())

# Final fit on the complete training set
model_tfidf_svm.fit(X_train_svm, y_train_svm)


### Testing

In [None]:
from sklearn.metrics import accuracy_score

# Score the fitted SVM pipeline on the held-out test texts
predicted_test_tfidf_svm = model_tfidf_svm.predict(X_test_svm)
accuracy_test_tfidf_svm = accuracy_score(y_true=y_test_svm, y_pred=predicted_test_tfidf_svm)
print(f'Accuracy on test data: {accuracy_test_tfidf_svm:.1%}')

### Confusion matrix

In [None]:
from sklearn.metrics import confusion_matrix
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt

import os

# Confusion matrix
# Fix the label order explicitly: the matrix is built over every label that
# occurs in the true OR the predicted values, so taking the axis names from
# y_test_svm alone can mis-size or mislabel the DataFrame.
labels_svm = np.unique(
    np.concatenate([np.asarray(y_test_svm, dtype=object),
                    np.asarray(predicted_test_tfidf_svm, dtype=object)])
)
data_svm = confusion_matrix(y_test_svm, predicted_test_tfidf_svm, labels=labels_svm)
df_cm_svm = pd.DataFrame(data_svm, columns=labels_svm, index=labels_svm)
df_cm_svm.index.name = 'Actual'
df_cm_svm.columns.name = 'Predicted'

# Plot the confusion matrix
f_svm, ax_svm = plt.subplots(figsize=(12, 12))
cmap_svm = sns.cubehelix_palette(light=1, as_cmap=True)

sns.heatmap(df_cm_svm, cbar=False, annot=True, cmap=cmap_svm, square=True, fmt='.0f',
            annot_kws={'size': 18})
plt.title('Actuals vs Predicted (SVM)')

# Save the plot as a PNG file (create the output folder first so savefig cannot
# fail on a fresh checkout)
os.makedirs('Result/country/MachineLearning/SVM', exist_ok=True)
plt.savefig('Result/country/MachineLearning/SVM/confusion_matrix.png')

plt.show()


### Saving the model

In [None]:
import joblib

import os

# Define the directories where you want to save the models
save_directory_nb = "Model/country/MachineLearning/NB/"
save_directory_rf = "Model/country/MachineLearning/RandomForest/"
save_directory_lr = "Model/country/MachineLearning/LogisticRegression/"
save_directory_knn = "Model/country/MachineLearning/KNN/"
save_directory_svm = "Model/country/MachineLearning/SVM/"

# (target directory, file name, fitted pipeline) for every classical model
models_to_save = [
    (save_directory_nb, 'ArabicDialectClassificationNB.pkl', model_tfidf_NB),
    (save_directory_rf, 'ArabicDialectClassificationRF.pkl', model_tfidf_RF),
    (save_directory_lr, 'ArabicDialectClassificationLR.pkl', model_tfidf_LR),
    (save_directory_knn, 'ArabicDialectClassificationKNN.pkl', model_tfidf_knn),
    (save_directory_svm, 'ArabicDialectClassificationSVM.pkl', model_tfidf_svm),
]

# Save the models using joblib; the directory is created first because
# joblib.dump does not create missing parent folders
for save_dir, pkl_name, fitted_pipeline in models_to_save:
    os.makedirs(save_dir, exist_ok=True)
    joblib.dump(fitted_pipeline, os.path.join(save_dir, pkl_name))


## Deep Learning Models

## Arabert Model (BERT)

In [None]:
import pandas as pd
import numpy as np

### GPU check

In [None]:
import torch

# Pick the PyTorch device: CUDA when a GPU is visible, otherwise the CPU
if not torch.cuda.is_available():
    print('No GPU available, using the CPU instead.')
    device = torch.device("cpu")
else:
    device = torch.device("cuda")

    # Report how many GPUs are visible and the name of the first one
    print('There are %d GPU(s) available.' % torch.cuda.device_count())
    print('We will use the GPU:', torch.cuda.get_device_name(0))

### Mapping the labels

In [None]:
# Dialect codes in class-id order: the position of a code in this list is its label id
_dialect_codes = [
    'EG', 'SY', 'PL', 'KW', 'LB', 'LY', 'JO', 'DZ', 'QA', 'AE',
    'BH', 'SA', 'OM', 'MA', 'IQ', 'TN', 'SD', 'YE', 'MSA',
]

# dialect code -> integer label id
map_label = {code: idx for idx, code in enumerate(_dialect_codes)}
# integer label id -> dialect code (inverse of map_label)
label_map = dict(enumerate(_dialect_codes))

### Preprocessing the data

#### The data has already been preprocessed; there is no need to run this step again, as it takes a long time.

In [None]:
#from arabert.preprocess import ArabertPreprocessor
#model_name="bert-base-arabert"
#arabert_prep = ArabertPreprocessor(model_name=model_name)


### Save the processed data

In [None]:
# Forward slashes work on every OS; the previous backslashes were invalid
# string escapes ("\c", "\p") and only resolved as a path on Windows
dataset = pd.read_csv("Data/country/preprocessed_data/dataset.csv")

In [None]:
#dataset["text"]=dataset["text"].apply(lambda x:arabert_prep.preprocess(x))

### Load the processed data

In [None]:
# Load the AraBERT-preprocessed sentences. Forward slashes work on every OS;
# the previous backslashes were invalid string escapes ("\c", "\p") and only
# resolved as a path on Windows
dataset = pd.read_csv("Data/country/preprocessed_data/preprocessed_dataset_araberta.csv")

### Splitting the data

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split

# Group the DataFrame by dialect
grouped_df = dataset.groupby('dialect')

# Collect the per-dialect pieces and concatenate once at the end: growing a
# DataFrame with pd.concat inside the loop copies all previous rows on every
# iteration, and concatenating onto an empty DataFrame relies on deprecated
# dtype handling in recent pandas
train_parts = []
test_parts = []

# For each group (dialect), split the data into training and test sets (80/20)
for group_name, group_data in grouped_df:
    train_data, test_data = train_test_split(group_data, test_size=0.2, random_state=42)
    train_parts.append(train_data)
    test_parts.append(test_data)

# Build the final DataFrames with a fresh 0..n-1 index (empty input yields
# empty DataFrames, as before)
train = pd.concat(train_parts).reset_index(drop=True) if train_parts else pd.DataFrame()
test = pd.concat(test_parts).reset_index(drop=True) if test_parts else pd.DataFrame()

# Display the first few rows of the resulting DataFrames
print("Train DataFrame:")
print(train.head())

print("\nTest DataFrame:")
print(test.head())


### Tokenization

In [None]:
from arabert.preprocess import ArabertPreprocessor
from sklearn.metrics import (accuracy_score, f1_score,recall_score)
from torch.utils.data import  Dataset
from transformers import (AutoConfig, AutoModelForSequenceClassification,
                        AutoTokenizer, BertTokenizer, Trainer,
                        TrainingArguments)
from transformers.data.processors.utils import InputFeatures

### Parameters

In [None]:
#chose bert model
model_name = 'aubmindlab/bert-base-arabert'

num_labels = 19
max_length = 120

In [None]:
class ClassificationDataset(Dataset):
    """Map-style dataset that tokenizes one sentence per item.

    Args:
        text: indexable collection of sentences.
        target: indexable collection of labels, aligned with ``text``.
        model_name: checkpoint name passed to ``AutoTokenizer.from_pretrained``.
        max_len: length every tokenized sentence is padded / truncated to.
        label_map: label mapping; stored on the instance, not used internally.
    """

    def __init__(self, text, target, model_name, max_len, label_map):
        # super(ClassificationDataset).__init__() only initialised an unbound
        # super object; the zero-argument form actually runs the base initializer
        super().__init__()

        self.text = text
        self.target = target
        self.tokenizer_name = model_name
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.max_len = max_len
        self.label_map = label_map

    def __len__(self):
        """Return the number of sentences."""
        return len(self.text)

    def __getitem__(self, item):
        """Return the tokenized sentence at ``item`` with its label as ``InputFeatures``."""
        # Collapse every run of whitespace into a single space
        text = " ".join(str(self.text[item]).split())

        inputs = self.tokenizer(
            text,
            max_length=self.max_len,
            padding='max_length',
            truncation=True
        )

        label = self.target[item]
        # A pandas column that held NaN before filtering is float64, so class ids
        # can arrive as 3.0 instead of 3. Restore the integer so the label is
        # treated as a class index (NOTE(review): the Trainer's default collator
        # is expected to pick the tensor dtype from the Python type - confirm).
        if isinstance(label, float) and label.is_integer():
            label = int(label)
        return InputFeatures(**inputs, label=label)

### Mapping and splitting the data

In [None]:
# Replace the dialect codes by their integer ids from map_label.
# Codes missing from map_label become NaN.
# NOTE: run this cell only once - a second run would map the already-converted
# ids through map_label again and turn every value into NaN.
train['dialect'] = train['dialect'].map(map_label)
test['dialect'] = test['dialect'].map(map_label)

In [None]:
# Keep only the rows that have a (non-null) dialect value
train = train[train['dialect'].notna()]
test = test[test['dialect'].notna()]

In [None]:
# Token length every sentence is padded / truncated to
max_len = 120

# Training split wrapped as a tokenizing dataset
train_dataset = ClassificationDataset(
    text=train['text'].to_list(),
    target=train['dialect'].to_list(),
    model_name=model_name,
    max_len=max_len,
    label_map=map_label,
)

# Test split wrapped the same way
test_dataset = ClassificationDataset(
    text=test['text'].to_list(),
    target=test['dialect'].to_list(),
    model_name=model_name,
    max_len=max_len,
    label_map=map_label,
)

### Model

In [None]:
def model_init():
    """Load the ``model_name`` checkpoint as a sequence classifier with ``num_labels`` outputs."""
    classifier = AutoModelForSequenceClassification.from_pretrained(
        model_name,
        num_labels=num_labels,
        return_dict=True,
    )
    return classifier

In [None]:
def compute_metrics(p):  # p should be of type EvalPrediction
    """Compute macro F1, accuracy and macro recall for an evaluation pass.

    The predicted class of each row is the arg-max of ``p.predictions`` along
    axis 1; it is compared with ``p.label_ids``.
    """
    y_true = p.label_ids
    y_pred = np.argmax(p.predictions, axis=1)
    assert len(y_pred) == len(y_true)

    return {
        'macro_f1': f1_score(y_true, y_pred, average='macro'),
        'accuracy': accuracy_score(y_true, y_pred),
        'recall': recall_score(y_true, y_pred, average='macro'),
    }

In [None]:
from transformers import TrainingArguments
from accelerate import Accelerator

# Create an Accelerator instance
# NOTE(review): `accelerator` is not referenced again in the visible cells - confirm it is needed
accelerator = Accelerator()

# Training configuration (TrainingArguments from the transformers library):
# evaluate and checkpoint once per epoch, keep at most 10 checkpoints, and
# reload the checkpoint with the lowest eval_loss when training ends
training_args = TrainingArguments(
    output_dir="./train",
    adam_epsilon=1e-8,
    learning_rate=2e-5,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    gradient_accumulation_steps=4,
    num_train_epochs=15,
    warmup_ratio=0,
    do_eval=True,
    evaluation_strategy='epoch',
    save_strategy='epoch',
    save_total_limit=10,
    load_best_model_at_end=True,
    metric_for_best_model='eval_loss',
    greater_is_better=False,
    report_to=[]
)


In [None]:
# Wire the model, the training configuration, both datasets and the metric
# function into a Trainer.
# NOTE(review): the evaluation set is the test split, and the training arguments
# reload the best checkpoint by eval_loss - so the test data also drives
# checkpoint selection.
trainer = Trainer(
    model=model_init(),
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    compute_metrics=compute_metrics,
)

### Training

In [None]:
# Run the fine-tuning loop configured by training_args
trainer.train()

### Saving the model

In [None]:
# You can also choose a model from one of the saved checkpoints instead.
# Store both label mappings in the model config before saving
trainer.model.config.label2id = map_label
trainer.model.config.id2label = label_map
# Write the model and its tokenizer to the same folder
trainer.save_model("./model")
train_dataset.tokenizer.save_pretrained("./model")

## LSTM

In [None]:
import pandas as pd
# Forward slashes work on every OS; the previous backslashes were invalid
# string escapes ("\c", "\p") and only resolved as a path on Windows
dataset = pd.read_csv("Data/country/preprocessed_data/dataset.csv", encoding='utf-8')

In [None]:
def freq_words_removal(text, lst_words):
    """Return ``text`` without the words listed in ``lst_words``.

    The text is split on whitespace and re-joined with single spaces, so runs
    of whitespace are collapsed as well. When ``lst_words`` is ``None`` no word
    is removed.
    """
    tokens = text.split()
    if lst_words is None:
        return " ".join(tokens)
    return " ".join(token for token in tokens if token not in lst_words)

In [None]:
# Very frequent words that are stripped from every sentence
wrds = ['مع','لا','على','من','ما','في','الي','هو','انا','أنا','اله']
# text_clean = text with the frequent words removed
dataset["text_clean"] = dataset["text"].apply(freq_words_removal, args=(wrds,))

In [None]:
import numpy as np
# Shuffle the dataset with a fixed seed so every run sees the same row order
# (same idiom and seed as the other shuffles in this notebook); the original
# index labels are kept, only their order changes
dataset = dataset.sample(frac=1, random_state=42)
# Placeholder label column, filled in by the encoding step
dataset['LABEL'] = 0

In [None]:
from keras.utils import to_categorical


# Integer class id of every dialect code
dialect_to_label = {
    'SA': 0, 'QA': 1, 'KW': 2, 'AE': 3, 'OM': 4,
    'JO': 5, 'PL': 6, 'BH': 7, 'LY': 8, 'EG': 9,
    'SD': 10, 'IQ': 11, 'LB': 12, 'SY': 13, 'TN': 14,
    'DZ': 15, 'MA': 16, 'YE': 17, 'MSA': 18,
}

# A dialect outside the mapping used to keep the placeholder label 0 and was
# therefore silently trained as 'SA'. Drop such rows instead (and say so).
known_dialect = dataset['dialect'].isin(list(dialect_to_label))
if not known_dialect.all():
    print('Dropping %d rows with an unmapped dialect' % (~known_dialect).sum())
    dataset = dataset[known_dialect].copy()

# Integer labels
dataset['LABEL'] = dataset['dialect'].map(dialect_to_label).astype(int)
print(dataset['LABEL'][:10])

# One-hot encode the labels
labels = to_categorical(dataset['LABEL'], num_classes=len(dialect_to_label))
print(labels[:10])

# NOTE: the 'dialect' column is kept. The previous dataset.drop(['dialect'], axis=1)
# discarded its result, so the column was never actually removed, and keeping
# it lets this cell be re-run.

In [None]:
from keras.preprocessing.text import Tokenizer
from keras.preprocessing.sequence import pad_sequences

# Vocabulary size kept by the tokenizer and fixed length of every padded sequence
n_most_common_words = 20000
max_len = 250
# The backslash in the filter list is written as '\\' - a bare '\]' is an
# invalid escape sequence; the resulting filter characters are unchanged
tokenizer = Tokenizer(num_words=n_most_common_words, filters='!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~', lower=True)
# Build the vocabulary from the cleaned sentences, then turn each sentence into word ids
tokenizer.fit_on_texts(dataset['text_clean'].values)
sequences = tokenizer.texts_to_sequences(dataset['text_clean'].values)
word_index = tokenizer.word_index
print('Found %s unique tokens.' % len(word_index))

# Pad / truncate every sequence to max_len
X = pad_sequences(sequences, maxlen=max_len)

In [None]:
from sklearn.model_selection import train_test_split

# Hold out 10% of the padded sequences (and their one-hot labels) for testing; fixed seed
X_train, X_test, y_train, y_test = train_test_split(X , labels, test_size=0.1, random_state=42)

In [None]:
# Training hyper-parameters
epochs = 30        # maximum number of epochs passed to model.fit
emb_dim = 128      # size of each word embedding vector
batch_size = 256   # samples per gradient update
# Show the first two one-hot label rows as the cell output
labels[:2]

In [None]:
import tensorflow as tf
# Turn on memory growth for every GPU TensorFlow can see.
# The loop variable is deliberately not called `device`, so it does not
# overwrite the torch `device` selected in the GPU check cell.
gpu_devices = tf.config.experimental.list_physical_devices('GPU')
for gpu_device in gpu_devices:
    tf.config.experimental.set_memory_growth(gpu_device, True)

from keras.layers import Dense, Embedding, LSTM, SpatialDropout1D
from keras.models import Sequential
from keras.callbacks import EarlyStopping



print((X_train.shape, y_train.shape, X_test.shape, y_test.shape))

# Embedding -> spatial dropout -> single LSTM layer -> softmax over the dialect classes
model = Sequential()
model.add(Embedding(n_most_common_words, emb_dim, input_length=X.shape[1]))
model.add(SpatialDropout1D(0.7))
model.add(LSTM(64, dropout=0.7, recurrent_dropout=0.7))
# One output unit per one-hot label column (19 dialect classes)
model.add(Dense(y_train.shape[1], activation='softmax'))
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['acc'])
# summary() prints the table itself and returns None, so it is not wrapped in print()
model.summary()


In [None]:
# Train with 20% of the training data held out for validation; stop early once
# val_loss has not improved by at least 0.0001 for 6 consecutive epochs
history = model.fit(
    X_train,
    y_train,
    epochs=epochs,
    batch_size=batch_size,
    validation_split=0.2,
    callbacks=[
        EarlyStopping(monitor='val_loss', patience=6, min_delta=0.0001),
    ],
)

In [None]:
# Evaluate on the held-out test split; accr[0] is printed as the loss and accr[1] as the accuracy
accr = model.evaluate(X_test,y_test)
print('Test set\n  Loss: {:0.3f}\n  Accuracy: {:0.3f}'.format(accr[0],accr[1]))

In [None]:
import matplotlib.pyplot as plt
# Per-epoch training / validation metrics recorded by model.fit
acc = history.history['acc']
val_acc = history.history['val_acc']
loss = history.history['loss']
val_loss = history.history['val_loss']

# x axis: 1..number of epochs actually run. Kept in its own name so the
# `epochs` hyper-parameter is not overwritten (re-running model.fit after this
# cell would otherwise receive a range instead of an int).
epoch_axis = range(1, len(acc) + 1)

# Accuracy curves
plt.plot(epoch_axis, acc, 'bo', label='Training acc')
plt.plot(epoch_axis, val_acc, 'b', label='Validation acc')
plt.title('Training and validation accuracy')
plt.legend()

plt.figure()

# Loss curves
plt.plot(epoch_axis, loss, 'bo', label='Training loss')
plt.plot(epoch_axis, val_loss, 'b', label='Validation loss')
plt.title('Training and validation loss')
plt.legend()

plt.show()

In [None]:
# Classify one example sentence with the trained LSTM
txt = ["ما قبل البرنامج شي ترسخ في ذهنكم بأن مروان مراوغ وعنده مسألة إقناع هذا شي غلط الي شاهدته هو هروب ودفاع مستميت وإصرار على أن ما تحق هو نجاح"]
# Apply the same frequent-word removal that produced the training text (text_clean)
txt_clean = [freq_words_removal(sentence, wrds) for sentence in txt]
seq = tokenizer.texts_to_sequences(txt_clean)
padded = pad_sequences(seq, maxlen=max_len)
pred = model.predict(padded)
# Dialect code of each output unit, in the same order as the integer labels.
# Stored under its own name so the one-hot `labels` matrix used by
# train_test_split is not overwritten.
label_names = ['SA','QA','KW','AE','OM','JO','PL','BH','LY','EG','SD','IQ','LB','SY','TN','DZ','MA','YE','MSA']
print(pred, label_names[int(np.argmax(pred[0]))])