In [None]:
import pyLDAvis.gensim  # Library for interactive topic model visualization
import torch  # PyTorch library for deep learning and GPU acceleration
from torch.utils.data import DataLoader  # Provides an iterator over a dataset for efficient batch processing
from tqdm import tqdm  # Creates progress bars to visualize the progress of loops or tasks
from gensim.models import LdaModel  # Implements LDA for topic modeling using the Gensim library
from gensim.corpora import Dictionary  # Represents a collection of text documents as a bag-of-words corpus
from gensim.models import CoherenceModel  # Computes coherence scores for topic models

import pickle
import os  # Provides functions for interacting with the operating system, such as creating directories
import itertools  # Provides various functions for efficient iteration and combination of elements
import numpy as np  # Library for numerical computing in Python, used for array operations and calculations
from time import time, sleep # Measures the execution time of code snippets or functions
import pprint as pp  # Pretty-printing library, used here to format output in a readable way
import pandas as pd
import logging # Logging module for generating log messages
import sys # Provides access to some variables used or maintained by the interpreter and to functions that interact with the interpreter 
import shutil # High-level file operations such as copying and removal 
import zipfile # Provides tools to create, read, write, append, and list a ZIP file
from tqdm.notebook import tqdm  # Creates progress bars in Jupyter Notebook environment
import json
import random
import logging
import csv
from dask.distributed import as_completed


In [None]:

# Topic counts swept by the main cell: range(START_TOPICS, END_TOPICS + 1, STEP_SIZE),
# so END_TOPICS is inclusive.
START_TOPICS = 1
END_TOPICS = 2
STEP_SIZE = 1

# Decade being modelled; interpolated into the log, visuals and archive paths.
DECADE = '2010s'

# Number of Dask workers (passed to LocalCluster as n_workers).
# This machine has an Intel Core i9 processor with 8 physical cores (16 threads with Hyper-Threading),
# so 8 or slightly fewer workers leaves some CPU for other tasks running on the system.
CORES = 6

# Threads per Dask worker (passed to LocalCluster as threads_per_worker).
THREADS_PER_CORE = 1

# Memory limit string passed to LocalCluster as memory_limit.
RAM_MEMORY_LIMIT = "100GB" 

# Scratch directory handed to LocalCluster as local_directory.
DASK_DIR = '/_harvester/tmp-dask-out'

# Number of passes forwarded to Gensim's LdaModel (passes=).
PASSES = 15

# Number of iterations forwarded to Gensim's LdaModel (iterations=).
ITERATIONS = 50

# Chunk size forwarded to Gensim's LdaModel (chunksize=).
CHUNKSIZE = 4000

# Used in two places: as the default `chunksize` of train_model (documents per
# batch) and as the number of accumulated training futures that triggers a
# wait in the main cell.
BATCH_SIZE = 100


# JSON file read by futures_create_lda_datasets, and the fraction of its
# samples assigned to the training split (the remainder is the eval split).
DATA_SOURCE = "C:/_harvester/data/tokenized-sentences/10s/tokenized_sents-w-bigrams.json"
TRAIN_RATIO = 0.8

# Timeout passed when waiting on a batch of futures.
# NOTE(review): presumably seconds, as the main cell's own comment suggests — confirm.
TIMEOUT = 90

# Timeout used for the second attempt on jobs that missed TIMEOUT.
EXTENDED_TIMEOUT = 120

In [None]:


# Output folder structure for this run. Every path is derived from DECADE so
# that changing the decade moves logs, models and visuals together.
LOG_DIR = f"C:/_harvester/data/lda-models/{DECADE}_html/log/"
MODEL_DIR = f"C:/_harvester/data/lda-models/{DECADE}_html/train-eval-data/"
IMAGE_DIR = f"C:/_harvester/data/lda-models/{DECADE}_html/visuals/"

