[back](./01-spark-overview.ipynb)

---
## `Spark - Key Functions`

- Map vs. FlatMap
- PairRDD
- Reduce by Key

### `Initial Setup`

In [1]:
%%writefile input.txt
hello world
another line
yet another line
yet another new line

Overwriting input.txt


In [2]:
import pyspark as ps

spark = ps.sql.SparkSession.builder.master(
    'local[*]').appName('spark-lecture').getOrCreate()

sc = spark.sparkContext


22/06/13 23:39:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


### `Map vs. FlatMap`

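`map` is a transformation: it applies a function to every element of an RDD and returns a new RDD with exactly one output element per input element.

- Count the number of lines in the file (`map` keeps one element per line, so splitting does not change the count)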

In [3]:
sc.textFile('input.txt').map(lambda x: x.split()).count()

4

- Split each line of the file into words

In [4]:
sc.textFile('input.txt').map(lambda x: x.split()).collect()

[['hello', 'world'],
 ['another', 'line'],
 ['yet', 'another', 'line'],
 ['yet', 'another', 'new', 'line']]

- Read the file

In [5]:
rdd = sc.textFile('input.txt')
rdd.collect()

['hello world', 'another line', 'yet another line', 'yet another new line']

- Use `flatMap` to split the lines and flatten the result

In [6]:
sc.textFile('input.txt').flatMap(lambda x: x.split()).collect()

['hello',
 'world',
 'another',
 'line',
 'yet',
 'another',
 'line',
 'yet',
 'another',
 'new',
 'line']

`map` and `flatMap` are both transformations. With a function that returns a list, `map` produces a list of lists (one list per input element), whereas `flatMap` produces a single flattened list.
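
The difference can be sketched without Spark. The following plain-Python analogue is an illustration only, not PySpark's implementation; the `lines` list is a stand-in for the RDD read from `input.txt`.

```python
from itertools import chain

# Stand-in for the RDD produced by sc.textFile('input.txt')
lines = ['hello world', 'another line', 'yet another line', 'yet another new line']

# map: exactly one output element per input element (here, one list per line)
mapped = [line.split() for line in lines]

# flatMap: each input element yields zero or more output elements,
# and the results are concatenated into one flat sequence
flat_mapped = list(chain.from_iterable(line.split() for line in lines))

print(len(mapped))       # 4 (one entry per line)
print(len(flat_mapped))  # 11 (one entry per word)
```

This is why `count()` after `map` still returns the number of lines, while `count()` after `flatMap` would return the number of words.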

### `PairRDD`

So far, we have seen how to aggregate values across a whole RDD. For example, given an RDD of sales transactions, we can find the total revenue across all transactions.

Q. Using the following sales data, find the total revenue across all transactions.

In [7]:
%%writefile sales.txt
#ID   Date          Store   State   Product   Amount
101   11/13/2014    100     WA      331       300.00
104   11/18/2014    700     OR      329       450.00
102   11/15/2014    203     CA      321       200.00
106   11/19/2014    202     CA      331       330.00
103   11/17/2014    101     WA      373       750.00
105   11/19/2014    202     CA      321       200.00

Overwriting sales.txt


- Read the file

In [8]:
sc.textFile('sales.txt').take(2)

['#ID   Date          Store   State   Product   Amount',
 '101   11/13/2014    100     WA      331       300.00']

In [9]:
sc.textFile('sales.txt').top(2)

['106   11/19/2014    202     CA      331       330.00',
 '105   11/19/2014    202     CA      321       200.00']
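
Note that `take(2)` returns the first two elements in file order, whereas `top(2)` returns the two largest elements in descending order, which for strings means lexicographic comparison. A plain-Python sketch of that ordering follows; `lines` is a stand-in for the RDD contents, and `heapq.nlargest` is used here as an analogue of `top`, not as Spark's API.

```python
import heapq

# Stand-in for the RDD produced by sc.textFile('sales.txt')
lines = [
    '#ID   Date          Store   State   Product   Amount',
    '101   11/13/2014    100     WA      331       300.00',
    '104   11/18/2014    700     OR      329       450.00',
    '102   11/15/2014    203     CA      321       200.00',
    '106   11/19/2014    202     CA      331       330.00',
    '103   11/17/2014    101     WA      373       750.00',
    '105   11/19/2014    202     CA      321       200.00',
]

first_two = lines[:2]                    # analogue of take(2): file order
largest_two = heapq.nlargest(2, lines)   # analogue of top(2): largest first

print([line.split()[0] for line in first_two])    # ['#ID', '101']
print([line.split()[0] for line in largest_two])  # ['106', '105']
```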

- Split each line into fields

In [10]:
sc.textFile('sales.txt').map(lambda x: x.split()).take(2)

[['#ID', 'Date', 'Store', 'State', 'Product', 'Amount'],
 ['101', '11/13/2014', '100', 'WA', '331', '300.00']]

- Remove the header line, which starts with `#`

In [11]:
sc.textFile('sales.txt') \
    .map(lambda x: x.split()) \
    .filter(lambda x: not x[0].startswith('#')) \
    .take(2)

[['101', '11/13/2014', '100', 'WA', '331', '300.00'],
 ['104', '11/18/2014', '700', 'OR', '329', '450.00']]

- Pick the last field

In [12]:
sc.textFile('sales.txt') \
    .map(lambda x: x.split()) \
    .filter(lambda x: not x[0].startswith('#')) \
    .map(lambda x: x[-1]) \
    .take(2)

['300.00', '450.00']

- Convert to float and then sum

In [13]:
sc.textFile('sales.txt') \
    .map(lambda x: x.split()) \
    .filter(lambda x: not x[0].startswith('#')) \
    .map(lambda x: float(x[-1])) \
    .sum()

2230.0

### `Reduce By Key`

Q. Calculate the revenue per state.

- Instead of creating a sequence of revenue numbers, we can create `(state, revenue)` tuples. An RDD of such 2-tuples is a pair RDD.

In [14]:
sc.textFile('sales.txt') \
    .map(lambda x: x.split()) \
    .filter(lambda x: not x[0].startswith('#')) \
    .map(lambda x: (x[-3], float(x[-1]))) \
    .collect()

[('WA', 300.0),
 ('OR', 450.0),
 ('CA', 200.0),
 ('CA', 330.0),
 ('WA', 750.0),
 ('CA', 200.0)]

- Now, we can use reduceByKey to add them up

In [15]:
sc.textFile('sales.txt') \
    .map(lambda x: x.split()) \
    .filter(lambda x: not x[0].startswith('#')) \
    .map(lambda x: (x[-3], float(x[-1]))) \
    .reduceByKey(lambda amount1, amount2: amount1 + amount2) \
    .collect()

[('CA', 730.0), ('WA', 1050.0), ('OR', 450.0)]

- `reduceByKey` treats the first element of each tuple as the **key** and merges the values that share a key

Q. Find the state with the highest revenue.

- We can either use the action `top` or the transformation `sortBy`.

In [16]:
sc.textFile('sales.txt')\
  .map(lambda x: x.split())\
  .filter(lambda x: not x[0].startswith('#'))\
  .map(lambda x: (x[-3], float(x[-1])))\
  .reduceByKey(lambda amount1, amount2: amount1 + amount2)\
  .sortBy(lambda state_amount: state_amount[1], ascending=False)\
  .take(1)

[('WA', 1050.0)]
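
The `top` alternative is not shown in the cell above. On the reduced RDD it would take the form `.top(1, key=lambda state_amount: state_amount[1])`, which only needs the largest elements rather than a full sort. The sketch below reproduces that selection in plain Python on the per-state totals from the earlier output; `totals` is a stand-in for the RDD, and `heapq.nlargest` is used as an analogue of `top`, not as Spark's API.

```python
import heapq

# Per-state totals, copied from the reduceByKey output above
totals = [('CA', 730.0), ('WA', 1050.0), ('OR', 450.0)]

# Analogue of rdd.top(1, key=...): keep the single largest element by revenue
best = heapq.nlargest(1, totals, key=lambda state_amount: state_amount[1])

print(best)  # [('WA', 1050.0)]
```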

Q. What does `reduceByKey` do?

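- `reduceByKey` works only on pair RDDs, that is, RDDs made up of `(key, value)` 2-tuples.
- `reduceByKey` works as both a combiner and a reducer: values for each key are first merged within each partition, and the partial results are then merged across partitions.
- It therefore requires the reduce function to be associative and commutative.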
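
To see why these requirements matter, here is a plain-Python model of the idea. It is a sketch, not Spark's implementation: the helper `reduce_by_key` and the two-way `partitions` split are hypothetical names chosen for illustration.

```python
def reduce_by_key(partitions, func):
    """Model of reduceByKey: combine within each partition, then merge."""
    def combine(pairs):
        acc = {}
        for key, value in pairs:
            acc[key] = func(acc[key], value) if key in acc else value
        return acc

    # Combiner step: each partition is reduced independently
    partials = [combine(partition) for partition in partitions]

    # Reducer step: merge the per-partition results key by key
    return combine(pair for partial in partials for pair in partial.items())


# Hypothetical split of the (state, revenue) pairs into two partitions
partitions = [
    [('WA', 300.0), ('OR', 450.0), ('CA', 200.0)],
    [('CA', 330.0), ('WA', 750.0), ('CA', 200.0)],
]

totals = reduce_by_key(partitions, lambda a, b: a + b)
print(sorted(totals.items()))  # [('CA', 730.0), ('OR', 450.0), ('WA', 1050.0)]

# Subtraction is not associative, so the answer depends on the partitioning
one_partition = [partitions[0] + partitions[1]]
sub = lambda a, b: a - b
print(reduce_by_key(one_partition, sub)['CA'])  # -330.0
print(reduce_by_key(partitions, sub)['CA'])     # 70.0
```

With addition, any partitioning gives the same totals. With subtraction, the same data yields different answers depending on how it is split, which is why such functions are unsuitable for `reduceByKey`.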

#### `Word Count`

Q. Implement word count in Spark

- Create some input

In [17]:
%%writefile wc.txt
hello world
another line
this is another line
this is the last line

Overwriting wc.txt


- Count the words

In [18]:
sc.textFile('wc.txt')\
  .flatMap(lambda x: x.split())\
  .map(lambda word: (word, 1))\
  .reduceByKey(lambda count1, count2: count1 + count2)\
  .collect()

[('world', 1),
 ('line', 3),
 ('this', 2),
 ('is', 2),
 ('last', 1),
 ('hello', 1),
 ('another', 2),
 ('the', 1)]
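
The counts can be cross-checked locally with `collections.Counter` from the standard library. This is a plain-Python check on the same text, not a Spark operation; the `text` variable simply repeats the contents of `wc.txt`.

```python
from collections import Counter

# Same contents as wc.txt
text = """hello world
another line
this is another line
this is the last line
"""

# Splitting on whitespace and counting mirrors flatMap + (word, 1) + reduceByKey
counts = Counter(text.split())

print(counts['line'], counts['this'], counts['another'])  # 3 2 2
```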

### `Conclusion`

In [19]:
sc.stop()


