In [1]:
%pwd

'/home/ubuntu/experiments/object-counter/Amit2416-Object-Detection-with-TensorFlow-Serving-FRCN'

In [3]:
# Import necessary modules
from typing import List
from dataclasses import dataclass
from abc import ABC, abstractmethod

# Define the ObjectCount data class
@dataclass
class ObjectCount:
    object_class: str
    count: int

# Define the ObjectCountRepo interface
class ObjectCountRepo(ABC):
    @abstractmethod
    def read_values(self, object_classes: List[str] = None) -> List[ObjectCount]:
        raise NotImplementedError

# Implement the CountInMemoryRepo class
class CountInMemoryRepo(ObjectCountRepo):

    def __init__(self):
        self.store = dict()

    def read_values(self, object_classes: List[str] = None) -> List[ObjectCount]:
        if object_classes is None:
            return list(self.store.values())

        object_counts = [self.store.get(object_class) for object_class in object_classes]
        
        # Filter out None values (object classes not found in the repository)
        object_counts = [obj_count for obj_count in object_counts if obj_count is not None]
        
        return object_counts

    def update_values(self, new_values: List[ObjectCount]):
        for new_object_count in new_values:
            key = new_object_count.object_class
            try:
                stored_object_count = self.store[key]
                self.store[key] = ObjectCount(key, stored_object_count.count + new_object_count.count)
            except KeyError:
                self.store[key] = ObjectCount(key, new_object_count.count)


# Create an instance of CountInMemoryRepo
count_repo = CountInMemoryRepo()

# Simulate adding object counts to the repository
object_counts_to_add = [
    ObjectCount("apple", 10),
    ObjectCount("banana", 5),
    ObjectCount("apple", 8),
    ObjectCount("orange", 12),
]

count_repo.update_values(object_counts_to_add)

# Retrieve and print all object counts from the repository
all_object_counts = count_repo.read_values()
print("All Object Counts:")
for obj_count in all_object_counts:
    print(f"{obj_count.object_class}: {obj_count.count}")

# Retrieve and print object counts for specific classes
classes_to_retrieve = ["apple", "banana", "grape"]
selected_object_counts = count_repo.read_values(classes_to_retrieve)
print("\nSelected Object Counts:")
for obj_count in selected_object_counts:
    print(f"{obj_count.object_class}: {obj_count.count}")


All Object Counts:
apple: 18
banana: 5
orange: 12

Selected Object Counts:
apple: 18
banana: 5


In [6]:
# Import necessary modules
from typing import List
from PIL import Image
from counter.domain.models import Prediction, ObjectCount, CountResponse
from counter.domain.ports import ObjectDetector, ObjectCountRepo
from functools import reduce

class Box:
    def __init__(self, x, y, width, height):
        self.x = x
        self.y = y 
        self.width = width
        self.height = height

# Define a mock ObjectDetector class for testing
class MockObjectDetector(ObjectDetector):
    def predict(self, image):
        # Mock predictions (class_name, score, box)
        predictions = [
            Prediction("cat", 0.9, Box(10, 10, 50, 50)),
            Prediction("dog", 0.8, Box(20, 20, 60, 60)),
            Prediction("cat", 0.75, Box(30, 30, 70, 70)),
            Prediction("car", 0.6, Box(40, 40, 80, 80)),
        ]
        return predictions

# Define a mock ObjectCountRepo class for testing
class MockObjectCountRepo(ObjectCountRepo):
    def __init__(self):
        self.store = dict()

    def read_values(self, object_classes: List[str] = None) -> List[ObjectCount]:
        if object_classes is None:
            return list(self.store.values())
        return [self.store.get(object_class) for object_class in object_classes]

    def update_values(self, new_values: List[ObjectCount]):
        for new_object_count in new_values:
            key = new_object_count.object_class
            try:
                stored_object_count = self.store[key]
                self.store[key] = ObjectCount(key, stored_object_count.count + new_object_count.count)
            except KeyError:
                self.store[key] = ObjectCount(key, new_object_count.count)

# Define the CountDetectedObjects class
class CountDetectedObjects:
    def __init__(self, object_detector: ObjectDetector, object_count_repo: ObjectCountRepo):
        self.__object_detector = object_detector
        self.__object_count_repo = object_count_repo

    def execute(self, image, threshold) -> CountResponse:
        # Step 1: Get predictions from the object detector
        predictions = self.__find_valid_predictions(image, threshold)
        
        # Step 2: Count objects based on predictions and update the repository
        object_counts = count(predictions)
        self.__object_count_repo.update_values(object_counts)
        
        # Step 3: Read the total object counts from the repository
        total_objects = self.__object_count_repo.read_values()
        
        # Step 4: Return the counts as a CountResponse object
        return CountResponse(current_objects=object_counts, total_objects=total_objects)

    def __find_valid_predictions(self, image, threshold):
        # Use the object detector to get predictions
        predictions = self.__object_detector.predict(image)
        return list(over_threshold(predictions, threshold=threshold))

# Define a function to filter predictions above a threshold
def over_threshold(predictions: List[Prediction], threshold: float):
    return filter(lambda prediction: prediction.score >= threshold, predictions)

