In [1]:
from datetime import datetime, timedelta
import sys
import os
import pandas as pd
import numpy as np
import seaborn as sns
colony = 'worker05'
date_tagging = [ [2024, 3, 14], [2024, 4, 11] ] 
crop_datetime = date_tagging[0]

In [2]:
def parameters_list_crop_by_datetime(lines, crop_date):
    # initialize lists
    ID_num = []
    center_x = []
    center_y = []
    angle = []
    date_time = []
    CPU_temp = []

    for line in lines:
        # replace unnecessary characters in line
        line = line.replace('[','')
        line = line.replace(']','')
        line = line.replace('CPU','')
        
        # replace month name with month number so that it can be converted to datetime type
        months = ['Jan','Feb','Mar','Apr','May','Jun','Jul','Aug','Sep','Oct','Nov','Dec']
        for month in range(0,len(months)):
            line = line.replace(months[month],str(month+1))
            
        # split line by spaces
        line = line.split()
        
        # extract info and convert to usable types
        ID_num.append(int(line[6]))
        center_x.append(float(line[8]))
        center_y.append(float(line[9]))
        angle.append(float(line[11]))
        CPU_temp.append(float(line[13]))
        
        # record dates and times as datetime type for calculations
        (h, m, s) = line[3].split(':')
        line_time = datetime(int(line[4]),int(line[1]),int(line[2]),int(h),int(m),int(s))
        date_time.append(line_time)
    
    
    for index in range(0,len(date_time)):
        if date_time[index] >= datetime(crop_date[0], crop_date[1], crop_date[2]):
            crop_date_index = index
            break
    
    
    
    return ID_num[crop_date_index::], center_x[crop_date_index::], center_y[crop_date_index::], angle[crop_date_index::], date_time[crop_date_index::], CPU_temp[crop_date_index::]
    

In [3]:
with open(colony + '.txt') as file: # $ change file name if necessary 
    lines = file.readlines()
    file.close()

ID_num, center_x, center_y, angles, date_time, CPU_temp = parameters_list_crop_by_datetime(lines, crop_datetime)


In [4]:
vdf = pd.DataFrame.from_dict({'track_tagid':ID_num,'x':center_x,'y':center_y,'datetime':date_time,'angle':angles})

In [5]:
#SECONDS THRESHOLD
#used to classify multiple detections into part of a single event
#also used to tell when two detections are part of different events
#by checking the time distance between them
t = 15

#DISTANCE THRESHOLD
#used to classify an event as entering or exiting
#when two consecutive detections of the same event 
#have this distance in y position, they are utilized to predict
#the trajectory. If not then it checks the detection prior for
#the distance threshold, and continues doing so until it finds
#the last detection in the event or until it finds a distance
#of more than the threshold

t2 = 10

#ANGLE THRESHOLD
#used to generate angle ranges for classifying as exiting
#or entering

angle = 30


In [6]:
#sort values by ID and datetime
vdf = vdf.sort_values(by=['track_tagid','datetime']).reset_index()
#obtain difference in time between datetimes
vdf['timedelta'] = vdf['datetime'].diff().apply(lambda x: x.total_seconds())
#mark all negatives or values above seconds threshold as event breakpoints
vdf['separate_event'] = (vdf['timedelta'] >= t) | (vdf['timedelta'] < 0) 

In [7]:
#store indexes of break points
break_indexes = vdf.separate_event[vdf.separate_event == True].index.tolist()

### GRAPH GENERATION

In [8]:
#filename according to csv line
index = 1
#initialize first event index value as 0
first_detection = 0
for i in range(len(break_indexes)):
    #extract all detections of an event
    detections = vdf.iloc[first_detection:break_indexes[i]+1]

    #extract y coordinates from events
    coordinates = detections['y']
    coordinates.reset_index(drop=True, inplace=True)

    #update first event index for next event extraction
    first_detection = break_indexes[i]+1

    #store positions of each detection
    positions = coordinates
    x = np.arange(len(positions))
    plot = sns.lineplot(x=x,y=positions)
    plot.set_ylim(0, 300)
    #invert y axis so inside is up and outside is down
    plot.invert_yaxis()
    plt = plot.get_figure()
    plt.savefig(f"beecam-graphs/{index}.png")
    plt.clf()
    index += 1

<Figure size 640x480 with 0 Axes>

## Modified look-at prior events for this type of data

