## Assignment II: Data Structures and Methods for HPC

# Exercise 1 - Pytest with Julia Set Code

Julia set code with the assert statement removed; the function now returns its output list instead.

In [115]:
"""Julia set generator without optional PIL-based image drawing"""
import time
from functools import wraps

# area of complex space to investigate
x1, x2, y1, y2 = -1.8, 1.8, -1.8, 1.8
c_real, c_imag = -0.62772, -.42193


# decorator to time
def timefn(fn):
    """Decorator that prints how long each call to ``fn`` takes.

    The wrapped function is called exactly once per invocation with the
    arguments it was given, and its return value is passed through
    unchanged. After the call a line of the form
    ``@timefn: <name> took <seconds> seconds`` is printed.

    Args:
        fn: The callable to time.

    Returns:
        A wrapper with the same name/docstring as ``fn`` (via ``wraps``).
    """

    @wraps(fn)
    def measure_time(*args, **kwargs):
        # perf_counter is a monotonic, high-resolution clock meant for
        # measuring intervals; time.time() follows the wall clock and can
        # jump if the system time is adjusted during the measurement.
        started = time.perf_counter()
        result = fn(*args, **kwargs)
        elapsed = time.perf_counter() - started
        print(f"@timefn: {fn.__name__} took {elapsed} seconds")
        return result

    return measure_time


def calc_pure_python(desired_width, max_iterations):
    """Build the grid of complex start points and evaluate the Julia set on it.

    The region bounded by the module-level x1, x2, y1, y2 is sampled with a
    step of (extent / desired_width) along each axis. Every grid point
    becomes a starting value z and is paired with the constant
    complex(c_real, c_imag). Both lists are handed to
    calculate_z_serial_purepython; the grid size, the elapsed time of that
    call and the sum of its output are printed.

    Args:
        desired_width: Number of steps each axis of the region is divided into.
        max_iterations: Iteration cap forwarded to the update routine.

    Returns:
        The list produced by calculate_z_serial_purepython, one entry per
        grid point, ordered row by row starting at y2.
    """
    step_x = (x2 - x1) / desired_width
    step_y = (y1 - y2) / desired_width  # negative for the module's bounds

    # Coordinates are accumulated by repeated addition (not index * step),
    # so each value carries the rounding of all the additions before it.
    row_values = []
    current = y2
    while current > y1:
        row_values.append(current)
        current += step_y

    col_values = []
    current = x1
    while current < x2:
        col_values.append(current)
        current += step_x

    # One start point per grid cell, plus a matching parameter for each.
    # The parameter is the same constant everywhere and could be dropped;
    # it is kept as a list to mimic a function that takes several inputs.
    zs = [complex(real, imag) for imag in row_values for real in col_values]
    cs = [complex(c_real, c_imag) for _ in zs]

    print("Length of x:", len(col_values))
    print("Total elements:", len(zs))

    began = time.time()
    output = calculate_z_serial_purepython(max_iterations, zs, cs)
    secs = time.time() - began
    print(calculate_z_serial_purepython.__name__ + " took", secs, "seconds")

    print("the sum is: ", sum(output))

    return output


def calculate_z_serial_purepython(maxiter, zs, cs):
    """Apply the Julia update rule z -> z*z + c to every start point.

    Args:
        maxiter: Upper bound on the number of updates per point.
        zs: Sequence of complex starting values.
        cs: Sequence of complex parameters, indexed in step with ``zs``.

    Returns:
        A list as long as ``zs``; entry i is the number of updates applied
        to zs[i] before abs(z) reached 2 or ``maxiter`` was hit.
    """
    output = [0] * len(zs)
    for idx, z in enumerate(zs):
        c = cs[idx]
        steps = 0
        # Iterate while the point stays inside the radius-2 disc and the
        # iteration budget is not exhausted.
        while abs(z) < 2 and steps < maxiter:
            z = z * z + c
            steps += 1
        output[idx] = steps
    return output


if __name__ == "__main__":
    # Script entry point: compute the Julia set with the pure Python
    # implementation, using a width of 1000 and at most 300 iterations per
    # point (defaults that are reasonable for a laptop).
    calc_pure_python(desired_width=1000, max_iterations=300)

