<a href="https://colab.research.google.com/github/ashkanradjou/GO-ing/blob/main/Worker_Pool_go.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Initial setup

In [2]:
#download binaries for go
!wget https://dl.google.com/go/go1.19.6.linux-amd64.tar.gz

#extract binaries from tarball and place in /usr/local directory
!sudo tar -C /usr/local -xzf go1.19.6.linux-amd64.tar.gz

#remove tarball
!rm go1.19.6.linux-amd64.tar.gz

--2025-10-27 09:08:28--  https://dl.google.com/go/go1.19.6.linux-amd64.tar.gz
Resolving dl.google.com (dl.google.com)... 173.194.217.91, 173.194.217.93, 173.194.217.190, ...
Connecting to dl.google.com (dl.google.com)|173.194.217.91|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 149006448 (142M) [application/x-gzip]
Saving to: ‘go1.19.6.linux-amd64.tar.gz’


2025-10-27 09:08:33 (31.6 MB/s) - ‘go1.19.6.linux-amd64.tar.gz’ saved [149006448/149006448]



In [3]:
import os
#add the go executable to path
os.environ['PATH'] += ':/usr/local/go/bin'

In [4]:
!go version

go version go1.19.6 linux/amd64


# Code

In [9]:
%%writefile main.go

package main

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"net/http"
	"sync"
	"time"
)

/*
Part A: Worker Pool (Image thumbnailer)
- input: paths (string) -> buffered channel (backpressure)
- workers: n goroutines that create thumbnails (simulation)
- output: thumbnails -> buffered channel
- clean shutdown: when the producer closes the input channel, the workers finish cleanly and then the output is closed
*/

// thumbnailResult is the output schema of each worker
type thumbnailResult struct {
	Path     string
	Thumb    string
	Duration time.Duration
	Err      error
}

func workerThumbnail(ctx context.Context, id int, in <-chan string, out chan<- thumbnailResult, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case <-ctx.Done():
			// Exit when the context is canceled.
			fmt.Printf("worker-%d: context canceled, exiting\n", id)
			return
		case path, ok := <-in:
			if !ok {
				// Input channel closed => job done
				fmt.Printf("worker-%d: input channel closed, exiting\n", id)
				return
			}

			// Thumbnail processing simulation (may have I/O)
			start := time.Now()
			// simulate variable work time
			time.Sleep(time.Duration(100+rand.Intn(300)) * time.Millisecond)

			// Error simulation for some paths
			var err error
			if rand.Float32() < 0.05 {
				err = errors.New("failed to create thumbnail (simulated)")
			}

			res := thumbnailResult{
				Path:     path,
				Thumb:    fmt.Sprintf("%s.thumb.jpg", path),
				Duration: time.Since(start),
				Err:      err,
			}

			select {
			case <-ctx.Done():
				return
			case out <- res:
			}
		}
	}
}

/*
Part B: Rate-limited fetcher + retry with backoff
- A simple token bucket: a ticker refills tokens, and the channel buffer provides burst capacity
- doWithRetry respects context cancellation; a multiplier of 1.0 gives fixed backoff, a larger multiplier gives exponential backoff
*/

func newTokenBucket(ctx context.Context, ratePerSec int, burst int) <-chan struct{} {
	// The channel where the token is placed
	tokens := make(chan struct{}, burst)

	for i := 0; i < burst; i++ {
		tokens <- struct{}{}
	}

	ticker := time.NewTicker(time.Second / time.Duration(ratePerSec))
	go func() {
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				close(tokens)
				return
			case <-ticker.C:
				// Try adding a token (don't add if buffer is full)
				select {
				case tokens <- struct{}{}:
				default:
				}
			}
		}
	}()

	return tokens
}

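// doWithRetry calls op up to maxRetries+1 times. After each failure it waits
// delay (starting at initialDelay, multiplied by multiplier each round, capped
// at 10 seconds), scaled by a random factor in [0.5, 1.5) when jitter is set.
// It returns ctx.Err() if ctx is canceled before an attempt or during a wait.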
func doWithRetry(ctx context.Context, op func() error, maxRetries int, initialDelay time.Duration, multiplier float64, jitter bool) error {
	delay := initialDelay
	for attempt := 0; attempt <= maxRetries; attempt++ {
		// Check whether the context has been canceled before executing the attempt.
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		err := op()
		if err == nil {
			return nil
		}

		// If it was the last attempt, return an error.
		if attempt == maxRetries {
			return err
		}

		// Compute the sleep before the next attempt, optionally with jitter.
		sleep := delay
		if jitter {
			j := 0.5 + rand.Float64()
			sleep = time.Duration(float64(sleep) * j)
		}

		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(sleep):
		}

		delay = time.Duration(float64(delay) * multiplier)
		if delay > 10*time.Second {
			delay = 10 * time.Second
		}
	}
	return errors.New("unreachable")
}

// fetchResult is the outcome of fetching one URL.
// rateLimitedFetcher (below) reads URLs from in, waits for a token from the
// bucket before handling each one, and sends a fetchResult to out.
// BodySize is simulated in this example; the response body is not read.
type fetchResult struct {
	URL      string
	BodySize int
	Err      error
}

func rateLimitedFetcher(ctx context.Context, id int, in <-chan string, out chan<- fetchResult, tokens <-chan struct{}, fetchTimeout time.Duration, wg *sync.WaitGroup) {
	defer wg.Done()
	client := &http.Client{
		Timeout: fetchTimeout,
	}
	for {
		select {
		case <-ctx.Done():
			fmt.Printf("fetcher-%d: context canceled, exiting\n", id)
			return
		case url, ok := <-in:
			if !ok {
				fmt.Printf("fetcher-%d: input closed, exiting\n", id)
				return
			}

			// Wait for a token (rate limiting). If the tokens channel is closed, exit.
			select {
			case <-ctx.Done():
				return
			case _, ok := <-tokens:
				if !ok {
					// token bucket closed => stop
					fmt.Printf("fetcher-%d: tokens closed, exiting\n", id)
					return
				}
				// got token -> proceed
			}

			// define op to fetch (simple GET)
			op := func() error {
				req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
				if err != nil {
					return err
				}
				// in demo we don't read body fully to keep it fast
				resp, err := client.Do(req)
				if err != nil {
					return err
				}
				defer resp.Body.Close()
				// simulate reading body size
				n := 100 + rand.Intn(2000)
				// treat non-2xx as error
				if resp.StatusCode < 200 || resp.StatusCode >= 300 {
					return fmt.Errorf("bad status %d", resp.StatusCode)
				}
				// simulate some processing time
				time.Sleep(time.Duration(50+rand.Intn(200)) * time.Millisecond)
				outRes := fetchResult{URL: url, BodySize: n, Err: nil}
				// send result with respect to ctx
				select {
				case <-ctx.Done():
					return ctx.Err()
				case out <- outRes:
					return nil
				}
			}

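			// Run op with retries and exponential backoff (up to 3 retries, starting at 200ms, doubling, with jitter).
			// Retries reuse the token acquired above, so they are not individually rate limited.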
			err := doWithRetry(ctx, op, 3, 200*time.Millisecond, 2.0, true)
			if err != nil {
				// Sending errors to output
				select {
				case <-ctx.Done():
				case out <- fetchResult{URL: url, Err: err}:
				}
			}
		}
	}
}

/*
main: An example of using both systems:
- Send image paths to inThumb (buffered -> backpressure)
- Start numThumbWorkers workers
- Read and print the thumbnail results
- Also run a rate-limited fetcher that takes URLs, limits requests with tokens, and sends the results
*/

func main() {
	rand.Seed(time.Now().UnixNano())

	rootCtx, cancelRoot := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancelRoot()

	// ------------ Worker Pool: Thumbnailer ------------
	inThumb := make(chan string, 5)
	outThumb := make(chan thumbnailResult)
	var wgThumb sync.WaitGroup
	numThumbWorkers := 4

	// start workers
	for i := 0; i < numThumbWorkers; i++ {
		wgThumb.Add(1)
		go workerThumbnail(rootCtx, i+1, inThumb, outThumb, &wgThumb)
	}

	// producer: Simulate sending file paths
	go func() {
		paths := []string{
			"img1.jpg", "img2.jpg", "img3.jpg", "img4.jpg", "img5.jpg",
			"img6.jpg", "img7.jpg", "img8.jpg", "img9.jpg", "img10.jpg",
		}
		for _, p := range paths {
			select {
			case <-rootCtx.Done():
				fmt.Println("producer thumbnails: context canceled, stop producing")
				close(inThumb)
				return
			case inThumb <- p: // If the buffer is full, this send will be blocked and backpressure will occur.
				fmt.Println("producer thumbnails: enqueued", p)
			}
		}
		// When finished, we close the input channel so that workers know the work is done.
		close(inThumb)
		fmt.Println("producer thumbnails: closed input channel")
	}()

	// closer: wait for all workers to finish, then close outThumb
	go func() {
		wgThumb.Wait()
		close(outThumb)
		fmt.Println("thumbnail consumer: closed outThumb after workers finished")
	}()

	// reader for outThumb
	go func() {
		for tr := range outThumb {
			if tr.Err != nil {
				fmt.Printf("[thumbnail ERROR] %s -> %v (took %s)\n", tr.Path, tr.Err, tr.Duration)
			} else {
				fmt.Printf("[thumbnail READY] %s -> %s (took %s)\n", tr.Path, tr.Thumb, tr.Duration)
			}
		}
	}()

	// ------------ Rate-limited Fetcher ------------
	// In channel for URLs and out channel for results (buffered to allow some decoupling)
	inFetch := make(chan string, 10)
	outFetch := make(chan fetchResult, 10)

	// token bucket: rate 2 req/sec with burst 3
	tokenCtx, tokenCancel := context.WithCancel(rootCtx)
	tokens := newTokenBucket(tokenCtx, 2, 3)

	var wgFetch sync.WaitGroup
	numFetchers := 3
	for i := 0; i < numFetchers; i++ {
		wgFetch.Add(1)
		go rateLimitedFetcher(rootCtx, i+1, inFetch, outFetch, tokens, 2*time.Second, &wgFetch)
	}

	// produce some URLs
	go func() {
		urls := []string{
			"https://example.com/a",
			"https://example.com/b",
			"https://example.com/c",
			"https://example.com/d",
			"https://httpbin.org/status/500", // simulate failure
			"https://httpbin.org/status/403", // simulate non-2xx
		}
		for _, u := range urls {
			select {
			case <-rootCtx.Done():
				close(inFetch)
				return
			case inFetch <- u:
				fmt.Println("enqueued url:", u)
			}
		}
		// close input to signal fetchers to finish
		close(inFetch)
		fmt.Println("closed inFetch")
	}()

	// consumer for fetch results
	go func() {
		for fr := range outFetch {
			if fr.Err != nil {
				fmt.Printf("[fetch ERROR] %s -> %v\n", fr.URL, fr.Err)
			} else {
				fmt.Printf("[fetch OK] %s -> bytes=%d\n", fr.URL, fr.BodySize)
			}
		}
		fmt.Println("outFetch closed, fetch consumer exiting")
	}()

	// wait for fetchers and then close outFetch
	go func() {
		wgFetch.Wait()
		// stop token supplier
		tokenCancel()
		// close outFetch to signal consumer
		close(outFetch)
	}()

	// Block until rootCtx's 10-second timeout fires. main does not return
	// earlier, even if both pipelines have already finished.
	<-rootCtx.Done()
	fmt.Println("root context done — main exiting")
}


Writing main.go


In [10]:
!go run main.go

producer thumbnails: enqueued img1.jpg
producer thumbnails: enqueued img2.jpg
producer thumbnails: enqueued img3.jpg
producer thumbnails: enqueued img4.jpg
producer thumbnails: enqueued img5.jpg
producer thumbnails: enqueued img6.jpg
producer thumbnails: enqueued img7.jpg
producer thumbnails: enqueued img8.jpg
producer thumbnails: enqueued img9.jpg
enqueued url: https://example.com/a
enqueued url: https://example.com/b
enqueued url: https://example.com/c
enqueued url: https://example.com/d
enqueued url: https://httpbin.org/status/500
enqueued url: https://httpbin.org/status/403
closed inFetch
producer thumbnails: enqueued img10.jpg
producer thumbnails: closed input channel
[thumbnail READY] img4.jpg -> img4.jpg.thumb.jpg (took 209.677962ms)
[thumbnail READY] img3.jpg -> img3.jpg.thumb.jpg (took 279.093469ms)
[thumbnail READY] img1.jpg -> img1.jpg.thumb.jpg (took 312.083248ms)
[thumbnail READY] img2.jpg -> img2.jpg.thumb.jpg (took 382.522967ms)
[thumbnail READY] img5.jpg -> img5.jpg.thu

# Readme

## Go Worker Pool & Rate-Limited Fetcher Example

This project demonstrates two distinct concepts implemented in Go:


1.   **Worker Pool for Image Thumbnailing**

2.   **Rate-Limited Fetcher with Retry Mechanism**

Both are combined in a main application that processes tasks concurrently with proper error handling, backpressure, and rate limiting. The example uses Go’s concurrency model (goroutines and channels) to handle multiple tasks efficiently.

## Overview

This project simulates a worker pool that generates image thumbnails and runs a rate-limited fetcher for HTTP requests. Both pipelines stop on context cancellation and report errors through their result values; only the fetcher retries failed work. Here's a summary of the two main parts:

**Part A**: Worker Pool (Image Thumbnailing): Workers simulate the generation of image thumbnails concurrently. The pool uses a buffered input channel for backpressure and shuts down cleanly once the producer closes that channel.

**Part B**: Rate-Limited Fetcher: Fetchers send HTTP GET requests to a list of URLs, with the request rate controlled by a token bucket. Requests that fail, including non-2xx responses, are retried with exponential backoff; the reported body size is simulated.

## Concepts Implemented

**Part A: Worker Pool (Thumbnailer)**

This part of the code simulates the process of creating image thumbnails. Here's how it works:

* Producer: A list of image paths is sent to the worker pool via a buffered input channel (inThumb).

* Worker Pool: n worker goroutines (four in main) process the paths concurrently. Each worker simulates generating a thumbnail for an image.

* Backpressure: The input channel has a buffer of five, so once the buffer is full the producer blocks until a worker takes a path.

* Graceful Shutdown: The producer closes the input channel once all tasks are sent, signaling the workers to finish. A separate goroutine waits on the WaitGroup and closes the output channel (outThumb) after every worker has returned.
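The shutdown sequence described above can be sketched in isolation. This is a minimal sketch, not the code in main.go: `thumbnailAll` and its string results are hypothetical names chosen for illustration, and context handling is left out to keep the close ordering visible.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// thumbnailAll fans paths out to n workers and collects every result.
// The function name and the string result type are illustrative only.
func thumbnailAll(paths []string, n int) []string {
	in := make(chan string, 5) // buffered: the producer blocks once 5 paths are queued
	out := make(chan string)

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for p := range in { // ends when in is closed and drained
				out <- p + ".thumb.jpg"
			}
		}()
	}

	// Producer: send every path, then close the input to signal "no more work".
	go func() {
		for _, p := range paths {
			in <- p
		}
		close(in)
	}()

	// Closer: the output is closed only after every worker has returned,
	// so no worker can send on a closed channel.
	go func() {
		wg.Wait()
		close(out)
	}()

	var thumbs []string
	for t := range out {
		thumbs = append(thumbs, t)
	}
	sort.Strings(thumbs) // completion order is nondeterministic
	return thumbs
}

func main() {
	fmt.Println(thumbnailAll([]string{"img1.jpg", "img2.jpg", "img3.jpg"}, 2))
}
```

Closing the output from a dedicated goroutine, rather than from a worker, avoids having to decide which worker is "last".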

Key features:

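* Context cancellation stops workers both while they wait for input and while they wait to send a result.

* Errors in thumbnail creation are simulated (about 5% of paths) and reported in the result's Err field rather than stopping the pool.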
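To illustrate these two features, here is a small sketch of a worker that honors cancellation and carries errors inside its result. The `result` type, `process`, `runOne`, and `stopsOnCancel` are hypothetical helpers for this example; `process` fails on empty paths instead of failing randomly so the behavior is repeatable.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"strings"
	"sync"
	"time"
)

type result struct {
	Path string
	Err  error
}

// process stands in for real thumbnail work; it fails for empty paths.
func process(path string) error {
	if strings.TrimSpace(path) == "" {
		return errors.New("empty path")
	}
	return nil
}

func worker(ctx context.Context, in <-chan string, out chan<- result, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case <-ctx.Done():
			return
		case p, ok := <-in:
			if !ok {
				return
			}
			select {
			case <-ctx.Done():
				return
			case out <- result{Path: p, Err: process(p)}: // errors travel with the result
			}
		}
	}
}

// runOne pushes a single path through a worker and returns its result.
func runOne(path string) result {
	in := make(chan string, 1)
	out := make(chan result, 1)
	var wg sync.WaitGroup
	wg.Add(1)
	go worker(context.Background(), in, out, &wg)
	in <- path
	close(in)
	wg.Wait()
	return <-out
}

// stopsOnCancel reports whether an idle worker returns after cancellation.
func stopsOnCancel() bool {
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	wg.Add(1)
	go worker(ctx, make(chan string), make(chan result), &wg) // input never closes
	cancel()
	done := make(chan struct{})
	go func() { wg.Wait(); close(done) }()
	select {
	case <-done:
		return true
	case <-time.After(time.Second):
		return false
	}
}

func main() {
	fmt.Println(runOne("img1.jpg").Err) // <nil>
	fmt.Println(runOne("").Err)         // empty path
	fmt.Println(stopsOnCancel())        // true
}
```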

**Part B: Rate-Limited Fetcher**

In this part, fetcher goroutines issue HTTP GET requests at a controlled rate using a token bucket. Each fetcher waits for a token before handling a URL and retries failed requests with exponential backoff.

* Token Bucket: A token bucket is created with a specified rate (requests per second) and burst capacity; main uses 2 requests per second with a burst of 3. Each fetcher waits for a token before the first request for a URL; retries of that URL do not take another token.

* Retry with Exponential Backoff: Failed fetch requests are retried up to three times. The delay starts at 200 ms, doubles after each failure (capped at 10 seconds), and is scaled by random jitter.

* Graceful Shutdown: Fetchers stop when the input channel is closed or the context is canceled.
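The token bucket can be tried on its own. The sketch below follows the same channel-and-ticker idea as the notebook's `newTokenBucket`; the helpers `takeNow` and `burstAvailable` are hypothetical additions that make the burst behavior observable.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// newBucket returns a channel pre-filled with burst tokens and refilled at
// ratePerSec tokens per second until ctx is canceled. ratePerSec must be > 0.
func newBucket(ctx context.Context, ratePerSec, burst int) <-chan struct{} {
	tokens := make(chan struct{}, burst)
	for i := 0; i < burst; i++ {
		tokens <- struct{}{}
	}
	go func() {
		ticker := time.NewTicker(time.Second / time.Duration(ratePerSec))
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				close(tokens)
				return
			case <-ticker.C:
				select {
				case tokens <- struct{}{}:
				default: // bucket already full: drop this token
				}
			}
		}
	}()
	return tokens
}

// takeNow counts how many tokens can be taken without waiting, up to limit.
func takeNow(tokens <-chan struct{}, limit int) int {
	n := 0
	for n < limit {
		select {
		case <-tokens:
			n++
		default:
			return n
		}
	}
	return n
}

// burstAvailable creates a bucket and reports how many tokens are ready at once.
func burstAvailable(ratePerSec, burst int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	return takeNow(newBucket(ctx, ratePerSec, burst), burst+1)
}

func main() {
	fmt.Println("immediately available:", burstAvailable(2, 3))

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	tokens := newBucket(ctx, 2, 1)
	<-tokens // the single burst token
	start := time.Now()
	<-tokens // blocks until the ticker refills the bucket
	fmt.Println("refill took about", time.Since(start).Round(100*time.Millisecond))
}
```

With a rate of 2 per second the refill wait should be roughly half a second; the exact figure depends on scheduling.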

Key features:

* Context cancellation allows early termination.

* Retries failed requests with exponential backoff and optional jitter.

* Fetch results are processed in a consumer goroutine.
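The backoff arithmetic is easy to check separately from the HTTP code. The following is a sketch under stated assumptions: `nextDelay`, `withJitter`, `retry`, `flaky`, and `retryFlaky` are hypothetical names, and the multiplier (2.0) and cap (10 seconds) are hard-coded to match the values used in the notebook.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"time"
)

// nextDelay grows delay by multiplier and caps it at ceiling.
func nextDelay(delay time.Duration, multiplier float64, ceiling time.Duration) time.Duration {
	d := time.Duration(float64(delay) * multiplier)
	if d > ceiling {
		return ceiling
	}
	return d
}

// withJitter scales d by a random factor in [0.5, 1.5).
func withJitter(d time.Duration) time.Duration {
	return time.Duration(float64(d) * (0.5 + rand.Float64()))
}

// retry calls op up to maxRetries+1 times, sleeping between attempts.
// It returns the number of attempts made and the final error.
func retry(ctx context.Context, op func() error, maxRetries int, initial time.Duration) (int, error) {
	delay := initial
	for attempt := 1; ; attempt++ {
		if err := ctx.Err(); err != nil {
			return attempt - 1, err
		}
		err := op()
		if err == nil || attempt > maxRetries {
			return attempt, err
		}
		select {
		case <-ctx.Done():
			return attempt, ctx.Err()
		case <-time.After(withJitter(delay)):
		}
		delay = nextDelay(delay, 2.0, 10*time.Second)
	}
}

// flaky returns an op that fails on its first `failures` calls.
func flaky(failures int) func() error {
	calls := 0
	return func() error {
		calls++
		if calls <= failures {
			return errors.New("temporary failure")
		}
		return nil
	}
}

// retryFlaky runs retry against a flaky op with a 1ms initial delay.
func retryFlaky(failures, maxRetries int) (int, error) {
	return retry(context.Background(), flaky(failures), maxRetries, time.Millisecond)
}

func main() {
	// Delay schedule without jitter: prints 200ms, 400ms, 800ms.
	d := 200 * time.Millisecond
	for i := 0; i < 3; i++ {
		fmt.Println("wait", d)
		d = nextDelay(d, 2.0, 10*time.Second)
	}
	fmt.Println(retryFlaky(2, 3)) // 3 <nil>
}
```

Jitter spreads retries from concurrent fetchers so they do not all hit the server at the same instant.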

## How It Works

**1. Thumbnail Worker Pool:**

* A producer goroutine sends ten sample image paths to a buffered channel (inThumb).

* Multiple worker goroutines receive paths from this channel, simulate thumbnail creation, and send results to an output channel (outThumb).

* The input channel is closed after all paths are sent, signaling the workers to finish processing.

* The results (thumbnails or errors) are printed by a consumer goroutine.
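The effect of the buffered input channel can be observed with a non-blocking send. This is an illustrative sketch only: `tryEnqueue` and `fillUntilFull` are hypothetical helpers, and the real producer uses a plain blocking send inside a `select`. In the captured run above, nine paths are enqueued before the producer pauses, which is consistent with four workers each holding one path plus five more in the buffer.

```go
package main

import "fmt"

// tryEnqueue attempts a non-blocking send and reports whether it succeeded.
func tryEnqueue(ch chan<- string, path string) bool {
	select {
	case ch <- path:
		return true
	default:
		return false // buffer full: a plain send would block here
	}
}

// fillUntilFull enqueues paths with no consumer running and returns how many
// fit before a send would block.
func fillUntilFull(capacity int) int {
	ch := make(chan string, capacity)
	n := 0
	for tryEnqueue(ch, fmt.Sprintf("img%d.jpg", n+1)) {
		n++
	}
	return n
}

func main() {
	fmt.Println("paths accepted before blocking:", fillUntilFull(5)) // 5
}
```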

**2. Rate-Limited Fetcher:**

* A producer goroutine enqueues URLs to be fetched onto a buffered channel (inFetch).

* A token bucket channel is used to limit the rate of HTTP requests.

* Multiple fetcher goroutines wait for tokens before performing HTTP requests.

* The fetchers respect context cancellation and retry failed requests with exponential backoff.
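A single fetch attempt can be exercised without the public internet by pointing it at a local test server. The sketch below is an assumption-laden variant of the fetch step: `fetchSize` and `demo` are hypothetical names, the server comes from the standard `net/http/httptest` package, and unlike the notebook code it reads the real body length instead of simulating it.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"time"
)

// fetchSize performs one GET and returns the number of body bytes read.
// Non-2xx statuses are reported as errors, as in the notebook's fetcher.
func fetchSize(ctx context.Context, client *http.Client, url string) (int64, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return 0, err
	}
	resp, err := client.Do(req)
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return 0, fmt.Errorf("bad status %d", resp.StatusCode)
	}
	return io.Copy(io.Discard, resp.Body)
}

// demo starts a local test server and fetches one good and one failing URL.
func demo() (size int64, okErr, badErr error) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Path == "/fail" {
			http.Error(w, "boom", http.StatusInternalServerError)
			return
		}
		fmt.Fprint(w, "hello")
	}))
	defer srv.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	client := &http.Client{Timeout: 2 * time.Second}

	size, okErr = fetchSize(ctx, client, srv.URL+"/ok")
	_, badErr = fetchSize(ctx, client, srv.URL+"/fail")
	return size, okErr, badErr
}

func main() {
	size, okErr, badErr := demo()
	fmt.Println(size, okErr) // 5 <nil>
	fmt.Println(badErr)      // bad status 500
}
```

Because the request is built with `http.NewRequestWithContext`, canceling the context aborts an in-flight request as well as any wait between retries.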