Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 36 additions & 20 deletions src/robin/utilities/bam_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,47 +38,63 @@ class BamEventHandler(FileSystemEventHandler):
A class to handle file system events specifically for ".bam" files.

This class extends the FileSystemEventHandler from the watchdog library
to handle creation events for ".bam" files within a watch folder.
to handle creation and move events for ".bam" files within a watch folder.

Attributes:
bam_count (DefaultDict[str, Any]): A dictionary to keep track of the number of ".bam" files
and their creation timestamps.
bam_count (Queue): A queue to store BAM file paths and their detection timestamps.
"""

def __init__(self, bam_count: Queue):
"""
Initializes the BamEventHandler with a bam_count dictionary.
Initializes the BamEventHandler with a bam_count queue.

Args:
bam_count (DefaultDict[str, Dict[str, Any]]): A dictionary to store the count and details of ".bam" files.
Expected to have a "counter" key initialized to zero.
bam_count (Queue): A queue to store BAM file paths and their timestamps.
"""
super().__init__()
self.bam_count = bam_count

def _handle_bam_file(self, file_path: str) -> None:
"""
Helper method to process BAM files consistently across different events.

This method checks if the file has a .bam extension and if so, adds it to
the processing queue with its current timestamp.

Args:
file_path (str): Path to the file to process
"""
if file_path.endswith(".bam"):
self.bam_count.put((file_path, time.time()))
logger.info(f"New .bam file detected: {file_path}")

def on_created(self, event: FileSystemEvent) -> None:
"""
Handles the file creation event.

This method is called when a new file is created in the watch folder.
If the created file has a ".bam" extension, it increments the counter in bam_count
and records the file's creation timestamp.
If the created file has a ".bam" extension, it processes the file.

Args:
event (FileSystemEvent): The event object representing the file system event.
"""
if event.is_directory:
return

if event.src_path.endswith(".bam"):
self.bam_count.put((event.src_path, time.time()))
logger.info(
# f"New .bam file detected: {event.src_path}, total count: {self.bam_count['counter']}"
f"New .bam file detected: {event.src_path}" # , total count: {self.bam_count['counter']}"
)
# except Exception as e:
# print("Bam error in handler.")
# print(e)
if not event.is_directory:
self._handle_bam_file(event.src_path)

def on_moved(self, event: FileSystemEvent) -> None:
"""
Handles file move/rename events.

This method is called when a file is moved or renamed in the watch folder,
which catches rsync's final move operation when copying files into the
watched directory.

Args:
event (FileSystemEvent): The event object representing the move/rename event,
containing both source and destination paths.
"""
if not event.is_directory:
self._handle_bam_file(event.dest_path)


def create_bam_count() -> DefaultDict[str, Dict[str, Any]]:
Expand Down