# Archive the previous run only when all three directories already exist.
if os.path.exists(LOG_DIR) and os.path.exists(MODEL_DIR) and os.path.exists(IMAGE_DIR):
    log_files = os.listdir(LOG_DIR)
    model_files = os.listdir(MODEL_DIR)
    image_files = os.listdir(IMAGE_DIR)

    # Nothing to archive if all three directories are empty
    if log_files or model_files or image_files:
        # Find the first unused archiveNNNN.zip name
        counter = 0
        while True:
            archive_file = f"C:/_harvester/data/lda-models/{DECADE}_html/archive{counter:04d}.zip"
            if not os.path.exists(archive_file):
                break
            counter += 1

        # Zip every file of the previous run. os.walk is used so files inside
        # nested folders are stored as well (zipf.write on a directory only
        # records an empty directory entry).
        with zipfile.ZipFile(archive_file, 'w') as zipf:
            for archive_root, source_dir in (("log", LOG_DIR), ("model", MODEL_DIR), ("image", IMAGE_DIR)):
                for current_dir, _subdirs, names in os.walk(source_dir):
                    for name in names:
                        source_path = os.path.join(current_dir, name)
                        relative_path = os.path.relpath(source_path, source_dir)
                        zipf.write(source_path, arcname=os.path.join(archive_root, relative_path))

        # The archive has been written and closed, so the old contents can go.
        # Removing them matters because save_model_and_log appends to the CSV
        # logs: leftovers from an earlier run would be mixed into the new one.
        for subdir in [LOG_DIR, MODEL_DIR, IMAGE_DIR]:
            shutil.rmtree(subdir)

# Create fresh directories for the new run
os.makedirs(LOG_DIR, exist_ok=True)
os.makedirs(MODEL_DIR, exist_ok=True)
os.makedirs(IMAGE_DIR, exist_ok=True)


In [None]:
# Configure root logger level (this will affect all loggers unless overridden)
logging.getLogger().setLevel(logging.ERROR)

# Log file that receives the 'distributed.utils_perf' messages.
DASK_LOG_FILE = 'C:/_harvester/dask-logs/dask-logs.log'

# FileHandler does not create missing parent folders, so make sure it exists.
os.makedirs(os.path.dirname(DASK_LOG_FILE), exist_ok=True)

# Get the logger for distributed.utils_perf.
perf_logger = logging.getLogger('distributed.utils_perf')

# Re-running this cell must not stack a second handler on the same file,
# otherwise every message is written twice. Drop handlers left by earlier runs.
for stale_handler in list(perf_logger.handlers):
    if (isinstance(stale_handler, logging.FileHandler)
            and stale_handler.baseFilename == os.path.abspath(DASK_LOG_FILE)):
        perf_logger.removeHandler(stale_handler)
        stale_handler.close()

# Create a file handler that logs messages to a file and attach it.
file_handler = logging.FileHandler(DASK_LOG_FILE)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
perf_logger.addHandler(file_handler)
perf_logger.setLevel(logging.INFO)  # Adjust this level as needed

# Remove all handlers associated with the root logger (including default StreamHandler)
for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)

In [None]:
# The Dask dashboard triggers Bokeh deprecation warnings; silence them here so
# they do not flood the notebook output.
import warnings
from bokeh.util.deprecation import BokehDeprecationWarning

# Ignore every warning of category BokehDeprecationWarning.
warnings.filterwarnings("ignore", category=BokehDeprecationWarning)
# Ignore warnings whose originating module name matches "distributed.utils_perf"
# (the `module` argument is a regular expression matched against the module name).
warnings.filterwarnings("ignore", module="distributed.utils_perf")

In [None]:


# --- alpha grid --------------------------------------------------------------
# `alpha` is the concentration parameter of the Dirichlet prior over the
# document-topic distribution and governs how sparse those distributions are:
#   * small alpha -> sparser distributions, each document tends to use few topics
#   * large alpha -> denser distributions, each document spreads over more topics
# Its value therefore trades topic diversity against document specificity.
# The grid is a numeric range plus two named settings.
alpha_values = [*np.arange(0.01, 1, 0.3).tolist(), 'symmetric', 'asymmetric']

# --- beta grid ---------------------------------------------------------------
# `beta` is the concentration parameter of the Dirichlet distribution over the
# topic-word distribution and governs how sparse the topics are:
#   * large beta -> topics closer to uniform over words (more general topics)
#   * small beta -> sparser topics dominated by a few words
# Its value affects the interpretability and granularity of the topics found.
# The grid is the same numeric range plus one named setting.
beta_values = [*np.arange(0.01, 1, 0.3).tolist(), 'symmetric']


In [None]:
def futures_create_lda_datasets(filename, train_ratio, seed=None):
    """Load a JSON list of samples, shuffle it and split it into train/eval sets.

    Args:
        filename: Path of a JSON file whose top-level value is a list of samples.
        train_ratio: Fraction of the samples assigned to the training split;
            must lie in [0, 1]. The remaining samples form the eval split.
        seed: Optional seed for the shuffle. When given, the split is
            reproducible; when None (default) the module-level `random`
            generator is used, exactly as before.

    Returns:
        A pair (train, eval) of `dask.delayed` wrappers around the two lists;
        callers obtain the underlying lists with `.compute()`.

    Raises:
        ValueError: If `train_ratio` is outside [0, 1].
    """
    # Imported here so the function does not depend on another cell having
    # imported dask first.
    import dask

    if not 0 <= train_ratio <= 1:
        raise ValueError(f"train_ratio must be between 0 and 1, got {train_ratio!r}")

    # Get the file size in bytes
    file_size = os.path.getsize(filename)

    # Get the last modified timestamp of the file
    last_modified = os.path.getmtime(filename)

    # Print the metadata
    print("\nFile Metadata:")
    print(f"Filename: {filename}")
    print(f"Size: {file_size} bytes")
    print(f"Last Modified: {last_modified}\n")

    # JSON text is UTF-8; be explicit so the platform default encoding
    # cannot garble or reject non-ASCII tokens.
    with open(filename, 'r', encoding='utf-8') as jsonfile:
        data = json.load(jsonfile)

    num_samples = len(data)  # Count the total number of samples
    num_train_samples = int(num_samples * train_ratio)  # Calculate the number of samples for training

    # Shuffle the data (with a private generator when a seed is requested, so
    # the global random state is left untouched)
    shuffler = random.Random(seed) if seed is not None else random
    shuffler.shuffle(data)

    train_data = data[:num_train_samples]  # Assign a portion of data for training
    eval_data = data[num_train_samples:]  # Assign the remaining data for evaluation

    print(f"Number of training samples: {len(train_data)}")
    print(f"Number of eval samples: {len(eval_data)}")

    # Create delayed objects for train and eval datasets
    future_train_data = dask.delayed(train_data)
    future_eval_data = dask.delayed(eval_data)

    return future_train_data, future_eval_data


In [None]:
def save_model_and_log(model_data, n_topics, alpha, beta, lda_model, model_dir, log_dir, train_or_eval=None):
    """Save a trained model to disk and append its metrics to a CSV log.

    Args:
        model_data: Mapping of column name -> value; its keys become the CSV
            header and its values one CSV row (each value is written as a
            single cell).
        n_topics: Number of topics of the model (used in the file name).
        alpha: Alpha setting of the model; a scalar, a string, or a list/tuple
            of values (used in the file name).
        beta: Beta setting of the model, same accepted forms as `alpha`.
        lda_model: Object exposing `save(path)`.
        model_dir: Directory that receives the model file.
        log_dir: Directory that receives the CSV log.
        train_or_eval: Truthy for a training model, falsy (default) for an
            evaluation model; selects the "train"/"eval" file-name prefix.
    """
    # Normalize alpha and beta values into strings suitable for filenames
    alpha_str = '_'.join(map(str, alpha)) if isinstance(alpha, (list, tuple)) else str(alpha)
    beta_str = '_'.join(map(str, beta)) if isinstance(beta, (list, tuple)) else str(beta)

    prefix = "train" if train_or_eval else "eval"

    # Build the name without its extension, sanitise it, and only then add
    # ".model". Sanitising the full name would also rewrite the dot of the
    # extension and produce files ending in "pmodel".
    stem = f"{prefix}-lda_model_topics{n_topics}_alpha{alpha_str}_beta{beta_str}"
    stem = stem.replace('.', 'p').replace('/', '-').replace('\\', '-')
    filepath = os.path.join(model_dir, stem + ".model")

    # Save the model
    lda_model.save(filepath)

    # One CSV log per prefix
    csv_path = os.path.join(log_dir, f"{prefix}-lda-model-train-data.csv")

    # A header is needed when the file is missing or exists but is still empty
    needs_header = not os.path.isfile(csv_path) or os.path.getsize(csv_path) == 0

    # Append to the CSV; explicit UTF-8 so non-ASCII tokens in the logged
    # values do not depend on the platform default encoding.
    with open(csv_path, mode='a', newline='', encoding='utf-8') as csvfile:
        writer = csv.writer(csvfile)

        if needs_header:
            writer.writerow(model_data.keys())

        # Append data row
        writer.writerow(model_data.values())

