<a href="https://colab.research.google.com/gist/alexandralht0413/33f5612a9ee96406bacf95497ed39e9d/assignment-4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Assignment 4
## Understanding the scaling of linear algebra operations on Apache Spark using Apache SystemML

In this assignment we want you to understand how to scale linear algebra operations from a single machine to multiple machines, with their memory and CPU cores, using Apache SystemML. To get there, we want you to understand how to migrate from a numpy program to a SystemML DML program. Don't worry, we will give you a lot of hints. You won't need this knowledge if you are sticking to Keras only, but once you go beyond that point you'll be happy to see what's going on behind the scenes.

So the first thing we need to ensure is that the required packages are installed. This notebook installs pyspark 3.3.0 and Apache SystemDS 2.2.1 (SystemDS is the successor of SystemML).

The steps are:
- pip install
- start execution at the cell with the version check

In [1]:
from IPython.display import Markdown, display
def printmd(string):
    display(Markdown('# <span style="color:red">'+string+'</span>'))
if ('sc' in locals() or 'sc' in globals()):
    printmd('<<<<<!!!!! It seems that you are running in an IBM Watson Studio Apache Spark Notebook. Please run it in an IBM Watson Studio Default Runtime (without Apache Spark) !!!!!>>>>>')

In [3]:
!pip install pyspark==3.3.0



In [4]:
!pip install systemds==2.2.1

Collecting systemds==2.2.1
  Downloading systemds-2.2.1-py3-none-any.whl (50.9 MB)
Installing collected packages: systemds
Successfully installed systemds-2.2.1


In [2]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

# Run Spark locally, using all available CPU cores
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))
# Reuse the running context to obtain a SparkSession
spark = SparkSession.builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/11/08 01:52:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
import numpy as np
u = np.random.rand(1000,10000)
s = np.random.rand(10000,1000)
w = np.random.rand(1000,1000)

Now we implement a short one-liner to define a very simple linear algebra operation.

In case you are unfamiliar with matrix-matrix multiplication: https://en.wikipedia.org/wiki/Matrix_multiplication

sum(U' * (W . (U * S)))


| Legend | Meaning |
| ------------- | ------------- |
| ' | transpose of a matrix |
| * | matrix-matrix multiplication |
| . | element-wise multiplication (called scalar multiplication in this assignment) |
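
To make the distinction in the legend concrete, here is a minimal numpy sketch on two hand-picked 2x2 matrices (the values are illustrative assumptions, not part of the assignment). It contrasts the `*` of the legend, which is `dot` in numpy, with the `.` of the legend, which is `*` in numpy and multiplies entries at the same position.

```python
import numpy as np

# Two small example matrices (illustrative values only)
a = np.array([[1.0, 2.0],
              [3.0, 4.0]])
b = np.array([[10.0, 20.0],
              [30.0, 40.0]])

# Matrix-matrix multiplication: rows of a are combined with columns of b
matmul = a.dot(b)

# Element-wise multiplication: each entry of a is multiplied with the
# entry of b at the same position
elementwise = a * b

print(matmul)
print(elementwise)
```

The two results differ in every entry, which is why swapping one operation for the other changes the final sum.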



In [7]:
import time
start = time.time()
res = np.sum(u.T.dot(w * u.dot(s)))
print (time.time()-start)

1.2109904289245605


As you can see, this executes perfectly fine. Note that this is even a very efficient execution, because numpy uses a C/C++ backend which is known for its performance. But what happens if U, S or W become so big that the available main memory cannot hold them? Let's give it a try:

In [None]:
#u = np.random.rand(10000,100000)
#s = np.random.rand(100000,10000)
#w = np.random.rand(10000,10000)
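
A rough back-of-the-envelope estimate shows why these shapes are a problem. The sketch below only counts the three input matrices and assumes dense float64 storage (8 bytes per entry, which is what `np.random.rand` returns). Intermediate results would need additional memory on top. The helper name `dense_bytes` is made up for this illustration.

```python
# Shapes taken from the commented-out cell above
shapes = {
    "u": (10000, 100000),
    "s": (100000, 10000),
    "w": (10000, 10000),
}
BYTES_PER_FLOAT64 = 8  # assumption: dense float64 storage


def dense_bytes(shape):
    """Return the number of bytes a dense float64 matrix of this shape needs."""
    rows, cols = shape
    return rows * cols * BYTES_PER_FLOAT64


total = 0
for name, shape in shapes.items():
    size = dense_bytes(shape)
    total += size
    print(f"{name}: {size / 1e9:.1f} GB")
print(f"total: {total / 1e9:.1f} GB")
```

Under these assumptions the inputs alone come to about 16.8 GB, which exceeds the main memory of many single machines.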

If you uncomment and run these lines, you should see a memory error after a short while. This is because the operating system process was not able to allocate enough memory for storing the numpy arrays on the heap. Now it's time to re-implement the very same operations in SystemDS, and this is your task. Replace all ###your_code_goes_here sections with proper code. The following table contains all the syntax you need:

| Syntax | Meaning |
| ------------- | ------------- |
| `M.t()` | transpose of a matrix, where M is the matrix |
| `M.__matmul__(N)` | matrix-matrix multiplication between M and N |
| `M * N` | element-wise multiplication between matrices M and N (called scalar multiplication in this assignment) |
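
As a way to get used to this syntax before touching SystemDS, the following sketch applies the same three operations to small numpy stand-ins. It assumes shapes of 3x5, 5x3 and 3x3, which follow the same pattern as U, S and W but are not the sizes used in the assignment. numpy arrays also support `__matmul__` (it is what the `@` operator calls), but note that numpy writes the transpose as `.T` rather than `.t()`.

```python
import numpy as np

rng = np.random.default_rng(0)

# Small stand-ins with the same shape pattern as U, S and W
U = rng.random((3, 5))
S = rng.random((5, 3))
W = rng.random((3, 3))

US = U.__matmul__(S)               # matrix-matrix multiplication, shape (3, 3)
weighted = W * US                  # element-wise multiplication, shape (3, 3)
result = U.T.__matmul__(weighted)  # transpose, then matrix multiplication, shape (5, 3)

print(US.shape, weighted.shape, result.shape)
print(np.allclose(US, U @ S))      # __matmul__ and @ give the same result
print(result.sum())
```

Tracking the shape of each intermediate result is a quick way to check that every operand is compatible with the operation applied to it.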


## Task

We use SystemDSContext to interface with Apache SystemDS (formerly SystemML). Note that we pass a SparkSession object as a parameter to SystemDSContext, so it knows how to talk to the Apache Spark cluster.

In [8]:
# Option 1 (Eldo): FAILED. Uses W.__matmul__(...) where the formula calls for the
# element-wise product W * (...), so the result is about 1000x larger than expected.
from systemds.context import SystemDSContext
with SystemDSContext(spark) as sds:
    # Now we create some large random matrices to have numpy and SystemDS crunch on it
    U = sds.rand(rows=1000,cols=10000)
    S = sds.rand(rows=10000,cols=1000)
    W = sds.rand(rows=1000,cols=1000)
    # res = (U.###your_code_goes_here.###your_code_goes_here((W * (U.###your_code_goes_here(S))))).sum()
    res = (U.t().__matmul__(W.__matmul__(U.__matmul__(S)))).sum()           
    print(res.compute())

6244508396425750.0


In [18]:
# Option 2 Eldo - FAILED, USING Matrix-Matrix Multiplication , Transpose & Scalar Multiplication
from systemds.context import SystemDSContext

with SystemDSContext(spark) as sds:
    # Create large random matrices
    U = sds.rand(rows=1000, cols=10000)
    S = sds.rand(rows=10000, cols=1000)
    W = sds.rand(rows=1000, cols=1000)

    # Define the expression
    part1 = U.__matmul__(S)  # Matrix-matrix multiplication
    part2 = W * part1  # Scalar multiplication
    res = U.t().__matmul__(part2)  # Transpose and matrix-matrix multiplication

    # Compute the result
    result = res.sum()

    # Print the result
    print(result.compute())

6251130091127.43


In [15]:
# Option 3 Eldo - FAILED, USING U.t() @ (W @ (U @ S))
from systemds.context import SystemDSContext
import numpy as np
with SystemDSContext(spark) as sds:
    # Create large random matrices
    U = sds.rand(rows=1000, cols=10000)
    S = sds.rand(rows=10000, cols=1000)
    W = sds.rand(rows=1000, cols=1000)
    
    # Define the expression
    expr = U.t() @ (W @ (U @ S))
   
    # Compute the result
    res = expr.sum()
    
    # Print the result
    print(res.compute())

6250550179480467.0


In [19]:
# Option 4 Eldo - FAILED, USING np.matmul(U_np.T, np.matmul(W_np, np.matmul(U_np, S_np))).sum()
from systemds.context import SystemDSContext
import numpy as np
with SystemDSContext(spark) as sds:
    # Create large random matrices
    U = sds.rand(rows=1000, cols=10000)
    S = sds.rand(rows=10000, cols=1000)
    W = sds.rand(rows=1000, cols=1000)

    # Convert U, S, and W to numpy arrays
    U_np = U.compute()
    S_np = S.compute()
    W_np = W.compute()

    # Perform matrix multiplications using numpy functions
    res = np.matmul(U_np.T, np.matmul(W_np, np.matmul(U_np, S_np))).sum()

    # Print the result
    print(res)

6256639508741154.0


In [16]:
# Option 5 Eldo - FAILED, USING (U @ (S @ W @ U)).sum() 
from systemds.context import SystemDSContext    
with SystemDSContext(spark) as sds:
    U = sds.rand(rows=1000,cols=10000)
    S = sds.rand(rows=10000,cols=1000)
    W = sds.rand(rows=1000,cols=1000)
    res = (U @ (S @ W @ U)).sum() 

    print (res.compute())

6253453282248347.0


In [47]:
#Option 6 Eldo - FAILED, using  (U.t() @ (W @ (U @ S))).sum()
from systemds.context import SystemDSContext
with SystemDSContext(spark) as sds:

    # Now we create some large random matrices to have numpy and SystemDS crunch on it

    U = sds.rand(rows=1000, cols=10000)
    S = sds.rand(rows=10000, cols=1000)
    W = sds.rand(rows=1000, cols=1000)

    expr = (U.t() @ (W @ (U @ S))).sum()
    # Compute the result
    res = expr
    # Print the result
    print(res.compute()) 

6249622544674791.0


In [None]:
#Option 7 Eldo - FAILED OUT OF MEMORY
from systemds.context import SystemDSContext

with SystemDSContext(spark) as sds:
    # Create large random matrices
    U = sds.rand(rows=1000, cols=10000)
    S = sds.rand(rows=10000, cols=1000)
    W = sds.rand(rows=1000, cols=1000)
    
    # Define the expression
    res = sum(U.t().__matmul__(W * (U.__matmul__(S))))
    
    # Print the result
    print(res.compute())


In order to show you the advantage of SystemML over numpy we've blown up the sizes of the matrices. Unfortunately, on a 1-2 worker Spark cluster it takes quite some time to complete. Therefore we've stripped down the example to smaller matrices below, but we've kept the code, just in case you are curious to check it out. You might want to use some more workers, which you can easily configure in the environment settings of the project within Watson Studio. Just be aware that you're currently limited to 50 free capacity unit hours per month, which are consumed by the additional workers.

To get consistent results we switch from a random matrix initialization to something deterministic.
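
The notebook does not show how the deterministic initialization is done. One possible approach in numpy, given here only as an assumption, is a seeded random generator. The function name `make_matrices`, the seed value and the reduced matrix sizes are all made up for this sketch.

```python
import numpy as np


def make_matrices(seed=42):
    """Create U, S and W from a seeded generator so every call returns the same values."""
    rng = np.random.default_rng(seed)
    u = rng.random((100, 1000))
    s = rng.random((1000, 100))
    w = rng.random((100, 100))
    return u, s, w


u1, s1, w1 = make_matrices()
u2, s2, w2 = make_matrices()

res1 = np.sum(u1.T.dot(w1 * u1.dot(s1)))
res2 = np.sum(u2.T.dot(w2 * u2.dot(s2)))

print(np.array_equal(u1, u2))  # identical inputs on every call
print(np.isclose(res1, res2))  # and therefore matching results
```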

If everything runs fine you should get *6252492444241.075* as result (or something in that ballpark). Feel free to submit your solution to the grader now!
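
To judge whether a result is in that ballpark, a rough expected-value estimate can help. The sketch below assumes all entries are drawn uniformly from [0, 1) with mean 0.5 and treats them as independent, which ignores a small correlation between U' and U * S. The function name `expected_sum` is hypothetical. It also shows what happens when W is applied with matrix multiplication instead of element-wise.

```python
def expected_sum(n=1000, m=10000, elementwise=True):
    """Approximate the expected value of sum(U' * (W . (U * S))).

    U is n x m, S is m x n, W is n x n, all with uniform [0, 1) entries.
    With elementwise=False, W is combined by matrix multiplication instead.
    """
    mean = 0.5                        # mean of a uniform [0, 1) entry
    us_entry = m * mean * mean        # one entry of U * S (sum over m products)
    if elementwise:
        inner = mean * us_entry       # one entry of W . (U * S)
    else:
        inner = n * mean * us_entry   # one entry of W * (U * S) as a matrix product
    out_entry = n * mean * inner      # one entry of U' * inner (sum over n products)
    return m * n * out_entry          # the result has m x n entries


print(f"{expected_sum(elementwise=True):.3e}")
print(f"{expected_sum(elementwise=False):.3e}")
```

The element-wise variant lands near 6.25e12, in line with the expected result above, while the matrix-multiplication variant is about 1000 times larger, in line with the outputs of the attempts that use `@` or `__matmul__` for W.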

### Submission

In [10]:
!rm -f rklib.py
!wget https://raw.githubusercontent.com/romeokienzler/developerWorks/master/coursera/ai/rklib.py

--2023-11-08 00:50:35--  https://raw.githubusercontent.com/romeokienzler/developerWorks/master/coursera/ai/rklib.py
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.110.133, 185.199.111.133, 185.199.109.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.110.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2289 (2.2K) [text/plain]
Saving to: ‘rklib.py’


2023-11-08 00:50:35 (27.7 MB/s) - ‘rklib.py’ saved [2289/2289]



In [11]:
!pip install rklib

Collecting rklib
  Downloading rklib-0.1.0-py2.py3-none-any.whl (20 kB)
Collecting green
  Downloading green-3.4.3.tar.gz (73 kB)
  Preparing metadata (setup.py) ... done
Collecting colorama
  Downloading colorama-0.4.6-py2.py3-none-any.whl (25 kB)
Collecting coverage
  Downloading coverage-7.3.2-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl (227 kB)
Collecting unidecode
  Downloading Unidecode-1.3.7-py3-none-any.whl (235 kB)
Building wheels for collected packages: green
  Building wheel for green (setup.py) ... done
  Created wheel for green: filen

In [21]:
from rklib import submit
key = "esRk7vn-Eeej-BLTuYzd0g"
part = "fUxc8"

email = 'eldoma@gmail.com'

In [22]:
part = "fUxc8"
# You can obtain the token from the grader page on Coursera
# (see https://youtu.be/GcDo0Rwe06U?t=276 if you need more information on how to obtain it)
token = "ba7vmJwZpIwOKBU5"
submit(email, token, key, part, [part], str(res).replace('\n','x'))

Submission successful, please check on the coursera grader page for the status
-------------------------
{"elements":[{"itemId":"P1p3F","id":"tE4j0qhMEeecqgpT6QjMdA~P1p3F~1H1qNn3cEe66DRLtNxmX4Q","courseId":"tE4j0qhMEeecqgpT6QjMdA"}],"paging":{},"linked":{}}
-------------------------
