In [1]:
# Import explicitly so this cell also works after "Restart Kernel -> Run All"
# on kernels that do not pre-load PySpark names.
from pyspark import SparkContext

# Reuse the already-running SparkContext if there is one; otherwise create it.
sc = SparkContext.getOrCreate()

## Total length of a file: `reduce` and `reduceByKey`

In [2]:
# Load test1.txt as an RDD of strings, one element per line of the file.
text_file = sc.textFile("test1.txt")

In [3]:
# Bring every element back to the driver as a Python list (the file has only 3 lines).
text_file.collect()

['Apache Spark Examples',
 'These examples give a quick overview of the Spark API. ',
 'Spark is built on the concept of distributed datasets, which contain arbitrary Java or Python objects. You create a dataset from external data, then apply parallel operations to it. The building block of the Spark API is its RDD API. In the RDD API, there are two types of operations: transformations, which define a new dataset based on previous ones, and actions, which kick off a job to execute on a cluster. On top of Spark’s RDD API, high level APIs are provided, e.g. DataFrame API and Machine Learning API. These high level APIs provide a concise way to conduct certain data operations. In this page, we will show examples using RDD API as well as examples using high level APIs.']

In [4]:
text_file.count()  # number of elements in the RDD, i.e. lines in the file

3

In [5]:
# Map every line to its character count.
lineLengths = text_file.map(len)

In [6]:
# One length per input line, in file order.
lineLengths.collect()

[21, 55, 686]

In [7]:
# Add the per-line lengths pairwise until a single total for the whole file remains.
totalLength = lineLengths.reduce(lambda running, length: running + length)
totalLength

762

In [8]:
# [key, value] rows: reduceByKey adds up the values that share the same key.
data1 = sc.parallelize([
    ['a', 21],
    ['b', 31],
    ['a', 22],
    ['a', 23],
])
data1.reduceByKey(lambda left, right: left + right).collect()

[('a', 66), ('b', 31)]

## filter  

In [9]:
# Switch to a second input file and keep only the rows containing the substring "line".
text_file = sc.textFile("test2.txt")
lines = text_file.filter(lambda row: "line" in row)

In [10]:
# Show the rows that passed the filter.
lines.collect()

['This is a first line', 'This is a second line', 'This is the last line']

In [11]:
# Same filter with a narrower predicate: only rows containing "last" are kept.
lines = text_file.filter(lambda row: "last" in row)
lines.collect()

['This is the last line']

In [12]:
# first() returns the first element itself (a string here), not a list.
lines.first()

'This is the last line'

## data manipulation

In [13]:
# Each line of test3.txt is a name followed by four comma-separated numbers.
data = sc.textFile("test3.txt")
data.collect()

['Carlo,5,3,3,4',
 'Mokhtar,2,5,5,3',
 'Jacques,4,2,4,5',
 'Braden,5,3,2,5',
 'Chris,5,4,5,1']

In [14]:
# Break every CSV line into its fields; all fields are still strings at this point.
data1 = data.map(lambda row: row.split(","))
data1.collect()

[['Carlo', '5', '3', '3', '4'],
 ['Mokhtar', '2', '5', '5', '3'],
 ['Jacques', '4', '2', '4', '5'],
 ['Braden', '5', '3', '2', '5'],
 ['Chris', '5', '4', '5', '1']]

In [15]:
# Pitfall demo: the fields are still strings, so `+` concatenates them
# ('5' + '3' + '3' + '4' -> '5334') instead of adding them numerically.
data2 = data1.map(lambda item: (item[0], item[1]+item[2]+item[3]+item[4]))
data2.collect()

[('Carlo', '5334'),
 ('Mokhtar', '2553'),
 ('Jacques', '4245'),
 ('Braden', '5325'),
 ('Chris', '5451')]

In [16]:
# Convert the four numeric fields to int before adding, so the result is an arithmetic sum.
data2 = data1.map(
    lambda item: (item[0], sum(int(item[col]) for col in (1, 2, 3, 4)))
)
data2.collect()

