In [1]:
import os
sparkFile = os.path.join(os.environ["SPARK_HOME"], 'python/pyspark/shell.py')
exec(compile(open(sparkFile, "rb").read(), sparkFile, 'exec'))

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.2.0
      /_/

Using Python version 3.5.4 (default, Oct 27 2017 11:48:53)
SparkSession available as 'spark'.


# "Hello world"

In [2]:
sc

If PySpark was installed correctly, the variable `sc` (the SparkContext) should already be initialized. `sc` is the driver program's entry point to Spark: it tells Spark how to access the cluster resources. In this case, Spark will run locally.

Below is an example that estimates π with the Monte Carlo method.

In [3]:
import random
num_samples = 100000

def inside(p):
    # p (the element of the range) is ignored; each call just draws one random point
    x, y = random.random(), random.random()
    return x*x + y*y < 1

count = sc.parallelize(range(0, num_samples)).filter(inside).count()

pi = 4 * count / num_samples
print(pi)

3.15552
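
The estimate works because a point drawn uniformly from the unit square falls inside the quarter circle x² + y² < 1 with probability π/4, so 4 · count / num_samples approaches π as the number of samples grows. Below is a minimal plain-Python sketch of the same estimator, without Spark. The helper name `estimate_pi` and the fixed seed are illustrative assumptions, not part of the notebook.

```python
import random

def estimate_pi(num_samples, seed=0):
    """Monte Carlo estimate of pi from uniform points in the unit square."""
    rng = random.Random(seed)  # fixed seed so the run is repeatable
    hits = 0
    for _ in range(num_samples):
        x, y = rng.random(), rng.random()
        if x * x + y * y < 1:  # the point lies inside the quarter circle
            hits += 1
    return 4 * hits / num_samples

pi_estimate = estimate_pi(100000)
print(pi_estimate)
```

The Spark version distributes exactly this loop: `filter(inside)` keeps the hits and `count()` adds them up.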


# RDDs

Resilient Distributed Datasets (RDDs) are collections of Java or Scala objects that represent data and can be operated on in parallel. They are typed (and hence compile-time safe), they are lazy, and their API is modeled on the Scala collections API. They are the building blocks of Spark. However, they can be problematic in non-JVM languages such as Python: inefficiency, errors that are hard to read, etc.

RDDs can be created either from a storage source supported by Hadoop or from an existing iterable.

In [4]:
# RDD from a text file; it's a pointer, the file is not loaded into memory
distFile = sc.textFile("data/readme.txt")
distFile

data/readme.txt MapPartitionsRDD[3] at textFile at NativeMethodAccessorImpl.java:0

In [5]:
# RDD from iterable
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
distData

ParallelCollectionRDD[4] at parallelize at PythonRDD.scala:480

## RDD operations

We can manipulate RDDs through two types of operations: actions and transformations.

- Actions return a value (a regular Python object) to the driver program after running a computation on the dataset. `reduce` is an action.
- Transformations create a new dataset (RDD) from an existing one. Transformations are lazy: they are only computed when an action needs their result. `map` is a transformation.

In [6]:
# calculate the length of each line
lineLengths = distFile.map(lambda s: len(s))
# laziness: lineLengths is not computed yet
print("lineLengths -> ", lineLengths)

totalLength = lineLengths.reduce(lambda a, b: a + b)
# reduce is an action, therefore it is evaluated
print("totalLength -> ", totalLength)

lineLengths ->  PythonRDD[5] at RDD at PythonRDD.scala:48
totalLength ->  2075
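
The laziness seen above has a rough analogue in plain Python, where the built-in `map` also defers work until something consumes it. The sketch below is only an analogy and does not use Spark; the sample `lines` and the `evaluated` log are made up for illustration.

```python
from functools import reduce

lines = ["spark", "rdd", "lazy"]  # stand-in for the lines of a file
evaluated = []                    # records which lines were actually processed

def length(s):
    evaluated.append(s)
    return len(s)

lengths = map(length, lines)      # lazy: nothing has run yet
calls_before = len(evaluated)

total = reduce(lambda a, b: a + b, lengths)  # consuming the map forces evaluation
calls_after = len(evaluated)

print(calls_before, calls_after, total)
```

One difference worth keeping in mind: a Python `map` object is exhausted after one pass, whereas an RDD can be used by several actions (and is recomputed each time unless it is cached).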



When we run operations on an RDD, Spark breaks them up into tasks, each of which is executed by an executor. Before execution, Spark computes the task's closure: the variables and methods that must be visible to the executor to perform the task. This closure is then sent to the executor. The functions passed can be lambda expressions, local `def`s or top-level functions in a module. Variables are copied, so each executor sees its own copy and changes are not propagated back to the driver. If we want different executors to update the value of a shared variable, we need `Accumulators`.
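
The copy semantics described above can be simulated in plain Python. This is a sketch under simplifying assumptions, not how Spark ships closures: `run_task`, the `partitions` split and the use of `copy.deepcopy` in place of serialization are all hypothetical stand-ins.

```python
import copy

counter = {"value": 0}                 # driver-side variable captured by the tasks
partitions = [[1, 2, 3], [4, 5], [6]]  # hypothetical split of the data

def run_task(partition, closure):
    # Each task works on its own copy of the closure, as an executor would.
    local = copy.deepcopy(closure)
    for x in partition:
        local["value"] += x
    return local["value"]              # only the partial result travels back

partials = [run_task(p, counter) for p in partitions]

print(counter["value"])  # the driver's variable was never updated
print(sum(partials))     # merging partial results is roughly what an accumulator does
```

In this simulation the driver's `counter` stays untouched, which is the same surprise one gets when mutating a captured variable inside a Spark task.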

### Important actions

In [7]:
# get first element
distFile.first()

'Title:  Bag of Words Data Set'

In [8]:
# get first 5 elements
distFile.take(5)

['Title:  Bag of Words Data Set',
 '',
 'Abstract: This data set contains five text collections in the form of bags-of-words.',
 '',
 '-----------------------------------------------------\t']

In [9]:
# get all the elements
print(distFile.collect())

['Title:  Bag of Words Data Set', '', 'Abstract: This data set contains five text collections in the form of bags-of-words.', '', '-----------------------------------------------------\t', '', 'Data Set Characteristics: Text', 'Number of Instances: 8000000', 'Area: N/A', 'Attribute Characteristics: Integer', 'Number of Attributes: 100000', 'Date Donated: 2008-03-12', 'Associated Tasks: Clustering', 'Missing Values? N/A', '', '-----------------------------------------------------\t\t', '', 'Source:', '', 'David Newman', "newman '@' uci.edu", 'University of California, Irvine', '', '-----------------------------------------------------\t', '', 'Data Set Information:', '', 'For each text collection, D is the number of documents, W is the', 'number of words in the vocabulary, and N is the total number of words', 'in the collection (below, NNZ is the number of nonzero counts in the', 'bag-of-words). After tokenization and removal of stopwords, the', 'vocabulary of unique words was truncated

In [10]:
# sample 3 elements without replacement
distFile.takeSample(False, 3)

['', 'docID wordID count', 'N=730,000,000 (approx)']

In [11]:
# count number of elements
distFile.count()

95

In [12]:
# aggregate the elements of the dataset with a function that
# takes two arguments and returns one; for correct parallelization,
# the function should be associative and commutative
distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)

2075
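
The requirement that the function be associative and commutative can be illustrated without Spark. The sketch below models a distributed reduce as "reduce each partition, then reduce the partial results"; the helper `partitioned_reduce` and the two splits are illustrative assumptions about how data might be partitioned.

```python
from functools import reduce
from operator import add, sub

def partitioned_reduce(func, partitions):
    # Reduce each partition on its own, then reduce the partial results.
    partials = [reduce(func, part) for part in partitions if part]
    return reduce(func, partials)

split_a = [[1, 2, 3], [4, 5, 6]]    # the same data, partitioned in two ways
split_b = [[1, 2], [3, 4], [5, 6]]

add_a = partitioned_reduce(add, split_a)
add_b = partitioned_reduce(add, split_b)
sub_a = partitioned_reduce(sub, split_a)
sub_b = partitioned_reduce(sub, split_b)

print(add_a, add_b)  # addition: the partitioning does not matter
print(sub_a, sub_b)  # subtraction: the result depends on the partitioning
```

Addition gives the same total for both splits, while subtraction, which is neither associative nor commutative, gives a different answer for each.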

### Important transformations

In [13]:
# apply a function to every line
# returns one result per line (here, a list of words for each line)
distFile.map(lambda line: line.split()).take(3)

[['Title:', 'Bag', 'of', 'Words', 'Data', 'Set'],
 [],
 ['Abstract:',
  'This',
  'data',
  'set',
  'contains',
  'five',
  'text',
  'collections',
  'in',
  'the',
  'form',
  'of',
  'bags-of-words.']]

In [14]:
# apply a function to every line and flatten the results
# returns a single flat collection (here, all the words of all the lines)
distFile.flatMap(lambda line: line.split()).take(10)

['Title:',
 'Bag',
 'of',
 'Words',
 'Data',
 'Set',
 'Abstract:',
 'This',
 'data',
 'set']
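
The difference between `map` and `flatMap` shown in the two cells above can be reproduced in plain Python. This is an analogy only; the three sample `lines` are invented for the illustration.

```python
from itertools import chain

lines = ["Title:  Bag of Words", "", "Abstract: This data set"]

# map: one result per input line, so the output is a list of lists
mapped = [line.split() for line in lines]

# flatMap: the per-line results are concatenated into one flat list
flat_mapped = list(chain.from_iterable(line.split() for line in lines))

print(mapped)
print(flat_mapped)
```

Note that the empty line still produces an (empty) entry under `map`, but contributes nothing under `flatMap`.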

In [15]:
# keep only the lines that satisfy a condition
distFile.filter(lambda line: 'source' in line).collect()

['orig source: www.cs.cmu.edu/~enron',
 'orig source: books.nips.cc',
 'orig source: dailykos.com',
 'orig source: ldc.upenn.edu',
 'orig source: www.pubmed.gov']

### Transformations for key-value pairs (K, V)

In [16]:
data = [(1, 5), (2, 4), (1, 3), (2, 5), (3, 1), (1, 2.5)]
distData = sc.parallelize(data)

In [17]:
# group the pairs (K, V) into pairs (K, Iterable<V>)
distData.groupByKey().collect()

[(1, <pyspark.resultiterable.ResultIterable at 0x10820eb38>),
 (2, <pyspark.resultiterable.ResultIterable at 0x10820e710>),
 (3, <pyspark.resultiterable.ResultIterable at 0x10820e518>)]

In [18]:
# return pairs (K, V) where V is the reduction of all the input values with key K
distData.reduceByKey(lambda a, b: a + b).collect()

[(1, 10.5), (2, 9), (3, 1)]

In [19]:
distData. sortByKey(). collect()

[(1, 5), (1, 3), (1, 2.5), (2, 4), (2, 5), (3, 1)]
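
To make the three key-value transformations concrete, here is a plain-Python sketch that computes the same results for the sample data on a single machine. It models the semantics only and is not how Spark implements them; in particular, Spark does not guarantee the order of the values inside a group.

```python
from collections import defaultdict
from functools import reduce

data = [(1, 5), (2, 4), (1, 3), (2, 5), (3, 1), (1, 2.5)]

# groupByKey: collect all the values that share a key
groups = defaultdict(list)
for key, value in data:
    groups[key].append(value)
grouped = sorted(groups.items())

# reduceByKey: combine the values of each key with a two-argument function
reduced = [(k, reduce(lambda a, b: a + b, vs)) for k, vs in grouped]

# sortByKey: order the pairs by key only (values are not compared)
sorted_pairs = sorted(data, key=lambda kv: kv[0])

print(grouped)
print(reduced)
print(sorted_pairs)
```

The `grouped` list also shows what hides behind the `ResultIterable` objects printed by `groupByKey`: the values belonging to each key.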

## DataFrames

DataFrames were conceived to overcome the efficiency limitations of RDDs. At their core they are still RDDs, but with an interface that makes interacting with the data easier and faster. The price to pay is compile-time safety, which is given up in exchange for flexibility and, therefore, results in more error-prone code.

## DataSets

DataSets were developed to reconcile the usability of DataFrames with the type safety of RDDs. In Spark, a DataSet can be untyped, and hence equivalent to a DataFrame, or typed. In Python, however, only the untyped variant is available.

# References

- [Spark Programming Guide](https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html)