In [None]:

def train_model(n_topics: int, alpha, beta, data, chunksize=BATCH_SIZE):
    """Train one Gensim LDA model incrementally and collect per-batch metrics.

    Steps:
        (1) Resolve `data` to the list of tokenised documents.
        (2) Build the dictionary from ALL documents up front, so the
            vocabulary the model is created with already covers every batch.
        (3) Walk over the documents in batches of `chunksize`: the first batch
            creates the LdaModel, every later batch is fed to it with
            `update()`, so the returned model has seen the whole data set.
        (4) After each batch, record bound, log-perplexity and c_v coherence
            computed on that batch.

    Args:
        n_topics: Number of topics for the model.
        alpha: Forwarded to `LdaModel(alpha=...)`; in this notebook a float or
            one of the strings 'symmetric' / 'asymmetric'.
        beta: Forwarded to `LdaModel(eta=...)`; in this notebook a float or
            the string 'symmetric'.
        data: Object with a `.compute()` method (e.g. a Dask Delayed) that
            yields a list of documents, each a list of string tokens. A plain
            list is accepted as well.
        chunksize: Number of documents per incremental batch. This is not the
            `chunksize` handed to LdaModel, which comes from CHUNKSIZE.

    Returns:
        Tuple `(model_data, n_topics, alpha, beta, lda_model_gensim)` where
        `model_data` maps each metric name to a list holding one entry per
        processed batch.

    Raises:
        ValueError: If there are no documents to train on.
    """
    # Resolve a Delayed (or similar) to the actual list of documents
    streaming_documents = data.compute() if hasattr(data, "compute") else data

    num_documents = len(streaming_documents)
    if num_documents == 0:
        raise ValueError("train_model received no documents to train on")

    # The model's vocabulary size is taken from the dictionary at creation
    # time, so the dictionary has to be complete before the first batch.
    dictionary_global = Dictionary(streaming_documents)

    model_data = {
        'text': [],         # flat token list of each batch
        'convergence': [],  # bound() on each batch
        'perplexity': [],   # log_perplexity() on each batch
        'coherence': [],    # c_v coherence on each batch
        'dictionary': [],   # the shared dictionary
        'topics': [],       # n_topics
        'alpha': [],        # alpha
        'beta': []          # beta
    }

    batch_size = chunksize  # Number of documents to process per iteration
    lda_model_gensim = None

    for start_index in range(0, num_documents, batch_size):
        batch_documents = streaming_documents[start_index:start_index + batch_size]
        batch_corpus = [dictionary_global.doc2bow(document) for document in batch_documents]

        if lda_model_gensim is None:
            # First batch: create the model
            lda_model_gensim = LdaModel(corpus=batch_corpus,
                                        id2word=dictionary_global,
                                        num_topics=n_topics,
                                        alpha=alpha,
                                        eta=beta,
                                        random_state=75,
                                        passes=PASSES,
                                        iterations=ITERATIONS,
                                        chunksize=CHUNKSIZE,
                                        per_word_topics=True)
        else:
            # Later batches: keep training the same model instead of
            # replacing it with a fresh one
            lda_model_gensim.update(batch_corpus)

        convergence_score = lda_model_gensim.bound(batch_corpus)

        # NumPy overflow normally surfaces as a warning plus a non-finite
        # result, not as an exception, so check the value itself.
        perplexity_score = lda_model_gensim.log_perplexity(batch_corpus)
        if not np.isfinite(perplexity_score):
            perf_logger.warning("Perplexity calculation encountered an overflow error. Setting perplexity_score = float('inf')")
            perplexity_score = float('inf')

        # `texts` must be a list of tokenised documents
        with np.errstate(divide='ignore', invalid='ignore'):
            coherence_model = CoherenceModel(model=lda_model_gensim, texts=batch_documents, dictionary=dictionary_global, coherence='c_v')
            coherence_score = coherence_model.get_coherence()

        # A new list per batch, so the entries do not alias one another
        model_data['text'].append([token for document in batch_documents for token in document])
        model_data['convergence'].append(convergence_score)
        model_data['perplexity'].append(perplexity_score)
        model_data['coherence'].append(coherence_score)
        model_data['dictionary'].append(dictionary_global)
        model_data['topics'].append(n_topics)
        model_data['alpha'].append(alpha)
        model_data['beta'].append(beta)

    return (model_data, n_topics, alpha, beta, lda_model_gensim,)

