# Part 1: Sharing data within a cluster #

## Broadcast: read-only variables ##

In [1]:
# Example: let's encode the gender found in the demographic data
# as a hot encode. The association should be the same
# on every machine in the cluster, requiring a shared mapping

one_hot_encoding = {"M": (1, 0, 0),
                    "F": (0, 1, 0),
                    "U": (0, 0, 1)
                   }

In [2]:
# Gender one-hot-encoding using Sparck context(sc)
(sc.parallelize(["M", "F", "U", "F", "M", "U"])
   .map(lambda x: one_hot_encoding[x])
   .collect())

'''
The command above works only in the single node configuration since the variable "one_hot_encoding" is defined only on
this machine.
On a multi-node cluster, it will raise an error.
'''

[(1, 0, 0), (0, 1, 0), (0, 0, 1), (0, 1, 0), (1, 0, 0), (0, 0, 1)]

In [4]:
# Solution 1: include the encoding map in the .map() function 
# In this way, all the nodes will see it

def map_ohe(x):
    ohe = {"M": (1, 0, 0),
           "F": (0, 1, 0),
           "U": (0, 0, 1)
          }
    return ohe[x]

sc.parallelize(["M", "F", "U", "F", "M", "U"]) \
  .map(map_ohe) \
  .collect()

'''
This solution will work both locally and on server. But the mapping function is not reusable. 
'''

[(1, 0, 0), (0, 1, 0), (0, 0, 1), (0, 1, 0), (1, 0, 0), (0, 0, 1)]

In [5]:
# Solution 2: broadcast the map to all the nodes.
# These variables will be read-only.
bcast_map = sc.broadcast(one_hot_encoding)

def bcast_map_ohe(x, shared_ohe):
    return shared_ohe[x]

# Use the bcast_map_ohe function to call the broadcasted variable bcast_map.
(sc.parallelize(["M", "F", "U", "F", "M", "U"])
 .map(lambda x: bcast_map_ohe(x, bcast_map.value))
 .collect())

[(1, 0, 0), (0, 1, 0), (0, 0, 1), (0, 1, 0), (1, 0, 0), (0, 0, 1)]

In [6]:
# To remove a broadcasted variable, use unpersist on the variable. This will also free up memory
bcast_map.unpersist()

## Accumulators: write-only variables ##

Only the driver node and node running the IPython notebook can read its value. The other nodes cannot.

In the following example, the objective is to count the number of empty lines in a text file.

In [7]:
'''
Solution 1:
2 Spark jobs-
a. Read the text file.
b. Filter the empty lines
c. Count them
'''
(sc.textFile('file:///home/vagrant/datasets/hadoop_git_readme.txt')
   .filter(lambda line: len(line) == 0)
   .count())

6

In [9]:
'''
Solution 2:
a. Define accumulator variable and set it to 0
b. Add 1 to it for each empty line that we find in the file.
'''
accum = sc.accumulator(0)

def split_line(line):
    if len(line) == 0:
        accum.add(1)
    return 1

tot_lines = (sc.textFile('file:///home/vagrant/datasets/hadoop_git_readme.txt')
                .map(split_line)
                .count())

empty_lines = accum.value
print 'Number of empty lines in the hadoop git readme file is : ', empty_lines

Number of empty lines in the hadoop git readme file is :  6


## Broadcast and Accumulators ##

### Example using the iris dataset ###

The steps we will follow are as follows:

* Load the dataset and broadcast to all nodes.
* Each node will use a different classifier on the dataset and return the classifier name and its accuracy score on the full dataset.
* If a classifier raises an exception, store the error in an accumulator.
* Final output will be a list of classifiers that didnt have errors and their corresponding accuracy score.

### Step 1: Load the dataset and broadcast to all nodes

In [10]:
from sklearn.datasets import load_iris

# Broadcasting dataset to all nodes
bcast_dataset = sc.broadcast(load_iris())

### Step 2: Create accumulator object that stores a tuple (classifier_name, exception_raised)

In [11]:
from pyspark import AccumulatorParam

