# Imports

In [4]:
import time
import random
import requests
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor , as_completed
from pprint import pprint

In [10]:
from pprint import pprint
import time 
import requests 
import random

def get_post(post_id: int, *, timeout: float = 10.0) -> dict:
    """Fetch one post from the JSONPlaceholder API after a simulated delay.

    Args:
        post_id: ID of the post to fetch; must be between 1 and 100 inclusive.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The decoded JSON object for the post, without its ``body`` field and
        with an extra ``fetch_time`` key holding the simulated delay in seconds.

    Raises:
        ValueError: If ``post_id`` is outside the 1-100 range.
        requests.RequestException: If the request times out or the API
            responds with an error status (via ``raise_for_status``).
    """
    # Value check - Posts on the API only go up to ID of 100
    if post_id > 100:
        raise ValueError("Parameter `post_id` must be less than or equal to 100")
    # Fail fast on IDs below the first post instead of sleeping and then
    # getting an HTTP error back.
    if post_id < 1:
        raise ValueError("Parameter `post_id` must be greater than or equal to 1")

    # API URL
    url = f"https://jsonplaceholder.typicode.com/posts/{post_id}"

    # Sleep to imitate a long-running process
    time_to_sleep = random.randint(1, 10)
    time.sleep(time_to_sleep)

    # Fetch the data and return it. Without a timeout a stalled connection
    # would block the caller (or a pool worker) indefinitely.
    r = requests.get(url, timeout=timeout)
    r.raise_for_status()
    result = r.json()
    # To indicate how much time fetching took
    result["fetch_time"] = time_to_sleep
    # Remove the longest key-value pair for formatting reasons; pop() with a
    # default tolerates a response that has no "body" field.
    result.pop("body", None)
    return result

# Quick check: fetch the first post and pretty-print the trimmed record
pprint(get_post(post_id=1))

{'fetch_time': 7,
 'id': 1,
 'title': 'sunt aut facere repellat provident occaecati excepturi optio '
          'reprehenderit',
 'userId': 1}


In [6]:
def get_post(post_id: int, *, timeout: float = 10.0) -> dict:
    """Fetch one post from the JSONPlaceholder API after a simulated delay.

    Args:
        post_id: ID of the post to fetch; must be between 1 and 100 inclusive.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The decoded JSON object for the post, without its ``body`` field and
        with an extra ``fetch_time`` key holding the simulated delay in seconds.

    Raises:
        ValueError: If ``post_id`` is outside the 1-100 range.
        requests.RequestException: If the request times out or the API
            responds with an error status (via ``raise_for_status``).
    """
    if post_id > 100:
        raise ValueError("Parameter `post_id` must be less than or equal to 100")
    # Reject IDs below the first post up front rather than sleeping first
    # and only then receiving an HTTP error.
    if post_id < 1:
        raise ValueError("Parameter `post_id` must be greater than or equal to 1")

    url = f"https://jsonplaceholder.typicode.com/posts/{post_id}"

    # Random pause that stands in for a long-running task
    time_to_sleep = random.randint(1, 10)
    time.sleep(time_to_sleep)

    # A timeout keeps a stalled connection from blocking forever
    r = requests.get(url, timeout=timeout)
    r.raise_for_status()
    result = r.json()
    result["fetch_time"] = time_to_sleep
    # pop() with a default tolerates a response that has no "body" field
    result.pop("body", None)
    return result


if __name__ == "__main__":
    # Measure the time
    time_start = datetime.now()
    print("Starting to fetch posts...\n")

    # Simple iteration: every call blocks until the previous one has returned
    for post_id in range(1, 11):
        post = get_post(post_id)
        pprint(post)

    # Print total duration. total_seconds() covers the whole elapsed span,
    # whereas timedelta.seconds is only the seconds component (days dropped).
    time_end = datetime.now()
    elapsed = int((time_end - time_start).total_seconds())
    print(f"\nAll posts fetched! Took: {elapsed} seconds.")

Starting to fetch posts...