In [None]:
# Training models
def callback_train(future):
    """Persist the result of a finished training future, then advance the progress bar.

    Written as a done-callback; its registration in the main cell is currently
    commented out. Any exception raised while fetching or saving the result is
    printed and the progress bar is left untouched.
    """
    try:
        training_outcome = future.result()
        save_arguments = training_outcome[:]
        # Store the trained model and its log row right away
        save_model_and_log(*save_arguments, model_dir=MODEL_DIR, log_dir=LOG_DIR, train_or_eval=True)
    except Exception as error:
        print(f"Error occurred during training: {error}")
        return

    # Reached only when the model was saved without an exception
    progress_bar.update(1)

# Evaluation models
def callback_eval(future):
    """Persist the result of a finished evaluation future, then advance the progress bar.

    Written as a done-callback; its registration in the main cell is currently
    commented out. Any exception raised while fetching or saving the result is
    printed and the progress bar is left untouched.
    """
    try:
        evaluation_outcome = future.result()
        save_arguments = evaluation_outcome[:]
        # Store the evaluation model and its log row right away
        save_model_and_log(*save_arguments, model_dir=MODEL_DIR, log_dir=LOG_DIR, train_or_eval=False)
    except Exception as error:
        print(f"Error occurred during evaluation: {error}")
        return

    # Reached only when the model was saved without an exception
    progress_bar.update(1)

In [None]:
# Batch bookkeeping used by the main cell:
#   - `process_completed_futures` is called with the training and evaluation
#     futures of a batch that completed within the timeout.
#   - `retry_processing` is called with the futures that are still left over
#     after the parameter loops have finished.
#   - Parameters of jobs that never completed end up in `failed_model_params`
#     so they can be reviewed afterwards.

# List to store parameters of models that failed to complete even after a retry
failed_model_params = []

# Mapping from futures to their corresponding parameters (n_topics, alpha_value, beta_value)
future_to_params = {}

# Function to process completed futures
def process_completed_futures(completed_train_futures, completed_eval_futures):
    """Save the model and log row of every completed future.

    The done-callbacks are not registered in the main cell, so this is the
    place where results are persisted; leaving it empty means trained models
    are computed and then discarded.

    Args:
        completed_train_futures: Futures whose result is a training run, i.e.
            the tuple returned by `train_model`.
        completed_eval_futures: Same, for evaluation runs.

    A failure while fetching or saving one result is printed and does not stop
    the remaining futures from being processed. The progress bar is not
    touched here; the caller accounts for progress.
    """
    labelled_groups = (
        (True, "training", completed_train_futures),
        (False, "evaluation", completed_eval_futures),
    )
    for is_training, stage, futures in labelled_groups:
        for future in futures:
            try:
                save_model_and_log(*future.result(), model_dir=MODEL_DIR, log_dir=LOG_DIR,
                                   train_or_eval=is_training)
            except Exception as error:
                print(f"Error occurred during {stage}: {error}")

