In [None]:
print("Test, World!")

In [None]:
# Install packages

# For Python packages
!pip install --upgrade PyPDF2
!pip install --upgrade pdf2image
!pip install --upgrade pdfplumber
!pip install --upgrade pytesseract
!pip install --upgrade fuzzywuzzy

!pip install python-Levenshtein
!pip install pandas
!pip install scikit-image
!pip install opencv-python
!pip install numpy
!pip install opencv-python

pip install matplotlib
pip install seaborn

# Run in terminal
# brew install poppler
# brew install tesseract


In [None]:
# Import statements
import os
import pandas as pd
import re  # For regular expressions
from PIL import Image
from PyPDF2 import PdfReader
from PyPDF2 import PdfFileReader
from pdf2image import convert_from_path
from skimage.metrics import structural_similarity as ssim
import cv2
import numpy as np
import shutil
import requests  # For downloading files
import pdfplumber
import pytesseract
from fuzzywuzzy import fuzz  # for text similarity
import json
from ast import literal_eval
import matplotlib.pyplot as plt
import seaborn as sns
from collections import Counter

In [None]:
# File types that are supported
SUPPORTED_EXTENSIONS = ['.pdf', '.png', '.jpg', '.jpeg', '.bmp', '.tiff']

# Names of the courses / certificates that are being evaluated
certificate_names = [
    'Boost Your Career with SAP Skills My Learning screenshot',
    'Learn how to learn (Google)',
    'Business Communication (Google)',
    'Communicate your ideas through storytelling and design (Google)',
    'Get started with Microsoft Teams',
    'Tech for Good: The Role of ICT in Achieving the SDGs'
]

In [None]:
# When working in Colab, mount your Google Drive
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Define the directory where attachments are stored
attachments_dir = '/Users/gilbert/Downloads/attachments '

In [None]:
# Sanitize filepath and check it (requires Helper Functions to be loaded)
attachments_dir = sanitize_path(attachments_dir)

# Check if attachments directory exists
if check_file_exists(attachments_dir):
    print(f"The directory {attachments_dir} exists.")
    
    # Show the list of subfolders
    with os.scandir(attachments_dir) as entries:
        subfolders = [entry.name for entry in entries if entry.is_dir()]
    print(f"Subfolders: {subfolders}")
else:
    print(f"The directory {attachments_dir} does not exist. Please check the path.")


In [None]:
# Load template certificates into their respective folders

# Define the folder paths for each certificate type
certificate_folders = {
    'Boost Your Career with SAP Skills My Learning screenshot': '/Users/gilbert/Downloads/example_certs/Boost Your Career with SAP Skills My Learning screenshot',
    'Learn how to learn (Google)': '/Users/gilbert/Downloads/example_certs/Learn how to learn (Google)',
    'Business Communication (Google)': '/Users/gilbert/Downloads/example_certs/Business Communication (Google)',
    'Get started with Microsoft Teams': '/Users/gilbert/Downloads/example_certs/Get started with Microsoft Teams',
    'Tech for Good: The Role of ICT in Achieving the SDGs': '/Users/gilbert/Downloads/example_certs/Tech for Good: The Role of ICT in Achieving the SDGs',
    'Communicate your ideas through storytelling and design (Google)': '/Users/gilbert/Downloads/example_certs/Communicate your ideas through storytelling and design (Google)'
    # ... (more certificates and their corresponding folders)
}

# Populate the cert_name_to_example_paths dictionary by listing all files in each folder
cert_name_to_example_paths = {}
for cert_name, folder_path in certificate_folders.items():
    if os.path.exists(folder_path):
        files_in_folder = [os.path.join(folder_path, f) for f in os.listdir(folder_path) if os.path.isfile(os.path.join(folder_path, f))]
        cert_name_to_example_paths[cert_name] = files_in_folder
    else:
        print(f"Warning: The folder for '{cert_name}' does not exist at path:\n\t{folder_path}")

# Debug: Print to verify
print("\n--- Certificate Names and their Example Paths ---")
for cert_name, example_paths in cert_name_to_example_paths.items():
    print(f"\nCertificate Name: {cert_name}")
    for path in example_paths:
        print(f"\t- {path}")

# Check if all certificate folders have at least one file
print("\n--- Certificate Folders with Missing Example Files ---")
for cert_name, example_paths in cert_name_to_example_paths.items():
    if not example_paths:
        print(f"Warning: No example files found for '{cert_name}'")


In [None]:
# Download example_certs to have a record

# Create a zip file from the folder
shutil.make_archive('/content/example_certs', 'zip', '/content/example_certs')

# Download the zip file
files.download('/content/example_certs.zip')


In [None]:
# Extract text from example certificates to be used for comparrison

# Initialize an empty dictionary to hold the extracted text for each example certificate
example_cert_texts = {}

# Loop through each certificate type and its corresponding example paths
for cert_name, example_paths in cert_name_to_example_paths.items():
    example_cert_texts[cert_name] = []
    print(f"Processing certificates of type: {cert_name}")
    print("=" * 50)  # Print a separator line for readability

    for example_path in example_paths:
        try:
            if os.path.exists(example_path):
                extracted_text = extract_text_from_file(example_path)
                example_cert_texts[cert_name].append(extracted_text)
                print(f"Successfully processed: {example_path}")
            else:
                print(f"\nWarning: The example certificate file for {cert_name} is missing.\nPath: {example_path}\n")
                print("-" * 50)  # Print a separator line for readability

        except pytesseract.TesseractError as e:
            print(f"\nError: An issue occurred while processing the file at {example_path}.\nError Details: {e}\n")
            print("-" * 50)  # Print a separator line for readability

    print("=" * 50)  # Print a separator line for readability

# The dictionary example_cert_texts will now hold the extracted text for each certificate that exists
print("\nFinal extracted text dictionary:")

for cert_type, extracted_texts in example_cert_texts.items():
    print(f"\nCertificate Type: {cert_type}")
    print("-" * 50)  # Separator

    for i, text in enumerate(extracted_texts):
        print("\n\tExample {}:\n\t-------------------".format(i + 1))
        print("\t{}".format(text.replace('\n', '\n\t')))  # Replacing new lines within the text with new lines followed by tabs for better readability

    print("=" * 50)  # Separator




In [None]:
# Define key phrases to calculate the 'Text Similarity Score'

# Dictionary to hold key phrases for each certificate type
key_phrases_by_cert = {
    'Business Communication (Google)': [
        'Business Communication', 'Goodwill Community Foundation',
        'Congratulations! You answered enough questions correctly',
        'Correct Answers: 5/5', 'Total Points: 5/5','Google', 'Passed', 'Assessment Passed', '100%'
    ],
    'Boost Your Career with SAP Skills My Learning screenshot': [
        'Date completed', 'Learning Journey', 'Completed', 'sap', 'SAP Learning Journey', 
        'Boost Your Career with SAP Skills', 'Available', "Type Name Status Date completed", "Hi, Aliyu - welcome to My Learning",
        "What's next"
    ],
    'Learn how to learn (Google)': [
        'Communicate Effectively at Work', 'HAS COMPLETED', 'THIS CERTIFIES THAT', 'AWARDED ON',
        'Applied Digital Skills', 'date:', 'lesson:'
    ],
    'Communicate your ideas through storytelling and design (Google)': [
        'Communicate your ideas', 'Storytelling', 'Design', 'Google','OpenClassrooms',
        'Congratulations!','Assessment Passed', '4/4 Correct Answers', 'Total Points: 4/4'
    ],
    'Get started with Microsoft Teams': [
        'Microsoft Teams', 'Facilitate meetings', 'and chats', 'through conversations in channels and chats',
        'settings as a team owner in Microsoft Teams','Learn how to create teams and channels',
        'Personalize your environment by managing your', 'Number of modules completed', 
        'collaborating with communicate and collaborate more effectively', 'using Outlook with Teams',
        'Module title Description Completed Duration', 'Learn how to use Microsoft Teams to schedule'
    ],
    'Tech for Good: The Role of ICT in Achieving the SDGs': [
        'Tech for Good', 'ICT', 'SDGs', 'Certificate', 'The Role of ICT in Achieving the SDGs',
        'Course Progress Dates Discussion Overview About the Partners', 
        'This represents how much of the course content you have completed',
        'You are in an audit track and do not qualify for a certificate', 'Earn a certificate', 'Upgrade now'
    ]
}

In [None]:
# Set CSV path

# Define the file path
file_path = '/Users/gilbert/Downloads/view_2.csv'

In [None]:
# Read the csv as a dataframe

# Read the renamed CSV file into a DataFrame
df_learners = pd.read_csv(file_path)

# Display the first few rows to get an overview of the data
df_learners.head()

In [None]:
df_learners.info()

In [None]:
# Get column names
df_learners.columns

