### Before you begin

To check whether the code you've written is correct, we'll again use automark. Please update the username in the cell below with your student number. 


### Troubleshooting

In the Spark tasks of this assignment, Spark might crash unexpectedly if it does not have enough memory available. If you run into such issues while working on today's tasks, we suggest increasing the memory available to Docker to 4 GB or more. If you are using Docker for Windows or Docker for Mac, you can easily increase it via the whale 🐳 icon in the taskbar, under Preferences -> Resources.

Note that passing a command-line argument like `-m 4g` to `docker run ...` can only set the memory limit lower than the limit in your Docker Resource Preferences; it cannot raise the limit set there. For more information, please refer to the Docker [documentation](https://docs.docker.com/config/containers/resource_constraints/).

In [318]:
import automark as am
import math

# fill in your student number as your username
am.configure(username='13646168')

# to check your progress, you can run this function
am.get_progress()

----------------------------------------------------------------------
| Jung, David                                                        |
----------------------------------------------------------------------
| a0_t1_sailor_avg_experience                       | completed      |
| a0_t2_sailor_avg_experience_pyspark               | completed      |
| a1a_t1_all_model_number_b                         | completed      |
| a1a_t2_all_model_number_not_b                     | completed      |
| a1a_t3_avg_PCs_speed                              | completed      |
| a1a_t4_max_printer_price                          | completed      |
| a1a_t5_min_spending_visa                          | completed      |
| a1a_t6_customer_id                                | completed      |
| a1a_t7_highest_price_USD                          | completed      |
| a1b_t1a_at_least_four_products                    | completed      |
| a1b_t1b_decending_speed_laptop                    | completed      |
| a1b_

# Part A - MapReduce

## Our MapReduce Engine

Real-world MapReduce systems like [Apache Hadoop](https://hadoop.apache.org/) are tedious to install and debug, and require code in a programming language like Java. For the sake of simplicity and to allow for easy debugging, we work with our own local, non-parallel MapReduce engine written in Python. Note that your MapReduce programs (if written correctly) would also work in distributed mode on a real cluster.

In the following, we ask you to implement several MapReduce programs and run them via this simple MapReduce engine.

In [2]:
def mapreduce(input_partitions, f_map, f_reduce, num_reducers=2, print_debug_text=False):
    if print_debug_text: print("---Starting MAP-phase.---")
    # These arrays will hold the outputs of the map-phase for each input partition
    map_output_partitions = []
    # We are running the map-phase on each input partition now. In a real mapreduce system, this would run
    # in parallel on different machines.
    for counter, input_partition in enumerate(input_partitions):
        if print_debug_text: print(f"  Applying f_map to input partition {counter}")
        map_output = []
        # We apply f_map to each (key, value) pair in the input partition, and store the corresponding outputs
        for key, value in input_partition:
            if print_debug_text: print(f"    f_map({key}, {value}) -> ")
            for mapped_key, mapped_value in f_map(key, value):
                if print_debug_text: print(f"      ({mapped_key}, {mapped_value})")
                map_output.append((mapped_key, mapped_value))
        # Store output partition of this map operation        
        map_output_partitions.append(map_output)        
    
    # Next, we start the shuffle phase. We need to create several reducer partitions from the map outputs, 
    # assign each key to a partition and collect all the values for this key.
    if print_debug_text: print("\n---Starting SHUFFLE-phase.---")
    reduce_input_partitions = []
    # We create as many reducer partitions as specified by num_reducers
    for _ in range(0, num_reducers):
        reduce_input_partitions.append(dict())
        
    # We shuffle each map output partition now. In a real mapreduce system, this would run
    # in parallel on different machines.     
    for counter, map_output_partition in enumerate(map_output_partitions):
        if print_debug_text: print(f"  Shuffling map output input partition {counter}")    
        # We process each (key, value) pair from the map-output here
        for key, value in map_output_partition:
            # We determine the target partition for this key via hash-partitioning
            target_partition_index = abs(hash(key)) % num_reducers
            if print_debug_text: print(f"    Assigning key [{key}] to reducer input {target_partition_index} via hash-partitioning")            
            # We add the value to the group of the key in the target partition
            target_partition = reduce_input_partitions[target_partition_index]
            if not key in target_partition:
                target_partition[key] = []            
            target_partition[key].append(value)
    
    # Next, we run the reduce-phase
    if print_debug_text: print("\n---Starting REDUCE-phase.---")
    reduce_output_partitions = []
        
    # We reduce each reduce partition now. In a real mapreduce system, this would run
    # in parallel on different machines.            
    for counter, reduce_input_partition in enumerate(reduce_input_partitions):
        reduce_output = []
        if print_debug_text: print(f"  Applying f_reduce to reduce_input partition {counter}")    
        # We apply f_reduce to each key and its corresponding values now and collect the results
        for key, values in reduce_input_partition.items():
            if print_debug_text: print(f"    f_reduce({key}, {values}) -> ")
            for reduced_key, reduced_value in f_reduce(key, values):
                if print_debug_text: print(f"      ({reduced_key}, {reduced_value})")
                reduce_output.append((reduced_key, reduced_value))
        reduce_output_partitions.append(reduce_output)
        
    return reduce_output_partitions     
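The shuffle phase assigns keys with `abs(hash(key)) % num_reducers`. Python randomizes `hash()` for `str` keys per interpreter process (unless `PYTHONHASHSEED` is fixed), so the reducer numbers printed in the debug logs may differ after a kernel restart. The final results do not change, because within one run every occurrence of a key still maps to the same reducer. If you want reproducible logs, a deterministic partitioner could be sketched with `zlib.crc32`; `stable_partition` is a hypothetical helper and not part of the engine above.

```python
import zlib

def stable_partition(key, num_reducers):
    """Hypothetical deterministic replacement for abs(hash(key)) % num_reducers."""
    # crc32 over the UTF-8 bytes of the key's string form yields the same
    # unsigned 32-bit value in every Python process, unlike hash() on str
    return zlib.crc32(str(key).encode("utf-8")) % num_reducers

# Every occurrence of a key lands in the same reducer partition
words = ["the", "shawshank", "redemption", "the", "godfather"]
assignments = {word: stable_partition(word, 2) for word in words}
print(assignments)
```

Any partition function works for correctness as long as equal keys always map to the same reducer; the choice only affects reproducibility and load balance.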

The following is a helper function for local testing, which checks the number of keys in the result of a MapReduce job and tests whether certain keys are present and have the expected value.

In [3]:
def verify_locally(mapreduce_result, expected_num_keys, expected_keys_and_values):
    print("\nRESULT VERIFICATION:")
    results_by_key = {}
    for partition in mapreduce_result:
        for key, value in partition:
            results_by_key[key] = value

    if len(results_by_key) != expected_num_keys:
        print(f"  ERROR: Expected {expected_num_keys} keys in the output, but found {len(results_by_key)}!")

    for key_of_interest, expected_value in expected_keys_and_values:
        if key_of_interest not in results_by_key:
            print(f"  ERROR: Unable to find key [{key_of_interest}] in result!")
        else:
            observed_value = results_by_key[key_of_interest]
            if isinstance(expected_value, (float, int)):
                if not math.isclose(expected_value, observed_value, rel_tol=1e-5, abs_tol=1e-5):
                    print(f"  ERROR: Key [{key_of_interest}] has unexpected value [{observed_value}]!")
                else:
                    print(f"  PASS: Key [{key_of_interest}] has expected value!")
            else:
                if expected_value != observed_value:
                    print(f"  ERROR: Key [{key_of_interest}] has unexpected value [{observed_value}]!")
                else:
                    print(f"  PASS: Key [{key_of_interest}] has expected value!")
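`verify_locally` compares numeric values with `math.isclose` rather than `==` because floating-point arithmetic is rarely exact. A quick illustration, using the same tolerances as the helper:

```python
import math

total = 0.1 + 0.2
print(total == 0.3)  # False: exact comparison fails because of binary rounding
print(math.isclose(total, 0.3, rel_tol=1e-5, abs_tol=1e-5))  # True: tolerant comparison succeeds
```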

## WordCount - the "Hello World" of MapReduce

Before we ask you to write your own MapReduce program, you should first investigate the example from our lecture. 

Our input consists of two partitions of key-value pairs representing documents, where the document id is the key and the text of the document is the value.

In [4]:
partitioned_documents = [
    # Input partition 0
    [
        (1, "the shawshank redemption"),
        (2, "the godfather")
    ],
    # Input partition 1
    [
        (3, "the godfather part two")
    ]
]

We want to count the number of occurrences of each word using MapReduce. The following code shows the corresponding functions `f_map` and `f_reduce`. The `f_map` function splits the text into words and emits a key-value pair `(word, 1)` for each word it finds. The `f_reduce` function receives a word as key together with its associated counts, and computes the total number of occurrences from them.

In [6]:
def wordcount_f_map(doc_id, text):   
    output_tuples = []
    for word in text.split(" "):
        output_tuples.append((word, 1))
    return output_tuples        

def wordcount_f_reduce(word, counts):
    total_count = 0
    for count in counts:
        total_count += count
    return [(word, total_count)]

Next, we execute the program. The following diagram shows the flow of data during the execution of our MapReduce program. After execution, look at the log outputs of our engine and compare them to the diagram. Can you see the correspondence?

![title](wordcount.png)
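To make the diagram concrete without any partitioning, the following plain-Python sketch (independent of the engine) runs the three phases on the same three documents in a single process. It shows the grouped lists that `f_reduce` receives after the shuffle, for example three `1`s for the key `the`.

```python
from collections import defaultdict

sample_documents = [
    (1, "the shawshank redemption"),
    (2, "the godfather"),
    (3, "the godfather part two"),
]

# Map: emit (word, 1) for every word, like wordcount_f_map
mapped = [(word, 1) for _, text in sample_documents for word in text.split(" ")]

# Shuffle: collect all values emitted for the same key
grouped = defaultdict(list)
for word, count in mapped:
    grouped[word].append(count)

# Reduce: sum each group, like wordcount_f_reduce
totals = {word: sum(counts) for word, counts in grouped.items()}

print(dict(grouped))
print(totals)
```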

In [7]:
wordcount_result = mapreduce(partitioned_documents, wordcount_f_map, wordcount_f_reduce, print_debug_text=True)

---Starting MAP-phase.---
  Applying f_map to input partition 0
    f_map(1, the shawshank redemption) -> 
      (the, 1)
      (shawshank, 1)
      (redemption, 1)
    f_map(2, the godfather) -> 
      (the, 1)
      (godfather, 1)
  Applying f_map to input partition 1
    f_map(3, the godfather part two) -> 
      (the, 1)
      (godfather, 1)
      (part, 1)
      (two, 1)

---Starting SHUFFLE-phase.---
  Shuffling map output input partition 0
    Assigning key [the] to reducer input 1 via hash-partitioning
    Assigning key [shawshank] to reducer input 0 via hash-partitioning
    Assigning key [redemption] to reducer input 1 via hash-partitioning
    Assigning key [the] to reducer input 1 via hash-partitioning
    Assigning key [godfather] to reducer input 1 via hash-partitioning
  Shuffling map output input partition 1
    Assigning key [the] to reducer input 1 via hash-partitioning
    Assigning key [godfather] to reducer input 1 via hash-partitioning
    Assigning key [part] to 

Check the result by running the following cell, which verifies that the program produces the correct output for our example data.

In [8]:
verify_locally(wordcount_result, 6, [("the", 3), ("godfather", 2), ("two", 1)])


RESULT VERIFICATION:
  PASS: Key [the] has expected value!
  PASS: Key [godfather] has expected value!
  PASS: Key [two] has expected value!


## Task 1 - Sensor Measurements

In the following, we ask you to write some MapReduce programs of your own. In the first task, we work with sensor data. We have a couple of __sensors__, each of which has a name and produces numerical measurements. Our data consists of several partitions, which contain key-value pairs with the __name__ of the sensor as key and the __measurement__ as value.

In [9]:
partitioned_sensor_measurements = [
    [
        ("Sensor1", 12.3),
        ("Sensor2", 9.3),
        ("Sensor3", 2.7),
        ("BrokenSensor", 1000.0),
    ],
    [
        ("Sensor1", 17.1),
        ("Sensor2", 7.3),
        ("Sensor3", -12),
    ],
    [
        ("Sensor1", 19.1),
        ("BrokenSensor", 5000.0),
        ("Sensor2", 3.3),
        ("Sensor3", 0.0),
    ],    
]

In [13]:
type(partitioned_sensor_measurements[0][0])

tuple

In [15]:
partitioned_sensor_measurements[0]

[('Sensor1', 12.3),
 ('Sensor2', 9.3),
 ('Sensor3', 2.7),
 ('BrokenSensor', 1000.0)]

In [29]:
test =  ('x', 19.1)

In [32]:
test[1] == 19.1

True

In [46]:
test1 = [('Sensor1', 12.3),
 ('Sensor2', 9.3),
 ('Sensor3', 2.7),
 ('BrokenSensor', 1000.0)]

In [50]:
'Sensor1' in test1

False

In [60]:
l = []
for a,b in enumerate(partitioned_sensor_measurements):
    print('a', a)
    print('b',b)
    for c,d in b:
        if c != 'BrokenSensor':
            print('c', c)
            print('d',d)
            l.append((c,d))

a 0
b [('Sensor1', 12.3), ('Sensor2', 9.3), ('Sensor3', 2.7), ('BrokenSensor', 1000.0)]
c Sensor1
d 12.3
c Sensor2
d 9.3
c Sensor3
d 2.7
a 1
b [('Sensor1', 17.1), ('Sensor2', 7.3), ('Sensor3', -12)]
c Sensor1
d 17.1
c Sensor2
d 7.3
c Sensor3
d -12
a 2
b [('Sensor1', 19.1), ('BrokenSensor', 5000.0), ('Sensor2', 3.3), ('Sensor3', 0.0)]
c Sensor1
d 19.1
c Sensor2
d 3.3
c Sensor3
d 0.0


In [61]:
l

[('Sensor1', 12.3),
 ('Sensor2', 9.3),
 ('Sensor3', 2.7),
 ('Sensor1', 17.1),
 ('Sensor2', 7.3),
 ('Sensor3', -12),
 ('Sensor1', 19.1),
 ('Sensor2', 3.3),
 ('Sensor3', 0.0)]

In [82]:
li = []
for i in l:
    print(i)

('Sensor1', 12.3)
('Sensor2', 9.3)
('Sensor3', 2.7)
('Sensor1', 17.1)
('Sensor2', 7.3)
('Sensor3', -12)
('Sensor1', 19.1)
('Sensor2', 3.3)
('Sensor3', 0.0)


In [90]:
bla = [12.3, 17.1, 19.1]

In [91]:
max(bla)

19.1

### 1(a) - Compute the maximum measurement per sensor (except for the broken sensor)

Time to write your first mapreduce program. Your task is to compute the maximum measurement value per sensor, for all sensors except the "BrokenSensor". Implement your code in the `max_measurement_per_sensor_f_map` and `max_measurement_per_sensor_f_reduce` functions (and change the return statement).

Please note that the result of your mapreduce program should be equivalent to running the following SQL query on the data (if it was in a database):

```
SELECT sensor, MAX(measurement)
FROM sensors
WHERE sensor != 'BrokenSensor'
GROUP BY sensor
```
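To see which result the query describes, you can run it against a small in-memory SQLite table. This sketch assumes a table `sensors` with columns `sensor` and `measurement`, loaded with the same values as `partitioned_sensor_measurements`:

```python
import sqlite3

measurements = [
    ("Sensor1", 12.3), ("Sensor2", 9.3), ("Sensor3", 2.7), ("BrokenSensor", 1000.0),
    ("Sensor1", 17.1), ("Sensor2", 7.3), ("Sensor3", -12),
    ("Sensor1", 19.1), ("BrokenSensor", 5000.0), ("Sensor2", 3.3), ("Sensor3", 0.0),
]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sensors (sensor TEXT, measurement REAL)")
conn.executemany("INSERT INTO sensors VALUES (?, ?)", measurements)

# Same filter-then-group semantics that the MapReduce program should reproduce
rows = conn.execute(
    """
    SELECT sensor, MAX(measurement)
    FROM sensors
    WHERE sensor != 'BrokenSensor'
    GROUP BY sensor
    """
).fetchall()
conn.close()

sql_result = dict(rows)
print(sql_result)
```

The `WHERE` clause corresponds to the filtering in `f_map`, and `GROUP BY ... MAX(...)` corresponds to the grouping done by the shuffle plus the aggregation in `f_reduce`.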

In [92]:
def a2_t1a_max_measurement_per_sensor():
    def max_measurement_per_sensor_f_map(sensor, measurement): 
        output = []
        if sensor != "BrokenSensor":
            output.append((sensor, measurement))
        return output

    
    def max_measurement_per_sensor_f_reduce(sensor, measurements):
        out = [(sensor, max(measurements))]
        return out
    
    return max_measurement_per_sensor_f_map, max_measurement_per_sensor_f_reduce

Execute your program by running the following cell: 

In [93]:
result = mapreduce(partitioned_sensor_measurements, *a2_t1a_max_measurement_per_sensor(), print_debug_text=True)

---Starting MAP-phase.---
  Applying f_map to input partition 0
    f_map(Sensor1, 12.3) -> 
      (Sensor1, 12.3)
    f_map(Sensor2, 9.3) -> 
      (Sensor2, 9.3)
    f_map(Sensor3, 2.7) -> 
      (Sensor3, 2.7)
    f_map(BrokenSensor, 1000.0) -> 
  Applying f_map to input partition 1
    f_map(Sensor1, 17.1) -> 
      (Sensor1, 17.1)
    f_map(Sensor2, 7.3) -> 
      (Sensor2, 7.3)
    f_map(Sensor3, -12) -> 
      (Sensor3, -12)
  Applying f_map to input partition 2
    f_map(Sensor1, 19.1) -> 
      (Sensor1, 19.1)
    f_map(BrokenSensor, 5000.0) -> 
    f_map(Sensor2, 3.3) -> 
      (Sensor2, 3.3)
    f_map(Sensor3, 0.0) -> 
      (Sensor3, 0.0)

---Starting SHUFFLE-phase.---
  Shuffling map output input partition 0
    Assigning key [Sensor1] to reducer input 1 via hash-partitioning
    Assigning key [Sensor2] to reducer input 1 via hash-partitioning
    Assigning key [Sensor3] to reducer input 1 via hash-partitioning
  Shuffling map output input partition 1
    Assigning key [Se

Test your program by running the following cell, which checks whether your program produces the correct output for our example data.

In [94]:
verify_locally(result, 3, [("Sensor1", 19.1), ("Sensor2", 9.3), ("Sensor3", 2.7)])


RESULT VERIFICATION:
  PASS: Key [Sensor1] has expected value!
  PASS: Key [Sensor2] has expected value!
  PASS: Key [Sensor3] has expected value!


In [95]:
am.test_student_function(a2_t1a_max_measurement_per_sensor)

Running local tests...
a2_t1a_max_measurement_per_sensor successfully passed local tests
Running remote test...
Test was successful. Congratulations!


### 1(b) - Compute the globally maximal measurement

In the next task, you have to compute the globally maximal measurement value from all sensors (including the "BrokenSensor"). Implement your code in the `global_max_measurement_f_map` and `global_max_measurement_f_reduce` functions (and change the return statement).

Hint: use an artificial key "global" to produce the final output.
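With a single artificial key, every mapped value is shuffled to the same reducer, so one reducer does all the work. Real systems such as Hadoop reduce this traffic with a combiner, which pre-aggregates map output locally before the shuffle. The engine above has no combiner hook, so the following standalone sketch only illustrates the idea on the sensor partitions: each partition first computes its local maximum, and only those local maxima are "shuffled". This is valid because `max` is associative and commutative.

```python
sensor_partitions = [
    [("Sensor1", 12.3), ("Sensor2", 9.3), ("Sensor3", 2.7), ("BrokenSensor", 1000.0)],
    [("Sensor1", 17.1), ("Sensor2", 7.3), ("Sensor3", -12)],
    [("Sensor1", 19.1), ("BrokenSensor", 5000.0), ("Sensor2", 3.3), ("Sensor3", 0.0)],
]

# Without pre-aggregation: one ("global", value) record per measurement is shuffled
shuffled_without_combiner = [("global", value) for part in sensor_partitions for _, value in part]

# With local pre-aggregation: one ("global", local_max) record per partition is shuffled
shuffled_with_combiner = [("global", max(value for _, value in part)) for part in sensor_partitions]

global_max = max(value for _, value in shuffled_with_combiner)
print(len(shuffled_without_combiner), len(shuffled_with_combiner), global_max)
```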

In [158]:
def a2_t1b_global_max_measurement():
    def global_max_measurement_f_map(sensor, measurement):
        return [('global', measurement)]
    
    def global_max_measurement_f_reduce(key, measurements):
        out = [(key, max(measurements))]
        return out
    
    return global_max_measurement_f_map, global_max_measurement_f_reduce

Execute your program by running the following cell: 

In [159]:
result = mapreduce(partitioned_sensor_measurements, *a2_t1b_global_max_measurement(), print_debug_text=True) 

---Starting MAP-phase.---
  Applying f_map to input partition 0
    f_map(Sensor1, 12.3) -> 
      (global, 12.3)
    f_map(Sensor2, 9.3) -> 
      (global, 9.3)
    f_map(Sensor3, 2.7) -> 
      (global, 2.7)
    f_map(BrokenSensor, 1000.0) -> 
      (global, 1000.0)
  Applying f_map to input partition 1
    f_map(Sensor1, 17.1) -> 
      (global, 17.1)
    f_map(Sensor2, 7.3) -> 
      (global, 7.3)
    f_map(Sensor3, -12) -> 
      (global, -12)
  Applying f_map to input partition 2
    f_map(Sensor1, 19.1) -> 
      (global, 19.1)
    f_map(BrokenSensor, 5000.0) -> 
      (global, 5000.0)
    f_map(Sensor2, 3.3) -> 
      (global, 3.3)
    f_map(Sensor3, 0.0) -> 
      (global, 0.0)

---Starting SHUFFLE-phase.---
  Shuffling map output input partition 0
    Assigning key [global] to reducer input 0 via hash-partitioning
    Assigning key [global] to reducer input 0 via hash-partitioning
    Assigning key [global] to reducer input 0 via hash-partitioning
    Assigning key [global] t

Test your program by running the following cell, which checks whether your program produces the correct output for our example data.

In [160]:
verify_locally(result, 1, [("global", 5000)])


RESULT VERIFICATION:
  PASS: Key [global] has expected value!


In [161]:
am.test_student_function(a2_t1b_global_max_measurement)

Running local tests...
a2_t1b_global_max_measurement successfully passed local tests
Running remote test...
Test was successful. Congratulations!


## Task 2 - Text Processing

In the next task, we implement a simple text-processing program with MapReduce. Our input data is partitioned data about books, where each key-value pair contains a __book_id__ as key and a __sentence__ as value.

In [164]:
partitioned_books = [
    [
        (1, "It was a bright cold day in April, and the clocks were striking thirteen."),
        (2, "Mr. Jones, of the Manor Farm, had locked the hen-houses for the night, but was too drunk to remember to shut the pop-holes. "),
        (3, "He was a tough-looking youth of twenty-five or six, with reddish-yellow hair and powerful shoulders.")
    ],
    [
        (1, "On each landing, opposite the lift-shaft, the poster with the enormous face gazed from the wall."),
        (2, "All the animals were now present except Moses, the tame raven, who slept on a perch behind the back door."),        
        (3, "This was in late December 1936, less than seven months ago as I write, and yet it is a period that has already receded into enormous distance.")
    ]
]

### Compute the number of commas per book

Please write a program to count the number of commas occurring in the text of each book. Implement your code in the `commacount_f_map` and `commacount_f_reduce` functions (and change the return statement).

Please note that the result of your mapreduce program should be equivalent to running the following SQL query on the data (if it was in a database and there was a CHAR_COUNT function):

```
SELECT book_id, SUM(CHAR_COUNT(sentence, ','))
FROM book_sentences
GROUP BY book_id
```
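Standard SQL has no `CHAR_COUNT` function, but a common way to express it is `LENGTH(s) - LENGTH(REPLACE(s, ',', ''))`. This SQLite sketch, using a few made-up sentences rather than the book data, checks that this expression yields the same per-book sums as Python's `str.count`:

```python
import sqlite3
from collections import Counter

# Made-up sample sentences (not the assignment data)
sample_sentences = [
    (1, "a, b, c"),
    (1, "d, e"),
    (2, "no commas here"),
]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE book_sentences (book_id INTEGER, sentence TEXT)")
conn.executemany("INSERT INTO book_sentences VALUES (?, ?)", sample_sentences)
# Removing all commas shortens the string by exactly the number of commas
sql_counts = dict(conn.execute(
    """
    SELECT book_id, SUM(LENGTH(sentence) - LENGTH(REPLACE(sentence, ',', '')))
    FROM book_sentences
    GROUP BY book_id
    """
).fetchall())
conn.close()

# The same aggregation in plain Python with str.count
python_counts = Counter()
for book_id, sentence in sample_sentences:
    python_counts[book_id] += sentence.count(",")

print(sql_counts, dict(python_counts))
```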


In [273]:
import re
def a2_t2_commacount():
    def commacount_f_map(book_id, sentence):
        output_tuples = []
        for word in re.findall(r"[\w']+|[.,!?;]", sentence):
            if word == ',':
                output_tuples.append((book_id, 1))
        return output_tuples         


    def commacount_f_reduce(book_id, counts):
        return [(book_id, sum(counts))]

    return commacount_f_map, commacount_f_reduce

In [268]:
re.findall(r"[\w']+|[.,!?;]", 'IKF,,,,,IWr')

['IKF', ',', ',', ',', ',', ',', 'IWr']

In [274]:
import re
def a2_t2_commacount():
    def commacount_f_map(book_id, sentence):
        return [(book_id, sentence.count(','))]         


    def commacount_f_reduce(book_id, counts):
        total_count = 0
        for count in counts:
            total_count += count
        return [(book_id, total_count)]

    return commacount_f_map, commacount_f_reduce

Execute your program by running the following cell:

In [275]:
result = mapreduce(partitioned_books, *a2_t2_commacount(), print_debug_text=True)

---Starting MAP-phase.---
  Applying f_map to input partition 0
    f_map(1, It was a bright cold day in April, and the clocks were striking thirteen.) -> 
      (1, 1)
    f_map(2, Mr. Jones, of the Manor Farm, had locked the hen-houses for the night, but was too drunk to remember to shut the pop-holes. ) -> 
      (2, 3)
    f_map(3, He was a tough-looking youth of twenty-five or six, with reddish-yellow hair and powerful shoulders.) -> 
      (3, 1)
  Applying f_map to input partition 1
    f_map(1, On each landing, opposite the lift-shaft, the poster with the enormous face gazed from the wall.) -> 
      (1, 2)
    f_map(2, All the animals were now present except Moses, the tame raven, who slept on a perch behind the back door.) -> 
      (2, 2)
    f_map(3, This was in late December 1936, less than seven months ago as I write, and yet it is a period that has already receded into enormous distance.) -> 
      (3, 2)

---Starting SHUFFLE-phase.---
  Shuffling map output input partit

Test your program by running the following cell, which checks whether your program produces the correct output for our example data.

In [276]:
verify_locally(result, 3, [(2, 5), (1, 3), (3, 3)])


RESULT VERIFICATION:
  PASS: Key [2] has expected value!
  PASS: Key [1] has expected value!
  PASS: Key [3] has expected value!


In [277]:
am.test_student_function(a2_t2_commacount)

Running local tests...
a2_t2_commacount successfully passed local tests
Running remote test...
Test was successful. Congratulations!


## Task 3 - Movie visitor numbers

In the final MapReduce task of this assignment, we work with key-value pairs about the visitor numbers of movies over several weeks. Note that the data is messier than before. The key denotes the __name__ of the movie, but the value is a list of tuples, each denoting the number of the __week__ and the __number of visitors__ in that week for the given movie. Some of the visitor numbers are stored as strings rather than integers.

In [280]:
partitioned_movie_stats = [
    [
        ("MovieA", [(1, 100), (2, 20), (3, "50")]),
        ("MovieC", [(1, 100), (2, "250"), (3, 100), (4, "120")]),        
        ("MovieB", [(1, 1000), (2, 250)]),

    ],
    [
        ("MovieA", [(4, 50), (5, "10"), (6, 0)]),
        ("MovieB", [(3, 0), (4, "260")]),  
        ("MovieC", [(5, "180")]),
    ]
]

### Maximum number of visitors in a week per movie

Please write a mapreduce program to compute the maximum number of visitors in a week per movie (but ignore the first week of each movie). Implement your code in the `max_weekly_visitors_f_map` and `max_weekly_visitors_f_reduce` functions (and change the return statement).
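Two details of the data matter here: some visitor counts are strings, and "the first week" must be identified by its week number rather than by position, because a movie's records are split across partitions (`MovieA` starts at week 4 in the second partition). A plain-Python reference computation, independent of the engine, can help you check the expected output:

```python
movie_partitions = [
    [
        ("MovieA", [(1, 100), (2, 20), (3, "50")]),
        ("MovieC", [(1, 100), (2, "250"), (3, 100), (4, "120")]),
        ("MovieB", [(1, 1000), (2, 250)]),
    ],
    [
        ("MovieA", [(4, 50), (5, "10"), (6, 0)]),
        ("MovieB", [(3, 0), (4, "260")]),
        ("MovieC", [(5, "180")]),
    ],
]

reference = {}
for partition in movie_partitions:
    for movie, weekly_stats in partition:
        for week, visitors in weekly_stats:
            if week == 1:
                continue  # skip the first week by its number, not its position
            count = int(visitors)  # normalize "50" and 50 to the same int
            reference[movie] = max(reference.get(movie, count), count)

print(reference)
```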

In [313]:
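def a2_t3_max_weekly_visitors():
    def max_weekly_visitors_f_map(movie, weekly_stats):
        # Collect visitor counts for all weeks except the first;
        # some counts are stored as strings, so convert them to int
        visitors = [int(count) for week, count in weekly_stats if week != 1]
        # Pre-aggregate locally; emit nothing if this record only covers week 1
        return [(movie, max(visitors))] if visitors else []

    def max_weekly_visitors_f_reduce(movie, numbers):
        return [(movie, max(numbers))]

    return max_weekly_visitors_f_map, max_weekly_visitors_f_reduce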

Execute your program by running the following cell:

In [314]:
result = mapreduce(partitioned_movie_stats, *a2_t3_max_weekly_visitors(), print_debug_text=True)

---Starting MAP-phase.---
  Applying f_map to input partition 0
    f_map(MovieA, [(1, 100), (2, 20), (3, '50')]) -> 
      (MovieA, 50)
    f_map(MovieC, [(1, 100), (2, '250'), (3, 100), (4, '120')]) -> 
      (MovieC, 250)
    f_map(MovieB, [(1, 1000), (2, 250)]) -> 
      (MovieB, 250)
  Applying f_map to input partition 1
    f_map(MovieA, [(4, 50), (5, '10'), (6, 0)]) -> 
      (MovieA, 50)
    f_map(MovieB, [(3, 0), (4, '260')]) -> 
      (MovieB, 260)
    f_map(MovieC, [(5, '180')]) -> 
      (MovieC, 180)

---Starting SHUFFLE-phase.---
  Shuffling map output input partition 0
    Assigning key [MovieA] to reducer input 0 via hash-partitioning
    Assigning key [MovieC] to reducer input 0 via hash-partitioning
    Assigning key [MovieB] to reducer input 1 via hash-partitioning
  Shuffling map output input partition 1
    Assigning key [MovieA] to reducer input 0 via hash-partitioning
    Assigning key [MovieB] to reducer input 1 via hash-partitioning
    Assigning key [MovieC] t

Test your program by running the following cell, which checks whether your program produces the correct output for our example data.

In [315]:
verify_locally(result, 3, [('MovieA', 50), ('MovieB', 260), ('MovieC', 250)])


RESULT VERIFICATION:
  PASS: Key [MovieA] has expected value!
  PASS: Key [MovieB] has expected value!
  PASS: Key [MovieC] has expected value!


In [317]:
am.test_student_function(a2_t3_max_weekly_visitors)

Running local tests...
a2_t3_max_weekly_visitors successfully passed local tests
Running remote test...
Test was successful. Congratulations!


# Part B - Spark

In the second part of the assignment, we revisit our problems but now solve them with a real dataflow system: [Apache Spark](https://spark.apache.org/docs/latest/api/python/index.html). Note that we use its Python API in local mode for simplicity and easy debugging, but your programs would also run on a real cluster!


But first, we define another helper function to test our program outputs locally.

In [319]:
def verify_locally_from_rdd(result, expected_num_keys, expected_keys_and_values):
    verify_locally([result.collect()], expected_num_keys, expected_keys_and_values)

Spark uses a session (`SparkSession`) as the entry point that allows us to move data into the cluster. Here, we use it in local mode.

Note that you can ignore the following warnings that might occur when executing the next cell:

```
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/usr/local/spark-3.2.0-bin-hadoop3.2/jars/spark-unsafe_2.12-3.2.0.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
```

and

```
 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
```


In [320]:
from pyspark.sql import SparkSession
from pyspark.sql import Row

spark = SparkSession.builder \
    .master("local") \
    .config("spark.driver.bindAddress","127.0.0.1") \
    .getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/02/23 15:30:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark works on so-called RDDs (resilient distributed datasets). We revisit our "hello world" example and create an RDD of documents.

In [333]:
documents = spark.sparkContext.parallelize([
    (1, "the shawshank redemption"),
    (2, "the godfather"),
    (3, "the godfather part two")
], 2) 

In [358]:
documents

ParallelCollectionRDD[19] at readRDDFromFile at PythonRDD.scala:274

We write Spark programs by transforming RDDs into new RDDs. Similar to MapReduce, we can write our own functions and apply them to the data during these transformations.

Be sure to consult the [overview of the PySpark RDD API](https://spark.apache.org/docs/latest/api/python/reference/pyspark.html#rdd-apis) for the following tasks.


The following code shows our "Hello World" example in Spark. We apply `flatMap` to transform a document into multiple word tuples (analogous to our `f_map` function in MapReduce) and use Spark's `reduceByKey` transformation to group and reduce the resulting output. Note that we only have to define how to aggregate (e.g., sum) two values here.

In [322]:
def extract_words(docid_and_title):
    text = docid_and_title[1]  # the input record must be a (doc_id, text) tuple
    output_tuples = []
    for word in text.split(" "):
        output_tuples.append((word, 1))
    return output_tuples  # list of (word, 1) tuples

wordcounts = documents\
    .flatMap(lambda docid_and_title: extract_words(docid_and_title))\
    .reduceByKey(lambda count1, count2: count1 + count2)
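`reduceByKey` only needs a two-argument function because Spark merges the values of a key pairwise; according to the PySpark documentation, that function should be associative and commutative, so partial results from different partitions can be combined in any order. The plain-Python sketch below emulates the semantics of `flatMap` followed by `reduceByKey` on an ordinary list. `flat_map` and `reduce_by_key` are hypothetical helpers, not Spark APIs, and this is not how Spark actually executes the job.

```python
from functools import reduce

def flat_map(records, f):
    # Apply f to each record and concatenate the returned lists
    return [item for record in records for item in f(record)]

def reduce_by_key(pairs, func):
    # Group values by key, then fold each group pairwise with func
    groups = {}
    for key, value in pairs:
        groups.setdefault(key, []).append(value)
    return {key: reduce(func, values) for key, values in groups.items()}

sample_documents = [
    (1, "the shawshank redemption"),
    (2, "the godfather"),
    (3, "the godfather part two"),
]
words = flat_map(sample_documents, lambda doc: [(w, 1) for w in doc[1].split(" ")])
counts = reduce_by_key(words, lambda count1, count2: count1 + count2)
print(counts)
```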

We can use the adapted test function to verify that the outputs of our program are correct.

In [359]:
verify_locally_from_rdd(wordcounts, 6, [("the", 3), ("godfather", 2), ("two", 1)])


RESULT VERIFICATION:
  PASS: Key [the] has expected value!
  PASS: Key [godfather] has expected value!
  PASS: Key [two] has expected value!


Spark RDDs have a built-in action, `collect`, which returns their contents as a Python list. In real-world use cases, this would load all the data from the cluster into the memory of our driver program (and potentially cause a crash), but we can use it here for debugging and inspecting the outputs of our transformations.

Here is how the data looks after the flatMap.

In [324]:
documents\
    .flatMap(lambda docid_and_title: extract_words(docid_and_title))\
    .collect()

[('the', 1),
 ('shawshank', 1),
 ('redemption', 1),
 ('the', 1),
 ('godfather', 1),
 ('the', 1),
 ('godfather', 1),
 ('part', 1),
 ('two', 1)]

And this cell shows what the data looks like after the `reduceByKey`:

In [325]:
documents\
    .flatMap(lambda docid_and_title: extract_words(docid_and_title))\
    .reduceByKey(lambda count1, count2: count1 + count2)\
    .collect()

[('godfather', 2),
 ('two', 1),
 ('the', 3),
 ('shawshank', 1),
 ('redemption', 1),
 ('part', 1)]

Note that we can often write Spark programs without explicitly defining new functions; we can use lambda functions and list comprehensions instead, like this:

In [326]:
wordcounts = documents\
    .flatMap(lambda docid_and_title: [(word, 1) for word in docid_and_title[1].split(" ")])\
    .reduceByKey(lambda count1, count2: count1 + count2)

In [327]:
verify_locally_from_rdd(wordcounts, 6, [("the", 3), ("godfather", 2), ("two", 1)])


RESULT VERIFICATION:
  PASS: Key [the] has expected value!
  PASS: Key [godfather] has expected value!
  PASS: Key [two] has expected value!
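Even the summing lambda can be replaced by a standard-library function: `operator.add` is an ordinary two-argument function, and it is commonly passed to `reduceByKey` in PySpark word-count examples. The plain-Python check below (no Spark involved) uses `functools.reduce` to show that both functions fold a list of counts identically:

```python
import operator
from functools import reduce

counts_for_the = [1, 1, 1]
with_lambda = reduce(lambda count1, count2: count1 + count2, counts_for_the)
with_operator = reduce(operator.add, counts_for_the)
print(with_lambda, with_operator)  # 3 3
```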


## Task 4 - Sensors again

Next, we revisit our sensors task. First we load the measurement data into an RDD.


In [360]:
sensor_measurements = spark.sparkContext.parallelize([
        ("Sensor1", 12.3),
        ("Sensor2", 9.3),
        ("Sensor3", 2.7),
        ("BrokenSensor", 1000.0),
        ("Sensor1", 17.1),
        ("Sensor2", 7.3),
        ("Sensor3", -12),
        ("Sensor1", 19.1),
        ("BrokenSensor", 5000.0),
        ("Sensor2", 3.3),
        ("Sensor3", 0.0)
], 2)

In [361]:
sensor_measurements

ParallelCollectionRDD[21] at readRDDFromFile at PythonRDD.scala:274

In [None]:
def max_measurement_per_sensor_f_map(sensor, measurement):
    output = []
    if sensor != "BrokenSensor":
        output.append((sensor, measurement))
    return output


def max_measurement_per_sensor_f_reduce(sensor, measurements):
    out = [(sensor, max(measurements))]
    return out

### Task 4(a) - Maximum value per sensor

Your task is to compute an RDD containing the maximum measurement value per sensor, for all sensors except the "BrokenSensor". Write the corresponding code in the `compute_max_value_per_sensor` function (and change the return statement).
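Before writing the Spark version, it may help to see the computation on ordinary Python lists. This is a plain-Python sketch, not Spark: it first drops every "BrokenSensor" record (the filter step), then keeps the larger of two values per sensor, which is the pairwise combination a `reduceByKey` with `max` would perform.

```python
sensor_measurements = [
    ("Sensor1", 12.3), ("Sensor2", 9.3), ("Sensor3", 2.7),
    ("BrokenSensor", 1000.0), ("Sensor1", 17.1), ("Sensor2", 7.3),
    ("Sensor3", -12), ("Sensor1", 19.1), ("BrokenSensor", 5000.0),
    ("Sensor2", 3.3), ("Sensor3", 0.0),
]

# Filter step: drop every record of the broken sensor
working = [(sensor, value) for sensor, value in sensor_measurements
           if sensor != "BrokenSensor"]

# Reduce-by-key step: combine the current maximum and the new value with max
max_per_sensor = {}
for sensor, value in working:
    if sensor in max_per_sensor:
        max_per_sensor[sensor] = max(max_per_sensor[sensor], value)
    else:
        max_per_sensor[sensor] = value

print(sorted(max_per_sensor.items()))
```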

Note that you can use the next cell to try out Spark code locally before you put your solution into the `compute_max_value_per_sensor` function.


In [363]:
# This is just for local development, you can test different transformations here

def drop_broken_sensor(sensor_and_measurement):
    # flatMap calls this once per (sensor, measurement) record, so `measurement`
    # is a single float and max() cannot be applied to it here. Returning a
    # one-element list keeps the record, returning an empty list drops it.
    sensor, measurement = sensor_and_measurement
    if sensor != "BrokenSensor":
        return [(sensor, measurement)]
    return []

In [367]:
# The per-sensor maximum is computed by reduceByKey, which combines the
# measurements of one sensor pairwise.
sensor_measurements\
    .flatMap(lambda sensor_and_measurement: drop_broken_sensor(sensor_and_measurement))\
    .reduceByKey(lambda measurement1, measurement2: max(measurement1, measurement2))\
    .collect()


In [365]:
sensor_measurements\
    .filter(lambda sensor_and_measurement: sensor_and_measurement[0] != "BrokenSensor")\
    .reduceByKey(lambda measurement1, measurement2: max(measurement1, measurement2))\
    .collect()

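In [350]:
def a2_t4a_compute_max_value_per_sensor(sensor_measurements):
    # sensor_measurements is the whole RDD of (sensor, measurement) pairs, not a
    # single record, so it cannot be indexed. Express the computation as
    # transformations instead: drop the broken sensor, then keep the largest
    # measurement per sensor.
    return sensor_measurements\
        .filter(lambda sensor_and_measurement: sensor_and_measurement[0] != "BrokenSensor")\
        .reduceByKey(lambda measurement1, measurement2: max(measurement1, measurement2))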

Execute the following cell to test your Spark program locally:

In [None]:
result = a2_t4a_compute_max_value_per_sensor(sensor_measurements)

verify_locally_from_rdd(result, 3, [("Sensor1", 19.1), ("Sensor2", 9.3), ("Sensor3", 2.7)])

In [None]:
am.test_student_function(a2_t4a_compute_max_value_per_sensor)

### Task 4(b) - Globally maximal sensor value

In the next task, you have to compute an RDD with the globally maximal measurement value from all sensors (including the "BrokenSensor"). Implement your code in the `compute_global_max_value` function (and change the return statement). Hint: use an artificial key "global" to produce the final output.
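The hint about an artificial key can be illustrated with a plain-Python sketch (not Spark): every record is re-keyed to the same key "global", so all measurements land in one group, and that single group is then folded with `max`, as a `reduceByKey` would do.

```python
from functools import reduce

sensor_measurements = [
    ("Sensor1", 12.3), ("Sensor2", 9.3), ("Sensor3", 2.7),
    ("BrokenSensor", 1000.0), ("Sensor1", 17.1), ("Sensor2", 7.3),
    ("Sensor3", -12), ("Sensor1", 19.1), ("BrokenSensor", 5000.0),
    ("Sensor2", 3.3), ("Sensor3", 0.0),
]

# Map step: replace every sensor name with the artificial key "global"
keyed = [("global", value) for _sensor, value in sensor_measurements]

# Reduce step: there is only one key, so fold all its values with max
global_max = ("global", reduce(max, (value for _key, value in keyed)))
print(global_max)
```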

Note that you can use the next cell to try out Spark code locally before you put your solution into the `compute_global_max_value` function.

In [None]:
# This is just for local development, you can test different transformations here
sensor_measurements\
    .collect()

In [None]:
def a2_t4b_compute_global_max_value(sensor_measurements):
    # IMPLEMENT ME
    return sensor_measurements

Execute the following cell to test your Spark program locally:

In [None]:
result = a2_t4b_compute_global_max_value(sensor_measurements)
verify_locally_from_rdd(result, 1, [("global", 5000)])

In [None]:
am.test_student_function(a2_t4b_compute_global_max_value)

### Task 5 - Movie stats again

Next, we also revisit our movie stats task from before. First, we turn the data into an RDD:


In [None]:
movie_stats = spark.sparkContext.parallelize([
    ("MovieA", [(1, 100), (2, 20), (3, "50")]),
    ("MovieC", [(1, 100), (2, "250"), (3, 100), (4, "120")]),        
    ("MovieB", [(1, 1000), (2, 250)]),
    ("MovieA", [(4, 50), (5, "10"), (6, 0)]),
    ("MovieB", [(3, 0), (4, "260")]),  
    ("MovieC", [(5, "180")]),
])

Please write a Spark program to compute an RDD containing, for each movie, the maximum number of visitors in a single week, ignoring the first week of each movie. Note that some visitor counts are stored as strings. Implement your code in the `compute_max_weekly_visitors` function (and change the return statement).
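A plain-Python sketch (not Spark) of the same computation, assuming "the first week" means the entry with week number 1: each (movie, weeks) record is flattened into one (movie, visitors) pair per week, skipping week 1, and the pairs are then reduced per movie with `max`. In Python 3, comparing a `str` with an `int` raises a `TypeError`, so the counts are converted with `int` first.

```python
movie_stats = [
    ("MovieA", [(1, 100), (2, 20), (3, "50")]),
    ("MovieC", [(1, 100), (2, "250"), (3, 100), (4, "120")]),
    ("MovieB", [(1, 1000), (2, 250)]),
    ("MovieA", [(4, 50), (5, "10"), (6, 0)]),
    ("MovieB", [(3, 0), (4, "260")]),
    ("MovieC", [(5, "180")]),
]

# flatMap step: one (movie, visitors) pair per week, skipping week 1 and
# converting string counts such as "50" to int so they compare correctly
pairs = [(movie, int(visitors))
         for movie, weeks in movie_stats
         for week, visitors in weeks
         if week != 1]

# reduceByKey step: keep the maximum weekly count per movie
max_weekly = {}
for movie, visitors in pairs:
    max_weekly[movie] = max(visitors, max_weekly.get(movie, visitors))

print(sorted(max_weekly.items()))
```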

Note that you can use the next cell to try out Spark code locally before you put your solution into the `compute_max_weekly_visitors` function.

In [None]:
# This is just for local development, you can test different transformations here
movie_stats\
    .collect()

In [None]:
def a2_t5_compute_max_weekly_visitors(movie_stats):
    # IMPLEMENT ME    
    return movie_stats

Execute the following cell to test your Spark program locally:

In [None]:
result = a2_t5_compute_max_weekly_visitors(movie_stats)
verify_locally_from_rdd(result, 3, [('MovieA', 50), ('MovieB', 260), ('MovieC', 250)])

In [None]:
am.test_student_function(a2_t5_compute_max_weekly_visitors)

# Part C - Spark SQL

In the final part of this assignment, we explore how Spark unifies relational processing and dataflow computation via [SparkSQL](https://spark.apache.org/docs/3.2.1/sql-programming-guide.html). Spark provides a distributed version of the well-known dataframe abstraction and allows us to query dataframes with SQL.

Be sure to consult the [overview of PySpark's dataframe API](https://spark.apache.org/docs/3.2.1/api/python/reference/pyspark.sql.html#dataframe-apis) for the following tasks.


Let us first define two dataframes. They might remind you of the tables in the previous assignment.

In [None]:
from pyspark.sql import Row

customers = spark.sparkContext.parallelize([
    Row(customer_id='1122334455', firstname='Ann', lastname='O\'Brien', city='Rotterdam'),
    Row(customer_id='1231231231', firstname='John', lastname='Doe', city='Amsterdam'), 
    Row(customer_id='1234567890', firstname='Paul', lastname='Murphy', city='Diemen'), 
    Row(customer_id='9876543210', firstname='Jack', lastname='Murphy', city='Utrecht'), 
    Row(customer_id='9999999999', firstname='Norah', lastname='Jones', city='Amsterdam')
], 2).toDF()  

sales = spark.sparkContext.parallelize([
    Row(customer_id='1122334455', model='2010', quantity=1, paid=2300, type_of_payment='mastercard credit'), 
    Row(customer_id='1122334455', model='3001', quantity=1, paid=99, type_of_payment='cash'), 
    Row(customer_id='1231231231', model='2002', quantity=2, paid=1898, type_of_payment='visa credit'), 
    Row(customer_id='1231231231', model='3002', quantity=1, paid=239, type_of_payment='cash'), 
    Row(customer_id='1234567890', model='1001', quantity=1, paid=1902, type_of_payment='mastercard credit'), 
    Row(customer_id='9876543210', model='1007', quantity=1, paid=510, type_of_payment='visa debit'), 
    Row(customer_id='9876543210', model='1007', quantity=3, paid=1530, type_of_payment='visa debit'), 
    Row(customer_id='9876543210', model='2002', quantity=1, paid=949, type_of_payment='visa debit'), 
    Row(customer_id='9999999999', model='1007', quantity=1, paid=459, type_of_payment='visa credit'), 
    Row(customer_id='9999999999', model='3007', quantity=2, paid=360, type_of_payment='visa credit')
], 2).toDF()     

Spark dataframes have a built-in method `show`, which prints the first rows of the dataframe (20 by default). We can use it to inspect our sales and customer data, which look a lot like relational tables.

In [None]:
customers.show()

In [None]:
sales.show()

Now we can use the SparkSQL dataframe API to express SQL-style transformations of these dataframes. As an example, let's assume we are interested in the number of customers per city. In a relational database, we could query for this quantity like this:

```
SELECT city, COUNT(*)
FROM customers
GROUP BY city
```
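For comparison, the query above can be run against an in-memory SQLite copy of the `customers` data with Python's standard `sqlite3` module. This is not Spark; it only shows which result the Spark program should reproduce. The `ORDER BY` clause is an addition to make the output order deterministic, since the row order of a Spark result may differ.

```python
import sqlite3

# In-memory SQLite copy of the customers dataframe defined above
connection = sqlite3.connect(":memory:")
connection.execute(
    "CREATE TABLE customers (customer_id TEXT, firstname TEXT, lastname TEXT, city TEXT)")
connection.executemany(
    "INSERT INTO customers VALUES (?, ?, ?, ?)",
    [
        ("1122334455", "Ann", "O'Brien", "Rotterdam"),
        ("1231231231", "John", "Doe", "Amsterdam"),
        ("1234567890", "Paul", "Murphy", "Diemen"),
        ("9876543210", "Jack", "Murphy", "Utrecht"),
        ("9999999999", "Norah", "Jones", "Amsterdam"),
    ],
)

rows = connection.execute(
    "SELECT city, COUNT(*) FROM customers GROUP BY city ORDER BY city").fetchall()
print(rows)
```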

Spark allows us to write a dataflow program that produces the same result as the query and also looks very similar to it:

In [None]:
cities_with_counts = customers\
    .groupby("city")\
    .count()

We can again use `show` to inspect the result.

In [None]:
cities_with_counts.show()

Let's look at another example: what are the distinct types of payment in the sales data? In SQL, a corresponding query would look like this:

```
SELECT DISTINCT type_of_payment
FROM sales
```
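As before, the query can be checked against an in-memory SQLite copy of the `sales` data using Python's standard `sqlite3` module (not Spark). The `ORDER BY` clause is added only to make the output order deterministic.

```python
import sqlite3

# In-memory SQLite copy of the sales dataframe defined above
connection = sqlite3.connect(":memory:")
connection.execute(
    "CREATE TABLE sales (customer_id TEXT, model TEXT, quantity INTEGER, "
    "paid INTEGER, type_of_payment TEXT)")
connection.executemany(
    "INSERT INTO sales VALUES (?, ?, ?, ?, ?)",
    [
        ("1122334455", "2010", 1, 2300, "mastercard credit"),
        ("1122334455", "3001", 1, 99, "cash"),
        ("1231231231", "2002", 2, 1898, "visa credit"),
        ("1231231231", "3002", 1, 239, "cash"),
        ("1234567890", "1001", 1, 1902, "mastercard credit"),
        ("9876543210", "1007", 1, 510, "visa debit"),
        ("9876543210", "1007", 3, 1530, "visa debit"),
        ("9876543210", "2002", 1, 949, "visa debit"),
        ("9999999999", "1007", 1, 459, "visa credit"),
        ("9999999999", "3007", 2, 360, "visa credit"),
    ],
)

rows = connection.execute(
    "SELECT DISTINCT type_of_payment FROM sales ORDER BY type_of_payment").fetchall()
print(rows)
```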

Again, the corresponding Spark code is very similar:

In [None]:
payment_types = sales\
    .select(sales['type_of_payment'])\
    .distinct()

In [None]:
payment_types.show()

Before we give you more tasks, we first define another helper function to verify SparkSQL results:

In [None]:
def verify_sql_result_locally(observed_dataframe, expected_rows):
    observed_rows = observed_dataframe.collect()
    if len(observed_rows) != len(expected_rows):
        print(f"ERROR: Expected {len(expected_rows)} rows in the result, but found {len(observed_rows)}!")

    for expected_row in expected_rows:
        if expected_row not in observed_rows:
            print(f"ERROR: Row {expected_row} missing from result!")
        else:
            print(f"PASS: Encountered expected row {expected_row}")

### Task 6 - Filter customers

Write a SparkSQL program to find the distinct ids of all customers who made a visa credit payment of less than 1000 euros. Implement the `customers_with_certain_visa_payment` function, and make it return the correct Dataframe.
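As a cross-check, here is one possible plain-SQL formulation of this filter, run with Python's standard `sqlite3` module on an in-memory copy of the `sales` data. It is a sketch for comparison, not the required Spark solution: the Spark version has to express the same `WHERE` conditions and `DISTINCT` with dataframe methods.

```python
import sqlite3

# In-memory SQLite copy of the sales dataframe defined above
connection = sqlite3.connect(":memory:")
connection.execute(
    "CREATE TABLE sales (customer_id TEXT, model TEXT, quantity INTEGER, "
    "paid INTEGER, type_of_payment TEXT)")
connection.executemany(
    "INSERT INTO sales VALUES (?, ?, ?, ?, ?)",
    [
        ("1122334455", "2010", 1, 2300, "mastercard credit"),
        ("1122334455", "3001", 1, 99, "cash"),
        ("1231231231", "2002", 2, 1898, "visa credit"),
        ("1231231231", "3002", 1, 239, "cash"),
        ("1234567890", "1001", 1, 1902, "mastercard credit"),
        ("9876543210", "1007", 1, 510, "visa debit"),
        ("9876543210", "1007", 3, 1530, "visa debit"),
        ("9876543210", "2002", 1, 949, "visa debit"),
        ("9999999999", "1007", 1, 459, "visa credit"),
        ("9999999999", "3007", 2, 360, "visa credit"),
    ],
)

# Distinct customers with at least one visa credit payment below 1000
rows = connection.execute(
    "SELECT DISTINCT customer_id FROM sales "
    "WHERE type_of_payment = 'visa credit' AND paid < 1000").fetchall()
print(rows)
```

Customer 1231231231 also paid by visa credit, but only with 1898 euros, so only 9999999999 remains.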

Note that you can use the next cell to try out Spark code locally before you put your solution into the `customers_with_certain_visa_payment` function.

In [None]:
# This is just for local development, you can test different transformations here
sales\
    .show()

In [None]:
def a2_t6_customers_with_certain_visa_payment(sales):
    # IMPLEMENT ME    
    return sales

Execute the following cell to test your Spark SQL program locally:

In [None]:
result = a2_t6_customers_with_certain_visa_payment(sales)
verify_sql_result_locally(result, [Row(customer_id='9999999999')])

In [None]:
am.test_student_function(a2_t6_customers_with_certain_visa_payment)

### Task 7 - Payment types in Amsterdam

Write a SparkSQL program to find the payment types used by customers in a given city. No type should occur more than once. Implement the `find_payment_types` function to return the correct Dataframe. You can use Spark's `join` function to join the two tables; see its [documentation](https://spark.apache.org/docs/3.2.1/api/python/reference/api/pyspark.sql.DataFrame.join.html?highlight=join#pyspark.sql.DataFrame.join).
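The join can be previewed in plain SQL with Python's standard `sqlite3` module on in-memory copies of both tables. This is a sketch for comparison only, not the Spark solution; the city is passed as a query parameter, and `ORDER BY` is added to make the output order deterministic.

```python
import sqlite3

CUSTOMERS = [
    ("1122334455", "Ann", "O'Brien", "Rotterdam"),
    ("1231231231", "John", "Doe", "Amsterdam"),
    ("1234567890", "Paul", "Murphy", "Diemen"),
    ("9876543210", "Jack", "Murphy", "Utrecht"),
    ("9999999999", "Norah", "Jones", "Amsterdam"),
]
SALES = [
    ("1122334455", "2010", 1, 2300, "mastercard credit"),
    ("1122334455", "3001", 1, 99, "cash"),
    ("1231231231", "2002", 2, 1898, "visa credit"),
    ("1231231231", "3002", 1, 239, "cash"),
    ("1234567890", "1001", 1, 1902, "mastercard credit"),
    ("9876543210", "1007", 1, 510, "visa debit"),
    ("9876543210", "1007", 3, 1530, "visa debit"),
    ("9876543210", "2002", 1, 949, "visa debit"),
    ("9999999999", "1007", 1, 459, "visa credit"),
    ("9999999999", "3007", 2, 360, "visa credit"),
]

connection = sqlite3.connect(":memory:")
connection.execute(
    "CREATE TABLE customers (customer_id TEXT, firstname TEXT, lastname TEXT, city TEXT)")
connection.execute(
    "CREATE TABLE sales (customer_id TEXT, model TEXT, quantity INTEGER, "
    "paid INTEGER, type_of_payment TEXT)")
connection.executemany("INSERT INTO customers VALUES (?, ?, ?, ?)", CUSTOMERS)
connection.executemany("INSERT INTO sales VALUES (?, ?, ?, ?, ?)", SALES)

# Join sales to customers on customer_id, keep one city, deduplicate types
rows = connection.execute(
    "SELECT DISTINCT s.type_of_payment "
    "FROM sales AS s JOIN customers AS c ON s.customer_id = c.customer_id "
    "WHERE c.city = ? ORDER BY s.type_of_payment",
    ("Amsterdam",),
).fetchall()
print(rows)
```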

Note that you can use the next cell to try out Spark code locally before you put your solution into the `find_payment_types` function.

In [None]:
# This is just for local development, you can test different transformations here
sales\
    .show()

In [None]:
def a2_t7_find_payment_types(sales, city, customers):
    # IMPLEMENT ME
    return sales

Execute the following cell to test your Spark SQL program locally:

In [None]:
result = a2_t7_find_payment_types(sales, "Amsterdam", customers)
verify_sql_result_locally(result, [Row(type_of_payment='visa credit'), Row(type_of_payment='cash')])

In [None]:
am.test_student_function(a2_t7_find_payment_types)

### Task 8 - Number of sales and money per city

Write a Spark SQL program that computes the amount of money paid per city. Implement the `quantities_per_city` function to return the correct Dataframe with the columns __city__ and __amount__. To find the aggregation function you need for this, you can look at the `.groupby()` [documentation](https://spark.apache.org/docs/3.2.1/api/python/reference/api/pyspark.sql.DataFrame.groupBy.html). Depending on how you do this, you may want to use Spark's `withColumnRenamed` function to rename columns where appropriate.
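For comparison, the per-city totals can be computed in plain SQL with Python's standard `sqlite3` module on in-memory copies of both tables: join sales to customers, group by city, and sum the `paid` column under the alias `amount`. This is a sketch of the expected result, not the Spark solution; `ORDER BY` is added only for a deterministic output order.

```python
import sqlite3

CUSTOMERS = [
    ("1122334455", "Ann", "O'Brien", "Rotterdam"),
    ("1231231231", "John", "Doe", "Amsterdam"),
    ("1234567890", "Paul", "Murphy", "Diemen"),
    ("9876543210", "Jack", "Murphy", "Utrecht"),
    ("9999999999", "Norah", "Jones", "Amsterdam"),
]
SALES = [
    ("1122334455", "2010", 1, 2300, "mastercard credit"),
    ("1122334455", "3001", 1, 99, "cash"),
    ("1231231231", "2002", 2, 1898, "visa credit"),
    ("1231231231", "3002", 1, 239, "cash"),
    ("1234567890", "1001", 1, 1902, "mastercard credit"),
    ("9876543210", "1007", 1, 510, "visa debit"),
    ("9876543210", "1007", 3, 1530, "visa debit"),
    ("9876543210", "2002", 1, 949, "visa debit"),
    ("9999999999", "1007", 1, 459, "visa credit"),
    ("9999999999", "3007", 2, 360, "visa credit"),
]

connection = sqlite3.connect(":memory:")
connection.execute(
    "CREATE TABLE customers (customer_id TEXT, firstname TEXT, lastname TEXT, city TEXT)")
connection.execute(
    "CREATE TABLE sales (customer_id TEXT, model TEXT, quantity INTEGER, "
    "paid INTEGER, type_of_payment TEXT)")
connection.executemany("INSERT INTO customers VALUES (?, ?, ?, ?)", CUSTOMERS)
connection.executemany("INSERT INTO sales VALUES (?, ?, ?, ?, ?)", SALES)

# Total amount paid per city, with the sum column named "amount"
rows = connection.execute(
    "SELECT c.city, SUM(s.paid) AS amount "
    "FROM sales AS s JOIN customers AS c ON s.customer_id = c.customer_id "
    "GROUP BY c.city ORDER BY c.city"
).fetchall()
print(rows)
```

Amsterdam's total of 2956 combines two customers: 1898 + 239 from John Doe and 459 + 360 from Norah Jones.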

Note that you can use the next cell to try out Spark code locally before you put your solution into the `quantities_per_city` function.

In [None]:
# This is just for local development, you can test different transformations here
sales\
    .show()

In [None]:
def a2_t8_quantities_per_city(sales, customers):
    # IMPLEMENT ME
    return sales

Execute the following cell to test your Spark SQL program locally:

In [None]:
result = a2_t8_quantities_per_city(sales, customers)

expected_rows = [
    Row(city='Diemen', amount=1902),
    Row(city='Rotterdam', amount=2399),
    Row(city='Amsterdam', amount=2956),
    Row(city='Utrecht', amount=2989)
]

verify_sql_result_locally(result, expected_rows)

In [None]:
am.test_student_function(a2_t8_quantities_per_city)