# Function to retry processing with incomplete futures
def retry_processing(incomplete_train_futures, incomplete_eval_futures, timeout):
    """Wait once more for the given futures and sort them into done / failed.

    Args:
        incomplete_train_futures: Training futures still outstanding.
        incomplete_eval_futures: Evaluation futures still outstanding.
        timeout: Maximum time to wait, forwarded to `dask.distributed.wait`.

    Futures that finished successfully are handed to
    `process_completed_futures`; the parameters of all others are appended to
    `failed_model_params` for later review.
    """
    # Local imports: `wait` lives in dask.distributed (the top-level `dask`
    # package has no `wait`), and an expired timeout is reported by raising
    # asyncio's TimeoutError rather than by returning a "not done" set.
    import asyncio
    from dask.distributed import wait

    pending = list(incomplete_train_futures) + list(incomplete_eval_futures)
    if not pending:
        return

    try:
        wait(pending, timeout=timeout)
    except (TimeoutError, asyncio.TimeoutError):
        pass  # classify by status below

    # Read every status exactly once so a future finishing mid-way cannot be
    # counted in neither group.
    snapshot = [(is_train, future, future.status == "finished")
                for is_train, group in ((True, incomplete_train_futures), (False, incomplete_eval_futures))
                for future in group]

    # Process completed ones
    process_completed_futures([f for is_train, f, ok in snapshot if ok and is_train],
                              [f for is_train, f, ok in snapshot if ok and not is_train])

    # Record parameters of still incomplete futures for later review
    failed_model_params.extend(future_to_params[f] for _is_train, f, ok in snapshot if not ok)

## Asynchronous Execution as said by Brunhilda:

Asynchronous execution allows you to execute tasks concurrently, without waiting for each task to complete before moving on \
to the next one. This can improve the overall efficiency and speed of your program.

In the given code snippet, asynchronous execution is achieved using Dask's as_completed function. This function takes a list \
of futures (representing tasks) and returns an iterator that yields futures as they complete.

Here's how it works:

&nbsp;&nbsp;&nbsp;&nbsp;(1) First, you submit all your training and evaluation tasks using client.submit(). These tasks are represented by futures. \
&nbsp;&nbsp;&nbsp;&nbsp;(2) You add callback functions (callback_train and callback_eval) to these futures using the add_done_callback() method. These callbacks will be executed when their respective futures complete.\
&nbsp;&nbsp;&nbsp;&nbsp;(3) You create two lists, train_futures and eval_futures, to store the futures for training and evaluation models respectively.\
&nbsp;&nbsp;&nbsp;&nbsp;(4) The submissions happen inside a nested loop that iterates over the range of values for n_topics, alpha_value, and beta_value.\
&nbsp;&nbsp;&nbsp;&nbsp;(5) Inside this loop, you submit the training and evaluation tasks for each combination of parameters using client.submit(). These new futures are added to their respective lists.\
&nbsp;&nbsp;&nbsp;&nbsp;(6) Next, you use the as_completed function to iterate over both lists of futures (train_futures and eval_futures). This function returns an iterator that yields completed futures as they become available.\
&nbsp;&nbsp;&nbsp;&nbsp;(7) As each future completes, its associated callback function (callback_train or callback_eval) is executed.\
&nbsp;&nbsp;&nbsp;&nbsp;(8) Inside these callback functions, you retrieve the result of the completed future using .result(). You can then save the trained or evaluated model using the provided save_model_and_log function.\
&nbsp;&nbsp;&nbsp;&nbsp;(9) The loop continues until all combinations of parameters have been processed.\
&nbsp;&nbsp;&nbsp;&nbsp;(10) Finally, after all models have been saved and logged, you close the Dask client. 

By utilizing asynchronous execution with Dask's as_completed, your program can process multiple tasks concurrently while still ensuring that each model is saved once its associated task has completed.

In [None]:

