# RDDとAPI

[reference](https://blog.serverworks.co.jp/introducing-pyspark-3)

# Imports and Setup

In [1]:
from pyspark import SparkContext, SparkConf

In [None]:
conf = SparkConf().setAppName('sample API').setMaster('local')

In [3]:
sc = SparkContext(conf=conf)

# 変換API

## map

In [6]:
data = [i+1 for i in range(5)]

In [12]:
data

[1, 2, 3, 4, 5]

In [7]:
rdd = sc.parallelize(data)

In [8]:
mapped_rdd = rdd.map(lambda x: x**2)

In [9]:
result = mapped_rdd.collect()

In [11]:
result

[1, 4, 9, 16, 25]

## mapValues

In [13]:
data = [
     ('Fruits', ['Apple', 'Banana', 'Melon']),
    ('Vegitables', ['Carrot', 'Tomato']),
    ('Meats', ['Chicken', 'Lamb', 'Pork', 'Beef'])
]

In [14]:
data

[('Fruits', ['Apple', 'Banana', 'Melon']),
 ('Vegitables', ['Carrot', 'Tomato']),
 ('Meats', ['Chicken', 'Lamb', 'Pork', 'Beef'])]

In [22]:
rdd = sc.parallelize(data)

In [23]:
grouped_rdd = rdd.mapValues(lambda x: len(x))

In [24]:
result = grouped_rdd.collect()

In [25]:
result

[('Fruits', 3), ('Vegitables', 2), ('Meats', 4)]

## groupByKey

In [26]:
data = [
    ('Fruits', 'Apple'),
    ('Vegitables', 'Carrot'),
    ('Meats', 'Chicken'),
    ('Fruits', 'Banana'),
    ('Vegitables', 'Tomato'),
    ('Meats', 'Pork'),
    ('etc', 'Rice')
]

In [27]:
data

[('Fruits', 'Apple'),
 ('Vegitables', 'Carrot'),
 ('Meats', 'Chicken'),
 ('Fruits', 'Banana'),
 ('Vegitables', 'Tomato'),
 ('Meats', 'Pork'),
 ('etc', 'Rice')]

In [28]:
rdd = sc.parallelize(data)

In [29]:
grouped_rdd = rdd.groupByKey().mapValues(list)

In [30]:
result = grouped_rdd.collect()

In [31]:
result

[('Fruits', ['Apple', 'Banana']),
 ('Vegitables', ['Carrot', 'Tomato']),
 ('Meats', ['Chicken', 'Pork']),
 ('etc', ['Rice'])]

## filter

In [38]:
data = [i+1 for i in range(5)]

In [39]:
data

[1, 2, 3, 4, 5]

In [40]:
rdd = sc.parallelize(data)

In [41]:
filtered_rdd = rdd.filter(lambda x: x > 3)

In [42]:
result = filtered_rdd.collect()

In [43]:
result

[4, 5]

## union

In [44]:
data1 = ['aa', 'bb', 'cc']
data2 = ['dd', 'ee', 'ff']

In [45]:
rdd1, rdd2 = sc.parallelize(data1), sc.parallelize(data2)

In [46]:
rdd1, rdd2 = sc.parallelize(data1), sc.parallelize(data2)

In [47]:
union_rdd = rdd1.union(rdd2)

In [48]:
result = union_rdd.collect()

In [49]:
result

['aa', 'bb', 'cc', 'dd', 'ee', 'ff']

# 実行API

## collect

In [50]:
data = [i + 1 for i in range(5)]

In [51]:
data

[1, 2, 3, 4, 5]

In [52]:
rdd = sc.parallelize(data)

In [54]:
result = rdd.collect()

In [55]:
result

[1, 2, 3, 4, 5]

## count

In [56]:
data = [i+1 for i in range(5)]

In [57]:
data

[1, 2, 3, 4, 5]

In [58]:
rdd = sc.parallelize(data)

In [59]:
result = rdd.count()

In [60]:
result

5

## top

In [61]:
data = [i+1 for i in range(5)]

In [62]:
rdd = sc.parallelize(data)

In [63]:
result = rdd.top(2)

In [64]:
result

[5, 4]

## reduce

In [65]:
data = [i+1 for i in range(5)]

In [69]:
data

[1, 2, 3, 4, 5]

In [66]:
rdd = sc.parallelize(data)

In [67]:
result = rdd.reduce(lambda x,y: x+y)

In [68]:
result

15

## saveAsTextFile

In [70]:
data = [i+1 for i in range(5)]

In [71]:
data

[1, 2, 3, 4, 5]

In [72]:
rdd = sc.parallelize(data)

In [73]:
filtered_rdd = rdd.filter(lambda x: x > 3)

In [74]:
filtered_rdd.saveAsTextFile('/home/jovyan/result')