In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!pip install -q "labelbox[data]"
!pip install sqlalchemy==2.0.36

In [None]:
import pandas as pd
from labelbox import Client
import re
from datetime import datetime, timezone
from uuid import uuid4
import numpy as np
from IPython.display import display_html
import os
import shutil

from sqlalchemy import create_engine, select, update, text
from sqlalchemy.ext.automap import automap_base
from sqlalchemy.orm import sessionmaker

# Database connection

In [None]:
# The connection URL (which usually embeds credentials) is read from the environment
# instead of being written into the notebook.
# NOTE(review): set DATABASE_URL to the sync database before running.
db_engine = create_engine(os.environ["DATABASE_URL"])
Session = sessionmaker(db_engine)
session = Session()

# Reflect the existing tables into mapped classes.
Base = automap_base()
Base.prepare(autoload_with=db_engine)

LabelboxLog = Base.classes.labelbox_log
ReviewSheetLog = Base.classes.reviewsheet_log
ProjectCursor = Base.classes.cursor

# Connectors


LabelboxLog

| task_id | district | date | synced_rows | total |
| :-: | :-: | :-: | :-: | :-: |
| 1fg49-6I | Sylhet | 12-12-2024 | 89 | 7086 |
| 45g-s462 | Sandwip | 21-12-2024 | 89 | 789 |


ProjectCursor

| district | cursor |
| :-: | :-: |
| Sylhet | 2024-12-25T06:30:12+00:00 |
| Chittagong | 2024-06-25T08:30:12+00:00 |


ReviewSheetLog (review sheet destination)

| task_id | sheetname | prev_count | curr_count | task_type |
| :-: | :-: | :-: | :-: | :-: |
| 1fg49-6I | Sylhet Data | 500 | 589 | append |
| 1fg49-6I | Ashraful | 20 | 60 | append |
| 1fg49-6I | Dipa | 0 | 29 | new sheet |
| 1fg49-6I | Monisha | 20 | 40 | append |

In [None]:
class LabelboxSource():
    def __init__(self, api_key, project_id, district):
        client = Client(api_key = api_key)
        self.project_id = project_id
        self.project = client.get_project(project_id)
        self.district = district

    def get_new_or_updatd_data(self):
        """Return most recently transcribed data with a unique id"""
        try:
            cursor = session.execute(
                select(ProjectCursor.cursor).where(ProjectCursor.district == self.district)
            ).first()[0]
        except TypeError:
            cursor = None

        stream = self.get_labelbox_data(cursor)
        lb_df = self.load_labelbox_data_into_dataframe(stream)

        if cursor != None:
            lb_df = lb_df.query('updated_at > @ cursor')
        if lb_df.shape[0] > 0:
            if cursor != None:
                session.execute(
                    update(ProjectCursor).
                    where(ProjectCursor.district == self.district).
                    values(cursor = lb_df.updated_at.max())
                )
            else:
                session.add(ProjectCursor(
                    district = district, cursor = lb_df.updated_at.max())
                )
        # log actions
        TASK_ID = str(uuid4())
        session.add(LabelboxLog(
            task_id = TASK_ID,
            district = self.district,
            date = datetime.today(),
            synced_rows = lb_df.shape[0],
            total = self.project.data_row_count
        ))
        return lb_df, TASK_ID

    def get_labelbox_data(self, cursor:str|None=None):
        if cursor != None:
            x = re.findall(r"\d+-\d+-\d+T\d+:\d+:\d+\d+|\+\d+:\d+", cursor) # remove decimal from seconds
            cursor = x[0] + x[1]
        print("Fetching Data from Labelbox for:", self.project.name)
        export_task = self.project.export(
            filters = {
                "last_activity_at" : [cursor, datetime.now(timezone.utc).isoformat(timespec='seconds')]
            }, params = {
                "label_details" : True, "performance_details" : True
            }
        )
        export_task.wait_till_done()
        return export_task.get_buffered_stream()

    def load_labelbox_data_into_dataframe(self, stream):
        dataframe_rows = []
        for datarow in stream:
            if len(datarow.json["projects"][self.project_id]["labels"]) == 0:
                continue
            try:
                transcript = datarow.json["projects"][self.project_id]["labels"][0]["annotations"]["classifications"][0]["text_answer"]["content"]
            except:
                continue

            ext_id = datarow.json["data_row"]["external_id"]
            og_name = re.findall(r"(rec_\d*)", ext_id)[0] + '.wav'
            annotator = datarow.json["projects"][self.project_id]["labels"][0]["label_details"]["created_by"]
            updated_at = datarow.json["projects"][self.project_id]["labels"][0]["label_details"]["updated_at"]
            time = datarow.json['projects'][self.project_id]['labels'][0]['performance_details']['seconds_to_create']

            dataframe_rows.append([og_name, ext_id, annotator, transcript, self.district, time, updated_at])

        return pd.DataFrame(
            dataframe_rows,
            columns = [
                "original_file", "External_ID",
                "annotator", "transcripts",
                "district", "time", "updated_at"
            ]
        )

    def get_annotator_info(self):
        output = {}
        for member in self.project.members():
            if member.role().name in ["Labeler", "Reviewer"]:
                output[member.user().name] = member.user().email
        return output


