# Activity 1: Setting Up

In [2]:
import findspark

# provide path to your spark directory directly
findspark.init("spark2")

import pyspark

IndexError: list index out of range

**Ok, so we have an error. Now what?**

**Did you start the Spark instance first?**

        cd spark2/sbin
        ./start-master.sh

**Have you specifed the path correctly?**

In [13]:
import findspark

# provide path to your spark directory directly
findspark.init("../spark2/")

import pyspark

**Now let's create a SparkContext and use it to count the number of lines in a file. For that, let's create a text file first.**

        cd
        ls >> helloworld
        cat helloworld

In [5]:
sc = pyspark.SparkContext(appName="helloworld")

def nonempty(x):
    len(x) > 0
    
# let's test our setup by counting the number of nonempty lines in a text file
lines = sc.textFile('README.md')
lines_nonempty = lines.filter( lambda x: len(x) > 0 )
lines_nonempty = lines.filter( nonempty )
lines_nonempty.count()

3

**Ok, so we can't run multiple SparkContexts at once! What about running the one created before?**

In [3]:
# let's test our setup by counting the number of nonempty lines in a text file
lines = sc.textFile('README.md')
lines_nonempty = lines.filter( lambda x: len(x) > 0 )
lines_nonempty.count()

169

# Activity 2: Using Anonyous Functions

**Let's use *lambda* to create an anonymous function to count the number of lines containing *Python*.**

In [4]:
%%bash
text="Python is a fun language,\n
but then what language\n
is not, if\n
I may ask. But Python\n
is also."

echo -e $text > python.txt
cat python.txt

Python is a fun language,
 but then what language
 is not, if
 I may ask. But Python
 is also.


In [None]:
lines = sc.textFile("python.txt")
pythonLines = lines.filter(lambda line: "Python" in line)
print("No of lines containing 'Python':", pythonLines.count())

**Well, do explain the answer.**

# Activity 3: Counting Primes

**We’ll go ahead and calculate the number of primes less than a given large number. To start with, we'll define a function that determines the primality of any given number (we'll later parallelize this function on a set of numbers).**

In [6]:
def isprime(n):
    """
    check if integer n is a prime
    """
    # make sure n is a positive integer
    n = abs(int(n))
    # 0 and 1 are not primes
    if n < 2:
        return False
    # 2 is the only even prime number
    if n == 2:
        return True
    # all other even numbers are not primes
    if not n & 1:
        return False
    # range starts with 3 and only needs to go up the square root of n
    # for all odd numbers
    for x in range(3, int(n**0.5)+1, 2):
        if n % x == 0:
            return False
    return True

In [9]:
# Create an RDD of numbers from 0 to 1,000,000
nums = sc.parallelize(range(1000000))

# Compute the number of primes in the RDD
print(nums.filter(isprime))

PythonRDD[8] at RDD at PythonRDD.scala:48


# Activity 4:  Word and Line Counting

In [7]:
import re
from operator import add

filein = sc.textFile('README.md')

**Count lines**

In [8]:
print('number of lines in file: %s' % filein.count())

number of lines in file: 240


In [10]:
filein

README.md MapPartitionsRDD[9] at textFile at NativeMethodAccessorImpl.java:-2

In [11]:
filein.collect()

