In [0]:
# `sc` (the SparkContext) is already initialised by the notebook environment;
# evaluating the bare name displays it
sc

In [0]:
# build an RDD from the integers 0..999; the bare name on the last line
# displays the RDD object itself, not its contents
rdd = sc.parallelize(range(1000))
rdd

In [0]:
# to see all elements in 'rdd', use: rdd.collect()
# to see only the first element in 'rdd':
rdd.first()

In [0]:
# how many partitions the RDD is split into
rdd.getNumPartitions()

In [0]:
# number of elements in the RDD
rdd.count()

In [0]:
# sum of all elements in the RDD
rdd.sum()

In [0]:
rdd.max(), rdd.min(), rdd.mean(), rdd.stdev() # descriptive statistics, one method call per statistic

In [0]:
rdd.stats() # the basic descriptive statistics, all together in a single call

In [0]:
# histogram(buckets) returns (bin boundaries, number of entries per bin).
# `buckets` is a required argument -- calling rdd.histogram() with no argument
# raises a TypeError -- so the number of bins is passed explicitly.
# N.B.: to have 5 bins instead, use rdd.histogram(5)
rdd.histogram(10)

In [0]:
# sample(withReplacement, fraction, seed) returns a subset of the RDD:
#   - do you want replacement?
#   - fraction of the set to put in the sample
#   - seed for the random number generator
# It produces an RDD, so collect() is needed to get the values back.
# The sample is not truly random but pseudorandom, based on the given seed:
# same seed, same numbers (wouldn't happen if truly random!)
#
# Only the last expression of a cell is displayed, so this first sample is
# printed explicitly; otherwise its result would be computed and thrown away.
print(rdd.sample(False, 0.01, 52).collect())

# the collected sample can also be assigned to a variable:
rdd_sample = rdd.sample(False, 0.01, 23).collect()
# end on the bare name so the assigned sample is displayed too
rdd_sample

In [0]:
# function to be applied element-wise with rdd.map
def square(x):
  """Return x raised to the power of two."""
  return pow(x, 2)
# map(f) = apply the function f to every element of the rdd
# take(n) = return the first n entries of the result
rdd.map(square).take(5)

In [0]:
# OR: the same squaring map, written with an inline lambda instead of a named function
rdd.map(lambda x: x**2).take(5)

In [0]:
def hundreds_filter(x):
  """Return True when x is an exact multiple of 100, False otherwise."""
  remainder = x % 100
  return remainder == 0
# filter(f) keeps only the elements for which f returns True
rdd.filter(hundreds_filter).take(5)

In [0]:
# filter keeps the elements for which the predicate is True ...
print(rdd.filter(lambda x: x % 100 == 0).take(5))
# ... c.f. map, which replaces every element with the predicate's result (True/False)
rdd.map(lambda x: x % 100 == 0).take(5)

In [0]:
# keep the multiples of 150, then map each one to (its square) / 5
rdd2 = rdd.filter(lambda  x: x % 150 == 0).map(lambda x: (x**2)/5)
# took a fraction of the time despite being more complicated
# WHY? LAZY! the transformations are only recorded as instructions here;
# nothing is computed until an output is requested

In [0]:
rdd2.take(5) # an output is required -> the recorded calculations actually run -> takes longer!

In [0]:
import random
# Seeds the driver's global RNG only. The keys below are generated inside
# Spark's Python worker processes, which do not share this RNG state, so this
# call alone does not make the keys reproducible.
random.seed(107)

def _with_random_key(partition_index, values, seed=107, keys=('a', 'b', 'c')):
  """Yield (key, value) pairs for one partition, with key drawn from `keys`.

  Each partition gets its own generator seeded with seed + partition_index,
  so the keys are the same every time the partition is (re)computed. Without
  this, every action on the un-persisted RDD could draw a fresh set of keys.
  """
  rng = random.Random(seed + partition_index)
  for value in values:
    yield (rng.choice(keys), value)

