# Validate Monday.com tasks for integration issues

Runs through Monday.com (MDC) tasks, checks each one for simple validation issues, then writes a status message back to the MDC task.

Useful for later processing and posting effort hours.

In [None]:
#%load_ext nb_black

In [None]:
import logging
import os, sys
import pandas as pd
import numpy as np

from prefect import task, flow

from datetime import timedelta, datetime
from box import Box

from mondaydotcom_utils.formatted_value import (
    FormattedValue,
    get_col_defs,
    get_items_by_board,
)

# uses the pretty okay SDK here: https://github.com/ProdPerfect/monday
from monday import MondayClient

import scrapbook as sb
import dotenv

from io import StringIO
from IPython import get_ipython

# Module-level logger for this notebook; records below INFO (e.g. the
# logger.debug calls further down) are filtered out at this logger.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

In [None]:
# fixed vars: Monday.com board ids (kept as strings) for the tasks board
# and the projects board queried below
TASKS_BOARD_ID = "1883170887"
PROJECTS_BOARD_ID = "1882404316"

In [None]:
# Picks which ".env-<environment>" file the secrets are loaded from.
# NOTE(review): looks like a notebook parameter meant to be overridden per run - confirm.
environment = "dev"

In [None]:
# Stop early when the environment name is empty.
# NOTE(review): the ";401" suffix looks like a status code consumed by whatever runs this notebook - confirm.
assert environment, "No environment variable found;401"

In [None]:
# Load secrets from the per-environment dotenv file into the process
# environment, then verify the Monday.com API key is present.

env_file = f".env-{environment}"
logger.info("Loading the .env file from %s", env_file)

# locate the file first, then load whatever path was found
env_path = dotenv.find_dotenv(env_file)
dotenv.load_dotenv(env_path)

assert os.environ.get("MONDAY_KEY"), f"MONDAY_KEY not found in {env_file};500"

In [None]:
# Monday.com client built from the MONDAY_KEY environment variable checked above
conn = MondayClient(os.environ.get("MONDAY_KEY"))

In [None]:
# Load the projects board so each task can later be checked against its
# project's lifecycle => don't task to closed projects.

# columns that play no part in the validation rules
columns_to_drop = [
    "Project Lifecycle__changed_at",
    "Subitems",
    "Project Contacts",
    "Repo Description__mirror",
    "Timeline",
    "Timeline Days",
    "Dependency",
    "Date Added__default_formatter",
    "Project Tasks",
    "Tasks Status__mirror",
    "Timeline__to",
    "Timeline__from",
    "Timeline__changed_at",
    "Customer Source",
    "Grant Number",
    "Notes",
    "Date Added",
    "Account",
]

projects_df = (
    get_items_by_board(conn, PROJECTS_BOARD_ID)
    # blank out missing values
    .fillna("")
    # "project_id" is the key the tasks frame is merged on later
    .rename(
        columns={
            "monday_id": "project_id",
            "monday_name": "Project Name",
            "Project Lifecycle__text": "Project Lifecycle",
        }
    )
    .drop(columns=columns_to_drop)
)

projects_df

In [None]:
# Fetch the tasks board. No status filter is passed here; the filtered
# variant would be:
#   get_items_by_board(conn, TASKS_BOARD_ID, "status", "Done")
tasks_df = get_items_by_board(conn, TASKS_BOARD_ID)

# Do not include Posted tasks (missing messages count as "not posted")
already_posted = tasks_df["Integration Message"].str.startswith("Posted", na=False)

# add columns back if used in the validation
columns_to_drop = [
    "Subtasks",
    "Status__text",
    "Status__changed_at",
    "Integration Message",
    "Timeline__to",
    "Timeline__from",
    "Timeline__visualization_type",
    "Timeline Days",
    "Timeline Hours (Estimated)__formula",
    "Actual Time",
    "Total Actual Hours__formula",
    "Notes",
    "Dependencies",
    "Customer Repos",
    "Pull Request URL",
    "Issue URL",
    "Date Added",
    "Date Added__default_formatter",
    "Date Completed",
    "Extended Hours",
    "Project Status__mirror",
    "Project Closed Date__mirror",
    "Reported Month-end Date",
    "Actual Time__running",
    "Actual Time__duration",
    "Actual Time__startDate",
    "Timeline__changed_at",
    "Timeline",
    "Status",
    "Actual Time__changed_at",
]

tasks_df = (
    tasks_df.loc[~already_posted]
    .rename(
        columns={
            "monday_id": "task_id",
            "monday_name": "Task Name",
            "Customer Project": "project_id",
        }
    )
    # remove columns to make it all easier to _see_
    .drop(columns=columns_to_drop)
    # one row per entry of a list-like project_id
    # (presumably the linked project ids - verify against get_items_by_board)
    .explode(["project_id"], ignore_index=True)
)
tasks_df.head()

In [None]:
# merge in the project info for later rules; the left join keeps every task
# row, so a task whose project_id has no match in projects_df gets NaN in the
# project columns
tasks_df = tasks_df.merge(projects_df, how="left", on="project_id")