['# Setting up your system',
 '',
 '',
 '1. Set up Python (Install Anaconda and necessary packages)',
 '3. Using Anaconda and Jupyter Notebooks',
 '4. Install and configure Spark, findspark and pySpark',
 '',
 'The instructions have been tested only on Ubuntu and OS X. Please raise a issue if you hit any snags on your windows system.',
 '',
 '---',
 '',
 '# Set up Python',
 '',
 '## Install Anaconda',
 '',
 'From the Anaconda [docs](http://conda.pydata.org/docs):',
 '',
 '> Conda is an open source package management system and environment management system',
 'for installing multiple versions of software packages and their dependencies and',
 'switching easily between them. It works on Linux, OS X and Windows, and was created',
 'for Python programs but can package and distribute any software.',
 '',
 '## Overview',
 '',
 'Using Anaconda consists of the following:',
 '',
 '1. Install [`miniconda`](http://conda.pydata.org/miniconda.html) on your computer',
 '2. Create a new `conda` [env

**Count non-empty lines**

In [None]:
filein_nonempty = filein.filter( lambda x: len(x) > 0 )
print('number of non-empty lines in file: %s' % filein_nonempty.count()) 

**Count no of characters**

In [None]:
chars = filein.map(lambda s: len(s)).reduce(add)
print('number of characters in file: %s' % chars)

**Count words of length greater than 3 characters**

In [15]:
words = filein.flatMap(lambda line: re.split('\W+', line.lower().strip()))
words = words.filter(lambda x: len(x) > 3)
words.collect()

['setting',
 'your',
 'system',
 'python',
 'install',
 'anaconda',
 'necessary',
 'packages',
 'using',
 'anaconda',
 'jupyter',
 'notebooks',
 'install',
 'configure',
 'spark',
 'findspark',
 'pyspark',
 'instructions',
 'have',
 'been',
 'tested',
 'only',
 'ubuntu',
 'please',
 'raise',
 'issue',
 'snags',
 'your',
 'windows',
 'system',
 'python',
 'install',
 'anaconda',
 'from',
 'anaconda',
 'docs',
 'http',
 'conda',
 'pydata',
 'docs',
 'conda',
 'open',
 'source',
 'package',
 'management',
 'system',
 'environment',
 'management',
 'system',
 'installing',
 'multiple',
 'versions',
 'software',
 'packages',
 'their',
 'dependencies',
 'switching',
 'easily',
 'between',
 'them',
 'works',
 'linux',
 'windows',
 'created',
 'python',
 'programs',
 'package',
 'distribute',
 'software',
 'overview',
 'using',
 'anaconda',
 'consists',
 'following',
 'install',
 'miniconda',
 'http',
 'conda',
 'pydata',
 'miniconda',
 'html',
 'your',
 'computer',
 'create',
 'conda',
 'envi

In [14]:
words = filein.map(lambda line: re.split('\W+', line.lower().strip()))
words = words.filter(lambda x: len(x) > 3)
words.collect()

[['', 'setting', 'up', 'your', 'system'],
 ['1',
  'set',
  'up',
  'python',
  'install',
  'anaconda',
  'and',
  'necessary',
  'packages',
  ''],
 ['3', 'using', 'anaconda', 'and', 'jupyter', 'notebooks'],
 ['4', 'install', 'and', 'configure', 'spark', 'findspark', 'and', 'pyspark'],
 ['the',
  'instructions',
  'have',
  'been',
  'tested',
  'only',
  'on',
  'ubuntu',
  'and',
  'os',
  'x',
  'please',
  'raise',
  'a',
  'issue',
  'if',
  'you',
  'hit',
  'any',
  'snags',
  'on',
  'your',
  'windows',
  'system',
  ''],
 ['', 'set', 'up', 'python'],
 ['from',
  'the',
  'anaconda',
  'docs',
  'http',
  'conda',
  'pydata',
  'org',
  'docs',
  ''],
 ['',
  'conda',
  'is',
  'an',
  'open',
  'source',
  'package',
  'management',
  'system',
  'and',
  'environment',
  'management',
  'system'],
 ['for',
  'installing',
  'multiple',
  'versions',
  'of',
  'software',
  'packages',
  'and',
  'their',
  'dependencies',
  'and'],
 ['switching',
  'easily',
  'between',
 

In [None]:
words = words.map(lambda w: (w, 1))
words = words.reduceByKey(add)
print('number of words with more than 3 characters in file: %s' % words.count())

# Activity 5: Workflow Template

In [19]:
## Spark Application Template - execute with spark-submit

## Imports
from pyspark import SparkConf, SparkContext

## Module Constants
APP_NAME = "Name of Application"  #helps in debugging

## Closure Functions

## Main functionality

def main(sc):
    pass

if __name__ == "__main__":
    # Configure Spark
    conf = SparkConf().setAppName(APP_NAME)
    conf = conf.setMaster("local[*]")
    sc   = SparkContext(conf=conf)

    # Execute Main functionality
    main(sc)

# To close or exit the program use sc.stop() or sys.exit(0)

In [3]:
from add import *

add(5,6)

9


11

# Activity 6: Sample Application

In [18]:
sc.stop()

In [None]:
import findspark

# provide path to your spark directory directly
findspark.init("/Users/soumendra/spark2/")

import pyspark

In [None]:
%matplotlib inline
## Imports
import csv
import matplotlib.pyplot as plt

from io import StringIO
from datetime import datetime
from collections import namedtuple
from operator import add, itemgetter
from pyspark import SparkConf, SparkContext

## Module Constants
APP_NAME = "Flight Delay Analysis"
DATE_FMT = "%Y-%m-%d"
TIME_FMT = "%H%M"

fields   = ('date', 'airline', 'flightnum', 'origin', 'dest', 'dep',
            'dep_delay', 'arv', 'arv_delay', 'airtime', 'distance')
Flight   = namedtuple('Flight', fields)

## Closure Functions
def parse(row):
    """
    Parses a row and returns a named tuple.
    """

    row[0]  = datetime.strptime(row[0], DATE_FMT).date()
    row[5]  = datetime.strptime(row[5], TIME_FMT).time()
    row[6]  = float(row[6])
    row[7]  = datetime.strptime(row[7], TIME_FMT).time()
    row[8]  = float(row[8])
    row[9]  = float(row[9])
    row[10] = float(row[10])
    return Flight(*row[:11])

def split(line):
    """
    Operator function for splitting a line with csv module
    """
    reader = csv.reader(StringIO(line))
    return reader.next()

def plot(delays):
    """
    Show a bar chart of the total delay per airline
    """
    airlines = [d[0] for d in delays]
    minutes  = [d[1] for d in delays]
    index    = list(xrange(len(airlines)))

    fig, axe = plt.subplots()
    bars = axe.barh(index, minutes)

    # Add the total minutes to the right
    for idx, air, min in zip(index, airlines, minutes):
        if min > 0:
            bars[idx].set_color('#d9230f')
            axe.annotate(" %0.0f min" % min, xy=(min+1, idx+0.5), va='center')
        else:
            bars[idx].set_color('#469408')
            axe.annotate(" %0.0f min" % min, xy=(10, idx+0.5), va='center')

    # Set the ticks
    ticks = plt.yticks([idx+ 0.5 for idx in index], airlines)
    xt = plt.xticks()[0]
    plt.xticks(xt, [' '] * len(xt))

    # minimize chart junk
    plt.grid(axis = 'x', color ='white', linestyle='-')

    plt.title('Total Minutes Delayed per Airline')
    plt.show()

## Main functionality
def main(sc):

    # Load the airlines lookup dictionary
    airlines = dict(sc.textFile("ontime/airlines.csv").map(split).collect())

    # Broadcast the lookup dictionary to the cluster
    airline_lookup = sc.broadcast(airlines)

    # Read the CSV Data into an RDD
    flights = sc.textFile("ontime/flights.csv").map(split).map(parse)

    # Map the total delay to the airline (joined using the broadcast value)
    delays  = flights.map(lambda f: (airline_lookup.value[f.airline],
                                     add(f.dep_delay, f.arv_delay)))

    # Reduce the total delay for the month to the airline
    delays  = delays.reduceByKey(add).collect()
    delays  = sorted(delays, key=itemgetter(1))

    # Provide output from the driver
    for d in delays:
        print("%0.0f minutes delayed\t%s" % (d[1], d[0]))

    # Show a bar chart of the delays
    plot(delays)

if __name__ == "__main__":
    # Configure Spark
    conf = SparkConf().setMaster("local[*]")
    conf = conf.setAppName(APP_NAME)
    sc   = SparkContext(conf=conf)
    # Uncomment the lines above when running the application with "submit" (spark-submit app.py)
    # Comment the lines above out when running in IPython Notebook

    # Execute Main functionality
    main(sc)