[('Carlo', 15),
 ('Mokhtar', 15),
 ('Jacques', 15),
 ('Braden', 15),
 ('Chris', 15)]

In [17]:
# Append the mean to each (name, total) pair; 4 is the number of numeric columns per row.
data3 = data2.map(
    lambda name_total: (name_total[0], name_total[1], name_total[1] / 4)
)
data3.collect()

[('Carlo', 15, 3.75),
 ('Mokhtar', 15, 3.75),
 ('Jacques', 15, 3.75),
 ('Braden', 15, 3.75),
 ('Chris', 15, 3.75)]

# mapValues

In [18]:
# [subject, mark] rows; each subject appears twice.
inputrdd = sc.parallelize([
    ["maths", 50],
    ["maths", 60],
    ["english", 65],
    ["english", 85],
])
inputrdd.collect()

[['maths', 50], ['maths', 60], ['english', 65], ['english', 85]]

In [19]:
# Pair every mark with a count of 1 so that sums and counts can later be reduced together.
mapped = inputrdd.mapValues(lambda score: (score, 1))
mapped.collect()

[('maths', (50, 1)),
 ('maths', (60, 1)),
 ('english', (65, 1)),
 ('english', (85, 1))]

In [20]:
# Demo only: returning (x, y) merely nests the two values of each key instead of combining them.
mapped.reduceByKey(lambda x,y : (x,y)).collect()

[('english', ((65, 1), (85, 1))), ('maths', ((50, 1), (60, 1)))]

In [21]:
# Add the (mark, count) pairs element-wise: per subject this yields (sum of marks, number of marks).
reduced = mapped.reduceByKey(
    lambda acc, nxt: (acc[0] + nxt[0], acc[1] + nxt[1])
)
reduced.collect()

[('english', (150, 2)), ('maths', (110, 2))]

In [22]:
# Average = sum of marks / number of marks. mapValues leaves the subject key as is
# and only transforms the (sum, count) value.
average = reduced.mapValues(lambda sum_count: sum_count[0] / sum_count[1])
average.collect()

[('english', 75.0), ('maths', 55.0)]

# PageRank

References:
- http://www.openkb.info/2016/03/understanding-pagerank-algorithm-in.html
- https://eyeballs.tistory.com/70

<img src="pagerank1.png">

In [23]:
# Directed link graph as [source, target] rows: "source links to target".
mapLink = sc.parallelize([
    ["MapR", "Baidu"],
    ["MapR", "Blogger"],
    ["Baidu", "MapR"],
    ["Blogger", "Google"],
    ["Blogger", "Baidu"],
    ["Google", "MapR"],
])

In [24]:
# Group the rows by source page: each key maps to an iterable of the pages it links to.
links = mapLink.groupByKey()

In [25]:
# The grouped values come back as ResultIterable objects, so only their reprs are shown here.
links.collect()

[('Google', <pyspark.resultiterable.ResultIterable at 0x217efcc8a58>),
 ('MapR', <pyspark.resultiterable.ResultIterable at 0x217efcc8a20>),
 ('Baidu', <pyspark.resultiterable.ResultIterable at 0x217efcc8b00>),
 ('Blogger', <pyspark.resultiterable.ResultIterable at 0x217efcc8be0>)]

In [26]:
# Materialise each ResultIterable with list() so the grouped targets become visible.
# (No trailing comma after print(): in Python 3 it would only build a throwaway tuple.)
for k, v in links.collect():
    print(k)
    print(list(v))

Google
['MapR']
MapR
['Baidu', 'Blogger']
Baidu
['MapR']
Blogger
['Google', 'Baidu']


In [27]:
# Same information as one list of (page, [targets]) pairs.
print([(page, list(targets)) for page, targets in links.collect()])

[('Google', ['MapR']), ('MapR', ['Baidu', 'Blogger']), ('Baidu', ['MapR']), ('Blogger', ['Google', 'Baidu'])]


In [28]:
# Every page that has outgoing links starts with the same rank of 1.
ranks = links.map(lambda page_links: (page_links[0], 1))
ranks.collect()