In [None]:
# Select relevant columns, including all certificates and screenshots, into a new data frame
df_learners_selected = df_learners[['Learner ID',
                                    'Email address',
                                    'First Name',
                                    'Last name',
                                    'Block 1 progress %',
                                    'Boost Your Career with SAP Skills My Learning screenshot',
                                    'Learn how to learn (Google)',
                                    'Business Communication (Google)',
                                    'Communicate your ideas through storytelling and design (Google)',
                                    'Get started with Microsoft Teams',
                                    'Tech for Good: The Role of ICT in Achieving the SDGs'
                                    ]]

In [None]:
# Check the data types of each column
df_learners_selected.info()

In [None]:
# How many learners have started / made progress?

df_learners_selected['Block 1 progress %'].value_counts()

In [None]:
# Filter the DataFrame to include only learners who have started
df_learners_selected = df_learners_selected[df_learners_selected['Block 1 progress %'] >= 0.1]

In [None]:
# Check the DataFrame has been filtered
df_learners_selected.info()

In [None]:
# Check for missing values - incomplete courses
df_learners_selected.isna().sum()

In [None]:
df_learners_selected.columns

In [None]:
# Extract file names of the certificate images uploaded by learners

columns_to_process = certificate_names

for col in columns_to_process:
    for idx, data_str in df_learners_selected[col].items():
        if isinstance(data_str, str):  # Skip if not a string
            try:
                data = literal_eval(data_str)
                filenames = []
                for entry in data:
                    if 'filename' in entry:
                        filenames.append(entry['filename'])
                if filenames:
                    df_learners_selected.at[idx, col] = filenames
            except (ValueError, SyntaxError):  # Catching errors related to literal_eval
                print(f"Could not decode: {data_str} in column {col}")


In [None]:
# Confirm file names have been extracted in lists
df_learners_selected.head()

In [None]:
df_learners_selected.info()

In [None]:
# Find a specific file

# Assuming df_learners_selected is your DataFrame and 'column_name' is the name of the column you want to search in
matched_rows = df_learners_selected.loc[df_learners_selected['Boost Your Career with SAP Skills My Learning screenshot'] == 'Screenshot_20230913-233344_Chrome.jpg']

# If you want to search in all columns, you can do:
matched_rows = df_learners_selected[df_learners_selected.apply(lambda row: row.astype(str).str.contains('Screenshot_20230913-233344_Chrome.jpg').any(), axis=1)]

# Now, matched_rows will contain all the rows where 'Screenshot_20230913-233344_Chrome.jpg' appears
print(matched_rows)


In [None]:
# Helper Functions

def split_file_url_complex(entry):
    '''
    This function, split_file_url_complex, takes an entry from the "Business Communication (Google)" column
    and separates it into multiple filenames and URLs. It uses regular expressions to find URLs and isolates
    the filenames accordingly.

    Parameters:
        - entry (str): The original entry containing both filenames and URLs.

    Returns:
        - pd.Series: A pandas Series containing two lists, one for filenames and one for URLs.

    Example:
        Input: 'Screenshot.png (https://example.com/1), Image.jpg (https://example.com/2)'
        Output: pd.Series([['Screenshot.png', 'Image.jpg'], ['https://example.com/1', 'https://example.com/2']])
    '''

    filenames = []
    urls = []

    if not isinstance(entry, str):
        return pd.Series([filenames, urls])

    # Use regex to find all URLs in the entry
    found_urls = re.findall(r'https?://[^\s,)]+', entry)
    urls.extend(found_urls)

    # Remove the found URLs from the original entry to isolate filenames
    for url in found_urls:
        entry = entry.replace(url, '')

    # Split the remaining entry by comma to get filenames
    filenames = entry.split(',')

    # Strip extra spaces and parentheses from filenames
    filenames = [name.replace('(', '').replace(')', '').strip() for name in filenames]

    return pd.Series([filenames, urls])

def download_file(url, file_path):
    r = requests.get(url)
    with open(file_path, 'wb') as f:
        f.write(r.content)

def read_pdf_form_fields(file_path):
    text_content = ''
    pdf_reader = PdfReader(open(file_path, "rb"))
    for i in range(len(pdf_reader.pages)):
        page = pdf_reader.pages[i]

        # Try to read form fields
        if '/Annots' in page:
            annotations = page['/Annots']
            for annotation in annotations:
                annotation_object = annotation.get_object()

                # Look for form fields
                if '/FT' in annotation_object and '/T' in annotation_object:
                    field_name = annotation_object['/T']
                    field_type = annotation_object['/FT']
                    field_value = None

                    if field_type == '/Tx':
                        field_value = annotation_object.get('/V', None)

                    text_content += f"\n{field_name}: {field_value}"
    return text_content.strip()

def extract_text_from_file(file_path):
    file_extension = os.path.splitext(file_path)[-1].lower()

    def image_to_text(image_path):
        try:
            # Try using OpenCV first
            print("Attempting to read with OpenCV")
            image = cv2.imread(image_path)
            gray_image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
            pil_image = Image.fromarray(gray_image)
            return pytesseract.image_to_string(pil_image, config='--psm 6').strip()
        except Exception as e1:
            print(f"Failed to process {image_path} with OpenCV: {str(e1)}")
            
            # Try using PIL as a fallback
            try:
                print("Attempting to read with PIL")
                image = Image.open(image_path)
                return pytesseract.image_to_string(image).strip()
            except Exception as e:
                print(f"An error occurred with PIL while processing {image_path}: {str(e)}")
                return ""

    try:
        if file_extension == '.pdf':
            text_content_plumber = ''

            # First, use pdfplumber
            with pdfplumber.open(file_path) as pdf:
                for page in pdf.pages:
                    text_content_plumber += page.extract_text()

            # Combine and keep unique lines (assuming you have a function read_pdf_form_fields)
            text_content_reader = read_pdf_form_fields(file_path)
            combined_text = set(text_content_plumber.splitlines()) | set(text_content_reader.splitlines())
            combined_text_str = "\n".join(combined_text)

            return combined_text_str.strip()

        elif file_extension in ['.png', '.jpg', '.jpeg', '.bmp', '.tiff']:
            try:
                return image_to_text(file_path)
            except Exception as e:
                print(f"An error occurred while processing {file_path}: {str(e)}")
                return convert_image_and_process(file_path, image_to_text)

        else:
            return ""  # Unsupported file format

    except Exception as e:
        print(f"An error occurred while processing {file_path}: {str(e)}")
        return ""

# Test extract_text_from_file(file_path)
text = extract_text_from_file("/Users/gilbert/Downloads/content/attachments/Boost Your Career with SAP Skills My Learning screenshot/Screenshot_20230913-233344_Chrome.jpg")
print(text)

def convert_image_and_process(file_path, processing_function):
    try:
        file_extension = os.path.splitext(file_path)[-1].lower()

        # If the file is already a PNG, convert to JPG, otherwise convert to PNG
        new_extension = '.jpg' if file_extension == '.png' else '.png'

        print(f"Attempting to convert {file_path} to {new_extension} and process again.")

        converted_file_path = file_path.replace(file_extension, new_extension)
        image = Image.open(file_path)
        image.save(converted_file_path)

        return processing_function(converted_file_path)
    except Exception as e:
        print(f"Failed to process {file_path} even after converting to {new_extension}: {str(e)}")
        return ""

def convert_file_to_images(file_path):
    file_extension = os.path.splitext(file_path)[-1].lower()

    if file_extension not in SUPPORTED_EXTENSIONS:
        raise Exception(f"Unsupported file format: {file_extension}. Manual handling required.")
    
    images = []
    
    if file_extension == '.pdf':
        images = convert_from_path(file_path)
    else:
        try:
            # Try reading with OpenCV first
            print("Attempting to read with OpenCV")
            img = cv2.imread(file_path)
            img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            pil_img = Image.fromarray(img_rgb)
            images.append(pil_img)
        except Exception as cv_error:
            print(f"OpenCV failed to process {file_path}: {cv_error}")
            
            try:
                # Fallback to PIL
                images.append(Image.open(file_path))
                print("Attempting to read with PIL")
            except Exception as pil_error:
                print(f"An error occurred with PIL while processing {file_path}: {pil_error}")
                
    return images

def compare_images(img1, img2):
    img1_gray = cv2.cvtColor(np.array(img1), cv2.COLOR_BGR2GRAY)
    img2_gray = cv2.cvtColor(np.array(img2), cv2.COLOR_BGR2GRAY)
# Resize the image if they are not of the same shape
    if img1_gray.shape != img2_gray.shape:
        img2_gray = cv2.resize(img2_gray, (img1_gray.shape[1], img1_gray.shape[0]))

    return ssim(img1_gray, img2_gray)

def resize_image(image, scale_percent=50):
    width = int(image.shape[1] * scale_percent / 100)
    height = int(image.shape[0] * scale_percent / 100)
    dim = (width, height)
    return cv2.resize(image, dim, interpolation=cv2.INTER_AREA)

