# Export Monday.com as parquet files

Monday.com requires either an API key or OAuth. In this example we use a key that has either been passed in as a parameter via Papermill or is read from one of a set of connection-info files named secrets-<environment>.yaml. The environment code is set as a parameter and defaults to 'dev'.

In [2]:
#%load_ext nb_black

import os
import logging
import pandas as pd
import prefect

from prefect import task, Flow, Parameter, unmapped
from prefect.executors import LocalExecutor, LocalDaskExecutor
from datetime import timedelta
from typing import List, Dict
from box import Box

# Homemade monday helper classes and functions
from mondaydotcom_utils.formatted_value import FormattedValue, get_col_defs
from mondaydotcom_utils.time_block import TimeBlock
from mondaydotcom_utils.utilities import validate_task_record, get_items_by_board

# uses the Monday SDK here: https://github.com/ProdPerfect/monday
from monday import MondayClient

# Module-level logger. The Prefect tasks below do not use it; each task
# fetches its own logger from prefect.context instead.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Folder the parquet files are written into; get_build_folder creates it when
# missing and returns it.
BUILD_FOLDER = "_build"

In [3]:
# If this cell has the `parameters` tag, these can be set as parameters with papermill.

# Monday.com board IDs (kept as strings), one per board read by the export tasks.
AGREEMENTS_BOARD_ID = "1882423671"
GRANT_FTE_ALLOC_BOARD_ID = "1916646665"
GRANTS_BOARD_ID = "1941740920"
PROJECTS_BOARD_ID = "1882404316"
TASKS_BOARD_ID = "1883170887"
ACCOUNTS_BOARD_ID = "1882424009"

# Don't set this here for development work... use the secrets-<environment>.yaml files instead.
# When this is left empty, the run cell loads the key from secrets-<environment>.yaml.
MONDAY_KEY = ""
# Environment code; selects which secrets-<environment>.yaml file is read.
environment = "dev"

In [4]:
def explode_assign(df: pd.DataFrame, columns: List):
    """
    Build a two-column link table for a many-to-many foreign key.

    ``columns`` names two columns of ``df``: the first is kept as-is and the
    second (holding list-like values) is exploded to one value per row.
    Rows left with a missing value in either column are dropped.
    """
    key_and_list = df[columns]
    one_value_per_row = key_and_list.explode([columns[1]], ignore_index=True)
    return one_value_per_row.dropna()

In [5]:
@task
def get_monday_client(key):
    """
    Create the Monday.com client shared by the export tasks.

    ``key`` is the Monday.com API key passed straight to ``MondayClient``.
    Returns the new client after logging that it was created.
    """
    task_logger = prefect.context.get("logger")

    client = MondayClient(key)
    task_logger.info("Monday.com client created.")
    return client

In [6]:
@task(max_retries=3, retry_delay=timedelta(seconds=15))
def get_users(monday_conn) -> Dict:
    """
    Fetch the Monday.com users as a single table.

    Args:
        monday_conn: Monday client; the response of its
            ``users.fetch_users()`` call is read at ``["data"]["users"]``.

    Returns:
        Dict with one entry, ``"users"``: a DataFrame of the users with the
        ``id`` column renamed to ``user_id`` and the ``teams`` column removed.
    """
    users = monday_conn.users.fetch_users()["data"]["users"]

    users_df = (
        pd.DataFrame(users)
        .rename(columns={"id": "user_id"})
        .drop(columns=["teams"])
    )

    return {"users": users_df}

In [7]:
@task(max_retries=3, retry_delay=timedelta(seconds=15))
def get_tasks(monday_conn) -> Dict:
    """
    Export the tasks board. This only exports tasks that are marked as Done.

    Args:
        monday_conn: Monday client passed through to ``get_items_by_board``.

    Returns:
        Dict of DataFrames:
        ``"task_repos"`` (task_id / source_repo_id link table),
        ``"task_owners"`` (task_id / user_id link table) and
        ``"tasks"`` (the board rows without those two link columns).
    """
    # only getting done tasks
    df = get_items_by_board(monday_conn, TASKS_BOARD_ID, "status", "Done")
    df = df.rename(
        columns={
            "monday_id": "task_id",
            "Customer Project": "project_id",
            "Customer Repos": "source_repo_id",
            "Owner": "user_id",
        }
    )

    # Customer Project is a foreign key to projects.
    # NOTE(review): this runs before the link tables are built, so a task
    # linked to more than one project would contribute repeated rows to
    # task_repos / task_owners — confirm whether that can happen.
    df = df.explode(["project_id"], ignore_index=True)

    # Only include Ready tasks (filter currently disabled)
    # df = df.loc[df["Integration Message"].str.startswith("Ready", na=False)]

    return {
        "task_repos": explode_assign(df, ["task_id", "source_repo_id"]),
        "task_owners": explode_assign(df, ["task_id", "user_id"]),
        "tasks": df.drop(columns=["source_repo_id", "user_id"]),
    }

