In [1]:
%matplotlib inline
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
sns.set_context('notebook', font_scale=1.5)
sns.set_style('white')
pd.set_option('display.width', 500)
pd.set_option('display.max_columns', 100)
import numpy as np

In [2]:
# Locate the Spark installation and make PySpark importable. This has to run
# BEFORE `import pyspark`; calling it after the import (and after the context
# already exists) cannot help that import.
import findspark
findspark.init()

import pyspark

# getOrCreate() returns the already-running context when this cell is executed
# again; a second bare SparkContext() in the same kernel raises a ValueError.
sc = pyspark.SparkContext.getOrCreate()

In [4]:
## Simple example: turn the integers 1..999 into an RDD

numbers = range(1, 1000)
list1 = sc.parallelize(numbers)
rdd_type = type(list1)
print(rdd_type)

## Look at the first element of the RDD
list1.first()

<class 'pyspark.rdd.PipelinedRDD'>


1

### lambda

In [5]:
# map() applies the lambda to every element: each value is multiplied by 10.
list2 = list1.map(lambda value: value * 10)

In [6]:
## reduce() combines all elements into a single value -- here, their sum

list2.reduce(lambda total, item: total + item)

4995000

In [7]:
## filter() keeps only the values that satisfy the given condition;
## collect() then hands them back as a plain Python list

multiples_of_100 = list2.filter(lambda value: value % 100 == 0)
lis = multiples_of_100.collect()

Transformations vs. actions

RDD methods are of two kinds:
1. Transformations:   
    i.  Return another RDD  
    ii. Are not performed until an action is called (lazy)
2. Actions  
    i.  Return a value other than an RDD  
    ii. Performed immediately

In [8]:
## Chaining: square 1..9, keep the multiples of 3, then multiply them together

(
    sc.parallelize(range(1, 10))
    .map(lambda n: n * n)
    .filter(lambda squared: squared % 3 == 0)
    .reduce(lambda left, right: left * right)
)

26244

In [9]:
## Loading data: read the text file into an RDD and show its first element

people_path = "data/people.txt"
people = sc.textFile(people_path)
people.first()

'Orlando\tM\t40\tPython'

In [10]:
# Split each line on tabs and show the fields of the first record.
# NOTE: this expects `people` to hold raw text lines; a later cell rebinds
# `people` to already-split records, after which `.split` here would fail.
people.map(lambda line: line.split('\t')).first()

['Orlando', 'M', '40', 'Python']

In [11]:
## Load and split in a single expression, then show up to ten records

people = (
    sc.textFile("data/people.txt")
    .map(lambda line: line.split('\t'))
)
people.take(10)

[['Orlando', 'M', '40', 'Python'],
 ['Lina', 'F', '39', 'C#'],
 ['John', 'M', '30', 'Python'],
 ['Jane', 'F', '32', 'Python'],
 ['Michelle', 'F', '18', 'Python'],
 ['Daniel', 'M', '20', 'C#']]

In [12]:
## reduceByKey: emit (field 1, 1) for every record, then sum the ones per key

field1_ones = people.map(lambda fields: (fields[1], 1))
field1_ones.reduceByKey(lambda a, b: a + b).collect()

[('M', 3), ('F', 3)]

In [13]:
# Same counting pattern, keyed on field 3 of each record.
(
    people
    .map(lambda fields: (fields[3], 1))
    .reduceByKey(lambda a, b: a + b)
    .collect()
)

[('Python', 4), ('C#', 2)]

In [14]:
# Pair field 1 of each record with field 2 converted from string to int.
field1_field2 = people.map(lambda fields: (fields[1], int(fields[2])))
field1_field2.collect()

[('M', 40), ('F', 39), ('M', 30), ('F', 32), ('F', 18), ('M', 20)]

You can send files to the nodes using these options:  
--py-files (.py, .zip, .egg)  
--jars (Java jars)  
--packages, --repositories  
--files to include arbitrary files in the home folder of each executor

To get out of pyspark, press Ctrl-D.


In [15]:
# Disabled example: register data/person.py with the SparkContext, import it,
# and use it to parse every line of data/people.txt.
# NOTE(review): the lambda below calls `people.Person()`, but the module
# imported here is `person` -- presumably it should read `person.Person()`;
# confirm against data/person.py before re-enabling this cell.
#sc.addPyFile("data/person.py")

#import person
#people = sc.textFile("data/people.txt").map(lambda l: people.Person().parse(l))

In [16]:
## The * wildcard loads every file matching data/sales/sales_*.txt into one RDD
sales_glob = "data/sales/sales_*.txt"
sales = sc.textFile(sales_glob)

In [21]:
# Preview the first ten elements of the sales RDD.
sales_preview = sales.take(10)
sales_preview

['2014-01-01\t1\t1\t100',
 '2014-01-01\t1\t2\t37',
 '2014-01-01\t1\t3\t54',
 '2014-01-01\t2\t1\t50',
 '2014-01-01\t2\t2\t40',
 '2014-01-01\t3\t1\t75',
 '2014-01-01\t4\t4\t1',
 '2014-01-02\t1\t1\t10',
 '2014-01-02\t1\t2\t31',
 '2014-01-02\t1\t3\t5']

In [27]:
## Join example: two lists of (key, value) pairs sharing the same keys

states = [("AL", "123"), ("DE", "324"), ("ED", "213")]
pop = [("AL", 12), ("DE", 23), ("ED", 34)]

# Distribute both lists as RDDs.
states_rdd = sc.parallelize(states)
pop_rdd = sc.parallelize(pop)

# join() matches elements on their key; each result has the form
# (key, (value_from_states, value_from_pop)).
joined = states_rdd.join(pop_rdd)
joined.collect()

[('DE', ('324', 23)), ('AL', ('123', 12)), ('ED', ('213', 34))]