## Sharing data within the cluster

##### Read-only variables (broadcast)

In [1]:
# Example: let's encode the gender found in the demographic data
# As a hot encode. Note: the association should be the same
# on every machine in the cluster, requiring a shared mapping

one_hot_encoding = {"M": (1, 0, 0),
                    "F": (0, 1, 0),
                    "U": (0, 0, 1)
                   }

In [2]:
# Gender one-hot-encoding
(sc.parallelize(["M", "F", "U", "F", "M", "U"])
   .map(lambda x: one_hot_encoding[x])
   .collect())

# The command above works only in the single node configuration
# since the variable "one_hot_encoding" is defined only on this machine
# On a multi-node cluster, it will raise a Java error

[(1, 0, 0), (0, 1, 0), (0, 0, 1), (0, 1, 0), (1, 0, 0), (0, 0, 1)]

In [3]:
# Solution 1: include the encoding map in the .map() function 
# In this way, all the nodes will see it

def map_ohe(x):
    ohe = {"M": (1, 0, 0),
           "F": (0, 1, 0),
           "U": (0, 0, 1)
          }
    return ohe[x]

sc.parallelize(["M", "F", "U", "F", "M", "U"]).map(map_ohe).collect()



[(1, 0, 0), (0, 1, 0), (0, 0, 1), (0, 1, 0), (1, 0, 0), (0, 0, 1)]

In [4]:
# Solution 2: broadcast the map to all the nodes.
# All of them will be able to read-only it

bcast_map = sc.broadcast(one_hot_encoding)

def bcast_map_ohe(x, shared_ohe):
    return shared_ohe[x]

(sc.parallelize(["M", "F", "U", "F", "M", "U"])
 .map(lambda x: bcast_map_ohe(x, bcast_map.value))
 .collect())

[(1, 0, 0), (0, 1, 0), (0, 0, 1), (0, 1, 0), (1, 0, 0), (0, 0, 1)]

In [5]:
bcast_map.unpersist()

##### Write-only variables (broadcast)

In [6]:
# Let's coint the empty line in a file

print "The number of empty lines is:"

(sc.textFile('file:///home/vagrant/datasets/hadoop_git_readme.txt')
   .filter(lambda line: len(line) == 0)
   .count())

The number of empty lines is:


6

In [7]:
# Let's count the lines in a file, and at the same time,
# count the empty ones

accum = sc.accumulator(0)

def split_line(line):   
    if len(line) == 0:
        accum.add(1)
    return 1

tot_lines = (
    sc.textFile('file:///home/vagrant/datasets/hadoop_git_readme.txt')
      .map(split_line)
      .count())

empty_lines = accum.value


print "In the file there are %d lines" % tot_lines
print "And %d lines are empty" % empty_lines

In the file there are 31 lines
And 6 lines are empty


# Real world example with broadcast and accumulator
### train multiple classifiers and select the best one, accumulating the errors

In [8]:
# step 1: load the dataset
# note: if the dataset is large, you should read the next section

from sklearn.datasets import load_iris

bcast_dataset = sc.broadcast(load_iris())

In [9]:
# step 2: create an accumulator that stores the errors in a list

from pyspark import AccumulatorParam

class ErrorAccumulator(AccumulatorParam):
    def zero(self, initialList):
        return initialList

    def addInPlace(self, v1, v2):
        if not isinstance(v1, list):
            v1 = [v1]
        if not isinstance(v2, list):
            v2 = [v2]
        return v1 + v2

errAccum = sc.accumulator([], ErrorAccumulator())

In [10]:
# step 3: create mappers: each of them will use a classifier

def apply_classifier(clf, dataset):
    
    clf_name = clf.__class__.__name__
    X = dataset.value.data
    y = dataset.value.target
    
    try:
        from sklearn.metrics import accuracy_score
        
        clf.fit(X, y)
        y_pred = clf.predict(X)
        acc = accuracy_score(y, y_pred)

        return [(clf_name, acc)]

    except Exception as e:
        errAccum.add((clf_name, str(e)))
        return []


In [11]:
from sklearn.linear_model import SGDClassifier
from sklearn.dummy import DummyClassifier
from sklearn.decomposition import PCA
from sklearn.manifold import MDS

classifiers = [DummyClassifier('most_frequent'), 
               SGDClassifier(), 
               PCA(), 
               MDS()]

(sc.parallelize(classifiers)
     .flatMap(lambda x: apply_classifier(x, bcast_dataset))
     .collect())

[('DummyClassifier', 0.33333333333333331),
 ('SGDClassifier', 0.66666666666666663)]