In [8]:
@task(max_retries=3, retry_delay=timedelta(seconds=15))
def get_projects(monday_conn) -> Dict:
    """
    Export the projects board plus its many-to-many link tables.

    Args:
        monday_conn: Monday client passed through to ``get_items_by_board``.

    Returns:
        Dict of DataFrames: one link table per relationship
        (``project_agreements``, ``project_customer_sources``,
        ``project_contacts_sources``, ``project_tasks``,
        ``project_resources``) and ``projects``, the board rows with the
        link columns removed.
    """
    df = get_items_by_board(monday_conn, PROJECTS_BOARD_ID)
    df = df.rename(
        columns={
            "monday_id": "project_id",
            "Customer Source": "source_repo_id",
            "Project Contacts": "contact_id",
            "Agreements": "agreement_id",
            "Project Tasks": "task_id",
            "SET Resource": "user_id",
        }
    )

    # Output table name -> the list-valued column it is built from.
    link_columns = {
        "project_agreements": "agreement_id",
        "project_customer_sources": "source_repo_id",
        "project_contacts_sources": "contact_id",
        "project_tasks": "task_id",
        "project_resources": "user_id",
    }

    tables = {
        table_name: explode_assign(df, ["project_id", column])
        for table_name, column in link_columns.items()
    }

    # The link columns now live in their own tables; drop them from the main one.
    tables["projects"] = df.drop(columns=list(link_columns.values()))

    return tables

In [22]:
@task(max_retries=3, retry_delay=timedelta(seconds=15))
def get_agreements(monday_conn) -> Dict:
    """
    Export the agreements board plus its many-to-many link tables.

    Args:
        monday_conn: Monday client passed through to ``get_items_by_board``.

    Returns:
        Dict of DataFrames: ``agreement_contacts``, ``agreement_users`` and
        ``agreement_grant_fte_alloc`` link tables, and ``agreements``, the
        board rows (one per account_id value) with the link columns removed.
    """
    df = get_items_by_board(monday_conn, AGREEMENTS_BOARD_ID)
    df["NTE Hours"] = pd.to_numeric(df["NTE Hours"])

    df = df.rename(
        columns={
            "monday_id": "agreement_id",
            "Accounts": "account_id",
            "Contacts": "contact_id",
            "Deal AE": "user_id",
            "Grant FTE Allocation": "grant_fte_alloc_id",
        }
    )

    # Account is foreign key
    # NOTE(review): exploding here, before the link tables are built, means an
    # agreement with more than one account would contribute repeated rows to
    # the link tables — confirm whether that can happen.
    df = df.explode(["account_id"], ignore_index=True)

    # all the many to many relationships
    return {
        "agreement_contacts": explode_assign(df, ["agreement_id", "contact_id"]),
        "agreement_users": explode_assign(df, ["agreement_id", "user_id"]),
        "agreement_grant_fte_alloc": explode_assign(
            df, ["agreement_id", "grant_fte_alloc_id"]
        ),
        "agreements": df.drop(
            columns=["contact_id", "user_id", "grant_fte_alloc_id"]
        ),
    }

In [23]:
@task(max_retries=3, retry_delay=timedelta(seconds=15))
def get_grants(monday_conn) -> Dict:
    """
    Export the grants board.

    Args:
        monday_conn: Monday client passed through to ``get_items_by_board``.

    Returns:
        Dict with one entry, ``"grants"``: the board rows with ``monday_id``
        renamed to ``grant_id``.
    """
    df = get_items_by_board(monday_conn, GRANTS_BOARD_ID)

    return {"grants": df.rename(columns={"monday_id": "grant_id"})}

In [24]:
@task(max_retries=3, retry_delay=timedelta(seconds=15))
def get_accounts(monday_conn) -> Dict:
    """
    Export the accounts board plus its many-to-many link tables.

    Args:
        monday_conn: Monday client passed through to ``get_items_by_board``.

    Returns:
        Dict of DataFrames: ``accounts_agreements`` and ``accounts_contacts``
        link tables, and ``accounts``, the board rows with those link columns
        removed.
    """
    df = get_items_by_board(monday_conn, ACCOUNTS_BOARD_ID)
    df = df.rename(
        columns={
            "monday_id": "account_id",
            "Agreements": "agreement_id",
            "Contacts": "contact_id",
        }
    )

    return {
        "accounts_agreements": explode_assign(df, ["account_id", "agreement_id"]),
        "accounts_contacts": explode_assign(df, ["account_id", "contact_id"]),
        "accounts": df.drop(columns=["agreement_id", "contact_id"]),
    }

