# Replace Tasks with Dynamic Tables

This notebook identifies eligible tasks that can be converted to Dynamic tables.  Dynamic tables simplify data engineering in Snowflake by providing a reliable, cost-effective, and automated way to transform data. Not every stream/task can or should be replaced.  

This notebook will:
- check actively running tasks in a Snowflake account to find tasks that `INSERT` or `MERGE` into an existing table (from a base table) or create a table using CTAS
- identify whether the target table is currently in a share
    - **NOTE:** Data Providers should take additional steps to ensure any affected shared tables don't impact Consumers before switching to Dynamic tables
- generate the DDL to create the Dynamic table that will replace the task
    - **NOTE:** this will be done for each task in the task graph
- execute the Dynamic table DDL and remove the existing stream/task (optional)
- remove the existing target table from the share, if applicable (optional)
- drop existing stream/task (optional)
- drop the existing target table (optional)

## Prerequisites:

- The user executing this notebook must have access to the `SNOWFLAKE` database.
- The user must have the `CREATE DYNAMIC TABLE` privilege on the schema where the new Dynamic Table will be created.
- The user must own the tasks in the database(s) set in **STEP 3**.

## STEP 1: Initialize Session

In [None]:
import json
import numpy as np
import pandas as pd
import re
import streamlit as st
from st_aggrid import AgGrid, GridUpdateMode, JsCode
from st_aggrid.grid_options_builder import GridOptionsBuilder
import sqlparse

# Bind to the Snowpark session for this notebook.
# NOTE(review): get_active_session is not imported in this cell — presumably the runtime
# provides it or an import from snowflake.snowpark.context is expected; confirm.
session = get_active_session()

# Tag the session so every query issued from this notebook carries the same QUERY_TAG.
_session_query_tag = '{"origin":"sf_sit","name":"dt_conversion_task","version":{"major":1, "minor":0},"attributes":"session_tag"}'
session.sql(f"ALTER SESSION SET QUERY_TAG = '{_session_query_tag}'").collect()

# Current role name with any double quotes removed; reused later to filter shares by owner.
current_role = session.get_current_role().replace('"', '')

st.success(f"Session initialized for role: {current_role} 🎉")

## STEP 2: Function definition

In [None]:
def paginate_data(df):
    """Render ``df`` as a paged Streamlit table, 20 rows per page.

    Draws a divider, the current page of rows, a "Page" number input and a
    "Page X of Y" label. When ``df`` has no rows, a "No results to display."
    caption is shown instead of the table.

    Args:
        df: The DataFrame to display.

    Returns:
        None. The function only writes to the Streamlit page.
    """
    st.divider()

    # Placeholder created first so the table renders above the page selector.
    pagination = st.empty()
    batch_size = 20  # Set the number of items per page

    if len(df) == 0:
        st.caption("No results to display.")
        return

    bottom_menu = st.columns((4, 1, 1))
    with bottom_menu[2]:
        # Ceiling division: a trailing partial page (e.g. rows 41-45 of 45) must
        # still be reachable. Floor division would hide it from the selector.
        total_pages = (len(df) + batch_size - 1) // batch_size
        current_page = st.number_input(
            "Page", min_value=1, max_value=total_pages, step=1
        )
    with bottom_menu[0]:
        st.markdown(f"Page **{current_page}** of **{total_pages}** ")

    pages = split_frame(df, batch_size)
    pagination.dataframe(data=pages[current_page - 1], use_container_width=True)

    st.divider()

@st.cache_data(show_spinner=False)
def split_frame(input_df, rows):
    """Split ``input_df`` into consecutive chunks of at most ``rows`` rows.

    Args:
        input_df: The DataFrame to split.
        rows: Maximum number of rows per chunk.

    Returns:
        A list of DataFrames in original row order; empty when ``input_df``
        has no rows.
    """
    # Positional slicing (iloc) instead of label slicing (loc): the chunks must
    # be correct for any index, not only for a default 0..n-1 RangeIndex.
    return [
        input_df.iloc[start : start + rows, :]
        for start in range(0, len(input_df), rows)
    ]

