In [1]:
from operator import add

import pyspark
# Spark configuration for this notebook: master 'local' runs Spark in-process
# with a single worker thread.
conf = pyspark.SparkConf().setAppName('appName').setMaster('local')
# getOrCreate() hands back the already-running context when there is one, so
# re-running this cell no longer fails with
# "Cannot run multiple SparkContexts at once".
sc = pyspark.SparkContext.getOrCreate(conf=conf)

In [2]:
# Do NOT stop the SparkContext here: every following cell uses `sc`, so calling
# sc.stop() at this point makes "Restart & Run All" fail. Call sc.stop() only
# once you are completely done with the notebook. Display the live context instead.
sc

In [3]:
# Distribute the integers 0..9 and bring them back to the driver.
digits = sc.parallelize(range(10))
digits.collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [4]:
# One RDD element per line of the text file.
LOTR_PATH = "data/lotr.txt"
textFile = sc.textFile(LOTR_PATH)

In [9]:
first_line = textFile.first()
first_line


'When Mr. Bilbo Baggins of Bag End announced that he would shortly be celebrating his eleventy-first birthday with a party of special magnificence, there was much talk and excitement in Hobbiton.'

In [7]:
def mentions_bilbo(line):
    """Return True when the substring "Bilbo" occurs anywhere in ``line``."""
    return "Bilbo" in line


linesWithBilbo = textFile.filter(mentions_bilbo)

In [8]:
bilbo_line_count = linesWithBilbo.count()
bilbo_line_count

71

In [129]:
# Raw product lines; parsed in the next two cells.
PRODUCTS_PATH = "data/products.txt"
rdd = sc.textFile(PRODUCTS_PATH)

In [130]:
# Split each line on whitespace into its fields (id, name, price, category).
# Blank / whitespace-only lines split to [] and would make the parser in the
# next cell fail with an IndexError, so they are dropped here.
prod = rdd.map(lambda line: line.split()).filter(lambda fields: len(fields) > 0)

In [131]:
def parse_product(fields):
    """Convert the split text fields of one product line to typed values.

    Returns ``(int id, str name, float price, str category)`` built from the
    first four fields; any further fields are ignored.
    """
    return int(fields[0]), fields[1], float(fields[2]), fields[3]


prod = prod.map(parse_product)

In [132]:
all_products = prod.collect()
all_products


[(1, 'apple', 3.3, 'fruit'),
 (2, 'orange', 6.5, 'fruit'),
 (3, 'beer', 10.0, 'beverage'),
 (4, 'wine', 15.0, 'beverage'),
 (5, 'TAOCP1', 20.9, 'book'),
 (6, 'TAOCP2', 5.2, 'book'),
 (7, 'film1', 30.0, 'film'),
 (8, 'film2', 20.4, 'film'),
 (9, 'film3', 33.0, 'film'),
 (10, 'film4', 42.9, 'film'),
 (11, 'film5', 13.3, 'film'),
 (12, 'milk', 7.6, 'beverage'),
 (13, 'banana', 5.3, 'fruit'),
 (14, 'grapes', 7.5, 'fruit'),
 (15, 'soda', 4.0, 'beverage'),
 (16, 'water', 2.0, 'beverage'),
 (17, 's_water', 3.5, 'beverage'),
 (18, 'grappe', 8.2, 'fruit'),
 (19, 'pear', 7.0, 'fruit'),
 (20, 'strawberry', 9.0, 'fruit'),
 (21, 'plum', 4.4, 'fruit'),
 (22, 'mango', 17.5, 'fruit')]

In [76]:
def filter_prod20(value):
    """Predicate: True when the product's price is strictly greater than 20.

    ``value`` must unpack into exactly four items: (id, name, price, category).
    """
    (_pid, _pname, price, _pcat) = value
    is_expensive = price > 20
    return is_expensive
# Keep only the products priced above 20.
prod20 = (
    prod
    .filter(filter_prod20)
)

In [77]:
expensive_products = prod20.collect()
expensive_products

[(5, 'TAOCP1', 20.9, 'book'),
 (7, 'film1', 30.0, 'film'),
 (8, 'film2', 20.4, 'film'),
 (9, 'film3', 33.0, 'film'),
 (10, 'film4', 42.9, 'film')]

