# STA 141B Lecture 18
March 02, 2022

### Announcement
* Final presentation
* Presentation grading link [here](https://docs.google.com/forms/d/e/1FAIpQLSfNJ5o9Ye26MUthZoGpVsxb6si1O1lruWh9CtNbZTHZFGd4vg/viewform)
* Please complete the course evaluation survey. 
  * For the question in the survey "provide additional comments below", I __highly encourage__ you to comment on 
      - which part of the course you __liked the most__ and would like me to keep for teaching this course in the future;
      - which part of the course you __did not like__ and think can be changed in the future.
  
### Topics

* Parallel computing in python
* Spark
  
  

### Parallel computing in python

#### Computer architecture

* Each computer can have multiple CPUs, and each CPU can have multiple cores (e.g., two quad-core CPUs)
* All the CPUs are connected to memory (e.g., 64 GB of memory)
* CPU cores can execute in parallel
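
As a quick check of the architecture described above, the following minimal sketch asks the operating system how many logical cores it reports. The name `n_cores` is only illustrative, and the count refers to logical cores, which may differ from the number of physical cores.

```python
import os

# os.cpu_count() returns the number of logical cores, or None if it cannot be determined
n_cores = os.cpu_count() or 1
print(f"logical CPU cores: {n_cores}")
```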



<div>
<img src="fig/fig1.png" alt="Drawing" style="width: 500px;"/>
</div>

#### Multicore programming

* Execute tasks simultaneously on many CPU cores

<div>
<img src="fig/fig2.png" alt="Drawing" style="width: 800px;"/>
</div>

A __thread__ is a path of execution within a process. A process can contain multiple threads.

* In single-threaded processes, the process contains one thread.

* In multithreaded processes, the process contains more than one thread, and the process is accomplishing a number of things at the same time. 

* Threads are sometimes called lightweight processes because they have their own stack but can access shared data. 

<div>
<img src="fig/fig3.png" alt="Drawing" style="width: 500px;"/>
</div>

__Processes__ are "share nothing": they execute independently, without sharing memory or state. This makes it easier to turn a multiprocess program into a distributed application.

<div>
<img src="fig/fig4.png" alt="Drawing" style="width: 500px;"/>
</div>
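
The difference between threads (shared data) and processes ("share nothing") can be seen in a small experiment. This is only a sketch: the names `shared_items` and `append_item` are made up for illustration, and it assumes the "fork" start method is available (Unix-like systems).

```python
import multiprocessing as mp
import threading

shared_items = []  # module-level list, visible to every thread in this process

def append_item(tag):
    shared_items.append(tag)

# a thread runs inside the current process, so it mutates the same list
t = threading.Thread(target=append_item, args=("thread",))
t.start()
t.join()
after_thread = list(shared_items)

# a child process works on its own copy of the list; the parent's list is unchanged
# assumption: the "fork" start method is available (Unix-like systems only)
ctx = mp.get_context("fork")
child = ctx.Process(target=append_item, args=("process",))
child.start()
child.join()
after_process = list(shared_items)

print(after_thread)   # ['thread']
print(after_process)  # ['thread']
```

The item appended by the child process never shows up in the parent, which is why processes need an explicit channel (such as a queue) to send results back.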

#### Python thread

* In Python, the package "threading" allows different parts of your program to run concurrently, which can simplify your design. [Reference](https://realpython.com/intro-to-python-threading/#:~:text=Python%20threading%20allows%20you%20to,this%20tutorial%20is%20for%20you!)
* Unfortunately, Python (CPython) allows only a single thread to execute Python code at a time, due to the GIL (global interpreter lock)
* Usually little or no speedup for CPU-bound code; threads are mainly useful when you want to interleave I/O and CPU execution
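
To illustrate the I/O case, here is a minimal sketch in which `time.sleep` stands in for waiting on a network or disk (an assumption made for the sake of a self-contained example; the function `fake_download` is hypothetical). While a thread sleeps it releases the GIL, so the four waits overlap.

```python
import threading
import time

def fake_download(i, results):
    # time.sleep stands in for waiting on I/O; the GIL is released while sleeping
    time.sleep(0.2)
    results[i] = i * i

# sequential version: the four waits add up
squares_sequential = [None] * 4
start = time.perf_counter()
for i in range(4):
    fake_download(i, squares_sequential)
t_sequential = time.perf_counter() - start

# threaded version: the four waits overlap
squares_threaded = [None] * 4
threads = [threading.Thread(target=fake_download, args=(i, squares_threaded))
           for i in range(4)]
start = time.perf_counter()
for t in threads:
    t.start()
for t in threads:
    t.join()
t_threaded = time.perf_counter() - start

print(f"sequential: {t_sequential:.2f}s, threaded: {t_threaded:.2f}s")
```

If `fake_download` did pure Python computation instead of waiting, the threaded version would typically not be faster, because of the GIL.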

#### Python process

* Package [multiprocessing](https://docs.python.org/3/library/multiprocessing.html)
* You can create multiple processes
    - Automatically run on multiple CPU cores
    - By default there is no shared memory: each process has its own memory space (larger memory overhead)
    - Can declare some part of memory to be shared (but often harder to use)
* You can also check some tutorials: [T1](http://sebastianraschka.com/Articles/2014_multiprocessing.html) [T2](https://pymotw.com/2/multiprocessing/basics.html)    
    

In [1]:
import multiprocessing as mp

mp.set_start_method("fork")  # "fork" is only available on Unix-like systems

In [2]:
def helloworld(x):
    print('Hello World %d\n'%x)

# set up a list of processes
plist = []

for x in range(4):
    plist.append(mp.Process(target=helloworld, args = (x, )))

# Run processes
for p in plist:
    p.start()

# wait for the processes to finish
for p in plist:
    p.join()

Hello World 0

Hello World 1
Hello World 2


Hello World 3



Two tools from multiprocessing are covered below:
* Queue: a communication channel for passing data between processes
* Pool: a pool of worker processes that runs a function over many inputs

#### Queue

* The Queue class in the multiprocessing module of the Python Standard Library provides a mechanism to pass data between a parent process and its descendant processes.
* mp.Queue is a concurrent, "first in, first out" data structure

<div>
<img src="fig/fig5.png" alt="Drawing" style="width: 500px;"/>
</div>

* Can be used to communicate, or gather the results from the processes
* Queue.put(): insert an object at the end of the queue
* Queue.get(): remove and return the first element in the queue
* Queue.empty(): check whether the queue is empty

In [3]:
def f(q):
    q.put([42, None, 'hello'])
    
if __name__ == '__main__' :
    q = mp.Queue()
    p = mp.Process(target = f, args = (q, ))
    p.start()
    print(q.get()) 
    p.join()

[42, None, 'hello']


"if \_\_name\_\_ == '\_\_main\_\_'" is used to control which parts of the code run when a module is imported. When the Python interpreter reads a file, the \_\_name\_\_ variable is set to '\_\_main\_\_' if the file is being run as the main program, or to the module's name if it is imported. 

See more details [here](https://stackoverflow.com/questions/419163/what-does-if-name-main-do).
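
The Queue section mentions gathering results from several processes. A minimal sketch of that pattern follows; the function `square_to_queue` and the variable names are hypothetical, and the code assumes the "fork" start method is available (Unix-like systems).

```python
import multiprocessing as mp

def square_to_queue(x, queue):
    # each child process puts one (input, result) pair on the shared queue
    queue.put((x, x * x))

# assumption: the "fork" start method is available (Unix-like systems only)
ctx = mp.get_context("fork")
result_queue = ctx.Queue()
workers = [ctx.Process(target=square_to_queue, args=(x, result_queue))
           for x in range(4)]

for w in workers:
    w.start()

# read exactly one result per process before joining
gathered = [result_queue.get() for _ in workers]

for w in workers:
    w.join()

# arrival order depends on scheduling, so sort before printing
gathered.sort()
print(gathered)  # [(0, 0), (1, 1), (2, 4), (3, 9)]
```

Putting the input next to the result makes it possible to tell which process produced which value, since the queue only preserves arrival order.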

#### Pool

* mp.Pool is another, more convenient approach to parallel processing in Python.
* Use mp.Pool(processes=4) to create 4 processes
* Use $[r_1, r_2, \dots, r_k]$ = pool.map(f, $[x_1, x_2, \dots, x_k]$) to run multiple processes and get the results
     - $f$ is the function that each process runs
     - $[x_1, x_2, \dots, x_k]$ are the input arguments we want to pass to the function (this is a size $k$ list)
     - $[r_1, r_2, \dots, r_k]$ are the return values we get after running the function on each input (this is a size $k$ list, in the same order as the inputs)
     - $k$ may be larger than the number of processes

In [5]:
def f(x):
    return x*x

if __name__ == '__main__':
    with mp.Pool(4) as p:
        print(p.map(f, [1, 2, 3, 4, 5]))

[1, 4, 9, 16, 25]
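
As a further sketch, the case where $k$ is larger than the number of processes looks like this: eight inputs are spread over two worker processes, and the results still come back in input order. The standard-library function `math.factorial` is used only to keep the example short, and the "fork" start method is assumed to be available (Unix-like systems).

```python
import math
import multiprocessing as mp

# assumption: the "fork" start method is available (Unix-like systems only)
ctx = mp.get_context("fork")

inputs = list(range(8))                 # k = 8 inputs
with ctx.Pool(processes=2) as workers:  # only 2 worker processes
    factorials = workers.map(math.factorial, inputs)

print(factorials)  # [1, 1, 2, 6, 24, 120, 720, 5040]
```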


### Spark

Spark is great for scaling up data science tasks and workloads. 

When you use Spark data frames and the libraries that operate on these data structures, you can scale to massive data sets that are distributed across a cluster. There are several ways to achieve parallelism:

* __Native Spark__: if you’re using Spark data frames and libraries (e.g. MLlib), then your code will be parallelized and distributed natively by Spark
* __Thread Pools__: The multiprocessing library can be used to run concurrent Python threads, and even perform operations with Spark data frames.
* __Pandas UDFs__: A new feature in Spark that enables parallelized processing on Pandas data frames within a Spark environment.



#### Single thread example 

Reference [here](https://towardsdatascience.com/3-methods-for-parallelization-in-spark-6a1a4333b473)

Goal: use the Boston housing data set to build a regression model that predicts house prices from 13 different features. 

In [8]:
import numpy as np
import pandas as pd

# load the boston data set
# (load_boston was removed in scikit-learn 1.2, so this cell needs an older version)
from sklearn.datasets import load_boston
boston = load_boston()

# convert to a Pandas Data Frame
boston_pd = pd.DataFrame(data= np.c_[boston['data'],boston['target']], 
              columns= np.append(boston['feature_names'], 'target')).sample(frac=1)
print(boston_pd.shape)
boston_pd.head(5)

(506, 14)


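| | CRIM | ZN | INDUS | CHAS | NOX | RM | AGE | DIS | RAD | TAX | PTRATIO | B | LSTAT | target |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 228 | 0.29819 | 0.0 | 6.2 | 0.0 | 0.504 | 7.686 | 17.0 | 3.3751 | 8.0 | 307.0 | 17.4 | 377.51 | 3.92 | 46.7 |
| 7 | 0.14455 | 12.5 | 7.87 | 0.0 | 0.524 | 6.172 | 96.1 | 5.9505 | 5.0 | 311.0 | 15.2 | 396.9 | 19.15 | 27.1 |
| 314 | 0.3692 | 0.0 | 9.9 | 0.0 | 0.544 | 6.567 | 87.3 | 3.6023 | 4.0 | 304.0 | 18.4 | 395.69 | 9.28 | 23.8 |
| 99 | 0.0686 | 0.0 | 2.89 | 0.0 | 0.445 | 7.416 | 62.5 | 3.4952 | 2.0 | 276.0 | 18.0 | 396.9 | 6.19 | 33.2 |
| 88 | 0.0566 | 0.0 | 3.41 | 0.0 | 0.489 | 7.007 | 86.3 | 3.4217 | 2.0 | 270.0 | 17.8 | 396.9 | 5.5 | 23.6 |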


* Split the data set into _training_ and _testing_ groups and separate the features from the labels for each group. 
* Use the LinearRegression class to fit the training data set and create predictions for the test data set. 
* Calculate the correlation coefficient between the actual and predicted house prices.

In [9]:
from sklearn.linear_model import LinearRegression
from scipy.stats import pearsonr

np.random.seed(2022) # set seed

# split into data and label arrays 
y = boston_pd['target']
X = boston_pd.drop(['target'], axis=1)

# create training (~80%) and test data sets
X_train = X[:400]
X_test = X[400:]
y_train = y[:400]
y_test = y[400:]

# train a regression model
lr = LinearRegression()
model = lr.fit(X_train, y_train)

# make predictions
y_pred = model.predict(X_test)

# error metrics
r = pearsonr(y_pred, y_test)
mae = sum(abs(y_pred - y_test))/len(y_test) 
print('R-squared: ' + str(r[0]**2))
print("MAE: " + str(mae))

R-squared: 0.7714906558491852
MAE: 3.3652865859490313
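
For reference, the two metrics printed above can be written out as follows, where $y_i$ are the actual prices, $\hat{y}_i$ the predicted prices, and $n$ the number of test observations (this notation is introduced here only for illustration). The value reported by the code is the squared Pearson correlation between predictions and actual values, which in general need not equal the coefficient of determination on a test set.

$$
\mathrm{MAE} = \frac{1}{n}\sum_{i=1}^{n} \left|\hat{y}_i - y_i\right|,
\qquad
r = \frac{\sum_{i=1}^{n} (\hat{y}_i - \bar{\hat{y}})(y_i - \bar{y})}{\sqrt{\sum_{i=1}^{n} (\hat{y}_i - \bar{\hat{y}})^2}\,\sqrt{\sum_{i=1}^{n} (y_i - \bar{y})^2}},
\qquad
\text{reported value} = r^2
$$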


#### Native Spark

If you use Spark data frames and libraries, then Spark will natively parallelize and distribute your task. First, we’ll need to convert the Pandas data frame to a Spark data frame, and then transform the features into the spark vector representation required for MLlib. The snippet below shows how to perform this task for the housing data set.


In [6]:
!pip install pyspark



In [13]:
from pyspark.ml.feature import VectorAssembler
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
# reuse the active Spark session, or start a local one if none exists
spark = SparkSession.builder.getOrCreate()

# convert to a Spark data frame
boston_sp = spark.createDataFrame(boston_pd)
display(boston_sp.take(5))

[Row(CRIM=0.29819, ZN=0.0, INDUS=6.2, CHAS=0.0, NOX=0.504, RM=7.686, AGE=17.0, DIS=3.3751, RAD=8.0, TAX=307.0, PTRATIO=17.4, B=377.51, LSTAT=3.92, target=46.7),
 Row(CRIM=0.14455, ZN=12.5, INDUS=7.87, CHAS=0.0, NOX=0.524, RM=6.172, AGE=96.1, DIS=5.9505, RAD=5.0, TAX=311.0, PTRATIO=15.2, B=396.9, LSTAT=19.15, target=27.1),
 Row(CRIM=0.3692, ZN=0.0, INDUS=9.9, CHAS=0.0, NOX=0.544, RM=6.567, AGE=87.3, DIS=3.6023, RAD=4.0, TAX=304.0, PTRATIO=18.4, B=395.69, LSTAT=9.28, target=23.8),
 Row(CRIM=0.0686, ZN=0.0, INDUS=2.89, CHAS=0.0, NOX=0.445, RM=7.416, AGE=62.5, DIS=3.4952, RAD=2.0, TAX=276.0, PTRATIO=18.0, B=396.9, LSTAT=6.19, target=33.2),
 Row(CRIM=0.0566, ZN=0.0, INDUS=3.41, CHAS=0.0, NOX=0.489, RM=7.007, AGE=86.3, DIS=3.4217, RAD=2.0, TAX=270.0, PTRATIO=17.8, B=396.9, LSTAT=5.5, target=23.6)]

In [15]:
np.random.seed(2022)

# split into training and test spark data frames
boston_train = spark.createDataFrame(boston_pd[:400])
boston_test = spark.createDataFrame(boston_pd[400:])

# convert to vector representation for MLlib
assembler = VectorAssembler(inputCols = boston_train.schema.names[:(boston_pd.shape[1] - 1)], outputCol="features" )
boston_train = assembler.transform(boston_train).select('features', 'target') 
boston_test = assembler.transform(boston_test).select('features', 'target') 

In [16]:
assembler

VectorAssembler_e822f6418b09

Now that we have the data prepared in the Spark format, we can use MLlib to perform parallelized fitting and model prediction. The snippet below shows how to instantiate and train a linear regression model and calculate the correlation coefficient for the estimated house prices.

In [17]:
# linear regression with Spark
from pyspark.ml.regression import LinearRegression

# linear regression 
lr = LinearRegression(maxIter=10, regParam=0.1, 
                      elasticNetParam=0.5, labelCol="target")

# Fit the model
model = lr.fit(boston_train)
boston_pred = model.transform(boston_test)

# calculate results 
r = boston_pred.stat.corr("prediction", "target")
print("R-squared: " + str(r**2))

R-squared: 0.7680557734660175


We now have a model fitting and prediction task that is parallelized. However, what if we also want to concurrently try out different hyperparameter configurations?

* Use the CrossValidator class, which performs this operation natively in Spark

In [18]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

crossval = CrossValidator(estimator=LinearRegression(labelCol = "target"),  
                         estimatorParamMaps=ParamGridBuilder().addGrid(
                           LinearRegression.elasticNetParam, [0, 0.5, 1.0]).build(),
                         evaluator=RegressionEvaluator(
                           labelCol = "target", metricName = "r2"),
                         numFolds=10)

# cross validate the model and select the best fit
cvModel = crossval.fit(boston_train) 
model = cvModel.bestModel

# calculate results 
boston_pred = model.transform(boston_test)
r = boston_pred.stat.corr("prediction", "target")
print("R-squared: " + str(r**2))

R-squared: 0.7714906558491647


#### Thread Pools

One of the ways that you can achieve parallelism in Spark without using Spark data frames is by using the multiprocessing library.

The library provides a thread abstraction that you can use to create concurrent threads of execution. However, by default all of your code will run on the driver node. 

The snippet below shows how to create a set of threads that run in parallel and return results for different hyperparameters of a random forest.

In [19]:
# sklearn version 
from sklearn.ensemble import RandomForestRegressor as RFR
from multiprocessing.pool import ThreadPool

# allow up to 5 concurrent threads
pool = ThreadPool(5)

# hyperparameters to test out (n_trees)
parameters = [ 10, 20, 50]

# define a function to train a RF model and return metrics 
def sklearn_random_forest(trees, X_train, X_test, y_train, y_test):

    # train a random forest regressor with the specified number of trees
    rf= RFR(n_estimators = trees)
    model = rf.fit(X_train, y_train)

    # make predictions
    y_pred = model.predict(X_test)
    r = pearsonr(y_pred, y_test)

    # return the number of trees and the R-squared value
    return [trees, r[0]**2]  

# run the tasks 
pool.map(lambda trees: sklearn_random_forest(trees, X_train,
                                           X_test, y_train, y_test), parameters)

[[10, 0.8468220090058051], [20, 0.8505399421144217], [50, 0.8480309239742657]]

This approach works by using the map function on a pool of threads. The map function takes a lambda expression and array of values as input, and invokes the lambda expression for each of the values in the array. Once all of the threads complete, the output displays the hyperparameter value (n_estimators) and the R-squared result for each thread.
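
The mechanics of `map` with a lambda can also be seen without any model fitting. In the sketch below, `evaluate` is a hypothetical stand-in for a training function, and `demo_pool` and `demo_parameters` are names chosen only for this illustration.

```python
from multiprocessing.pool import ThreadPool

def evaluate(trees, scale):
    # hypothetical stand-in for training a model and returning a metric
    return [trees, trees * scale]

demo_parameters = [10, 20, 50]

# the lambda fixes the extra argument; map calls it once per parameter value
with ThreadPool(3) as demo_pool:
    demo_results = demo_pool.map(lambda trees: evaluate(trees, 0.5), demo_parameters)

print(demo_results)  # [[10, 5.0], [20, 10.0], [50, 25.0]]
```

A lambda works here because threads live in the same process and nothing needs to be pickled; with a process pool, the function generally has to be defined at module level.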

Using thread pools this way is dangerous, because all of the threads will execute on the driver node. If possible, it’s best to use Spark data frames when working with thread pools, because then the operations will be distributed across the worker nodes in the cluster. The MLlib version of using thread pools is shown in the example below, which distributes the tasks to worker nodes.

In [12]:
# spark version
from pyspark.ml.regression import RandomForestRegressor

# define a function to train a RF model and return metrics 
def mllib_random_forest(trees, boston_train, boston_test):

    # train a random forest regressor with the specified number of trees
    rf = RandomForestRegressor(numTrees = trees, labelCol="target")
    model = rf.fit(boston_train)

    # make predictions
    boston_pred = model.transform(boston_test)
    r = boston_pred.stat.corr("prediction", "target")

    # return the number of trees and the R-squared value
    return [trees, r**2]
  
# run the tasks 
pool.map(lambda trees: mllib_random_forest(trees, boston_train, boston_test), parameters)
  

[[10, 0.8137058470592257], [20, 0.8110705072802111], [50, 0.8333640368138624]]

#### Pandas UDFs

One of the newer features in Spark that enables parallel processing is Pandas UDFs. With this feature, you can partition a Spark data frame into smaller data sets that are distributed and converted to Pandas objects, where your function is applied, and then the results are combined back into one large Spark data frame. Essentially, Pandas UDFs enable data scientists to work with base Python libraries while getting the benefits of parallelization and distribution.
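
The split, apply, and combine pattern described above can be previewed locally with plain pandas. This is only an analogy and not Spark code: nothing is distributed here, the toy data is made up, and `summarize` is a hypothetical stand-in for a function that would train a model on each group.

```python
import pandas as pd

# toy data: each row carries the hyperparameter value ("trees") of its group
toy_df = pd.DataFrame({
    "trees": [10, 10, 20, 20, 50, 50],
    "target": [1.0, 3.0, 2.0, 6.0, 5.0, 7.0],
})

def summarize(trees, group):
    # stand-in for "train a model on this group"; returns a one-row data frame
    return pd.DataFrame({"trees": [trees],
                         "mean_target": [group["target"].mean()]})

# split by key, apply the function to each group, combine the pieces
combined = pd.concat(
    [summarize(trees, group) for trees, group in toy_df.groupby("trees")],
    ignore_index=True,
)
print(combined)
```

In Spark, the per-group function runs on the worker nodes and receives each group as a pandas data frame, which is what allows ordinary Python libraries to be used inside it.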

If your PyArrow version is older than 1.0.0, install the most recent PyArrow.

In [14]:
!pip install pyarrow

Collecting pyarrow
  Downloading pyarrow-7.0.0-cp38-cp38-macosx_10_13_x86_64.whl (20.2 MB)
Installing collected packages: pyarrow
Successfully installed pyarrow-7.0.0


In [20]:
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *

# setup the spark data frame as a table
boston_sp.createOrReplaceTempView("boston")

# add train/test label and expand the data set by 3x (each num trees parameter)
full_df = spark.sql("""
  select *
  from (
    select *, case when rand() < 0.8 then 1 else 0 end as training 
    from boston
  ) b
  cross join (
      select 11 as trees union all select 20 as trees union all select 50 as trees)
""")

schema = StructType([StructField('trees', LongType(), True),
                     StructField('r_squared', DoubleType(), True)])  

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def train_RF(boston_pd):
    trees = boston_pd['trees'].unique()[0]

    # get the train and test groups 
    boston_train = boston_pd[boston_pd['training'] == 1]
    boston_test = boston_pd[boston_pd['training'] == 0] 
        
    # create data and label groups 
    y_train = boston_train['target']
    X_train = boston_train.drop(['target'], axis=1)
    y_test = boston_test['target']
    X_test = boston_test.drop(['target'], axis=1)
   
    # train a random forest regressor
    rf= RFR(n_estimators = trees)
    model = rf.fit(X_train, y_train)

    # make predictions
    y_pred = model.predict(X_test)
    r = pearsonr(y_pred, y_test)
    
    # return the number of trees and the R-squared value
    return pd.DataFrame({'trees': trees, 'r_squared': (r[0]**2)}, index=[0])
  
# use the Pandas UDF
results = full_df.groupby('trees').apply(train_RF)

# print the results 
print(results.take(3))



[Row(trees=11, r_squared=0.7319703185707604), Row(trees=20, r_squared=0.7647827821222429), Row(trees=50, r_squared=0.7318645764148629)]


With this approach, the result is similar to the method with thread pools, but the main difference is that the task is distributed across worker nodes rather than performed only on the driver.


#### Conclusion

There are multiple ways of achieving parallelism when using PySpark for data science. It’s best to use native libraries if possible, but depending on your use case there may not be Spark libraries available. In this situation, it’s possible to use thread pools or Pandas UDFs to parallelize your Python code in a Spark environment. Just be careful about how you parallelize your tasks, and try to also distribute workloads if possible.