Length of x: 1000
Total elements: 1000000
calculate_z_serial_purepython took 1.8283319473266602 seconds
the sum is:  33219980


Pytest for the function: the test replaces the removed assert statement and covers multiple test cases and grid dimensions.

In [123]:
import pytest
# from JuliaSet import calc_pure_python


# @pytest.mark.parametrize('width, iterations, expected', [(1000, 300, 33219980), (10, 3, 177)])
# def test_calc_pure_python(width, iterations, expected):
#     output = calc_pure_python(width, iterations)
#     assert sum(output) == expected

To add unit tests with different widths or numbers of iterations, the expected sum for each case must be known in advance; once it is, the case is simply added as another tuple in the parametrize list above. For example, append a third tuple after `(10, 3, 177)` and the test will also check the sum of the output for that input.

# Exercise 2 - Python DGEMM Benchmark Operation

### Task 2.1 Implement the DGEMM with matrices as NumPy arrays

Original C code:

In [None]:
// Multiplying first and second matrices and storing it in result
   for (int i = 0; i < N; ++i) {
      for (int j = 0; j < N; ++j) {
         for (int k = 0; k < N; ++k) {
            C[i][j] = C[i][j] + A[i][k] * B[k][j];
         }
      }
   }

Python translation of the C code:

In [117]:
import numpy as np
import numexpr as ne
import timeit


seed = 1
N = 100
if seed is not None:
    np.random.seed(seed)

A = np.random.randint(-N,N,(N,N))
B = np.random.randint(-N,N,(N,N))
C = np.random.randint(-N,N,(N,N))

# print("A is : \n", A)
# print("B is : \n", B)
# print("C is : \n", C)
@timer
def for_loop_matrix_multiplication(A, B, C, N):
    """Compute C + A @ B with three nested Python loops (naive DGEMM).

    The accumulation is done on a copy of ``C``, so the caller's matrix is
    left untouched. This matters because the function is wrapped by a
    decorator that invokes it several times in a row: accumulating into
    the caller's ``C`` would add the product once per invocation and
    return a wrong result.

    Args:
        A: Left operand, indexable as A[i][k] for 0 <= i, k < N.
        B: Right operand, indexable as B[k][j] for 0 <= k, j < N.
        C: Matrix the product is added to; it is copied, not modified.
        N: Number of rows/columns to process.

    Returns:
        A tuple ``(result, seconds)`` where ``result`` is a new NumPy array
        holding C + A @ B and ``seconds`` is the time spent in the loops.
    """
    # np.array copies its input by default; done before the clock starts so
    # only the multiplication itself is timed.
    result = np.array(C)
    start = timeit.default_timer()
    for i in range(N):
        for j in range(N):
            for k in range(N):
                result[i][j] += A[i][k] * B[k][j]
    end = timeit.default_timer()

    return (result, end - start)

(output, for_loop_matrix_time) = for_loop_matrix_multiplication(A, B, C, N)
print("Time taken for normal multiplication: ", for_loop_matrix_time)
# print("Correct is: \n", output)

Time taken for normal multiplication:  0.3127814170002239


In [130]:
@timer
def numpy_matrix_multiplication(A, B, C):
    """Compute C + A @ B using NumPy's ``dot``.

    The sum is written to a new array instead of being added into ``C`` in
    place. This matters because the function is wrapped by a decorator that
    invokes it several times in a row: an in-place ``C += ...`` would add
    the product once per invocation and also change the caller's matrix.

    Args:
        A: Left operand of the matrix product.
        B: Right operand of the matrix product.
        C: Matrix the product is added to; it is not modified.

    Returns:
        A tuple ``(result, seconds)`` where ``result`` is a new array
        holding C + A @ B and ``seconds`` is the time the computation took.
    """
    start = timeit.default_timer()
    result = C + np.dot(A, B)
    end = timeit.default_timer()
    return (result, end - start)

