In [3]:
from geoalchemy2 import Geometry, load_spatialite
from sqlalchemy import event, Table, insert, func, select, text
from sqlalchemy.orm import sessionmaker
import os
from flask import Flask
from quickannotator.db import Project, Image, AnnotationClass, Notification, Tile, Setting, Annotation
from quickannotator.db import db
import large_image
import math
import numpy as np 
from shapely.geometry import Point, Polygon
import random
from shapely.affinity import translate
import json
from tqdm import tqdm
import shapely
from shapely.geometry import shape
from quickannotator.api.v1.annotation.helper import count_annotations_within_bbox, delete_all_annotations
from quickannotator.api.v1.tile.helper import reset_all_tiles_seen, get_tile_ids_within_bbox, get_tile_id_for_point
from quickannotator.api.v1.annotation_class.helper import get_annotation_class_by_id
from quickannotator.db import build_annotation_table_name, create_dynamic_model

  from .autonotebook import tqdm as notebook_tqdm
2025-01-30 20:54:46,653	INFO util.py:154 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.


In [4]:
app = Flask("app")
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///quickannotator.db'
os.environ['SPATIALITE_LIBRARY_PATH'] = '/usr/lib/x86_64-linux-gnu/mod_spatialite.so'

db.app = app
db.init_app(app)


with app.app_context():
    db.engine.echo = False
    event.listen(db.engine, 'connect', load_spatialite)

In [None]:

with app.app_context():
    model = create_dynamic_model('annotation_1_2_gt')
    result = db.session.query(model).first()
    print(result.centroid)

In [1]:
def insert_project(app, db, name, description, is_dataset_large):
    with app.app_context():
        project = Project(name=name,
                          description=description,
                          is_dataset_large=is_dataset_large)
        db.session.add(project)
        db.session.commit()
        
def insert_image(app, db, project_id, name, path, height, width, dz_tilesize, embedding_coord, group_id, split):
    with app.app_context():
        image = Image(project_id=project_id,
                      name=name,
                      path=path,
                      height=height,
                      width=width,
                      dz_tilesize=dz_tilesize,
                      embedding_coord=embedding_coord,
                      group_id=group_id,
                      split=split)
        db.session.add(image)
        db.session.commit()

def insert_image_by_path(app, db, project_id, full_path):
    path = full_path.split("quickannotator/")[1]
    slide = large_image.getTileSource(path)
    name = os.path.basename(full_path)
    
    with app.app_context():
        image = Image(project_id=project_id,
                      name=name,
                      path=path,
                      height=slide.sizeY,
                      width=slide.sizeX,
                      dz_tilesize=slide.tileWidth,
                      embedding_coord="POINT (1 1)",
                      group_id=0,
                      split=0
                      )
        
        db.session.add(image)
        db.session.commit()
    
def insert_annotation_class(app, db, project_id, name, color, magnification, patchsize, tilesize, dl_model_objectref):
    with app.app_context():
        annotation_class = AnnotationClass(project_id=project_id,
                                           name=name,
                                           color=color,
                                           magnification=magnification,
                                           patchsize=patchsize,
                                           tilesize=tilesize,
                                           dl_model_objectref=dl_model_objectref)
        db.session.add(annotation_class)
        db.session.commit()
              
def insert_tiles(app, db, image_id, annotation_class_id):
    image_width, image_height = get_image_dimensions(app, image_id)
    
    with app.app_context():
        tilesize = AnnotationClass.query.filter_by(id=annotation_class_id).first().tilesize


    image_bbox = (0, 0, image_width, image_height)
    tile_ids = get_tile_ids_within_bbox(tilesize, image_bbox, image_width, image_height)
    tiles = [Tile(image_id=image_id, annotation_class_id=annotation_class_id, tile_id=id) for id in tile_ids]
    
    with app.app_context():
        db.session.add_all(tiles)
        db.session.commit()
        
  
def get_image_dimensions(app, image_id):
    with app.app_context():
        image = Image.query.filter_by(id=image_id).first()
        return image.width, image.height
        
def create_annotation_table(db, image_id: int, annotation_class_id: int, is_gt: bool):
    table_name = build_annotation_table_name(image_id, annotation_class_id, is_gt=is_gt)
    table = Annotation.__table__.to_metadata(db.metadata, name=table_name)
    db.metadata.create_all(bind=db.engine, tables=[table])
            