def evaluate_file_similarity(example_file_path, learner_file_path):
    # Extract text (Optional, since we are focusing on visual analysis)
    example_file_text = extract_text_from_file(example_file_path)
    learner_file_text = extract_text_from_file(learner_file_path)

    # Convert files to images
    example_file_images = convert_file_to_images(example_file_path)
    learner_file_images = convert_file_to_images(learner_file_path)

    # Assume the first page/image is the most relevant for comparison
    example_image_np = np.array(example_file_images[0])
    learner_image_np = np.array(learner_file_images[0])

    # Resize the images
    example_image_resized = resize_image(example_image_np)
    learner_image_resized = resize_image(learner_image_np)

    # Compare the resized images
    similarity_index_resized = compare_images(example_image_resized, learner_image_resized)

    return similarity_index_resized

def evaluate_batch_similarity(example_file_path, learner_file_paths):
    # Store the results here
    batch_results = {}

    # Convert the example certificate to images
    example_file_images = convert_file_to_images(example_file_path)
    example_image_np = np.array(example_file_images[0])
    example_image_resized = resize_image(example_image_np)

    for learner_file_path in learner_file_paths:
        # Convert learner files to images
        learner_file_images = convert_file_to_images(learner_file_path)
        learner_image_np = np.array(learner_file_images[0])
        learner_image_resized = resize_image(learner_image_np)

        # Compare the resized images
        similarity_index_resized = compare_images(example_image_resized, learner_image_resized)

        # Store the result
        batch_results[learner_file_path] = similarity_index_resized

    return batch_results

def calculate_text_similarity(reference_text, comparison_text):
    if not comparison_text:
        return 0
    return fuzz.ratio(reference_text, comparison_text)

def calculate_text_similarity_key_phrases(reference_key_phrases, comparison_text):
    if not comparison_text:
        return 0

    score = 0
    for phrase in reference_key_phrases:
        if phrase.lower() in comparison_text.lower():  # Case insensitive check
            score += 1

    # Normalize by the total number of key phrases to get a similarity score between 0 and 100
    normalized_score = (score / len(reference_key_phrases)) * 100

    return normalized_score

def calculate_learner_id_similarity(comparison_text, learner_id):
    # Tokenize both strings into words
    comparison_words = set(comparison_text.lower().split())
    learner_id_words = set(learner_id.lower().split())

    # Calculate intersection and union of both sets
    intersection = len(comparison_words & learner_id_words)
    union = len(comparison_words | learner_id_words)

    # If union is zero, return 0% similarity
    if union == 0:
        return 0
    
    # Calculate Jaccard similarity and multiply by 100 for percentage
    similarity = (intersection / union) * 100

    return round(similarity, 2)  # Round to two decimal places

In [None]:
# This function takes a string cert_data_str which can be either a list-like object or a JSON-like string. It processes it to return a list.

def process_cert_data(cert_data_str):
    if isinstance(cert_data_str, list):
        if not cert_data_str:  # Skip if the list is empty
            return []
        return cert_data_str  # It's already a list
    else:
        if pd.isna(cert_data_str) or not cert_data_str:  # Check if it's NaN or an empty string
            return []

        try:
            # Parsing the string into a list of dictionaries
            return json.loads(cert_data_str.replace("'", '"'))
        except json.JSONDecodeError:
            return []


In [None]:
# This function looks into cache_df DataFrame to find if a cached similarity exists for a given cache_key. If it does, it returns those values; otherwise, it calculates new ones and updates the cache.

def fetch_cached_similarity(cache_df, cache_key, example_cert_path, file_path, learner_id):
    cached_row = cache_df[cache_df['Cache Key'] == cache_key]
    
    if cached_row.empty:
        # Assuming functions evaluate_file_similarity, calculate_text_similarity,
        # and calculate_learner_id_similarity are defined elsewhere
        visual_similarity = evaluate_file_similarity(example_cert_path, file_path)
        text_similarity = calculate_text_similarity(example_cert_path, file_path)
        learner_id_similarity = calculate_learner_id_similarity(learner_id, file_path)

        new_cache_row = pd.DataFrame({
            'Cache Key': [cache_key],
            'Visual Similarity': [visual_similarity],
            'Text Similarity': [text_similarity],
            'Learner ID Similarity': [learner_id_similarity]
        })

        # Check for NA entries before concatenating
        if new_cache_row.dropna().empty:
            print("new_cache_row contains only NA entries; skipping concatenation.")
        else:
            cache_df = pd.concat([cache_df, new_cache_row], ignore_index=True)

        return visual_similarity, text_similarity, learner_id_similarity, cache_df
    else:
        visual_similarity = cached_row['Visual Similarity'].iloc[0]
        text_similarity = cached_row['Text Similarity'].iloc[0]
        learner_id_similarity = cached_row['Learner ID Similarity'].iloc[0]

        return visual_similarity, text_similarity, learner_id_similarity, cache_df



In [None]:
# Clean file paths
def sanitize_path(file_path):
    return os.path.normpath(file_path.replace("\\", "/"))

# Check file paths exist
def check_file_exists(file_path):
    return os.path.exists(file_path)

# Log Errors with extra info
def log_error(learner_id, certificate_column, file_name, error_msg, extra_info=None):
    new_row = pd.DataFrame({
        'Learner ID': [learner_id],
        'Certificate Type': [certificate_column],
        'File Name': [file_name],
        'Status': ['Failure'],
        'Highest Visual Similarity': [None],
        'Text Similarity Score': [None],
        'Learner ID Similarity Score': [None],
        'Errors': [error_msg],
        'Extra Info': [extra_info]  # Add this line
    })
    return new_row



In [None]:
# Function to save log DataFrame to CSV
def save_log_to_csv(df, path):
    df.to_csv(path, index=False)

# Function to load log DataFrame from CSV
def load_log_from_csv(path):
    if os.path.exists(path):
        return pd.read_csv(path)
    else:
        return pd.DataFrame(columns=['Learner ID', 'Certificate Type', 'File Name', 'Status', 'Highest Visual Similarity', 'Text Similarity Score', 'Learner ID Similarity Score', 'Errors', 'Extra Info'])

In [None]:
# Main Loop to evaluate the learners uploaded images against the example certificates
if __name__ == "__main__":

    # Load existing log_df from CSV if it exists
    log_csv_path = '/Users/gilbert/Downloads/log_df.csv'
    log_df = load_log_from_csv(log_csv_path)

    processed_entries = set(log_df.apply(lambda row: (row['Learner ID'], row['Certificate Type'], row['File Name']), axis=1))
    
    row_limit = 10000  # Define the row limit for testing

    row_counter = 0  # Initialize a counter

    columns_to_process = certificate_names  # Columns to process

    for idx, row in df_learners_selected.iterrows():

        if row_counter >= row_limit:
            print(f"Reached row limit of {row_limit}. Stopping processing.")
            break

        learner_id = row['Learner ID']

        log_rows = []  # Initialize an empty list to collect new log rows for this learner

        for certificate_column in columns_to_process:

            cert_data_list = row[certificate_column]

            if not isinstance(cert_data_list, list):
                continue

            for file_name in cert_data_list:

                entry = (learner_id, certificate_column, file_name)
                
                if entry in processed_entries:
                    print(f"Skipping already processed entry: {entry}")
                    continue
                
                file_path = os.path.join(attachments_dir, certificate_column, file_name)
                file_path = sanitize_path(file_path)  # Sanitize the path

                if not check_file_exists(file_path):  # Check if file exists
                    new_row = log_error(learner_id, certificate_column, file_name, f"File not found: {file_path}").iloc[0].to_dict()
                    log_rows.append(new_row)
                    continue  # Skip this file and move to the next

                example_cert_paths = cert_name_to_example_paths.get(certificate_column, [])

                for example_cert_path in example_cert_paths:

                    try:
                        cache_key = f"{file_path}-{example_cert_path}"
                        visual_similarity, text_similarity, learner_id_similarity, cache_df = fetch_cached_similarity(
                            cache_df, cache_key, example_cert_path, file_path, learner_id)

                        new_row = {
                            'Learner ID': learner_id,
                            'Certificate Type': certificate_column,
                            'File Name': file_name,
                            'Status': 'Success',
                            'Highest Visual Similarity': visual_similarity,
                            'Text Similarity Score': text_similarity,
                            'Learner ID Similarity Score': learner_id_similarity,
                            'Errors': None  # No errors, so None
                        }
                        log_rows.append(new_row)

                    except FileNotFoundError:
                        error_msg = f"File not found: {file_path} or {example_cert_path}"
                        extra_info = f"Current working directory: {os.getcwd()}, File Size: {os.path.getsize(file_path) if os.path.exists(file_path) else 'N/A'}"
                        new_row = log_error(learner_id, certificate_column, file_name, error_msg, extra_info).iloc[0].to_dict()
                        log_rows.append(new_row)

                    except Exception as e:
                        error_msg = f"An exception occurred: {e}"
                        extra_info = f"Exception type: {type(e).__name__}, Arguments: {e.args}"
                        new_row = log_error(learner_id, certificate_column, file_name, error_msg, extra_info).iloc[0].to_dict()
                        log_rows.append(new_row)

                # Increment the counter at the end of processing each row
                row_counter += 1

        # Save log_df to CSV after each learner is processed
        log_df = pd.concat([log_df, pd.DataFrame(log_rows)], ignore_index=True)
        save_log_to_csv(log_df, log_csv_path)

        # Add the processed learner to the set
        processed_entries.add(entry)


