In [None]:
import nbformat
from nbconvert.preprocessors import ExecutePreprocessor
from nbconvert.preprocessors.execute import CellExecutionError
import time
import logging
import os
import asyncio
from queue import Queue
from threading import Thread
import glob
# Set up logging.
# basicConfig is a no-op when the root logger already has handlers (e.g. the
# kernel configured logging first, or this cell was run before), so
# force=True (Python 3.8+) replaces existing root handlers and makes
# re-running this cell apply the level/format below.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', force=True)

In [None]:
# Cell 2: Notebook Execution Function
def run_notebook(notebook_path, timeout=3600, kernel_name='python3'):
    """Execute a notebook and report whether every cell ran without error.

    The notebook's own directory is passed as the execution path, so relative
    paths inside it resolve as if the notebook were opened directly. The
    executed notebook (with its outputs) is kept in memory only; nothing is
    written back to disk.

    Args:
        notebook_path: Path to the ``.ipynb`` file to execute.
        timeout: Per-cell execution timeout in seconds (default: 1 hour).
        kernel_name: Name of the Jupyter kernel used to run the notebook.

    Returns:
        True on success, False on any failure. Failures are logged rather than
        raised so the caller can decide whether to continue, retry, or stop.
    """
    # abspath so a bare filename yields its real directory instead of ''.
    notebook_dir = os.path.dirname(os.path.abspath(notebook_path))

    try:
        # Notebooks are UTF-8 JSON; don't depend on the platform default encoding.
        with open(notebook_path, encoding='utf-8') as f:
            nb = nbformat.read(f, as_version=4)

        ep = ExecutePreprocessor(timeout=timeout, kernel_name=kernel_name)
        start_time = time.perf_counter()
        ep.preprocess(nb, {'metadata': {'path': notebook_dir}})
        elapsed = time.perf_counter() - start_time
    except CellExecutionError as e:
        logging.error(f"Error executing the notebook {notebook_path}")
        logging.error(str(e))
        return False
    except Exception:
        # Cell timeouts, dead kernels, and missing or malformed notebook files
        # are not CellExecutionError; report them as failures too so the
        # pipeline's continue/retry/quit handling still applies.
        logging.exception(f"Error executing the notebook {notebook_path}")
        return False

    logging.info(f"Notebook {notebook_path} executed successfully in {elapsed:.2f} seconds")
    return True

In [None]:
# Cell 3: Notebook List Management Functions
def get_notebook_list(pattern="*.ipynb", exclude=()):
    """Return the paths matching ``pattern``, sorted.

    With the defaults, this is every ``*.ipynb`` file directly in the current
    working directory. ``glob`` neither recurses nor matches dot-files here.

    Args:
        pattern: ``glob`` pattern, relative to the current working directory.
        exclude: A path, or an iterable of paths, to leave out. Paths are
            compared after ``os.path.normpath``. Typically this is the runner
            notebook itself, which otherwise matches the default pattern and
            would be scheduled as a pipeline step.

    Returns:
        Sorted list of matching paths, in the form ``glob`` returns them.
    """
    # Accept a single path as well as a collection of paths.
    if isinstance(exclude, str):
        exclude = [exclude]
    excluded = {os.path.normpath(path) for path in exclude}
    return sorted(
        path for path in glob.glob(pattern)
        if os.path.normpath(path) not in excluded
    )

