# ITP4869
## Lecture 10 (RDD Programming) - Code Examples

In [1]:
import findspark
findspark.init()  # put the local Spark installation on sys.path so `pyspark` can be imported
import pyspark

# getOrCreate() returns the already-running SparkContext when this cell is
# re-executed, instead of raising "Cannot run multiple SparkContexts at once"
# as a second direct SparkContext(...) construction would.
sc = pyspark.SparkContext.getOrCreate(
    pyspark.SparkConf().setAppName('Start'))
sc

### Creating RDDs

In [2]:
# parallelize() builds an RDD from an in-memory Python list;
# collect() brings every element back to the driver as a Python list.
lines = sc.parallelize(['spark', 'rdd', 'python'])
lines.collect()

['spark', 'rdd', 'python']

In [3]:
# Load data.txt as an RDD of lines, split every line on single spaces and
# flatten the pieces into one RDD of words, then show the very first word.
data = sc.textFile('data.txt')
words = data.flatMap(lambda text_line: text_line.split(' '))
words.first()

'Hadoop'

### RDD Operations

In [4]:
# Classic map/reduce: map each line to its character count (the line
# terminator is not included, e.g. 28 for the first line), then add the
# counts together with reduce().
lines = sc.textFile("data.txt")
lineLengths = lines.map(len)  # pass the built-in directly; no wrapper lambda needed
totalLength = lineLengths.reduce(lambda a, b: a + b)
print(lines.collect())
print(lineLengths.collect())
print('Total length is', totalLength)

['Hadoop is an elegant fellow.', 'An elephant gentle and mellow.', 'He never gets mad,', 'Or does anything bad,', 'Because, at his core, he is yellow.']
[28, 30, 18, 21, 35]
Total length is 132


### Transformations

In [5]:
# map(): add 2 to every element of the numbers 1..5.
rdd = sc.parallelize(range(1, 6))
result = rdd.map(lambda num: num + 2)
result.collect()

[3, 4, 5, 6, 7]

In [6]:
# flatMap(): split every line into words and flatten them into a single RDD
# of words (unlike map(), which would keep one list per line).
rdd = sc.textFile("data.txt")
words = rdd.flatMap(lambda line: line.split(' '))
words.collect()

['Hadoop',
 'is',
 'an',
 'elegant',
 'fellow.',
 'An',
 'elephant',
 'gentle',
 'and',
 'mellow.',
 'He',
 'never',
 'gets',
 'mad,',
 'Or',
 'does',
 'anything',
 'bad,',
 'Because,',
 'at',
 'his',
 'core,',
 'he',
 'is',
 'yellow.']

In [7]:
# map(): split every line into words but keep one list per input line, so
# the result is an RDD of lists (unlike flatMap(), which flattens them).
rdd = sc.textFile("data.txt")
words_per_line = rdd.map(lambda line: line.split(' '))
words_per_line.collect()

[['Hadoop', 'is', 'an', 'elegant', 'fellow.'],
 ['An', 'elephant', 'gentle', 'and', 'mellow.'],
 ['He', 'never', 'gets', 'mad,'],
 ['Or', 'does', 'anything', 'bad,'],
 ['Because,', 'at', 'his', 'core,', 'he', 'is', 'yellow.']]

In [8]:
# filter(): keep only the even numbers from 1..10.
rdd = sc.parallelize(range(1, 11))
filtered_nums = rdd.filter(lambda value: value % 2 == 0)
filtered_nums.collect()

[2, 4, 6, 8, 10]

In [9]:
# union() concatenates the two RDDs and keeps duplicates
# ('Spark' appears three times in the output).
rdd1 = sc.parallelize(['Spark', 'Spark', 'Hadoop', 'Flink'])
rdd2 = sc.parallelize(['Big data', 'Spark', 'Flink'])
rdd3 = rdd1.union(rdd2)
rdd3.collect()

['Spark', 'Spark', 'Hadoop', 'Flink', 'Big data', 'Spark', 'Flink']

In [10]:
# intersection() returns the elements present in both RDDs, without
# duplicates; as the output shows, the result order need not follow the input.
rdd1 = sc.parallelize(['Spark', 'Spark', 'Hadoop', 'Flink'])
rdd2 = sc.parallelize(['Big data', 'Spark', 'Flink'])
rdd3 = rdd1.intersection(rdd2)
rdd3.collect()

['Flink', 'Spark']

In [11]:
# distinct() drops repeated elements ('Spark' appears once in the output).
rdd1 = sc.parallelize(['Spark', 'Spark', 'Hadoop', 'Flink'])
rdd1.distinct().collect()

['Hadoop', 'Spark', 'Flink']

In [12]:
# join() on (key, value) pairs: every left pair is matched with every right
# pair that has the same key, producing (key, (left_value, right_value))
# e.g. key 'a' yields ('a', (1, 4)) and ('a', (1, 6)).
rdd1 = sc.parallelize([('a',1),('b',2),('c',3)])
rdd2 = sc.parallelize([('a',4),('a',6),('b',7),('c',3),('c',8)])
rdd1.join(rdd2).collect()

[('b', (2, 7)), ('c', (3, 3)), ('c', (3, 8)), ('a', (1, 4)), ('a', (1, 6))]

### Actions

In [13]:
# count() returns the number of elements, duplicates included
# (8 here, although only 6 values are distinct).
rdd = sc.parallelize([1, 2, 2, 3, 4, 5, 5, 6])
print(rdd.count())

8