if __name__=="__main__":
    
    import asyncio   # wait() reports an expired timeout by raising asyncio's TimeoutError
    import dask   # Parallel computing library that scales Python workflows across multiple cores or machines 
    from dask.distributed import Client, LocalCluster, wait   # Distributed computing framework that extends Dask functionality 
    from dask.diagnostics import ProgressBar   # Visualizes progress of Dask computations
    from dask.distributed import progress
    from dask.delayed import Delayed # Decorator for creating delayed objects in Dask computations
    from dask.distributed import as_completed
    from dask.bag import Bag
    from dask import delayed
    import dask.config

    def _settle_batch(train_batch, eval_batch, timeout):
        """Wait for one batch, hand finished futures on, and return the rest.

        Waits up to `timeout` for all futures of the batch, passes the ones
        that finished successfully to `process_completed_futures`, advances
        the progress bar by their count, and returns the others as a list of
        `(is_train, future)` pairs.
        """
        batch = train_batch + eval_batch
        if batch:
            try:
                # `wait` comes from dask.distributed; the top-level dask
                # package has no `wait`.
                wait(batch, timeout=timeout)
            except (TimeoutError, asyncio.TimeoutError):
                pass  # timeout expired: classify by status below

        # Read every status exactly once so a future finishing mid-way cannot
        # be counted in neither group.
        snapshot = [(is_train, f, f.status == "finished")
                    for is_train, group in ((True, train_batch), (False, eval_batch))
                    for f in group]
        finished_train = [f for is_train, f, ok in snapshot if ok and is_train]
        finished_eval = [f for is_train, f, ok in snapshot if ok and not is_train]

        process_completed_futures(finished_train, finished_eval)
        progress_bar.update(len(finished_train) + len(finished_eval))

        return [(is_train, f) for is_train, f, ok in snapshot if not ok]

    # The memory limit is set here; editing the dictionaries returned by
    # client.scheduler_info() afterwards would only change a local copy.
    cluster = LocalCluster(
            n_workers=CORES,
            threads_per_worker=THREADS_PER_CORE,
            processes=False,
            memory_limit=RAM_MEMORY_LIMIT,
            local_directory=DASK_DIR,
            dashboard_address=None,
            #dashboard_address=":8787",
            #protocol="tcp",
    )

    # Create the distributed client
    client = Client(cluster)

    progress_bar = None
    try:
        # Check if the Dask client is connected to a scheduler:
        if client.status == "running":
            print("Dask client is connected to a scheduler.")
        else:
            print("Dask client is not connected to a scheduler.")

        # Check if Dask workers are running:
        if len(client.scheduler_info()["workers"]) > 0:
            print("Dask workers are running.")
        else:
            print("No Dask workers are running.")

        # create training and evaluation data
        print("Creating training and evaluation samples...")
        started = time()
        future = client.submit(futures_create_lda_datasets, DATA_SOURCE, TRAIN_RATIO)

        # Wait for the future to complete BEFORE reporting the elapsed time;
        # submit() itself returns immediately.
        train_data, eval_data = future.result()
        print(f"Completed creation of training and evaluation samples in {round((time()- started)/60,2)} minutes.\n")

        # Scatter the computed training and evaluation data across workers
        print("Beginning data scatter...")
        scattered_train_data_future = client.scatter(train_data)
        scattered_eval_data_future = client.scatter(eval_data)
        print("Data scatter complete...\n")

        # Start from clean bookkeeping so re-running this cell does not carry
        # over entries from a previous sweep.
        failed_model_params.clear()
        future_to_params.clear()

        train_futures = []  # List to store futures for training
        eval_futures = []  # List to store futures for evaluation
        retry_queue = []   # (is_train, future) pairs that missed the first timeout

        topic_counts = range(START_TOPICS, END_TOPICS + 1, STEP_SIZE)
        num_topics = len(topic_counts)
        num_alpha_values = len(alpha_values)
        num_beta_values = len(beta_values)

        # One training and one evaluation model per parameter combination
        TOTAL_MODELS = (num_topics * num_alpha_values * num_beta_values) * 2

        progress_bar = tqdm(total=TOTAL_MODELS, desc="Creating and saving models")

        for n_topics in topic_counts:
            for alpha_value in alpha_values:
                for beta_value in beta_values:
                    future_train = client.submit(train_model, n_topics, alpha_value, beta_value,
                                                 scattered_train_data_future)
                    future_eval = client.submit(train_model, n_topics, alpha_value, beta_value,
                                                scattered_eval_data_future)

                    # Map the created futures to their parameters so we can identify them later if needed
                    future_to_params[future_train] = (n_topics, alpha_value, beta_value)
                    future_to_params[future_eval] = (n_topics, alpha_value, beta_value)

                    # callback_train / callback_eval are deliberately not
                    # registered: they also advance the progress bar, which
                    # would double-count with the batch handling below.

                    train_futures.append(future_train)
                    eval_futures.append(future_eval)

                    if len(train_futures) >= BATCH_SIZE:
                        retry_queue.extend(_settle_batch(train_futures, eval_futures, TIMEOUT))

                        # Clear the lists for the next batch regardless of completion status
                        train_futures.clear()
                        eval_futures.clear()

        # After all loops have finished running, handle the last partial batch
        if train_futures or eval_futures:
            retry_queue.extend(_settle_batch(train_futures, eval_futures, TIMEOUT))

        # Now give one more chance with extended timeout only to those that were incomplete previously
        if retry_queue:
            print("Retrying incomplete models with extended timeout...")

            retry_train_futures = []
            retry_eval_futures = []

            # Resubmit exactly the job that was incomplete (training or
            # evaluation), not both jobs of its parameter combination.
            # NOTE(review): submit() is pure by default, so identical arguments
            # presumably map onto the same task key and this acts as an
            # extended wait rather than a fresh run — confirm that is intended.
            for is_train, stalled_future in retry_queue:
                params = future_to_params[stalled_future]
                dataset_future = scattered_train_data_future if is_train else scattered_eval_data_future
                retry_future = client.submit(train_model, *params, dataset_future)

                # Keep track of these new futures as well
                future_to_params[retry_future] = params
                (retry_train_futures if is_train else retry_eval_futures).append(retry_future)

            still_unfinished = _settle_batch(retry_train_futures, retry_eval_futures, EXTENDED_TIMEOUT)

            # Record parameters of still incomplete futures after reattempting for later review
            for is_train, unfinished_future in still_unfinished:
                failed_model_params.append(future_to_params[unfinished_future])
                if unfinished_future.status == "error":
                    # Surface the reason instead of only listing the parameters
                    print(f"Model {future_to_params[unfinished_future]} failed with: {unfinished_future.exception()!r}")

            # At this point `failed_model_params` contains the parameters of all models that didn't complete even after a retry
    finally:
        # Release the progress bar, client and cluster even if the sweep raised
        if progress_bar is not None:
            progress_bar.close()
        client.close()
        cluster.close()

    # You can now review `failed_model_params` to see which models did not complete successfully.
    print("The following model parameters did not complete even after a second attempt:")
    perf_logger.info("The following model parameters did not complete even after a second attempt:")
    for params in failed_model_params:
        print(params)
        perf_logger.info(params)