In [None]:
def _first_failed_rule(
    project_id, actual_hours, session_count, owner_count, project_lifecycle
):
    """Return the name of the first rule the task breaks, or None if it passes."""
    # project is required
    if pd.isna(project_id) or project_id == "" or not project_id:
        return "project_is_required"

    has_hours = not pd.isna(actual_hours)
    has_sessions = session_count > 0

    # rule 1 - actual hours and sessions are mutually exclusive
    if has_hours and has_sessions:
        return "actual_hours_and_sessions"

    # rule 2 - using actual hours requires at least one owner
    if has_hours and owner_count == 0:
        return "actual_hours_and_no_owners"

    # rule 3 - some kind of time entry must exist
    if not (has_hours or has_sessions):
        return "no_actual_hours_and_no_sessions"

    # no posting against closed projects
    if project_lifecycle == "Closed":
        return "project_is_closed"

    return None


def validate_task_record(record):
    """
    Check a single task record and stamp the outcome onto it.

    The record is modified in place and also returned. Two keys are written:
    "integration_state" ("STOP" or "Ready") and "integration_state_rule"
    (the name of the broken rule, or "Ready"). Only the first broken rule
    is reported. These rules drive the journal records created later.

    Checks, in order:
      0. A project is required.
      1. Either actual hours or session times are used, but not both.
         Finding both is an error.
      2. When actual hours is used, the owners share it between them, e.g.
         actual hours = 15 with 3 owners yields three journal entries of 5
         each (actual hours / owner count). Having no owners is an error.
      3. Having neither actual hours nor sessions is an error.
      4. The task's project must not be "Closed".

      When session times are used, a journal entry is created for each
         session.
    """
    # anything that is not a list (e.g. NaN) counts as "no entries"
    sessions = record["Actual Time__additional_value"]
    if not isinstance(sessions, list):
        sessions = []

    owners = record["Owner"]
    if not isinstance(owners, list):
        owners = []

    actual_hours = record["Actual Hours"]
    title = record["Task Name"]
    project_id = record["project_id"]
    project_lifecycle = record["Project Lifecycle"]

    logger.debug(
        "actual_hours:%s, len(session_list):%s, len(owners_list):%s",
        actual_hours,
        len(sessions),
        len(owners),
    )

    failed_rule = _first_failed_rule(
        project_id, actual_hours, len(sessions), len(owners), project_lifecycle
    )

    if failed_rule is None:
        record["integration_state"] = "Ready"
        record["integration_state_rule"] = "Ready"
        return record

    record["integration_state"] = "STOP"
    record["integration_state_rule"] = failed_rule
    logger.warning("%s: %s", failed_rule, title)
    return record

In [None]:
# Run every task row through the validator. reset_index() carries the row
# label along as an "index" field so it can be restored afterwards.
records = tasks_df.reset_index().to_dict("records")

vald_recs = [checked for checked in map(validate_task_record, records) if checked]

# clean up the dataframe since Prefect is real specific about JSON pickling
# and nan messes that up hard.
columns_not_needed = [
    "Actual Hours",
    "Actual Time__additional_value",
    "integration_state",
    "project_id",
    "Owner",
]
df = pd.DataFrame(vald_recs).set_index("index").drop(columns=columns_not_needed)
df.head()

Use a Prefect flow to update the integration status of each task/item in Monday.com (MDC), one validated row at a time.

In [None]:
@task(name="print task info")
def print_task_info(monday_conn, record):
    """
    Print a record to stdout.

    Params:
    - monday_conn is accepted but not used
    - record is whatever should be printed
    """
    print(record)

In [None]:
@task(retries=3, retry_delay_seconds=15)
def update_task_integration_status(monday_conn, record: pd.Series):
    """
    Updates MDC integration state info.

    Writes "<integration_state_rule> - <current local time>" into the
    "text01" column of the task item on the tasks board.

    Params:
    - monday_conn is the MDC client connection
    - record is an individual row from a dataframe; it must provide
      "Task Name", "task_id" and "integration_state_rule"
    """
    # lazy %-style arguments, matching every other logging call in this notebook
    logger.debug("Updating Monday.com record for %s", record["Task Name"])

    status_text = f"{record['integration_state_rule']} - {datetime.now()}"

    # NOTE(review): "text01" is presumably the id of the "Integration Message"
    # column on the tasks board - confirm.
    monday_conn.items.change_item_value(
        TASKS_BOARD_ID,
        record["task_id"],
        "text01",
        status_text,
    )

In [None]:
@flow(name="update monday.com tasks")
def main_flow(monday_conn, validated_tasks):
    """
    Push the validation outcome of every task back to Monday.com.

    Params:
    - monday_conn is the MDC client connection
    - validated_tasks is the dataframe of validated task rows
    """
    # send updates back to Monday.com, one row at a time
    for _, task_row in validated_tasks.iterrows():
        update_task_integration_status(monday_conn, task_row)

In [None]:
# Run the flow over the validated tasks and show whatever it returned
state = main_flow(conn, df)
state

In [None]:
# save off an output: the number of rows in the validated frame, recorded
# under the scrap name "updated_task_count" (a row count, not a count of
# confirmed successful updates)
sb.glue("updated_task_count", len(df))