In [None]:
# # Combine the sessionmaker Session, a with-statement and begin() to open a session, handle exceptions, roll back or commit, and close it, all in one step:
# with Session.begin() as session:
#     session.add(object)
#     session.add_all([objects])
#     result = session.query(Project).filter(Project.name == 'Test').scalar() # scalar is good if only one match is possible due to database constraints

In [None]:
import settings # type: ignore
from camtrappy.db.testdb import initialize_new_project
from camtrappy.io import parsing

# One-off setup, guarded so that re-running this cell in the same kernel does
# not parse the data and initialise the database project a second time.
if 'initialized' not in locals():
    # Data directory and file parser for the project.
    DATA_DIR = settings.data_dir_animals
    new_project = parsing.ProjectParser(
        name='Test',
        projectfolder=None,
        datafolder=DATA_DIR,
        restrict_to=[],
    )

    # Create the new database project from the parsed data.
    initialize_new_project(new_project)
    initialized = True

In [None]:
import settings # type: ignore
from camtrappy.db.testdb import Session
from camtrappy.db.schema import Video, Location
from camtrappy.core.base import VideoList, VideoLoader

# Execute each query and read ORM attributes *inside* its
# ``with Session.begin()`` block. The context manager commits and closes the
# session on exit, so anything run on the query afterwards would make the
# session implicitly begin a new transaction that nothing commits or closes.

# First ten videos; print the full path of the first one.
with Session.begin() as session:
    q = session.query(Video)
    r = q.limit(10).all()
    v = r[0]
    print(v.fullpath)

# All videos of location '7_Ittlingen', ordered by recording date and time.
with Session.begin() as session:
    q = (
        session.query(Video)
        .where(Video.location.has(name='7_Ittlingen'))
        .order_by(Video.date, Video.time)
    )
    r = q.all()
    print(r)

# Good settings

gb7 = GaussianBlur((7, 7))

ce = CannyEdge(180, 400)

re = Resize()

bgs_mog = BgsMOG(250, 5, 0.7, 25, -1)

norm = Normalize()

bi = BilateralFilter(9)

base_transforms = [re]

compare_transforms = [bi, norm, bgs_mog, gb7, ce]  # ce is not so good for findContours

In [7]:
import settings # type: ignore
from camtrappy.db.testdb import Session
from camtrappy.core.base import VideoLoader, VideoPlayer
from camtrappy.core.transforms import (CannyEdge, GaussianBlur, TransformFactory,
                                       AdaptiveHistogram, Resize, Gamma, Hysteresis,
                                       BGS, Normalize, BilateralFilter, AdaptiveThreshold,
                                       BgsMOG, Threshold)

from camtrappy.core.analysis import CentroidTracker

# Transforms that are instantiated here but are not part of the pipeline
# assembled further down in this cell.
at = AdaptiveThreshold(7, 3)
ah = AdaptiveHistogram(2, (12, 12))
gb3 = GaussianBlur((3, 3))
ce = CannyEdge(180, 400)
gamma = Gamma(0.25)
hyst = Hysteresis(0.15, 0.20)
bgs1 = BGS('KNN', 0.1)
bgs2 = BGS('KNN', 0.8)


# Transforms that make up the pipeline, defined in the same order in which
# they are passed to TransformFactory below.
bi = BilateralFilter(9) # smoothing that keeps the edges
norm = Normalize() # light homogenization
bgs_mog = BgsMOG(hist=250, nmixtures=5, background_ratio=0.7,
                 noise=25, learningrate=-1) # background subtractor
gb7 = GaussianBlur((7, 7)) # some more smoothing
thr = Threshold(50, 'tozero') # eliminate some more noise


# Bundle the pipeline steps: bilateral filter -> normalize -> MOG background
# subtraction -> 7x7 Gaussian blur -> threshold.
transforms = TransformFactory([bi, norm, bgs_mog, gb7, thr])

# The loader is created without transforms; the pipeline is passed to
# VideoPlayer.play() instead.
# NOTE(review): the positional 3 is presumably a location id (the SQL logged by
# this cell filters on videos.location_id) -- confirm against VideoLoader.
vl = VideoLoader(Session, 3, skip_n_frames=0) #, transforms=transforms)
# The tracker is handed to the player as its `visitor`.
object_tracker = CentroidTracker(min_area=30, eps=1.5, maxDisappeared=50)
VideoPlayer(vl).play(resize=Resize(), transforms=transforms, visitor=object_tracker, compare=True)

2021-08-03 18:04:10,911 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2021-08-03 18:04:10,913 INFO sqlalchemy.engine.Engine SELECT videos.id AS videos_id, videos.path AS videos_path, videos.date AS videos_date, videos.time AS videos_time, videos.fps AS videos_fps, videos.duration AS videos_duration, videos.date_added AS videos_date_added, videos.location_id AS videos_location_id, (SELECT projects.datafolder 
FROM projects, locations 
WHERE projects.id = locations.project_id AND locations.id = videos.location_id) AS anon_1, (SELECT locations.folder 
FROM locations 
WHERE locations.id = videos.location_id) AS anon_2, (SELECT projects.datafolder 
FROM projects, locations 
WHERE projects.id = locations.project_id AND locations.id = videos.location_id) || ? || (SELECT locations.folder 
FROM locations 
WHERE locations.id = videos.location_id) || ? || videos.path AS anon_3 
FROM videos 
WHERE videos.location_id = ? ORDER BY videos.date, videos.time
2021-08-03 18:04:10,913 INFO sqlalchemy.eng