#-------------------------------------------------------------------------------
# REVIEWSHEET destination
#-------------------------------------------------------------------------------
class Reviewsheet:
    """Excel review workbook for one district.

    The workbook holds:
    - a "<district> Data" sheet with every synced row;
    - one sheet per annotator (named by display name) with that annotator's rows.

    Each sheet written is recorded as a ``ReviewSheetLog`` row on the
    notebook-level ``session``; the caller commits.
    """
    def __init__(self, sheet_path, district):
        """
        Args:
            sheet_path: path of the .xlsx workbook; it need not exist yet.
            district: district name; the data sheet is "<district> Data".
        """
        self.sheet_path = sheet_path
        self.district = district
        try:
            self.sheet = pd.ExcelFile(sheet_path)
        except FileNotFoundError:
            # No workbook yet; load_data_into_reviewsheet will create it.
            self.sheet = None

    def load_data_into_reviewsheet(self, source_df:pd.DataFrame, name_to_email:dict, TASK_ID:str):
        """Append ``source_df`` to the existing workbook, or create the workbook.

        Args:
            source_df: rows to write; must have an ``annotator`` (email) column.
            name_to_email: annotator display name -> email; one sheet per entry.
            TASK_ID: sync task id recorded on every ReviewSheetLog row.
        """
        if self.sheet is not None:
            print("Appending data")
            self.append_to_workbook(source_df, name_to_email, TASK_ID)
        else:
            print("Creating new workbook")
            self.create_new_workbook(source_df, name_to_email, TASK_ID)

    def _log(self, TASK_ID, sheetname, prev_count, curr_count, task_type):
        """Queue one ReviewSheetLog row on the shared session."""
        session.add(ReviewSheetLog(
            task_id = TASK_ID,
            sheetname = sheetname,
            prev_count = prev_count,
            curr_count = curr_count,
            task_type = task_type
        ))

    def create_new_workbook(self, source_df, name_to_email, TASK_ID):
        """Write a fresh workbook: the data sheet plus one sheet per annotator."""
        data_sheet = self.district + " Data"
        with pd.ExcelWriter(self.sheet_path, mode="w") as writer:
            source_df.to_excel(writer, sheet_name=data_sheet, header=True, index=False)
            self._log(TASK_ID, data_sheet, 0, source_df.shape[0], "new sheet")
            for name, email in name_to_email.items():
                # Annotators with no rows still get a sheet holding just the header.
                annotator_df = source_df.query('annotator == @email')
                annotator_df.to_excel(writer, sheet_name=name, header=True, index=False)
                self._log(TASK_ID, name, 0, annotator_df.shape[0], "new sheet")

    def append_to_workbook(self, source_df, name_to_email, TASK_ID):
        """Append rows below the existing data on each sheet, creating missing annotator sheets.

        Every sheet is written with ``index=False`` so appended rows line up
        with the header columns written by ``create_new_workbook``.
        """
        data_sheet = self.district + " Data"
        with pd.ExcelWriter(
            self.sheet_path,
            mode = 'a',
            engine = "openpyxl",
            if_sheet_exists="overlay"
        ) as writer:

            # <district> Data sheet, e.g. "Sylhet Data", "Chittagong Data" etc.
            # parse() counts data rows only; row 0 is the header, so the
            # first free row is prev_count + 1.
            prev_count = self.sheet.parse(data_sheet).shape[0]
            task_type = "append" if source_df.shape[0] > 0 else "no change"
            source_df.to_excel(writer, sheet_name=data_sheet, header=False,
                               index=False, startrow=prev_count + 1)
            self._log(TASK_ID, data_sheet, prev_count, prev_count + source_df.shape[0], task_type)

            # Individual annotator sheets
            for name, email in name_to_email.items():
                new_data = source_df.query('annotator == @email')
                prev_count = 0

                # New annotator joined the team. Checked first because parsing
                # a sheet that does not exist raises, even when there is no
                # new data. The sheet is created with its header, as in
                # create_new_workbook.
                if name not in self.sheet.sheet_names:
                    task_type = "new sheet"
                    new_data.to_excel(writer, sheet_name=name, header=True, index=False)

                # No new transcripts by this annotator
                elif new_data.shape[0] == 0:
                    task_type = "no change"
                    prev_count = self.sheet.parse(name).shape[0]

                # New transcripts by this annotator
                else:
                    task_type = "append"
                    prev_count = self.sheet.parse(name).shape[0]
                    new_data.to_excel(writer, sheet_name=name, header=False,
                                      index=False, startrow=prev_count + 1)

                self._log(TASK_ID, name, prev_count, prev_count + new_data.shape[0], task_type)