In [9]:
def beeCleanPrior(bee):

    new_event = []
    datetime = []
    ids = []
    #initialize first event index value as 0
    first_detection = 0
    for i in range(len(break_indexes)):

        #store id and datetime of event
        ids.append(vdf['track_tagid'].iloc[break_indexes[i]])
        datetime.append(vdf['datetime'].iloc[break_indexes[i]])
        
        #extract all detections of an event
        detections = vdf.iloc[first_detection:break_indexes[i]+1]

        #extract y position of detections
        coordinates = detections[['y']]
        coordinates.reset_index(drop=True, inplace=True)

        #mark last y position of all detections
        final = coordinates['y'].iloc[-1]

         #iterate backwards to find first prior y that has a difference from
        #the last value above the distance threshold
        for k in range(len(coordinates)):
            prev = coordinates['y'].iloc[len(coordinates)-k-1]
            dif = final - prev
            if abs(dif) >= t2:
                if dif > 0:
                    new_event.append('exiting')
                elif dif < 0:
                    new_event.append('entering')
                else:
                    new_event.append('unknown')
                break
            elif k == len(coordinates) - 1:
                if dif > 0:
                    new_event.append('exiting')
                elif dif < 0:
                    new_event.append('entering')
                else:
                    new_event.append('unknown')
        #update initial index
        first_detection = break_indexes[i]+1
        

    datadict ={'tagID':ids,'datetime':datetime,'event':new_event}
    return pd.DataFrame.from_dict(datadict)
            

In [10]:
prior = beeCleanPrior(vdf)
prior.to_csv("beecam_prior.csv",index=False)

In [11]:
#calculate how many are unknowns
len(prior[prior['event'] == 'unknown'])/len(prior)

0.3637751208343933

## Angle based from BeeCam-AprilTag

In [12]:
def beeCleanAngle(bee):

    ids = []
    new_event = []
    datetime = []
    #initialize first event index value as 0
    first_detection = 0
    
    enter_min = 180 + angle
    enter_max = 360 - angle
    exit_min = angle
    exit_max = 180 - angle
        
    for i in range(len(break_indexes)):

        #store id and datetime of event
        ids.append(vdf['track_tagid'].iloc[break_indexes[i]])
        datetime.append(vdf['datetime'].iloc[break_indexes[i]])

        #extract all detections of an event
        detections = vdf.iloc[first_detection:break_indexes[i]+1]

        #obtain all angles
        coordinates = detections['angle'].to_numpy()

        #obtain directions from angles
        
        unit_dx = np.cos(np.deg2rad(coordinates))
        unit_dy = np.sin(np.deg2rad(coordinates))
        avg_x = np.average(unit_dx)
        avg_y = np.average(unit_dy)
        if avg_x == 0 and avg_y == 0:
                    deg = 0
        elif avg_x == 0 and avg_y != 0:
            if avg_y > 0:
                deg = 270
            elif avg_y < 0:
                deg = 90
        else:
            # determine direction angle using arctan
            deg = np.rad2deg(np.arctan(avg_y/avg_x))
                    
            # since arctan limits are (-90,90), use coordinate directions to 
            # correct the angle to be within standard [0,360) range
            if avg_x > 0 and avg_y >= 0:
                deg = deg
            elif avg_x < 0 and avg_y >= 0:
                deg = 180 + deg
            elif avg_x < 0 and avg_y < 0:
                deg = deg + 180
            elif avg_x > 0 and avg_y < 0:
                deg = 360 + deg

        if deg >= exit_min and deg <= exit_max:
            new_event.append('exiting')
        elif deg >= enter_min and deg <= enter_max:
            new_event.append('entering')
        else:
            new_event.append('unknown')
        #update initial index
        first_detection = break_indexes[i]+1
                    
    datadict ={'tagID':ids,'datetime':datetime,'event':new_event}
    return pd.DataFrame.from_dict(datadict)


In [13]:
summed = beeCleanAngle(vdf)
summed.to_csv("beecam_angle.csv",index=False)

In [14]:
#calculate amount of unknowns
len(summed[summed['event'] == 'unknown'])/len(summed)

0.178326125667769

In [15]:
#compare similarly classified events for both implementations
len(prior[(prior['event'] == summed['event']) & (prior['event'] != 'unknown') & (summed['event'] != 'unknown')])/len(prior[(prior['event'] != 'unknown') & (summed['event'] != 'unknown')])

0.6626201315123925