In [41]:
from __future__ import annotations
from typing import TYPE_CHECKING

from dataclasses import dataclass, field
from collections import OrderedDict

@dataclass
class Object:
    """Observation history of a single object, grouped by video.

    Attributes:
        id: Identifier of the object.
        _data: Maps ``video_id`` to an ``OrderedDict`` holding three parallel
            lists under the keys ``'frames'``, ``'bboxes'`` and
            ``'centroids'``. Videos are kept in the order in which they were
            first added.
    """

    id: int
    _data: OrderedDict = field(default_factory=OrderedDict)

    def add(self, video_id, frame_no, bbox, centroid) -> None:
        """Append one observation (frame number, bbox, centroid) for a video.

        The per-video record is created on first use.
        """
        video = self._data.setdefault(video_id, OrderedDict())
        video.setdefault('frames', []).append(frame_no)
        video.setdefault('bboxes', []).append(bbox)
        video.setdefault('centroids', []).append(centroid)

    def bboxes(self, video_id) -> list:
        """Return the bounding boxes recorded for ``video_id``.

        Raises:
            KeyError: If nothing was recorded for ``video_id``.
        """
        return self._data[video_id]['bboxes']

    def centroids(self, video_id) -> list:
        """Return the centroids recorded for ``video_id``.

        Raises:
            KeyError: If nothing was recorded for ``video_id``.
        """
        return self._data[video_id]['centroids']

    def frames(self, video_id) -> list:
        """Return the frame numbers recorded for ``video_id``.

        Raises:
            KeyError: If nothing was recorded for ``video_id``.
        """
        return self._data[video_id]['frames']

    def _latest_video_id(self):
        """Return the id of the video that was added last (last key of ``_data``).

        NOTE(review): adding an observation to a video that is already present
        does not move that video to the end, so "latest" means "most recently
        first seen" -- confirm this is the intended meaning.

        Raises:
            IndexError: If no observation has been added yet.
        """
        if not self._data:
            # Raise explicitly: a bare next() on an empty iterator would raise
            # StopIteration, which turns into a confusing RuntimeError when it
            # escapes inside a generator.
            raise IndexError(f'Object {self.id} has no observations yet')
        return next(reversed(self._data))

    @property
    def last_bbox(self):
        """Most recent bounding box of the latest video.

        Raises:
            IndexError: If no observation has been added yet.
        """
        return self.bboxes(self._latest_video_id())[-1]

    @property
    def last_centroid(self):
        """Most recent centroid of the latest video.

        Raises:
            IndexError: If no observation has been added yet.
        """
        return self.centroids(self._latest_video_id())[-1]

    @property
    def video_ids(self) -> list:
        """Ids of all videos with observations, in first-seen order."""
        return list(self._data.keys())


@dataclass
class Objects:
    """Registry that hands out object ids and tracks each object's state.

    Attributes:
        next_object_id: Id that the next call to ``register`` will assign.
        finished_objects: Objects that have been deregistered, keyed by id.
        current_objects: Objects that are currently registered, keyed by id.
        disappeared_objects: Per-object integer counter keyed by id, set to 0
            on registration (presumably the number of consecutive frames the
            object was not seen -- confirm against the tracker).
    """

    next_object_id: int = 0
    finished_objects: OrderedDict[int, Object] = field(default_factory=OrderedDict)
    current_objects: OrderedDict[int, Object] = field(default_factory=OrderedDict)
    # Values are int counters, not Object instances.
    disappeared_objects: OrderedDict[int, int] = field(default_factory=OrderedDict)

    def register(self, video_id, frame_no, bbox, centroid):
        """Register a new object with its first observation.

        Returns:
            The id assigned to the newly registered object.
        """
        object_id = self.next_object_id
        tracked = self.current_objects.setdefault(object_id, Object(object_id))
        tracked.add(video_id, frame_no, bbox, centroid)

        self.disappeared_objects[object_id] = 0
        self.next_object_id += 1
        return object_id

    def deregister(self, object_id):
        """Move an object from ``current_objects`` to ``finished_objects``.

        Raises:
            KeyError: If ``object_id`` is not currently registered; in that
                case no state is modified.
        """
        # pop() is evaluated before the assignment, so an unknown id fails
        # without touching finished_objects.
        self.finished_objects[object_id] = self.current_objects.pop(object_id)
        # A missing counter must not abort the move after it already happened.
        self.disappeared_objects.pop(object_id, None)

    def save_to_db(self):
        """Pop all items in self.finished_objects and push to database."""
        # TODO: not implemented yet; currently a no-op.
        pass
    
# o = Objects()
# o.register(1, 200, (1,2,3,4), (0,10))
# o.deregister(0)
# o

# Quick check: record one observation and display the most recent centroid.
o = Object(id=1)
o.add(video_id=10, frame_no=200, bbox=(1, 2, 3, 4), centroid=(0, 10))
o.last_centroid

(0, 10)