# Backup and version control

In [None]:
def back_up(src, dst, district):
    """Copy the review workbook into ``dst`` as a versioned backup.

    The copy is named "<district>-<task_id>.xlsx". ``task_id`` is the
    ``labelbox_log`` entry with the latest ``date`` for ``district``, or None
    if there is none.

    Args:
        src: path of the workbook to back up.
        dst: backup directory; created if missing ("" means the working directory).
        district: district whose latest log entry names the backup.

    Returns:
        The backup path, or None when ``src`` does not exist (nothing to back up yet).
    """
    if not os.path.exists(src):
        return None

    # ORDER BY ... LIMIT 1 avoids relying on SQLite's bare-column MAX()
    # behaviour, so the query also runs on other databases. no_autoflush keeps
    # rows queued on the session for the current run (e.g. its LabelboxLog
    # entry) from being flushed and picked up as the "previous" version.
    with session.no_autoflush:
        previous_version = session.execute(text("""
        SELECT task_id FROM labelbox_log
        WHERE labelbox_log.district = :district
        ORDER BY "date" DESC
        LIMIT 1
        """), {"district": district}).scalar()

    version_path = os.path.join(dst, f"{district}-{previous_version}.xlsx")
    if dst:
        # Create the directory rather than silently skipping the backup when it is missing.
        os.makedirs(dst, exist_ok=True)
    shutil.copy2(src, version_path)
    print("Backing up | path:", version_path)
    return version_path

# Task configuration

In [None]:
# Fill these in before running.
district = ""      # e.g. "Sylhet"; selects the project id, cursor row and data-sheet name
backup_path = ""   # directory that receives versioned workbook copies
sheet_path = "base/" + district + "_Data.xlsx"

In [None]:
# NOTE(review): fill in the path of the project-id workbook. The first column
# (the index) must hold district names, and there must be an "id" column.
PROJECT_ID = pd.read_excel("", index_col=0)

labelbox_src = LabelboxSource(
    # Keep the key out of the notebook. If this is None, the Labelbox client
    # itself falls back to the LABELBOX_API_KEY environment variable.
    api_key = os.environ.get("LABELBOX_API_KEY"),
    project_id = PROJECT_ID.loc[district, "id"],
    district = district
)

review_dst = Reviewsheet(
    # sheet_path already starts with "base/"; prefixing it again produced "base/base/...".
    sheet_path = sheet_path,
    district = district
)

# Run Task

In [None]:
# RUN TASK -------------------------------------------------
# All DB writes below go through one session. On failure, roll back: otherwise
# the already-executed cursor UPDATE stays in the open transaction. A re-run
# would then read the advanced cursor and skip rows that never reached the
# workbook.
try:
    labelbox_df, TASK_ID = labelbox_src.get_new_or_updatd_data()
    name_to_email = labelbox_src.get_annotator_info()
    print()
    back_up(review_dst.sheet_path, backup_path, district)
    print()
    review_dst.load_data_into_reviewsheet(labelbox_df, name_to_email, TASK_ID)
    session.commit()
except Exception:
    session.rollback()
    raise

# TASK SUMMARY ---------------------------------------------
def display_side_by_side(dfs, titles):
    """Render DataFrames as HTML tables in one flex row, each under its title."""
    panels = "".join(
        f"<div style='margin: 10px;'><h3 style='text-align: center;'>{title}</h3>{df.to_html(index=False)}</div>"
        for df, title in zip(dfs, titles)
    )
    display_html(f"<div style='display: flex; justify-content: space-around;'>{panels}</div>", raw=True)

# Bound parameters instead of f-string interpolation into the SQL text.
labelbox_log = pd.read_sql(
    text("select * from labelbox_log where labelbox_log.task_id = :task_id"),
    db_engine, params={"task_id": TASK_ID},
)
reviewsheet_log = pd.read_sql(
    text("select * from reviewsheet_log where reviewsheet_log.task_id = :task_id"),
    db_engine, params={"task_id": TASK_ID},
)

display_side_by_side([labelbox_log, reviewsheet_log], ["Labelbox", "Reviewsheet"])