{'fetch_time': 10,
 'id': 1,
 'title': 'sunt aut facere repellat provident occaecati excepturi optio '
          'reprehenderit',
 'userId': 1}
{'fetch_time': 8, 'id': 2, 'title': 'qui est esse', 'userId': 1}
{'fetch_time': 4,
 'id': 3,
 'title': 'ea molestias quasi exercitationem repellat qui ipsa sit aut',
 'userId': 1}
{'fetch_time': 2, 'id': 4, 'title': 'eum et est occaecati', 'userId': 1}
{'fetch_time': 3, 'id': 5, 'title': 'nesciunt quas odio', 'userId': 1}
{'fetch_time': 1,
 'id': 6,
 'title': 'dolorem eum magni eos aperiam quia',
 'userId': 1}
{'fetch_time': 3, 'id': 7, 'title': 'magnam facilis autem', 'userId': 1}
{'fetch_time': 8, 'id': 8, 'title': 'dolorem dolore est ipsam', 'userId': 1}
{'fetch_time': 1,
 'id': 9,
 'title': 'nesciunt iure omnis dolorem tempora et accusantium',
 'userId': 1}
{'fetch_time': 2, 'id': 10, 'title': 'optio molestias id quia eum', 'userId': 1}

All posts fetched! Took: 48 seconds.


## ThreadPoolExecutor

You’ll want to import ThreadPoolExecutor and the as_completed() function. The custom get_post() function remains unchanged.

The real magic happens below.

Essentially, you’re creating a new ThreadPoolExecutor through the context manager syntax (the most common approach). Inside, you’re using the submit() method to add tasks to the executor. The first parameter of this method is your function name, followed by its parameter values. You can dynamically iterate over a range of values for post_id using Python’s list comprehension.

A Future object is returned by the submit() function.

The as_completed() function yields each future as soon as its thread finishes, so results can be extracted and printed in completion order.

In [7]:
if __name__ == "__main__":
    time_start = datetime.now()
    print("Starting to fetch posts...\n")

    # Run post fetching concurrently
    with ThreadPoolExecutor() as tpe:
        # Submit tasks and get future objects
        futures = [tpe.submit(get_post, post_id) for post_id in range(1, 11)]
        # Process task results in completion order, not submission order
        for future in as_completed(futures):
            # Get and display the result
            result = future.result()
            print(result)

    # total_seconds() covers the whole elapsed span, whereas
    # timedelta.seconds is only the seconds component (days dropped).
    time_end = datetime.now()
    elapsed = int((time_end - time_start).total_seconds())
    print(f"\nAll posts fetched! Took: {elapsed} seconds.")

Starting to fetch posts...

{'userId': 1, 'id': 5, 'title': 'nesciunt quas odio', 'fetch_time': 1}
{'userId': 1, 'id': 4, 'title': 'eum et est occaecati', 'fetch_time': 2}
{'userId': 1, 'id': 2, 'title': 'qui est esse', 'fetch_time': 4}
{'userId': 1, 'id': 3, 'title': 'ea molestias quasi exercitationem repellat qui ipsa sit aut', 'fetch_time': 4}
{'userId': 1, 'id': 1, 'title': 'sunt aut facere repellat provident occaecati excepturi optio reprehenderit', 'fetch_time': 7}
{'userId': 1, 'id': 7, 'title': 'magnam facilis autem', 'fetch_time': 8}
{'userId': 1, 'id': 9, 'title': 'nesciunt iure omnis dolorem tempora et accusantium', 'fetch_time': 8}
{'userId': 1, 'id': 8, 'title': 'dolorem dolore est ipsam', 'fetch_time': 9}
{'userId': 1, 'id': 6, 'title': 'dolorem eum magni eos aperiam quia', 'fetch_time': 9}
{'userId': 1, 'id': 10, 'title': 'optio molestias id quia eum', 'fetch_time': 10}

All posts fetched! Took: 10 seconds.


## Scaling Things Up

In [8]:
if __name__ == "__main__":
    time_start = datetime.now()
    print("Starting to fetch posts...\n")

    with ThreadPoolExecutor() as tpe:
        # Submit tasks and get future objects - NOW 100 POSTS IN TOTAL
        futures = [tpe.submit(get_post, post_id) for post_id in range(1, 101)]
        # Results are handled as each task finishes
        for future in as_completed(futures):
            result = future.result()
            pprint(result)

    # total_seconds() covers the whole elapsed span, whereas
    # timedelta.seconds is only the seconds component (days dropped).
    time_end = datetime.now()
    elapsed = int((time_end - time_start).total_seconds())
    print(f"\nAll posts fetched! Took: {elapsed} seconds.")

Starting to fetch posts...