# Run the NumPy implementation and report its own timing. The label names
# NumPy so this line cannot be confused with the for-loop measurement.
(output, np_matrix_time) = numpy_matrix_multiplication(A, B, C)
print("Time taken for NumPy multiplication: ", np_matrix_time)
# print("Correct is: \n", output)

Time taken for normal multiplication:  0.0006760419928468764


Below is the pytest test for the NumPy matrix multiplication. The for-loop implementation is treated as the reference (correct) solution in all cases:

In [153]:
import ipytest
ipytest.autoconfig()

# %%ipytest -v

@pytest.mark.parametrize(
    'seed, size',
    [(1, 10), (1, 50), (1, 100), (2, 10), (2, 50), (2, 100)],
)
def test_numpy_matrix_multiplication(seed, size):
    """Check the NumPy DGEMM against the for-loop reference implementation.

    Random integer matrices A, B and C of shape (size, size) are generated
    from ``seed``; both implementations must then produce the same result.

    The parameters have no default values on purpose: pytest refuses to
    parametrize an argument that already has a default.

    Args:
        seed: Seed passed to ``np.random.seed`` for reproducible inputs.
        size: Number of rows/columns of the square test matrices.
    """
    N = size
    np.random.seed(seed)
    A = np.random.randint(-N, N, (N, N))
    B = np.random.randint(-N, N, (N, N))
    C = np.random.randint(-N, N, (N, N))

    # Each implementation gets its own copy of C. If both were handed the
    # same array and accumulated into it in place, both return values would
    # be that one array and the comparison below could never fail.
    numpy_result = numpy_matrix_multiplication(A, B, C.copy())[0]
    loop_result = for_loop_matrix_multiplication(A, B, C.copy(), N)[0]

    assert np.array_equal(numpy_result, loop_result), \
        f"Mismatch for seed={seed}, size={size}"

# Run the check directly for every (seed, size) combination, in the order
# seed 1 with sizes 10, 50, 100 and then seed 2 with the same sizes.
for _seed in (1, 2):
    for _size in (10, 50, 100):
        test_numpy_matrix_multiplication(_seed, _size)

UsageError: Line magic function `%%ipytest` not found.


Below is the code that measures the execution time of the NumPy multiplication function and the for-loop function. It is the same code as in our Assignment 1 decorator.py.

In [None]:
import timeit
from functools import wraps
from typing import Callable


def timer(func: Callable, repeats: int = 10) -> Callable:
    """Decorator that runs ``func`` several times and prints timing statistics.

    Each call to the wrapper invokes ``func`` ``repeats`` times with the
    same arguments, measures every run with ``timeit.default_timer`` and
    prints the function name, the mean run time and the (population)
    standard deviation. Because ``func`` really is executed ``repeats``
    times, any side effect it has happens that many times as well.

    Args:
        func: The callable to benchmark.
        repeats: Number of timed runs per call. Defaults to 10, the value
            used when ``timer`` is applied as a plain ``@timer`` decorator.

    Returns:
        A wrapper that returns the result of the last run of ``func``.

    Raises:
        ValueError: If ``repeats`` is smaller than 1.
    """
    if repeats < 1:
        raise ValueError("repeats must be at least 1")

    @wraps(func)
    def wrapper(*args, **kwargs):
        durations = np.zeros((repeats,))
        result = None
        for run in range(repeats):
            start = timeit.default_timer()
            result = func(*args, **kwargs)
            end = timeit.default_timer()
            durations[run] = end - start
        print(f"Function: {func.__name__}")
        print(f"Average Execution Time: {durations.mean():.6f} seconds")
        print(f"Standard Deviation: {durations.std():.6f} seconds")
        return result

    return wrapper

In [151]:

N = 10
A1 = np.random.randint(-N,N,(N,N))
B1 = np.random.randint(-N,N,(N,N))
C1 = np.random.randint(-N,N,(N,N))

N = 25
A2 = np.random.randint(-N,N,(N,N))
B2 = np.random.randint(-N,N,(N,N))
C2 = np.random.randint(-N,N,(N,N))

N = 50
A3 = np.random.randint(-N,N,(N,N))
B3 = np.random.randint(-N,N,(N,N))
C3 = np.random.randint(-N,N,(N,N))


