In [None]:
import smtplib
from email.mime.text import MIMEText
import requests
import time

# Constants
# Base URL of the Label Studio instance; get_task_data() appends
# /api/tasks/<TASK_ID> to it.
LABEL_STUDIO_URL = 'https://app.heartex.com'
# ID of the task that is fetched and scored; also quoted in the report e-mail.
TASK_ID = 123043171
# API token sent as "Authorization: Token <API_TOKEN>"; must be filled in.
API_TOKEN = ''
# E-mail recorded alongside the ground-truth annotation (annotations[0]).
GROUND_TRUTH_MAIL = ''
# Recipients of the reports for annotations[1] and annotations[2] respectively.
ANNOTATION_1_MAIL = ''
ANNOTATION_2_MAIL = ''
# SMTP connection settings used by send_email() (implicit-SSL connection via
# smtplib.SMTP_SSL; SMTP_SENDER is both the login name and the From address).
SMTP_SERVER = ''
SMTP_PORT = 465
SMTP_SENDER = ''
SMTP_PASSWORD = ''
# Pause between two polls of the task in the main loop.
CHECK_INTERVAL = 5  # in seconds
# Upper bound on how long the main loop keeps polling.
MAX_RUNTIME = 600  # in seconds
# Link appended to every report, pointing at the reference annotation video.
YOUTUBE_LINK = ''

# Global variable to track email sending status
# Maps recipient address -> whether send_email() has already mailed them.
# NOTE: if both annotator addresses are identical (e.g. both still ''), the
# dict has a single key and the second annotator is never mailed.
is_mail_sended = {ANNOTATION_1_MAIL: False, ANNOTATION_2_MAIL: False}

def send_email(subject, body, recipient):
    """Mail a plain-text message to ``recipient`` and record that it was sent.

    The message is delivered over an SSL SMTP connection to
    ``SMTP_SERVER:SMTP_PORT``, authenticating as ``SMTP_SENDER``. After a
    successful send, ``is_mail_sended[recipient]`` is set to ``True``.

    Args:
        subject: Value of the ``Subject`` header.
        body: Plain-text message body.
        recipient: Destination address, also used as the ``To`` header.
    """
    message = MIMEText(body)
    headers = (('Subject', subject), ('From', SMTP_SENDER), ('To', recipient))
    for header_name, header_value in headers:
        message[header_name] = header_value

    payload = message.as_string()
    with smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT) as connection:
        connection.login(SMTP_SENDER, SMTP_PASSWORD)
        connection.sendmail(SMTP_SENDER, recipient, payload)

    print(f"Mail sent to {recipient}")
    # Mark only after the SMTP exchange succeeded, so a failed send is retried.
    is_mail_sended[recipient] = True

def get_task_data(timeout=30):
    """Fetch the task ``TASK_ID`` from the Label Studio API.

    Args:
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` waits indefinitely, which would let a single
            stalled connection block the polling loop past ``MAX_RUNTIME``.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.Timeout: If the server does not answer within ``timeout``.
    """
    url = f'{LABEL_STUDIO_URL}/api/tasks/{TASK_ID}'
    headers = {'Authorization': f'Token {API_TOKEN}'}
    response = requests.get(url, headers=headers, timeout=timeout)
    response.raise_for_status()
    return response.json()

def extract_paragraph_data(task_data):
    """Return the task text and the character span of each section after EN-1.

    The text is read from ``task_data['data']['my_text_1']``. Each span runs
    from the first occurrence of one section marker to the first occurrence
    of the next one (or to the end of the text for the last section), in the
    order TR-1, EN-2, TR-2, EN-3, TR-3.

    Note: ``str.find`` yields -1 for a marker that is absent, and that value
    is passed through unchanged.

    Returns:
        A ``(text, spans)`` tuple where ``spans`` is a list of five
        ``(start, end)`` tuples.
    """
    text = task_data['data']['my_text_1']

    boundaries = []
    for marker in ('TR-1', 'EN-2', 'TR-2', 'EN-3', 'TR-3'):
        boundaries.append(text.find(marker))
    boundaries.append(len(text))

    # Pair every boundary with its successor to form consecutive spans.
    spans = list(zip(boundaries, boundaries[1:]))
    return text, spans

def separate_clean_data(paragraph):
    """Split the task text into English and Turkish sentences keyed by tag.

    Every line that starts with ``EN-`` or ``TR-`` is expected to look like
    ``"<tag>: <sentence>"``; other lines are ignored.

    Args:
        paragraph: The full task text, one tagged sentence per line.

    Returns:
        A ``(english_data, turkish_data)`` tuple of dicts mapping the tag
        (e.g. ``'EN-1'``) to its sentence.

    Raises:
        ValueError: If a tagged line contains no ``': '`` separator.
    """
    english_data, turkish_data = {}, {}
    for line in paragraph.strip().split('\n'):
        line = line.strip()
        if line.startswith('EN-'):
            target = english_data
        elif line.startswith('TR-'):
            target = turkish_data
        else:
            continue
        # Split on the first separator only: the sentence itself may contain
        # ': ', which previously broke the two-value unpacking.
        key, value = line.split(': ', 1)
        target[key] = value
    return english_data, turkish_data

