# ZHCPA Dingtalk

Resources:
- [Dingtalk Dev Docs](https://open.dingtalk.com/document/orgapp/create-an-approval-form-template?spm=ding_open_doc.document.0.0.53244a97oo0wyS)
- [Dingtalk Backend API Explorer](https://open-dev.dingtalk.com/apiExplorer#/?devType=org&api=workflow_1.0%23ListProcessInstanceIds)
- [Dingtalk Frontend API Explorer](https://open-dev.dingtalk.com/apiExplorer#/jsapi?api=device.notification.extendModal)
- [Dingtalk Admin Dashboard](https://oa.dingtalk.com/admin/portal/oa#?lang=zh_CN&nation=HK&code=b885d25ac75b3ad2a203ab9fcb3d0799)

## Work/Leave Calculations

Attendance 考勤 + (OT + Leave) OA

### Attendance

- 

### OT

- Need to subtract `abort` from `success` applications (by ID?)
- Maybe a new workflow to cater for the `Food money amount` column?
- Maybe a new workflow to cater for the `Job code` + `Duration`? For `Job code` only extract the first 4 characters. Need to summarise how much time each user "OT" on each job
- ~~Only need to concern with `Duration`~~

### Leave

- Need to subtract `abort` from `success` applications (by ID?)
- ~~Only need to concern with `Duration`~~ 

### Flow

For each user:

1. Get all attendance

## Retrieve access token

In [3]:
import json
from pprint import pprint
from typing import List, Optional, Tuple
from alibabacloud_dingtalk.oauth2_1_0.client import Client as DingtalkClient
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_dingtalk.oauth2_1_0 import models as dingtalk_oauth_models
import os
from dotenv import load_dotenv
from requests import request
from typing import Any, Dict, Callable
from functools import partial

load_dotenv()

app_key = os.getenv("APP_KEY")
app_secret = os.getenv("APP_SECRET")
admin_opuserid = os.getenv("ADMIN_OPUSERID")

if not app_key or not app_secret or not admin_opuserid:
    raise Exception("APP_KEY and APP_SECRET must be set")

config = open_api_models.Config()
config.protocol = "https"
config.region_id = "central"
dingtalk_client = DingtalkClient(config)

get_access_token_request = dingtalk_oauth_models.GetAccessTokenRequest(
    app_key=app_key, app_secret=app_secret
)
try:
    response = dingtalk_client.get_access_token(get_access_token_request)

    if getattr(response, "status_code") != 200:
        raise Exception("Request not ok")

    access_token = getattr(response.body, "access_token")
    if not access_token:
        raise Exception("No access token presented in response body")
except Exception as err:
    raise err


In [6]:
def api(endpoint: str, data: Dict[str, Any], method: str = "POST"):
    try:
        response = request(
            method,
            endpoint,
            params={"access_token": access_token},
            data=data,
        )
        if not response.ok:
            raise Exception("Request not ok")
        
        if not response.json().get("success"):
            raise Exception(f"Dingtalk request not ok: {response.json()['errmsg']}")
        
        return response.json()["result"]

    except Exception as err:
        raise err

In [3]:
def generate_depagination_logic(fetch_from_server: Callable[[int], Tuple[List[Any], Optional[int]]]):
    def wrapper():
        all_data: List[Any] = []
        offset: Optional[int] = 0
        while offset is not None:
            partial_data, offset = fetch_from_server(offset)
            all_data.extend(partial_data)

        return all_data
    
    return wrapper

In [4]:
from alibabacloud_tea_util.models import RuntimeOptions

runtime_options = RuntimeOptions()

## Get all operation users IDs

In [5]:
def get_opuserid_list(
    offset_and_size: Optional[Tuple[int, int]] = None, status_list: str = "2,3,5,-1"
) -> List[str]:
    def fetch_from_server(offset: int, size: int) -> Tuple[List[str], Optional[int]]:
        try:
            data = api(
                "https://oapi.dingtalk.com/topapi/smartwork/hrm/employee/queryonjob",
                {
                    "status_list": status_list,
                    "offset": offset,
                    "size": size,
                },
            )
            return data["data_list"], data.get("next_cursor")

        except Exception as err:
            raise err

    if offset_and_size:
        return fetch_from_server(*offset_and_size)[0]

    return generate_depagination_logic(partial(fetch_from_server, size=50))()

## Get vacation types

In [6]:
from alibabacloud_dingtalk.attendance_1_0 import models as dingtalk_attendance_models

def get_vacation_types() -> List[dingtalk_attendance_models.AddLeaveTypeResponseBodyResult]:
    try:
        data = api(
            "https://oapi.dingtalk.com/topapi/attendance/vacation/type/list",
            {
                "op_userid": admin_opuserid,
                "vacation_source": "all"
            },
        )
        return data

    except Exception as err:
        raise err

## Get operation users leave records

In [7]:
from alibabacloud_dingtalk.attendance_1_0 import models as dingtalk_attendance_models
from alibabacloud_dingtalk.attendance_1_0.client import (
    Client as DingtalkAttendanceClient,
)


def get_opusers_leave_records(
    leave_code: str, opuserids: List[str]
) -> List[dingtalk_attendance_models.GetLeaveRecordsResponseBodyResultLeaveRecords]:
    def fetch_from_server(
        pageOffset: int,
    ) -> Tuple[
        List[dingtalk_attendance_models.GetLeaveRecordsResponseBodyResultLeaveRecords],
        Optional[int],
    ]:
        req = dingtalk_attendance_models.GetLeaveRecordsRequest(
            op_user_id=admin_opuserid,
            user_ids=opuserids,
            leave_code=leave_code,
            page_number=pageOffset,
            page_size=200,
        )
        headers = dingtalk_attendance_models.GetLeaveRecordsHeaders(
            x_acs_dingtalk_access_token=access_token
        )

        try:
            response = DingtalkAttendanceClient(config).get_leave_records_with_options(
                req, headers, runtime=runtime_options
            )
            if getattr(response, "status_code") != 200:
                raise Exception("Request not ok")
            
            if not getattr(response.body, "success"):
                raise Exception(f"Dingtalk request not ok")
            
            data = response.body.result

            return (
                data.leave_records,
                pageOffset + 1 if getattr(data, "has_more") else None,
            )
        except Exception as err:
            raise err

    return generate_depagination_logic(fetch_from_server)()

## Get all workflow types

In [20]:
from alibabacloud_dingtalk.workflow_1_0 import models as dingtalk_workflow_models
from alibabacloud_dingtalk.workflow_1_0.client import (
    Client as DingtalkWorkflowClient,
)


def get_all_workflow_types() -> (
    List[dingtalk_workflow_models.GetManageProcessByStaffIdResponseBodyResult]
):
    req = dingtalk_workflow_models.GetManageProcessByStaffIdRequest(
        user_id=admin_opuserid
    )
    headers = dingtalk_workflow_models.GetManageProcessByStaffIdHeaders(
        x_acs_dingtalk_access_token=access_token
    )

    try:
        response = DingtalkWorkflowClient(config).get_manage_process_by_staff_id(req)
        if getattr(response, "status_code") != 200:
            raise Exception("Request not ok")

        if not getattr(response.body, "success"):
            raise Exception(f"Dingtalk request not ok")

        data = response.body.result

        return data
    except Exception as err:
        raise err
    
    try:
        data = api(
            "https://oapi.dingtalk.com/topapi/workflow/forms",
            {
                # "op_userid": admin_opuserid,
                # "vacation_source": "all"
            },
        )
        return data

    except Exception as err:
        raise err

## Get all workflows by process code

In [None]:
from alibabacloud_dingtalk.workflow_1_0 import models as dingtalk_workflow_models
from alibabacloud_dingtalk.workflow_1_0.client import (
    Client as DingtalkWorkflowClient,
)

def get_x(
    # leave_code: str, opuserids: List[str]
) -> List[dingtalk_workflow_models.ListProcessInstanceIdsResponseBodyResult]:
    def fetch_from_server(
        pageOffset: int,
    ) -> Tuple[
        List[dingtalk_workflow_models.ListProcessInstanceIdsResponseBodyResult],
        Optional[int],
    ]:
        req = dingtalk_workflow_models.ListProcessInstanceIdsRequest(
            start_time=0,
            next_token=pageOffset,
            max_results=20,
            process_code=
            # op_user_id=admin_opuserid,
            # user_ids=opuserids,
            # leave_code=leave_code,
            # page_number=pageOffset,
            # page_size=200,
        )
        headers = dingtalk_attendance_models.GetLeaveRecordsHeaders(
            x_acs_dingtalk_access_token=access_token
        )

        try:
            response = DingtalkAttendanceClient(config).get_leave_records_with_options(
                req, headers, runtime=runtime_options
            )
            if getattr(response, "status_code") != 200:
                raise Exception("Request not ok")
            
            if not getattr(response.body, "success"):
                raise Exception(f"Dingtalk request not ok")
            
            data = response.body.result

            return (
                data.leave_records,
                pageOffset + 1 if getattr(data, "has_more") else None,
            )
        except Exception as err:
            raise err

    return generate_depagination_logic(fetch_from_server)()

## Lab

In [8]:
import pandas as pd

In [9]:
all_opuserids = get_opuserid_list()

In [10]:
opuserids_df = pd.DataFrame(all_opuserids)
opuserids_df

Unnamed: 0,0
0,011363164867487807455
1,012314351753-368250697
2,2707060727-1037595066
3,012540471117-1641702324
4,012540442450-1155240308
...,...
293,0119122144541614492696
294,23562949295721733623
295,235629476128585729843
296,2356294751481021497293


In [11]:
vacation_types = get_vacation_types()

In [12]:
vacation_types_df = pd.DataFrame(vacation_types)
vacation_types_df

Unnamed: 0,freedom_leave,hours_in_per_day,leave_code,leave_hour_ceil,leave_name,leave_time_ceil_min_unit,leave_view_unit,natural_day_leave,paid_leave,source,when_can_leave
0,True,800,f60377e7-9c60-48bb-91e2-5d02f4415361,up,年假(小時),halfHour,hour,False,False,inner,entry
1,True,800,c4d0316f-c234-4d89-8c66-c90b5931d072,,年假(天),,day,False,False,inner,entry
2,True,800,c02aa600-8dce-4b63-9945-35d346464ef6,down,病假,halfHour,hour,False,False,inner,entry
3,True,800,a9047b4b-0d7f-4d7c-a57d-6cb89f7d3b91,,考試假,hour,day,False,False,inner,entry
4,True,800,52ed3423-0612-41c4-ace8-dc6b0c098030,down,侍/產假,hour,hour,False,True,inner,entry
5,True,800,385e7625-a0fb-4337-8226-43a2f3eac4a2,,恩恤假,hour,day,False,False,inner,entry


In [25]:
df = vacation_types_df
# df = vacation_types_df.loc[vacation_types_df["leave_name"] == "年假(天)", "leave_code"]
# leave_code = df.iloc[0]
# opuserids: List[str] = opuserids_df.iloc[:30, 0].tolist()
# opusers_leave_records = get_opusers_leave_records(leave_code, opuserids)
df

Unnamed: 0,freedom_leave,hours_in_per_day,leave_code,leave_hour_ceil,leave_name,leave_time_ceil_min_unit,leave_view_unit,natural_day_leave,paid_leave,source,when_can_leave
0,True,800,f60377e7-9c60-48bb-91e2-5d02f4415361,up,年假(小時),halfHour,hour,False,False,inner,entry
1,True,800,c4d0316f-c234-4d89-8c66-c90b5931d072,,年假(天),,day,False,False,inner,entry
2,True,800,c02aa600-8dce-4b63-9945-35d346464ef6,down,病假,halfHour,hour,False,False,inner,entry
3,True,800,a9047b4b-0d7f-4d7c-a57d-6cb89f7d3b91,,考試假,hour,day,False,False,inner,entry
4,True,800,52ed3423-0612-41c4-ace8-dc6b0c098030,down,侍/產假,hour,hour,False,True,inner,entry
5,True,800,385e7625-a0fb-4337-8226-43a2f3eac4a2,,恩恤假,hour,day,False,False,inner,entry


In [23]:
opusers_leave_records_df = pd.DataFrame(map(lambda record: record.__dict__, opusers_leave_records))
opusers_leave_records_df

Unnamed: 0,cal_type,end_time,gmt_create,gmt_modified,leave_code,leave_reason,leave_record_type,leave_status,leave_view_unit,op_user_id,quota_id,record_id,record_num_per_day,record_num_per_hour,start_time,user_id
0,,1693324799000,1691577245000,1691631975000,c4d0316f-c234-4d89-8c66-c90b5931d072,,leave,success,day,,,e5bf67d8-02e2-4e24-b742-b3c46e642e10,300,,1692892800000,235611583320513085396
1,,1693324799000,1691473525000,1691475862000,c4d0316f-c234-4d89-8c66-c90b5931d072,,leave,success,day,,,2791a1ad-6b9c-430d-b971-f9611663f1eb,300,,1692892800000,013200084905-1478030917
2,,1693324799000,1691388731000,1691394444000,c4d0316f-c234-4d89-8c66-c90b5931d072,,leave,success,day,,,f43f1357-d8f7-42eb-a8b9-22934f2ab161,300,,1692892800000,012750346165772386982
3,,1693324799000,1691386329000,1691387873000,c4d0316f-c234-4d89-8c66-c90b5931d072,,leave,success,day,,,a99c7622-7a19-4dfa-93f2-bfb8f939f30c,300,,1692892800000,235610302350-1821309259
4,,1690991999000,1690906618000,1690939904000,c4d0316f-c234-4d89-8c66-c90b5931d072,,leave,success,day,,,39ee11ce-5b52-4770-9103-7be794deba24,100,,1690905600000,013821291159690982314
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
60,,1672329599000,1672277301000,1672277906000,c4d0316f-c234-4d89-8c66-c90b5931d072,,leave,success,day,,,cd01e93d-3cdb-49af-bf66-7085b4bb7ae3,100,,1672243200000,013820440019-823715700
61,,1672243199000,1672189304000,1672197954000,c4d0316f-c234-4d89-8c66-c90b5931d072,,leave,success,day,,,29c375fc-c42b-424e-83ea-d4708b809d36,100,,1672156800000,013820440019-823715700
62,,1671811199000,1671758455000,1671767684000,c4d0316f-c234-4d89-8c66-c90b5931d072,,leave,success,day,,,63ead44e-d19e-4e91-b5f1-abb69dab1d4a,100,,1671724800000,013820440019-823715700
63,,1671811199000,1671586936000,1671602785000,c4d0316f-c234-4d89-8c66-c90b5931d072,,leave,success,day,,,c3569b6f-9067-45c8-b980-daaf1934847d,200,,1671638400000,235611583320513085396


In [21]:
get_all_workflow_types()

TeaException: Error: AuthenticationFailed.MissingParameter code: 400, 缺少参数：x-acs-dingtalk-access-token request id: 9F595729-F7E7-7C6C-8136-33164F58B64F Response: {'code': 'AuthenticationFailed.MissingParameter', 'requestid': '9F595729-F7E7-7C6C-8136-33164F58B64F', 'message': '缺少参数：x-acs-dingtalk-access-token', 'statusCode': 400}

In [11]:
# from alibabacloud_dingtalk.attendance_1_0 import models as dingtalk_attendance_models

def get_y():
    try:
        data = api(
            "https://oapi.dingtalk.com/topapi/smartwork/hrm/roster/meta/get",
            {
            },
            method="GET"
        )
        return data

    except Exception as err:
        raise err

print(get_y())

Exception: Dingtalk request not ok: Missing required arguments:agentid