In [12]:
print "The errors are:"
errAccum.value

The errors are:


[('PCA', "'PCA' object has no attribute 'predict'"),
 ('MDS',
  "Proximity must be 'precomputed' or 'euclidean'. Got euclidean instead")]

In [13]:
bcast_dataset.unpersist()

# Load the data

In [14]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [15]:
!cat /home/vagrant/datasets/users.json

{"user_id":0, "balance": 10.0}
{"user_id":1, "gender":"M", "balance": 1.0}
{"user_id":2, "gender":"F", "balance": -0.5}
{"user_id":3, "gender":"F", "balance": 0.0}
{"user_id":4, "balance": 5.0}
{"user_id":5, "gender":"M", "balance": 3.0}

In [16]:
df = sqlContext.read.json("file:///home/vagrant/datasets/users.json")
df.show()

+-------+------+-------+
|balance|gender|user_id|
+-------+------+-------+
|   10.0|  null|      0|
|    1.0|     M|      1|
|   -0.5|     F|      2|
|    0.0|     F|      3|
|    5.0|  null|      4|
|    3.0|     M|      5|
+-------+------+-------+



In [17]:
df.printSchema()

root
 |-- balance: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- user_id: long (nullable = true)



In [18]:
(df.filter(df['gender'] != 'null')
   .filter(df['balance'] > 0)
   .select(['balance', 'gender', 'user_id'])
   .show())

+-------+------+-------+
|balance|gender|user_id|
+-------+------+-------+
|    1.0|     M|      1|
|    3.0|     M|      5|
+-------+------+-------+



In [19]:
(df.filter('gender is not null')
   .filter('balance > 0').select("*").show())

+-------+------+-------+
|balance|gender|user_id|
+-------+------+-------+
|    1.0|     M|      1|
|    3.0|     M|      5|
+-------+------+-------+



In [20]:
df.filter('gender is not null and balance > 0').show()

+-------+------+-------+
|balance|gender|user_id|
+-------+------+-------+
|    1.0|     M|      1|
|    3.0|     M|      5|
+-------+------+-------+



In [21]:
df.na.drop().show()

+-------+------+-------+
|balance|gender|user_id|
+-------+------+-------+
|    1.0|     M|      1|
|   -0.5|     F|      2|
|    0.0|     F|      3|
|    3.0|     M|      5|
+-------+------+-------+



In [22]:
df.na.drop(subset=["gender"]).show()

+-------+------+-------+
|balance|gender|user_id|
+-------+------+-------+
|    1.0|     M|      1|
|   -0.5|     F|      2|
|    0.0|     F|      3|
|    3.0|     M|      5|
+-------+------+-------+



In [23]:
df.na.fill({'gender': "U", 'balance': 0.0}).show()

+-------+------+-------+
|balance|gender|user_id|
+-------+------+-------+
|   10.0|     U|      0|
|    1.0|     M|      1|
|   -0.5|     F|      2|
|    0.0|     F|      3|
|    5.0|     U|      4|
|    3.0|     M|      5|
+-------+------+-------+



In [24]:
(df.na.fill({'gender': "U", 'balance': 0.0})
   .groupBy("gender").avg('balance').show())

+------+------------+
|gender|avg(balance)|
+------+------------+
|     F|       -0.25|
|     M|         2.0|
|     U|         7.5|
+------+------------+



In [25]:
df.registerTempTable("users")

In [26]:
sqlContext.sql("""
    SELECT gender, AVG(balance) 
    FROM users 
    WHERE gender IS NOT NULL 
    GROUP BY gender""").show()

+------+-----+
|gender|  _c1|
+------+-----+
|     F|-0.25|
|     M|  2.0|
+------+-----+



In [27]:
type(sqlContext.table("users"))

pyspark.sql.dataframe.DataFrame

In [28]:
sqlContext.table("users").collect()

[Row(balance=10.0, gender=None, user_id=0),
 Row(balance=1.0, gender=u'M', user_id=1),
 Row(balance=-0.5, gender=u'F', user_id=2),
 Row(balance=0.0, gender=u'F', user_id=3),
 Row(balance=5.0, gender=None, user_id=4),
 Row(balance=3.0, gender=u'M', user_id=5)]

In [29]:
a_row = sqlContext.sql("SELECT * FROM users").first()
a_row

Row(balance=10.0, gender=None, user_id=0)

In [30]:
print a_row['balance']
print a_row.balance

10.0
10.0


In [31]:
a_row.asDict()

{'balance': 10.0, 'gender': None, 'user_id': 0}