def operator_attributes(query_id, task_settings):
    """Look up the write operator of a task's query and record it in ``task_settings``.

    Queries GET_QUERY_OPERATOR_STATS for ``query_id`` and keeps only operators of
    type insert, merge, update, delete, extensionfunction or createtableasselect.
    The first matching row supplies the values stored under the keys
    ``target_table``, ``target_table_length`` and ``target_operation``.

    Args:
        query_id: Query id whose operator statistics are inspected.
        task_settings: Dict that is updated in place with the three keys above.

    Returns:
        "Success" when a matching operator was found, otherwise a message
        naming the query id for which nothing was found.
    """
    #NOTE: currently only supporting MERGE and INSERT(from base table(s)) statements
    operator_rows = session.sql(f"""
                                                SELECT
                                                    OPERATOR_ATTRIBUTES:table_name::varchar TARGET_TABLE
                                                    ,OPERATOR_ATTRIBUTES:table_names[0]::varchar TARGET_TABLES
                                                    ,ARRAY_SIZE(OPERATOR_ATTRIBUTES:table_names) TARGET_TABLES_LENGTH
                                                    ,OPERATOR_TYPE
                                                FROM TABLE(GET_QUERY_OPERATOR_STATS('{query_id}')) 
                                                WHERE LOWER(OPERATOR_TYPE) IN('insert', 'merge', 'update', 'delete', 'extensionfunction', 'createtableasselect')
                                                ;
                                                """).collect()
    df_target_operation = pd.DataFrame(operator_rows)

    #nothing to record when the query has no supported write operator
    if df_target_operation.empty:
        return f"Operator Attributes not found for query_id: {query_id}"

    #prefer the single-table attribute, fall back to the first entry of table_names
    single_table = df_target_operation.iloc[0,0]
    target_table = single_table if single_table else df_target_operation.iloc[0,1]

    task_settings.update({
        "target_table" : f"{target_table}",
        "target_table_length" : df_target_operation.iloc[0,2],
        "target_operation" : f"{df_target_operation.iloc[0,3]}",
    })

    return "Success"

def _stream_source_table(stream):
    """Return the "table_name" reported by DESCRIBE STREAM for ``stream``."""
    session.sql(f"""DESCRIBE STREAM {stream};""").collect()
    return pd.DataFrame(session.sql(f"""SELECT "table_name" FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))""").collect()).iloc[0,0]


def _table_column_names(table):
    """Return a DataFrame with one "name" row per row of DESCRIBE TABLE ``table``."""
    session.sql(f"""DESCRIBE TABLE {table};""").collect()
    return pd.DataFrame(session.sql(f"""SELECT "name" FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))""").collect())


def _explain_error(create_dt_ddl):
    """Run EXPLAIN on the DDL; return None when it succeeds, else the error text."""
    try:
        pd.DataFrame(session.sql(f"""EXPLAIN USING JSON {create_dt_ddl}""").collect()).iloc[0,0]
    except Exception as e:
        return str(e)
    return None