def generate_random_polygon(max_area=10, centroid=(0, 0)):
    num_points = random.randint(10, 20)  # Polygons need at least 3 points

    # Generate points in polar coordinates
    radii = np.sqrt(np.random.uniform(0, 1, num_points))  # Square root ensures uniform distribution
    angles = np.linspace(0, 2 * np.pi, num_points, endpoint=False)

    # Convert polar coordinates to Cartesian coordinates
    points = [(r * np.cos(a), r * np.sin(a)) for r, a in zip(radii, angles)]

    # Sort points to form a simple polygon
    sorted_points = sorted(points, key=lambda p: np.arctan2(p[1], p[0]))

    # Create a Polygon and scale its area
    polygon = Polygon(sorted_points)

    # Calculate scaling factor
    current_area = polygon.area
    if current_area > 0:
        scaling_factor = np.sqrt(max_area / current_area)
        scaled_points = [(x * scaling_factor, y * scaling_factor) for x, y in sorted_points]
        polygon = Polygon(scaled_points)

    polygon = translate(polygon, xoff=centroid[0], yoff=centroid[1])

    return polygon
    
def generate_annotations(tilesize, image_width, image_height, n_polygons, image_id, annotation_class_id, gtpred):
    annotations = []
    for i in range(n_polygons):
        c = Point(np.random.randint(image_width), np.random.randint(image_height))
        
        poly: Polygon = generate_random_polygon(max_area=10000, centroid=(c.x, c.y))
        x = poly.centroid.x
        y = poly.centroid.y

        tile_id = get_tile_id_for_point(tilesize, x, y, image_width, image_height)

        if tile_id:
            d = {
                "image_id": image_id,
                "annotation_class_id": annotation_class_id,
                "isgt": gtpred == "gt",
                "centroid": poly.centroid.wkt,  # Adding SRID=0 for pixel-based coordinates
                "area": poly.area,  # The area of the polygon
                "polygon": poly.wkt,  # Adding SRID=0 for the polygon
                "custom_metrics": {"iou": 0.5},  # Convert custom_metrics to a JSON string
                "tile_id": tile_id
            }
            annotations.append(d)
        
    return annotations

        
def insert_annotations(app, db, image_id, annotation_class_id, gtpred, annotations, is_geojson=False, is_mask=False):
    all_anno = []
    
    with app.app_context():
        image_width, image_height = get_image_dimensions(app, image_id)
        tilesize = get_annotation_class_by_id(annotation_class_id).tilesize
        tablename = build_annotation_table_name(image_id, annotation_class_id, gtpred)
        model = create_dynamic_model(tablename)

        for i, d in enumerate(tqdm(annotations)):
            if is_geojson:
                if not is_mask and d['properties']['classification']['name'] != 'tubule':
                    continue
                shapely_geometry = shape(d['geometry'])
            else:
                shapely_geometry = generate_random_polygon(max_area=10000, centroid=(np.random.randint(30, image_width - 30), np.random.randint(30, image_height - 30)))
            
            if is_mask:
                tile_id = None
            else:
                x = shapely_geometry.centroid.x
                y = shapely_geometry.centroid.y
                tile_id = get_tile_id_for_point(tilesize, x, y, image_width, image_height)

            annotation = model(image_id=image_id,
                               annotation_class_id=annotation_class_id,
                               isgt=gtpred == "gt",
                               centroid=shapely_geometry.centroid.wkt,
                               area=shapely_geometry.area,
                               polygon=shapely_geometry.wkt,
                               custom_metrics={"iou": 0.5},
                               tile_id=tile_id)
            
            all_anno.append(annotation)
            
            if len(all_anno) == 1_000:
                db.session.bulk_save_objects(all_anno)
                db.session.commit()
                all_anno = []
    
        if all_anno:
            db.session.bulk_save_objects(all_anno)
            db.session.commit()

def insert_existing_annotations(app, db, image_id, annotation_class_id, gtpred, filepath):
    path = filepath.split("quickannotator/")[1]
    with open(path, 'r') as file:
        data = json.load(file)
    insert_annotations(app, db, image_id, annotation_class_id, gtpred, data, is_geojson=True, is_mask=False)