In [81]:
def sort_prod20(value):
    """Sort key for a product tuple: its price.

    ``value`` must unpack into exactly four items: (id, name, price, category).
    """
    _pid, _pname, key, _pcat = value
    return key


# Order the >20 products by price (sortBy sorts ascending by default).
prod20s = (
    prod20
    .sortBy(sort_prod20)
)


In [82]:
expensive_by_price = prod20s.collect()
expensive_by_price

[(8, 'film2', 20.4, 'film'),
 (5, 'TAOCP1', 20.9, 'book'),
 (7, 'film1', 30.0, 'film'),
 (9, 'film3', 33.0, 'film'),
 (10, 'film4', 42.9, 'film')]

In [92]:
def category_and_price(product):
    """Map a ``(id, name, price, category)`` tuple to a ``(category, price)`` pair."""
    _, _, price, category = product
    return category, price


# Highest price per category. The builtin max is associative and commutative,
# which is what reduceByKey requires of its merge function. Unique helper names
# avoid silently shadowing the helpers defined in other cells.
maxpricebycat = prod.map(category_and_price).reduceByKey(max)

maxpricebycat.collect()


[('fruit', 17.5), ('beverage', 15.0), ('book', 20.9), ('film', 42.9)]

In [91]:
def category_and_one(product):
    """Map a product tuple to ``(category, 1)`` so the 1s can be summed per key."""
    _, _, _, category = product
    return category, 1


# Number of products per category: sum the 1s for each category key.
countbycat = prod.map(category_and_one).reduceByKey(add)

countbycat.collect()


[('fruit', 9), ('beverage', 6), ('book', 2), ('film', 5)]

In [96]:
# Deliberately WRONG way to compute the mean price per category, kept to show
# why the reducer passed to reduceByKey must be associative and commutative.
def func_map(v):
    """Map a (id, name, price, category) tuple to a (category, price) pair."""
    _, _, price, category = v
    return category, price


badavg = prod.map(func_map)

def sort_func(value):
    """Sort key for a (category, price) pair: the price."""
    _, price = value
    return price


# NOTE(review): sorting by price does not make the result well-defined.
# reduceByKey shuffles the data and may merge values within and across
# partitions in any order, so this ordering is not guaranteed to reach `avg`.
badavg = badavg.sortBy(sort_func)

def avg(a, b):
    """Midpoint of two values.

    Not associative: folding it over more than two values does not give their
    mean. For example, x1, x2, x3 folded left to right gives
    x1/4 + x2/4 + x3/2, so later values get more weight. Only groups of one or
    two values (here 'book') come out correct.
    """
    return 0.5*(a+b)


badavg = badavg.reduceByKey(avg)

badavg.collect()


[('beverage', 11.371875),
 ('fruit', 12.885546875),
 ('book', 13.049999999999999),
 ('film', 35.55625)]

In [108]:
def category_and_price_count(product):
    """Map a product tuple to ``(category, (price, 1))``.

    Carrying a (sum, count) pair makes the per-key merge a plain element-wise
    addition, which is associative and commutative, as reduceByKey requires.
    """
    _, _, price, category = product
    return category, (price, 1)


def add_sum_count(a, b):
    """Element-wise sum of two ``(price_sum, count)`` pairs."""
    sum_a, count_a = a
    sum_b, count_b = b
    return sum_a + sum_b, count_a + count_b


def sum_count_to_mean(sum_count):
    """Turn a ``(price_sum, count)`` pair into the mean price.

    ``count`` is at least 1 because every key comes from at least one product.
    """
    price_sum, count = sum_count
    return price_sum / count


# Correct mean price per category. Each helper has its own name, so the
# first-stage mapper is no longer shadowed by the last-stage one.
goodavg = (
    prod
    .map(category_and_price_count)
    .reduceByKey(add_sum_count)
    .mapValues(sum_count_to_mean)  # transform only the value; key unchanged
)

goodavg.collect()


[('fruit', 7.633333333333332),
 ('beverage', 7.016666666666667),
 ('book', 13.049999999999999),
 ('film', 27.920000000000005)]

In [120]:
rddA = sc.textFile("data/A.txt")
rddB = sc.textFile("data/B.txt")


