In [None]:
import json

# Load-dependency graph consumed by setup_dependencies() in the orchestrator cell.
# Each key is a task (loader notebook) name; its list holds the tasks that must be
# finished before it may start. "begin" and "end" are synthetic start/finish nodes
# that the workers skip instead of running them as notebooks.
load_dependencies_json_string = """{
  "RDV_PIT_CREDITCARD_Loader": [
    "RDV_SAT_CREDITCARD_Loader"
  ],
  "RDV_PIT_CUSTOMER_Loader": [
    "RDV_SAT_CUSTOMER_Loader",
    "RDV_SAT_CUSTOMERCRM_Loader"
  ],
  "RDV_PIT_ORDER_Loader": [
    "RDV_SAT_ORDER_Loader",
    "RDV_SAT_ORDER_CURRENCY_Loader"
  ],
  "RDV_PIT_ORDERDETAIL_Loader": [
    "RDV_SAT_ORDERDETAIL_Loader"
  ],
  "RDV_HUB_CREDITCARD_Loader": [
    "begin"
  ],
  "RDV_SAT_CREDITCARD_Loader": [
    "begin"
  ],
  "RDV_ReferenceTable_CURRENCY_Loader": [
    "begin"
  ],
  "RDV_HUB_CUSTOMER_Loader": [
    "begin"
  ],
  "RDV_SAT_CUSTOMER_Loader": [
    "begin"
  ],
  "RDV_LNK_CUSTOMER_ORDER_Loader": [
    "begin"
  ],
  "RDV_SAT_CUSTOMERCRM_Loader": [
    "begin"
  ],
  "RDV_HUB_ORDER_Loader": [
    "begin"
  ],
  "RDV_SAT_ORDER_Loader": [
    "begin"
  ],
  "RDV_LNK_ORDER_CREDITCARD_Loader": [
    "begin"
  ],
  "RDV_SAT_ORDER_CURRENCY_Loader": [
    "begin"
  ],
  "RDV_LNK_ORDER_ORDERDETAIL_Loader": [
    "begin"
  ],
  "RDV_HUB_ORDERDETAIL_Loader": [
    "begin"
  ],
  "RDV_SAT_ORDERDETAIL_Loader": [
    "begin"
  ],
  "end": [
    "RDV_HUB_CREDITCARD_Loader",
    "RDV_PIT_CREDITCARD_Loader",
    "RDV_ReferenceTable_CURRENCY_Loader",
    "RDV_HUB_CUSTOMER_Loader",
    "RDV_PIT_CUSTOMER_Loader",
    "RDV_LNK_CUSTOMER_ORDER_Loader",
    "RDV_HUB_ORDER_Loader",
    "RDV_PIT_ORDER_Loader",
    "RDV_LNK_ORDER_CREDITCARD_Loader",
    "RDV_LNK_ORDER_ORDERDETAIL_Loader",
    "RDV_HUB_ORDERDETAIL_Loader",
    "RDV_PIT_ORDERDETAIL_Loader"
  ]
}"""

# Parsed form: dict mapping task name -> list of prerequisite task names.
load_dependencies = json.loads(load_dependencies_json_string)



In [None]:
from graphlib import TopologicalSorter
import threading
from threading import Lock, Thread
import queue

from pyspark.sql.functions import explode_outer

from datetime import datetime as dt
import time


#########################################################################
# properties to customize
threads_amount: int = 4  # number of worker threads running loader notebooks in parallel

# Load timestamp shared by every loader notebook of this run and by every log row.
bg_loadtimestamp = dt.now()
bg_loadtimestamp_str = bg_loadtimestamp.strftime('%Y-%m-%d %H:%M:%S.%f')
# Same instant, re-parsed from the string form that is passed to the loader notebooks.
bg_loadtimestamp_dt = dt.strptime(bg_loadtimestamp_str, '%Y-%m-%d %H:%M:%S.%f')
#########################################################################

# Serializes console output from worker threads so their messages don't end up on one line.
LOCK = Lock()

# Scheduling state, rebuilt from scratch every time this cell runs.
topological_sorter = TopologicalSorter()
task_queue = queue.Queue()             # tasks ready to run -> worker threads
finalized_tasks_queue = queue.Queue()  # finished/skipped tasks -> scheduler loop
failed_tasks: list = []                # tasks that failed or were disabled as successors
dependencies: dict = {}                # task name -> tasks that depend on it

# Starts True so that worker threads left over from a previous run of this cell stop at
# their next check of the flag; stop_existing_threads() resets it to False after joining them.
stop_threads = True

def stop_existing_threads():
    """Join worker threads left over from a previous run, then re-enable the worker loop.

    Worker threads are recognised by their 'worker_' name prefix. Each one is joined in
    turn (blocking until it has finished), with a log line before and after the join.
    Finally the global `stop_threads` flag is reset to False so that newly started
    workers keep looping.
    """
    global stop_threads
    leftover_workers = [t for t in threading.enumerate() if t.name.startswith('worker_')]
    for leftover in leftover_workers:
        info('all', f'waiting for already running thread: {leftover.name} to stop')
        leftover.join()
        info('all', f'already running thread: {leftover.name} completed')
    stop_threads = False

def clear_queue(queue):
    """Discard, without blocking, every item currently buffered in `queue`.

    Note: the parameter is the Queue instance and shadows the `queue` module here.
    """
    while True:
        if queue.empty():
            break
        queue.get_nowait()