In [None]:
log_df.info()

In [None]:
log_df['Learner ID Similarity Score'].value_counts()

In [None]:
log_df[log_df['Learner ID Similarity Score']>0]

In [None]:
log_df

In [None]:
# Save log_df as csv
log_df.to_csv('/Users/gilbert/Downloads/log_df_backup2.csv', index=False)

In [None]:
# Upload log_df if you want to use a saved copy. Usually you would generate it in the Main Function.
# log_df = pd.read_csv('/Users/gilbert/Downloads/log_df.csv')

In [None]:
log_df.info()

In [None]:
log_df.head()

In [None]:
# Set the display option to avoid truncation of text in the Errors column
pd.set_option('display.max_colwidth', None)

# Now print the 'Errors' column
log_df[log_df['Errors'].notna()]

In [None]:
# Copy the error files for inspection - FILES COPIED SEEM FINE? I think there an error in Errors logging

# Create error directory if it doesn't exist
error_dir = os.path.join(attachments_dir, 'errors')
if not os.path.exists(error_dir):
    os.makedirs(error_dir)

# Copy files that produced errors
for idx, row in log_df[log_df['Errors'].notna()].iterrows():
    src_path = os.path.join(attachments_dir, row['Certificate Type'], row['File Name'])
    dest_file_name = f"{row['Learner ID']} - {row['Certificate Type']} - {row['File Name']}"
    dest_path = os.path.join(error_dir, dest_file_name)

    if os.path.exists(src_path):
        shutil.copy(src_path, dest_path)

In [None]:
# Convert 'Text Similarity Score' to numeric, setting errors='coerce' will turn unconvertible values to NaN
log_df['Text Similarity Score'] = pd.to_numeric(log_df['Text Similarity Score'], errors='coerce')

# Convert 'Learner ID Similarity Score' to numeric, setting errors='coerce' will turn unconvertible values to NaN
log_df['Learner ID Similarity Score'] = pd.to_numeric(log_df['Learner ID Similarity Score'], errors='coerce')

In [None]:
log_df.columns

In [None]:
log_df['Status'].value_counts()

In [None]:
# Check scores

# Columns to analyze
columns_to_analyze = ['Highest Visual Similarity', 'Text Similarity Score', 'Learner ID Similarity Score']

for col in columns_to_analyze:
    filled_df = log_df.fillna({col: 0})
    global_min = filled_df[col].min()
    global_max = filled_df[col].max()

    # Create the grid of subplots
    g = sns.FacetGrid(log_df, col='Certificate Type', col_wrap=3, height=4, aspect=1.2)
    g = (g.map(plt.hist, col, bins=20, edgecolor='black', range=[global_min, global_max])
         .set_axis_labels(col, 'Frequency')
         .set_titles("Certificate Type: {col_name}"))

    # Adjust layout for better spacing
    g.fig.subplots_adjust(top=0.9, bottom=0.1, wspace=0.2, hspace=0.4)
    g.fig.suptitle(f'{col} by Certificate Type', fontsize=16)

    plt.show()


In [None]:
log_df.info()

In [None]:
log_df[['Highest Visual Similarity', 'Text Similarity Score', 'Learner ID Similarity Score']].describe()


In [None]:
log_df[log_df['Status'] != 'Success']

In [None]:
log_df[log_df['Status'] != 'Success']['Errors'].value_counts()

In [None]:
log_df.info()

In [None]:
# Pivot data

# Group by 'Learner ID' and 'Certificate Type' and take the max value for each group
grouped_df = log_df[['Learner ID', 'Certificate Type', 'Highest Visual Similarity', 'Text Similarity Score', 'Learner ID Similarity Score']].groupby(['Learner ID', 'Certificate Type']).max().reset_index()

# Reshape the DataFrame
pivot_df = pd.pivot_table(grouped_df, index='Learner ID', columns='Certificate Type',
                          values=['Highest Visual Similarity', 'Text Similarity Score', 'Learner ID Similarity Score'],
                          aggfunc='first')

# Flatten the multi-level column names and add custom label
pivot_df.columns = [f"{col[1]}_{col[0]}" for col in pivot_df.columns.values]

# Reset index
pivot_df.reset_index(inplace=True)

# Sort columns
sorted_cols = ['Learner ID'] + sorted([col for col in pivot_df.columns if col != 'Learner ID'])
pivot_df = pivot_df[sorted_cols]

pivot_df

In [None]:
# Save to a CSV
pivot_df.to_csv("pivot_backup2.csv", index=False)

In [None]:
pivot_df.head()

In [None]:
pivot_df.describe()

In [None]:
# Calculate thresholds for accepting / rejecting certificates

# Initialize an empty DataFrame
calculated_thresholds_df = pd.DataFrame(columns=['Certificate Type', 'Visual_Threshold_Percentile', 'Visual_Threshold_Zscore', 'Text_Threshold_Percentile', 'Text_Threshold_Zscore', 'Learner_ID_Threshold_Percentile', 'Learner_ID_Threshold_Zscore'])

# Loop through each certificate type to calculate thresholds
for cert_type in certificate_names:
    visual_data = pivot_df[f"{cert_type}_Highest Visual Similarity"].dropna()
    text_data = pivot_df[f"{cert_type}_Text Similarity Score"].dropna()
    learner_id_data = pivot_df[f"{cert_type}_Learner ID Similarity Score"].dropna()

    # Calculate Percentile
    visual_threshold_percentile = visual_data.quantile(percentile_factor)
    text_threshold_percentile = text_data.quantile(percentile_factor)
    learner_id_threshold_percentile = learner_id_data.quantile(percentile_factor)

    # Calculate Z-score thresholds
    visual_threshold_zscore = visual_data.mean() + z_score_factor * visual_data.std()
    text_threshold_zscore = text_data.mean() + z_score_factor * text_data.std()
    learner_id_threshold_zscore = learner_id_data.mean() + z_score_factor * learner_id_data.std()

    # Create a DataFrame for the current certificate type
    temp_df = pd.DataFrame({
        'Certificate Type': [cert_type],
        'Visual_Threshold_Percentile': [visual_threshold_percentile],
        'Visual_Threshold_Zscore': [visual_threshold_zscore],
        'Text_Threshold_Percentile': [text_threshold_percentile],
        'Text_Threshold_Zscore': [text_threshold_zscore],
        'Learner_ID_Threshold_Percentile': [learner_id_threshold_percentile],
        'Learner_ID_Threshold_Zscore': [learner_id_threshold_zscore]
    })

    # Concatenate to the existing DataFrame
    calculated_thresholds_df = pd.concat([calculated_thresholds_df, temp_df], ignore_index=True)

# Show the DataFrame with calculated thresholds
calculated_thresholds_df




In [None]:
# Initialize an empty dictionary to store dynamic thresholds
thresholds = {}

# Loop through the DataFrame to populate the thresholds dictionary
for index, row in calculated_thresholds_df.iterrows():
    cert_type = row['Certificate Type']

    thresholds[cert_type] = {
        'visual': row['Visual_Threshold_Percentile'],  # Replace with row['Visual_Threshold_Zscore'] if you want to use Z-score
        'text': row['Text_Threshold_Percentile'],  # Replace with row['Text_Threshold_Zscore'] if you want to use Z-score
        'learner_id': row['Learner_ID_Threshold_Percentile']  # Replace with row['Learner_ID_Threshold_Zscore'] if you want to use Z-score
    }

# Now the thresholds dictionary is dynamically populated based on calculated_thresholds_df
thresholds


In [None]:
# Manually input thresholds
thresholds = {
'Boost Your Career with SAP Skills My Learning screenshot': {'visual': 0.50,'text': 50.0,'learner_id': 0.0},
'Learn how to learn (Google)': {'visual': 0.50,'text': 50.0,'learner_id': 0.0},
'Business Communication (Google)': {'visual': 0.50,'text': 50.0,'learner_id': 0.0},
'Communicate your ideas through storytelling and design (Google)': {'visual': 0.50,'text': 50.0, 'learner_id': 0.0},
'Get started with Microsoft Teams': {'visual': 0.50,'text': 50.0,'learner_id': 0.0},
'Tech for Good: The Role of ICT in Achieving the SDGs': {'visual': 0.45,'text': 50.0,'learner_id': 0.0}
}

