<a href="https://colab.research.google.com/github/gevargas/bigdata-management/blob/master/Intro_Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Baby steps into **Spark**

## Colab configuration

In [1]:
SPARK_VERSION   = "3.0.0"
HADOOP_VERSION  = "2.7"
OPENJDK_VERSION = "11"

In [2]:
import os
os.environ["JAVA_HOME"]  = "/usr/lib/jvm/java-{0}-openjdk-amd64".format(OPENJDK_VERSION)
os.environ["SPARK_HOME"] = "/usr/local/spark"

In [3]:
# Java
!apt-get update && apt-get install --no-install-recommends -y openjdk-"$OPENJDK_VERSION"-jdk-headless -qq > /dev/null

# Spark
!curl -L --silent https://archive.apache.org/dist/spark/spark-"$SPARK_VERSION"/spark-"$SPARK_VERSION"-bin-hadoop"$HADOOP_VERSION".tgz > spark-"$SPARK_VERSION"-bin-hadoop"$HADOOP_VERSION".tgz
!tar xzf spark-"$SPARK_VERSION"-bin-hadoop"$HADOOP_VERSION".tgz
!rm spark-"$SPARK_VERSION"-bin-hadoop"$HADOOP_VERSION".tgz
!mv spark-"$SPARK_VERSION"-bin-hadoop"$HADOOP_VERSION" /usr/local/spark

!pip install -q findspark


## PySpark configuration

Adjust the driver memory below according to your needs.

In [4]:
import os
import findspark

SPARK_DRIVER_MEMORY   = "8g"

os.environ["PYSPARK_SUBMIT_ARGS"] = "--driver-memory {0} pyspark-shell".format(SPARK_DRIVER_MEMORY)

findspark.init()
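
The `PYSPARK_SUBMIT_ARGS` string can carry more than one option. As a minimal sketch, the hypothetical helper `build_submit_args` below assembles it from a driver-memory setting and an optional master URL. `--driver-memory` and `--master` are standard `spark-submit` options; the helper name and its defaults are assumptions made for illustration. The variable has to be set before the SparkContext is created, because it is read when the JVM is launched.

```python
import os

def build_submit_args(driver_memory="8g", master=None):
    """Assemble a PYSPARK_SUBMIT_ARGS value (hypothetical helper, not part of PySpark)."""
    parts = []
    if master is not None:
        parts += ["--master", master]
    parts += ["--driver-memory", driver_memory]
    # Keep "pyspark-shell" as the final token, as in the cell above
    parts.append("pyspark-shell")
    return " ".join(parts)

submit_args = build_submit_args("4g", master="local[2]")
os.environ["PYSPARK_SUBMIT_ARGS"] = submit_args
print(submit_args)  # --master local[2] --driver-memory 4g pyspark-shell
```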

# Initializing Spark

* SparkContext creation

In [5]:
import pyspark

sc = pyspark.SparkContext.getOrCreate()
sc

# PySpark example

* Read a local or HDFS file

In [6]:
textFile = sc.textFile("sample_data/README.md")

* Count the number of lines in the file

In [7]:
textFile.count()

19

* Print the first line of the file

In [8]:
textFile.first()

'This directory includes a few sample datasets to get you started.'

* Count the number of lines containing the string `"dataset"`

In [9]:
lines = textFile.filter(lambda line: "dataset" in line)
lines.count()

2

* Collect (i.e. extract from the Spark cluster) the lines containing the string `"dataset"`

In [10]:
lines.collect()

['This directory includes a few sample datasets to get you started.',
 '    [vega_datasets library](https://github.com/altair-viz/vega_datasets/blob/4f67bdaad10f45e3549984e17e1b3088c731503d/vega_datasets/_data/anscombe.json).']
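
Note that `"dataset" in line` is a substring test, so it also matches `datasets` and `vega_datasets`, as the collected lines show. The plain-Python sketch below (no Spark involved; `sample_lines` is made-up data for illustration) contrasts the substring predicate with a whole-word match built on the standard `re` module. Either predicate could be passed to `filter` in the same way as the lambda.

```python
import re

# Made-up lines for illustration; not the contents of sample_data/README.md
sample_lines = [
    "A few sample datasets to get you started.",
    "Each dataset is described below.",
    "See the vega_datasets library.",
    "Nothing relevant here.",
]

def contains_substring(line):
    # Same test as the lambda passed to textFile.filter
    return "dataset" in line

WORD = re.compile(r"\bdataset\b")

def contains_word(line):
    # Matches "dataset" only as a standalone word
    return WORD.search(line) is not None

substring_hits = [l for l in sample_lines if contains_substring(l)]
word_hits = [l for l in sample_lines if contains_word(l)]

print(len(substring_hits))  # 3
print(len(word_hits))       # 1
```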

# First steps working with RDDs

Create a parallelized collection holding the numbers 1 to 5:

In [12]:
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)

Once created, the distributed dataset (`distData`) can be operated on in parallel.

For example, we can call `distData.reduce(lambda a, b: a + b)` to add up the elements of the list.

In [13]:
distData.reduce(lambda a, b: a + b)

15
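
Because Spark reduces each partition separately and then merges the partial results, the function passed to `reduce` should be commutative and associative. The plain-Python sketch below imitates that two-stage evaluation with `functools.reduce`. The helper `partitioned_reduce` and the fixed partition layouts are assumptions for illustration, not Spark's implementation.

```python
from functools import reduce

def partitioned_reduce(partitions, func):
    """Reduce each non-empty partition on its own, then reduce the partial results."""
    partials = [reduce(func, part) for part in partitions if part]
    return reduce(func, partials)

# Two possible ways the same five numbers could be split (hypothetical layouts)
layout_a = [[1, 2], [3, 4, 5]]
layout_b = [[1, 2, 3], [4, 5]]

add = lambda a, b: a + b
sub = lambda a, b: a - b

# Addition is associative and commutative: the layout does not matter
sum_a = partitioned_reduce(layout_a, add)
sum_b = partitioned_reduce(layout_b, add)

# Subtraction is neither: the result depends on how the data was split
diff_a = partitioned_reduce(layout_a, sub)
diff_b = partitioned_reduce(layout_b, sub)

print(sum_a, sum_b)    # 15 15
print(diff_a, diff_b)  # 5 -3
```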

An important parameter for parallel collections is the number of partitions to cut the dataset into.

*   Spark will run one task for each partition of the dataset.
*   Typically you want 2-4 partitions for each CPU in your cluster.

Normally, Spark tries to set the number of partitions automatically based on your cluster. However, you can also set it manually by passing it as a second parameter to `parallelize` (e.g. `sc.parallelize(data, 10)`).

Note: some places in the code use the term slices (a synonym for partitions) to maintain backward compatibility.
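
To make the effect of the partition count concrete, the plain-Python sketch below cuts a list into contiguous slices of near-equal size. The helper `slice_evenly` is hypothetical and only approximates the idea; Spark's actual placement of elements may differ. On a real RDD, `distData.getNumPartitions()` and `distData.glom().collect()` report the actual layout.

```python
def slice_evenly(data, num_slices):
    """Cut data into num_slices contiguous slices of near-equal size."""
    if num_slices < 1:
        raise ValueError("num_slices must be at least 1")
    n = len(data)
    return [
        data[i * n // num_slices:(i + 1) * n // num_slices]
        for i in range(num_slices)
    ]

data = [1, 2, 3, 4, 5]

two = slice_evenly(data, 2)
ten = slice_evenly(data, 10)
empty_in_ten = sum(1 for s in ten if not s)

print(two)                     # [[1, 2], [3, 4, 5]]
print(len(ten), empty_in_ten)  # 10 5
```

With more slices than elements, half of the slices in this sketch are empty, yet each one would still cost a task.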

## RDD operations

In [15]:
lines = sc.textFile("sample_data/README.md")
lineLengths = lines.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)

In [16]:
totalLength

911

The first line defines a base RDD from an external file.
This dataset is not loaded in memory or otherwise acted on: `lines` is merely a pointer to the file.

The second line defines `lineLengths` as the result of a `map` transformation. Again, `lineLengths` is not computed immediately, because transformations are lazy.

Finally, we run `reduce`, which is an action.

At this point Spark breaks the computation into tasks to run on separate machines. Each machine runs both its part of the map and a local reduction, returning only its answer to the driver program.
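
The split between lazy transformations and eager actions can be imitated in plain Python with generators. This is only an analogy: `lazy_map` and the `calls` list are hypothetical names used to record when work happens, and no Spark code runs here.

```python
from functools import reduce

calls = []  # records each time the mapped function actually runs

def lazy_map(func, items):
    """Generator-based map: nothing is computed until the result is consumed."""
    for item in items:
        calls.append(item)
        yield func(item)

lines = ["spark", "is", "lazy"]

# "Transformation": builds a recipe, performs no work yet
line_lengths = lazy_map(len, lines)
calls_before_action = len(calls)

# "Action": consuming the generator triggers the computation
total_length = reduce(lambda a, b: a + b, line_lengths)
calls_after_action = len(calls)

print(calls_before_action, calls_after_action, total_length)  # 0 3 11
```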

If we also wanted to use lineLengths again later, we could add: