Skip to content

Commit

Permalink
feat: improved fall detection with 2 frame lookback instead of 1; closes
Browse files Browse the repository at this point in the history
 #282

fix: fall detection issue #282
Merge pull request #289 from bhavikapanara/feat-3
  • Loading branch information
Ivelin Ivanov committed Feb 1, 2021
2 parents f453f17 + 5d1abe6 commit 4abcfc3
Show file tree
Hide file tree
Showing 7 changed files with 280 additions and 59 deletions.
136 changes: 85 additions & 51 deletions src/ambianic/pipeline/ai/fall_detect.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,35 @@ def __init__(self,
}
"""
super().__init__(model, **kwargs)
# previous pose detection information to compare pose changes against
self._prev_vals = []
self._prev_time = time.monotonic()
self._prev_thumbnail = None
# angle b/w left shoulder-hip line with vertical axis
self._prev_left_angle_with_yaxis = None
# angle b/w right shoulder-hip line with vertical axis
self._prev_right_angle_with_yaxis = None
self.previous_body_vector_score = 0

# previous pose detection information for frame at time t-1 and t-2 \
# to compare pose changes against
self._prev_data = [None] * 2

# Data of previous frames lookup constants
self.POSE_VAL = '_prev_pose_dix'
self.TIMESTAMP = '_prev_time'
self.THUMBNAIL = '_prev_thumbnail'
self.LEFT_ANGLE_WITH_YAXIS = '_prev_left_angle_with_yaxis'
self.RIGHT_ANGLE_WITH_YAXIS = '_prev_right_angle_with_yaxis'
self.BODY_VECTOR_SCORE = '_prev_body_vector_score'

_dix = {self.POSE_VAL: [],
self.TIMESTAMP: time.monotonic(),
self.THUMBNAIL: None,
self.LEFT_ANGLE_WITH_YAXIS: None,
self.RIGHT_ANGLE_WITH_YAXIS: None,
self.BODY_VECTOR_SCORE: 0
}

# self._prev_data[0] : store data of frame at t-2
# self._prev_data[1] : store data of frame at t-1
self._prev_data[0] = self._prev_data[1] = _dix

self._pose_engine = PoseEngine(self._tfengine)
self._fall_factor = 60
self.confidence_threshold = self._tfengine._confidence_threshold

# Require a minimum amount of time between two video frames in seconds.
# Otherwise on high performing hard, the poses could be too close to each other and have negligible difference
# for fall detection purpose.
Expand All @@ -58,7 +73,6 @@ def __init__(self,

self.fall_detect_corr = [self.LEFT_SHOULDER, self.LEFT_HIP, self.RIGHT_SHOULDER, self.RIGHT_HIP]


def process_sample(self, **sample):
"""Detect objects in sample image."""
log.debug("%s received new sample", self.__class__.__name__)
Expand Down Expand Up @@ -87,7 +101,6 @@ def process_sample(self, **sample):
str(sample)
)


def calculate_angle(self, p):
'''
Calculate angle b/w two lines such as
Expand All @@ -108,19 +121,21 @@ def calculate_angle(self, p):
angle = abs(theta1-theta2)
return angle


def is_body_line_motion_downward(self, left_angle_with_yaxis, rigth_angle_with_yaxis):
def is_body_line_motion_downward(self, left_angle_with_yaxis,
rigth_angle_with_yaxis, inx):

test = False
l_angle = left_angle_with_yaxis and self._prev_left_angle_with_yaxis and left_angle_with_yaxis > self._prev_left_angle_with_yaxis
r_angle = rigth_angle_with_yaxis and self._prev_right_angle_with_yaxis and rigth_angle_with_yaxis > self._prev_right_angle_with_yaxis

l_angle = left_angle_with_yaxis and self._prev_data[inx][self.LEFT_ANGLE_WITH_YAXIS] \
and left_angle_with_yaxis > self._prev_data[inx][self.LEFT_ANGLE_WITH_YAXIS]
r_angle = rigth_angle_with_yaxis and self._prev_data[inx][self.RIGHT_ANGLE_WITH_YAXIS] \
and rigth_angle_with_yaxis > self._prev_data[inx][self.RIGHT_ANGLE_WITH_YAXIS]

if l_angle or r_angle:
test = True

return test


def find_keypoints(self, image):

# this score value should be related to the configuration confidence_threshold parameter
Expand Down Expand Up @@ -173,40 +188,50 @@ def find_keypoints(self, image):

return pose, thumbnail, pose_score, pose_dix


def find_changes_in_angle(self, pose_dix):
def find_changes_in_angle(self, pose_dix, inx):
'''
Find the changes in angle for shoulder-hip lines b/w current and previpus frame.
'''

prev_leftLine_corr_exist = all(e in self._prev_vals for e in [self.LEFT_SHOULDER, self.LEFT_HIP])
prev_leftLine_corr_exist = all(e in self._prev_data[inx][self.POSE_VAL] for e in [self.LEFT_SHOULDER, self.LEFT_HIP])
curr_leftLine_corr_exist = all(e in pose_dix for e in [self.LEFT_SHOULDER, self.LEFT_HIP])

prev_rightLine_corr_exist = all(e in self._prev_vals for e in [self.RIGHT_SHOULDER, self.RIGHT_HIP])
prev_rightLine_corr_exist = all(e in self._prev_data[inx][self.POSE_VAL] for e in [self.RIGHT_SHOULDER, self.RIGHT_HIP])
curr_rightLine_corr_exist = all(e in pose_dix for e in [self.RIGHT_SHOULDER, self.RIGHT_HIP])

left_angle = right_angle = 0

if prev_leftLine_corr_exist and curr_leftLine_corr_exist:
temp_left_vector = [[self._prev_vals[self.LEFT_SHOULDER], self._prev_vals[self.LEFT_HIP]], [pose_dix[self.LEFT_SHOULDER], pose_dix[self.LEFT_HIP]]]
temp_left_vector = [[self._prev_data[inx][self.POSE_VAL][self.LEFT_SHOULDER],
self._prev_data[inx][self.POSE_VAL][self.LEFT_HIP]],
[pose_dix[self.LEFT_SHOULDER], pose_dix[self.LEFT_HIP]]]
left_angle = self.calculate_angle(temp_left_vector)
log.debug("Left shoulder-hip angle: %r", left_angle)


if prev_rightLine_corr_exist and curr_rightLine_corr_exist:
temp_right_vector = [[self._prev_vals[self.RIGHT_SHOULDER], self._prev_vals[self.RIGHT_HIP]], [pose_dix[self.RIGHT_SHOULDER], pose_dix[self.RIGHT_HIP]]]
temp_right_vector = [[self._prev_data[inx][self.POSE_VAL][self.RIGHT_SHOULDER],
self._prev_data[inx][self.POSE_VAL][self.RIGHT_HIP]],
[pose_dix[self.RIGHT_SHOULDER], pose_dix[self.RIGHT_HIP]]]
right_angle = self.calculate_angle(temp_right_vector)
log.debug("Right shoulder-hip angle: %r", right_angle)


angle_change = max(left_angle, right_angle)
return angle_change

def assign_prev_records(self, pose_dix, left_angle_with_yaxis, rigth_angle_with_yaxis, now, thumbnail, current_body_vector_score):
self._prev_vals = pose_dix
self._prev_left_angle_with_yaxis = left_angle_with_yaxis
self._prev_right_angle_with_yaxis = rigth_angle_with_yaxis
self._prev_time = now
self._prev_thumbnail = thumbnail
self.previous_body_vector_score = current_body_vector_score

curr_data = {self.POSE_VAL: pose_dix,
self.TIMESTAMP: now,
self.THUMBNAIL: thumbnail,
self.LEFT_ANGLE_WITH_YAXIS: left_angle_with_yaxis,
self.RIGHT_ANGLE_WITH_YAXIS: rigth_angle_with_yaxis,
self.BODY_VECTOR_SCORE: current_body_vector_score
}

self._prev_data[-2] = self._prev_data[-1]
self._prev_data[-1] = curr_data

def draw_lines(self, thumbnail, pose_dix):
"""Draw body lines if available. Return number of lines drawn."""
Expand Down Expand Up @@ -307,11 +332,13 @@ def fall_detect(self, image=None):
start_time = time.monotonic()

now = time.monotonic()
lapse = now - self._prev_time
if self._prev_vals and lapse < self.min_time_between_frames:
log.debug("Received an image frame too soon after the previous frame. Only %.2f ms apart. Minimum %.2f ms distance required for fall detection.", lapse, self.min_time_between_frames)
lapse = now - self._prev_data[-1][self.TIMESTAMP]

if self._prev_data[-1][self.POSE_VAL] and lapse < self.min_time_between_frames:
log.debug("Received an image frame too soon after the previous frame. Only %.2f ms apart.\
Minimum %.2f ms distance required for fall detection.", lapse, self.min_time_between_frames)
inference_result = None
thumbnail = self._prev_thumbnail
thumbnail = self._prev_data[-1][self.THUMBNAIL]
else:
# Detection using tensorflow posenet module
pose, thumbnail, pose_score, pose_dix = self.find_keypoints(image)
Expand All @@ -321,7 +348,7 @@ def fall_detect(self, image=None):
log.debug("No pose detected or detection score does not meet confidence threshold.")
else:
inference_result = []

current_body_vector_score = pose_score

# Find line angle with vertcal axis
Expand All @@ -330,29 +357,36 @@ def fall_detect(self, image=None):
# save an image with drawn lines for debugging
self.draw_lines(thumbnail, pose_dix)

if not self._prev_vals or lapse > self.max_time_between_frames:
log.debug("No recent pose to compare to. Will save this frame pose for subsequent comparison.")
elif not self.is_body_line_motion_downward(left_angle_with_yaxis, rigth_angle_with_yaxis):
log.debug("The body-line angle with vertical axis is decreasing from the previous frame. Not likely to be a fall.")
else:
leaning_angle = self.find_changes_in_angle(pose_dix)
for t in [-1, -2]:
lapse = now - self._prev_data[t][self.TIMESTAMP]

leaning_probability = 1 if leaning_angle > self._fall_factor else 0
fall_score = leaning_probability * (self.previous_body_vector_score + current_body_vector_score) / 2
if not self._prev_data[t][self.POSE_VAL] or lapse > self.max_time_between_frames:
log.debug("No recent pose to compare to. Will save this frame pose for subsequent comparison.")
elif not self.is_body_line_motion_downward(left_angle_with_yaxis, rigth_angle_with_yaxis, inx=t):
log.debug("The body-line angle with vertical axis is decreasing from the previous frame. Not likely to be a fall.")
else:
leaning_angle = self.find_changes_in_angle(pose_dix, inx=t)

#if leaning_angle > self._fall_factor:
if fall_score >= self.confidence_threshold:
# insert a box that covers the whole image as a workaround
# to meet the expected format of the save_detections element
box = [0, 0, 1, 1]
inference_result.append(('FALL', fall_score, box, leaning_angle))
log.info("Fall detected: %r", inference_result)
# Get leaning_probability by comparing leaning_angle with fall_factor probability.
leaning_probability = 1 if leaning_angle > self._fall_factor else 0

# Calculate fall score using average of current and previous frame's body vector score with leaning_probability
fall_score = leaning_probability * (self._prev_data[t][self.BODY_VECTOR_SCORE] + current_body_vector_score) / 2

if fall_score >= self.confidence_threshold:
# insert a box that covers the whole image as a workaround
# to meet the expected format of the save_detections element
box = [0, 0, 1, 1]
inference_result.append(('FALL', fall_score, box, leaning_angle))
log.info("Fall detected: %r", inference_result)

break

log.debug("Saving pose for subsequent comparison.")
self.assign_prev_records(pose_dix, left_angle_with_yaxis, rigth_angle_with_yaxis, now, thumbnail, current_body_vector_score)

# log.debug("Logging stats")

self.log_stats(start_time=start_time)
log.debug("thumbnail: %r", thumbnail)
return inference_result, thumbnail
return inference_result, thumbnail
Binary file added tests/pipeline/ai/fall_img_15.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added tests/pipeline/ai/fall_img_16.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added tests/pipeline/ai/fall_img_17.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added tests/pipeline/ai/fall_img_1_1.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added tests/pipeline/ai/fall_img_2_2.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading

0 comments on commit 4abcfc3

Please sign in to comment.