In [None]:
# default_exp storage

In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
from nbdev import *

# Storage

In [None]:
#exporti

import os
import json
import sqlite3
import copy
from pathlib import Path
from collections.abc import MutableMapping

# Project Structure setup

just to folders, first containing object (e.g images) to annotate and second folder
contains annotation data/results.

In [None]:
#exporti

def setup_project_paths(project_path:Path, image_dir='pics', label_dir=None):
    assert project_path.exists(), "Project path should point to " \
                                       "existing directory"
    assert project_path.is_dir(), "Project path should point to " \
                                       "existing directory"
    im_dir = Path(project_path, image_dir)
    results_dir = Path(project_path, 'results')
    results_dir.mkdir(parents=True, exist_ok=True)
    annotation_file_path = Path(results_dir, 'annotations.json')
    
    project_paths = (im_dir, annotation_file_path)
    
    if label_dir is not None:
        project_paths += (Path(project_path, label_dir),)
        
    return project_paths

In [None]:
test_proj_path = Path('../data/test')
setup_project_paths(test_proj_path)

In [None]:
test_proj_path = Path('../data/test')
setup_project_paths(test_proj_path, image_dir='ims', label_dir='labels')

In [None]:
#exporti

def get_image_list_from_folder(image_dir, strip_path=False):
    ''' Scans <image_dir> to construct list of existing images as <Path> objects
    '''
        
    path_list = [Path(image_dir, f) for f in os.listdir(image_dir) if
                 os.path.isfile(os.path.join(image_dir, f))]
    
    if strip_path:
        path_list = [p.name for p in path_list]
    return path_list

In [None]:
get_image_list_from_folder('../data/mock/pics')

In [None]:
get_image_list_from_folder('../data/mock/pics', strip_path=True)

# Generic Storage for Annotations

key values store

- key, object_id / file_name
- value json blob containing annotation

In [None]:
#export

class AnnotationStorage(MutableMapping):
    """
    Represents generic storage for annotations.
    
    `key` is object_id / file_name and `value` - json blob containing annotation.
    
    im_paths - list of existing images as <Path> objects
    
    """
    def __init__(self, im_paths):
        self.mapping = {}
        self.update({p.name: None for p in im_paths})
        
    def __getitem__(self, key):
        return self.mapping[key]
    
    def __delitem__(self, key):
        if key in self:
            del self.mapping[key]
        
    def __setitem__(self, key, value):
        self.mapping[key] = value
        
    def __iter__(self):
        return iter(self.mapping)
    def __len__(self):
        return len(self.mapping)
    def __repr__(self):
        return f"{type(self).__name__}({self.mapping})"
    
    def save(self, file_name):
        with open(file_name, 'w', encoding='utf-8') as f:
            json.dump(self.mapping, f, ensure_ascii=False, sort_keys = True, indent=4)
        
    def load(self, file_name):
        with open(file_name) as data_file:
            self.mapping = json.load(data_file)
    
    def to_dict(self, only_annotated=True):
        if only_annotated:
            return {k: copy.deepcopy(v) for k, v in self.mapping.items() if v}
        else:
            return copy.deepcopy(self.mapping)

In [None]:
im_paths = [Path('some/path', f) for f in ['name1', 'name2', 'name3']]
storage = AnnotationStorage(im_paths)
storage

In [None]:
storage['name5'] = {'x': 5, 'y': 3, 'width': 7, 'height': 1}

In [None]:
test_eq(storage['name5'], {'x': 5, 'y': 3, 'width': 7, 'height': 1})

In [None]:
len_before = len(storage)
storage.pop('name1')
test_eq(len(storage), len_before - 1)

In [None]:
storage.to_dict()

In [None]:
storage.to_dict(only_annotated=False)

In [None]:
storage.save('/tmp/ttest.json')

In [None]:
storage_from_file = AnnotationStorage([])
storage_from_file.load('/tmp/ttest.json')
test_eq(storage, storage_from_file)

In [None]:
storage

In [None]:
test_eq(storage.get('name8', {'dict':'obj'}), {'dict':'obj'})

# DB backed storage

- Changes in annotation should be tracked in db.
- db
  - sqlite memory / disk, how to sync so that race conditons are avoided?
  - remote db (postgres, mysql etc.) with sqlalchemy layer
  
## write sqlite functions

- init db
- write json + timestamp to db BUT only if json has changed!
- iterate over db
- iterate over values with latest timestamp
- get all history for key
- allow for metadata?
- check how sqlite write locks work

In [None]:
import sqlite3

In [None]:
#exporti
def _list_tables(conn):
    query = """
    SELECT 
        name
    FROM 
        sqlite_master 
    WHERE 
        type = 'table' AND 
        name NOT LIKE 'sqlite_%';
    """
    c = conn.cursor()
    return c.execute(query).fetchall()

