In [1]:
import pandas as pd
import brightway2 as bc
import multiprocessing as mp
import time

# Mock Data
unique_process_index = ['proc1', 'proc2', 'proc3']
impact_categories = [('Impact1', 'Test Impact 1'), ('Impact2', 'Test Impact 2')]
uniquie_process_dct = {'proc1': 1, 'proc2': 1, 'proc3': 1}

# Expected DataFrame
df_expected = pd.DataFrame({
    'Impact1': [10, 20, 30],
    'Impact2': [15, 25, 35]
}, index=unique_process_index)

# Set the start method to 'spawn' (default for Windows)
mp.set_start_method('spawn', force=True)

In [2]:
def mock_lca_calculation(args):
    key, item, impact = args
    print(f"Running LCA for {key} {item}")
    # Mock return value based on the impact category
    if impact[0] == 'Impact1':
        return key, impact, {'proc1': 10, 'proc2': 20, 'proc3': 30}.get(key, None)
    elif impact[0] == 'Impact2':
        return key, impact, {'proc1': 15, 'proc2': 25, 'proc3': 35}.get(key, None)
    return key, impact, None


In [3]:
from concurrent.futures import ThreadPoolExecutor

tasks = [(str(key), int(item), tuple(impact)) for impact in impact_categories for key, item in uniquie_process_dct.items()]

with ThreadPoolExecutor() as executor:
    results = list(executor.map(mock_lca_calculation, tasks))

# Print the results
for result in results:
    print(result)


Running LCA for proc1 1
Running LCA for proc2 1
Running LCA for proc3 1
Running LCA for proc1 1
Running LCA for proc2 1
Running LCA for proc3 1
('proc1', ('Impact1', 'Test Impact 1'), 10)
('proc2', ('Impact1', 'Test Impact 1'), 20)
('proc3', ('Impact1', 'Test Impact 1'), 30)
('proc1', ('Impact2', 'Test Impact 2'), 15)
('proc2', ('Impact2', 'Test Impact 2'), 25)
('proc3', ('Impact2', 'Test Impact 2'), 35)


In [6]:
# Prepare tasks for multiprocessing
tasks = [(key, item, impact) for impact in impact_categories for key, item in uniquie_process_dct.items()]

with ThreadPoolExecutor() as executor:
    results = executor.map(mock_lca_calculation, tasks)

# Create DataFrame to store results
df_test = pd.DataFrame(0, index=unique_process_index, columns=[imp[0] for imp in impact_categories], dtype=object)

# Assign results to DataFrame
for key, impact, score in results:
    df_test.at[key, impact[0]] = score
    # print(key)

# Check if the results match the expected DataFrame
try:
    pd.testing.assert_frame_equal(df_test, df_expected)
    print("Basic Functionality Test Passed ✅")
except AssertionError as e:
    print("Basic Functionality Test Failed ❌")
    print(e)


Running LCA for proc1 1
Running LCA for proc2 1
Running LCA for proc3 1
Running LCA for proc1 1
Running LCA for proc2 1
Running LCA for proc3 1
Basic Functionality Test Failed ❌
Attributes of DataFrame.iloc[:, 0] (column name="Impact1") are different

Attribute "dtype" are different
[left]:  object
[right]: int64


In [None]:
# Mock LCA function with an intentional error for 'proc2'
def mock_lca_error(args):
    key, item, impact = args
    if key == 'proc2':
        raise ValueError("Mock LCA error")
    return mock_lca_calculation(args)

# Run the tasks with error handling
with mp.Pool(processes=mp.cpu_count()) as pool:
    results = pool.map(mock_lca_error, tasks)

# Verify that 'proc2' returned None due to the error
error_test_passed = all(
    (key != 'proc2' and score is not None) or (key == 'proc2' and score is None)
    for key, impact, score in results
)

if error_test_passed:
    print("Error Handling Test Passed ✅")
else:
    print("Error Handling Test Failed ❌")


In [None]:
# Serial Execution
start_serial = time.time()
serial_results = [mock_lca_calculation(task) for task in tasks]
end_serial = time.time()
serial_time = end_serial - start_serial
print(f"Serial execution time: {serial_time:.4f} seconds")


In [None]:
# Parallel Execution
start_parallel = time.time()
with mp.Pool(processes=mp.cpu_count()) as pool:
    parallel_results = pool.map(mock_lca_calculation, tasks)
end_parallel = time.time()
parallel_time = end_parallel - start_parallel
print(f"Parallel execution time: {parallel_time:.4f} seconds")


In [None]:
if parallel_time < serial_time:
    print(f"Parallel execution is faster by {serial_time - parallel_time:.4f} seconds ✅")
else:
    print(f"Serial execution is faster ❌ (unexpected)")


In [None]:
# Using %timeit for performance measurement
print("Serial execution timing with %timeit:")
%timeit [mock_lca_calculation(task) for task in tasks]

print("\nParallel execution timing with %timeit:")
%timeit with mp.Pool(processes=mp.cpu_count()) as pool: pool.map(mock_lca_calculation, tasks)
