This is a demo of how to run PySpark from a Python notebook, based on notes prepared by Shilpika, Venkat, and George.

See https://docs.google.com/document/d/1PHGLAbDOZzdiCnoWA6PUOTzt7Lrc_UGq5CZmQj1qW5A/edit?usp=sharing for details.


Let's start by sanity checking the Python and PySpark environment...

In [1]:
import sys
sys.version

'3.6.1 |Anaconda 4.4.0 (64-bit)| (default, May 11 2017, 13:09:58) \n[GCC 4.4.7 20120313 (Red Hat 4.4.7-1)]'

In [2]:
sc

Ensure that `sys.version` and `sc.pythonVer` are compatible, i.e., that they report the same major.minor version. In our example, we're working with Python 3.6.

In [7]:
sc.pythonVer

'3.6'

OK, now we know that the Python versions seen by the notebook and by PySpark are compatible.
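The comparison above is done by eye. Here is a minimal sketch that automates it, assuming "compatible" means the same major.minor version. `python_versions_match` is a hypothetical helper; in the notebook you would call it as `python_versions_match(sc.pythonVer)`.

```python
import sys

def python_versions_match(spark_python_ver, driver_info=sys.version_info):
    """Compare a 'major.minor' string (such as sc.pythonVer) with this Python's version."""
    major, minor = (int(part) for part in spark_python_ver.split(".")[:2])
    return (major, minor) == tuple(driver_info[:2])

# Examples with explicit values instead of a live SparkContext:
print(python_versions_match("3.6", driver_info=(3, 6, 1)))  # True
print(python_versions_match("2.7", driver_info=(3, 6, 1)))  # False
```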

Now we can *try* using PySpark to do some distributed things.


Start by creating a parallelized collection (an RDD). The easiest way to do this is to parallelize a range of integers, but Spark can also build RDDs from other sources (lines in a file, a set of files, etc.). Here we split the integers 0 to 999 into 48 partitions.

In [1]:
rdd = sc.parallelize(range(0, 1000), 48)
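The second argument to `parallelize` (48 here) is the number of partitions the data is split into; each partition is processed by one task. The pure-Python sketch below models splitting `range(0, 1000)` into 48 contiguous, nearly equal slices. It is an illustration that assumes even, contiguous splitting, not PySpark's own code, and `split_range` is a hypothetical helper. On a live cluster, `rdd.getNumPartitions()` reports the actual partition count.

```python
def split_range(r, num_slices):
    """Split a range into num_slices contiguous sub-ranges of (nearly) equal length."""
    size = len(r)
    bounds = [i * size // num_slices for i in range(num_slices + 1)]
    return [r[bounds[i]:bounds[i + 1]] for i in range(num_slices)]

slices = split_range(range(0, 1000), 48)
print(len(slices))                          # 48 partitions
print(sorted({len(s) for s in slices}))     # [20, 21]: each holds 20 or 21 elements
print(sum(len(s) for s in slices))          # 1000: nothing lost or duplicated
```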

The easiest sanity check is to make sure we can do a simple reduce.

In [2]:
rdd.reduce(lambda x, y: x + y)

499500
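`reduce` is an action: PySpark reduces each partition locally on the executors and then combines the per-partition results on the driver. Because partial results can be grouped in any way, the operator must be associative (the PySpark documentation asks for commutative and associative). A pure-Python model of that two-level reduction, assuming contiguous partitions, which also checks the answer against the closed form n(n-1)/2 for the integers 0 to n-1:

```python
from functools import reduce

def add(x, y):
    return x + y

data = range(0, 1000)
num_slices = 48

# Split into contiguous partitions (an assumption about how the data is laid out).
bounds = [i * len(data) // num_slices for i in range(num_slices + 1)]
partitions = [data[bounds[i]:bounds[i + 1]] for i in range(num_slices)]

# Stage 1: each "executor" reduces its own partition.
partials = [reduce(add, part) for part in partitions]

# Stage 2: the "driver" combines the 48 partial results.
total = reduce(add, partials)

print(total)            # 499500
print(1000 * 999 // 2)  # 499500
```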

Now we're going to be more ambitious and make sure that the work is actually spread across multiple nodes. We'll use our friend ``socket.gethostname()`` to find out where each task runs.

In [3]:
import socket


This cell runs on the driver, so it reports the driver's hostname. What you see may vary: Cobalt (the job scheduler) allocates whichever nodes it chooses, so your output will likely differ.

In [4]:
socket.gethostname()

'cc040'

In [5]:
rdd

PythonRDD[2] at RDD at PythonRDD.scala:48

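Here we map each element to a one-element list containing the hostname of the node that processes it. Using lists lets us reduce with list concatenation (list + list) and turn the result into a set afterwards. This is not efficient, since the combined list has one entry per element (1,000 here), but list concatenation is associative, which is what map/reduce-style combining needs. It is not commutative, but because we only care about the set of hostnames, the order of the combined list does not matter.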

In [6]:
rdd_hostnames = rdd.map(lambda id: [socket.gethostname()])

In [7]:
hosts_used = rdd_hostnames.reduce(lambda x, y: x + y)

We use a Python set to remove the duplicates from the list.

In [8]:
unique_hosts = set(hosts_used)


In [9]:
print(unique_hosts)

{'cc040', 'cc118', 'cc100', 'cc117'}
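Since only the set of hostnames matters, an alternative is to map each element to a one-element set and combine with set union (`|`), which is both associative and commutative and whose intermediate results never grow past the number of distinct hosts. The sketch below simulates both approaches in plain Python; the hostnames are hypothetical stand-ins for what `socket.gethostname()` would return on each executor. On a real RDD, `rdd.map(lambda _: socket.gethostname()).distinct().collect()` is another way to get the same answer.

```python
from functools import reduce

# Hypothetical stand-ins for the hostname each element's task ran on.
hostnames_per_element = ["node-a", "node-b", "node-a", "node-c", "node-b", "node-a"]
expected = {"node-a", "node-b", "node-c"}

# List approach from the notebook: concatenate singleton lists, then deduplicate.
as_lists = reduce(lambda x, y: x + y, ([h] for h in hostnames_per_element))
print(set(as_lists) == expected)  # True

# Set approach: union of singleton sets; intermediate results stay small.
as_sets = reduce(lambda x, y: x | y, ({h} for h in hostnames_per_element))
print(as_sets == expected)  # True

# Union gives the same result regardless of the order of combination.
reversed_sets = reduce(lambda x, y: x | y, ({h} for h in reversed(hostnames_per_element)))
print(reversed_sets == as_sets)  # True
```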


This is an aside to explain some functional programming ideas that have been incorporated into Python. Most of them trace back to the Lisp programming language. Languages such as Scala use lambdas and functional programming throughout their language and library design, and combine this with full type safety. These ideas are powerful, but keep in mind that without some care they can become inefficient (the repeated list concatenation above is one example).

In [10]:
integers = range(0, 10)
squares = list(map(lambda x: x * x, integers))

In [11]:
print(squares)

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]


In [12]:
even_squares = list(filter(lambda x: x % 2 == 0, squares))
print(even_squares)

[0, 4, 16, 36, 64]


In [13]:
from functools import reduce
int_sum = reduce(lambda x, y: x + y, integers)
print(int_sum)

45
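One detail worth knowing: `functools.reduce` raises a `TypeError` on an empty sequence unless you pass an initial value as the third argument. PySpark's `RDD.reduce` likewise fails on an empty RDD, while `RDD.fold(zeroValue, op)` takes a starting value instead. A small sketch of the Python side:

```python
from functools import reduce

def add(x, y):
    return x + y

try:
    reduce(add, [])
except TypeError as err:
    print("empty reduce failed:", err)

print(reduce(add, [], 0))         # 0: the initializer is returned for an empty sequence
print(reduce(add, range(10), 0))  # 45, same as without an initializer
```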


Python supports many ideas from functional programming beyond map, reduce, and filter. Many of them live in the standard-library modules ``functools`` and ``itertools``. Python also supports "programming in the infinite" through generator functions. Here, we'll generate an infinite sequence of integers.

In [14]:
def int_generator():
    i = 0
    while True:
        yield i
        i += 1

...which we can make finite using ``takewhile()``.

In [15]:
from itertools import takewhile

infinite_integers = int_generator()
ints_to_100 = takewhile(lambda x: x < 100, infinite_integers)
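Two points the cells above leave implicit: the standard library already provides this infinite generator as `itertools.count()`, and `takewhile()` is lazy, so `ints_to_100` is an iterator that produces nothing until it is consumed (for example with `list()`) and can be consumed only once. A self-contained sketch:

```python
from itertools import count, islice, takewhile

def int_generator():
    # Same infinite generator as above: 0, 1, 2, ...
    i = 0
    while True:
        yield i
        i += 1

# itertools.count() is the standard-library equivalent of int_generator().
print(list(islice(int_generator(), 5)) == list(islice(count(), 5)))  # True

# takewhile() is lazy: values are generated only as the iterator is consumed.
ints_to_100 = takewhile(lambda x: x < 100, int_generator())
first_pass = list(ints_to_100)
print(len(first_pass), first_pass[0], first_pass[-1])  # 100 0 99

# An iterator can be consumed only once.
print(list(ints_to_100))  # []
```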


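Next, we'll have the executors create directories and files under `/scratch`. First, let's try it on the driver.

In [16]:
import os  # os.path is available once os is imported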


In [17]:
dirname = os.path.join("/scratch", "100000")
print(dirname)

/scratch/100000


In [20]:
os.makedirs(dirname, exist_ok=True)  # create it unless it already exists

In [24]:
contents = os.listdir("/scratch")
contents[:10]  # first 10 entries, for conciseness

['542', '311', '461', '55', '553', '377', '807', '626', '637', '132']

In [25]:
def create_dir_id(dir_id):
    # Create /scratch/<dir_id> if needed and return its path.
    dirname = os.path.join("/scratch", str(dir_id))
    os.makedirs(dirname, exist_ok=True)
    return dirname

In [26]:
create_dir_id(1000000)

'/scratch/1000000'

In [29]:
contents = os.listdir("/scratch")
contents[:10]  # first 10 entries, for conciseness

['542', '311', '461', '55', '553', '377', '807', '626', '637', '132']

In [32]:
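rdd2 = rdd.map(create_dir_id)  # runs on the executors: one directory per element
rdd2.cache()                   # keep rdd2's results so RDDs derived from it reuse them instead of recomputing
rdd3 = rdd2.map(lambda pathname: ['%s:%s' % (socket.gethostname(), pathname)])
result = rdd3.reduce(lambda x, y: x + y)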

In [33]:
result[:10]

['cc117:/scratch/0',
 'cc117:/scratch/1',
 'cc117:/scratch/2',
 'cc117:/scratch/3',
 'cc117:/scratch/4',
 'cc117:/scratch/5',
 'cc117:/scratch/6',
 'cc117:/scratch/7',
 'cc117:/scratch/8',
 'cc117:/scratch/9']
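Because `create_dir_id` runs inside `map`, it executes on the executors, and Spark may run it more than once for the same element (for example, when a task is retried or an uncached RDD is recomputed). Functions with side effects used this way should therefore be idempotent. A sketch of that property, using a temporary directory in place of `/scratch` and a hypothetical `root` parameter:

```python
import os
import tempfile

def create_dir_id(dir_id, root):
    # Same idea as the notebook's helper, with the base directory as a parameter.
    dirname = os.path.join(root, str(dir_id))
    os.makedirs(dirname, exist_ok=True)  # no error if it already exists
    return dirname

with tempfile.TemporaryDirectory() as root:
    first = create_dir_id(7, root)
    second = create_dir_id(7, root)  # simulated retry of the same task
    listing = os.listdir(root)
    print(first == second, os.path.isdir(first))  # True True
    print(listing)  # ['7']
```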

In [55]:
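import pathlib

def touch_file(pathname):
    # Create pathname/data.txt (or update its timestamp), like the shell's touch,
    # without launching a shell via os.system().
    datafile = os.path.join(pathname, "data.txt")
    pathlib.Path(datafile).touch()
    return datafile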


In [56]:
touch_file("/scratch/1000000")

'/scratch/1000000/data.txt'

In [57]:
os.listdir("/scratch/1000000")

['data.txt']

In [58]:
rdd4 = rdd2.map(lambda pathname: [touch_file(pathname)])


In [59]:
result = rdd4.reduce(lambda x, y: x + y)
result

['/scratch/0/data.txt',
 '/scratch/1/data.txt',
 '/scratch/2/data.txt',
 '/scratch/3/data.txt',
 '/scratch/4/data.txt',
 '/scratch/5/data.txt',
 '/scratch/6/data.txt',
 '/scratch/7/data.txt',
 '/scratch/8/data.txt',
 '/scratch/9/data.txt',
 '/scratch/10/data.txt',
 '/scratch/11/data.txt',
 '/scratch/12/data.txt',
 '/scratch/13/data.txt',
 '/scratch/14/data.txt',
 '/scratch/15/data.txt',
 '/scratch/16/data.txt',
 '/scratch/17/data.txt',
 '/scratch/18/data.txt',
 '/scratch/19/data.txt',
 '/scratch/20/data.txt',
 '/scratch/21/data.txt',
 '/scratch/22/data.txt',
 '/scratch/23/data.txt',
 '/scratch/24/data.txt',
 '/scratch/25/data.txt',
 '/scratch/26/data.txt',
 '/scratch/27/data.txt',
 '/scratch/28/data.txt',
 '/scratch/29/data.txt',
 '/scratch/30/data.txt',
 '/scratch/31/data.txt',
 '/scratch/32/data.txt',
 '/scratch/33/data.txt',
 '/scratch/34/data.txt',
 '/scratch/35/data.txt',
 '/scratch/36/data.txt',
 '/scratch/37/data.txt',
 '/scratch/38/data.txt',
 '/scratch/39/data.txt',
 '/scratch

We can verify that the operation worked by running `os.listdir()` on the driver, which shows the files created by the executor running on this same node.

In [60]:
os.listdir("/scratch")

['542',
 '311',
 '461',
 '55',
 '553',
 '377',
 '807',
 '626',
 '637',
 '132',
 '472',
 '632',
 '306',
 '297',
 '474',
 '543',
 '52',
 '383',
 '561',
 '470',
 '221',
 '641',
 '552',
 '644',
 '135',
 '219',
 '305',
 '217',
 '710',
 '45',
 '882',
 '208',
 '891',
 '42',
 '386',
 '476',
 '793',
 '640',
 '973',
 '888',
 '627',
 '714',
 '961',
 '880',
 '223',
 '963',
 '884',
 '41',
 '391',
 '709',
 '642',
 '209',
 '43',
 '798',
 '465',
 '60',
 '878',
 '128',
 '137',
 '376',
 '393',
 '725',
 '559',
 '213',
 '549',
 '375',
 '643',
 '803',
 '978',
 '716',
 '811',
 '894',
 '390',
 '469',
 '293',
 '974',
 '958',
 '797',
 '131',
 '877',
 '303',
 '629',
 '291',
 '129',
 '806',
 '143',
 '61',
 '210',
 '968',
 '555',
 '726',
 '140',
 '385',
 '130',
 '890',
 '876',
 '473',
 '127',
 '967',
 '51',
 '394',
 '388',
 '975',
 '708',
 '301',
 '125',
 '464',
 '545',
 '796',
 '100000',
 '554',
 '141',
 '56',
 '308',
 '541',
 '384',
 '46',
 '50',
 '809',
 '639',
 '970',
 '550',
 '959',
 '546',
 '225',
 '295',
 

In [61]:
os.listdir("/scratch/386")

['data.txt']
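To check more than one directory by hand, a small helper can count how many numbered subdirectories contain `data.txt`. The helper `count_touched` and its `root` parameter are hypothetical, and the example runs against a temporary directory so it can be tried anywhere. Called on the driver with `root="/scratch"`, it would only see the directories created on the driver's own node.

```python
import os
import tempfile

def count_touched(root, filename="data.txt"):
    """Return (dirs_with_file, numeric_dirs) for the numeric subdirectories of root."""
    numeric = [d for d in os.listdir(root)
               if d.isdigit() and os.path.isdir(os.path.join(root, d))]
    touched = [d for d in numeric if os.path.isfile(os.path.join(root, d, filename))]
    return len(touched), len(numeric)

with tempfile.TemporaryDirectory() as root:
    for i in range(5):
        os.makedirs(os.path.join(root, str(i)))
        if i % 2 == 0:  # only create data.txt in some of the directories
            open(os.path.join(root, str(i), "data.txt"), "a").close()
    counts = count_touched(root)
    print(counts)  # (3, 5)
```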