def setup_dependencies():
    """Register every edge of `load_dependencies` with the scheduler and build the reverse map.

    For each task and each of its prerequisites, the edge is added to
    `topological_sorter` (the task becomes ready only after the prerequisite is done),
    and the task is appended to `dependencies[prerequisite]`. That reverse map lets a
    failing task later disable everything that depends on it.
    """
    for task_name, prerequisites in load_dependencies.items():
        for prerequisite in prerequisites:
            topological_sorter.add(task_name, prerequisite)
            dependencies.setdefault(prerequisite, []).append(task_name)

def info(task_name, message):
    """Log `message` for `task_name` at level 'INFO' via log_message()."""
    log_message(log_level='INFO', task_name=task_name, message=message)

def error(task_name, message):
    """Log `message` for `task_name` at level 'ERROR' via log_message()."""
    log_message(log_level='ERROR', task_name=task_name, message=message)

def log_message(log_level, task_name, message):
    """Print one log entry to the console and append it to the Delta log table.

    The console line is printed first (under LOCK so lines from different worker
    threads don't interleave). The row is then appended to the configured log table.
    Both use the same timestamp. Exceptions from the Spark write propagate to the caller.

    Args:
        log_level: level label stored in the row, e.g. 'INFO' or 'ERROR'.
        task_name: task the message belongs to ('all'/'worker' are used for
            orchestrator-level messages).
        message: text to log.
    """
    # One timestamp for both outputs so console and table entries can be correlated.
    log_timestamp = dt.now()

    # Print before persisting so the message is still visible if the table write fails.
    with LOCK:
        print(f"{log_timestamp.strftime('%Y/%m/%d, %H:%M:%S')} - {task_name}: {message}")

    log_df = spark.createDataFrame(
        [(log_timestamp, log_level, 'multi-threading orchestrator', 'C3-X-DB-DV', '{loadcontrol#loadcontrol#application_name}', '{loadcontrol#loadcontrol#application_environment_name}', bg_loadtimestamp_dt, '', task_name, '', '', '', message)],
        ['log_timestamp', 'log_level', 'execution_unit', 'project_name', 'application_name', 'application_environment_name', 'load_timestamp', 'statement_name', 'task_name', 'target_object_database_name', 'target_object_schema_name', 'target_object_name', 'message'],
    )
    log_df.write.format('delta').mode('append').saveAsTable('`{loadcontrol#loadcontrol#database_name}`.`{loadcontrol#loadcontrol#schema_name}`.`{loadcontrol#loadcontrol#log_table_name}`')


def set_task_failed(task_name, root_task_name):
    """Mark `task_name` as failed and transitively disable every task that depends on it.

    Tasks are appended to `failed_tasks` in depth-first pre-order over the reverse
    dependency map `dependencies`. A successor that is already in `failed_tasks` is
    skipped (along with its own successors). Each disabled task gets one log line; the
    wording differs for `root_task_name` (the task that actually failed) and for the
    successors disabled because of it.
    """
    def _disable(node):
        failed_tasks.append(node)
        if node == root_task_name:
            info(node, f'disabling task {node} and all its depending tasks (successors)')
        else:
            info(node, f'disabling task {node} as successor of {root_task_name}')
        # Reversed so that popping from the stack visits successors in their listed order.
        return reversed(dependencies.get(node, []))

    stack = list(_disable(task_name))
    while stack:
        node = stack.pop()
        if node in failed_tasks:
            continue
        stack.extend(_disable(node))

def worker():
    while not stop_threads:
        item = task_queue.get()
        
        if item is None and not stop_threads:
            info('worker', 'Termination signal received. Exiting thread.')
            task_queue.task_done()
            break
        
        if item != 'begin' and item != 'end' and item not in failed_tasks and not stop_threads:
            try:
                if not stop_threads:
                    info(item, 'starting task')
                    return_value = dbutils.notebook.run(f'{notebook_orchestration#notebook_orchestration#loader_folder_path}{item}', {notebook_orchestration#notebook_orchestration#run_notebook_timeout}, {'bg_loadtimestamp': bg_loadtimestamp_str})
                    info(item, return_value)
                    info(item, 'finished task')
            except Exception as e:
                if 'FAILED: Unable to access the notebook' in str(e):
                    error(item, f'task not found: {e}')
                info(item, 'task failed')
                set_task_failed(item, item)        

        else:
            info(item, 'task skipped, nothing to do here')
        
        if not stop_threads:
            task_queue.task_done()
            finalized_tasks_queue.put(item)

# Reset state left over from a previous (possibly cancelled) run of this cell.
stop_existing_threads()
clear_queue(task_queue)
clear_queue(finalized_tasks_queue)
failed_tasks.clear()

setup_dependencies()
# Validate the graph (raises graphlib.CycleError on a cycle) before any worker thread
# exists, so a bad dependency definition cannot leave idle workers behind.
topological_sorter.prepare()

threads = [Thread(name=f'worker_{i}', target=worker, daemon=True) for i in range(threads_amount)]
for thread in threads:
    thread.start()

while topological_sorter.is_active():
    # Hand every task whose prerequisites are all done to the worker pool.
    for node in topological_sorter.get_ready():
        task_queue.put(node)

    # Block until a worker reports a task as finalized (run, failed or skipped).
    # is_active() guarantees that at least one handed-out task has not yet been passed
    # to done(), so this get() eventually succeeds; newly freed tasks are then queued
    # on the next iteration.
    node = finalized_tasks_queue.get()
    topological_sorter.done(node)

# One termination sentinel per worker, then wait for the queue to drain and the threads to exit.
for _ in threads:
    task_queue.put(None)

task_queue.join()

for thread in threads:
    thread.join()

info('all', 'work completed')

# 1 if any task failed or was disabled, else 0.
return_value = 1 if failed_tasks else 0

dbutils.notebook.exit(return_value)