```sql
DROP TABLE suppliers;

CREATE TABLE suppliers (
    supplier_id   INTEGER PRIMARY KEY,
    supplier_name TEXT    NOT NULL,
    group_id      INTEGER NOT NULL,
    FOREIGN KEY (group_id)
       REFERENCES supplier_groups (group_id) 
);
```

In [None]:
conn = sqlite3.connect(":memory:")

In [None]:
#exporti
def _create_tables(conn):
    c = conn.cursor()
    query = """
    CREATE TABLE IF NOT EXISTS data (objectID TEXT,
                                     timestamp DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')),
                                     data JSON,
                                     author TEXT,
                                     PRIMARY KEY (objectId, timestamp)
                                     );
    """
    c.execute(query)
    query = """
    CREATE TABLE IF NOT EXISTS objects (objectID TEXT,
                                        orderID INTEGER PRIMARY KEY AUTOINCREMENT
                                        
                                       )
    """
    c.execute(query)
    conn.commit()

In [None]:
#exporti
def _list_table(conn, table_name='data', latest=True):
    if latest:
        query = """
        SELECT * from {}
        
        GROUP BY objectID
        ORDER BY timestamp
        """.format(table_name)
    else:
        query = """
        SELECT * from {}
        """.format(table_name)
    c = conn.cursor()
    return c.execute(query).fetchall()

In [None]:
_create_tables(conn)

In [None]:
_list_tables(conn)

## SQL helper functions
is needed for consistant iteration order

In [None]:
#exporti
def _get_order_id(conn, object_id, table_name='objects'):
    query = """
    SELECT orderID from {}
    WHERE objectID = '{}'
    """.format(table_name, object_id)
    c = conn.cursor()
    res = c.execute(query).fetchone()
    if res is not None:
        return res[0]

In [None]:
_get_order_id(conn, 'doesnt exist')

In [None]:
#exporti
def _create_order_id(conn, object_id, table_name='objects'):
    order_id = _get_order_id(conn, object_id, table_name=table_name)
    if order_id:
        return order_id
    query = """
    INSERT INTO {}('objectID') VALUES('{}')
    """.format(table_name, object_id)
    c = conn.cursor()
    res = c.execute(query)
    return _get_order_id(conn, object_id, table_name=table_name)

In [None]:
_create_order_id(conn, 'lala')

In [None]:
_create_order_id(conn, 'lala')

In [None]:
_create_order_id(conn, 'lala2')

In [None]:
query = """
SELECT * from objects
"""
c = conn.cursor()
res = c.execute(query).fetchall()
res

In [None]:
#exporti
def _get(conn, object_id, table_name='data'):
    query = """
    SELECT data FROM {}
    WHERE objectID = '{}'
    
    GROUP BY objectID
    ORDER BY timestamp
    """.format(table_name, object_id)
    c = conn.cursor()
    res = c.execute(query).fetchone()
    if res is not None:
        return json.loads(res[0])

In [None]:
#exporti
def _get_object_id_at_pos(conn, pos, table_name='objects'):
    query = """
    SELECT objectID FROM {}
    ORDER BY orderID
    LIMIT {}, 1
    """.format(table_name, pos)
    c = conn.cursor()
    res = c.execute(query).fetchone()
    if res is not None:
        return res[0]

In [None]:
_get_object_id_at_pos(conn, 1)

In [None]:
#exporti
def _insert(conn, object_id, data: dict, table_name='data', author='author'):
    # insert if values have been changed

    last = _get(conn, object_id)
    
#     if last is None:
    _create_order_id(conn, object_id)
    if data == last:
        return
    c = conn.cursor()
    c.execute("insert into {}('objectID', 'author', 'data') values (?, ?, ?)".format(table_name),
                              [object_id, author, json.dumps(data)])
    conn.commit()

In [None]:
_insert(conn, 'lala3', {'crazy': 44})
_insert(conn, 'lala2', {'crazy': 40})
import time
time.sleep(0.1)
_insert(conn, 'lala3', {'crazy': 44 + 5})
_insert(conn, 'lala2', {'crazy': 40 + 5})

In [None]:
_list_table(conn, latest=False)

In [None]:
_list_table(conn)

In [None]:
# insert existing is ignored
_insert(conn, 'lala2', {'crazy': 40 + 5})

In [None]:
_list_table(conn, latest=False)

In [None]:
_get(conn, _get_object_id_at_pos(conn, 2))

In [None]:
#exporti
def _to_dict(conn, table_name='data'):
    query = """
    SELECT objectID, data from {}
    
    GROUP BY objectID
    ORDER BY timestamp
    """.format(table_name)
    c = conn.cursor()
    return {key: json.loads(value) for key, value in c.execute(query).fetchall()}

