# Wide Transformations

## Spark Set Up

In [1]:
## Imports
import re
import json
import numpy as np
import pandas as pd
import warnings
warnings.filterwarnings('ignore')

from pyspark.sql import SparkSession

app_name = "week2_wt"
master = "local[*]"
spark = SparkSession\
        .builder\
        .appName(app_name)\
        .master(master)\
        .config("spark.ui.port","42229")\
        .getOrCreate()
sc = spark.sparkContext

## Change the working directory
%cd /media

/media


## Data Set Up

In [2]:
%%writefile data/grades.csv
10001,101,98
10001,102,87
10002,101,75
10002,102,55
10002,103,80
10003,102,45
10003,103,75
10004,101,90
10005,101,85
10005,103,60

Writing data/grades.csv


## Aggregations

### groupByKey

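You may be familiar with this concept from SQL's `GROUP BY`. However, `groupByKey` is not the preferred function for aggregations, and it can be quite problematic. The reason? Every `(key, value)` pair is shuffled across the network with no pre-aggregation. The executor also needs to hold all values for a given key in memory before the aggregation function is applied, so it can run out of memory for keys with many values.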

In [9]:
## Let's see some examples
ALICE_TXT = 'file:///media' + "/data/alice.txt"
aliceRDD = sc.textFile(ALICE_TXT)

aliceRDD.flatMap(lambda line: re.findall('[a-z]+', line.lower())) \
                 .map(lambda word: (word, 1)) \
                 .groupByKey()\
                 .take(5)

[('project', <pyspark.resultiterable.ResultIterable at 0x7ff49880caf0>),
 ('gutenberg', <pyspark.resultiterable.ResultIterable at 0x7ff49880ce80>),
 ('ebook', <pyspark.resultiterable.ResultIterable at 0x7ff49880cb80>),
 ('of', <pyspark.resultiterable.ResultIterable at 0x7ff49880cf40>),
 ('s', <pyspark.resultiterable.ResultIterable at 0x7ff49880cfa0>)]

Here we see the main difference between `groupByKey` and `reduceByKey`: `groupByKey` returns an iterable (think of a list) of all the values for each key, and no aggregation has happened yet! We need to follow it with `mapValues`, which applies a function (here `sum`) to each key's iterable while keeping the key.

In [12]:
aliceRDD.flatMap(lambda line: re.findall('[a-z]+', line.lower())) \
                 .map(lambda word: (word, 1)) \
                 .groupByKey()\
                 .mapValues(sum)\
                 .take(5)

[('project', 88), ('gutenberg', 98), ('ebook', 13), ('of', 638), ('s', 222)]
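To make the two steps concrete, here is a plain-Python model of `groupByKey` followed by `mapValues(sum)`, run on a few hypothetical words. The helpers `group_by_key` and `map_values` are illustrative stand-ins, not Spark functions. Note that every value for a key is kept in a list before anything is summed. That is the memory cost described above.

```python
from collections import defaultdict

def group_by_key(pairs):
    """Illustrative stand-in for groupByKey: collect all values per key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)  # every value is kept; nothing is aggregated yet
    return dict(groups)

def map_values(grouped, func):
    """Illustrative stand-in for mapValues: apply func to each key's values."""
    return {key: func(values) for key, values in grouped.items()}

words = ["of", "alice", "of", "the", "of", "alice"]  # hypothetical input
grouped = group_by_key((w, 1) for w in words)

print(grouped)                   # {'of': [1, 1, 1], 'alice': [1, 1], 'the': [1]}
print(map_values(grouped, sum))  # {'of': 3, 'alice': 2, 'the': 1}
```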

### reduceByKey

This is a more stable, and generally preferable, way to do aggregations. The main performance difference is that `reduceByKey` combines the values for each key in memory within each partition before the shuffle (a map-side combine). As a result, at most one record per key per partition has to be shuffled, instead of every `(key, value)` pair.
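A plain-Python sketch can count the difference in shuffle volume. The two partitions below are hypothetical. `combine_locally` only mimics the map-side combine that `reduceByKey` performs and is not Spark's implementation.

```python
from collections import Counter

# Two hypothetical partitions of (word, 1) pairs
partitions = [
    [("of", 1), ("the", 1), ("of", 1), ("of", 1)],
    [("the", 1), ("of", 1), ("the", 1)],
]

# groupByKey: every pair is shuffled as-is
shuffled_group = [pair for part in partitions for pair in part]

def combine_locally(part):
    """Sum values per key inside one partition (mimics the map-side combine)."""
    totals = Counter()
    for key, value in part:
        totals[key] += value
    return list(totals.items())

# reduceByKey: at most one record per key per partition is shuffled
shuffled_reduce = [pair for part in partitions for pair in combine_locally(part)]

# After the shuffle, the partial sums are reduced once more per key
final = Counter()
for key, value in shuffled_reduce:
    final[key] += value

print(len(shuffled_group))   # 7
print(len(shuffled_reduce))  # 4
print(dict(final))           # {'of': 4, 'the': 3}
```

Both strategies give the same totals. Only the number of records crossing the network differs.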

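It is called as `reduceByKey(func)`, where `func` takes two values and returns one of the same type. It can be a lambda or any named function, such as `operator.add`. Spark applies it within each partition and then across partitions in no guaranteed order, so `func` should be associative and commutative.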

In [13]:
aliceRDD.flatMap(lambda line: re.findall('[a-z]+', line.lower())) \
                 .map(lambda word: (word, 1)) \
                 .reduceByKey(lambda a, b: a + b)\
                 .take(5)

[('project', 88), ('gutenberg', 98), ('ebook', 13), ('of', 638), ('s', 222)]
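Spark's documentation asks for a `reduceByKey` function that is associative and commutative. This is because values are reduced inside each partition first, and the partial results are merged afterwards. The sketch below shows what goes wrong otherwise: with subtraction, the answer changes with the partitioning. It is a hypothetical plain-Python model for the values of a single key, and `model_reduce_by_key` is not a Spark API.

```python
import operator
from functools import reduce

def model_reduce_by_key(partitions, func):
    """Illustrative model: reduce inside each partition, then merge partials."""
    partials = [reduce(func, part) for part in partitions if part]
    return reduce(func, partials)

# Addition is associative and commutative: the split does not matter
print(model_reduce_by_key([[1, 2], [3, 4]], operator.add))  # 10
print(model_reduce_by_key([[1], [2, 3, 4]], operator.add))  # 10

# Subtraction is not: the result depends on how the data was partitioned
print(model_reduce_by_key([[1, 2], [3, 4]], operator.sub))  # (1-2) - (3-4) = 0
print(model_reduce_by_key([[1], [2, 3, 4]], operator.sub))  # 1 - ((2-3)-4) = 6
```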

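Finally, it is important to understand that `reduceByKey` is a transformation, which lazily returns a new RDD of `(key, value)` pairs. By contrast, `reduce` is an action: it aggregates all elements of the RDD into a single value returned to the driver, so calling it forces execution.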

### combineByKey

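`combineByKey` adds more flexibility to our aggregation, because the accumulated (combined) type can differ from the type of the values. It is called as `combineByKey(createCombiner, mergeValue, mergeCombiners)`:

- `createCombiner` turns the first value seen for a key in a partition into an accumulator.
- `mergeValue` folds each further value from the same partition into that accumulator.
- `mergeCombiners` merges accumulators coming from different partitions.

Think of appending values to a list and then merging the lists, instead of adding the values directly. In the example below, each accumulator is a `(sum, count)` pair. Since every value is 1, both numbers equal the word count, hence outputs like `(88, 88)`.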

In [30]:
aliceRDD.flatMap(lambda line: re.findall('[a-z]+', line.lower()))\
                 .map(lambda word: (word, 1))\
                 .combineByKey(lambda value: (value, 1),
                               lambda x, value: (x[0] + value, x[1] + 1),
                               lambda x, y: (x[0] + y[0], x[1] + y[1])
                           )\
                 .take(5)

[('project', (88, 88)),
 ('gutenberg', (98, 98)),
 ('ebook', (13, 13)),
 ('of', (638, 638)),
 ('s', (222, 222))]
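You can trace the three functions with a plain-Python model of `combineByKey` that computes each student's average. It runs over two hypothetical partitions of `(student, grade)` pairs taken from `grades.csv`. `model_combine_by_key` is an illustrative helper, not Spark's implementation. It does, however, follow the same order: `createCombiner` and `mergeValue` inside each partition, then `mergeCombiners` across partitions.

```python
def model_combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    """Illustrative model of combineByKey (not Spark's code)."""
    merged = {}
    for part in partitions:
        local = {}  # one accumulator per key, per partition
        for key, value in part:
            if key in local:
                local[key] = merge_value(local[key], value)
            else:
                local[key] = create_combiner(value)
        # merge this partition's accumulators into the overall result
        for key, acc in local.items():
            merged[key] = merge_combiners(merged[key], acc) if key in merged else acc
    return merged

# (student, grade) pairs split over two hypothetical partitions
partitions = [
    [("10003", 45), ("10001", 98), ("10001", 87)],
    [("10003", 75)],
]
sums_counts = model_combine_by_key(
    partitions,
    lambda v: (v, 1),                         # createCombiner
    lambda acc, v: (acc[0] + v, acc[1] + 1),  # mergeValue
    lambda a, b: (a[0] + b[0], a[1] + b[1]),  # mergeCombiners
)
averages = {k: s / c for k, (s, c) in sums_counts.items()}

print(sums_counts)  # {'10003': (120, 2), '10001': (185, 2)}
print(averages)     # {'10003': 60.0, '10001': 92.5}
```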

### aggregateByKey

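Finally, let's look at `aggregateByKey(zeroValue, seqOp, combOp)`. Like `combineByKey`, its result type can differ from the value type. Instead of a `createCombiner` function, however, it takes a `zeroValue`: the initial accumulator for each key in each partition. `seqOp` folds a value into the accumulator within a partition, and `combOp` merges accumulators from different partitions. Because every key starts from the zero value, even a key with a single value ends up in the accumulator's shape. With plain `reduceByKey`, such a key is never reduced at all.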

In [31]:
## Let's assume we want to compute each student's average grade and the string of their letter grades
def LetterGrade(x):
    if x > 90.0:
        return "A"
    elif x > 80.0:
        return "B"
    elif x > 70.0:
        return "C"
    elif x > 60.0:
        return "D"
    else:
        return "F"
    
## Let's define an aggregation function
def getCounts(a,b):
    return (a[0] + b[0], a[1] + b[1], LetterGrade(a[0])+LetterGrade(b[0]))

## Let's prepare the data
def parse_grades(line):
    """Helper function to parse input & track failing grades."""
    student,course,grade = line.split(',')
    grade = int(grade)
    return(student,course, grade)

csv_path = 'file:///media' + "/data/grades.csv"
  
gradesRDD = sc.textFile(csv_path)\
              .map(lambda x: parse_grades(x))

## Let's compute each student's average grade and letter string
studentAvgs = gradesRDD.map(lambda x: (x[0], (x[2], 1)))\
                       .reduceByKey(getCounts)\
                       .mapValues(lambda x: ((x[0]/x[1]),x[2]))\
                       .collect()

22/04/20 21:54:43 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 42.0 (TID 64)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main
    process()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 596, in process
    serializer.dump_stream(out_iter, outfile)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/util.py", line 73, in wrapper
    return f(*args, **kwargs)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2276, in <lambda>
    map_values_fn = lambda kv: (kv[0], f(kv[1]))
  File "/tmp/ipykernel_5866/2702443486.py", line 33, in <lambda>
IndexError: tuple index out of range

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.


Oh no! Why the error? Let's debug.

In [32]:
gradesRDD.map(lambda x: (x[0], (x[2], 1)))\
                       .collect()

[('10001', (98, 1)),
 ('10001', (87, 1)),
 ('10002', (75, 1)),
 ('10002', (55, 1)),
 ('10002', (80, 1)),
 ('10003', (45, 1)),
 ('10003', (75, 1)),
 ('10004', (90, 1)),
 ('10005', (85, 1)),
 ('10005', (60, 1))]

So far, so good: each record is a `(student, (grade, 1))` pair.

In [33]:
gradesRDD.map(lambda x: (x[0], (x[2], 1)))\
        .reduceByKey(getCounts).collect()

[('10001', (185, 2, 'AB')),
 ('10004', (90, 1)),
 ('10002', (210, 3, 'AC')),
 ('10003', (120, 2, 'FC')),
 ('10005', (145, 2, 'BF'))]

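Oh! `reduceByKey` only calls the function when a key has at least two values. Student 10004 has a single grade, so `getCounts` is never called for that key, and its value stays the original 2-tuple `(90, 1)`. The `x[2]` in `mapValues` then raises the `IndexError`. There is also a second, quieter bug. After the first merge, `a[0]` is already a running sum, so `LetterGrade(a[0])` grades the sum rather than an individual grade: for student 10002, the running sum 130 becomes an "A", giving 'AC'. Let's solve both problems with `aggregateByKey` and a zero value.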
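You can reproduce the behavior behind the error in plain Python by using `functools.reduce` as a stand-in for the per-key reduction in `reduceByKey`. Given a single element, `reduce` returns that element without ever calling the function. `get_counts` below is a hypothetical simplification of `getCounts` that keeps only the sums and counts.

```python
from functools import reduce

calls = []  # records every call made to get_counts

def get_counts(a, b):
    """Hypothetical simplification of getCounts: sums and counts only."""
    calls.append((a, b))
    return (a[0] + b[0], a[1] + b[1], "merged")

pair_result = reduce(get_counts, [(98, 1), (87, 1)])  # two grades: one call
single_result = reduce(get_counts, [(90, 1)])         # one grade: no call

print(pair_result)    # (185, 2, 'merged')
print(single_result)  # (90, 1): still a 2-tuple, so x[2] would fail
print(len(calls))     # 1
```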

In [35]:
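## seqOp folds one raw value b = (grade, 1, grade) into the accumulator a = (sum, count, letters).
## Because every key starts from the zero value, a is always an accumulator, so only b needs grading
def seqOp(a,b):
    return (a[0] + b[0], a[1] + b[1], a[2] + LetterGrade(b[2]))

## combOp merges two accumulators from different partitions field by field
## (a + b would concatenate the tuples into a 6-tuple instead)
def combOp(a,b):
    return (a[0] + b[0], a[1] + b[1], a[2] + b[2])

## Our zero value is (0, 0, ""): the empty string lets seqOp append a letter even when a key has only one grade
gradesRDD.map(lambda x: (x[0], (x[2], 1, x[2])))\
             .aggregateByKey((0,0,""),seqOp,combOp)\
             .mapValues(lambda x: ((x[0]/x[1]),x[2]))\
             .collect()

[('10001', (92.5, 'AB')),
 ('10004', (90.0, 'B')),
 ('10002', (70.0, 'CFC')),
 ('10003', (60.0, 'FC')),
 ('10005', (72.5, 'BF'))]

Student 10003's two grades end up in different partitions of `gradesRDD`, so `combOp` runs for that key. The order of its concatenated letters depends on the order in which Spark merges the partial results.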
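A plain-Python model of `aggregateByKey` shows where each function runs. It assumes two hypothetical partitions in which student 10003's grades are split. `model_aggregate_by_key`, `seq_op`, `comb_op`, and `letter_grade` are illustrative stand-ins, not Spark code; `letter_grade` uses the same thresholds as `LetterGrade`. The model also shows why `combOp` must merge the tuples field by field: Python's `+` on tuples concatenates them.

```python
def letter_grade(x):
    """Same thresholds as the notebook's LetterGrade."""
    for cutoff, letter in ((90, "A"), (80, "B"), (70, "C"), (60, "D")):
        if x > cutoff:
            return letter
    return "F"

def model_aggregate_by_key(partitions, zero_value, seq_op, comb_op):
    """Illustrative model of aggregateByKey (not Spark's code)."""
    merged = {}
    for part in partitions:
        local = {}
        for key, value in part:
            # each key starts from zero_value in every partition it appears in
            local[key] = seq_op(local.get(key, zero_value), value)
        for key, acc in local.items():
            merged[key] = comb_op(merged[key], acc) if key in merged else acc
    return merged

def seq_op(acc, value):
    # acc = (sum, count, letters); value = (grade, 1, grade)
    return (acc[0] + value[0], acc[1] + value[1], acc[2] + letter_grade(value[2]))

def comb_op(a, b):
    # merge two accumulators field by field
    return (a[0] + b[0], a[1] + b[1], a[2] + b[2])

partitions = [
    [("10003", (45, 1, 45)), ("10004", (90, 1, 90))],
    [("10003", (75, 1, 75))],
]
result = model_aggregate_by_key(partitions, (0, 0, ""), seq_op, comb_op)
print(result)  # {'10003': (120, 2, 'FC'), '10004': (90, 1, 'B')}

# What a `return a + b` combOp would produce instead: a 6-tuple
concatenated = (45, 1, "F") + (75, 1, "C")
print(concatenated)  # (45, 1, 'F', 75, 1, 'C')
```

With the concatenating version, `x[0]/x[1]` would read only the first partition's sum and count, silently dropping the second partition's grades.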