In [25]:
@task(max_retries=3, retry_delay=timedelta(seconds=15))
def get_grant_fte_alloc(monday_conn) -> Dict:
    """
    Export the grant FTE allocation board plus its agreements link table.

    Args:
        monday_conn: Monday client passed through to ``get_items_by_board``.

    Returns:
        Dict of DataFrames: ``grant_fte_alloc_agreements`` link table and
        ``grant_fte_alloc``, the board rows (one per contact_id / grant_id
        value) with the agreement_id column removed.
    """
    df = get_items_by_board(monday_conn, GRANT_FTE_ALLOC_BOARD_ID)
    df = df.rename(
        columns={
            "monday_id": "grant_fte_alloc_id",
            "Grant Proposal #": "grant_id",
            "Agreements": "agreement_id",
            "Contact": "contact_id",
        }
    )

    # contact_id and grant_id are flattened to one value per row.
    # NOTE(review): this happens before the link table is built, so an
    # allocation with several contacts or grants would contribute repeated
    # rows to grant_fte_alloc_agreements — confirm whether that can happen.
    df = df.explode(["contact_id"], ignore_index=True)
    df = df.explode(["grant_id"], ignore_index=True)

    return {
        "grant_fte_alloc_agreements": explode_assign(
            df, ["grant_fte_alloc_id", "agreement_id"]
        ),
        "grant_fte_alloc": df.drop(columns=["agreement_id"]),
    }

In [26]:
@task
def save_off(df_dict, build_folder):
    """
    Write every DataFrame in ``df_dict`` to ``build_folder`` as parquet.

    Each key becomes the file name (``<key>.parquet``); the index is not
    written. One info line is logged per file.
    """
    task_logger = prefect.context.get("logger")

    for table_name, frame in df_dict.items():
        filename = f"{table_name}.parquet"
        task_logger.info(f"Saving to {filename}")
        target_path = os.path.join(build_folder, filename)
        frame.to_parquet(target_path, index=False)

In [27]:
@task
def get_build_folder():
    """
    Make sure the build folder exists and return its path.

    Returns:
        ``BUILD_FOLDER``, after creating it when it is not already a directory.

    Raises:
        FileExistsError: if ``BUILD_FOLDER`` exists but is not a directory.
    """
    logger = prefect.context.get("logger")
    # isdir (not exists) so that a stray regular file with this name fails
    # here instead of later when the parquet files are written.
    if not os.path.isdir(BUILD_FOLDER):
        logger.debug(f"{BUILD_FOLDER} doesn't exist... creating it.")
        # exist_ok avoids failing if the folder appears between the check and
        # the create; makedirs also copes with a nested BUILD_FOLDER path.
        os.makedirs(BUILD_FOLDER, exist_ok=True)
    return BUILD_FOLDER

In [28]:
# Flow definition: create the client and the build folder once, then for each
# board run its extract task and pass the resulting dict of DataFrames to
# save_off.
with Flow("monday.com board export") as flow:

    # The API key is supplied at run time through flow.run(parameters=...).
    key = Parameter("key")

    # Shared inputs for the tasks below.
    conn = get_monday_client(key)
    build_folder = get_build_folder()

    users_dict = get_users(conn)
    save_off(users_dict, build_folder)

    grant_fte_alloc_dict = get_grant_fte_alloc(conn)
    save_off(grant_fte_alloc_dict, build_folder)

    grant_dict = get_grants(conn)
    save_off(grant_dict, build_folder)

    agreements_dict = get_agreements(conn)
    save_off(agreements_dict, build_folder)

    accounts_dict = get_accounts(conn)
    save_off(accounts_dict, build_folder)

    projects_dict = get_projects(conn)
    save_off(projects_dict, build_folder)

    tasks_dict = get_tasks(conn)
    save_off(tasks_dict, build_folder)

In [29]:
if not MONDAY_KEY:
    # key hasn't been passed as a papermill parameter... get it from a file?
    # Falls back to secrets-<environment>.yaml and reads apps.monday.API_KEY from it.
    secrets = Box.from_yaml(filename=f"secrets-{environment}.yaml")
    MONDAY_KEY = secrets.apps.monday.API_KEY

# Using Prefect for this is mostly overkill, but setting the executor to LocalDaskExecutor speeds things up.
# NOTE(review): the run below passes LocalExecutor, not LocalDaskExecutor — confirm which is intended.
params = {"key": MONDAY_KEY}
state = flow.run(parameters=params, executor=LocalExecutor())

[2022-04-24 10:38:24-0600] INFO - prefect.FlowRunner | Beginning Flow run for 'monday.com board export'
[2022-04-24 10:38:24-0600] INFO - prefect.TaskRunner | Task 'get_build_folder': Starting task run...
[2022-04-24 10:38:24-0600] INFO - prefect.TaskRunner | Task 'get_build_folder': Finished task run for task with final state: 'Success'
[2022-04-24 10:38:24-0600] INFO - prefect.TaskRunner | Task 'key': Starting task run...
[2022-04-24 10:38:24-0600] INFO - prefect.TaskRunner | Task 'key': Finished task run for task with final state: 'Success'
[2022-04-24 10:38:24-0600] INFO - prefect.TaskRunner | Task 'get_monday_client': Starting task run...
[2022-04-24 10:38:24-0600] INFO - prefect.get_monday_client | Monday.com client created.
[2022-04-24 10:38:24-0600] INFO - prefect.TaskRunner | Task 'get_monday_client': Finished task run for task with final state: 'Success'
[2022-04-24 10:38:24-0600] INFO - prefect.TaskRunner | Task 'get_users': Starting task run...
[2022-04-24 10:38:26-0600] IN