def convert_task(task_settings, df_shared_objs, shared_objs, eligible_tasks, ineligible_tasks):
    """Build the dynamic table DDL for one task and classify the task.

    The task's operation (``task_settings["target_operation"]``) decides the path:
    CTAS, INSERT and MERGE derive a SELECT from the task definition, append it to
    a ``CREATE OR REPLACE DYNAMIC TABLE <target>_DT`` statement and validate the
    result with EXPLAIN. UPDATE, DELETE and stored procedure calls are always
    ineligible.

    Args:
        task_settings: Dict with the keys task, task_type, root_task, warehouse,
            schedule, predecessors, step, definition, condition, target_table,
            target_table_length and target_operation.
        df_shared_objs: DataFrame with columns share, object_type and object.
        shared_objs: Collection of shared object names; the target table is
            flagged as shared when it is a member.
        eligible_tasks: List that receives
            ``[False, task, details_json, shares_json, definition, ddl]``
            when the task is eligible.
        ineligible_tasks: List that receives
            ``[task, details_json, definition, reason]`` otherwise.

    Returns:
        ``[task, task_details, [share_details], definition, create_dt_ddl]``.
    """
    select_pattern = r"(?s)(?=SELECT)(.*?\s+FROM.*)"
    insert_columns_pattern = r"(?s)INTO(.*?)(?<=\()(.+?)(\))"
    merge_subquery_pattern = r"(?s)(?<=USING)(\(*.*?\))(?=\s+\w+\s+ON\s)"
    merge_table_pattern = r"(?s)(?<=USING)([^\(].*?[^/)])(?=\s+\w+\s+ON\s)"

    eligible_flag = True

    source_select = ""
    source_table = ""
    reason = ""
    stream = ""

    #get task settings
    task = task_settings["task"]
    task_type = task_settings["task_type"]
    root_task = task_settings["root_task"]
    warehouse = task_settings["warehouse"]
    schedule = task_settings["schedule"]
    predecessors = task_settings["predecessors"]
    step = task_settings["step"]
    definition = task_settings["definition"]
    condition = task_settings["condition"]
    target_table = task_settings["target_table"]
    target_table_length = task_settings["target_table_length"]
    target_operation = task_settings["target_operation"]

    #create dynamic table DDL prefix
    create_dt_ddl = f"""CREATE OR REPLACE DYNAMIC TABLE {target_table}_DT
                        TARGET_LAG = '{schedule}'
                        WAREHOUSE = {warehouse}
                        COMMENT = '{{"origin":"sf_sit","name":"dt_conversion_task","version":{{"major":1, "minor":0}},"attributes":{{"source":"task", "name":"{task}"}}}}'
                        AS
                        """

    #get stream from condition; callers pass the condition as a string, so a missing
    #condition arrives as 'None'. Conditions that do not call SYSTEM$STREAM_HAS_DATA
    #carry no stream and must not be indexed into.
    cond_prefix = "SYSTEM$STREAM_HAS_DATA('"
    cond_suffix = "')"
    if condition != 'None' and cond_prefix in condition:
        stream = condition.split(cond_prefix)[1].split(cond_suffix)[0]

    #create task details dict
    task_details = {
        "task_type" : f"{task_type}",
        "target_table" : f"{target_table}",
        "target_operation" : f"{target_operation}",
        "root_task" : f"{root_task}",
        "child_tasks" : None,
        "schedule" : f"{schedule}",
        "step" : step,
        "predecessors" : f"{predecessors}",
        "child_objects" : None,
        "stream" : f"{stream}",
        "child_streams" : None,
    }

    #set share flag whether target is in a share
    flag_target_shared = "Y" if target_table in shared_objs else "N"
    share_details = {"target_shared" : f"{flag_target_shared}"}

    if flag_target_shared == "Y":
        shares_target = []

        #boolean mask instead of DataFrame.query so quotes in a name cannot break the filter
        df_shared_objs_filtered = df_shared_objs[df_shared_objs["object"] == target_table]

        for _, shared_row in df_shared_objs_filtered.iterrows():
            share_details.update({"object" : f"""{shared_row["object"]}"""})
            share_details.update({"object_type" : f"""{shared_row["object_type"]}"""})
            shares_target.append(shared_row["share"])

        share_details.update({"shares" : shares_target})

    operation = target_operation.lower()

    #CTAS
    if operation == 'createtableasselect':
        select_match = re.search(select_pattern, definition)

        if select_match:
            #the CTAS source select becomes the dynamic table query
            source_select = select_match.group(1)
            create_dt_ddl += f"{source_select};"
            create_dt_ddl = sqlparse.format(create_dt_ddl, reindent=True, keyword_case="upper")

            #check if create DT statement is valid using EXPLAIN
            explain_error = _explain_error(create_dt_ddl)
            if explain_error is not None:
                eligible_flag = False
                reason = explain_error
        else:
            eligible_flag = False
            create_dt_ddl = "N/A"
            reason = "The task definition does not contain a base table"

    #INSERT
    elif operation == 'insert':
        select_match = re.search(select_pattern, definition)

        if target_table_length > 1:
            eligible_flag = False
            reason = "Converting tasks with multi-table inserts is currently not supported."
        elif select_match is None:
            eligible_flag = False
            create_dt_ddl = "N/A"
            reason = "The task definition does not contain a base table"
        else:
            #an explicit column list can only appear before the SELECT, so search only that
            #prefix; parentheses inside the SELECT (function calls) are not a column list
            into_match = re.search(insert_columns_pattern, definition[:select_match.start()])

            #if insert columns are not specified, assume the insert maps every target column;
            #otherwise the list must name as many columns as the target table has
            columns_match = True
            if into_match is not None:
                df_target_table_clmns = _table_column_names(target_table)
                columns_match = len(into_match.group(2).split(",")) == len(df_target_table_clmns)

            if columns_match:
                source_select = select_match.group(1)

                #if select statement contains the stream, replace it with the stream's source table
                if stream and stream.lower() in source_select.lower():
                    source_table = _stream_source_table(stream)
                    source_select = source_select.replace(stream, source_table)

                create_dt_ddl += f"{source_select};"
                create_dt_ddl = sqlparse.format(create_dt_ddl, reindent=True, keyword_case="upper")

                #check if create DT statement is valid using EXPLAIN
                explain_error = _explain_error(create_dt_ddl)
                if explain_error is not None:
                    eligible_flag = False
                    reason = explain_error
            else:
                eligible_flag = False
                create_dt_ddl = "N/A"
                reason = "The number of columns in the INSERT column list does not match the number of columns in the target table definition. Important columns could be missing, if converted to a dynamic table."

    #MERGE
    elif operation == 'merge':
        subquery_match = re.search(merge_subquery_pattern, definition)
        table_match = None if subquery_match else re.search(merge_table_pattern, definition)

        #if merge source is a select statement
        if subquery_match:
            source = subquery_match.group(1)
            source_select = re.search(r"(?s)(?<=\()(.*?)(?!.+\))", source).group(1).strip()

            #if select statement contains the stream, replace it with the stream's source table
            if stream and stream.lower() in source_select.lower():
                source_table = _stream_source_table(stream)
                source_select = source_select.replace(stream, source_table)

        #if merge source is a table
        elif table_match:
            source_table = table_match.group(1).replace("(","").replace(")","").strip()

            #if source is a stream, get source table
            if stream and source_table.lower() == stream.lower():
                source_table = _stream_source_table(stream)

            #select every column of the source table explicitly
            df_source_table_clmns = _table_column_names(source_table)

            source_select = "SELECT \n" + " " * 16
            for clmn_idx, clmn_row in df_source_table_clmns.iterrows():
                prefix = "" if clmn_idx == 0 else ","
                source_select += f"""{prefix}{clmn_row["name"]} \n""" + " " * 20
            source_select += f"""FROM {source_table}"""

        #update dynamic table DDL and beautify it; when neither pattern matched the select is
        #empty and the EXPLAIN below reports the task as ineligible
        create_dt_ddl += f"{source_select};"
        create_dt_ddl = sqlparse.format(create_dt_ddl, reindent=True, keyword_case="upper")

        #check if create DT statement is valid using EXPLAIN
        explain_error = _explain_error(create_dt_ddl)
        if explain_error is not None:
            eligible_flag = False
            reason = explain_error

    #UPDATE
    elif operation == 'update':
        eligible_flag = False
        create_dt_ddl = "N/A"
        reason = "Tasks that use UPDATE statements are not eligible to be converted to dynamic tables"

    #DELETE
    elif operation == 'delete':
        eligible_flag = False
        create_dt_ddl = "N/A"
        reason = "Tasks that use DELETE statements are not eligible to be converted to dynamic tables"

    #EXTENSIONFUNCTION (Stored Proc)
    elif operation == 'extensionfunction':
        eligible_flag = False
        create_dt_ddl = "N/A"
        reason = "Tasks that call Stored Prodedures are not eligible to be converted to dynamic tables"

    if eligible_flag:
        #append task conversion details to eligible list
        eligible_tasks.append([False, task, json.dumps(task_details, indent=2), json.dumps([share_details], indent=2), definition, create_dt_ddl])
    else:
        #append task conversion details to ineligible list
        ineligible_tasks.append([task, json.dumps(task_details, indent=2), definition, f"{task} ({task_type.capitalize()} task): {reason}"])

    return [task, task_details, [share_details], definition, create_dt_ddl]

