Before you turn this problem in, make sure everything runs as expected. First, **restart the kernel** (in the menubar, select Kernel$\rightarrow$Restart) and then **run all cells** (in the menubar, select Cell$\rightarrow$Run All).

Make sure you fill in any place that says `YOUR CODE HERE` or "YOUR ANSWER HERE", as well as your name and collaborators below:

In [None]:
NAME = ""
COLLABORATORS = ""

---

---
# First steps into Spark

In this notebook, we will launch our very first Spark code.

![Spark logo](http://spark.apache.org/images/spark-logo-trademark.png)

[Apache Spark](http://spark.apache.org/) is a cluster computing engine designed to be __fast__ and __general-purpose__, which makes it well suited to processing large datasets. It achieves both goals through __efficient data sharing__ across computations.
<hr/>
The past years have seen major changes in computing systems, as growing data volumes required more and more applications to scale out to large clusters. To solve this problem, a wide range of new programming models have been designed to manage multiple types of computations in a distributed fashion, without requiring people to learn too much about distributed systems. Those programming models need to handle _parallelism, fault-tolerance and resource sharing_ for us.

[Google's MapReduce](https://en.wikipedia.org/wiki/MapReduce) presented a simple and general model for batch processing, which handles faults and parallelism easily. Unfortunately, the programming model is poorly suited to other types of workloads, so multiple specialized systems emerged, each answering a specific need in a distributed way:
* Iterative : Giraph
* Interactive : Impala, Piccolo, Greenplum
* Streaming : Storm, Millwheel

The initial goal of Apache Spark is to unify all of these workloads in a single general-purpose engine. [Matei Zaharia](https://cs.stanford.edu/~matei/) in his [PhD dissertation](https://www2.eecs.berkeley.edu/Pubs/TechRpts/2014/EECS-2014-12.pdf) suggests that most of the data flow models that required a specialized system needed _efficient data sharing_ across computations:
* Iterative algorithms like PageRank or K-Means need to make multiple passes over the same dataset
* Interactive data mining often requires running multiple ad-hoc queries on the same subset of data
* Streaming applications need to maintain and share state over time.

He then proposes to create a new abstraction that gives its users direct control over data sharing, something that other specialized systems would have built-in for their specific needs. The abstraction is implemented inside a new engine that is today called Apache Spark. The engine makes it possible to support more types of computations than with the original MapReduce in a more efficient way, including interactive queries and stream processing. 

---
## Prerequisites

Before running Spark code, we need to start a SparkContext instance. The following block will be common to every notebook so you can run your code.

While your SparkContext is running, you can open `http://localhost:4040` or `http://host.docker.internal:4040` in your browser to get an overview of your local Spark cluster and all ongoing operations.

In [9]:
from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName('lecture-lyon2').setMaster('local')
sc = SparkContext.getOrCreate(conf=conf)
sc

In [10]:
# Import other important libraries

from pyspark.rdd import RDD

In [11]:
filePath = 'FL_insurance_sample.csv'

---
## Part A - Your first RDDs

In this chapter, we are going to introduce Spark's core abstraction for working with data in a distributed and resilient way : the <text style="color:red;">resilient distributed dataset</text>, or <text style="color:red;">RDD</text>. Under the hood, Spark automatically distributes RDDs and their processing across the cluster, so we can focus on our code and not on distributed processing problems, such as the handling of data locality or resiliency in case of node failure.

An RDD consists of a collection of elements partitioned across the nodes of a cluster of machines that can be operated on in parallel. In Spark, work is expressed by the creation and transformation of RDDs using Spark operators.

<text style="color:red;">Note</text> : RDD is the core data structure of Spark, but the style of programming we are studying in this lesson is considered the _lowest-level API_ for Spark. The Spark community is pushing the use of structured programming with DataFrames/Datasets instead, an optimized interface for working with structured and semi-structured data, which we will learn later. Understanding RDDs is still important because it teaches you how Spark works under the hood and will help you understand and optimize your application when it is deployed into production.

There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.
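To make the idea of a partitioned collection more concrete, here is a plain-Python sketch that needs no Spark. The helper `split_into_partitions` is hypothetical: it only illustrates how a local list could be cut into contiguous slices and is not Spark's actual slicing code.

```python
def split_into_partitions(items, num_partitions):
    """Cut a list into num_partitions contiguous slices of near-equal size."""
    n = len(items)
    return [
        items[(i * n) // num_partitions:((i + 1) * n) // num_partitions]
        for i in range(num_partitions)
    ]

data = list(range(1, 11))
partitions = split_into_partitions(data, 3)
print(partitions)
```

In PySpark itself, `sc.parallelize(data, 3).glom().collect()` lets you inspect how the elements were really distributed over the partitions.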

## Question

Generate an RDD from a Python list with the `parallelize` method.

In [12]:

from pyspark import SparkContext

# Create SparkContext

In [13]:
sc.parallelize([1, 2, 3]).collect()

[1, 2, 3]

In [14]:
def rdd_from_list(sc, n):
    """
    Return an RDD consisting of elements from 1 to n.
    For now we assume we will always get n > 1, no need to test for the exception nor raise an Exception.
    """
    # Create an RDD with elements from 1 to n (range excludes its upper bound, hence n + 1)
    return sc.parallelize(list(range(1, n + 1)))

#print(rdd_from_list(sc,4))


In [28]:
"""
Graded cell

1 point
"""
# collect() method returns all elements in a RDD to the driver as a local list
print(rdd_from_list(sc, 10).collect())

result_rdd = rdd_from_list(sc, 3)

assert isinstance(result_rdd, RDD)
assert result_rdd.collect() == [1, 2, 3]

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]


## Question

Generate an RDD from a file with the `textFile()` method.

In [16]:
def load_file_to_rdd(sc, path):
    """
    Create an RDD by loading an external file. We don't expect any formatting nor processing here.
    You don't need to raise an exception if the file does not exist.
    
    1 point
    """
    return sc.textFile(path)

In [9]:
"""
Graded cell

1 point
"""
result_rdd = load_file_to_rdd(sc, filePath)

assert isinstance(result_rdd, RDD)
assert result_rdd.take(1)[0] == 'policyID,statecode,county,eq_site_limit,hu_site_limit,fl_site_limit,fr_site_limit,tiv_2011,tiv_2012,eq_site_deductible,hu_site_deductible,fl_site_deductible,fr_site_deductible,point_latitude,point_longitude,line,construction,point_granularity'

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 3) (PBDMOCD21322.ADLYON2.UNIV-LYON2.FR executor driver): java.io.IOException: Cannot run program "python3": CreateProcess error=3, Le chemin d’accès spécifié est introuvable
	at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:166)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:108)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:121)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:162)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.io.IOException: CreateProcess error=3, Le chemin d’accès spécifié est introuvable
	at java.lang.ProcessImpl.create(Native Method)
	at java.lang.ProcessImpl.<init>(ProcessImpl.java:453)
	at java.lang.ProcessImpl.start(ProcessImpl.java:139)
	at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
	... 15 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2403)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2352)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2351)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2351)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1109)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1109)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1109)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2591)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2533)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2522)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:898)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.io.IOException: Cannot run program "python3": CreateProcess error=3, Le chemin d’accès spécifié est introuvable
	at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:166)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:108)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:121)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:162)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: java.io.IOException: CreateProcess error=3, Le chemin d’accès spécifié est introuvable
	at java.lang.ProcessImpl.create(Native Method)
	at java.lang.ProcessImpl.<init>(ProcessImpl.java:453)
	at java.lang.ProcessImpl.start(ProcessImpl.java:139)
	at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
	... 15 more


---
## Part B - Classic Spark operations

### Operations

RDDs have two sets of parallel operations:

* transformations : which return pointers to new RDDs without computing them; the computation waits until an action needs the result.
* actions : which return values to the driver after running the computation. The `collect()` function is an action which retrieves all elements of the distributed RDD to the driver.

RDD transformations are _lazy_ in the sense that they do not compute their results immediately.
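Laziness can be observed without Spark. The sketch below is only an analogy, not Spark's implementation: Python's built-in `map()` also returns a lazy iterator, and consuming it with `list()` plays the role of an action. The `calls` list and the `double` function are hypothetical names introduced for the illustration.

```python
calls = []

def double(x):
    """Record each call so we can see when the work really happens."""
    calls.append(x)
    return 2 * x

# map() returns a lazy iterator: this line only describes the computation
lazy_result = map(double, [1, 2, 3])
calls_before = len(calls)

# Consuming the iterator plays the role of an action such as collect()
materialized = list(lazy_result)
calls_after = len(calls)

print(calls_before, calls_after, materialized)
```

No call to `double` is recorded until the result is materialized, which mirrors how a chain of RDD transformations stays idle until an action is invoked.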

The following exercises study the usage of the most common Spark RDD operations.

### .map() and .flatMap() transformations

The `.map(function)` transformation applies the function given as argument to each element of the RDD, producing exactly one output element per input element.

The `.flatMap(function)` transformation applies the function given as argument to each element of the RDD, then flattens the results by one level, so that the items of every returned list become individual elements of the new RDD.
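The difference between the two can be sketched in plain Python, with no Spark involved. This is an analogy on local lists, using `itertools.chain.from_iterable` to stand in for the flattening step; the variable names are chosen for the illustration only.

```python
from itertools import chain

sentences = ['Hi everybody', 'My name is Fanilo']

# map: exactly one output element per input element (here, a list of words)
mapped = [s.split() for s in sentences]

# flatMap: apply the same function, then concatenate the resulting lists
flat_mapped = list(chain.from_iterable(s.split() for s in sentences))

print(mapped)
print(flat_mapped)
```

With `map` the result keeps one nested list per sentence, whereas the `flatMap` equivalent yields a single flat list of words.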

# Question 1

Suppose we have an RDD containing only lists of 2 elements :

```
matrix = [[1,3], [2,5], [8,9]]
matrix_rdd = sc.parallelize(matrix)
```

This data structure is reminiscent of a matrix.

Create an operation `.op1()` which multiplies the first column (or first coordinate of each element) of the matrix by 2, and subtracts 3 from the second column (second coordinate).

In [17]:
sc.parallelize([[1,3], [2,9]]).map(lambda row: row[0]).collect()

[1, 2]

In [38]:
def op1(sc, mat):
    """
    Multiply the first coordinate by 2, subtract 3 from the second
    """
    # Each row is a 2-element list: build a new row from the transformed coordinates
    result_rdd = mat.map(lambda row: [row[0] * 2, row[1] - 3])

    return result_rdd

In [40]:
"""
Graded cell

1 point
"""
matrix = [[1,3], [2,5], [8,9]]
matrix_rdd = sc.parallelize(matrix)
result_rdd = op1(sc, matrix_rdd)

assert isinstance(result_rdd, RDD)
assert result_rdd.collect() == [[2, 0], [4, 2], [16, 6]]

# Question 2

Suppose we have an RDD containing sentences :

```
sentences_rdd = sc.parallelize(['Hi everybody', 'My name is Fanilo', 'and your name is Antoine everybody'])
```

Create an operation `.op2()` which returns all the words in the RDD, after splitting each sentence on whitespace.

In [41]:
def op2(sc, sentences):
    """
    Return all words contained in the sentences.
    """
    # flatMap flattens the per-sentence word lists into a single RDD of words
    return sentences.flatMap(lambda sentence: sentence.split())

In [42]:
"""
Graded cell

1 point
"""
sentences_rdd = sc.parallelize(['Hi everybody', 'My name is Fanilo', 'and your name is Antoine everybody'])
result_rdd = op2(sc, sentences_rdd)

assert isinstance(result_rdd, RDD)
assert result_rdd.collect() == ['Hi', 'everybody', 'My', 'name', 'is', 'Fanilo', 'and', 'your', 'name', 'is', 'Antoine', 'everybody']

### .filter() transformation

The `.filter(function)` transformation lets us keep only the elements for which the given function returns a true value.

# Question 3

Suppose we have an RDD containing numbers.

Create an operation `.op3()` which returns all the odd numbers.

In [43]:
sc.parallelize(range(20)).filter(lambda num: num > 5).collect()

[6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

In [44]:
def op3(sc, numbers):
    """
    Return all numbers contained in the RDD that are odd.
    """
    # A number is odd when the remainder of its division by 2 is not zero
    return numbers.filter(lambda num: num % 2 != 0)

In [45]:
"""
Graded cell

1 point
"""
numbers = [1,2,3,4,5,6,7,8,9]
numbers_rdd = sc.parallelize(numbers)
result_rdd = op3(sc, numbers_rdd)

assert isinstance(result_rdd, RDD)
assert result_rdd.collect() == [1,3,5,7,9]

### .reduce() operation

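The `.reduce(function)` action reduces all elements of the RDD into a single value by repeatedly combining two elements with the given function. Unlike `.map()` or `.filter()`, it is not a transformation: it triggers the computation and returns the result to the driver.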

Do take note that, as in the Hadoop ecosystem, the function used to reduce the dataset should be associative and commutative.
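Why these properties matter can be sketched in plain Python. The hypothetical helper `partitioned_reduce` below mimics, in a simplified way, a reduction that first runs inside each partition and then combines the partial results; it is an illustration under that assumption, not Spark's actual execution strategy.

```python
from functools import reduce

def partitioned_reduce(partitions, func):
    """Reduce each partition on its own, then reduce the partial results."""
    partials = [reduce(func, part) for part in partitions if part]
    return reduce(func, partials)

# The same four numbers, split over partitions in two different ways
layout_a = [[1, 2], [3, 4]]
layout_b = [[1], [2, 3, 4]]

add = lambda x, y: x + y
sub = lambda x, y: x - y

sum_a = partitioned_reduce(layout_a, add)
sum_b = partitioned_reduce(layout_b, add)
diff_a = partitioned_reduce(layout_a, sub)
diff_b = partitioned_reduce(layout_b, sub)

print(sum_a, sum_b)    # addition: same result for both layouts
print(diff_a, diff_b)  # subtraction: the result depends on the layout
```

Addition gives the same answer however the data is split, while subtraction, which is neither associative nor commutative, gives a result that depends on the partitioning. Since you do not control how Spark groups the elements, only functions of the first kind give reliable results.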

# Question 4

Suppose we have an RDD containing numbers.

Create an operation `.op4()` which returns the sum of all squared odd numbers in the RDD, using the `.reduce()` operation.

_Hint: now's a good time to tell you that chaining transformations is possible..._

In [46]:
sc.parallelize(range(4)).reduce(lambda x,y: x+y)

6

In [47]:
def op4(sc, numbers):
    """
    Return the sum of all squared odd numbers.   
    """
    result = (
        numbers
        .filter(lambda num: num % 2 != 0)  # Keep only odd numbers
        .map(lambda num: num ** 2)          # Square each odd number
        .reduce(lambda x, y: x + y)         # Sum the squared odd numbers
    )
    return result
   


In [48]:
"""
Graded cell

1 point
"""
numbers = range(100)
numbers_rdd = sc.parallelize(numbers)
result = op4(sc, numbers_rdd)

assert result == 166650

---
## Part C - Paired RDDs

If you recall the classic MapReduce paradigm, you were dealing with key/value pairs to reduce your data in a distributed manner. We define a pair as a tuple of two elements, the first element being the key and the second the value.

Key/value pairs are good for solving many problems efficiently in a parallel fashion so let us delve into them.

```
pairs = [('b', 3), ('d', 4), ('a', 6), ('f', 1), ('e', 2)]
pairs_rdd = sc.parallelize(pairs)
```

### reduceByKey

The `.reduceByKey()` method works similarly to `.reduce()`, but it performs the reduction separately for each key and returns a new RDD of (key, reduced value) pairs.
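The key-by-key behaviour can be sketched in plain Python on a local list of pairs. The helper `reduce_by_key` is hypothetical and returns a dictionary rather than an RDD; it only illustrates the semantics, not how Spark distributes the work.

```python
def reduce_by_key(pairs, func):
    """Fold the values of each key together with func, keeping one value per key."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged

pairs = [('a', 1), ('b', 2), ('a', 3), ('b', 4), ('c', 5)]
totals = reduce_by_key(pairs, lambda x, y: x + y)
print(totals)
```

A key that appears only once, such as `'c'` here, keeps its single value because the reducing function is never called for it.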

# Question

Time for the classic Hello world question: counting words !

In [49]:
sc.parallelize(range(10)).map(lambda num: (num % 2, num)).reduceByKey(lambda x,y: x+y).collect()

[(0, 20), (1, 25)]

In [50]:
def wordcount(sc, sentences):
    """
    Given an RDD of sentences, return the wordcount, after splitting sentences per whitespace.
    """
    result_rdd = (
        sentences
        .flatMap(lambda sentence: sentence.split())
        .map(lambda word: (word, 1))
        .reduceByKey(lambda x, y: x + y)
    )
    
    return result_rdd

    
    

In [51]:
"""
Graded cell

4 points
"""
sentences_rdd = sc.parallelize(['Hi everybody', 'My name is Fanilo', 'and your name is Antoine everybody'])
result_rdd = wordcount(sc, sentences_rdd)

assert isinstance(result_rdd, RDD)
assert result_rdd.collect() == [
    ('Hi', 1),
    ('everybody', 2),
    ('My', 1),
    ('name', 2),
    ('is', 2),
    ('Fanilo', 1),
    ('and', 1),
    ('your', 1),
    ('Antoine', 1)
]

### join

The `.join()` method joins two RDDs of pairs together on their keys. It is an inner join: only keys present in both RDDs appear in the result.
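The semantics of an inner join on keys can be sketched in plain Python. The helper `inner_join` is hypothetical and works on local lists of pairs; the sample data is made up for the illustration, with one student missing on each side.

```python
def inner_join(left, right):
    """Pair up the values of every key present in both lists of (key, value)."""
    right_by_key = {}
    for key, value in right:
        right_by_key.setdefault(key, []).append(value)
    return [
        (key, (left_value, right_value))
        for key, left_value in left
        for right_value in right_by_key.get(key, [])
    ]

genders = [('1', 'M'), ('2', 'M'), ('3', 'F')]
grades = [('1', 5), ('3', 7), ('4', 18)]
joined = inner_join(genders, grades)
print(joined)
```

Student `'2'` has no grade and student `'4'` has no gender, so neither appears in the joined result.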

# Question

Let's give ourselves a `student-gender` RDD and a `student-grade` RDD. Compute the mean grade for each gender.

_Hint: this is a long exercise. Remember that the mean for a gender equals the sum of all grades divided by the number of grades. You already know how to sum by key, and you can use the `countByKey()` function to get a hashmap from gender to number of grades, then use that hashmap inside a map function to divide. Good luck !_
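The arithmetic behind the hint can be checked in plain Python first. The sketch below is not a Spark solution: it assumes the two RDDs have already been joined and reduced to local (gender, grade) pairs, using the grades of this exercise, and the variable names are chosen for the illustration.

```python
from collections import Counter

gender_grade = [('M', 5), ('M', 12), ('F', 7), ('F', 18), ('F', 9), ('M', 5)]

# Sum of grades per gender (what a sum-by-key reduction produces)
sums = {}
for gender, grade in gender_grade:
    sums[gender] = sums.get(gender, 0) + grade

# Number of grades per gender (the kind of mapping countByKey() returns)
counts = Counter(gender for gender, _ in gender_grade)

# Mean = sum of grades / number of grades, computed key by key
means = {gender: sums[gender] / counts[gender] for gender in sums}
print(means)
```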

In [52]:
genders_rdd = sc.parallelize([('1', 'M'), ('2', 'M'), ('3', 'F'), ('4', 'F'), ('5', 'F'), ('6', 'M')])
grades_rdd = sc.parallelize([('1', 5), ('2', 12), ('3', 7), ('4', 18), ('5', 9), ('6', 5)])

genders_rdd.join(grades_rdd).collect()


[('1', ('M', 5)),
 ('4', ('F', 18)),
 ('2', ('M', 12)),
 ('3', ('F', 7)),
 ('5', ('F', 9)),
 ('6', ('M', 5))]

In [53]:
def mean_grade_per_gender(sc, genders, grades):
    """
    Given an RDD of studentID to grade and an RDD of studentID to gender, compute the mean grade for each gender, returned as a paired RDD.
    Assume all studentIDs are present in both RDDs, making an inner join possible; no need to check that.
    """
    # Join the RDDs on student ID: (studentID, (gender, grade))
    joined_rdd = genders.join(grades)

    # Map to (gender, (grade, 1))
    grade_count_rdd = joined_rdd.map(lambda x: (x[1][0], (x[1][1], 1)))

    # Reduce by key to get (gender, (sum_grades, count_grades))
    sum_count_rdd = grade_count_rdd.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))

    # Map to (gender, mean_grade)
    result_rdd = sum_count_rdd.map(lambda x: (x[0], x[1][0] / x[1][1]))

    return result_rdd


In [54]:
"""
Graded cell

4 points
"""
genders_rdd = sc.parallelize([('1', 'M'), ('2', 'M'), ('3', 'F'), ('4', 'F'), ('5', 'F'), ('6', 'M')])
grades_rdd = sc.parallelize([('1', 5), ('2', 12), ('3', 7), ('4', 18), ('5', 9), ('6', 5)])

result_rdd = mean_grade_per_gender(sc, genders_rdd, grades_rdd)
assert isinstance(result_rdd, RDD)
assert result_rdd.collect() == [('M', 7.333333333333333), ('F', 11.333333333333334)]

---
## Part D - Operations on a file

We provide a `FL_insurance_sample.csv` file inside the `data` folder to use in our computations. It will be loaded through the `load_file_to_rdd()` function you implemented previously.

## Question

The first line of the CSV is the header, and it is annoying to have it mixed with the data. In the lower-level RDD API we need to write code to specifically filter that first line.

**Hint** : `rdd.zipWithIndex()` is a useful function when you need to filter by position in a file _(though computationally expensive)_.
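The idea of filtering by position can be sketched in plain Python with `enumerate()`. This is an analogy on a local list, not Spark code, and the three-column sample lines are a shortened stand-in for the real file.

```python
lines = [
    'policyID,statecode,county',
    '119736,FL,CLAY COUNTY',
    '448094,FL,CLAY COUNTY',
]

# enumerate() yields (index, line); zipWithIndex() yields (line, index) instead
indexed = [(line, index) for index, line in enumerate(lines)]

# Keep every line except index 0, then drop the index again
without_header = [line for line, index in indexed if index > 0]
print(without_header)
```

Note the last step: after filtering on the index, the pairs have to be mapped back to plain lines, otherwise downstream code receives `(line, index)` tuples instead of strings.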

In [55]:
sc.parallelize(['a', 'b', 'c', 'd']).zipWithIndex().collect()

[('a', 0), ('b', 1), ('c', 2), ('d', 3)]

In [56]:
def filter_header(sc, rdd):
    """
    From the FL insurance RDD, remove the first line.
    """
    # Add index to each line using zipWithIndex: (line, index)
    rdd_with_index = rdd.zipWithIndex()

    # Filter out the first line based on index
    filtered_rdd = rdd_with_index.filter(lambda x: x[1] > 0)

    # Remove the index added by zipWithIndex so that only the lines remain
    result_rdd = filtered_rdd.map(lambda x: x[0])

    return result_rdd


In [58]:
"""
Graded cell

2 points
"""
header = 'policyID,statecode,county,eq_site_limit,hu_site_limit,fl_site_limit,fr_site_limit,tiv_2011,tiv_2012,eq_site_deductible,hu_site_deductible,fl_site_deductible,fr_site_deductible,point_latitude,point_longitude,line,construction,point_granularity'
file = load_file_to_rdd(sc, filePath)
result_rdd = filter_header(sc, file)

assert isinstance(result_rdd, RDD)
assert file.filter(lambda line: line==header).collect()
assert not result_rdd.filter(lambda line: line == header).collect()

In the following questions, we will work on the file without its header, which will be stored inside the `file_rdd` variable. You can reuse this variable in your tests.

## Question

Let's try some statistics on the `county` variable, which is the third column of the dataset.

In [59]:
file_rdd = filter_header(sc, load_file_to_rdd(sc, filePath))

In [64]:
print(file_rdd.collect())

['119736,FL,CLAY COUNTY,498960,498960,498960,498960,498960,792148.9,0,9979.2,0,0,30.102261,-81.711777,Residential,Masonry,1', '448094,FL,CLAY COUNTY,1322376.3,1322376.3,1322376.3,1322376.3,1322376.3,1438163.57,0,0,0,0,30.063936,-81.707664,Residential,Masonry,3', '206893,FL,CLAY COUNTY,190724.4,190724.4,190724.4,190724.4,190724.4,192476.78,0,0,0,0,30.089579,-81.700455,Residential,Wood,1', '333743,FL,CLAY COUNTY,0,79520.76,0,0,79520.76,86854.48,0,0,0,0,30.063236,-81.707703,Residential,Wood,3', '172534,FL,CLAY COUNTY,0,254281.5,0,254281.5,254281.5,246144.49,0,0,0,0,30.060614,-81.702675,Residential,Wood,1', '785275,FL,CLAY COUNTY,0,515035.62,0,0,515035.62,884419.17,0,0,0,0,30.063236,-81.707703,Residential,Masonry,3', '995932,FL,CLAY COUNTY,0,19260000,0,0,19260000,20610000,0,0,0,0,30.102226,-81.713882,Commercial,Reinforced Concrete,1', '223488,FL,CLAY COUNTY,328500,328500,328500,328500,328500,348374.25,0,16425,0,0,30.102217,-81.707146,Residential,Wood,1',

In [60]:
def get_county(sc, rdd):
    """
    From the FL insurance RDD, return an RDD containing the county of every line.
    We assume the csv is correctly formatted and every line has the correct number of elements.
    """
    # Use .map() to extract the county, i.e. the third column (index 2), of each line
    return rdd.map(lambda line: line.split(",")[2])

    
def county_count(sc, rdd):
    """
    Return an RDD of key,value with county as key, count as values
    """
    # Pair each county with 1, then sum the ones per county with reduceByKey()
    return rdd.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
    


In [61]:
"""
Graded cell

4 points
"""

# CAREFUL: some tests are invisible so don't try to output a dictionary with what looks like the correct answers :)
file_rdd = filter_header(sc, load_file_to_rdd(sc, filePath))
county_rdd = get_county(sc, file_rdd)

result = dict(county_count(sc, county_rdd).collect())
assert result.get('CLAY COUNTY') == 346

# Postrequisites

In [62]:
sc.stop()