ZTM concurrency exercises

Functional literals

//--Summary:
//  Create a program that can create a report of rune information from
//  lines of text.
//
//--Requirements:
//* Create a single function to iterate over each line of text that is
//  provided in main().
//  - The function must return nothing and must execute a closure
//* Using closures, determine the following information about the text and
//  print a report to the terminal:
//  - Number of letters
//  - Number of digits
//  - Number of spaces
//  - Number of punctuation marks
//
//--Notes:
//* The `unicode` stdlib package provides functionality for rune classification

package main

import (
	"fmt"
	"unicode"
)

type LineCallback func(line string)

func lineIterator(lines []string, op LineCallback) {
	for _, l := range lines {
		op(l)
	}
}

func main() {
	lines := []string{
		"There are",
		"68 letters,",
		"five digits,",
		"12 spaces,",
		"and 4 punctuation marks in these lines of text!",
	}

	var (
		letters     = 0
		digits      = 0
		spaces      = 0
		punctuation = 0
	)
	analyseLine := func(line string) {
		for _, r := range line {
			switch {
			case unicode.IsLetter(r):
				letters++
			case unicode.IsDigit(r):
				digits++
			case unicode.IsSpace(r):
				spaces++
			case unicode.IsPunct(r):
				punctuation++
			}
		}
	}

	lineIterator(lines, analyseLine)

	fmt.Printf("Counts:\nLetters: %d\nDigits: %d\nSpaces: %d\nPunctuation: %d\n", letters, digits, spaces, punctuation)
}
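As a variation on the same closure pattern (not part of the exercise), the four counter variables can be replaced by a single map that the closure captures, which keeps the report independent of how many rune classes are tracked. A minimal sketch, with illustrative input lines:

package main

import (
	"fmt"
	"unicode"
)

type LineCallback func(line string)

func lineIterator(lines []string, op LineCallback) {
	for _, l := range lines {
		op(l)
	}
}

func main() {
	lines := []string{"12 monkeys, 3 bananas!", "All gone."}

	// the closure captures `counts`, so every call updates the same map
	counts := map[string]int{}
	classify := func(line string) {
		for _, r := range line {
			switch {
			case unicode.IsLetter(r):
				counts["letters"]++
			case unicode.IsDigit(r):
				counts["digits"]++
			case unicode.IsSpace(r):
				counts["spaces"]++
			case unicode.IsPunct(r):
				counts["punctuation"]++
			}
		}
	}

	lineIterator(lines, classify)
	fmt.Println(counts)
}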

Goroutines

//--Summary:
//  Create a program to read a list of numbers from multiple files,
//  sum the total of each file, then sum all the totals.
//
//--Requirements:
//* Sum the numbers in each file noted in the main() function
//* Add each sum together to get a grand total for all files
//  - Print the grand total to the terminal
//* Launch a goroutine for each file
//* Report any errors to the terminal
//
//--Notes:
//* This program will need to be run from the `lectures/exercise/goroutines`
//  directory:
//    cd lectures/exercise/goroutines
//    go run goroutines
//* The grand total for the files is 4103109
//* The data files intentionally contain invalid entries
//* stdlib packages that will come in handy:
//  - strconv: parse the numbers into integers
//  - bufio: read each line in a file
//  - os: open files
//  - io: io.EOF will indicate the end of a file
//  - time: pause the program to wait for the goroutines to finish

package main

import (
	"bufio"
	"fmt"
	"os"
	"strconv"
	"sync"
	"time"
)

func sumFile(scanner *bufio.Scanner) int {
	sum := 0
	for scanner.Scan() {
		num, err := strconv.Atoi(scanner.Text())
		if err == nil {
			sum += num
		}
	}
	return sum
}

func main() {
	files := []string{"num1.txt", "num2.txt", "num3.txt", "num4.txt", "num5.txt"}

	var mu sync.Mutex
	sum := 0
	for _, file := range files {
		f, err := os.Open(file)
		if err != nil {
			fmt.Println("error opening file:", err)
			continue
		}
		defer f.Close()
		scanner := bufio.NewScanner(f)
		go func() {
			fileSum := sumFile(scanner)
			// guard the shared sum: multiple goroutines update it concurrently
			mu.Lock()
			sum += fileSum
			mu.Unlock()
		}()
	}
	time.Sleep(time.Millisecond * 300)
	fmt.Println("sum", sum)
}
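The notes suggest pausing with `time.Sleep`, which only works if the goroutines happen to finish within the chosen delay. A possible alternative, assuming the same num1.txt through num5.txt data files, waits deterministically with a `sync.WaitGroup` and guards the shared total with a mutex:

package main

import (
	"bufio"
	"fmt"
	"os"
	"strconv"
	"sync"
)

// sumFile adds up every line that parses as an integer, skipping invalid entries.
func sumFile(scanner *bufio.Scanner) int {
	sum := 0
	for scanner.Scan() {
		if num, err := strconv.Atoi(scanner.Text()); err == nil {
			sum += num
		}
	}
	return sum
}

func main() {
	files := []string{"num1.txt", "num2.txt", "num3.txt", "num4.txt", "num5.txt"}

	var (
		wg  sync.WaitGroup
		mu  sync.Mutex
		sum int
	)

	for _, file := range files {
		f, err := os.Open(file)
		if err != nil {
			fmt.Println("error opening file:", err)
			continue
		}
		wg.Add(1)
		go func(f *os.File) {
			defer wg.Done()
			defer f.Close()
			fileSum := sumFile(bufio.NewScanner(f))
			mu.Lock()
			sum += fileSum
			mu.Unlock()
		}(f)
	}

	// wait for every goroutine instead of sleeping for an arbitrary duration
	wg.Wait()
	fmt.Println("sum", sum)
}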

Channels

//--Summary:
//  Create a program that utilizes goroutines to run the provided calculation
//  function on a number of jobs. The results from the goroutines must be
//  communicated back to the main thread using a channel, and then added
//  together.
//
//--Requirements:
//* Run `longCalculation` for each job generated by the `makeJobs` function
//* Each job must be run in a separate goroutine
//* The result from `longCalculation` must be provided to the main function
//  using a channel
//* Sum the results from each job to generate a final result, and print it
//  to the terminal

package main

import (
	"fmt"
	"math/rand"
	"time"
)

type Job int

func longCalculation(i Job) int {
	duration := time.Duration(rand.Intn(1000)) * time.Millisecond
	time.Sleep(duration)
	fmt.Printf("Job %d complete in %v\n", i, duration)
	return int(i) * 30
}

func makeJobs() []Job {
	jobs := make([]Job, 0, 100)
	for i := 0; i < 100; i++ {
		jobs = append(jobs, Job(rand.Intn(10000)))
	}
	return jobs
}

func runJob(resultChan chan<- int, i Job) {
	resultChan <- longCalculation(i)
}

func main() {
	// no manual seeding needed: the global rand source is seeded automatically (Go 1.20+)
	jobs := makeJobs()

	result := make(chan int)

	for _, job := range jobs {
		go runJob(result, job)
	}

	resultCount := 0
	sum := 0
	for resultCount < len(jobs) {
		sum += <-result
		resultCount++
	}
	fmt.Println("sum", sum)
	fmt.Println("resultCount", resultCount)
}
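Counting received results works, but another common pattern is to close the result channel once every worker has finished, so main can simply range over it. A minimal sketch of that variant, where `j * 30` stands in for `longCalculation` and the jobs slice is illustrative:

package main

import (
	"fmt"
	"sync"
)

func main() {
	jobs := []int{1, 2, 3, 4, 5}
	results := make(chan int)

	var wg sync.WaitGroup
	for _, job := range jobs {
		wg.Add(1)
		go func(j int) {
			defer wg.Done()
			results <- j * 30 // stand-in for longCalculation
		}(job)
	}

	// close the channel once every worker has sent its result,
	// so the receiving loop can range until the channel drains
	go func() {
		wg.Wait()
		close(results)
	}()

	sum := 0
	for r := range results {
		sum += r
	}
	fmt.Println("sum", sum)
}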

Synchronization

//--Summary:
//  Create a program that can read text from standard input and count the
//  number of letters present in the input.
//
//--Requirements:
//* Count the total number of letters in any chosen input
//* The input must be supplied from standard input
//* Input analysis must occur per-word, and each word must be analyzed
//  within a goroutine
//* When the program finishes, display the total number of letters counted
//
//--Notes:
//* Use CTRL+D (Mac/Linux) or CTRL+Z (Windows) to signal EOF, if manually
//  entering data
//* Use `cat FILE | go run ./exercise/sync` to analyze a file
//* Use any synchronization techniques to implement the program:
//  - Channels / mutexes / wait groups

package main

import (
	"bufio"
	"fmt"
	"os"
	"strings"
	"sync"
	"time"
	"unicode"
)

type counter struct {
	count int
	sync.Mutex
}

func splitLine(line string) []string {
	return strings.Split(line, " ")
}

func count(wg *sync.WaitGroup, counter *counter, word string) {
	defer wg.Done()

	count := 0
	for _, r := range word {
		if unicode.IsLetter(r) {
			count++
		}
	}

	// only the shared total needs the lock; counting each word can run in parallel
	counter.Lock()
	counter.count += count
	counter.Unlock()
	fmt.Printf("word: %s, lettercount: %d\n", word, count)
}

func main() {
	start := time.Now()
	scanner := bufio.NewScanner(os.Stdin)

	var wg sync.WaitGroup
	var counter counter

	for scanner.Scan() {
		for _, word := range splitLine(scanner.Text()) {
			wg.Add(1)
			w := word
			go count(&wg, &counter, w)
		}
	}

	wg.Wait()

	var total int
	counter.Lock()
	total = counter.count
	counter.Unlock()
	fmt.Println("total", total)
	fmt.Println("duration", time.Since(start))
}
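Since each goroutine only adds a single number to the shared total, a `sync/atomic` counter is a lighter-weight alternative to the mutex-wrapped struct. A sketch of that variant; it also uses `strings.Fields`, which drops the empty strings that splitting on a single space can produce:

package main

import (
	"bufio"
	"fmt"
	"os"
	"strings"
	"sync"
	"sync/atomic"
	"unicode"
)

func main() {
	scanner := bufio.NewScanner(os.Stdin)

	var (
		wg    sync.WaitGroup
		total int64
	)

	for scanner.Scan() {
		for _, word := range strings.Fields(scanner.Text()) {
			wg.Add(1)
			go func(w string) {
				defer wg.Done()
				count := int64(0)
				for _, r := range w {
					if unicode.IsLetter(r) {
						count++
					}
				}
				// a single atomic add replaces the mutex-protected counter
				atomic.AddInt64(&total, count)
			}(word)
		}
	}

	wg.Wait()
	fmt.Println("total", atomic.LoadInt64(&total))
}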

Pipelines

// Pipeline demo: decode base64-encoded images, re-encode them as WebP,
// and save the results to disk, with each stage connected by a channel.
package main

import (
	"bytes"
	"encoding/base64"
	"fmt"
	"image"
	_ "image/gif"
	_ "image/jpeg"
	_ "image/png"
	"log"
	"os"
	"strings"

	"github.com/chai2010/webp"
	"github.com/google/uuid"
)

func makeWork(base64Images ...string) <-chan string {
	out := make(chan string)

	go func() {
		for _, encodedImg := range base64Images {
			out <- encodedImg
		}
		close(out)
	}()

	return out
}

func pipeline[I any, O any](input <-chan I, process func(I) O) <-chan O {
	out := make(chan O)

	go func() {
		for in := range input {
			out <- process(in)
		}
		close(out)
	}()
	return out
}

func base64ToRawImage(base64Img string) image.Image {
	reader := base64.NewDecoder(base64.StdEncoding, strings.NewReader(base64Img))

	img, _, err := image.Decode(reader)
	if err != nil {
		log.Fatal(err)
	}
	return img
}

func encodeToWebp(img image.Image) bytes.Buffer {
	var buf bytes.Buffer
	if err := webp.Encode(&buf, img, &webp.Options{Lossless: true}); err != nil {
		log.Fatal(err)
	}
	return buf
}

func saveToDisk(imgBuf bytes.Buffer) string {
	filename := fmt.Sprintf("%v.webp", uuid.New().String())
	if err := os.WriteFile(filename, imgBuf.Bytes(), 0644); err != nil {
		log.Fatal(err)
	}
	return filename
}

func main() {
	// load data into pipeline
	base64Images := makeWork(img1, img2, img3)

	// decode base64 into image format
	rawImages := pipeline(base64Images, base64ToRawImage)

	// encode as webp
	webpImages := pipeline(rawImages, encodeToWebp)

	// save images to disk
	filenames := pipeline(webpImages, saveToDisk)

	for name := range filenames {
		fmt.Println(name)
	}
}
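The image pipeline above depends on `img1`, `img2`, and `img3` constants plus the third-party `webp` and `uuid` packages, none of which are shown here. A stripped-down, self-contained sketch of the same generic `pipeline` helper, using ints and strings as stand-ins for the images:

package main

import (
	"fmt"
	"strconv"
)

// pipeline applies `process` to every value received on `input`
// and forwards each result on a new output channel.
func pipeline[I any, O any](input <-chan I, process func(I) O) <-chan O {
	out := make(chan O)
	go func() {
		for in := range input {
			out <- process(in)
		}
		close(out)
	}()
	return out
}

// makeWork loads the initial values into the pipeline.
func makeWork(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		for _, n := range nums {
			out <- n
		}
		close(out)
	}()
	return out
}

func main() {
	numbers := makeWork(1, 2, 3)
	doubled := pipeline(numbers, func(n int) int { return n * 2 })
	labels := pipeline(doubled, func(n int) string { return "value: " + strconv.Itoa(n) })

	for label := range labels {
		fmt.Println(label)
	}
}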

Pipelines with cancellation (quit)

package main

import (
	"bytes"
	"encoding/base64"
	"fmt"
	"image"
	_ "image/gif"
	_ "image/jpeg"
	_ "image/png"
	"log"
	"os"
	"strings"

	"github.com/chai2010/webp"
	"github.com/google/uuid"
)

func makeWork(base64Images ...string) <-chan string {
	// create output channel
	out := make(chan string)

	// spawn goroutine so we don't need to wait
	go func() {
		for _, encodedImg := range base64Images {
			out <- encodedImg
		}
		// use `close` to indicate that nothing
		// else will be sent on the channel
		close(out)
	}()

	// return the output channel, which will be populated
	// with the images by the goroutine
	return out
}

func pipeline[I any, O any](quit <-chan struct{}, input <-chan I, process func(I) O) <-chan O {
	out := make(chan O)
	go func() {
		defer close(out)
		for in := range input {
			select {
			case out <- process(in):
			case <-quit:
				return
			}
		}
	}()
	return out
}

func base64ToRawImage(base64Img string) image.Image {
	// we decode the encoded Base64 image to an image.Image
	reader := base64.NewDecoder(base64.StdEncoding, strings.NewReader(base64Img))
	// the second return value is the image format name; we don't need it for this demo
	img, _, err := image.Decode(reader)
	if err != nil {
		log.Fatal(err)
	}
	return img
}

func encodeToWebp(img image.Image) bytes.Buffer {
	var buf bytes.Buffer
	if err := webp.Encode(&buf, img, &webp.Options{Lossless: true}); err != nil {
		log.Fatal(err)
	}
	return buf
}

func saveToDisk(imgBuf bytes.Buffer) string {
	filename := fmt.Sprintf("%v.webp", uuid.New().String())
	if err := os.WriteFile(filename, imgBuf.Bytes(), 0644); err != nil {
		log.Fatal(err)
	}
	return filename
}

func main() {
	base64Images := makeWork(img1, img2, img3)

	quit := make(chan struct{})
	var signal struct{}

	rawImages := pipeline(quit, base64Images, base64ToRawImage)
	webpImages := pipeline(quit, rawImages, encodeToWebp)

	// a single send on quit stops whichever stage happens to receive it first
	quit <- signal

	filenames := pipeline(quit, webpImages, saveToDisk)
	for name := range filenames {
		fmt.Println(name)
	}
}
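Note that a single send on `quit` wakes at most one stage. Closing the channel is the usual way to broadcast cancellation to every goroutine that selects on it, because a receive from a closed channel always succeeds immediately. A small self-contained sketch of the close-based variant; the `worker` stage and the endless int producer are stand-ins, not the image pipeline:

package main

import (
	"fmt"
	"time"
)

// worker forwards processed values from in to out until in drains or quit is closed.
func worker(quit <-chan struct{}, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			select {
			case out <- v * 10:
			case <-quit:
				return
			}
		}
	}()
	return out
}

func main() {
	in := make(chan int)
	go func() {
		for i := 1; ; i++ { // endless producer
			in <- i
			time.Sleep(5 * time.Millisecond)
		}
	}()

	quit := make(chan struct{})
	stage1 := worker(quit, in)
	stage2 := worker(quit, stage1)

	go func() {
		time.Sleep(50 * time.Millisecond)
		close(quit) // broadcast: every stage observes the closed channel
	}()

	for v := range stage2 {
		fmt.Println(v)
	}
	fmt.Println("pipeline stopped")
}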

Pipeline fan-in

package main

// We are starting with the same code from the
// `pipeline` demo.

import (
	"bytes"
	"encoding/base64"
	"fmt"
	"image"
	_ "image/gif"
	_ "image/jpeg"
	_ "image/png"
	"log"
	"os"
	"strings"
	"sync"

	"github.com/chai2010/webp"
	"github.com/google/uuid"
)

func makeWork(base64Images ...string) <-chan string {
	// create output channel
	out := make(chan string)

	// spawn goroutine so we don't need to wait
	go func() {
		for _, encodedImg := range base64Images {
			out <- encodedImg
		}
		// use `close` to indicate that nothing
		// else will be sent on the channel
		close(out)
	}()

	// return the output channel, which will be populated
	// with the images by the goroutine
	return out
}

func pipeline[I any, O any](input <-chan I, process func(I) O) <-chan O {
	out := make(chan O)
	go func() {
		for in := range input {
			out <- process(in)
		}
		close(out)
	}()
	return out
}

func base64ToRawImage(base64Img string) image.Image {
	// we decode the encoded Base64 image to an image.Image
	reader := base64.NewDecoder(base64.StdEncoding, strings.NewReader(base64Img))
	// the second return value is the image format name; we don't need it for this demo
	img, _, err := image.Decode(reader)
	if err != nil {
		log.Fatal(err)
	}
	return img
}

func encodeToWebp(img image.Image) bytes.Buffer {
	var buf bytes.Buffer
	if err := webp.Encode(&buf, img, &webp.Options{Lossless: true}); err != nil {
		log.Fatal(err)
	}
	return buf
}

func saveToDisk(imgBuf bytes.Buffer) string {
	filename := fmt.Sprintf("%v.webp", uuid.New().String())
	if err := os.WriteFile(filename, imgBuf.Bytes(), 0644); err != nil {
		log.Fatal(err)
	}
	return filename
}

func fanIn[T any](channels ...<-chan T) <-chan T {
	var wg sync.WaitGroup
	out := make(chan T)
	wg.Add(len(channels))

	for _, ch := range channels {
		go func(in <-chan T) {
			for i := range in {
				out <- i
			}
			wg.Done()
		}(ch)
	}

	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}

func main() {
	base64Images := makeWork(img1, img2, img3)

	// stage 1
	rawImages1 := pipeline(base64Images, base64ToRawImage)
	rawImages2 := pipeline(base64Images, base64ToRawImage)
	rawImages3 := pipeline(base64Images, base64ToRawImage)
	rawImages := fanIn(rawImages1, rawImages2, rawImages3)

	// stage 2
	webpImages1 := pipeline(rawImages, encodeToWebp)
	webpImages2 := pipeline(rawImages, encodeToWebp)
	webpImages3 := pipeline(rawImages, encodeToWebp)
	webpImages := fanIn(webpImages1, webpImages2, webpImages3)

	// stage 3
	filenames1 := pipeline(webpImages, saveToDisk)
	filenames2 := pipeline(webpImages, saveToDisk)
	filenames3 := pipeline(webpImages, saveToDisk)
	filenames := fanIn(filenames1, filenames2, filenames3)

	for name := range filenames {
		fmt.Println(name)
	}
}
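Wiring three copies of every stage by hand gets repetitive. One way to tidy it up (not part of the lecture code) is a small generic `fanOut` helper that launches n copies of a stage over the same input and merges their outputs with `fanIn`. A self-contained sketch using ints as stand-ins for the images:

package main

import (
	"fmt"
	"sync"
)

// pipeline and fanIn are the same generic helpers as above.
func pipeline[I any, O any](input <-chan I, process func(I) O) <-chan O {
	out := make(chan O)
	go func() {
		for in := range input {
			out <- process(in)
		}
		close(out)
	}()
	return out
}

func fanIn[T any](channels ...<-chan T) <-chan T {
	var wg sync.WaitGroup
	out := make(chan T)
	wg.Add(len(channels))
	for _, ch := range channels {
		go func(in <-chan T) {
			defer wg.Done()
			for v := range in {
				out <- v
			}
		}(ch)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// fanOut starts n copies of the same stage reading from one input channel
// and merges their outputs, removing the repeated wiring in main.
func fanOut[I any, O any](n int, input <-chan I, process func(I) O) <-chan O {
	outs := make([]<-chan O, 0, n)
	for i := 0; i < n; i++ {
		outs = append(outs, pipeline(input, process))
	}
	return fanIn(outs...)
}

func makeWork(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		for _, n := range nums {
			out <- n
		}
		close(out)
	}()
	return out
}

func main() {
	numbers := makeWork(1, 2, 3, 4, 5)
	doubled := fanOut(3, numbers, func(n int) int { return n * 2 })
	squared := fanOut(3, doubled, func(n int) int { return n * n })

	for v := range squared {
		fmt.Println(v)
	}
}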

Generator

package main

import (
	"fmt"
	"math/rand"
)

func generateRandIntn(count, min, max int) <-chan int {
	out := make(chan int, 1)

	go func() {
		for i := 0; i < count; i++ {
			out <- rand.Intn(max-min+1) + min
		}
		close(out)
	}()

	return out
}

func generateRandInt(min, max int) <-chan int {
	out := make(chan int, 3)

	go func() {
		// this loop never exits, so the goroutine lives until the program ends
		for {
			out <- rand.Intn(max-min+1) + min
		}
	}()

	return out
}

func main() {
	// no manual seeding needed: the global rand source is seeded automatically (Go 1.20+)

	randInt := generateRandInt(1, 100)

	fmt.Println("generateRandInt infinite")
	fmt.Println(<-randInt)
	fmt.Println(<-randInt)
	fmt.Println(<-randInt)
	fmt.Println(<-randInt)
	fmt.Println(<-randInt)

	randIntn := generateRandIntn(2, 1, 10)

	fmt.Println("generateRandInt n")
	for i := range randIntn {
		fmt.Println("randIntn:", i)
	}

	randIntn2 := generateRandIntn(3, 1, 10)
	for {
		n, open := <-randIntn2
		if !open {
			break
		}
		fmt.Println("randIntn2:", n)
	}
}
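The infinite generator above never stops: its goroutine (and the values sitting in its buffer) live until the program exits. A possible variation adds a done channel so the caller can shut the generator down once it has read enough values:

package main

import (
	"fmt"
	"math/rand"
)

// generateRandInt is the same infinite generator, but with a done channel
// that lets the consumer stop it and avoid leaking the goroutine.
func generateRandInt(done <-chan struct{}, min, max int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for {
			select {
			case out <- rand.Intn(max-min+1) + min:
			case <-done:
				return
			}
		}
	}()
	return out
}

func main() {
	done := make(chan struct{})
	randInt := generateRandInt(done, 1, 100)

	for i := 0; i < 5; i++ {
		fmt.Println(<-randInt)
	}
	close(done) // stop the generator once we have enough values
}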

Context

package main

import (
	"context"
	"fmt"
	"time"
)

func sampleOperation(ctx context.Context, msg string, msDelay time.Duration) <-chan string {
	out := make(chan string)

	go func() {
		// both branches send once and finish, so no loop is needed
		select {
		case <-time.After(msDelay * time.Millisecond):
			out <- fmt.Sprintf("%v operation completed", msg)
		case <-ctx.Done():
			out <- fmt.Sprintf("%v aborted", msg)
		}
	}()
	return out
}

func main() {
	ctx := context.Background()
	ctx, cancelCtx := context.WithCancel(ctx)

	webserver := sampleOperation(ctx, "webserver", 200)
	microservice := sampleOperation(ctx, "microservice", 500)
	database := sampleOperation(ctx, "database", 900)

MainLoop:
	for {
		select {
		case val := <-webserver:
			fmt.Println(val)
		case val := <-microservice:
			fmt.Println(val)
			fmt.Println("cancel context")
			cancelCtx()
			break MainLoop
		case val := <-database:
			fmt.Println(val)
		}
	}

	fmt.Println(<-database)
}
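A variation on the same demo uses `context.WithTimeout`, which cancels the context automatically once a deadline passes instead of relying on an explicit `cancelCtx()` call; the 600ms deadline below is arbitrary:

package main

import (
	"context"
	"fmt"
	"time"
)

func sampleOperation(ctx context.Context, msg string, delay time.Duration) <-chan string {
	out := make(chan string)
	go func() {
		select {
		case <-time.After(delay):
			out <- fmt.Sprintf("%v operation completed", msg)
		case <-ctx.Done():
			out <- fmt.Sprintf("%v aborted (%v)", msg, ctx.Err())
		}
	}()
	return out
}

func main() {
	// the context cancels itself 600ms after creation
	ctx, cancel := context.WithTimeout(context.Background(), 600*time.Millisecond)
	defer cancel()

	webserver := sampleOperation(ctx, "webserver", 200*time.Millisecond)
	microservice := sampleOperation(ctx, "microservice", 500*time.Millisecond)
	database := sampleOperation(ctx, "database", 900*time.Millisecond)

	fmt.Println(<-webserver)    // completes before the deadline
	fmt.Println(<-microservice) // completes before the deadline
	fmt.Println(<-database)     // aborted: the 600ms deadline fires first
}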