# Confirm in the notebook output that the helper functions above were defined.
st.success("Functions created 🎉")

## STEP 3: Set the database(s) where the tasks reside.

In [None]:
# List the databases returned by SHOW DATABASES so the user can pick the ones holding tasks.
df_task_dbs = pd.DataFrame(session.sql("SHOW DATABASES").collect())

# Start with "nothing selected" so task_dbs is always defined, even when
# SHOW DATABASES returns no rows and the multiselect is never drawn.
select_task_db = []

if not df_task_dbs.empty:
    select_task_db = st.multiselect("Select Database(s):", df_task_dbs["name"], placeholder="Choose an option", key="task_select_db")

# String form of the selection, e.g. "['DB1', 'DB2']"; later cells embed it in SQL.
task_dbs = str(select_task_db)

# Vertical spacing below the selector.
for _ in range(6):
    st.write("#")

## STEP 4: Get all shared tables/views

This step compiles a list of all tables/views shared by your role.  Any target tables updated in existing tasks that are in the list will be flagged and optionally removed.

In [None]:
# Each entry is [share, object kind, fully qualified object name].
list_shares_objs = []

#show all shares
session.sql("""SHOW SHARES;""").collect()

#get outbound shares only, restricted to shares owned by the current role
df_outbound_shares = pd.DataFrame(session.sql(f"""SELECT "name" FROM TABLE(RESULT_SCAN(LAST_QUERY_ID())) WHERE LOWER("kind") = 'outbound' AND LOWER("owner") = '{current_role.lower()}';""").collect())

# task_dbs looks like "['DB1', 'DB2']"; stripping the brackets leaves a SQL IN-list.
task_db_in_list = task_dbs.strip('][')

