##### STA 220 Data & Web Technologies for Data Analysis

### Lecture 4, 1/15/26, Concurrency


### Today's Topics

 - Concurrency
     - Threads and Processes
     - I/O-Concurrency
         - `threading`
     - CPU-Concurrency (Parallelization)
         - `multiprocessing` 
         - `Spark`
 
### References
- [SuperFastPython](https://superfastpython.com/thread-vs-process/) by Jason Brownlee
- [An introduction to parallel programming](https://sebastianraschka.com/Articles/2014_multiprocessing.html) by Sebastian Raschka
- [Speed Up Your Python Program With Concurrency](https://realpython.com/python-concurrency/) by Jim Anderson
- [An Intro to Threading in Python](https://realpython.com/intro-to-python-threading/) by Jim Anderson
- [Parallelization in Python](https://towardsdatascience.com/3-methods-for-parallelization-in-spark-6a1a4333b473) by Ben Weber

### Announcements

- First homework will be assigned soon

### Discussion

The discussion sections take place after Thursday's lecture, 3:20-4:00 PM, in Olson Hall 251.

This week's topics are:
- Regular expressions



### Threads and Processes

A computer can have multiple CPUs, and each CPU can have multiple cores (e.g., two quad-core CPUs).
All CPUs are connected to the same memory (e.g., 64 GB).
CPU cores can execute instructions in parallel.

<div>
<img src="../images/fig1.png" alt="Drawing" style="width: 1000px;"/>
</div>

A __process__ is the *operating system’s spawned and controlled entity that encapsulates an executing application* ([Breshears: The Art of Concurrency](https://amzn.to/3J74TRr)). 

A __thread__ is a path of execution which belongs to a process. 

Each thread belongs to a process. In single-threaded processes, the process contains one thread. In multithreaded processes, the process contains more than one thread, and the process is accomplishing a number of things at the same time. 

Threads can share memory within a process. This means that functions executed in new threads can access the same data and state. These might be global variables or data shared via function arguments. As such, sharing state between threads is straightforward.

Threads are sometimes called lightweight processes because they have their own stack but can access shared data. 

<div>
<img src="../images/fig3.png" alt="Drawing" style="width: 800px;"/>
</div>

On the other hand, processes are 'share nothing', i.e., they execute independently without sharing memory or state. This makes it easier to scale them out into a distributed application, but sharing data between processes requires explicit mechanisms (e.g., queues or pipes).

<div>
<img src="../images/fig4.png" alt="Drawing" style="width: 800px;"/>
</div>
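The difference between shared and separate memory can be seen with a module-level list. A minimal sketch, assuming a Unix-like OS (it uses the `fork` start method, as later in this lecture); the helper names are hypothetical. Threads append to the same list object, while each forked process appends to its own copy, so the parent's list stays empty.

```python
import multiprocessing
import threading

shared = []  # module-level state, visible to every thread of this process

def record(tag):
    shared.append(tag)

def append_in_threads(n=4):
    shared.clear()
    threads = [threading.Thread(target=record, args=(i,)) for i in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return len(shared)  # all threads appended to the same list

def append_in_processes(n=4):
    shared.clear()
    ctx = multiprocessing.get_context("fork")  # assumption: Unix-like OS
    procs = [ctx.Process(target=record, args=(i,)) for i in range(n)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return len(shared)  # each child appended to its own copy of the list

print(append_in_threads(), append_in_processes())  # 4 0
```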

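CPython, the standard Python interpreter, uses a global interpreter lock (GIL): within one process, only one thread can execute Python bytecode at a time. This simplifies the interpreter's implementation, but it prevents threads from running Python code in parallel on several cores. A thread that waits for I/O releases the GIL, so other threads can run in the meantime.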

Today, we will explore the advantages of executing multiple processes and threads and discuss under what circumstances which approach is most adequate. 

There are two major kinds of tasks that slow down a program: CPU-bound and I/O-bound.

I/O-bound tasks slow your program down because it frequently has to wait for input/output (I/O) from some external resource. They arise when your program interacts with other sources, e.g., when you are *requesting* data from another source.

<div>
<img src=https://files.realpython.com/media/IOBound.4810a888b457.png style="width: 1500px;"/>
</div>

CPU-bound tasks are those that require a lot of *computational* effort to complete. 

<div>
<img src=https://files.realpython.com/media/CPUBound.d2d32cb2626c.png style="width: 1700px;"/>
</div>

We will use threads and the module `threading` for I/O-bound tasks, and processes and the module `multiprocessing` for CPU-bound tasks.

Some rough orders of magnitude:

- CPU: 100_000_000_000 instructions per second (Intel i7)
- 10_000_000 main-memory references per second
- 500 disk seeks per second
- 7 pings from the US to Europe (and back) per second

Thus, if you are waiting for a disk seek to complete or for data from the web, you could do a LOT of calculations in between.
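To make the waiting visible without touching the network, we can simulate I/O with `time.sleep`. A minimal sketch; the helpers `fake_io`, `sequential`, and `threaded` are hypothetical. A sleeping thread, like a thread blocked on I/O, does not hold the GIL, so the waits of several threads overlap.

```python
import threading
import time

def fake_io(results, i, delay=0.2):
    """Hypothetical stand-in for a network request: the thread just waits."""
    time.sleep(delay)  # sleeping (like blocking I/O) lets other threads run
    results[i] = f"response {i}"

def sequential(n=5):
    results = [None] * n
    start = time.perf_counter()
    for i in range(n):
        fake_io(results, i)
    return results, time.perf_counter() - start

def threaded(n=5):
    results = [None] * n
    start = time.perf_counter()
    threads = [threading.Thread(target=fake_io, args=(results, i)) for i in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # wait until every simulated request has finished
    return results, time.perf_counter() - start

_, t_seq = sequential()
_, t_thr = threaded()
print(f"sequential: {t_seq:.2f} s, threaded: {t_thr:.2f} s")
```

The sequential version needs roughly 5 × 0.2 s, whereas the threaded version needs roughly the duration of a single wait.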

### I/O-Concurrency

We’ll start with a non-concurrent version of an I/O-bound task. Namely, we will use `requests` to request data from a [website](https://homepage.univie.ac.at/nicolai.amann/) repeatedly. For `requests.Session`, see [here](https://requests.readthedocs.io/en/latest/user/advanced/) and the [docs](https://requests.readthedocs.io/en/latest/api/?#requests.Session).

In [3]:
import requests, time

def download_site(url, session):
    session.get(url) # fetch information from url 
    # ... do something ... 
    
def download_all_sites(sites):
    with requests.Session() as session: # reuse one Session (and its connections) for all requests
        for url in sites:
            download_site(url, session)
        
def task(): 
    sites = ["https://homepage.univie.ac.at/nicolai.amann/"] * 20
    start_time = time.time()
    download_all_sites(sites)
    print(time.time() - start_time)

In [4]:
task()

6.990546941757202


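We will now use concurrent threads that accomplish the same task, retrieving information with `session.get`, more efficiently. Note that the sequential version above requests the page 20 times, whereas the concurrent versions below request it 10 times; for a fair comparison of the timings, use the same number of requests.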

In [5]:
import concurrent.futures, threading

def download_site(url, session):
    session.get(url) # fetch information from url 
    # ... do something ... 
    
def download_all_sites(sites):
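    session = requests.Session() # one Session shared by all threads; requests does not guarantee that this is thread-safe (see below)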
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
        executor.map(lambda u: download_site(u, session), sites)
        
def task(): 
    sites = ["https://homepage.univie.ac.at/nicolai.amann/"] * 10
    start_time = time.time()
    download_all_sites(sites)
    print(time.time() - start_time)

In [6]:
task()

0.7973179817199707
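In the cell above, the iterator returned by `executor.map` is never consumed, so return values, and any exception raised inside `download_site`, are silently discarded. A minimal sketch of collecting the results, with a hypothetical `fake_download` function that sleeps instead of contacting a server:

```python
import concurrent.futures
import time

def fake_download(url):
    """Hypothetical stand-in for session.get(url): waits instead of doing network I/O."""
    time.sleep(0.1)
    return len(url)  # pretend this is the size of the response

def download_all(urls, max_workers=10):
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # list(...) consumes the lazy iterator returned by map: results arrive in
        # input order, and an exception raised in any worker is re-raised here
        return list(executor.map(fake_download, urls))

urls = [f"https://example.com/page/{i}" for i in range(10)]
start = time.perf_counter()
sizes = download_all(urls)
print(sizes, round(time.perf_counter() - start, 2))
```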


In [7]:
import concurrent.futures, threading

thread_local = threading.local() # attributes set on this object are specific to each thread (here: the session attribute)

def download_site(url):
    session = get_session()
    session.get(url)
    # ... do something ... 

def download_all_sites(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        executor.map(download_site, sites)

def get_session():
    '''Create a new requests.Session if there is none in thread_local'''
    if not hasattr(thread_local, "session"): 
        thread_local.session = requests.Session()
    return thread_local.session

We have created a `concurrent.futures.ThreadPoolExecutor`. It creates (up to) four worker threads that run concurrently.

Also, each thread gets its own separate `requests.Session`. This touches on one of the interesting and difficult issues with threading. Because the operating system controls when it switches between threads, any data that is shared between threads needs to be protected, i.e., made thread-safe. Unfortunately, `requests.Session` is not guaranteed to be thread-safe. If left unprotected, shared data can lead to *race conditions*, which produce hard-to-detect bugs.
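A common way to protect shared state is a lock, which lets only one thread at a time execute a critical section. A minimal sketch with a hypothetical `Counter` class (unrelated to `requests`):

```python
import threading

class Counter:
    """A counter that several threads may update concurrently."""

    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()

    def increment(self):
        # "self.value += 1" is a read-modify-write sequence; without the lock, two
        # threads can read the same old value, and one of the updates gets lost
        with self._lock:
            self.value += 1

def count_with_threads(n_threads=8, n_increments=10_000):
    counter = Counter()

    def work():
        for _ in range(n_increments):
            counter.increment()

    threads = [threading.Thread(target=work) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter.value

print(count_with_threads())  # 80000
```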

In the download example, we avoid sharing the session altogether: `threading.local()` creates an object that looks like a global variable, but whose attributes are specific to each individual thread.
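A network-free sketch of this `threading.local()` pattern, with a hypothetical `FakeSession` class in place of `requests.Session`: although 20 tasks are submitted, at most four sessions are created, one per worker thread of the pool.

```python
import concurrent.futures
import threading

thread_local = threading.local()

class FakeSession:
    """Hypothetical stand-in for requests.Session (no network access)."""

def get_session():
    # each thread sees only its own attributes on thread_local
    if not hasattr(thread_local, "session"):
        thread_local.session = FakeSession()
    return thread_local.session

def task(_):
    return get_session()

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    sessions = list(executor.map(task, range(20)))

# the list keeps every session alive, so distinct objects have distinct ids
print(len({id(s) for s in sessions}))  # at most 4: one session per worker thread
```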

In [8]:
task()

1.1452770233154297


The code above is faster than the non-concurrent version because the threads overlap their waiting times: while one thread waits for a response from the server, the others can send their requests.

<div>
<img src=https://files.realpython.com/media/Threading.3eef48da829e.png style="width: 900px;"/>
</div>

### CPU-Concurrency (Parallelization)

The threading examples above execute Python code on only one CPU core at a time, due to the GIL. The `multiprocessing` module breaks down that barrier and runs code on multiple CPU cores.

It does this by creating a new instance of the Python interpreter (a new process) for each worker and then farming out parts of your program to run on them. Bringing up a separate Python interpreter is slower than starting a new thread in the current interpreter. Regarding `.set_start_method`, see [here](https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods).
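Because a second call to `set_start_method` raises `RuntimeError` unless `force=True` is passed, a context object can be handier in a notebook. A minimal sketch, assuming a Unix-like OS where the `fork` start method exists; `square` and `parallel_squares` are hypothetical helper names:

```python
import multiprocessing

def square(x):
    return x * x

def parallel_squares(values, processes=4):
    # get_context returns an object with the same API as the multiprocessing
    # module, but with a fixed start method; the global default is not touched
    ctx = multiprocessing.get_context("fork")  # assumption: Unix-like OS ("fork" is unavailable on Windows)
    with ctx.Pool(processes=processes) as pool:
        return pool.map(square, values)

squares = parallel_squares(range(10))
print(squares)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

`ctx.Pool` behaves like `multiprocessing.Pool`, but its workers are always started with `fork`, independent of the global default.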

In [1]:
import multiprocessing

multiprocessing.cpu_count() # number of cores on laptop/computer

8

In [None]:
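session = None
# "fork": each new process starts as a copy of the current one (Unix only; not available on Windows).
# set_start_method may be called only once; a second call raises RuntimeError unless force=True is passed.
multiprocessing.set_start_method("fork")

def set_global_session():
    global session
    if session is None: # create the Session only once per process
        session = requests.Session()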

def download_site(url):
    session.get(url)
    # ... do something ... 

def download_all_sites(sites):
    with multiprocessing.Pool(initializer=set_global_session, processes = 8) as pool: # change processes!
        pool.map(download_site, sites)

def task(): 
    sites = ["https://homepage.univie.ac.at/nicolai.amann/"] * 10
    start_time = time.time()
    download_all_sites(sites)
    print(time.time() - start_time)

task()

This may even be faster than the `threading` version, since up to 8 processes run in parallel; however, the number of processes that actually run simultaneously is limited by the number of cores of your local machine.
The default value of the argument `processes` of `multiprocessing.pool.Pool` ([docs](https://docs.python.org/3/library/multiprocessing.html?#module-multiprocessing.pool)) is the number of CPU cores available on the machine.

Remember that each process has its own memory space (share-nothing). That means that processes cannot share objects like a `requests.Session`. We don’t want to create a new Session each time the function is called; instead, we want to create one for each process and reuse it for all the `session.get` calls made by that process.

The `initializer` function parameter is built for just this case. We initialize a global `session` variable to hold the single `requests.Session` *for each process*. Because each process has its own memory space, the global for each one will be different.
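The initializer pattern can be tried without network access. A minimal sketch, assuming a Unix-like OS (`fork` start method); `init_worker`, `work`, `run`, and the dictionary standing in for a `requests.Session` are hypothetical:

```python
import multiprocessing
import os

resource = None  # one per worker process, created by the initializer

def init_worker():
    global resource
    # hypothetical stand-in for requests.Session(): record which process created it
    resource = {"created_by": os.getpid()}

def work(x):
    # every call in this worker reuses the resource created once by init_worker
    return resource["created_by"], x * 2

def run(values, processes=2):
    ctx = multiprocessing.get_context("fork")  # assumption: Unix-like OS
    with ctx.Pool(processes=processes, initializer=init_worker) as pool:
        return pool.map(work, values)

results = run(range(8))
print(results)
```

Each worker creates its resource exactly once, so at most two distinct process IDs appear in the results (one per worker), and the parent's global `resource` stays `None`.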

<div>
<img src=https://files.realpython.com/media/MProc.7cf3be371bbc.png style="height: 500px;"/>
</div>

This course deals with fetching information from the web, usually via `requests`. Although the speed of these requests also depends on the third parties who maintain the servers we request from, they are in essence I/O-bound tasks. `multiprocessing`, however, is most useful for CPU-bound tasks. Consequently, let's consider a computational problem.

For the purposes of our example, we’ll use a somewhat silly function to create something that takes a long time to run on the CPU. This function computes the sum of the squares of each number from 0 to the passed-in value:

In [120]:
def problem(number):
    return sum(i * i for i in range(number))

def find_sums(numbers):
    for number in numbers:
        problem(number)

def task():
    numbers = [5_000_000 + x for x in range(20)]
    start_time = time.time()
    find_sums(numbers)
    print(time.time() - start_time)

In [121]:
task()

3.062579870223999


This code calls `problem` 20 times with a different large number each time. It does all of this on a single thread in a single process on a single CPU. The execution timing diagram looks like this:

<div>
<img src=https://files.realpython.com/media/CPUBound.d2d32cb2626c.png style="width: 1600px;"/>
</div>

Since there is no I/O waiting time, `threading` will not speed up this problem (because of the GIL, only one thread computes at a time). We can, however, speed up the computation by using multiple cores.

In [122]:
# multiprocessing.set_start_method("fork", force=True) # has already been set
def find_sums(numbers):
    with multiprocessing.Pool(processes=8) as pool: # the with-block shuts the worker processes down afterwards
        return pool.map(problem, numbers)

In [123]:
task()

0.7028112411499023


In [124]:
def problem(number):
    return sum(i * i for i in range(number))

def find_sums(numbers):
    with multiprocessing.Pool(processes=8) as pool: # shuts the pool down after use
        return pool.map(problem, numbers)

def task():
    numbers = [5_000_000 + x for x in range(20)]
    start_time = time.time()
    vl_numbers = find_sums(numbers)
    print(time.time() - start_time)
    print(vl_numbers)

In [125]:
task()

0.6990382671356201
[41666654166667500000, 41666679166667500000, 41666704166677500001, 41666729166697500005, 41666754166727500014, 41666779166767500030, 41666804166817500055, 41666829166877500091, 41666854166947500140, 41666879167027500204, 41666904167117500285, 41666929167217500385, 41666954167327500506, 41666979167447500650, 41667004167577500819, 41667029167717501015, 41667054167867501240, 41667079168027501496, 41667104168197501785, 41667129168377502109]
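Since `problem(n)` computes $\sum_{i=0}^{n-1} i^2 = \frac{(n-1)\,n\,(2n-1)}{6}$, the values printed above can be checked against this closed form. A small sketch (`problem_closed_form` is a hypothetical helper):

```python
def problem(number):
    return sum(i * i for i in range(number))

def problem_closed_form(number):
    # sum_{i=0}^{n-1} i^2 = (n-1) n (2n-1) / 6; the integer division is exact
    n = number
    return (n - 1) * n * (2 * n - 1) // 6

# brute force and closed form agree on small inputs
assert all(problem(n) == problem_closed_form(n) for n in range(1000))
print(problem_closed_form(5_000_000))  # 41666654166667500000
```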


In [30]:
multiprocessing.cpu_count()

8

This code is similar to the code for the I/O-bound problem, but here you don’t need to worry about the `requests.Session` object. Notably, the speed-up is smaller than the number of cores, because each process has to start its own Python interpreter, and the arguments and results have to be passed between processes.

<div>
<img src=https://files.realpython.com/media/CPUMP.69c1a7fad9c4.png style="height: 400px;"/>
</div>

This code is short and fast: `multiprocessing.Pool` takes care of all the individual processes, and `Pool.map` gathers the results returned by `problem` into a <kbd>list</kbd>, in the order of the inputs.

In [126]:
find_sums(range(0, 4)) # returns a list

[0, 0, 1, 5]

However, many solutions require communication between processes. This adds complexity that a non-concurrent program would not need to deal with.

A `multiprocessing.Queue` object provides a mechanism to pass data between a parent process and its child processes. It follows the *first in, first out* (FIFO) principle: items are added with the `Queue.put` method and retrieved with the `Queue.get` method.

<div>
<img src=https://media.geeksforgeeks.org/wp-content/uploads/multiprocessing-python-4.png style="width: 700px;"/>
</div>

In [128]:
q = multiprocessing.Queue()
def myfun(q, i): 
    q.put(i)

In [129]:
q.empty()

True

In [130]:
# initialize process
processes = [multiprocessing.Process(target=myfun, args = (q, i)) for i in range(4)]
processes

[<Process name='Process-44' parent=1029 initial>,
 <Process name='Process-45' parent=1029 initial>,
 <Process name='Process-46' parent=1029 initial>,
 <Process name='Process-47' parent=1029 initial>]

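`ps aux` reports a snapshot of the current processes of the operating system; here, we filter for lines containing `544` (the pattern `[5]44` matches `544`, but not the text of the `grep` command itself, which keeps it out of the results). The newly created `Process` objects do not have a process ID (`pid`) yet, so they cannot show up in this list.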

In [131]:
!ps aux | grep "[5]44"

nicolai           2151   0.0  0.1 410854464  24320   ??  Ss    8:50AM   0:00.05 /System/Library/Frameworks/AppKit.framework/Versions/C/XPCServices/ThemeWidgetControlViewService.xpc/Contents/MacOS/ThemeWidgetControlViewService
nicolai           2134   0.0  0.1 410954448  20176   ??  S     8:50AM   0:00.12 /usr/libexec/mlhostd
nicolai            698   0.0  0.0 410172544   2816   ??  Ss    6:20AM   0:00.00 /System/Library/Frameworks/NetFS.framework/Versions/A/XPCServices/PlugInLibraryService.xpc/Contents/MacOS/PlugInLibraryService
nicolai            587   0.0  0.0 410075440   1024   ??  S     6:20AM   0:00.00 /Applications/Firefox.app/Contents/MacOS/crashhelper 461 gecko-crash-server-pipe.461 /private/var/folders/qz/ztp_k9054wz3wyfvjqbw10_h0000gp/T/TemporaryItems/ 10
root               544   0.0  0.1 410387728  19216   ??  Ss    6:20AM   0:00.35 /System/Library/PrivateFrameworks/AppSSO.framework/Support/AppSSODaemon
root               401   0.0  0.1 410298544  10944   ??  Ss    6:20AM   0

The processes are initialized but not yet running.

In [132]:
# run processes
for process in processes:
    process.start()

In [133]:
# join processes
for process in processes:
    process.join()

The main purpose of `Process.join` ([docs](https://docs.python.org/3/library/multiprocessing.html?#multiprocessing.Process.join)) is to ensure that a child process has completed before the main process does anything that depends on the work of the child process.
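The order of `get` and `join` matters once larger amounts of data go through a queue: the `multiprocessing` docs warn that a process which has put items into a queue waits, before terminating, until all buffered items have been flushed to the underlying pipe, so joining it before draining the queue can deadlock. The example above works because four small integers fit into the pipe buffer. A minimal sketch of the safe order, assuming a Unix-like OS (`fork`) and hypothetical helpers `put_many` and `collect`:

```python
import multiprocessing

def put_many(q, n):
    for i in range(n):
        q.put(i)

def collect(n=10_000):
    ctx = multiprocessing.get_context("fork")  # assumption: Unix-like OS
    q = ctx.Queue()
    p = ctx.Process(target=put_many, args=(q, n))
    p.start()
    items = [q.get() for _ in range(n)]  # drain the queue first ...
    p.join()                             # ... then wait for the child to exit
    return items

items = collect()
print(len(items), items[:3])  # 10000 [0, 1, 2]
```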

In [134]:
processes

[<Process name='Process-44' pid=3708 parent=1029 stopped exitcode=0>,
 <Process name='Process-45' pid=3709 parent=1029 stopped exitcode=0>,
 <Process name='Process-46' pid=3710 parent=1029 stopped exitcode=0>,
 <Process name='Process-47' pid=3711 parent=1029 stopped exitcode=0>]

The value `exitcode=0` means that the process has been completed successfully, without error ([docs](https://docs.python.org/3/library/multiprocessing.html?#multiprocessing.Process.exitcode)). 
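Non-zero exit codes signal problems. A minimal sketch, assuming a Unix-like OS (`fork` start method) and hypothetical target functions: a target that returns normally yields `0`, an uncaught exception yields `1` (the child prints the traceback), and `sys.exit(n)` yields `n`. According to the docs, a negative value `-N` means that the child was terminated by signal `N`.

```python
import multiprocessing
import sys

def succeeds():
    pass

def fails():
    raise ValueError("something went wrong")  # the child prints this traceback

def exits_with_three():
    sys.exit(3)

def exit_codes():
    ctx = multiprocessing.get_context("fork")  # assumption: Unix-like OS
    procs = [ctx.Process(target=f) for f in (succeeds, fails, exits_with_three)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return [p.exitcode for p in procs]

print(exit_codes())  # [0, 1, 3]
```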

In [135]:
q.empty()

False

In [136]:
[q.get() for process in processes]

[0, 1, 2, 3]

In [137]:
q.empty()

True

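Queues are also the basis of the classic *producer-consumer* pattern: one process produces items and puts them into a queue, another process consumes them, and a sentinel value (here `None`) signals that no more items will follow. Since both processes print concurrently, their output lines can interleave. (While `multiprocessing` allows you to steer the processes directly, many statistical problems already have implementations that are ready for parallel computing, e.g., in `Spark`.)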

In [139]:
from multiprocessing import Process, Queue
import time

def producer(q, items_to_produce):
    for i in range(items_to_produce):
        item = f"Item {i}"
        print(f"Producer: Putting {item}")
        q.put(item)
        time.sleep(0.1) # Simulate work
    q.put(None)

def consumer(q):
    while True:
        item = q.get()
        if item is None: # Check for sentinel value
            break
        print(f"Consumer: Got {item}")
        time.sleep(0.2) # Simulate work

queue = Queue()
    
p1 = Process(target=producer, args=(queue, 5))
c1 = Process(target=consumer, args=(queue,))

p1.start()
c1.start()

p1.join()
c1.join()

if queue.empty():
    print("Main: All processes finished.")

Producer: Putting Item 0
Consumer: Got Item 0
Producer: Putting Item 1
Producer: Putting Item 2Consumer: Got Item 1

Producer: Putting Item 3
Consumer: Got Item 2
Producer: Putting Item 4
Consumer: Got Item 3
Consumer: Got Item 4
Main: All processes finished.


### Spark

Apache Spark is a computational engine for processing huge datasets in parallel, typically distributed across a cluster of machines. Spark is written in Scala, and [PySpark](https://www.dominodatalab.com/data-science-dictionary/pyspark) was released to make Spark available from Python.

The key data type used in PySpark is the Spark dataframe. This object can be thought of as a table distributed across a cluster, and has functionality that is similar to dataframes in `pandas`. If you want to do distributed computation using `PySpark`, then you’ll need to perform operations on Spark dataframes and not other Python data types.

Here we explore how to perform tasks using `PySpark`.

Note: Unfortunately, just pip installing PySpark may not be enough to run the following code. PySpark runs on the Java Virtual Machine, and sometimes the Java version installed on your PC/laptop does not match the requirement of PySpark (recent versions such as PySpark 4 require Java 17 or newer).

In [1]:
!pip install pyspark



We consider a simple regression model for predicting house prices. Let's first load the data into a (non-distributed) `pandas` data frame:

In [1]:
import numpy as np
import pandas as pd

In [2]:
# load the california housing data set
from sklearn.datasets import fetch_california_housing
housing = fetch_california_housing()

# convert to a pandas DataFrame: np.c_ concatenates the features and the target column-wise
housing_pd = pd.DataFrame(data=np.c_[housing['data'], housing['target']],
                          columns=np.append(housing['feature_names'], 'target')).sample(frac=1)
# sample(frac=1) draws all rows (a fraction of 1 = 100%) without replacement:
# thus, we get the complete data set in a random order

In [3]:
housing_pd.head()

| | MedInc | HouseAge | AveRooms | AveBedrms | Population | AveOccup | Latitude | Longitude | target |
|---:|---:|---:|---:|---:|---:|---:|---:|---:|---:|
| 5516 | 6.2673 | 38.0 | 5.516667 | 0.972222 | 374.0 | 2.077778 | 33.97 | -118.39 | 3.572 |
| 14286 | 3.37 | 36.0 | 4.96013 | 1.045566 | 3093.0 | 2.51668 | 32.72 | -117.12 | 1.591 |
| 14791 | 2.5988 | 28.0 | 5.43314 | 1.18314 | 1074.0 | 3.122093 | 32.58 | -117.11 | 1.356 |
| 6092 | 3.1133 | 27.0 | 4.201923 | 0.96875 | 985.0 | 2.367788 | 34.11 | -117.85 | 1.806 |
| 3443 | 1.95 | 19.0 | 3.218391 | 0.965517 | 483.0 | 5.551724 | 34.25 | -118.41 | 1.375 |


California housing dataset.

This dataset was obtained from the StatLib repository: https://www.dcc.fc.up.pt/~ltorgo/Regression/cal_housing.html

The data contains 20,640 observations on 9 variables.

This dataset contains the median house value as the target variable
and the following input variables (features): median income,
housing median age, average rooms, average bedrooms, population,
average occupancy, latitude, and longitude, in that order.

References:

Pace, R. Kelley and Ronald Barry, Sparse Spatial Autoregressions,
Statistics and Probability Letters, 33:291-297, 1997.

In [4]:
housing_pd.shape

(20640, 9)


Attribute Information:
- MedInc: median income in block group
- HouseAge: median house age in block group
- AveRooms: average number of rooms per household
- AveBedrms: average number of bedrooms per household
- Population: block group population
- AveOccup: average number of household members
- Latitude: block group latitude
- Longitude: block group longitude

The target variable is the median house value for California districts, expressed in hundreds of thousands of dollars ($100,000).

This dataset was derived from the 1990 U.S. census, using one row per census block group. A block group is the smallest geographical unit for which the U.S. Census Bureau publishes sample data (a block group typically has a population of 600 to 3,000 people).

A household is a group of people residing within a home. Since the average number of rooms and bedrooms in this dataset are provided per household, these columns may take surprisingly <em>large values for block groups with few households and many empty houses, such as vacation resorts</em>.

In [6]:
housing_pd.index.dtype

dtype('int64')

In [7]:
import pyspark
print(pyspark.__version__)

4.0.0


In [8]:
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession

# Create (or reuse) a SparkSession that only runs locally, i.e., on this laptop.
# "local[*]" starts one worker thread per CPU core; plain "local" would use a single thread.
spark = SparkSession.builder.master("local[*]").getOrCreate() # entry point for data frames, SQL, MLlib
sc = spark.sparkContext # the underlying SparkContext

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/16 10:02:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [9]:
# convert to a Spark data frame
housing_sp = spark.createDataFrame(housing_pd) # convert to sparkDF

In [10]:
print(housing_sp)

DataFrame[MedInc: double, HouseAge: double, AveRooms: double, AveBedrms: double, Population: double, AveOccup: double, Latitude: double, Longitude: double, target: double]


In [11]:
features = housing_sp.schema.names[:] # define the features to be all the names above
target = features.pop() # remove the last one (the target), and define it as the target variable

In [69]:
# alternatively, we also could have used the following code
features = housing_sp.schema.names[:-1] # define the features to be all but the last one of the names above
target = housing_sp.schema.names[-1] # define the last feature to be the target

In [12]:
print(features)
print(target)

['MedInc', 'HouseAge', 'AveRooms', 'AveBedrms', 'Population', 'AveOccup', 'Latitude', 'Longitude']
target


In [13]:
# convert to vector representation for MLlib
assembler = VectorAssembler(inputCols=features, outputCol="features") # combines all feature columns into a single vector column named "features"
housing = assembler.transform(housing_sp).select('features', 'target') # adds the vector column, then keeps only the features vector and the target

See [docs](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.regression.LinearRegression.html). 

In [15]:
# linear regression with Spark
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

In [16]:
# linear regression with penalization
lr = LinearRegression(labelCol="target", featuresCol="features", 
                      elasticNetParam = 1.0, # lasso / l1-penalization
                      standardization = True, 
                      fitIntercept=True)

In [17]:
lrparamGrid = (ParamGridBuilder()
               .addGrid(lr.regParam, [0.001, 0.01, 0.1, 0.2, 0.5, 0.75, 1.0, 1.5, 2.0]) # candidate values for the lasso regularization parameter
               .build())

In [18]:
# Evaluate model
lrevaluator = RegressionEvaluator(predictionCol="prediction", labelCol="target", metricName="mse")

In [19]:
# Create 10-fold CrossValidator
lrcv = CrossValidator(estimator = lr, # base ML algorithm to be trained and evaluated
                      estimatorParamMaps = lrparamGrid, # possible parameters
                      evaluator = lrevaluator, # defines the metric to be optimized
                      numFolds = 10) # number of folds

In [20]:
# Run cross validations
lrcvModel = lrcv.fit(housing)

25/10/16 10:12:33 WARN TaskSetManager: Stage 0 contains a task of very large size (1703 KiB). The maximum recommended task size is 1000 KiB.
25/10/16 10:12:34 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/10/16 10:12:34 WARN TaskSetManager: Stage 1 contains a task of very large size (1703 KiB). The maximum recommended task size is 1000 KiB.
25/10/16 10:12:34 WARN TaskSetManager: Stage 2 contains a task of very large size (1703 KiB). The maximum recommended task size is 1000 KiB.
25/10/16 10:12:35 WARN TaskSetManager: Stage 3 contains a task of very large size (1703 KiB). The maximum recommended task size is 1000 KiB.
25/10/16 10:12:35 WARN TaskSetManager: Stage 4 contains a task of very large size (1703 KiB). The maximum recommended task size is 1000 KiB.
25/10/16 10:12:35 WARN TaskSetManager: Stage 5 contains a task of very large size (1703 KiB). The maximum recommended task size is 1000 KiB.
25/10/16 10:12:35 WARN TaskSetManager: Stage 6 

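See [docs](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.regression.LinearRegressionTrainingSummary.html). Note that `bestModel.summary` describes the best model refit on the full data set, i.e., its MSE is a training error. The cross-validated MSE for each value of `regParam` in the grid is stored in `lrcvModel.avgMetrics` (in the order of `lrparamGrid`).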

In [21]:
# Get Model Summary Statistics
lrcvSummary = lrcvModel.bestModel.summary
lrcvSummary.meanSquaredError

0.5243769901637115

In [22]:
print(lrcvModel.bestModel.extractParamMap())

{Param(parent='LinearRegression_529bb0ad1e5e', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'): 2, Param(parent='LinearRegression_529bb0ad1e5e', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 1.0, Param(parent='LinearRegression_529bb0ad1e5e', name='epsilon', doc='The shape parameter to control the amount of robustness. Must be > 1.0. Only valid when loss is huber'): 1.35, Param(parent='LinearRegression_529bb0ad1e5e', name='featuresCol', doc='features column name.'): 'features', Param(parent='LinearRegression_529bb0ad1e5e', name='fitIntercept', doc='whether to fit an intercept term.'): True, Param(parent='LinearRegression_529bb0ad1e5e', name='labelCol', doc='label column name.'): 'target', Param(parent='LinearRegression_529bb0ad1e5e', name='loss', doc='The loss function to be optimized. Supported options: squaredError, huber.'): 'squaredError', Pa

In [23]:
lrcvModel.bestModel.intercept

-36.43748711990771

In [24]:
lrcvModel.bestModel.coefficients

DenseVector([0.4339, 0.0095, -0.1018, 0.6161, -0.0, -0.0037, -0.4163, -0.4289])

In [25]:
housing_sp.schema.names[:-1] # last one is the target

['MedInc',
 'HouseAge',
 'AveRooms',
 'AveBedrms',
 'Population',
 'AveOccup',
 'Latitude',
 'Longitude']

Interpretation (these are associations, not causal effects; also, the coefficients are reported on the original scale of each feature, so their magnitudes are not directly comparable):
- Higher median income is associated with higher housing prices.
- Surprisingly, older houses are more expensive.
- So are houses with fewer rooms. This might be a correction for the large influence of AveBedrms: more rooms increase the housing price as long as there are enough bedrooms. Alternatively, a large number of rooms might indicate many houses used only for the holidays.
- The lasso shrinks the coefficient of Population to zero, i.e., the population of a block group has no influence on the price in the selected model.
- Southern and western areas (lower latitude and longitude) are more expensive than northern and eastern ones. (This could be a potential influence of LA and SF.)

### Summary 

- There are I/O- and CPU-bound problems
- Use `threading` for I/O-bound problems, `multiprocessing` for CPU-bound problems
- Communication between processes is cumbersome
- For many CPU-bound tasks, parallel implementations already exist (e.g., in Spark).