In [None]:
# Last code cell: flush and close all logging handlers.
logging.shutdown()

The provided code snippet is part of a larger program that appears to be running machine learning model training and evaluation tasks in parallel using Dask, a flexible parallel computing library for analytic computing. The code manages the execution of these tasks, handling retries for incomplete tasks, and tracking failures.

The script begins by initializing an empty list called failed_model_params to store the parameters of models that fail to complete even after a retry. It also creates a dictionary named future_to_params to map "futures" (a representation of an asynchronous execution) to their corresponding model parameters.

Two functions are defined: process_completed_futures, which processes completed futures, and retry_processing, which attempts to reprocess incomplete futures with an extended timeout period.

The main part of the script sets up multiple training and evaluation tasks across different combinations of hyperparameters (n_topics, alpha_value, and beta_value). These tasks are submitted to a Dask client asynchronously using the client.submit method. Each task returns a future, which is then mapped to its parameters in the future_to_params dictionary for later reference.

The script uses batch processing controlled by a variable called BATCH_SIZE. Once enough futures have been accumulated, or when all loops have finished running, it waits for all futures within each batch to complete using Dask's wait function with a specified timeout (TIMEOUT). Completed futures are processed while those that remain incomplete are recorded in the failed_model_params list for further action.

After processing each batch, if there are any remaining futures (either from incomplete batches or from the final iteration), they are retried using the previously defined retry_processing function with the same timeout value.

If there are still models that failed after this first attempt, they get one more chance. The script prints out a message indicating it will retry these incomplete models with an extended timeout (EXTENDED_TIMEOUT). It resubmits these tasks and waits again for completion. Any models that remain incomplete after this second attempt are added back into the failed_model_params.

Finally, once all retries have been exhausted and progress has been tracked via a progress bar (tqdm), the Dask client is closed. The script prints out and logs information about any model parameters that did not complete successfully even after two attempts.

In summary, this code automates the process of submitting parallelized machine learning training and evaluation jobs over various hyperparameter combinations, handles timeouts by retrying incomplete jobs,