-
Notifications
You must be signed in to change notification settings - Fork 7.1k
/
employee_checkin.py
175 lines (154 loc) · 7.75 KB
/
employee_checkin.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
# -*- coding: utf-8 -*-
# Copyright (c) 2019, Frappe Technologies Pvt. Ltd. and contributors
# For license information, please see license.txt
from __future__ import unicode_literals
import frappe
from frappe.utils import now, cint, get_datetime
from frappe.model.document import Document
from frappe import _
from erpnext.hr.doctype.shift_assignment.shift_assignment import get_actual_start_end_datetime_of_shift
class EmployeeCheckin(Document):
    """Document controller for the 'Employee Checkin' DocType."""

    def validate(self):
        """Run the duplicate-log check, then populate the shift fields."""
        self.validate_duplicate_log()
        self.fetch_shift()

    def validate_duplicate_log(self):
        """Throw if another check-in has the same employee and timestamp."""
        duplicate_filters = {
            'employee': self.employee,
            'time': self.time,
            # Exclude this document itself so that re-saving it is not flagged.
            'name': ['!=', self.name],
        }
        existing = frappe.db.exists('Employee Checkin', duplicate_filters)
        if not existing:
            return
        link_html = frappe.get_desk_link('Employee Checkin', existing)
        frappe.throw(_('This employee already has a log with the same timestamp.{0}')
            .format("<Br>" + link_html))

    def fetch_shift(self):
        """Fill the shift fields from the timings returned for this check-in.

        The timings come from ``get_actual_start_end_datetime_of_shift`` for
        this employee and check-in time. When either actual timing is missing,
        ``self.shift`` is cleared. Otherwise the shift fields are copied onto
        the document, unless it is already linked to an attendance.

        Throws when the shift type determines IN/OUT strictly from the log
        type, but this check-in has no log type and is not flagged to skip
        auto attendance.
        """
        # NOTE(review): the source had lost its indentation; the "no shift"
        # branch is assumed to belong to the outer timings check -- confirm.
        timings = get_actual_start_end_datetime_of_shift(self.employee, get_datetime(self.time), True)
        if not (timings[0] and timings[1]):
            self.shift = None
            return

        shift_details = timings[2]
        shift_type = shift_details.shift_type
        needs_log_type = shift_type.determine_check_in_and_check_out == 'Strictly based on Log Type in Employee Checkin'
        if needs_log_type and not (self.log_type or self.skip_auto_attendance):
            frappe.throw(_('Log Type is required for check-ins falling in the shift: {0}.').format(shift_type.name))

        if self.attendance:
            # Already linked to an attendance: leave the stored shift data alone.
            return

        self.shift = shift_type.name
        self.shift_actual_start = timings[0]
        self.shift_actual_end = timings[1]
        self.shift_start = shift_details.start_datetime
        self.shift_end = shift_details.end_datetime
@frappe.whitelist()
def add_log_based_on_employee_field(employee_field_value, timestamp, device_id=None, log_type=None, skip_auto_attendance=0, employee_fieldname='attendance_device_id'):
    """Look up an Employee by a field value and insert an Employee Checkin for it.

    :param employee_field_value: Value to match against ``employee_fieldname`` on Employee.
    :param timestamp: Time of the log, stored as the check-in's ``time``. Expected as a string such as '2019-05-08 10:48:08.000000'.
    :param device_id: (optional) Location / device identifier stored on the check-in.
    :param log_type: (optional) Direction of the punch if available (IN/OUT).
    :param skip_auto_attendance: (optional) When it evaluates to 1, the check-in's skip auto attendance field is set.
    :param employee_fieldname: (Default: attendance_device_id) Employee field used for the lookup.
    :returns: The inserted 'Employee Checkin' document.
    """
    if not (employee_field_value and timestamp):
        frappe.throw(_("'employee_field_value' and 'timestamp' are required."))

    matches = frappe.db.get_values(
        "Employee",
        {employee_fieldname: employee_field_value},
        ["name", "employee_name", employee_fieldname],
        as_dict=True,
    )
    if not matches:
        frappe.throw(_("No Employee found for the given employee field value. '{}': {}").format(employee_fieldname,employee_field_value))
    # Only the first matching employee is used.
    employee = matches[0]

    checkin = frappe.new_doc("Employee Checkin")
    checkin.employee = employee.name
    checkin.employee_name = employee.employee_name
    checkin.time = timestamp
    checkin.device_id = device_id
    checkin.log_type = log_type
    if cint(skip_auto_attendance) == 1:
        checkin.skip_auto_attendance = '1'
    checkin.insert()
    return checkin
