# Assignment 10: Parallel Computing

## Due 27 June 2025

### Introduction

This assignment covers parallel computing with `joblib` and Dask. Use Python for all calculations. If possible, submit your answers in PDF or HTML format, and use the virtual environment we created in class to avoid dependency issues.

1. Explain the concept of "overhead" in parallel computing with `joblib`. Why might running a very simple task (like adding 1 to a number) in parallel with `joblib` be slower than running it serially?

Overhead is the extra work required to run code in parallel, beyond the computation itself. With `joblib`, this includes starting worker processes, serializing the function and its arguments, sending them to the workers, and collecting the results. For computationally intensive tasks, this cost is a small fraction of the total runtime, so parallel execution can be faster. For a very simple task such as adding 1 to a number, the computation takes far less time than the setup and communication, so the overhead dominates and the parallel version can be slower than the serial one.
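The effect can be observed with a small timing sketch. The function `add_one`, the input size of 1000, and `n_jobs=2` are assumptions chosen for illustration; the exact timings depend on the machine, so no specific numbers are claimed here.

```python
import time
from joblib import Parallel, delayed

def add_one(x):
    # A trivial task: the work per call is tiny
    return x + 1

numbers = list(range(1000))  # hypothetical input size

# Serial version: a plain list comprehension
start = time.perf_counter()
serial = [add_one(x) for x in numbers]
serial_time = time.perf_counter() - start

# Parallel version: joblib must start workers and ship each call to them
start = time.perf_counter()
parallel = Parallel(n_jobs=2)(delayed(add_one)(x) for x in numbers)
parallel_time = time.perf_counter() - start

print(f"serial:   {serial_time:.4f} s")
print(f"parallel: {parallel_time:.4f} s")
```

On most machines the parallel timing is expected to be the larger of the two, because the cost of dispatching each call exceeds the cost of the addition itself.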

2. Write a Python function `count_vowels(text)` that counts the vowels (a, e, i, o, u, case-insensitive) in a given string. Then, use the `Parallel` and `delayed` functions from the `joblib` library to apply your function in parallel to each sentence in the list below. Use all available cores. The result should be a list of integers, where each integer is the number of vowels in the corresponding sentence.

```python
sentences = [
    "Joblib makes parallel computing easy",
    "Dask scales Python code effectively",
    "Parallelism can speed up computations",
    "Always consider the overhead"
]
```

In [1]:
## Your answer here
from joblib import Parallel, delayed

sentences = [
    "Joblib makes parallel computing easy",
    "Dask scales Python code effectively",
    "Parallelism can speed up computations",
    "Always consider the overhead"
]

def count_vowels(text):
    # Lowercase first so the check is case-insensitive
    return sum(1 for char in text.lower() if char in "aeiou")

# n_jobs=-1 tells joblib to use all available cores
Parallel(n_jobs=-1)(delayed(count_vowels)(sentence) for sentence in sentences)

[12, 10, 13, 10]

3. Write a function called `get_length` that takes a word as input and returns its length. Then, using the provided list `words`, do the following:

* Use a standard (sequential) for loop to calculate the length of each word by calling your function.
* Use the `joblib` library to calculate the length of each word in parallel, also calling your function. Use `Parallel` and `delayed` from `joblib` again.
* Compare the syntax of the sequential and parallel approaches. How do they differ when writing the loop?

```python
words = ["joblib", "parallel", "computing", "example"]
```

In [2]:
## Your answer here

words = ["joblib", "parallel", "computing", "example"]

def get_length(word):
    return len(word)

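# Sequential: an ordinary for loop that calls the function directly
lengths = []
for word in words:
    lengths.append(get_length(word))

# Parallel: joblib distributes the calls across worker processes
Parallel(n_jobs=6)(delayed(get_length)(word) for word in words)

# Syntax comparison: the sequential version calls get_length(word) inside a for loop.
# The parallel version replaces the loop with a generator expression passed to Parallel(...)
# and wraps the function in delayed(...), so each call is recorded and run later by a worker.
# It fits on one line, but the extra pieces (n_jobs, delayed) make it harder to read.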

[6, 8, 9, 7]

3. Create a 10000x10000 Dask array `da_a` filled with random integers between 0 and 100, chunked into (500, 1000) blocks. Use `RandomState(350)` to make your code reproducible. Create a second Dask array `da_b` of the same shape and chunks, filled with ones. Compute `da_c = (da_a + da_b) * 2` and its mean value.

In [3]:
## Your answer here

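import numpy as np
import dask.array as da

# Seeded generator so the random integers are reproducible
rng = np.random.RandomState(350)
# randint(100) draws integers from 0 to 99 (the upper bound is exclusive)
numbers = rng.randint(100, size=(10000, 10000))
da_a = da.from_array(numbers, chunks=(500, 1000))

# Array of ones with the same shape, chunks, and dtype
da_b = da.ones((10000, 10000), chunks=(500, 1000), dtype=numbers.dtype)

# Nothing is computed until .compute() is called
da_c = (da_a + da_b) * 2
da_c.mean().compute()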

100.99471242
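
The answer above draws the numbers with NumPy and then wraps them in a Dask array, which builds the full 10000x10000 array in memory first. A possible alternative, sketched below, generates each chunk lazily with `dask.array.random.RandomState`. Whether the question intends this approach is an assumption, and because Dask seeds each chunk separately, the values (and therefore the mean) will not match the output above exactly. The helper name `build_da_c` and the small demo shape are hypothetical.

```python
import dask.array as da

def build_da_c(shape=(10000, 10000), chunks=(500, 1000), seed=350):
    """Hypothetical helper: build (da_a + da_b) * 2 without a NumPy intermediate."""
    rng = da.random.RandomState(seed)
    # high is exclusive, so this draws integers from 0 to 99
    da_a = rng.randint(0, 100, size=shape, chunks=chunks)
    da_b = da.ones(shape, chunks=chunks, dtype=da_a.dtype)
    return (da_a + da_b) * 2

# Small demo shape so the sketch runs quickly; use the defaults for the full size
da_c = build_da_c(shape=(1000, 1000), chunks=(500, 500))
mean_value = da_c.mean().compute()
print(mean_value)
```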

4. What is the difference between `dask.dataframe.compute()` and `dask.dataframe.persist()`? When would you typically use `.persist()`?

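Both `dask.dataframe.compute()` and `dask.dataframe.persist()` trigger execution of the task graph on a Dask DataFrame. The difference is in what they return. `.compute()` returns the final result as an in-memory pandas object, so it should only be used when the result fits in local memory. `.persist()` returns a Dask DataFrame whose partitions have been computed and are kept in memory. It is typically used when an intermediate result, such as a cleaned or filtered dataset, will be reused in several later computations, because those computations can then start from the stored partitions instead of rebuilding the whole graph each time.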

5. In this question, you will compare the performance of a regular `for` loop and `dask` for a simple computation. First, create a function called `intensive_task` as follows:

```python
import numpy as np
import time
import dask

def intensive_task(n):
    loop_limit = 10_000_000 # How many iterations inside the function
    total = 0
    for i in range(loop_limit):
        total += i*i
    return total
```

Then, create a list called `inputs` with 6 values:

```python
inputs = [1, 2, 3, 4, 5, 6] 
```

Now, use the function `time.time()` to measure the time it takes to run the function `intensive_task` for each value in the list `inputs` using a regular `for` loop. Store the results in a list called `results`. Remember to create the `start_time` and `end_time` variables to measure the time taken for the computation. The result, which is the difference between `end_time` and `start_time`, should be printed.

Repeat the same task using `dask`. However, instead of using the `@dask.delayed` decorator, use the code below:

```python
tasks = [dask.delayed(intensive_task)(i) for i in inputs]
```

Then, use `dask.compute()` to compute the results. Again, measure the time taken for the computation and print the result. Which one is faster?

In [4]:
## Your answer here

import numpy as np
import time
import dask

def intensive_task(n):
    loop_limit = 10_000_000 # How many iterations inside the function
    total = 0
    for i in range(loop_limit):
        total += i*i
    return total

inputs = [1, 2, 3, 4, 5, 6] 

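# Sequential version: a regular for loop
start_time = time.time()
results = []
for i in inputs:
    results.append(intensive_task(i))
end_time = time.time()
print(end_time - start_time)

# Dask version: build the delayed tasks, then compute them together
start_time = time.time()
tasks = [dask.delayed(intensive_task)(i) for i in inputs]
dask_results = dask.compute(*tasks)
end_time = time.time()
print(end_time - start_time)

# Dask is marginally faster here, but the difference is within timing noise.
# dask.delayed uses the threaded scheduler by default, and this pure-Python
# loop holds the GIL, so the tasks cannot run at the same time.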

2.6655237674713135
2.662086248397827


6. In the same folder as this notebook, you will find a Parquet file named `data.parquet`. It is available here: <https://github.com/danilofreire/qtm350-summer/blob/main/assignments/data.parquet>. This file contains student records with the following columns:

* `emory_id` (integer) 
* `student_name` (string)
* `major` (string)
* `gpa` (float)

Write Python code using `dask.dataframe` to read the `data.parquet` file, but only load the `major` and `gpa` columns. Then, print the first 5 rows of the resulting Dask DataFrame using the `.head()` method, and calculate the average GPA by major.

You will need a Parquet engine to read the file. If you don't have one installed, you can use `pyarrow`. You can install it using conda (or pip):

```bash
conda install pyarrow
```

In [5]:
## Your answer here
import dask.dataframe as dd

# Load only the two columns needed for the analysis
data = dd.read_parquet("data.parquet", columns=["major", "gpa"])
print(data.head())

# Average GPA by major
data_grouped = data.groupby("major").agg(avg_gpa=("gpa", "mean")).compute()
print(data_grouped.head())

       major   gpa
0    History  2.98
1  Chemistry  3.16
2  Chemistry  3.83
3        QTM  3.67
4    CompSci  3.33
            avg_gpa
major              
History    3.098750
Chemistry  3.320000
QTM        2.957500
CompSci    3.352857
Biology    3.044000


7. You have two CSV files in this directory:

* `students.csv`: Contains columns `student_id`, `student_name`. Available here: <https://github.com/danilofreire/qtm350-summer/blob/main/assignments/students.csv>.
* `grades.csv`: Contains columns `student_id`, `course`, `grade`. Available here: <https://github.com/danilofreire/qtm350-summer/blob/main/assignments/grades.csv>.

Write Python code using `dask.dataframe` to:

* Read `students.csv` into a Dask DataFrame called `ddf_students`.
* Read `grades.csv` into a Dask DataFrame called `ddf_grades`.
* Merge these two DataFrames together based on the common `student_id` column. An inner merge is recommended (only include students present in both files).
* From the merged DataFrame, select only the `student_name`, `course`, and `grade` columns. Save it as `ddf_final`.
* Compute and print the first 5 rows of this final merged DataFrame using `.head()`.

In [17]:
import dask.dataframe as dd

ddf_students = dd.read_csv("students.csv")
ddf_grades = dd.read_csv("grades.csv")

# Keep the merge lazy so that ddf_final is still a Dask DataFrame
merged = ddf_students.merge(ddf_grades, on="student_id", how="inner")
ddf_final = merged[["student_name", "course", "grade"]]

# .head() triggers the computation and returns the first 5 rows
print(ddf_final.head())

  student_name  course grade
0        Alice  QTM100     A
1        Alice  QTM200     B
2          Bob  QTM100     B
3          Bob  QTM300     C
4      Charlie  QTM100     A


Good luck! 😃