def insert_generated_annotations(app, db, image_id, annotation_class_id, gtpred, n):
    annotations = [None] * n  # Placeholder for generating n annotations
    insert_annotations(app, db, image_id, annotation_class_id, gtpred, annotations, is_geojson=False, is_mask=False)

def insert_geojson_mask_file(app, db, image_id, annotation_class_id, gtpred, filepath):
    path = filepath.split("quickannotator/")[1]
    with open(path, 'r') as file:
        data = json.load(file)["features"]
    insert_annotations(app, db, image_id, annotation_class_id, gtpred, data, is_geojson=True, is_mask=True)



In [4]:
models = [Project, Image, AnnotationClass, Notification, Tile, Setting]
with app.app_context():
    db.metadata.create_all(bind=db.engine, tables=[item.__table__ for item in models])
    create_annotation_table(db, 1, 1, is_gt=True)
    create_annotation_table(db, 1, 2, is_gt=True)
    create_annotation_table(db, 1, 3, is_gt=True)
    create_annotation_table(db, 2, 1, is_gt=True)
    create_annotation_table(db, 2, 2, is_gt=True)
    create_annotation_table(db, 2, 3, is_gt=True)

    create_annotation_table(db, 1, 2, is_gt=False)
    create_annotation_table(db, 1, 3, is_gt=False)
    create_annotation_table(db, 2, 2, is_gt=False)
    create_annotation_table(db, 2, 3, is_gt=False)
    

In [5]:
with app.app_context():
    reset_all_tiles_seen(db.session)
    delete_all_annotations(db.session, 1, 2, False)

In [6]:
insert_project(app, db, 
               name="example_project", 
               description="test", 
               is_dataset_large=False)


In [7]:

insert_image_by_path(app, db,
                     project_id=1,
                     full_path="quickannotator/data/test_ndpi/13_266069_040_003 L02 PAS.ndpi"
                     )

insert_image_by_path(app, db,
                     project_id=1,
                     full_path="quickannotator/data/test_ndpi/TCGA-23-2072-01Z-00-DX1.478243FF-BFF0-48A4-ADEA-DE789331A50E.svs")


In [8]:

insert_annotation_class(app, db,
                        project_id=None,
                        name="Tissue Mask",
                        color="black",
                        magnification=None,
                        patchsize=None,
                        tilesize=None,
                        dl_model_objectref=None)

insert_annotation_class(app, db,
                        project_id=1,
                        name="Tubule",
                        color="red",
                        magnification=10,
                        patchsize=256,
                        tilesize=2048,
                        dl_model_objectref=None)

insert_annotation_class(app, db,
                        project_id=1,
                        name="Lumen",
                        color="blue",
                        magnification=10,
                        patchsize=256,
                        tilesize=4096,
                        dl_model_objectref=None)


In [9]:

insert_tiles(app, db, image_id=1, annotation_class_id=2)

insert_tiles(app, db, image_id=2, annotation_class_id=2)


In [10]:

insert_tiles(app, db, image_id=1, annotation_class_id=3)

insert_tiles(app, db, image_id=2, annotation_class_id=3)


In [11]:

insert_geojson_mask_file(app, db,
                            image_id=1,
                            annotation_class_id=1,
                            gtpred='gt',
                            filepath='quickannotator/data/test_ndpi/13_266069_040_003 L02 PAS_tissue_mask.geojson')


  class DynamicAnnotation(base):
100%|██████████| 3/3 [00:00<00:00, 252.50it/s]


In [5]:
insert_existing_annotations(app, db,
                            image_id=1,
                            annotation_class_id=2,
                            gtpred="gt",
                            filepath='quickannotator/data/test_ndpi/13_266069_040_003 L02 PAS.json'
                            )

100%|██████████| 88605/88605 [00:05<00:00, 15659.60it/s]


In [None]:

insert_geojson_mask_file(app, db, 
                           image_id=2,
                           annotation_class_id=1,
                           gtpred='gt',
                           filepath='quickannotator/data/test_ndpi/TCGA-23-2072-01Z-00-DX1.478243FF-BFF0-48A4-ADEA-DE789331A50E_tissue_mask.geojson')


In [None]:
insert_generated_annotations(app, db, image_id=2, annotation_class_id=2, gtpred="gt", n=1_000_000)
insert_generated_annotations(app, db, image_id=2, annotation_class_id=3, gtpred="gt", n=100_000)