def display_and_edit_notebooks(notebooks):
    """Interactively review and edit the list of notebooks to run.

    Prints the numbered list and loops on user input until 'r' is entered:
      * a list number removes that notebook;
      * 'a' prompts for a filename to add (it must end in '.ipynb' and be an
        existing file); the list is re-sorted after adding;
      * 'r' accepts the current list.

    The input is copied, not modified, so re-running the calling cell starts
    again from the original list.

    Args:
        notebooks: Iterable of notebook paths to start from.

    Returns:
        The edited list of notebook paths.
    """
    notebooks = list(notebooks)
    while True:
        print("\nCurrent list of notebooks to run:")
        for i, notebook in enumerate(notebooks, 1):
            print(f"{i}. {notebook}")

        action = input("\nEnter the number of a notebook to remove, 'a' to add a notebook, or 'r' to run with the current list: ").strip().lower()

        if action == 'r':
            return notebooks
        elif action == 'a':
            new_notebook = input("Enter the filename of the notebook to add: ").strip()
            # isfile (not exists) so a directory named *.ipynb is rejected.
            if new_notebook.endswith('.ipynb') and os.path.isfile(new_notebook):
                notebooks.append(new_notebook)
                notebooks.sort()
            else:
                print("Invalid notebook filename. Please try again.")
        # isdecimal, not isdigit: isdigit accepts e.g. '²', which int() rejects.
        elif action.isdecimal() and 1 <= int(action) <= len(notebooks):
            removed = notebooks.pop(int(action) - 1)
            print(f"Removed {removed} from the list.")
        else:
            print("Invalid input. Please try again.")

In [None]:
# Cell 4: ETL Pipeline Runner
def run_etl_pipeline(notebooks, runner=None):
    """Execute notebooks in order, asking the user what to do after a failure.

    When a notebook fails, the user chooses to continue with the next notebook
    ('c'), retry the failed one ('r'), or stop the pipeline ('q'). The prompt
    repeats after each failed retry and on unrecognized input, so a typo does
    not abort the run.

    Args:
        notebooks: Iterable of notebook paths, executed in the given order.
        runner: Callable that takes a notebook path and returns True on
            success. Defaults to ``run_notebook``; pass e.g.
            ``functools.partial(run_notebook, timeout=600)`` to change options.

    Returns:
        True if every notebook executed successfully; False if any notebook
        was skipped after a failure or the pipeline was stopped early.
    """
    # Resolved at call time, so the default does not require run_notebook to
    # exist when this function is defined.
    if runner is None:
        runner = run_notebook

    all_succeeded = True
    for notebook in notebooks:
        logging.info(f"Starting execution of {notebook}")
        success = runner(notebook)
        # Keep prompting until the notebook succeeds, is skipped, or the user quits.
        while not success:
            user_input = input(f"Error occurred in {notebook}. Enter 'c' to continue, 'r' to retry, or 'q' to quit: ").strip().lower()
            if user_input == 'r':
                success = runner(notebook)
                if not success:
                    logging.error(f"Retry failed for {notebook}.")
            elif user_input == 'c':
                logging.warning(f"Skipping {notebook} after failure.")
                all_succeeded = False
                break
            elif user_input == 'q':
                logging.info("Stopping execution as requested.")
                logging.warning("ETL pipeline stopped before all notebooks were run.")
                return False
            else:
                # Re-prompt instead of treating an unrecognized answer as 'quit'.
                print("Invalid input. Please try again.")

    if all_succeeded:
        logging.info("ETL pipeline execution completed.")
    else:
        logging.warning("ETL pipeline execution completed with skipped notebooks.")
    return all_succeeded

In [None]:
# Cell 5: Main Execution
# Build the initial run list: every *.ipynb in the current directory, sorted
# by name. Nothing is executed yet. Review and edit the list in the next cell,
# then start the pipeline in the cell after that. If this runner notebook is in
# the same directory, it is included too; remove it from the list, or it would
# be executed as one of the pipeline steps.
etl_notebooks = get_notebook_list()

In [None]:
# Review and edit the run list interactively. Pass a copy: the editor may
# modify the list it receives, and re-running this cell should start again
# from the discovered list rather than from one already edited.
final_notebooks = display_and_edit_notebooks(list(etl_notebooks))

In [None]:
# Execute the selected notebooks in order; a failing notebook prompts the user
# to continue, retry, or quit.
run_etl_pipeline(notebooks=final_notebooks)