def parse_int_set(lines):
    """Parse an RDD of text lines (one integer per line) into distinct ints.

    Blank / whitespace-only lines are skipped: int('') would raise ValueError
    and fail the whole Spark job.
    """
    return lines.filter(lambda line: line.strip() != "").map(int).distinct()


A = parse_int_set(rddA)
B = parse_int_set(rddB)


In [117]:
tuple(s.collect() for s in (A, B))

([1, 2, 4, 6, 8, 10, 12, 14], [1, 3, 5, 7, 9, 10, 12, 14, 16, 18, 20])

In [122]:
# `A + B` is RDD.union, which keeps duplicates, so distinct() is still needed.
unionAB = (A + B).distinct()

unionAB.collect()


[2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 1, 3, 5, 7, 9]

In [125]:
interAB = (
    A
    .intersection(B)  # elements present in both; result has no duplicates
)

interAB.collect()


[10, 12, 14, 1]

In [139]:

# Tag every element with a (count_in_A, count_in_B) pair, then add the pairs
# per element. When A and B contain no duplicates, each count ends up 0 or 1.
Ams = A.map(lambda e: (e, (1, 0)))
Bms = B.map(lambda e: (e, (0, 1)))


def reduce_func(a, b):
    """Element-wise sum of two (count_in_A, count_in_B) pairs."""
    (a_in_A, a_in_B), (b_in_A, b_in_B) = a, b
    return a_in_A + b_in_A, a_in_B + b_in_B


ABms = Ams.union(Bms).reduceByKey(reduce_func)

ABms.collect()

[(2, (1, 0)),
 (4, (1, 0)),
 (6, (1, 0)),
 (8, (1, 0)),
 (10, (1, 1)),
 (12, (1, 1)),
 (14, (1, 1)),
 (16, (0, 1)),
 (18, (0, 1)),
 (20, (0, 1)),
 (1, (1, 1)),
 (3, (0, 1)),
 (5, (0, 1)),
 (7, (0, 1)),
 (9, (0, 1))]

In [141]:
def in_both(counts):
    """Intersection indicator: the smaller of (count_in_A, count_in_B).

    For 0/1 membership counts this is 1 exactly when the element is in both.
    """
    count_a, count_b = counts
    return min(count_a, count_b)


# mapValues keeps each element key and transforms only its count pair.
IABms = ABms.mapValues(in_both)

IABms.collect()


[(2, 0),
 (4, 0),
 (6, 0),
 (8, 0),
 (10, 1),
 (12, 1),
 (14, 1),
 (16, 0),
 (18, 0),
 (20, 0),
 (1, 1),
 (3, 0),
 (5, 0),
 (7, 0),
 (9, 0)]

In [143]:
def in_a_only(counts):
    """Difference (A minus B) indicator: count_in_A - count_in_B, floored at 0.

    For 0/1 membership counts this is 1 exactly when the element is in A but not in B.
    """
    count_a, count_b = counts
    return max(0, count_a - count_b)


# mapValues keeps each element key and transforms only its count pair.
DABms = ABms.mapValues(in_a_only)

DABms.collect()


[(2, 1),
 (4, 1),
 (6, 1),
 (8, 1),
 (10, 0),
 (12, 0),
 (14, 0),
 (16, 0),
 (18, 0),
 (20, 0),
 (1, 0),
 (3, 0),
 (5, 0),
 (7, 0),
 (9, 0)]

In [144]:
def in_exactly_one(counts):
    """Symmetric-difference indicator: |count_in_A - count_in_B|.

    For 0/1 membership counts this is 1 exactly when the element is in only one
    of A and B. Equivalent to max(a, b) - min(a, b).
    """
    count_a, count_b = counts
    return abs(count_a - count_b)


# mapValues keeps each element key and transforms only its count pair.
SABms = ABms.mapValues(in_exactly_one)

SABms.collect()


[(2, 1),
 (4, 1),
 (6, 1),
 (8, 1),
 (10, 0),
 (12, 0),
 (14, 0),
 (16, 1),
 (18, 1),
 (20, 1),
 (1, 0),
 (3, 1),
 (5, 1),
 (7, 1),
 (9, 1)]