# Define a function to count occurrences of object classes
def count(predictions: List[Prediction]) -> List[ObjectCount]:
    object_classes = map(lambda prediction: prediction.class_name, predictions)
    object_classes_counter = reduce(__count_object_classes, object_classes, {})
    return [ObjectCount(object_class, occurrences) for object_class, occurrences in object_classes_counter.items()]

# Define a function to count occurrences of object classes
def __count_object_classes(class_counter: dict, object_class: str):
    class_counter[object_class] = class_counter.get(object_class, 0) + 1
    return class_counter

# Example usage:
if __name__ == "__main__":
    # Create instances of the mock detector and repository
    mock_detector = MockObjectDetector()
    mock_repo = MockObjectCountRepo()

    # Create an instance of CountDetectedObjects
    object_counter = CountDetectedObjects(mock_detector, mock_repo)

    # Load an example image (you can replace this with your own image)
    image = Image.open("resources/images/boy.jpg")

    # Set a threshold for filtering predictions
    threshold = 0.7

    # Execute object counting
    result = object_counter.execute(image, threshold)

    # Print the current and total object counts
    print("Current Object Counts:")
    for obj_count in result.current_objects:
        print(f"{obj_count.object_class}: {obj_count.count}")

    print("\nTotal Object Counts:")
    for obj_count in result.total_objects:
        print(f"{obj_count.object_class}: {obj_count.count}")


Current Object Counts:
cat: 2
dog: 1

Total Object Counts:
cat: 2
dog: 1


In [1]:
pip install sqlalchemy

Collecting sqlalchemy
  Obtaining dependency information for sqlalchemy from https://files.pythonhosted.org/packages/dc/3e/d0c97146e4c707451c05b80ac712025d635b19aed1d867fed351de70c71c/SQLAlchemy-2.0.20-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata
  Downloading SQLAlchemy-2.0.20-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.4 kB)
Collecting greenlet!=0.4.17 (from sqlalchemy)
  Downloading greenlet-2.0.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (618 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m618.8/618.8 kB[0m [31m8.0 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hDownloading SQLAlchemy-2.0.20-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.2/3.2 MB[0m [31m58.8 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: greenlet, sqlalchemy
Successfully installed greenlet-2.0.2 sqla

In [3]:
from typing import List
from sqlalchemy import create_engine, Column, String, Integer
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import declarative_base
from counter.domain.models import ObjectCount
from counter.domain.ports import ObjectCountRepo

Base = declarative_base()

class ObjectCountEntity(Base):
    __tablename__ = 'object_counts'

    id = Column(Integer, primary_key=True, autoincrement=True)
    object_class = Column(String, nullable=False)
    count = Column(Integer, nullable=False)

class CountPostgreSQLRepo(ObjectCountRepo):
    def __init__(self, db_url):
        self.engine = create_engine(db_url)
        Base.metadata.create_all(self.engine)
        Session = sessionmaker(bind=self.engine)
        self.session = Session()

    def read_values(self, object_classes: List[str] = None) -> List[ObjectCount]:
        query = self.session.query(ObjectCountEntity).filter(
            ObjectCountEntity.object_class.in_(object_classes)) if object_classes else self.session.query(
            ObjectCountEntity)
        return [ObjectCount(row.object_class, row.count) for row in query]

    def update_values(self, new_values: List[ObjectCount]):
        for new_object_count in new_values:
            existing_record = self.session.query(ObjectCountEntity).filter_by(object_class=new_object_count.object_class).first()
            if existing_record:
                existing_record.count += new_object_count.count
            else:
                new_record = ObjectCountEntity(object_class=new_object_count.object_class, count=new_object_count.count)
                self.session.add(new_record)
        self.session.commit()


In [2]:
from sqlalchemy import Column, String, Integer
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class ObjectCountEntity(Base):
    __tablename__ = 'object_counts'

    id = Column(Integer, primary_key=True, autoincrement=True)
    object_class = Column(String, nullable=False)
    count = Column(Integer, nullable=False)


In [3]:
from typing import List
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
#from models import Base, ObjectCountEntity
from counter.domain.models import ObjectCount
from counter.domain.ports import ObjectCountRepo

class CountPostgreSQLRepo(ObjectCountRepo):
    def __init__(self, db_url):
        self.engine = create_engine(db_url)
        Base.metadata.create_all(self.engine)
        Session = sessionmaker(bind=self.engine)
        self.session = Session()

    def read_values(self, object_classes: List[str] = None) -> List[ObjectCount]:
        query = self.session.query(ObjectCountEntity).filter(
            ObjectCountEntity.object_class.in_(object_classes)) if object_classes else self.session.query(
            ObjectCountEntity)
        return [ObjectCount(row.object_class, row.count) for row in query]

    def update_values(self, new_values: List[ObjectCount]):
        for new_object_count in new_values:
            existing_record = self.session.query(ObjectCountEntity).filter_by(object_class=new_object_count.object_class).first()
            if existing_record:
                existing_record.count += new_object_count.count
            else:
                new_record = ObjectCountEntity(object_class=new_object_count.object_class, count=new_object_count.count)
                self.session.add(new_record)
        self.session.commit()
