In [1]:
import asyncio
import pandas as pd
import random
import time
import logging


# Name used to tag the notebook's stream handler so that re-running this cell
# can find and reuse it instead of stacking a second handler on the root logger
# (which would make every message appear twice).
LOG_HANDLER_NAME = "notebook-debug-stream"


def configure_logging():
    """Configure the root logger for DEBUG output with a single stream handler.

    The function is idempotent: calling it again (for example by re-running
    the cell) reuses the handler previously tagged with ``LOG_HANDLER_NAME``
    rather than attaching another one.

    Returns:
        tuple: ``(root_logger, stream_handler)``.
    """
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG)

    # Look for a handler installed by an earlier run of this cell.
    stream_handler = next(
        (h for h in root_logger.handlers if h.get_name() == LOG_HANDLER_NAME),
        None,
    )
    if stream_handler is None:
        stream_handler = logging.StreamHandler()
        stream_handler.set_name(LOG_HANDLER_NAME)
        root_logger.addHandler(stream_handler)

    # (Re)apply the formatter so edits to the format take effect on re-run.
    # Output looks like "02:50:04:585[process_row]: message".
    stream_handler.setFormatter(
        logging.Formatter(fmt='%(asctime)s:%(msecs)03d[%(funcName)s]: %(message)s', datefmt='%H:%M:%S')
    )
    return root_logger, stream_handler


# Keep the module-level names the rest of the notebook may refer to.
logger, handler = configure_logging()



In [7]:
async def process_row(index, row, col_number):
    """Simulate slow, I/O-bound processing of a single row.

    Args:
        index: Row label; only used to tag the log messages.
        row: Indexable row object; ``row[col_number]`` must be a number,
            which is interpreted as a duration in seconds.
        col_number: Key used to look the value up in ``row``.

    Returns:
        The duration that was actually waited, plus 0.1.
    """
    # Coin flip: 0 keeps the value as-is, 1 halves it.
    keep_full_value = random.randint(0, 1) == 0
    source_value = row[col_number]
    processed_number = source_value if keep_full_value else source_value * 0.5

    # Non-blocking wait: asyncio.sleep() yields to the event loop, so other
    # rows in the same batch wait concurrently.
    logging.debug(f"[P#{index}]Start halt {processed_number} seconds...")
    await asyncio.sleep(processed_number)
    logging.debug(f"[P#{index}]Done halt.")

    return processed_number + 0.1

async def process_batch(batch, col_number):
    """Run ``process_row`` concurrently for every row of ``batch``.

    Args:
        batch: DataFrame slice whose rows are processed; iterated with
            ``iterrows()``.
        col_number: Column key forwarded to ``process_row``.

    Returns:
        list: One result per row, as collected by ``asyncio.gather``.
    """
    logging.debug("Begin process_batch()...")
    logging.debug(f"Processing batch of {len(batch)} rows")

    # Calling process_row() only creates coroutine objects; nothing runs
    # until they are awaited together by gather() below.
    pending = [
        process_row(row_label, record, col_number)
        for row_label, record in batch.iterrows()
    ]

    # Note: this "End" message is emitted before the rows actually execute.
    logging.debug(f"End process_batch(), returning {len(pending)} tasks")
    results = await asyncio.gather(*pending)
    return results


In [10]:
# Build the demo input: one 'Numbers' column holding 1000 random integers
# drawn from 4..8 inclusive (later used as wait durations in seconds).
logging.debug("Creating DataFrame...")
df = pd.DataFrame(
    [random.randint(4, 8) for _ in range(1000)],
    columns=['Numbers'],
)
logging.debug(f"Dataframe created: {df.shape}")


02:50:04:585[<cell line: 2>]: Creating DataFrame...
02:50:04:587[<cell line: 4>]: Dataframe created: (1000, 1)


In [12]:
processed_values = []
MAX_RETRIES = 3  # Define the maximum number of retries

# Process 5 rows at a time
for i in range(0, len(df), 5):
    logging.debug(f"Processing rows {i} to {i+5}...")
    batch = df.iloc[i:i+5]
        
    # try:
    #     logging.debug(f"Puting batch {i} to process_batch(), assign to processed_batch, and timeout after 5 seconds...")
    #     processed_batch = await asyncio.wait_for(process_batch(batch, 'Numbers'), timeout=5)
    #     print("Processed batch:", processed_batch)
    #     logging.debug(f"Extend storage with processed_batch...")
    #     processed_values.extend(processed_batch)
    # except asyncio.TimeoutError:
    #     print("Timeout occurred for batch")
    

    for i in range(MAX_RETRIES):
        try:
            logging.debug(f"Putting batch {i} to process_batch(), assign to processed_batch, and timeout after 5 seconds...")
            processed_batch = await asyncio.wait_for(process_batch(batch, 'Numbers'), timeout=5)
            print("Processed batch:", processed_batch)
            logging.debug(f"Extend storage with processed_batch...")
            processed_values.extend(processed_batch)
            break  # If the process_batch function completes without a TimeoutError, break the loop
        except asyncio.TimeoutError:
            print(f"Timeout occurred for batch {i}, retrying...")
            if i == MAX_RETRIES - 1:  # If this was the last retry
                print(f"Failed to process batch {i} after {MAX_RETRIES} attempts")
        
    break

print("All processed values:", processed_values)


03:04:41:624[<cell line: 5>]: Processing rows 0 to 5...
03:04:41:625[<cell line: 5>]: Putting batch 0 to process_batch(), assign to processed_batch, and timeout after 5 seconds...
03:04:41:626[process_batch]: Begin process_batch()...
03:04:41:626[process_batch]: Processing batch of 5 rows
03:04:41:627[process_batch]: End process_batch(), returning 5 tasks
03:04:41:627[process_row]: [P#0]Start halt 4 seconds...
03:04:41:628[process_row]: [P#1]Start halt 6 seconds...
03:04:41:629[process_row]: [P#2]Start halt 2.5 seconds...
03:04:41:629[process_row]: [P#3]Start halt 2.0 seconds...
03:04:41:630[process_row]: [P#4]Start halt 2.0 seconds...
03:04:43:631[process_row]: [P#3]Done halt.
03:04:43:631[process_row]: [P#4]Done halt.
03:04:44:138[process_row]: [P#2]Done halt.
03:04:45:645[process_row]: [P#0]Done halt.
03:04:46:643[<cell line: 5>]: Putting batch 1 to process_batch(), assign to processed_batch, and timeout after 5 seconds...
03:04:46:644[process_batch]: Begin process_batch()...
03:04:

Timeout occurred for batch 0, retrying...


03:04:48:663[process_row]: [P#4]Done halt.
03:04:49:157[process_row]: [P#2]Done halt.
03:04:49:650[process_row]: [P#1]Done halt.
03:04:50:661[process_row]: [P#0]Done halt.
03:04:50:661[process_row]: [P#3]Done halt.
03:04:50:662[<cell line: 5>]: Extend storage with processed_batch...


Processed batch: [4.1, 3.1, 2.6, 4.1, 2.1]
All processed values: [4.1, 3.1, 2.6, 4.1, 2.1]