[('Google', 1), ('MapR', 1), ('Baidu', 1), ('Blogger', 1)]

In [44]:
# join() matches rows on their key; 'choi' has no partner in data1, so it is absent from the result.
data1 = sc.parallelize([
    ['park', 1122],
    ['kim', 2222],
])
data2 = sc.parallelize([
    ['park', 'abc@dddd'],
    ['kim', 'ccc@eeee'],
    ['choi', 'www@ffff'],
])
data1.join(data2).collect()

[('kim', (2222, 'ccc@eeee')), ('park', (1122, 'abc@dddd'))]

In [45]:
# With 3-element rows, join() pairs up only the first two elements of each row (key, value);
# the third element of every row does not appear in the result.
data1 = sc.parallelize([['a',1,'kim'],['b','choi',2]])
data2 = sc.parallelize([['a',['kim','c'],'choi'], ['b',3,4]])
data1.join(data2).collect()


[('b', ('choi', 3)), ('a', (1, ['kim', 'c']))]

In [30]:
# Attach each page's current rank to its outgoing links: (page, (links, rank)).
cvalues  = links.join(ranks)
cvalues.collect()

[('Google', (<pyspark.resultiterable.ResultIterable at 0x217efcaaa58>, 1)),
 ('MapR', (<pyspark.resultiterable.ResultIterable at 0x217efcaa550>, 1)),
 ('Baidu', (<pyspark.resultiterable.ResultIterable at 0x217efcaa6a0>, 1)),
 ('Blogger', (<pyspark.resultiterable.ResultIterable at 0x217efcaaf28>, 1))]

In [31]:
cvalues = links.join(ranks)

# Collect once and reuse the result for both views instead of running the job twice.
joined = cvalues.collect()

# View 1: list(v) only unpacks the (links, rank) tuple, so the links are still a ResultIterable.
print(list((k, list(v)) for (k, v) in joined))
# View 2: list(v[0]) materialises the links themselves; v[1] is the rank.
print(list((k, (list(v[0]), v[1])) for (k, v) in joined))

[('Google', [<pyspark.resultiterable.ResultIterable object at 0x00000217EFCDA1D0>, 1]), ('MapR', [<pyspark.resultiterable.ResultIterable object at 0x00000217EFCDA9B0>, 1]), ('Baidu', [<pyspark.resultiterable.ResultIterable object at 0x00000217EFCDA278>, 1]), ('Blogger', [<pyspark.resultiterable.ResultIterable object at 0x00000217EFCDAE80>, 1])]
[('Google', (['MapR'], 1)), ('MapR', (['Baidu', 'Blogger'], 1)), ('Baidu', (['MapR'], 1)), ('Blogger', (['Google', 'Baidu'], 1))]


<img src="pagerank2.png">

In [32]:
def computeContribs(urls, rank):
    """Split a page's rank evenly across the pages it links to.

    Args:
        urls: sized iterable of the pages the current page links to.
        rank: current rank of the linking page.

    Yields:
        (url, rank / len(urls)) for every url in ``urls``; nothing when
        ``urls`` is empty.
    """
    out_degree = len(urls)
    yield from ((target, rank / out_degree) for target in urls)

In [33]:
# Sanity check outside Spark: two targets, so each receives half of rank 1.
a = computeContribs(['Baidu', 'Blogger'], 1)
for contribution in a:
    print(contribution)

('Baidu', 0.5)
('Blogger', 0.5)


In [34]:
# Three targets, so each receives one third of rank 1.
a = computeContribs([1, 2, 3], 1)
for contribution in a:
    print(contribution)

(1, 0.3333333333333333)
(2, 0.3333333333333333)
(3, 0.3333333333333333)


In [35]:
# Demo: with the identity function, flatMap flattens each (page, (links, rank)) tuple itself,
# so the keys and the (links, rank) values end up as separate elements.
links.join(ranks).flatMap(lambda x: x).collect()