# With no database selected the IN-list is empty (invalid SQL) and nothing could match,
# so the per-share scan is skipped entirely.
if task_db_in_list:
    for _, share_row in df_outbound_shares.iterrows():
        share = share_row["name"]

        try:
            #describe share, then read its tables/views that live in the selected databases
            session.sql(f"""DESCRIBE SHARE {share};""").collect()
            df_share_contents = pd.DataFrame(session.sql(f"""SELECT "name", "kind" FROM TABLE(RESULT_SCAN(LAST_QUERY_ID())) 
                                                        WHERE 
                                                            LOWER("kind") IN ('table', 'view', 'materialized view') AND
                                                            SPLIT_PART("name", '.', 1) IN ({task_db_in_list});""").collect())
        except Exception as e:
            # Best effort: one unreadable share must not stop the scan, but the user
            # should know its objects are missing from the list below.
            st.warning(f"Unable to inspect share {share}: {e}")
            continue

        #add each shared object to the list
        for _, obj_row in df_share_contents.iterrows():
            list_shares_objs.append([share, obj_row["kind"], obj_row["name"]])

#create list of shares, object types, and objs shared
list_shares = [item[0] for item in list_shares_objs]
list_obj_types = [item[1] for item in list_shares_objs]
list_objs = [item[2] for item in list_shares_objs]

df_shared_objs = pd.DataFrame({'share': list_shares, 'object_type': list_obj_types, 'object': list_objs})

#show shared objects
paginate_data(df_shared_objs)

## STEP 5: Find the latest completed tasks

This step compiles a list of latest completed tasks within the last 24 hours.

In [None]:
task_history_range_list = ['Choose a Date Range', 'Last day', 'Last 7 days', 'Last 14 days']
st.write("")
st.selectbox("Select Task History Date Range:", task_history_range_list, key="sb_task_history_range")

# Maps each selectable range to the DATEADD date part and the amount to look back.
_history_windows = {
    "Last day": ("hours", "24"),
    "Last 7 days": ("days", "7"),
    "Last 14 days": ("days", "14"),
}

date_time_part, increment = _history_windows.get(st.session_state.sb_task_history_range, ("", ""))

# Stays None until a range is chosen.
df_task_history_range = None

if st.session_state.sb_task_history_range == "Choose a Date Range":
    # Vertical spacing while nothing is selected.
    for _ in range(6):
        st.write("#")
else:
    # One row per task: the query id and completion time of its latest successful run
    # inside the selected window. The window comes from the selection above (it was
    # previously fixed at 24 hours), and MAX_BY ties the query id to the latest run.
    df_task_history_range = pd.DataFrame(session.sql(f"""
                                                SELECT 
                                                    DISTINCT(DATABASE_NAME || '.' || SCHEMA_NAME || '.' || NAME) AS TASK_FQN
                                                    ,MAX_BY(QUERY_ID, COMPLETED_TIME) QUERY_ID
                                                    ,MAX(COMPLETED_TIME) COMPLETED_TIME
                                                    ,MAX(STATE) STATE
                                                FROM 
                                                    SNOWFLAKE.ACCOUNT_USAGE.TASK_HISTORY
                                                WHERE
                                                    LOWER(DATABASE_NAME) IN (
                                                        SELECT LOWER(value) FROM TABLE(FLATTEN(INPUT => {task_dbs}))
                                                        ) AND
                                                    LOWER(STATE) = 'succeeded' AND
                                                    COMPLETED_TIME > DATEADD({date_time_part}, -{increment}, CURRENT_TIMESTAMP())
                                                GROUP BY
                                                    TASK_FQN
                                                ;
                                                """).collect())

    #preview dataframe
    st.write("")
    st.subheader(f"Latest successful tasks ({st.session_state.sb_task_history_range})")
    st.dataframe(df_task_history_range, hide_index=True, use_container_width=True)

## STEP 6: Get task details

This step compiles a list of tasks eligible to be converted to dynamic tables (currently only those with `MERGE` and `INSERT` (from a base table) operations), along with whether the target table is included in a data share.

In [None]:
#for each task, check if it's either a single task or the root task in a multi-task graph
list_eligible_tasks = []
list_ineligible_tasks = []