In [None]:
_to_dict(conn)

In [None]:
_get(conn, object_id="lala3")

In [None]:
#exporti
def _row_count(conn, table_name='data'):
    query = """
    SELECT COUNT(DISTINCT objectID) FROM {}
    """.format(table_name)
    c = conn.cursor()
    res = c.execute(query).fetchone()
    return res[0]

In [None]:
_row_count(conn)

In [None]:
#exporti
def _delete_last(conn, object_id, table_name='data'):
    query = """
    DELETE FROM {}
    WHERE objectId = '{}'
    ORDER BY timestamp
    LIMIT 1
    """.format(table_name, object_id)
    c = conn.cursor()
    res = c.execute(query)
    conn.commit()

In [None]:
#exporti
def _delete_all(conn, object_id, table_name='data'):
    query = """
    DELETE FROM {}
    WHERE objectId = '{}'
    """.format(table_name, object_id)
    c = conn.cursor()
    res = c.execute(query)
    conn.commit()

In [None]:
_list_table(conn, latest=False)

In [None]:
_delete_last(conn, 'lala3')

In [None]:
_list_table(conn, latest=False)

In [None]:
_delete_all(conn, 'lala2')

In [None]:
_list_table(conn, latest=False)

In [None]:
_row_count(conn)

## Persistent Storage with history support

In [None]:
#exporti

class AnnotationStorageIterator:
    def __init__(self, annotator_storage):
        self.annotator_storage = annotator_storage
        self.index = 0

    def __next__(self):  
        try:
            result = self.annotator_storage.at(self.index)
            self.index += 1
        except IndexError:
            raise StopIteration
        return result

    def next(self):
        return self.__next__()
    
    def prev(self):
        self.index -= 1
        if self.index < 0:
            raise StopIteration
        return self.annotator_storage.at(self.index)

In [None]:
#exporti

class AnnotationDBStorage(MutableMapping):
    def __init__(self, conn_string, im_paths=None):
        self.conn = sqlite3.connect(conn_string)
        _create_tables(self.conn)
        if im_paths:
            self.update({p.name: {} for p in im_paths})
    
    def update(self, dict_):
        for k, v in dict_.items():
            _insert(self.conn, k, v)
        
    def __getitem__(self, key):
        item = _get(self.conn, key)
        if item is None:
            raise IndexError
        return item

    def get(self, key, default):
        if _get(self.conn, key) is None:
            return default
    
    def __delitem__(self, key):
        _delete_last(self.conn, key)

    def delete_all(self, key):
        _delete_all(self.conn, key)
        
    def at(self, pos):
        # bug fix needed when combined with del operations
        object_id = _get_object_id_at_pos(self.conn, pos)
        if object_id is None or pos < 0:
            raise IndexError
        return _get(self.conn, object_id)
        
    def __setitem__(self, key, value):
        _insert(self.conn, key, value)
        
    def __iter__(self):
        return AnnotationStorageIterator(self)
    
    def __len__(self):
        return _row_count(self.conn)

    def __repr__(self):
        return f"{type(self).__name__}({_list_table(self.conn)[:2] + [' ...']})"
    
    def to_dict(self):
        return _to_dict(self.conn)

In [None]:
im_paths = [Path('some/path', f) for f in ['name1', 'name2', 'name3']]
storage = AnnotationDBStorage(":memory:", im_paths)
storage

In [None]:
storage['name5'] = {'x': 5, 'y': 3, 'width': 7, 'height': 1}

In [None]:
test_eq(storage.at(3), {'x': 5, 'y': 3, 'width': 7, 'height': 1})

In [None]:
test_eq(len(storage), 4)

In [None]:
test_eq(storage['name5'], {'x': 5, 'y': 3, 'width': 7, 'height': 1})

In [None]:
myiter = iter(storage)
for i in range(len(storage)):
    print(i, storage.at(i))
    test_eq(storage.at(i), next(myiter))

In [None]:
myiter.prev()

In [None]:
myiter.prev()

In [None]:
myiter.next()

In [None]:
for i in storage:
    print(i)

In [None]:
len_before = len(storage)
storage.pop('name1')
test_eq(len(storage), len_before - 1)

In [None]:
storage.to_dict()

In [None]:
for i in range(len(storage)):
    print(i, storage.at(i))

In [None]:
# TODO delete objectID from object table if not anymore in data

In [None]:
storage

In [None]:
test_eq(storage.get('name8', {'dict':'obj'}), {'dict':'obj'})

In [None]:
storage.to_dict()

In [None]:
#hide
from nbdev.export import notebook2script
notebook2script()