In [None]:
# Evaluate images based on thresholds

# Create new columns based on certificate-specific thresholds and reasons for rejection
for cert_type in thresholds.keys():
    visual_threshold = thresholds[cert_type]['visual']
    text_threshold = thresholds[cert_type]['text']
    learner_id_threshold = thresholds[cert_type]['learner_id']

    col_suffix = f"{cert_type}_AcceptOrReject"
    reason_suffix = f"{cert_type}_RejectionReasons"

    # Create a series with rejection reasons
    def find_rejection_reasons(row):
        reasons = []
        if row.get(f"{cert_type}_Highest Visual Similarity", 0) < visual_threshold:
            reasons.append('Visual')
        if row.get(f"{cert_type}_Text Similarity Score", 0) < text_threshold:
            reasons.append('Text')
        if row.get(f"{cert_type}_Learner ID Similarity Score", 0) < learner_id_threshold:
            reasons.append('ID')
        return reasons

    # Create the AcceptOrReject and RejectionReasons columns
    pivot_df.loc[:, col_suffix] = (
        (pivot_df.get(f"{cert_type}_Highest Visual Similarity", 0) >= visual_threshold) &
        (pivot_df.get(f"{cert_type}_Text Similarity Score", 0) >= text_threshold) &
        (pivot_df.get(f"{cert_type}_Learner ID Similarity Score", 0) >= learner_id_threshold)
    ).astype(int)

    pivot_df[reason_suffix] = pivot_df.apply(find_rejection_reasons, axis=1)

# Initialize an empty list to store the new order of columns
new_column_order = ['Learner ID']  # Start with 'Learner ID'

# Loop through each certificate type and arrange the columns
for cert_type in thresholds.keys():
    new_column_order.extend([
        f"{cert_type}_Highest Visual Similarity",
        f"{cert_type}_Text Similarity Score",
        f"{cert_type}_Learner ID Similarity Score",
        f"{cert_type}_AcceptOrReject",
        f"{cert_type}_RejectionReasons"  # Include the new RejectionReasons column
    ])

# Reorder the DataFrame columns based on the new list
pivot_df = pivot_df[new_column_order]

# Show the DataFrame with reordered columns
pivot_df


In [None]:
# Extract all columns that contain "AcceptOrReject"
accept_or_reject_cols = [col for col in pivot_df.columns if "AcceptOrReject" in col]

# Compute the minimum value along the rows for the AcceptOrReject columns
minimums = pivot_df[accept_or_reject_cols].min(axis=1)

# Create the 'Learner Status' column
# If minimum is 0, set 'Learner Status' to 0, else set to 1
pivot_df['Learner Status'] = (minimums != 0).astype(int)

# View the updated DataFrame
pivot_df[['Learner ID','Learner Status'] + accept_or_reject_cols]


In [None]:
# Add an explanation by identifying which certificates were rejected

# Extract all columns that contain "AcceptOrReject"
accept_or_reject_cols = [col for col in pivot_df.columns if "AcceptOrReject" in col]

# Compute the minimum value along the rows for the AcceptOrReject columns
minimums = pivot_df[accept_or_reject_cols].min(axis=1)

# Create the 'Learner Status' column
# If minimum is 0, set 'Learner Status' to 0, else set to 1
pivot_df['Learner Status'] = (minimums != 0).astype(int)

# Create a new column to store the names of the AcceptOrReject columns that have 0 values
def find_zero_cols(row):
    zero_cols = [col.replace("_AcceptOrReject", "") for col in accept_or_reject_cols if row[col] == 0]
    return zero_cols  # Returning a list instead of a string

pivot_df['Rejected Certificates'] = pivot_df.apply(find_zero_cols, axis=1)

# View the updated DataFrame
pivot_df[['Learner ID', 'Learner Status', 'Rejected Certificates'] + accept_or_reject_cols]

In [None]:
pivot_df[pivot_df['Learner Status']==0]

In [None]:
pivot_df[pivot_df['Learner Status']==0].count()

In [None]:
pivot_df.head()

In [None]:
# Merge pivot_df and df_learners_selected to compare scores with screenshots

# Merge pivot_df and df_learners_selected based on 'Learner ID', keeping only the rows in pivot_df
df_learners_evaluated = pd.merge(pivot_df, df_learners_selected, on='Learner ID', how='left')

# Show the resulting DataFrame
df_learners_evaluated


In [None]:
df_learners_evaluated.columns

In [None]:
df_learners_evaluated.to_csv('/Users/gilbert/Downloads/learners_evaluated_2.csv', index=False)

In [None]:
# Inspect failed learners with histograms

# Filter the DataFrame where 'Learner Status' equals 0
filtered_df = df_learners_evaluated[
    (df_learners_evaluated['Learner Status'] == 0) &
    (df_learners_evaluated['Rejected Certificates'] == 'Tech for Good: The Role of ICT in Achieving the SDGs')
]

# Columns to create histograms for
columns_to_plot = [
    'Tech for Good: The Role of ICT in Achieving the SDGs_Highest Visual Similarity',
    'Tech for Good: The Role of ICT in Achieving the SDGs_Text Similarity Score',
    'Tech for Good: The Role of ICT in Achieving the SDGs_Learner ID Similarity Score'
]

# Create histograms
for col in columns_to_plot:
    sns.histplot(filtered_df[col], kde=False, bins=30)  # You can customize bins and other style options here
    plt.title(f"Histogram for {col}")
    plt.xlabel(col)
    plt.ylabel('Frequency')
    plt.show()

In [None]:
print(df_learners_evaluated[df_learners_evaluated['Learner Status'] == 0]['Rejected Certificates'])

In [None]:
df_learners_evaluated[(df_learners_evaluated['Learner Status'] == 0) & df_learners_evaluated['Rejected Certificates'].isna()]


In [None]:
# Retrieve learner screenshots to inspect by cert type

import math

destination_base_folder = '/Users/gilbert/Downloads/check_temp'

if not os.path.exists(destination_base_folder):
    os.makedirs(destination_base_folder)

for index, row in df_learners_evaluated.iterrows():
    if row['Learner Status'] == 0:
        learner_id = row['Learner ID']
        rejected_certs = row['Rejected Certificates']
        
        for cert_col in rejected_certs:
            destination_folder = os.path.join(destination_base_folder, cert_col)
            if not os.path.exists(destination_folder):
                os.makedirs(destination_folder)

            cert_names = row[cert_col]
            if not isinstance(cert_names, list):
                cert_names = [certificate_names]

                print(f"cert_names before loop: {cert_names}")

                for cert_name in cert_names:

                    if cert_name is None or (isinstance(cert_name, float) and math.isnan(cert_name)):
                        print(f"Skipping invalid cert_name: {cert_name}")
                        continue
        
                    source_folder = f"/Users/gilbert/Downloads/content/attachments/{cert_col}"
                    source_file_path = os.path.join(source_folder, str(cert_name))  # Convert to str just in case

                    print(f"Checking existence of {source_file_path}")  # Debug print
                    
                    if os.path.exists(source_file_path):
                        file_extension = os.path.splitext(cert_name)[1]
                        new_file_name = f"{learner_id}{file_extension}"
                        destination_file_path = os.path.join(destination_folder, new_file_name)
    
                        print(f"Copying {source_file_path} to {destination_file_path}")  # Debug print
    
                        shutil.copy(source_file_path, destination_file_path)
                    else:
                        print(f"File {source_file_path} does not exist.")  # Debug print

In [None]:
# Retrieve learner screenshots to inspect - by learner
destination_base_folder = '/Users/gilbert/Downloads/check_temp'

if not os.path.exists(destination_base_folder):
    os.makedirs(destination_base_folder)

for index, row in df_learners_evaluated.iterrows():
    if row['Learner Status'] == 0:
        learner_id = row['Learner ID']
        folder_name = learner_id

        destination_folder = os.path.join(destination_base_folder, folder_name)
        if not os.path.exists(destination_folder):
            os.makedirs(destination_folder)

        rejected_certs = row['Rejected Certificates']
        
        for cert_col in rejected_certs:
            cert_names = row[cert_col]
            if not isinstance(cert_names, list):
                cert_names = [cert_names]

            for cert_name in cert_names:
                source_folder = f"/Users/gilbert/Downloads/content/attachments/{cert_col}"
                source_file_path = os.path.join(source_folder, cert_name)

                print(f"Checking existence of {source_file_path}")  # Debug print
                
                if os.path.exists(source_file_path):
                    file_extension = os.path.splitext(cert_name)[1]
                    new_file_name = f"{cert_col}{file_extension}"
                    destination_file_path = os.path.join(destination_folder, new_file_name)

                    print(f"Copying {source_file_path} to {destination_file_path}")  # Debug print

                    shutil.copy(source_file_path, destination_file_path)
                else:
                    print(f"File {source_file_path} does not exist.")  # Debug print

In [None]:
df_learners_evaluated.columns

In [None]:
# Inspect a learner

# Select a specific learner based on 'Learner ID'
specific_learner_df = df_learners_evaluated[df_learners_evaluated['Learner ID'] == 'Olawale  Raheem  - olawalehafees13@gmail.com']

# Identify columns that end with '_RejectionReasons'
rejection_reason_cols = [col for col in df_learners_evaluated.columns if col.endswith('_RejectionReasons')]

# Create a list with 'Rejected Certificates' and all columns ending with '_RejectionReasons'
columns_to_show = ['Rejected Certificates'] + rejection_reason_cols

# Filter the DataFrame based on these columns
filtered_learner_df = specific_learner_df[columns_to_show]

# Display the result
filtered_learner_df



In [None]:
# Create a zip file from the folder
shutil.make_archive('/content/check_temp', 'zip', '/content/check_temp')

# Trigger the download
files.download('/content/check_temp.zip')


In [None]:
# Upload or specify the path to the new example certificate
new_example_cert_path = '/content/temp_downloads/Ajibade_Business_Communication.PNG'

In [None]:
# Take a batch of XX rows from final_log_df
batch_log_df = final_log_df.head(100).copy()

# Initialize a new column for the visual similarity to the new example
# and another for the file paths
batch_log_df['New_Example_Visual_Similarity'] = None
batch_log_df['File Path'] = None  # New column for file paths

for idx, row in batch_log_df.iterrows():
    learner_id = row['Learner ID']
    file_name = row['File Name']
    learner_file_path = os.path.join("temp_downloads", file_name)  # Assuming files are in temp_downloads

    # Populate the File Path column
    batch_log_df.loc[idx, 'File Path'] = learner_file_path

    try:
        # Evaluate similarity using the new example certificate
        similarity_score = evaluate_file_similarity(new_example_cert_path, learner_file_path)

        # Update the DataFrame with the new similarity score
        batch_log_df.loc[idx, 'New_Example_Visual_Similarity'] = similarity_score

        # Optionally log success
        print(f"Processed Learner ID: {learner_id}, File: {file_name}")
    except Exception as e:
        # Log the exception
        batch_log_df.loc[idx, 'New_Example_Visual_Similarity'] = f"Failed - {str(e)}"
        print(f"Failed for Learner ID: {learner_id}, File: {file_name}. Reason: {str(e)}")

# Convert the file paths into clickable URLs
batch_log_df['File URL'] = 'file://' + batch_log_df['File Path'].astype(str)

# Merge the batch log back into the final log
# final_log_df = pd.merge(final_log_df, batch_log_df, on=['Learner ID', 'File Name'], how='left')

In [None]:
batch_log_df.info()

In [None]:
print(batch_log_df['Status'].value_counts())


In [None]:
batch_log_df['Highest Visual Similarity'].hist()

In [None]:
batch_log_df['New_Example_Visual_Similarity'] = pd.to_numeric(batch_log_df['New_Example_Visual_Similarity'], errors='coerce')

In [None]:
batch_log_df['New_Example_Visual_Similarity'].hist()

In [None]:
# Save to a CSV
batch_log_df.to_csv("batch_log.csv", index=False)

In [None]:
import cv2
import numpy as np

def calculate_histogram(image_path):
    image = cv2.imread(image_path)
    hist = cv2.calcHist([image], [0, 1, 2], None, [8, 8, 8], [0, 256, 0, 256, 0, 256])
    hist = cv2.normalize(hist, hist).flatten()
    return hist

def compare_histograms(hist1, hist2):
    return cv2.compareHist(hist1, hist2, cv2.HISTCMP_CORREL)


In [None]:
import os
from itertools import combinations

# Function to evaluate the similarity between two images
def evaluate_file_similarity(example_file_path, learner_file_path):
    example_hist = calculate_histogram(example_file_path)
    learner_hist = calculate_histogram(learner_file_path)
    similarity = compare_histograms(example_hist, learner_hist)
    return similarity

# Function to find the local winner from a group of images
def find_local_winner(image_group):
    max_similarity = 0
    winner = None

    for img1, img2 in combinations(image_group, 2):
        similarity = evaluate_file_similarity(img1, img2)

        if similarity > max_similarity:
            max_similarity = similarity
            winner = img1  # or img2 depending on your criteria

    return winner

# Function to find the global winner from local winners
def find_global_winner(local_winners):
    return find_local_winner(local_winners)


In [None]:
def estimate_tournament_runtime(folder_path, group_size):
    # Get the number of image files in the folder
    image_files = [f for f in os.listdir(folder_path) if os.path.isfile(os.path.join(folder_path, f))]
    num_images = len(image_files)

    # Calculate the number of groups
    num_groups = (num_images + group_size - 1) // group_size  # Ceiling division

    # Estimate the number of comparisons for local winners
    local_comparisons = (group_size * (group_size - 1)) // 2
    total_local_comparisons = local_comparisons * num_groups

    # Estimate the number of comparisons for the global winner
    global_comparisons = (num_groups * (num_groups - 1)) // 2

    # Total number of comparisons
    total_comparisons = total_local_comparisons + global_comparisons

    # Estimate the size of one image (use the first image as a sample)
    sample_image_path = os.path.join(folder_path, image_files[0])
    sample_image_size = os.path.getsize(sample_image_path)  # in bytes

    # Estimate total size for all images
    total_size = sample_image_size * num_images  # in bytes

    # Estimate runtime (this is a very rough estimate; actual time can vary)
    # Assume 0.001 second per comparison per 1000 bytes (this is a made-up number for demonstration)
    estimated_time = 0.001 * total_comparisons * (total_size / 1000)  # in seconds

    return estimated_time, num_images, total_comparisons, total_size

# Usage
folder_path = "/content/temp_downloads"
group_size = 50
estimated_time, num_images, total_comparisons, total_size = estimate_tournament_runtime(folder_path, group_size)
print(f"Estimated time: {estimated_time} seconds")
print(f"Number of images: {num_images}")
print(f"Total number of comparisons: {total_comparisons}")
print(f"Total size: {total_size} bytes")



In [None]:
# List all image files in the folder
all_image_paths = [os.path.join(folder_path, f) for f in os.listdir(folder_path) if os.path.isfile(os.path.join(folder_path, f))]
group_size = 15  # or any other number depending on your needs

# Step 1: Divide into groups
image_groups = [all_image_paths[i:i + group_size] for i in range(0, len(all_image_paths), group_size)]

# Step 2: Find local winners (can be parallelized)
local_winners = [find_local_winner(group) for group in image_groups]

# Step 3: Find global winner
global_winner = find_global_winner(local_winners)

print(f"The most visually similar image is: {global_winner}")


In [None]:
# Assuming you want to keep the "_x" columns and drop the "_y" columns
cols_to_keep = [col for col in final_log_df.columns if not col.endswith('_y')]

# Remove "_x" suffix from column names
final_log_df = final_log_df[cols_to_keep]
final_log_df.columns = [col.replace('_x', '') for col in cols_to_keep]


In [None]:
# Dropping all columns ending with "_y"
cols_to_drop = [col for col in final_log_df.columns if col.endswith('_y')]
final_log_df.drop(columns=cols_to_drop, inplace=True)

# Removing "_x" suffix from column names
final_log_df.columns = [col.replace('_x', '') for col in final_log_df.columns]


In [None]:
final_log_df.drop(columns='New_Example_Visual_Similarity', inplace=True)
final_log_df

In [None]:
# Replace these paths with the actual file paths
new_example_cert_path = "new_example_cert.png"
learner_cert_path = "/content/temp_downloads/Ajibade_Business_Communication.PNG"

# Manually evaluate file similarity
similarity_score = evaluate_file_similarity(new_example_cert_path, learner_cert_path)

print(f"The visual similarity score between the two files is: {similarity_score}")


In [None]:
# OLD Main Function