for index, row in df_task_history_range.iterrows():
    task = row["TASK_FQN"]
    query_id = row["QUERY_ID"]
    completed_time = row["COMPLETED_TIME"]
    state = row["STATE"]
    
    task_db = task.split(".")[0]
    task_sch = task.split(".")[1]
    task_name = task.split(".")[2]

    #check if task hasn't been dropped within the last 24 hours
    session.sql(f"""SHOW TASKS IN {task_db}.{task_sch}""").collect()
    
    #get outbound shares only
    df_tasks_not_dropped = pd.DataFrame(session.sql(f"""SELECT "name" FROM TABLE(RESULT_SCAN(LAST_QUERY_ID())) WHERE LOWER("name") = '{task_name.lower()}';""").collect())

    #if the task no longer exists, skip to the next task in df_task_history_range
    if df_tasks_not_dropped.empty:
        continue
    
    df_tasks_dependents = pd.DataFrame(session.sql(f"""
                                                    SELECT
                                                        CREATED_ON
                                                        ,DATABASE_NAME || '.' || SCHEMA_NAME || '.' || NAME AS TASK_FQN
                                                        ,OWNER
                                                        ,WAREHOUSE
                                                        ,SCHEDULE
                                                        ,PREDECESSORS
                                                        ,DEFINITION
                                                        ,CONDITION
                                                    FROM 
                                                        TABLE({task_db}.INFORMATION_SCHEMA.TASK_DEPENDENTS(task_name => '{task.upper()}'))
                                                    ;
                                                    """).collect())

    #CONDITION 1 (1-task graph): the task is the only one in the task graph and does not have any predecessors      
    #CONDITION 2 (multi-task graph): the first NAME in df_tasks_dependents = TASK AND PREDECESSORS = [], then it's the root task, with at least one child
    if df_tasks_dependents.iloc[0,1] == task and df_tasks_dependents.iloc[0,5] == "[]":
        root_task = df_tasks_dependents.iloc[0,1]
        list_multi_eligible_tasks = []
        list_multi_ineligible_tasks = []
        
        list_task_graph_definitions = []
        list_child_tasks = []
        list_task_graph_child_objs_details = []
        list_task_graph_child_streams_details = []
        list_task_graph_share_details = []
        list_ineligible_tasks_reasons = []
        list_create_dt_ddl = []

        root_task_details = None
        
        for index, row in df_tasks_dependents.iterrows():
            converted_task = None
            task_type = "root" if index == 0 else "child"
            definition = sqlparse.format(df_tasks_dependents.iloc[index,6], reindent=True, keyword_case="upper")

            dict_task_settings = {}
            dict_task_settings.update({"task" : f"{df_tasks_dependents.iloc[index,1]}"})
            dict_task_settings.update({"task_type" : f"{task_type}"})
            dict_task_settings.update({"root_task" : f"{task}"}) 
            dict_task_settings.update({"warehouse" : f"{df_tasks_dependents.iloc[index,3]}"})   

            #for child tasks, use root task schedule
            schedule = df_tasks_dependents.iloc[index,4] if df_tasks_dependents.iloc[index,4] else df_tasks_dependents.iloc[0,4]
            dict_task_settings.update({"schedule" : f"{schedule}"}) 
            
            dict_task_settings.update({"step" : index+1}) 
            dict_task_settings.update({"predecessors" : f"{df_tasks_dependents.iloc[index,5]}"})
            dict_task_settings.update({"definition" : f"""{definition}"""})
            dict_task_settings.update({"condition" : f"{df_tasks_dependents.iloc[index,7]}"})

            #append DDL to list_task_graph_definitions
            list_task_graph_definitions.append(definition)

            #get operator attributes for the query_id
            query_id = df_task_history_range.query(f"""TASK_FQN == '{df_tasks_dependents.iloc[index,1]}'""").iloc[0,1]
            oper_attr = operator_attributes(query_id, dict_task_settings)
    
            if oper_attr.lower() == "success":
                if len(df_tasks_dependents) == 1:
                    #call convert tasks
                    converted_task = convert_task(dict_task_settings, df_shared_objs, list_objs, list_eligible_tasks, list_ineligible_tasks)
                if len(df_tasks_dependents) > 1:
                    #call convert tasks
                    converted_task = convert_task(dict_task_settings, df_shared_objs, list_objs, list_multi_eligible_tasks, list_multi_ineligible_tasks)
    
                    #set root_task_details
                    if index == 0: root_task_details = converted_task[1]

                    #add child object
                    if task_type == "child":
                        #update root task details
                        list_task_graph_child_objs_details.append(dict_task_settings["target_table"])
                        root_task_details.update({"child_objects": list_task_graph_child_objs_details})
                        
                        if converted_task[1]["stream"] != "":
                            list_task_graph_child_streams_details.append(converted_task[1]["stream"])
                            root_task_details.update({"child_streams": list_task_graph_child_streams_details})

                        list_child_tasks.append(dict_task_settings["task"])
                        root_task_details.update({"child_tasks": list_child_tasks})
    
                    #add to list_task_graph_share_details
                    list_task_graph_share_details.append(converted_task[2][0])

        if len(df_tasks_dependents) > 1:        
            #if any of the tasks in the task graph are ineligible, add the root/child task to list_ineligible_tasks, along with offending task and reason
            if list_multi_ineligible_tasks:
                for ie_task in list_multi_ineligible_tasks:
                    reason = ie_task[3]
                    list_ineligible_tasks_reasons.append(f"{reason}")
    
                #append root task to ineligible list
                list_ineligible_tasks.append([root_task, json.dumps(root_task_details, indent=2), "\n\n".join(list_task_graph_definitions), "\n\n".join(list_ineligible_tasks_reasons)])
    
            if list_multi_eligible_tasks and not list_multi_ineligible_tasks:
                root_schedule = ""
                #append the successful DT DDL
                for idx, e_task in enumerate(list_multi_eligible_tasks):
                    dt_ddl = e_task[5]
                    target_table = json.loads(e_task[2])["target_table"]

                    #get the root task's schedule (the root task should always be the first in the eligible list, but check to be sure)
                    if idx == 0 and json.loads(e_task[2])["task_type"].lower() == "root":
                        root_schedule = json.loads(e_task[2])["schedule"]

                    if len(list_multi_eligible_tasks) > 1:
                        #if this table is referenced downstream, set target_lag to DOWNSTREAM, else use the root task's schedule
                        if (idx < len(list_multi_eligible_tasks) - 1) and (target_table.lower() in list_multi_eligible_tasks[idx+1][5].lower()):
                            dt_ddl = re.sub(r"(?<=TARGET_LAG\s=\s)(\'.*\')", "DOWNSTREAM", dt_ddl)
                        else:
                            dt_ddl = re.sub(r"(?<=TARGET_LAG\s=\s)(\'.*\')", f"'{root_schedule}'", dt_ddl)

                    list_create_dt_ddl.append(dt_ddl)
                    
                #append root task to eligible list
                list_eligible_tasks.append([False, root_task, json.dumps(root_task_details, indent=2), json.dumps(list_task_graph_share_details, indent=2), "\n\n".join(list_task_graph_definitions), "\n\n".join(list_create_dt_ddl)])
    