def mark_attendance_and_link_log(logs, attendance_status, attendance_date, working_hours=None, late_entry=False, early_exit=False, shift=None):
    """Create and submit an Attendance for the given check-ins and link them to it.

    If an Attendance with docstatus other than 2 already exists for the
    employee on ``attendance_date``, nothing is created: the logs are flagged
    with ``skip_auto_attendance`` instead and no exception is thrown.

    :param logs: The list of 'Employee Checkin'; the employee is read from the first entry.
    :param attendance_status: One of: Present, Absent, Half Day, Skip. 'On Leave' is not supported by this function.
    :param attendance_date: Date of the attendance to be created.
    :param working_hours: (optional) Number of working hours for the given date.
    :param late_entry: (optional) Value stored in the Attendance's ``late_entry`` field.
    :param early_exit: (optional) Value stored in the Attendance's ``early_exit`` field.
    :param shift: (optional) Value stored in the Attendance's ``shift`` field.
    :returns: The submitted Attendance document, or None when the logs were only flagged as skipped.
    """
    checkin_names = [entry.name for entry in logs]
    employee = logs[0].employee

    def flag_logs_as_skipped():
        # SQL text is kept exactly as in the source.
        frappe.db.sql("""update `tabEmployee Checkin`
set skip_auto_attendance = %s
where name in %s""", ('1', checkin_names))

    if attendance_status == 'Skip':
        flag_logs_as_skipped()
        return None
    elif attendance_status in ('Present', 'Absent', 'Half Day'):
        employee_doc = frappe.get_doc('Employee', employee)
        existing_filters = {'employee':employee, 'attendance_date':attendance_date, 'docstatus':('!=', '2')}
        if frappe.db.exists('Attendance', existing_filters):
            # Attendance already present for that date: skip instead of failing.
            flag_logs_as_skipped()
            return None

        attendance = frappe.get_doc({
            'doctype': 'Attendance',
            'employee': employee,
            'attendance_date': attendance_date,
            'status': attendance_status,
            'working_hours': working_hours,
            'company': employee_doc.company,
            'shift': shift,
            'late_entry': late_entry,
            'early_exit': early_exit
        }).insert()
        attendance.submit()
        frappe.db.sql("""update `tabEmployee Checkin`
set attendance = %s
where name in %s""", (attendance.name, checkin_names))
        return attendance
    else:
        frappe.throw(_('{} is an invalid Attendance Status.').format(attendance_status))
def calculate_working_hours(logs, check_in_out_type, working_hours_calc_type):
    """Calculate total working hours and the in/out times from a set of logs.

    ``logs`` must be in chronological order. Zero hours (and ``None`` for the
    times that could not be determined) are returned for all invalid cases,
    including an empty ``logs`` list.

    :param logs: The List of 'Employee Checkin'.
    :param check_in_out_type: One of: 'Alternating entries as IN and OUT during the same shift', 'Strictly based on Log Type in Employee Checkin'
    :param working_hours_calc_type: One of: 'First Check-in and Last Check-out', 'Every Valid Check-in and Check-out'
    :returns: Tuple ``(total_hours, in_time, out_time)``.
    """
    total_hours = 0
    in_time = out_time = None
    if not logs:
        # Nothing to evaluate; also avoids an IndexError on logs[0] below.
        return total_hours, in_time, out_time

    if check_in_out_type == 'Alternating entries as IN and OUT during the same shift':
        in_time = logs[0].time
        if len(logs) >= 2:
            out_time = logs[-1].time
        if working_hours_calc_type == 'First Check-in and Last Check-out':
            # assumption in this case: First log always taken as IN, Last log always taken as OUT
            total_hours = time_diff_in_hours(in_time, logs[-1].time)
        elif working_hours_calc_type == 'Every Valid Check-in and Check-out':
            # Logs are paired as (1st, 2nd), (3rd, 4th), ...; a trailing
            # unpaired log contributes nothing.
            for in_log, out_log in zip(logs[::2], logs[1::2]):
                total_hours += time_diff_in_hours(in_log.time, out_log.time)
    elif check_in_out_type == 'Strictly based on Log Type in Employee Checkin':
        if working_hours_calc_type == 'First Check-in and Last Check-out':
            first_in_index = find_index_in_dict(logs, 'log_type', 'IN')
            # Searching the reversed list yields an offset from the end.
            last_out_offset = find_index_in_dict(reversed(logs), 'log_type', 'OUT')
            if first_in_index is not None and last_out_offset is not None:
                in_time = logs[first_in_index].time
                out_time = logs[-1 - last_out_offset].time
                total_hours = time_diff_in_hours(in_time, out_time)
        elif working_hours_calc_type == 'Every Valid Check-in and Check-out':
            pending_in = None
            for log in logs:
                if pending_in is None:
                    # Waiting for an IN; any other log type is ignored.
                    if log.log_type == 'IN':
                        pending_in = log
                elif log.log_type == 'OUT':
                    # A valid IN/OUT pair is complete. Settling it right here
                    # (rather than on the next iteration) guarantees in_time
                    # is also recorded when the pair is the last one.
                    if in_time is None:
                        in_time = pending_in.time
                    out_time = log.time
                    total_hours += time_diff_in_hours(pending_in.time, log.time)
                    pending_in = None
    return total_hours, in_time, out_time
def time_diff_in_hours(start, end):
    """Return the time from ``start`` to ``end`` in hours, rounded to one decimal."""
    elapsed = end - start
    return round(elapsed.total_seconds() / 3600, 1)
def find_index_in_dict(dict_list, key, value):
    """Return the index of the first item whose ``item[key]`` equals ``value``.

    ``dict_list`` may be any iterable of subscriptable items. Returns None
    when no item matches.
    """
    for position, entry in enumerate(dict_list):
        if entry[key] == value:
            return position
    return None