# This notebook is a first experiment with PySpark, using distributed computing to speed up code.

Remember, a PySpark program isn’t that different from a regular Python program, but its execution model can be very different, especially if you’re running on a cluster.

There can be a lot of things happening behind the scenes that distribute the processing across multiple nodes if you’re on a cluster. However, for now, think of the program as a Python program that uses the PySpark library.
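For comparison, here is the Hello World job from later in this notebook written as a plain, single-process Python program. It is a minimal sketch that assumes the text fits in memory; the sample lines are made up for illustration, and in practice you would read `distributed_computing_notes.txt` (for example with `open(...).read().splitlines()`) instead.

```python
def count_lines(lines):
    """Plain-Python counterpart of txt.count(): the number of lines."""
    return len(lines)


def count_python_lines(lines):
    """Plain-Python counterpart of
    txt.filter(lambda line: 'python' in line.lower()).count()."""
    return sum(1 for line in lines if "python" in line.lower())


# Hypothetical sample text standing in for distributed_computing_notes.txt.
sample = [
    "PySpark is the Python API for Spark.",
    "RDDs are resilient distributed datasets.",
    "Set PYSPARK_PYTHON to your python interpreter.",
]

print(count_lines(sample))         # 3
print(count_python_lines(sample))  # 2
```

The PySpark version has the same shape: the `filter` predicate is an ordinary Python lambda. The difference is that Spark decides where and in how many pieces that lambda runs.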

# PySpark documentation: https://spark.apache.org/docs/latest/api/python/index.html

In [1]:
# !which python

# PySpark installation:
https://spark.apache.org/docs/latest/api/python/getting_started/install.html#python-version-supported

In a Python context, think of PySpark as a way to handle parallel processing without the threading or multiprocessing modules. All of the complicated communication and synchronization between threads, processes, and even different CPUs is handled by Spark.
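To make that concrete, the sketch below parallelizes the same "count lines containing python" job by hand with `concurrent.futures`: you split the data into chunks, give each chunk to a worker, and combine the partial counts yourself. Spark does this splitting, scheduling, and combining for you. The chunking scheme and worker count are illustrative assumptions, and because of CPython's GIL, threads will not actually speed up this pure-Python, CPU-bound work; you would need processes for that, with the extra pickling and startup details they bring.

```python
from concurrent.futures import ThreadPoolExecutor


def count_matches(chunk, word="python"):
    """Count lines in one chunk that contain `word` (case-insensitive)."""
    return sum(1 for line in chunk if word in line.lower())


def parallel_count(lines, n_workers=2):
    """Split `lines` into at most n_workers chunks, count each chunk in a
    thread, then sum the partial counts (a hand-rolled map + reduce)."""
    size = max(1, -(-len(lines) // n_workers))  # ceiling division
    chunks = [lines[i:i + size] for i in range(0, len(lines), size)]
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        partial_counts = list(pool.map(count_matches, chunks))
    return sum(partial_counts)


lines = ["python is fun", "spark runs on the JVM", "PySpark wraps Python", "nothing here"]
print(parallel_count(lines, n_workers=2))  # 2
```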

To interact with PySpark, you create specialized data structures called Resilient Distributed Datasets (RDDs).

Typically, you’ll run PySpark programs on a Hadoop cluster, but other cluster deployment options are supported. You can read Spark’s cluster mode overview for more details.

Run the command: "pip install pyspark"
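After installing, a quick check like the one below can confirm that PySpark is importable and that a Java runtime is visible, since Spark runs on the JVM. It is a minimal sketch: `check_pyspark_setup` is a hypothetical helper name, and finding `java` does not guarantee it is a Java version your Spark release supports.

```python
import importlib.metadata
import importlib.util
import os
import shutil


def check_pyspark_setup():
    """Report whether pyspark is importable and whether a Java runtime is visible."""
    report = {"pyspark_installed": importlib.util.find_spec("pyspark") is not None}
    if report["pyspark_installed"]:
        try:
            report["pyspark_version"] = importlib.metadata.version("pyspark")
        except importlib.metadata.PackageNotFoundError:
            # e.g. pyspark put on sys.path from a Spark download rather than pip
            report["pyspark_version"] = None
    # Spark's launch scripts use JAVA_HOME if it is set, otherwise `java` on PATH.
    report["java_home"] = os.environ.get("JAVA_HOME")
    report["java_on_path"] = shutil.which("java") is not None
    return report


for key, value in check_pyspark_setup().items():
    print(f"{key}: {value}")
```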

# Set PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON (for example, in your .bashrc file):
# https://stackoverflow.com/questions/30518362/how-do-i-set-the-drivers-python-version-in-spark

# https://sparkbyexamples.com/spark/spark-exception-python-in-worker-has-different-version-3-4-than-that-in-driver-2-7-pyspark-cannot-run-with-different-minor-versions/

# PySpark Hello World:

In [2]:
import os

# Point the Spark driver and workers at the same interpreter (find the path with !which python):
os.environ["PYSPARK_PYTHON"] = "/Users/calebbowyer/anaconda3/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/Users/calebbowyer/anaconda3/bin/python"
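Hardcoding the interpreter path, as in the cell above, ties the notebook to one machine. A more portable sketch uses `sys.executable`, the interpreter running this kernel (usually the same path `!which python` prints when the shell and kernel share an environment). This assumes the workers can reach that same path, which holds in `local` mode but not necessarily on a real cluster, and it must run before the `SparkContext` is created.

```python
import os
import sys

# Assumption: driver and workers can both use the interpreter running this notebook.
# Matching them avoids the "Python in worker has different version ... than that
# in driver" error. Run this before creating the SparkContext.
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

print(os.environ["PYSPARK_PYTHON"])
```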

In [3]:
import pyspark

# Test the speedup from using different numbers of local worker threads (CPU cores).

In [4]:
%%time
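# sc = pyspark.SparkContext('local[*]')  # one worker thread per logical core
sc = pyspark.SparkContext('local[1]')  # exactly one worker thread
txt = sc.textFile('distributed_computing_notes.txt')  # RDD with one element per line (read lazily)
# print(txt)
print(txt.count())  # action: total number of lines

# Keep only the lines that mention "python", case-insensitively.
python_lines = txt.filter(lambda line: 'python' in line.lower())
print(python_lines.count())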

44
3
CPU times: user 47.8 ms, sys: 30.2 ms, total: 78 ms
Wall time: 9.27 s


In [7]:
sc.stop()

The entry point of any PySpark program is a SparkContext object (newer code usually creates a SparkSession, which wraps one). This object connects to a Spark cluster and creates RDDs. The local[*] string is a special string denoting that you’re using a local cluster, which is another way of saying you’re running in single-machine mode. The * tells Spark to create as many worker threads as there are logical cores on your machine, while local[N] creates exactly N worker threads, which is the number these cells vary.
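The thread count implied by a `local` master string, and its effect on how `textFile` splits a small file, can be modeled in plain Python. The helpers below are hypothetical and mirror documented behavior rather than calling Spark: `local` means one worker thread, `local[K]` means K threads, `local[*]` means one per logical core, and when `minPartitions` is not passed, `textFile` uses `sc.defaultMinPartitions`, which is `min(defaultParallelism, 2)`. The sketch assumes `spark.default.parallelism` is not set and deliberately rejects other master forms (such as `local[K,F]` or cluster URLs).

```python
import os
import re


def local_threads(master):
    """Worker threads for 'local', 'local[K]', or 'local[*]' (other forms unsupported)."""
    match = re.fullmatch(r"local(?:\[(\*|[1-9]\d*)\])?", master)
    if match is None:
        raise ValueError(f"unsupported master string: {master!r}")
    spec = match.group(1)
    if spec is None:
        return 1                    # plain 'local' runs one worker thread
    if spec == "*":
        return os.cpu_count() or 1  # one thread per logical core
    return int(spec)


def default_min_partitions(master):
    """Model of sc.defaultMinPartitions = min(defaultParallelism, 2) in local mode."""
    return min(local_threads(master), 2)


for master in ["local[1]", "local[2]", "local[3]", "local[4]"]:
    print(master, local_threads(master), default_min_partitions(master))
# local[1] 1 1
# local[2] 2 2
# local[3] 3 2
# local[4] 4 2
```

In a live session you can compare the model against `sc.defaultParallelism`, `sc.defaultMinPartitions`, and `txt.getNumPartitions()`.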

In [8]:
%%time
# sc = pyspark.SparkContext('local[*]')
sc2 = pyspark.SparkContext('local[2]')
txt = sc2.textFile('distributed_computing_notes.txt')
# print(txt)
print(txt.count())

python_lines = txt.filter(lambda line: 'python' in line.lower())
print(python_lines.count())

44
3
CPU times: user 24.4 ms, sys: 10.1 ms, total: 34.4 ms
Wall time: 1.21 s


In [9]:
sc2.stop()

In [10]:
%%time
# sc = pyspark.SparkContext('local[*]')
sc3 = pyspark.SparkContext('local[3]')
txt = sc3.textFile('distributed_computing_notes.txt')
# print(txt)
print(txt.count())

python_lines = txt.filter(lambda line: 'python' in line.lower())
print(python_lines.count())

44
3
CPU times: user 21.8 ms, sys: 6.87 ms, total: 28.6 ms
Wall time: 679 ms


In [11]:
sc3.stop()

In [12]:
%%time
# sc = pyspark.SparkContext('local[*]')
sc4 = pyspark.SparkContext('local[4]')
txt = sc4.textFile('distributed_computing_notes.txt')
# print(txt)
print(txt.count())

python_lines = txt.filter(lambda line: 'python' in line.lower())
print(python_lines.count())

44
3
CPU times: user 22.4 ms, sys: 7.59 ms, total: 30 ms
Wall time: 737 ms


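# Note that for this problem size, using 3 worker threads (679 ms) was slightly faster than using 4 (737 ms), so we can stop here.

Treat this as a rough observation rather than a measured speedup. The input is only 44 lines, and when `minPartitions` is not given, `textFile` uses `min(defaultParallelism, 2)` as its minimum number of partitions, so with `local[3]` and `local[4]` the file is most likely read as just 2 partitions. At most 2 tasks then run at once and the extra threads sit idle, so the gap of about 60 ms is probably noise and scheduling overhead rather than a real difference. Printing `txt.getNumPartitions()` in each cell would show how many tasks actually ran.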
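The `%%time` cells above also time `SparkContext` creation, and the first one additionally pays the one-off cost of starting the JVM, which most likely explains most of its 9.27 s. A fairer comparison creates the context outside the timed region, runs the job once to warm up, and then takes the median of several repeats. The helper below is a minimal, Spark-agnostic sketch of that idea; `time_job` is a hypothetical name, and the default repeat counts are arbitrary.

```python
import statistics
import time


def time_job(job, repeats=5, warmup=1):
    """Call `job` `warmup` times untimed, then return the median wall time
    (in seconds) over `repeats` timed calls."""
    for _ in range(warmup):
        job()
    samples = []
    for _ in range(repeats):
        start = time.perf_counter()
        job()
        samples.append(time.perf_counter() - start)
    return statistics.median(samples)


# Stand-in job so the sketch runs without Spark. With PySpark you would pass
# something like: lambda: txt.filter(lambda line: 'python' in line.lower()).count()
elapsed = time_job(lambda: sum(i * i for i in range(10_000)))
print(f"median: {elapsed * 1000:.2f} ms")
```

For each master string, create the `SparkContext` and `txt` first, call `time_job(...)`, and then `sc.stop()` before trying the next setting, since only one `SparkContext` can be active per Python process.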