In [32]:
!rm -rf /tmp/complete_users*

In [33]:
(df.na.drop().write
   .save("file:///tmp/complete_users.json", format='json'))

In [34]:
!ls -als /tmp/complete_users.json

total 28
4 drwxrwxr-x  2 vagrant vagrant 4096 May 10 20:36 .
4 drwxrwxrwt 22 root    root    4096 May 10 20:36 ..
4 -rw-r--r--  1 vagrant vagrant   83 May 10 20:36 part-r-00000-f5728f74-10d9-4c7a-8865-64cb80c7ca0a
4 -rw-rw-r--  1 vagrant vagrant   12 May 10 20:36 .part-r-00000-f5728f74-10d9-4c7a-8865-64cb80c7ca0a.crc
4 -rw-r--r--  1 vagrant vagrant   82 May 10 20:36 part-r-00001-f5728f74-10d9-4c7a-8865-64cb80c7ca0a
4 -rw-rw-r--  1 vagrant vagrant   12 May 10 20:36 .part-r-00001-f5728f74-10d9-4c7a-8865-64cb80c7ca0a.crc
0 -rw-r--r--  1 vagrant vagrant    0 May 10 20:36 _SUCCESS
4 -rw-rw-r--  1 vagrant vagrant    8 May 10 20:36 ._SUCCESS.crc


In [35]:
sqlContext.sql(
    "SELECT * FROM json.`file:///tmp/complete_users.json`").show()

+-------+------+-------+
|balance|gender|user_id|
+-------+------+-------+
|    0.0|     F|      3|
|    3.0|     M|      5|
|    1.0|     M|      1|
|   -0.5|     F|      2|
+-------+------+-------+



In [36]:
df.na.drop().write.save(
    "file:///tmp/complete_users.parquet", format='parquet')

In [37]:
!ls -als /tmp/complete_users.parquet/

total 44
4 drwxrwxr-x  2 vagrant vagrant 4096 May 10 20:36 .
4 drwxrwxrwt 23 root    root    4096 May 10 20:36 ..
4 -rw-r--r--  1 vagrant vagrant  376 May 10 20:36 _common_metadata
4 -rw-rw-r--  1 vagrant vagrant   12 May 10 20:36 ._common_metadata.crc
4 -rw-r--r--  1 vagrant vagrant 1082 May 10 20:36 _metadata
4 -rw-rw-r--  1 vagrant vagrant   20 May 10 20:36 ._metadata.crc
4 -rw-r--r--  1 vagrant vagrant  750 May 10 20:36 part-r-00000-810195c2-ffa9-4a54-add7-61e6a7c92095.gz.parquet
4 -rw-rw-r--  1 vagrant vagrant   16 May 10 20:36 .part-r-00000-810195c2-ffa9-4a54-add7-61e6a7c92095.gz.parquet.crc
4 -rw-r--r--  1 vagrant vagrant  746 May 10 20:36 part-r-00001-810195c2-ffa9-4a54-add7-61e6a7c92095.gz.parquet
4 -rw-rw-r--  1 vagrant vagrant   16 May 10 20:36 .part-r-00001-810195c2-ffa9-4a54-add7-61e6a7c92095.gz.parquet.crc
0 -rw-r--r--  1 vagrant vagrant    0 May 10 20:36 _SUCCESS
4 -rw-rw-r--  1 vagrant vagrant    8 May 10 20:36 ._SUCCESS.crc


In [38]:
from pyspark.sql import Row

rdd_gender = \
    sc.parallelize([Row(short_gender="M", long_gender="Male"),
                    Row(short_gender="F", long_gender="Female")])

(sqlContext.createDataFrame(rdd_gender)
           .registerTempTable("gender_maps"))

In [39]:
sqlContext.table("gender_maps").show()

+-----------+------------+
|long_gender|short_gender|
+-----------+------------+
|       Male|           M|
|     Female|           F|
+-----------+------------+



In [40]:
sqlContext.sql("""
    SELECT balance, long_gender, user_id 
    FROM parquet.`file:///tmp/complete_users.parquet` 
    JOIN gender_maps ON gender=short_gender""").show()

+-------+-----------+-------+
|balance|long_gender|user_id|
+-------+-----------+-------+
|    1.0|       Male|      1|
|    3.0|       Male|      5|
|   -0.5|     Female|      2|
|    0.0|     Female|      3|
+-------+-----------+-------+



In [41]:
sqlContext.tableNames()

[u'gender_maps', u'users']

In [42]:
for table in sqlContext.tableNames():
    sqlContext.dropTempTable(table)