# mapPartitionsWithIndex calls the function once per partition as f(index, iterator)
keyrdd = rdd.mapPartitionsWithIndex(_with_random_key)
keyrdd.take(5)
# produces tuples, but the *ByKey / *Values methods will treat each tuple as a
# (key, value) pair even though it's not a dictionary and we're in Python

In [0]:
# number of elements per key
keyrdd.countByKey()

In [0]:
# keyrdd.countByValue() would treat the whole (key, value) pair as its 'value' - API quirk
# so to count by the value within the pair: first map each tuple to its second element
keyrdd.map(lambda x: x[1]).countByValue()

In [0]:
keyrdd.collectAsMap()  # creates a Python dictionary object!
# only one value is returned per letter, because dictionary keys MUST be unique
# -> on each key conflict the earlier value is overwritten, so the last one wins

In [0]:
# group the values by key (apply mapValues(list) to see each group's contents as a list)
keyrdd.groupByKey().take(3)

In [0]:
keyrdd.groupByKey().mapValues(list).take(3)
# gives each key + a list of all values associated with that key

In [0]:
a = keyrdd.groupByKey().mapValues(sum).collect()
# gives each key + the sum of all values associated with that key
# (stored in `a`; nothing is displayed because the cell ends with an assignment)
# this is essentially a reduce function

In [0]:
# explicit reduce
b = keyrdd.reduceByKey(lambda x,y: x + y).collect()
# reduceByKey requires a certain format for its function argument:
# it needs two inputs: x = current cumulative value, y = next value for the same key
# i.e. take the next value, add it on, keep hold of the result

In [0]:
keyrdd.map(lambda x: x[1]).reduce(lambda x,y:x+y)
# MapReduce for the sum of the rdd values: map drops the key, reduce adds the values up

In [0]:
# TASK: find the average value per key ('a','b','c')
# ANSWER: per key we need the sum and the count, then divide one by the other

# step 1: use mapValues to pair every value with a count of 1,
# giving tuples of the form ('b', (1, 1))
a = keyrdd.mapValues(lambda value: (value, 1))

# step 2: reduceByKey sets the key aside, so both function arguments are
# (sum, count) tuples: add the sums (index 0) and the counts (index 1)
b = a.reduceByKey(
    lambda running, nxt: (running[0] + nxt[0], running[1] + nxt[1])
)

# step 3: average = sum / count for each key
b.mapValues(lambda sum_and_count: sum_and_count[0] / sum_and_count[1]).collect()

In [0]:
# read in the uploaded iris.csv data file as an RDD of text lines
iris = sc.textFile('/FileStore/tables/iris.csv')

# GOAL: find the average petal length and petal width per iris species

# the first line is the header row; printing it produced:
# sepal_length,sepal_width,petal_length,petal_width,species
header = iris.first()

# drop the header, then split each remaining line into its comma-separated columns
iris_nohead = iris.filter(lambda line: line != header)
split_iris = iris_nohead.map(lambda line: line.split(','))

# key = species (column 4); value = (petal_length, petal_width, 1),
# where the trailing 1 is a per-row count used for averaging later
iris_data = split_iris.map(
    lambda cols: (cols[4], (float(cols[2]), float(cols[3]), 1))
)

# remember, 2 args: first = current cumulative tuple, second = next row's tuple;
# add them element-wise to get (sum of lengths, sum of widths, row count)
reduced_by_species = iris_data.reduceByKey(
    lambda running, nxt: (
        running[0] + nxt[0],
        running[1] + nxt[1],
        running[2] + nxt[2],
    )
)

print('species,  petal_length,  petal_width')
# divide each sum by the row count and round to 3 decimals
# (drop the round(...) calls to see the unrounded averages)
reduced_by_species.mapValues(
    lambda totals: (
        round(totals[0] / totals[2], 3),
        round(totals[1] / totals[2], 3),
    )
).collect()