class ErrorAccumulator(AccumulatorParam):
    def zero(self, initialList):
        return initialList
    
    # Make sure the elements are list elements so they can be added
    def addInPlace(self, v1, v2):
        if not isinstance(v1, list):
            v1 = [v1]
        if not isinstance(v2, list):
            v2 = [v2]
        return v1 + v2

errAccum = sc.accumulator([], ErrorAccumulator())

### Step 3: Define mapping function ###

It will perform the following:

* Each node will train, test and evaluate the classifier on the broadcasted Iris dataset.
* The function will recieve the classifier object as an argument.
* It will return a tuple containing the classifier name and its accuracy score. 
* If an exception is raised the classifier name and exception are added to the accumulator.

In [12]:
# Mapping function
def apply_classifier(clf, dataset):
    # Define classifier and feature and target variables
    clf_name = clf.__class__.__name__
    X = dataset.value.data
    y = dataset.value.target
    
    try:
        from sklearn.metrics import accuracy_score
        # Fit the classifier and make predictions
        clf.fit(X, y)
        y_pred = clf.predict(X)
        acc = accuracy_score(y, y_pred)

        return [(clf_name, acc)]
    
    # If exception, classifier name and exception are added to the accumulator
    except Exception as e:
        errAccum.add((clf_name, str(e)))
        return []

### Step 4: Apply classifiers to the iris dataset ###

In [13]:
from sklearn.linear_model import SGDClassifier
from sklearn.dummy import DummyClassifier
from sklearn.decomposition import PCA
from sklearn.manifold import MDS

# We will try the following classifiers
classifiers = [DummyClassifier('most_frequent'), 
               SGDClassifier(), 
               PCA(), 
               MDS()]

# Use flatmap() to collect just the outputs of the mappers that didn't catch an exception
# bcast_dataset is loaded in Step 1
(sc.parallelize(classifiers)
     .flatMap(lambda x: apply_classifier(x, bcast_dataset))
     .collect())

[('DummyClassifier', 0.33333333333333331),
 ('SGDClassifier', 0.66666666666666663)]

In [14]:
# Find the classifiers that raised an exception
print "The errors are:", errAccum.value

The errors are: [('PCA', "'PCA' object has no attribute 'predict'"), ('MDS', "Proximity must be 'precomputed' or 'euclidean'. Got euclidean instead")]


### Step 5: Free memory ###

In [15]:
bcast_dataset.unpersist()

# Part 2: Data Preprocessing in Spark #

## JSON ##

In [16]:
# To load JSON compliant files we create an SQL context
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [18]:
# View sample JSON file 
!cat /home/vagrant/datasets/users.json

{"user_id":0, "balance": 10.0}
{"user_id":1, "gender":"M", "balance": 1.0}
{"user_id":2, "gender":"F", "balance": -0.5}
{"user_id":3, "gender":"F", "balance": 0.0}
{"user_id":4, "balance": 5.0}
{"user_id":5, "gender":"M", "balance": 3.0}

#### Load JSON 

In [19]:
# Using .read.json('file_name')
df = sqlContext.read.json("file:///home/vagrant/datasets/users.json")
df.show()

+-------+------+-------+
|balance|gender|user_id|
+-------+------+-------+
|   10.0|  null|      0|
|    1.0|     M|      1|
|   -0.5|     F|      2|
|    0.0|     F|      3|
|    5.0|  null|      4|
|    3.0|     M|      5|
+-------+------+-------+



#### Perform computations using filter()

In [20]:
# Check the schema of the JSON file
df.printSchema()

root
 |-- balance: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- user_id: long (nullable = true)



In [21]:
# Using filter using dataframe operations
(df.filter(df['gender'] != 'null')
   .filter(df['balance'] > 0)
   .select(['balance', 'gender', 'user_id'])
   .show())

+-------+------+-------+
|balance|gender|user_id|
+-------+------+-------+
|    1.0|     M|      1|
|    3.0|     M|      5|
+-------+------+-------+



In [22]:
# Using filter with SQL query 
(df.filter('gender is not null')
   .filter('balance > 0').select("*").show())

+-------+------+-------+
|balance|gender|user_id|
+-------+------+-------+
|    1.0|     M|      1|
|    3.0|     M|      5|
+-------+------+-------+



In [23]:
# One line
df.filter('gender is not null and balance > 0').show()