def count_words(data, keys):
    """Return the total number of whitespace-separated words in ``data[key]``
    over all ``keys``.

    Raises:
        KeyError: If any of ``keys`` is missing from ``data``.
    """
    total = 0
    for key in keys:
        total += len(data[key].split())
    return total

def get_annotation_results(annotation, email, intervals_for_sentences):
    """Flatten one annotation into per-feature lists of labelled spans.

    Only result items that carry ``value.labels`` are considered; the first
    label decides how the span is recorded:

    * ``TERM`` -> ``english_detected_terms`` and, when the item has meta
      text, ``english_terimler_org_detected``.
    * ``CORRECT_TRANSLATION`` / ``WRONG_TRANSLATION`` ->
      ``turkish_detected_terms`` and ``turkish_detected_labels``; a wrong
      translation with meta text is also added to
      ``turkish_detected_corrections``.

    Each span is tagged with the section (EN-1 ... TR-3) that fully contains
    it, or ``None`` (after printing a notice) when no section does.

    Args:
        annotation: Annotation dict with ``'ground_truth'`` and ``'result'``.
        email: Address stored under ``'email'`` in the returned dict.
        intervals_for_sentences: The five ``(start, end)`` spans for TR-1,
            EN-2, TR-2, EN-3 and TR-3; EN-1 is everything before the first.

    Returns:
        A dict with ``'email'``, ``'ground_truth'`` and the five feature lists.
    """
    later_sections = ('TR-1', 'EN-2', 'TR-2', 'EN-3', 'TR-3')

    def locate(start, end):
        # EN-1 has no span of its own: it covers everything before TR-1.
        if 0 <= start < end <= intervals_for_sentences[0][0]:
            return 'EN-1'
        # First containing span wins, in the same order as the sections.
        for index, name in enumerate(later_sections):
            if intervals_for_sentences[index][0] <= start < end <= intervals_for_sentences[index][1]:
                return name
        print(f"Invalid interval: {start}-{end}")
        return None

    english_terms = []
    turkish_terms = []
    turkish_labels = []
    turkish_corrections = []
    english_links = []

    for item in annotation['result']:
        if 'value' not in item or 'labels' not in item['value']:
            continue

        value = item['value']
        label = value['labels'][0]
        text = value['text']
        start = value['start']
        end = value['end']
        meta_text = item.get('meta', {}).get('text', [None])[0]
        section = locate(start, end)
        span = (text, start, end, section)

        if label == 'TERM':
            english_terms.append(span)
            if meta_text:
                english_links.append(span + (meta_text,))
        elif label in ['CORRECT_TRANSLATION', 'WRONG_TRANSLATION']:
            turkish_terms.append(span)
            turkish_labels.append(span + (label,))
            if label == 'WRONG_TRANSLATION' and meta_text:
                turkish_corrections.append(span + (meta_text,))

    return {
        'email': email,
        'ground_truth': annotation['ground_truth'],
        'english_detected_terms': english_terms,
        'turkish_detected_terms': turkish_terms,
        'turkish_detected_labels': turkish_labels,
        'turkish_detected_corrections': turkish_corrections,
        'english_terimler_org_detected': english_links,
    }

def calculate_term_detection_score(annotation, ground_truth, num_words_en, language='english'):
    """Compute word-weighted detection metrics for one language.

    Spans are compared as whole tuples; a span counts as matched only if it
    is identical in prediction and ground truth. Each matched, spurious or
    missed span contributes the number of words in its text to TP, FP or FN.
    TN is whatever remains of ``num_words_en`` (the word count of the
    language being scored, despite the name).

    Args:
        annotation: Parsed annotation dict of the annotator.
        ground_truth: Parsed annotation dict of the reference.
        num_words_en: Total number of words in the scored sentences.
        language: ``'english'`` selects ``english_detected_terms``; anything
            else selects ``turkish_detected_terms``.

    Returns:
        ``(precision, recall, f1, accuracy, [matched, spurious, missed])``
        with the four scores rounded to two decimals.
    """
    if language == 'english':
        feature = 'english_detected_terms'
    else:
        feature = 'turkish_detected_terms'

    predicted = set(annotation[feature])
    expected = set(ground_truth[feature])

    def word_total(terms):
        # Weight each span by the number of words in its text.
        return sum(len(term[0].split()) for term in terms)

    matched = predicted & expected
    spurious = predicted - expected
    missed = expected - predicted

    tp = word_total(matched)
    fp = word_total(spurious)
    fn = word_total(missed)
    tn = num_words_en - tp - fp - fn

    precision = 0
    if tp + fp > 0:
        precision = tp / (tp + fp)

    recall = 0
    if tp + fn > 0:
        recall = tp / (tp + fn)

    f1_score = 0
    if precision + recall > 0:
        f1_score = (2 * precision * recall) / (precision + recall)

    accuracy = 0
    if tp + tn + fp + fn > 0:
        accuracy = (tp + tn) / (tp + tn + fp + fn)

    return (
        round(precision, 2),
        round(recall, 2),
        round(f1_score, 2),
        round(accuracy, 2),
        [matched, spurious, missed],
    )