st.write("")
st.subheader("Ineligible Root Tasks:")
st.write("The following tasks cannot be converted to Dynamic Tables")

#create a dataframe from list_ineligible_tasks
df_inelibible_task_clmns = ['Root Task'
                             ,'Task Details'
                             ,'Task Graph DDL'
                             ,'Reason(s)'
                            ]

df_inelibible_task = pd.DataFrame(list_ineligible_tasks, columns = df_inelibible_task_clmns)

#dynamically set the table height, based on number of rows in data frame
de_inelibible_task_height = int((len(df_inelibible_task) + 1.5) * 35 + 3.5)

#read-only view: ineligible tasks cannot be selected for conversion
de_inelibible_task = st.dataframe(
    df_inelibible_task
    ,height=de_inelibible_task_height
    ,hide_index=True
    ,use_container_width=True
)

st.write("#")
st.subheader("Eligible Root Tasks:")
#the conversion itself runs in STEP 7, so the instruction points there
st.write("Please choose task(s) to convert, using the `Convert` checkbox.  Any task selected will be converted in Step 7.")

#create a dataframe from list_eligible_tasks
df_convert_task_clmns = ['Convert'
                         ,'Root Task'
                         ,'Task Details'
                         ,'Shared Objects'
                         ,'Task Graph DDL'
                         ,'Dynamic Table DDL'
                        ]

df_convert_task = pd.DataFrame(list_eligible_tasks, columns = df_convert_task_clmns)

#dynamically set data_editor height, based on number of rows in data frame
de_convert_task_height = int((len(df_convert_task) + 1.5) * 35 + 3.5)

#only the Convert checkbox is editable; the edited frame is read by the later steps
de_convert_task = st.data_editor(
    df_convert_task
    ,height=de_convert_task_height
    ,disabled=('Root Task','Task Details','Shared Objects','Task Graph DDL','Dynamic Table DDL')
    ,hide_index=True
    ,use_container_width=True
    ,num_rows="fixed"
)

## STEP 7: Convert tasks (optional)

This step converts the chosen root/child tasks from STEP 6 to dynamic tables

In [None]:
# Rows whose Convert checkbox was ticked in the STEP 6 editor.
df_selected_tasks = de_convert_task.query('Convert == True')

# The button stays disabled until at least one task is ticked.
flag_disable_convert_btn = True not in set(de_convert_task['Convert'])

btn_convert = st.button("Convert", disabled=flag_disable_convert_btn, type="primary")