['Google',
 (<pyspark.resultiterable.ResultIterable at 0x217efcda898>, 1),
 'MapR',
 (<pyspark.resultiterable.ResultIterable at 0x217efcda278>, 1),
 'Baidu',
 (<pyspark.resultiterable.ResultIterable at 0x217efcda5c0>, 1),
 'Blogger',
 (<pyspark.resultiterable.ResultIterable at 0x217efcda710>, 1)]

In [36]:

def computeContribs(urls, rank):
    """Split a page's rank evenly across the pages it links to.

    Args:
        urls: sized iterable of the pages the current page links to.
        rank: current rank of the linking page.

    Yields:
        (url, rank / len(urls)) for every url in ``urls``; nothing when
        ``urls`` is empty.
    """
    out_degree = len(urls)
    yield from ((target, rank / out_degree) for target in urls)
        
# flatMap merges every page's generated (target, share) pairs into one flat RDD.
contribs = links.join(ranks).flatMap(
    lambda page_entry: computeContribs(page_entry[1][0], page_entry[1][1])
)
contribs.collect()

[('MapR', 1.0),
 ('Baidu', 0.5),
 ('Blogger', 0.5),
 ('MapR', 1.0),
 ('Google', 0.5),
 ('Baidu', 0.5)]

[('Google', (['MapR'], 1)), ('MapR', (['Baidu', 'Blogger'], 1)), ('Baidu', (['MapR'], 1)), ('Blogger', (['Google', 'Baidu'], 1))]

In [38]:
def computeContribs1(urls, rank):
    """List-returning variant: split a page's rank evenly across its targets.

    Args:
        urls: sized iterable of the pages the current page links to.
        rank: current rank of the linking page.

    Returns:
        A list of (url, rank / len(urls)) tuples, empty when ``urls`` is empty.
    """
    out_degree = len(urls)
    return [(target, rank / out_degree) for target in urls]
        
# map keeps exactly one output element per page, so the result is a list of lists.
contribs1 = links.join(ranks).map(
    lambda page_entry: computeContribs1(page_entry[1][0], page_entry[1][1])
)
contribs1.collect()

[[('MapR', 1.0)],
 [('Baidu', 0.5), ('Blogger', 0.5)],
 [('MapR', 1.0)],
 [('Google', 0.5), ('Baidu', 0.5)]]

In [39]:
def computeContribs1(urls, rank):
    """List-returning variant: split a page's rank evenly across its targets.

    Args:
        urls: sized iterable of the pages the current page links to.
        rank: current rank of the linking page.

    Returns:
        A list of (url, rank / len(urls)) tuples, empty when ``urls`` is empty.
    """
    out_degree = len(urls)
    return [(target, rank / out_degree) for target in urls]
        
# flatMap flattens the per-page lists into one flat list of (target, share) pairs.
contribs1 = links.join(ranks).flatMap(
    lambda page_entry: computeContribs1(page_entry[1][0], page_entry[1][1])
)
contribs1.collect()

[('MapR', 1.0),
 ('Baidu', 0.5),
 ('Blogger', 0.5),
 ('MapR', 1.0),
 ('Google', 0.5),
 ('Baidu', 0.5)]

In [40]:
# Step 1: add up all contributions each page receives.
received = contribs.reduceByKey(lambda left, right: left + right)
new_rank = received.collect()
print(new_rank)

# Step 2: apply the update formula 0.15 + 0.85 * (sum of received contributions).
new_rank1 = received.mapValues(lambda total: 0.15 + 0.85 * total).collect()
print(new_rank1)

[('Google', 0.5), ('MapR', 2.0), ('Baidu', 1.0), ('Blogger', 0.5)]
[('Google', 0.575), ('MapR', 1.8499999999999999), ('Baidu', 1.0), ('Blogger', 0.575)]


In [41]:
# Rebuild the whole pipeline in one cell: link graph -> grouped links -> initial ranks.
mapLink = sc.parallelize([
    ["MapR", "Baidu"],
    ["MapR", "Blogger"],
    ["Baidu", "MapR"],
    ["Blogger", "Google"],
    ["Blogger", "Baidu"],
    ["Google", "MapR"],
])
links = mapLink.groupByKey()