+-------+------+-------+
|balance|gender|user_id|
+-------+------+-------+
|    1.0|     M|      1|
|    3.0|     M|      5|
+-------+------+-------+



#### Dealing with missing data ####

In [25]:
# Remove rows with missing values
df.na.drop().show()

+-------+------+-------+
|balance|gender|user_id|
+-------+------+-------+
|    1.0|     M|      1|
|   -0.5|     F|      2|
|    0.0|     F|      3|
|    3.0|     M|      5|
+-------+------+-------+



In [26]:
# Remove rows where we have missing values in the 'gender' column
df.na.drop(subset=["gender"]).show()

+-------+------+-------+
|balance|gender|user_id|
+-------+------+-------+
|    1.0|     M|      1|
|   -0.5|     F|      2|
|    0.0|     F|      3|
|    3.0|     M|      5|
+-------+------+-------+



In [27]:
# Set values for missing values in the gender and balance column
df.na.fill({'gender': "U", 'balance': 0.0}).show()

+-------+------+-------+
|balance|gender|user_id|
+-------+------+-------+
|   10.0|     U|      0|
|    1.0|     M|      1|
|   -0.5|     F|      2|
|    0.0|     F|      3|
|    5.0|     U|      4|
|    3.0|     M|      5|
+-------+------+-------+



#### Grouping and creating tables in memory ####

In [28]:
# Fill in default values for missing values for the gender and balance column
# Then, group by the gender column and average out the values in the balance column for each gender
(df.na.fill({'gender': "U", 'balance': 0.0})
   .groupBy("gender").avg('balance').show())

+------+------------+
|gender|avg(balance)|
+------+------------+
|     F|       -0.25|
|     M|         2.0|
|     U|         7.5|
+------+------------+



### Registering Dataframe as an SQL table ###

#### Running SQL commands ####

In [30]:
# Register datafram as SQL table
df.registerTempTable("users")

In [31]:
# SQL command to find average balance for each gender for rows which don't have NULL values
sqlContext.sql("""
    SELECT gender, AVG(balance) 
    FROM users 
    WHERE gender IS NOT NULL 
    GROUP BY gender""").show()

+------+-----+
|gender|  _c1|
+------+-----+
|     F|-0.25|
|     M|  2.0|
+------+-----+



In [32]:
# Checking type of table
type(sqlContext.table("users"))

pyspark.sql.dataframe.DataFrame

In [33]:
# Collect the full table
sqlContext.table("users").collect()

[Row(balance=10.0, gender=None, user_id=0),
 Row(balance=1.0, gender=u'M', user_id=1),
 Row(balance=-0.5, gender=u'F', user_id=2),
 Row(balance=0.0, gender=u'F', user_id=3),
 Row(balance=5.0, gender=None, user_id=4),
 Row(balance=3.0, gender=u'M', user_id=5)]

In [34]:
# First row
a_row = sqlContext.sql("SELECT * FROM users").first()
a_row

Row(balance=10.0, gender=None, user_id=0)

In [35]:
# Options to extract column values
print a_row['balance']
print a_row.balance

10.0
10.0


In [36]:
a_row.asDict()

{'balance': 10.0, 'gender': None, 'user_id': 0}

## Write RDD to disk ##

In [37]:
!rm -rf /tmp/complete_users*

In [38]:
# Write dataframe to disk
(df.na.drop().write
   .save("file:///tmp/complete_users.json", format='json'))

In [39]:
# Check local filesystem
!ls -als /tmp/complete_users.json

total 28
4 drwxrwxr-x 2 vagrant vagrant 4096 Oct  7 21:01 .
4 drwxrwxrwt 9 root    root    4096 Oct  7 21:01 ..
4 -rw-r--r-- 1 vagrant vagrant   83 Oct  7 21:01 part-r-00000-8a9e7ba8-f31e-4e37-818d-8a5fca989427
4 -rw-rw-r-- 1 vagrant vagrant   12 Oct  7 21:01 .part-r-00000-8a9e7ba8-f31e-4e37-818d-8a5fca989427.crc
4 -rw-r--r-- 1 vagrant vagrant   82 Oct  7 21:01 part-r-00001-8a9e7ba8-f31e-4e37-818d-8a5fca989427
4 -rw-rw-r-- 1 vagrant vagrant   12 Oct  7 21:01 .part-r-00001-8a9e7ba8-f31e-4e37-818d-8a5fca989427.crc
0 -rw-r--r-- 1 vagrant vagrant    0 Oct  7 21:01 _SUCCESS
4 -rw-rw-r-- 1 vagrant vagrant    8 Oct  7 21:01 ._SUCCESS.crc