{'fetch_time': 1,
 'id': 12,
 'title': 'in quibusdam tempore odit est dolorem',
 'userId': 2}
{'fetch_time': 2, 'id': 5, 'title': 'nesciunt quas odio', 'userId': 1}
{'fetch_time': 4,
 'id': 3,
 'title': 'ea molestias quasi exercitationem repellat qui ipsa sit aut',
 'userId': 1}
{'fetch_time': 5,
 'id': 9,
 'title': 'nesciunt iure omnis dolorem tempora et accusantium',
 'userId': 1}
{'fetch_time': 6, 'id': 10, 'title': 'optio molestias id quia eum', 'userId': 1}
{'fetch_time': 7, 'id': 4, 'title': 'eum et est occaecati', 'userId': 1}
{'fetch_time': 7, 'id': 8, 'title': 'dolorem dolore est ipsam', 'userId': 1}
{'fetch_time': 7, 'id': 7, 'title': 'magnam facilis autem', 'userId': 1}
{'fetch_time': 6,
 'id': 13,
 'title': 'dolorum ut in voluptas mollitia et saepe quo animi',
 'userId': 2}
{'fetch_time': 2,
 'id': 16,
 'title': 'sint suscipit perspiciatis velit dolorum rerum ipsa laboriosam odio',
 'userId': 2}
{'fetch_time': 8,
 'id': 1,
 'title': 'sunt aut fac

## How to Handle Failure
Sometimes, the function you want to run concurrently will fail. Inside the for future in as_completed(futures): block, you can add a try/except block to implement exception handling.

To demonstrate, try submitting futures for post_ids up to 150 — the get_post() function will raise an error for any post_id above 100.

In [9]:
if __name__ == "__main__":
    time_start = datetime.now()
    print("Starting to fetch posts...\n")

    with ThreadPoolExecutor() as tpe:
        # Submit tasks and get future objects - NOW 150 POSTS IN TOTAL - 50 WILL FAIL
        futures = [tpe.submit(get_post, post_id) for post_id in range(1, 151)]
        # Process task results
        for future in as_completed(futures):
            # future.result() re-raises whatever the task raised, so one
            # failing task is reported here without stopping the others.
            try:
                result = future.result()
            except Exception as exc:
                print(f"Exception raised: {exc}")
            else:
                print(result)

    # total_seconds() covers the whole elapsed span, whereas
    # timedelta.seconds is only the seconds component (days dropped).
    time_end = datetime.now()
    elapsed = int((time_end - time_start).total_seconds())
    print(f"\nAll posts fetched! Took: {elapsed} seconds.")

Starting to fetch posts...

{'userId': 1, 'id': 7, 'title': 'magnam facilis autem', 'fetch_time': 1}
{'userId': 1, 'id': 3, 'title': 'ea molestias quasi exercitationem repellat qui ipsa sit aut', 'fetch_time': 1}
{'userId': 1, 'id': 4, 'title': 'eum et est occaecati', 'fetch_time': 2}
{'userId': 2, 'id': 11, 'title': 'et ea vero quia laudantium autem', 'fetch_time': 3}
{'userId': 2, 'id': 13, 'title': 'dolorum ut in voluptas mollitia et saepe quo animi', 'fetch_time': 2}
{'userId': 1, 'id': 8, 'title': 'dolorem dolore est ipsam', 'fetch_time': 4}
{'userId': 1, 'id': 1, 'title': 'sunt aut facere repellat provident occaecati excepturi optio reprehenderit', 'fetch_time': 4}
{'userId': 2, 'id': 16, 'title': 'sint suscipit perspiciatis velit dolorum rerum ipsa laboriosam odio', 'fetch_time': 1}
{'userId': 2, 'id': 14, 'title': 'voluptatem eligendi optio', 'fetch_time': 4}
{'userId': 1, 'id': 9, 'title': 'nesciunt iure omnis dolorem tempora et accusantium', 'fetch_time': 6}
{'userId': 1, 'id

## Adding Callback Functions

In [11]:
def future_callback_fn(future):
    """Print a timestamped banner followed by the outcome of ``future``.

    Args:
        future: The ``concurrent.futures.Future`` this callback is attached to.

    If the task failed, its exception is reported as a printed message
    rather than propagating out of the callback.
    """
    print(f"[{datetime.now()}] Custom future callback function!")
    # You have access to the future object. result() re-raises the task's
    # exception, so catch it here and report it instead of letting it escape.
    try:
        outcome = future.result()
    except Exception as exc:
        print(f"Exception raised: {exc}")
    else:
        print(outcome)


if __name__ == "__main__":
    time_start = datetime.now()
    print("Starting to fetch posts...\n")

    with ThreadPoolExecutor() as tpe:
        futures = [tpe.submit(get_post, post_id) for post_id in range(1, 11)]
        for future in as_completed(futures):
            # Custom callback
            # NOTE: as_completed() only yields futures that are already done,
            # so the callback is invoked right away in this thread. To have it
            # fire on completion instead, attach it right after submit().
            future.add_done_callback(future_callback_fn)

    # Print total duration. total_seconds() covers the whole elapsed span,
    # whereas timedelta.seconds is only the seconds component (days dropped).
    time_end = datetime.now()
    elapsed = int((time_end - time_start).total_seconds())
    print(f"\nAll posts fetched! Took: {elapsed} seconds.")

Starting to fetch posts...

[2024-08-05 14:46:23.409002] Custom future callback function!
{'userId': 1, 'id': 4, 'title': 'eum et est occaecati', 'fetch_time': 1}
[2024-08-05 14:46:24.394794] Custom future callback function!
{'userId': 1, 'id': 7, 'title': 'magnam facilis autem', 'fetch_time': 2}
[2024-08-05 14:46:25.353951] Custom future callback function!
{'userId': 1, 'id': 5, 'title': 'nesciunt quas odio', 'fetch_time': 3}
[2024-08-05 14:46:26.353645] Custom future callback function!
{'userId': 1, 'id': 3, 'title': 'ea molestias quasi exercitationem repellat qui ipsa sit aut', 'fetch_time': 4}
[2024-08-05 14:46:27.413601] Custom future callback function!
{'userId': 1, 'id': 1, 'title': 'sunt aut facere repellat provident occaecati excepturi optio reprehenderit', 'fetch_time': 5}
[2024-08-05 14:46:28.397078] Custom future callback function!
{'userId': 1, 'id': 10, 'title': 'optio molestias id quia eum', 'fetch_time': 6}
[2024-08-05 14:46:29.471027] Custom future callback function!
{

## Rate Limiting

Rate Limiting — How to Get Around That Pesky HTTP 429 Error
Every data professional works with REST APIs. While calling their endpoints concurrently is a good way to reduce the overall runtime, it can result in an HTTP 429 Too Many Requests status.

The reason is simple — the API owner doesn’t want you making thousands of requests every second for the sake of performance. Or maybe, they’re restricting the traffic volume based on your subscription tier.

Whatever the case might be, an easy way around it is to install the requests-ratelimiter library and limit how many requests can be made per day, hour, minute, or second.

In [None]:
import time
import random
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
# New import
from requests_ratelimiter import LimiterSession

# Module-level session used by get_post() below for its HTTP call.
# Limit to max 2 calls per second
request_session = LimiterSession(per_second=2)


def get_post(post_id: int, *, timeout: float = 10.0) -> dict:
    """Fetch one post through the rate-limited session after a simulated delay.

    Args:
        post_id: ID of the post to fetch; must be between 1 and 100 inclusive.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The decoded JSON object for the post, without its ``body`` field and
        with an extra ``fetch_time`` key holding the simulated delay in seconds.

    Raises:
        ValueError: If ``post_id`` is outside the 1-100 range.
        Exception: Whatever the session raises when the request times out, or
            ``raise_for_status`` raises for an error response.
    """
    if post_id > 100:
        raise ValueError("Parameter `post_id` must be less than or equal to 100")
    # Reject IDs below the first post up front rather than sleeping first
    # and only then receiving an HTTP error.
    if post_id < 1:
        raise ValueError("Parameter `post_id` must be greater than or equal to 1")

    url = f"https://jsonplaceholder.typicode.com/posts/{post_id}"

    # Random pause that stands in for a long-running task
    time_to_sleep = random.randint(1, 10)
    time.sleep(time_to_sleep)

    # Use the request_session now; the timeout keeps a stalled connection
    # from blocking a pool worker forever.
    r = request_session.get(url, timeout=timeout)
    r.raise_for_status()
    result = r.json()
    result["fetch_time"] = time_to_sleep
    # pop() with a default tolerates a response that has no "body" field
    result.pop("body", None)
    return result


if __name__ == "__main__":
    time_start = datetime.now()
    print("Starting to fetch posts...\n")

    # Everything here stays the same
    with ThreadPoolExecutor() as tpe:
        futures = [tpe.submit(get_post, post_id) for post_id in range(1, 16)]
        # Results are handled as each task finishes
        for future in as_completed(futures):
            result = future.result()
            print(result)

    # total_seconds() covers the whole elapsed span, whereas
    # timedelta.seconds is only the seconds component (days dropped).
    time_end = datetime.now()
    elapsed = int((time_end - time_start).total_seconds())
    print(f"\nAll posts fetched! Took: {elapsed} seconds.")