def calculate_exact_match(annotation, ground_truth, feature):
    """Compare the fifth field of spans that both sides marked identically.

    Entries of ``feature`` are 5-tuples ``(text, start, end, section, extra)``.
    Only spans whose first four fields occur in both the annotation and the
    ground truth are considered; for those, the score is the share whose
    ``extra`` value also agrees.

    Args:
        annotation: Parsed annotation dict of the annotator.
        ground_truth: Parsed annotation dict of the reference.
        feature: Key of the 5-tuple list to compare.

    Returns:
        ``(score, [agreeing, disagreeing])`` where ``score`` is rounded to two
        decimals (0 when there is no shared span) and ``disagreeing`` holds
        the annotator's version of the mismatching entries.
    """
    def span_key(entry):
        return (entry[0], entry[1], entry[2], entry[3])

    predicted_extra = {span_key(entry): entry[4] for entry in set(annotation[feature])}
    expected_extra = {span_key(entry): entry[4] for entry in set(ground_truth[feature])}

    # Spans present on both sides, regardless of the fifth field.
    shared_spans = set(predicted_extra) & set(expected_extra)

    predicted = {span + (predicted_extra[span],) for span in shared_spans}
    expected = {span + (expected_extra[span],) for span in shared_spans}

    agreeing = predicted & expected
    disagreeing = predicted - expected

    agree_count = len(agreeing)
    disagree_count = len(expected - predicted)
    total = agree_count + disagree_count

    if total > 0:
        exact_match = agree_count / total
    else:
        exact_match = 0

    return round(exact_match, 2), [agreeing, disagreeing]

