In [None]:
library(bnlearn)
library(parallel)
library(jsonlite)
library(ggplot2)

In [None]:
options_before <- options(warn = -1)  # Temporarily suppress warnings; restored at the end
rep <- 100             # Number of repetitions for the experiment
threshold <- 10        # Atoms with more nodes than this are treated as "large" tasks
max_small_groups <- 5  # Small tasks are divided into at most this many groups
max_workers <- 9       # Upper bound on the number of cluster workers
n_samples <- 150000    # Number of synthetic observations drawn per network

# Set file paths
folder_path <- ".../bif_file"
atom_path <- ".../atom"

# List of file names to be processed
file_names_to_load <- c('child', 'alarm', 'hailfinder', 'hepar2', 'win95pts', 'pigs')

# Initialize a data frame to store execution time records
time_records <- data.frame(
    File_Name = character(0),        # Column for file name
    Full_Model_Time = numeric(0),    # Column for full model execution time (seconds)
    Con_Model_Time = numeric(0),     # Column for distributed model execution time (seconds)
    Ratio = numeric(0)               # Column for speedup ratio (full / distributed)
)

# Seconds elapsed since `start`.
# The units are pinned to seconds because a bare `Sys.time() - start` chooses its
# units automatically (e.g. "mins" for long runs). In that case as.numeric()
# would silently return minutes instead of seconds.
elapsed_secs <- function(start) {
    as.numeric(difftime(Sys.time(), start, units = "secs"))
}

# Build the named list of task groups that is dispatched to the cluster.
#   big_task:         indices of the large atoms; they are kept together as one
#                     entry named "big_task" (omitted when empty, unless there
#                     are no small tasks at all).
#   small_tasks:      indices of the small atoms; they are split into at most
#                     `max_small_groups` contiguous groups named "small_group_<i>".
# Returns a list whose elements are integer index vectors into the atom list.
make_task_list <- function(big_task, small_tasks, max_small_groups = 5) {
    stopifnot(max_small_groups >= 1)
    n_small_groups <- min(max_small_groups, length(small_tasks))
    if (n_small_groups == 0) {
        return(list(big_task = big_task))
    }
    group_id <- cut(seq_along(small_tasks), breaks = n_small_groups, labels = FALSE)
    small_groups <- unname(split(small_tasks, group_id))
    names(small_groups) <- paste0("small_group_", seq_along(small_groups))
    if (length(big_task) > 0) {
        c(list(big_task = big_task), small_groups)
    } else {
        small_groups
    }
}

# Worker function: fit every atom whose index is in `task_group`.
# It is defined at top level so that its enclosing environment is the global
# environment. Shipping it to the workers therefore does not serialize any
# local data along with it. BN_con_list and sub_data_list are looked up in each
# worker's global environment, which clusterExport() populates.
fit_task_group <- function(task_group) {
    lapply(task_group, function(i) {
        bnlearn::bn.fit(BN_con_list[[i]], data = sub_data_list[[i]], method = "mle")
    })
}

record_blocks <- list()  # One data frame of per-repetition rows per processed file

for (file_name in file_names_to_load) {
    # Construct file paths
    bif_file_path <- file.path(folder_path, paste0(file_name, ".bif"))
    atom_file_path <- file.path(atom_path, paste0(file_name, "_atom.txt"))

    if (!file.exists(bif_file_path)) {
        cat("File not found:", bif_file_path, "\n")
        next
    }
    if (!file.exists(atom_file_path)) {
        cat("File not found:", atom_file_path, "\n")
        next
    }

    # Load network structure and generate synthetic data
    BN <- read.bif(bif_file_path, debug = FALSE)
    BN_full <- empty.graph(nodes = nodes(BN))
    arcs(BN_full) <- arcs(BN)
    # simplifyMatrix = FALSE keeps the atoms as a list of node-name vectors even
    # when every atom has the same size (jsonlite would otherwise return a matrix).
    atom <- fromJSON(atom_file_path, simplifyMatrix = FALSE)
    data <- rbn(BN, n = n_samples)

    # Prepare subnetworks and corresponding data.
    # drop = FALSE keeps single-node atoms as data frames instead of bare vectors.
    BN_con_list <- lapply(atom, function(sub_nodes) {
        bnlearn::subgraph(BN_full, nodes = sub_nodes)
    })
    sub_data_list <- lapply(atom, function(sub_nodes) {
        data[, sub_nodes, drop = FALSE]
    })

    # ========== Distributed Learning with Parallel Execution ==========
    # Split atoms into large and small tasks by size. The grouping does not
    # depend on the repetition, so it is computed once per file.
    atom_sizes <- lengths(atom)
    big_task <- which(atom_sizes > threshold)
    small_tasks <- which(atom_sizes <= threshold)
    task_list <- make_task_list(big_task, small_tasks, max_small_groups)

    full_secs <- numeric(rep)  # Per-repetition full-model fit time (seconds)
    con_secs <- numeric(rep)   # Per-repetition parallel decomposed fit time (seconds)

    # One cluster per file. Setup (library load, data export) happens outside
    # the timed region, and the cluster is stopped even if a fit fails.
    cl <- makeCluster(min(max_workers, length(task_list)))
    tryCatch({
        invisible(clusterEvalQ(cl, library(bnlearn)))
        clusterExport(cl, c("BN_con_list", "sub_data_list"))

        for (j in seq_len(rep)) {
            # Timing for full model learning
            start_full <- Sys.time()
            BN_full.mle <- bn.fit(BN_full, data = data, method = "mle", debug = FALSE)
            full_secs[j] <- elapsed_secs(start_full)

            # Timing for parallel decomposition model
            start_con <- Sys.time()
            results <- parLapply(cl, task_list, fit_task_group)
            con_secs[j] <- elapsed_secs(start_con)
        }
    }, finally = stopCluster(cl))

    record_blocks[[length(record_blocks) + 1]] <- data.frame(
        File_Name = file_name,
        Full_Model_Time = round(full_secs, 4),
        Con_Model_Time = round(con_secs, 4),
        Ratio = round(full_secs / con_secs, 4)
    )

    # Output average timing results
    cat(sprintf(
        "%s | Full model: %.4fs | Parallel model: %.4fs | Speedup: %.4fx\n",
        file_name,
        mean(full_secs),
        mean(con_secs),
        sum(full_secs) / sum(con_secs)
    ))
}

# Bind all per-file rows in one step (the empty template keeps the column layout
# when no file could be processed).
time_records <- do.call(rbind, c(list(time_records), record_blocks))

options(options_before)  # Restore the previous warning setting


In [None]:
# Persist the timing table (one row per file and repetition) without row names.
utils::write.csv(x = time_records, file = "time.csv", row.names = FALSE)