if btn_convert:
    for ddl in df_selected_tasks["Dynamic Table DDL"]:
        # A task graph carries one CREATE statement per task, separated by ";".
        statements = ddl.rstrip(';').split(";")

        for stmt in statements:
            #get dt table name (the text between the CREATE clause and TARGET_LAG)
            dt_match = re.search(r"(?<=CREATE OR REPLACE DYNAMIC TABLE\s)(.*?)(?=\s+TARGET_LAG)", stmt)
            dt = dt_match.group(1)

            #create dynamic table(s)
            session.sql(stmt).collect()
            st.success(f"Dynamic Table: {dt} successfully created 🎉")

## STEP 8: Cleanup (optional)

This step can perform the following:
- removes target table(s) from any shares, including tables updated via child tasks
- suspends and drops the existing root/child tasks
- drops the stream associated with each root/child task, if applicable
- drops the target table and tables updated via child tasks

In [None]:
# Cleanup options are only usable once at least one task is ticked in the STEP 6 editor.
any_task_selected = True in set(de_convert_task['Convert'])

flag_disable_cleanup_btn = True
flag_disable_checkbox = not any_task_selected

st.checkbox("Remove target table(s) from shares", key="cb_remove_from_share", disabled=flag_disable_checkbox)
st.checkbox("Suspend and drop existing task(s)", key="cb_drop_task", disabled=flag_disable_checkbox)
st.checkbox("Drop stream(s)", key="cb_drop_stream", disabled=flag_disable_checkbox)
st.checkbox("Drop target table(s)", key="cb_drop_target_table", disabled=flag_disable_checkbox)

# The Cleanup button additionally needs at least one cleanup option ticked.
if any_task_selected and (st.session_state["cb_remove_from_share"] or
    st.session_state["cb_drop_task"] or
    st.session_state["cb_drop_stream"] or
    st.session_state["cb_drop_target_table"]):

    flag_disable_cleanup_btn = False

btn_cleanup = st.button("Cleanup", disabled=flag_disable_cleanup_btn, type="primary")

if btn_cleanup:
    for _, selected_row in df_selected_tasks.iterrows():
        shared_objs = json.loads(selected_row["Shared Objects"])
        root_task = selected_row["Root Task"]
        task_details = json.loads(selected_row["Task Details"])

        if st.session_state["cb_remove_from_share"]:
            #REMOVE FROM SHARE(S)
            for dict_obj in shared_objs:
                if dict_obj["target_shared"].lower() == 'y':
                    obj = dict_obj["object"]
                    obj_type = dict_obj["object_type"]

                    for share in dict_obj["shares"]:
                        #remove obj from share
                        session.sql(f"""REVOKE SELECT ON {obj_type} {obj} FROM SHARE {share}""").collect()
                        st.success(f"{obj_type} {obj} successfully removed from share: {share}. 🎉")
                else:
                    st.warning(f"Parent/child objects for root task {root_task} are not currently shared", icon="⚠️")

        if st.session_state["cb_drop_task"]:
            #SUSPEND AND DROP TASKS: root task first in the list, child tasks after it
            list_drop_tasks = [root_task] + (task_details["child_tasks"] or [])

            #reversed so the last child in the list goes first and the root task last
            for task in reversed(list_drop_tasks):
                session.sql(f"""ALTER TASK {task} SUSPEND""").collect()
                session.sql(f"""DROP TASK {task}""").collect()
                st.success(f"Task: {task} successfully dropped. 🎉")

        if st.session_state["cb_drop_stream"]:
            #DROP STREAMS: root stream plus child streams. A task without a stream stores "",
            #which must not reach DROP STREAM, and a stream shared by several tasks is dropped once.
            candidate_streams = [task_details["stream"]] + (task_details["child_streams"] or [])
            list_drop_streams = list(dict.fromkeys(name for name in candidate_streams if name))

            #the root's stream is dropped even when no child task has one
            if list_drop_streams:
                for stream in reversed(list_drop_streams):
                    session.sql(f"""DROP STREAM {stream}""").collect()
                    st.success(f"Stream: {stream} successfully dropped. 🎉")
            else:
                st.warning(f"There are no streams for the root task {root_task} or its child tasks", icon="⚠️")

        if st.session_state["cb_drop_target_table"]:
            #DROP PARENT/CHILD OBJECTS: root target table plus the tables written by child tasks;
            #a table written by several tasks is dropped once
            candidate_objs = [task_details["target_table"]] + (task_details["child_objects"] or [])
            list_drop_objs = list(dict.fromkeys(candidate_objs))

            for obj in reversed(list_drop_objs):
                session.sql(f"""DROP TABLE {obj}""").collect()
                st.success(f"Table: {obj} successfully dropped. 🎉")