In [14]:
# NOTE(review): this cell is an exact copy of In [13] (same RDD, same count()).
# It was probably meant to demonstrate a different action — TODO confirm
# against the lecture material.
rdd = sc.parallelize([1, 2, 2, 3, 4, 5, 5, 6])
print(rdd.count())

8


In [15]:
# collect() returns every element (here 0..9) to the driver.
rdd = sc.parallelize(range(10))
rdd.collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [16]:
# The result of collect() is an ordinary Python list.
type(rdd.collect())

list

In [17]:
# take(n): return the first n elements, in the order they were supplied.
rdd = sc.parallelize([
    ('k', 5), ('s', 3), ('s', 4), ('p', 7),
    ('p', 5), ('t', 8), ('k', 6),
])
rdd.take(3)

[('k', 5), ('s', 3), ('s', 4)]

In [18]:
# top(n): return the n largest elements in descending order; tuples are
# compared by key first and then by value, hence ('s', 4) before ('s', 3).
rdd = sc.parallelize([
    ('k', 5), ('s', 3), ('s', 4), ('p', 7),
    ('p', 5), ('t', 8), ('k', 6),
])
rdd.top(3)

[('t', 8), ('s', 4), ('s', 3)]

In [19]:
# countByValue() maps each distinct element to its number of occurrences;
# the result comes back to the driver as a defaultdict(int).
rdd = sc.parallelize([1, 2, 2, 3, 4, 5, 5, 6])
rdd.countByValue()

defaultdict(int, {1: 1, 2: 2, 3: 1, 4: 1, 5: 2, 6: 1})

In [20]:
# reduce(): combine all elements pairwise with +, giving their total.
rdd = sc.parallelize([20, 32, 45, 62, 8, 5])
rdd.reduce(lambda left, right: left + right)

172

In [21]:
# fold() is like reduce() but starts from a "zero value". Spark folds each
# partition starting from that value and then folds the partition results
# starting from it again, so it must be the identity of the operation
# (0 for +, 1 for *) or it gets counted more than once.
rdd = sc.parallelize([1, 2, 3, 4, 5])
identity = 0  # 0 for sum, 1 for multiplication
# Named `total` rather than `sum` so the built-in sum() is not shadowed for
# the rest of the notebook session.
total = rdd.fold(identity, lambda t1, t2: t1 + t2)
total

15

In [22]:
def seqOp(acc, elem):
    # Within a partition: append the element to the list and add it to the sum.
    return (acc[0] + [elem], acc[1] + elem)


def combOp(left, right):
    # Across partitions: concatenate the lists and add the partial sums.
    return (left[0] + right[0], left[1] + right[1])


# aggregate() starts every partition from the ([], 0) zero value and
# produces (all elements, their sum) in a single pass.
rdd = sc.parallelize([1,2,3,4,5])
agg = rdd.aggregate(([], 0), seqOp, combOp)
agg

([1, 2, 3, 4, 5], 15)

In [23]:
%%time
# NOTE(review): this web-scraping cell does not use Spark and relies on names
# not defined anywhere in this notebook excerpt (`booksURLs`, `getAndParseURL`,
# `re`). It looks pasted in from another notebook — TODO confirm it belongs here.

names = []
prices = []
nb_in_stock = []
img_urls = []
categories = []
ratings = []

# Scrape data for every book URL; each iteration appends one entry to each of
# the six lists above, so they stay the same length.
for url in booksURLs:
    soup = getAndParseURL(url)
    # Book title: text of the <h1> inside the "product_main" <div>.
    names.append(soup.find("div", class_ = re.compile("product_main"))\
                 .h1.text)
    # Drop the first two characters of the price text. The original comment
    # says this removes the pound sign; two characters are sliced, presumably
    # a mis-decoded "Â£" prefix — TODO confirm.
    prices.append(soup.find("p", class_ = "price_color").text[2:])
    # Number of available products (Get rid of non numerical characters)
    nb_in_stock.append(re.sub("[^0-9]", "", \
                    soup.find("p", class_ = "instock availability").text))
    # Image URL: page URL without "index.html" followed by the first <img> src.
    img_urls.append(url.replace("index.html", "") + soup.find("img").get("src"))
    # Category: 4th "/"-separated segment of the first category link's href.
    categories.append(soup.find("a", href = re.compile("../category/books/"))\
                      .get("href").split("/")[3])
    # Rating: second CSS class of the "star-rating" <p> element.
    ratings.append(soup.find("p", class_ = re.compile("star-rating"))\
                   .get("class")[1])

In [24]:
# Add data into a DataFrame
import pandas as pd

# One column per list filled by the scraping loop. pandas raises ValueError
# if the lists differ in length. Inside the braces, lines continue implicitly,
# so no backslashes are needed.
# NOTE(review): the recorded output of this cell ([1, 2, 3]) does not look
# like a DataFrame preview — re-run to refresh it.
scraped_data = pd.DataFrame({
    'name': names,
    'price': prices,
    'nb_in_stock': nb_in_stock,
    "url_img": img_urls,
    "product_category": categories,
    "rating": ratings,
})
scraped_data.head()

[1, 2, 3]

In [25]:
# Create an accumulator with initial value 0 (shown as value=0 in the output).
accum = sc.accumulator(0)
accum

Accumulator<id=0, value=0>

In [26]:
# foreach(): run a side-effecting function on every element; here each
# element is added into the `accum` accumulator.
sc.parallelize([1, 2, 3, 4]).foreach(lambda item: accum.add(item))

In [27]:
# Read the accumulated total back on the driver (1 + 2 + 3 + 4 = 10).
accum.value

10