if __name__ == "__main__":

    columns_to_process = [
        'Boost Your Career with SAP Skills My Learning screenshot',
        'Learn how to learn (Google)',
        'Business Communication (Google)',
        'Communicate your ideas through storytelling and design (Google)',
        'Get started with Microsoft Teams',
        'Tech for Good: The Role of ICT in Achieving the SDGs'
    ]

    for idx, row in df_learners_selected.iterrows():

        if idx == 30:
            break

        learner_id = row['Learner ID']

        for certificate_column in columns_to_process:

            cert_data_list = row[certificate_column]

            # Ensure it is a list, if not continue to next iteration
            if not isinstance(cert_data_list, list):
                continue

            for file_name in cert_data_list:
                file_path = os.path.join(attachments_dir, file_name)

                example_cert_paths = cert_name_to_example_paths.get(certificate_column, [])

                for example_cert_path in example_cert_paths:

                    cache_key = f"{file_path}-{example_cert_path}"

                    visual_similarity, text_similarity, learner_id_similarity, cache_df = fetch_cached_similarity(
                        cache_df, cache_key, example_cert_path, file_path, learner_id)

                    new_row = pd.DataFrame({
                        'Learner ID': [learner_id],
                        'Certificate Type': [certificate_column],
                        'File Name': [file_name],
                        'Status': ['Success'],
                        'Highest Visual Similarity': [visual_similarity],
                        'Text Similarity Score': [text_similarity],
                        'Learner ID Similarity Score': [learner_id_similarity]
                    })
                    log_df = pd.concat([log_df, new_row], ignore_index=True)

In [None]:
# Main Function or Script OLD

# Initialize an empty DataFrame for logging results and errors
log_df = pd.DataFrame(columns=['Learner ID', 'File Name', 'Status', 'Highest Visual Similarity'])

if __name__ == "__main__":

    # Define a batch size
    batch_size = 50  # Change this value based on your requirements

    # Calculate the number of batches
    num_batches = len(df_learners_selected) // batch_size + (len(df_learners_selected) % batch_size != 0)

    # Initialize log DataFrame
    log_df = pd.DataFrame(columns=['Learner ID', 'File Name', 'Status', 'Highest Visual Similarity','Template Match'])

    for batch_num in range(num_batches):
        start_idx = batch_num * batch_size
        end_idx = (batch_num + 1) * batch_size
        batch_df = df_learners_selected.iloc[start_idx:end_idx]

    for idx, row in df_learners_selected.iterrows():
        learner_id = row['Learner ID']
        file_names = row['File Names_Business Communication']
        urls = row['URLs_Business Communication']
        downloaded_files = []

        # Create the directory if it doesn't exist
        download_folder = 'temp_downloads'
        if not os.path.exists(download_folder):
            os.makedirs(download_folder)

        for file_name, url in zip(file_names, urls):
            temp_file_path = os.path.join(download_folder, file_name)

            if download_files:
                # Download the file only if download_files flag is True
                download_file(url, temp_file_path)

            try:
                if os.path.exists(temp_file_path):
                    downloaded_files.append(temp_file_path)
                   # Evaluate similarity
                    visual_similarity, template_matching_score = evaluate_file_similarity('example_cert.pdf', temp_file_path, template_cert_image) ## define example cert variable

                    # You can now log or further process the results
                    highest_similarity = max(similarity_scores.values())

                    # Log success
                    new_row = pd.DataFrame({
                        'Learner ID': [learner_id],
                        'File Name': [file_name],
                        'Status': ['Success'],
                        'Highest Visual Similarity': [highest_similarity],
                        'Template Match': [template_matching_score]
                    })
                    log_df = pd.concat([log_df, new_row], ignore_index=True)

            except Exception as e:
                # Log the exception
                new_row = pd.DataFrame({
                    'Learner ID': [learner_id],
                    'File Name': [file_name],
                    'Status': [f'Failed - {str(e)}'],
                    'Highest Visual Similarity': [None]
                })
                log_df = pd.concat([log_df, new_row], ignore_index=True)

    # Save log_df to a CSV
    log_df.to_csv("log.csv", index=False)


In [None]:
# OLD Main Function

if __name__ == "__main__":

    # Initialize an empty list to hold errors for logging later
    error_logs = []

    columns_to_process = [
        'Boost Your Career with SAP Skills My Learning screenshot',
        'Learn how to learn (Google)',
        'Business Communication (Google)',
        'Communicate your ideas through storytelling and design (Google)',
        'Get started with Microsoft Teams',
        'Tech for Good: The Role of ICT in Achieving the SDGs'
    ]

for idx, row in df_learners_selected.iterrows():

        if idx == 10:
            break

        learner_id = row['Learner ID']

        for certificate_column in columns_to_process:

            cert_data_list = row[certificate_column]

            # Ensure it is a list, if not continue to next iteration
            if not isinstance(cert_data_list, list):
                continue

            for file_name in cert_data_list:
                # Updated line here to include course name
                file_path = os.path.join(attachments_dir, certificate_column, file_name)

                example_cert_paths = cert_name_to_example_paths.get(certificate_column, [])

                for example_cert_path in example_cert_paths:

                    try:
                        # Your existing code that may throw a FileNotFoundError
                        cache_key = f"{file_path}-{example_cert_path}"
                        visual_similarity, text_similarity, learner_id_similarity, cache_df = fetch_cached_similarity(
                            cache_df, cache_key, example_cert_path, file_path, learner_id)

                        new_row = pd.DataFrame({
                            'Learner ID': [learner_id],
                            'Certificate Type': [certificate_column],
                            'File Name': [file_name],
                            'Status': ['Success'],
                            'Highest Visual Similarity': [visual_similarity],
                            'Text Similarity Score': [text_similarity],
                            'Learner ID Similarity Score': [learner_id_similarity]
                        })
                        log_df = pd.concat([log_df, new_row], ignore_index=True)

                    except FileNotFoundError:
                        error_msg = f"File not found: {file_path} or {example_cert_path}"
                        print(error_msg)
                        error_logs.append(error_msg)
                        continue
                    except Exception as e:  # New block to catch other exceptions
                        error_msg = f"An exception occurred: {e}"
                        print(error_msg)
                        error_logs.append(error_msg)
                        continue

                # Print or save error_logs as needed
                if error_logs:
                    print("The following errors were encountered:")
                    for error in error_logs:
                        print(error)

In [None]:
# Old Main Fuinction with caching

# Initialize an empty DataFrame for logging results and errors
log_df = pd.DataFrame(columns=['Learner ID',
                               'Certificate Type',
                               'File Name',
                               'Status',
                               'Highest Visual Similarity',
                               'Text Similarity Score',
                               'Learner ID Similarity Score'
                               ])

# Initialize a DataFrame for caching
cache_df = pd.DataFrame(columns=['Cache Key', 'Visual Similarity', 'Text Similarity', 'Learner ID Similarity'])

if __name__ == "__main__":

    for idx, row in df_learners_selected.iterrows():

        if idx == 30:  # Stop after processing 30 rows
            break

        learner_id = row['Learner ID']

        for certificate_column in certificate_names:

            cert_data = row[certificate_column]
            if pd.isna(cert_data):  # Skip if this cell is NaN
                continue

            file_name = extract_file_name(cert_data)  # Assuming this function is defined elsewhere
            file_path = os.path.join(attachments_dir, file_name)

            example_cert_paths = cert_name_to_example_paths.get(certificate_column, [])
            if not example_cert_paths:
                continue

            for example_cert_path in example_cert_paths:

                cache_key = f"{file_path}-{example_cert_path}"

                cached_row = cache_df[cache_df['Cache Key'] == cache_key]

                if cached_row.empty:
                    visual_similarity = evaluate_file_similarity(example_cert_path, file_path)
                    text_similarity = calculate_text_similarity(example_cert_path, file_path)
                    learner_id_similarity = calculate_learner_id_similarity(learner_id, file_path)

                    new_cache_row = pd.DataFrame({
                        'Cache Key': [cache_key],
                        'Visual Similarity': [visual_similarity],
                        'Text Similarity': [text_similarity],
                        'Learner ID Similarity': [learner_id_similarity]
                    })
                    cache_df = pd.concat([cache_df, new_cache_row], ignore_index=True)

                else:
                    visual_similarity = cached_row['Visual Similarity'].iloc[0]
                    text_similarity = cached_row['Text Similarity'].iloc[0]
                    learner_id_similarity = cached_row['Learner ID Similarity'].iloc[0]

                try:
                    visual_similarity = evaluate_file_similarity(example_cert_path, file_path)
                    text_similarity = calculate_text_similarity(example_cert_path, file_path)
                    learner_id_similarity = calculate_learner_id_similarity(learner_id, file_path)

                    new_row = pd.DataFrame({
                        'Learner ID': [learner_id],
                        'Certificate Type': [certificate_column],
                        'File Name': [file_name],
                        'Status': ['Success'],
                        'Highest Visual Similarity': [visual_similarity],
                        'Text Similarity Score': [text_similarity],
                        'Learner ID Similarity Score': [learner_id_similarity]
                    })
                    log_df = pd.concat([log_df, new_row], ignore_index=True)

                except Exception as e:
                    new_row = pd.DataFrame({
                        'Learner ID': [learner_id],
                        'Certificate Type': [certificate_column],
                        'File Name': [file_name],
                        'Status': [f'Failed - {str(e)}'],
                        'Highest Visual Similarity': [None],
                        'Text Similarity Score': [0],
                        'Learner ID Similarity Score': [0]
                    })
                    log_df = pd.concat([log_df, new_row], ignore_index=True)

    log_df.to_csv("log.csv", index=False)