def get_annotation_scores(annotation, ground_truth, email, num_words_en, num_words_tr, intervals_for_sentences):
    """Score one annotator against the ground truth and build the report text.

    The raw annotation is parsed with ``get_annotation_results`` and then
    compared with ``ground_truth`` on five aspects: English and Turkish term
    detection (precision/recall/F1/accuracy) and exact match of translation
    labels, corrections and English links.

    Args:
        annotation: Raw annotation dict of the annotator.
        ground_truth: Already parsed ground-truth annotation.
        email: Address of the annotator, stored in the parsed annotation.
        num_words_en: Total word count of the English sentences.
        num_words_tr: Total word count of the Turkish sentences.
        intervals_for_sentences: Section spans used to tag each labelled span.

    Returns:
        The plain-text body of the performance report.
    """
    parsed = get_annotation_results(annotation, email, intervals_for_sentences)

    en_precision, en_recall, en_f1, en_accuracy, en_sets = calculate_term_detection_score(
        parsed, ground_truth, num_words_en, 'english')
    tr_precision, tr_recall, tr_f1, tr_accuracy, tr_sets = calculate_term_detection_score(
        parsed, ground_truth, num_words_tr, 'turkish')

    labels_score, labels_sets = calculate_exact_match(parsed, ground_truth, 'turkish_detected_labels')
    corrections_score, corrections_sets = calculate_exact_match(parsed, ground_truth, 'turkish_detected_corrections')
    links_score, links_sets = calculate_exact_match(parsed, ground_truth, 'english_terimler_org_detected')

    # Term sets are ordered [matched, spurious, missed]; match sets are
    # ordered [agreeing, disagreeing].
    en_hits, en_spurious, en_missed = en_sets[0], en_sets[1], en_sets[2]
    tr_hits, tr_spurious, tr_missed = tr_sets[0], tr_sets[1], tr_sets[2]
    labels_valid, labels_invalid = labels_sets[0], labels_sets[1]
    corrections_valid, corrections_invalid = corrections_sets[0], corrections_sets[1]
    links_valid, links_invalid = links_sets[0], links_sets[1]

    divider = "-" * 50

    # Collect the fragments in order and join once at the end.
    report = [
        f"Hello, \n\nHere is your performance report for task_id: {TASK_ID}.\n\n",
        divider + "\n",

        # English term detection
        "English Term Detection:\n",
        f"You detected {len(en_hits)} English Terms correctly.\n",
        f"You did not detect {len(en_missed)} English Terms which are: {en_missed}\n",
        f"You labelled {len(en_spurious)} words/phrases that are not terms which are: {en_spurious}\n\n",
        "Your scores for English Term Detection are as follows:\n",
        f"Precision: {en_precision}\n",
        f"Recall: {en_recall}\n",
        f"F1 Score: {en_f1}\n",
        f"Accuracy: {en_accuracy}",
        "\n" + divider + "\n",

        # Turkish term detection
        "Turkish Term Detection:\n",
        f"You detected {len(tr_hits)} Turkish Terms correctly.\n",
        f"You did not detect {len(tr_missed)} Turkish Terms which are: {tr_missed}\n",
        f"You labelled {len(tr_spurious)} words/phrases that are not terms which are: {tr_spurious}\n\n",
        "Your scores for Turkish Term Detection are as follows:\n",
        f"Precision: {tr_precision}\n",
        f"Recall: {tr_recall}\n",
        f"F1 Score: {tr_f1}\n",
        f"Accuracy: {tr_accuracy}",
        "\n" + divider + "\n",

        # Translation labels
        "Translation Labels:\n",
        f"Your {len(labels_valid)} Turkish Labels are valid.\n",
        f"Your {len(labels_invalid)} Turkish Labels are not valid which are: {labels_invalid}\n\n",
        "Your score for Turkish Labels Detection are as follows:\n",
        f"Exact Match: {labels_score}",
        "\n" + divider + "\n",

        # Corrections supplied for wrong translations
        "Corrections for Wrong Translation:\n",
        f"Your {len(corrections_valid)} Turkish Corrections are valid.\n",
        f"Your {len(corrections_invalid)} Turkish Corrections are not valid which are: {corrections_invalid}\n\n",
        "Your score for Turkish Corrections are as follows:\n",
        f"Exact Match: {corrections_score}",
        "\n" + divider + "\n",

        # Links attached to English terms
        "English Links:\n",
        f"Your {len(links_valid)} English Links are valid.\n",
        f"Your {len(links_invalid)} English Links are not valid which are: {links_invalid}\n\n",
        "Your score for English Links are as follows:\n",
        f"Exact Match: {links_score}",
        "\n" + divider + "\n",

        # Pointer to the reference annotation
        f"Here is the youtube link for the correct annotation: {YOUTUBE_LINK}\n",
        "Please check the video for the correct annotation.\n",
    ]
    return "".join(report)

In [None]:
# Main loop
# Poll the task until MAX_RUNTIME elapses. As soon as an annotator's
# annotation is available, score it against the ground truth and mail the
# report once; stop early when every recipient has been mailed.
start_time = time.time()
while time.time() - start_time < MAX_RUNTIME:
    print("Checking for new annotations...")
    task_data = get_task_data()
    annotations = task_data['annotations']
    # print number of annotations
    print(f"Number of annotations: {len(annotations)}")

    # annotations[0] is treated as the ground truth and annotations[1] as the
    # first annotator. Until both exist there is nothing to score, so wait
    # for the next poll instead of failing with IndexError.
    if len(annotations) < 2:
        time.sleep(CHECK_INTERVAL)
        continue

    all_paragraph, intervals_for_sentences = extract_paragraph_data(task_data)
    english_data, turkish_data = separate_clean_data(all_paragraph)
    num_words_en = count_words(english_data, ['EN-1', 'EN-2', 'EN-3'])
    num_words_tr = count_words(turkish_data, ['TR-1', 'TR-2', 'TR-3'])

    ground_truth = get_annotation_results(annotations[0], GROUND_TRUTH_MAIL, intervals_for_sentences)

    # Score an annotation only when its report still has to be sent.
    if not is_mail_sended[ANNOTATION_1_MAIL]:
        annotation_1_message = get_annotation_scores(annotations[1], ground_truth, ANNOTATION_1_MAIL, num_words_en, num_words_tr, intervals_for_sentences)
        send_email(f'Your Performance for task_id: {TASK_ID}', annotation_1_message, ANNOTATION_1_MAIL)
    if len(annotations) > 2 and not is_mail_sended[ANNOTATION_2_MAIL]:
        annotation_2_message = get_annotation_scores(annotations[2], ground_truth, ANNOTATION_2_MAIL, num_words_en, num_words_tr, intervals_for_sentences)
        send_email(f'Your Performance for task_id: {TASK_ID}', annotation_2_message, ANNOTATION_2_MAIL)

    # Nothing left to do once every report has gone out.
    if all(is_mail_sended.values()):
        break
    time.sleep(CHECK_INTERVAL)