In [40]:
# View JSON file using SQL
sqlContext.sql(
    "SELECT * FROM json.`file:///tmp/complete_users.json`").show()

+-------+------+-------+
|balance|gender|user_id|
+-------+------+-------+
|    1.0|     M|      1|
|   -0.5|     F|      2|
|    0.0|     F|      3|
|    3.0|     M|      5|
+-------+------+-------+



### Parquet data format 

In [41]:
# Write data onto local system in parquet format
df.na.drop().write.save(
    "file:///tmp/complete_users.parquet", format='parquet')

In [42]:
# View the files on the local filesystem
!ls -als /tmp/complete_users.parquet/

total 44
4 drwxrwxr-x  2 vagrant vagrant 4096 Oct  7 21:31 .
4 drwxrwxrwt 10 root    root    4096 Oct  7 21:31 ..
4 -rw-r--r--  1 vagrant vagrant  376 Oct  7 21:31 _common_metadata
4 -rw-rw-r--  1 vagrant vagrant   12 Oct  7 21:31 ._common_metadata.crc
4 -rw-r--r--  1 vagrant vagrant 1082 Oct  7 21:31 _metadata
4 -rw-rw-r--  1 vagrant vagrant   20 Oct  7 21:31 ._metadata.crc
4 -rw-r--r--  1 vagrant vagrant  750 Oct  7 21:31 part-r-00000-c36fe8f5-3523-4ed5-8a31-570c610dd3fb.gz.parquet
4 -rw-rw-r--  1 vagrant vagrant   16 Oct  7 21:31 .part-r-00000-c36fe8f5-3523-4ed5-8a31-570c610dd3fb.gz.parquet.crc
4 -rw-r--r--  1 vagrant vagrant  746 Oct  7 21:31 part-r-00001-c36fe8f5-3523-4ed5-8a31-570c610dd3fb.gz.parquet
4 -rw-rw-r--  1 vagrant vagrant   16 Oct  7 21:31 .part-r-00001-c36fe8f5-3523-4ed5-8a31-570c610dd3fb.gz.parquet.crc
0 -rw-r--r--  1 vagrant vagrant    0 Oct  7 21:31 _SUCCESS
4 -rw-rw-r--  1 vagrant vagrant    8 Oct  7 21:31 ._SUCCESS.crc


### Creating a Spark dataframe from an existing RDD ###

In [44]:
from pyspark.sql import Row

# Create a row object for every record in the RDD
rdd_gender = \
    sc.parallelize([Row(short_gender="M", long_gender="Male"),
                    Row(short_gender="F", long_gender="Female")])

# Create the dataframe 
(sqlContext.createDataFrame(rdd_gender)
           .registerTempTable("gender_maps"))

In [45]:
# Display the table
sqlContext.table("gender_maps").show()

+-----------+------------+
|long_gender|short_gender|
+-----------+------------+
|       Male|           M|
|     Female|           F|
+-----------+------------+



In [46]:
# JOIN parquet file and newly created dataframe
sqlContext.sql("""
    SELECT balance, long_gender, user_id 
    FROM parquet.`file:///tmp/complete_users.parquet` 
    JOIN gender_maps ON gender=short_gender""").show()

+-------+-----------+-------+
|balance|long_gender|user_id|
+-------+-----------+-------+
|    3.0|       Male|      5|
|    1.0|       Male|      1|
|    0.0|     Female|      3|
|   -0.5|     Female|      2|
+-------+-----------+-------+



### Clearing memory ###

In [47]:
# Check tables currently in-memory
sqlContext.tableNames()

[u'gender_maps', u'users']

In [48]:
# Drop tables in memory using dropTempTable
for table in sqlContext.tableNames():
    sqlContext.dropTempTable(table)