In [None]:
# Old Main Function

# Initialize an empty DataFrame for logging results and errors
log_df = pd.DataFrame(columns=['Learner ID',
                               'Certificate Type',
                               'File Name',
                               'Status',
                               'Highest Visual Similarity',
                               'Text Similarity Score',
                               'Learner ID Similarity Score'
                               ])

if __name__ == "__main__":

    for idx, row in df_learners_selected.iterrows():

        # Break the loop after processing 30 rows
        if idx == 30:
            break

        learner_id = row['Learner ID']
        certificate_columns = certificate_names

        for certificate_column in certificate_columns:
            cert_data = row[certificate_column]
            if pd.isna(cert_data):  # Skip if this cell is NaN
                continue

            # Split the cell data into file names and URLs (Assuming function returns these)
            file_name, url = split_file_url_complex(cert_data)

            # Additional code logic for downloading, folder creation etc.
            # ...

            try:
                # Initialize max_similarity values
                max_visual_similarity = 0
                max_text_similarity = 0
                max_learner_id_similarity = 0

                # Create the directory if it doesn't exist
                download_folder = 'temp_downloads'
                if not os.path.exists(download_folder):
                    os.makedirs(download_folder)

                # Define the path where the file will be downloaded
                temp_file_path = os.path.join(download_folder, file_name)

                # Download the file
                download_file(url, temp_file_path)  # Assuming you have a function 'download_file' that takes a URL and a file path

                # Check if the example certificate path exists
                example_cert_path = cert_name_to_example_path.get(certificate_column, None)
                if example_cert_path is None:
                    continue  # Skip this iteration if we don't have an example

                # Log success with max_similarity values
                new_row = pd.DataFrame({
                    'Learner ID': [learner_id],
                    'Certificate Type': [certificate_column],
                    'File Name': [file_name],
                    'Status': ['Success'],
                    'Highest Visual Similarity': [max_visual_similarity],
                    'Text Similarity Score': [max_text_similarity],
                    'Learner ID Similarity Score': [max_learner_id_similarity]
                })
                log_df = pd.concat([log_df, new_row], ignore_index=True)

            except Exception as e:
                # Log the exception
                new_row = pd.DataFrame({
                    'Learner ID': [learner_id],
                    'Certificate Type': [certificate_column],
                    'File Name': [file_name],
                    'Status': [f'Failed - {str(e)}'],
                    'Highest Visual Similarity': [None],
                    'Text Similarity Score': [0],
                    'Learner ID Similarity Score': [0]
                })
                log_df = pd.concat([log_df, new_row], ignore_index=True)

        # Save log_df to a CSV after each learner for safety
        log_df.to_csv("log.csv", index=False)


In [None]:
# Older Main Function or Script

# Initialize an empty DataFrame for logging results and errors
log_df = pd.DataFrame(columns=['Learner ID',
                               'Certificate Type',
                               'File Name',
                               'Status',
                               'Highest Visual Similarity',
                               'Text Similarity Score',
                               'Learner ID Similarity Score'
                               ])

if __name__ == "__main__":

    # Define a batch size
    batch_size = 20  # Changed batch size to 20 for the test run

    # Calculate the number of batches
    num_batches = len(df_learners_selected) // batch_size + (len(df_learners_selected) % batch_size != 0)

    for batch_num in range(num_batches):
        start_idx = batch_num * batch_size
        end_idx = (batch_num + 1) * batch_size
        batch_df = df_learners_selected.iloc[start_idx:end_idx]

        # Added a break to run only the first batch as a test
        if batch_num == 50:
            break

        for certificate_name in certificate_names:
            col_file_names = f'File Names_{certificate_name}'
            col_urls = f'URLs_{certificate_name}'

            df_learners_selected[[col_file_names, col_urls]] = df_learners_selected[certificate_name].apply(split_file_url_complex)

            for idx, row in batch_df.iterrows():
                learner_id = row['Learner ID']
                cert_names = row['Certificate Names']  # This should contain the list of certificates for the learner
                file_names = row[col_file_names]
                urls = row[col_urls]
                downloaded_files = []

                # Create the directory if it doesn't exist
                download_folder = 'temp_downloads'
                if not os.path.exists(download_folder):
                    os.makedirs(download_folder)

                for cert_name, file_name, url in zip(cert_names, file_names, urls):
                    temp_file_path = os.path.join(download_folder, file_name)

                    if download_files:
                        # Download the file only if download_files flag is True
                        download_file(url, temp_file_path)

                    # Get the corresponding example certificate path using the mapping
                    example_cert_path = cert_name_to_example_path.get(cert_name, None)
                    if example_cert_path is None:
                        continue  # Skip this iteration if we don't have an example cert for this certificate name


                    try:
                        if os.path.exists(temp_file_path):
                            downloaded_files.append(temp_file_path)

                            # Loop through all example paths for this certificate type
                            for example_cert_path in cert_name_to_example_paths.get(cert_name, []):

                              # Evaluate visual similarity
                              visual_similarity = evaluate_file_similarity(example_cert_path, temp_file_path) ## define example cert variable

                              # Extract text from the downloaded file
                              extracted_text = extract_text_from_file(temp_file_path)

                              # Compute text similarity to key phrases
                              text_similarity_score = calculate_text_similarity_key_phrases(key_phrases, extracted_text)

                              # Compute Learner ID similarity
                              learner_id_similarity_score = calculate_learner_id_similarity(extracted_text, learner_id)

                            # Log success
                            new_row = pd.DataFrame({
                                'Learner ID': [learner_id],
                                'Certificate Type': [certificate_name],
                                'File Name': [file_name],
                                'Status': ['Success'],
                                'Highest Visual Similarity': [visual_similarity],
                                'Text Similarity Score': [text_similarity_score],
                                'Learner ID Similarity Score': [learner_id_similarity_score]
                            })
                            log_df = pd.concat([log_df, new_row], ignore_index=True)

                    except Exception as e:
                        # Log the exception
                        new_row = pd.DataFrame({
                            'Learner ID': [learner_id],
                            'Certificate Type': [certificate_name],
                            'File Name': [file_name],
                            'Status': [f'Failed - {str(e)}'],
                            'Highest Visual Similarity': [None],
                            'Text Similarity Score': [0],
                            'Learner ID Similarity Score': [0]
                        })
                        log_df = pd.concat([log_df, new_row], ignore_index=True)

    # Save log_df to a CSV
    log_df.to_csv("log.csv", index=False)


In [None]:
# Old Main Function

# Initialize an empty DataFrame for logging results and errors
log_df = pd.DataFrame(columns=['Learner ID',
                               'Certificate Type',
                               'File Name',
                               'Status',
                               'Highest Visual Similarity',
                               'Text Similarity Score',
                               'Learner ID Similarity Score'
                               ])

# Define the directory where attachments are stored
attachments_dir = '/content/drive/MyDrive/Colab Notebooks/Certificate confirmer data/attachments'

if __name__ == "__main__":

    for idx, row in df_learners_selected.iterrows():

        if idx == 30:  # Stop after processing 30 rows
            break

        learner_id = row['Learner ID']

        for certificate_column in certificate_columns:  # Assuming certificate_columns is defined elsewhere

            cert_data = row[certificate_column]
            if pd.isna(cert_data):  # Skip if this cell is NaN
                continue

            file_name = extract_file_name(cert_data)  # Assuming this function is defined elsewhere
            file_path = os.path.join(attachments_dir, file_name)

            example_cert_paths = cert_name_to_example_paths.get(certificate_column, [])
            if not example_cert_paths:
                continue

            for example_cert_path in example_cert_paths:

                try:
                    visual_similarity = evaluate_file_similarity(example_cert_path, file_path)
                    text_similarity = calculate_text_similarity(example_cert_path, file_path)
                    learner_id_similarity = calculate_learner_id_similarity(learner_id, file_path)

                    new_row = pd.DataFrame({
                        'Learner ID': [learner_id],
                        'Certificate Type': [certificate_column],
                        'File Name': [file_name],
                        'Status': ['Success'],
                        'Highest Visual Similarity': [visual_similarity],
                        'Text Similarity Score': [text_similarity],
                        'Learner ID Similarity Score': [learner_id_similarity]
                    })
                    log_df = pd.concat([log_df, new_row], ignore_index=True)

                except Exception as e:
                    new_row = pd.DataFrame({
                        'Learner ID': [learner_id],
                        'Certificate Type': [certificate_column],
                        'File Name': [file_name],
                        'Status': [f'Failed - {str(e)}'],
                        'Highest Visual Similarity': [None],
                        'Text Similarity Score': [0],
                        'Learner ID Similarity Score': [0]
                    })
                    log_df = pd.concat([log_df, new_row], ignore_index=True)

    log_df.to_csv("log.csv", index=False)