# Every page starts with rank 1.
ranks = links.map(lambda page_links: (page_links[0], 1))

# (page, (links, rank)) for every page.
cvalues = links.join(ranks)

def computeContribs(urls, rank):
    """Split a page's rank evenly across the pages it links to.

    Args:
        urls: sized iterable of the pages the current page links to.
        rank: current rank of the linking page.

    Yields:
        (url, rank / len(urls)) for every url in ``urls``; nothing when
        ``urls`` is empty.
    """
    out_degree = len(urls)
    yield from ((target, rank / out_degree) for target in urls)
        
# One PageRank step: spread each page's rank over its targets ...
contribs = links.join(ranks).flatMap(
    lambda page_entry: computeContribs(page_entry[1][0], page_entry[1][1])
)

# ... then sum what each page receives and apply 0.15 + 0.85 * sum.
received = contribs.reduceByKey(lambda left, right: left + right)
new_rank = received.mapValues(lambda total: 0.15 + 0.85 * total).collect()
print(ranks.collect())
print(new_rank)

[('Google', 1), ('MapR', 1), ('Baidu', 1), ('Blogger', 1)]
[('Google', 0.575), ('MapR', 1.8499999999999999), ('Baidu', 1.0), ('Blogger', 0.575)]


In [43]:
def computeContribs(urls, rank):
    """Split a page's rank evenly across the pages it links to.

    Args:
        urls: sized iterable of the pages the current page links to.
        rank: current rank of the linking page.

    Yields:
        (url, rank / len(urls)) for every url in ``urls``; nothing when
        ``urls`` is empty.
    """
    out_degree = len(urls)
    yield from ((target, rank / out_degree) for target in urls)
        
# Link graph as [source, target] rows, grouped by source page.
mapLink = sc.parallelize([
    ["MapR", "Baidu"],
    ["MapR", "Blogger"],
    ["Baidu", "MapR"],
    ["Blogger", "Google"],
    ["Blogger", "Baidu"],
    ["Google", "MapR"],
])
links = mapLink.groupByKey()

# Every page starts with rank 1.
ranks = links.map(lambda page_links: (page_links[0], 1))
print("initial ranks:", ranks.collect())

# Run 5 PageRank iterations; each pass replaces `ranks` with the updated values.
for i in range(5):
    # Join the link lists with the current ranks once and reuse the result
    # (previously `cvalues` was assigned but the same join was rebuilt on the next line).
    cvalues = links.join(ranks)
    # Spread each page's rank evenly over the pages it links to.
    contribs = cvalues.flatMap(
        lambda url_urls_rank: computeContribs(url_urls_rank[1][0], url_urls_rank[1][1]))
    # Sum the contributions each page receives, then apply 0.15 + 0.85 * sum.
    ranks = contribs.reduceByKey(lambda x, y: x + y).mapValues(lambda rank: 0.15 + 0.85 * rank)
    print("ranks(",i,"):", ranks.collect())
    print()


initial ranks: [('Google', 1), ('MapR', 1), ('Baidu', 1), ('Blogger', 1)]
ranks( 0 ): [('Google', 0.575), ('MapR', 1.8499999999999999), ('Baidu', 1.0), ('Blogger', 0.575)]

ranks( 1 ): [('MapR', 1.4887499999999998), ('Blogger', 0.9362499999999999), ('Baidu', 1.1806249999999998), ('Google', 0.394375)]

ranks( 2 ): [('Google', 0.5479062499999999), ('MapR', 1.4887499999999996), ('Baidu', 1.1806249999999998), ('Blogger', 0.7827187499999999)]

ranks( 3 ): [('MapR', 1.6192515624999995), ('Baidu', 1.1153742187499998), ('Blogger', 0.7827187499999998), ('Google', 0.48265546875)]

ranks( 4 ): [('MapR', 1.5083252343749995), ('Google', 0.48265546874999987), ('Blogger', 0.8381819140624998), ('Baidu', 1.1708373828124996)]



http://www.learnbymarketing.com/618/pyspark-rdd-basics-examples/