# Une fonction pour simplifier l'accès aux données

In [5]:
def extractField(s: String, fieldNumber: Int): String = {
    val fields = s.split(';')
    if (fieldNumber >= fields.length) "" else fields(fieldNumber)
}

extractField: (s: String, fieldNumber: Int)String


In [39]:
def extractField1(s: String, fieldNumber: Int): String = {
    val fields = s.split(',')
    if (fieldNumber >= fields.length) "" else fields(fieldNumber)
}

extractField1: (s: String, fieldNumber: Int)String


In [6]:
println(extractField("2;CASSIOPEE;2009;33;3", 0))
println(extractField("2;CASSIOPEE;2009;33;3", 1))
println(extractField("2;CASSIOPEE;2009;33;3", 2))
println(extractField("2;CASSIOPEE;2009;33;3", 3))
println(extractField("2;CASSIOPEE;2009;33;3", 4))
println(extractField("2;CASSIOPEE;2009;33;3", 5))

2
CASSIOPEE
2009
33
3



# Charger les données
1. Créer le RDD `lignes` à partir du répertoire `prenoms_sample.txt`

In [53]:
val lignes = sc.textFile("prenoms_sample.txt")
lignes.take(10).foreach(println)

2;MELODIE;1988;75;33
2;MELODIE;1992;83;6
2;MELODIE;1999;29;4
2;MELODIE;2009;21;3
2;MÉLODY;1998;17;3
2;MIA;2008;60;3
2;MIA;2013;54;10
2;MICHÈLE;1954;21;40
2;MICHÈLE;1971;08;6
2;MICHELINE;1926;32;4


lignes: org.apache.spark.rdd.RDD[String] = prenoms_sample.txt MapPartitionsRDD[99] at textFile at <console>:27


# Transformer les lignes en prénoms
1. En appliquant la méthode `map`, créer le RDD `prenoms` à partir de `lignes`

In [7]:
val prenoms = lignes.map(l => (
    extractField(l, 0).charAt(0),
    extractField(l, 1),
    extractField(l, 2).toInt,
    extractField(l, 3).toInt,
    extractField(l, 3).toDouble.toInt
))
prenoms.take(10).foreach(println)


(2,MELODIE,1988,75,75)
(2,MELODIE,1992,83,83)
(2,MELODIE,1999,29,29)
(2,MELODIE,2009,21,21)
(2,MÉLODY,1998,17,17)
(2,MIA,2008,60,60)
(2,MIA,2013,54,54)
(2,MICHÈLE,1954,21,21)
(2,MICHÈLE,1971,8,8)
(2,MICHELINE,1926,32,32)


prenoms: org.apache.spark.rdd.RDD[(Char, String, Int, Int, Int)] = MapPartitionsRDD[4] at map at <console>:28


# Interroger les données
La documentation des méthodes d'un RDD est disponible ([RDD](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD), [PairRDDFunctions](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions)).

1. Rappeler ce que sont les *transformations* et les *actions*
1. Donner, pour chaque prénom, son nombre d'occurences (`map` et `reduceByKey`)

In [8]:
//une transformation retourne un nouvel RDD par transformation du RDD courant
//une action déclenche le calcul d’une valeur sur un RDD
val rdd1 = sc.textFile("prenoms.txt")
val rdd2 = rdd1.map(x => (extractField(x, 1),1))
val rdd3 = rdd2.reduceByKey((x,y) =>(x+y))
rdd3.take(10).foreach(println)




(JOVANE,1)
(SALAHDINE,3)
(JENNYFER,321)
(KONA,2)
(YUSSRA,1)
(NATIVA,1)
(MOHAMED-SAÏD,1)
(CAGLAR,2)
(HAFIZA,2)
(PAUL,10622)


rdd1: org.apache.spark.rdd.RDD[String] = prenoms.txt MapPartitionsRDD[6] at textFile at <console>:29
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[7] at map at <console>:30
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[8] at reduceByKey at <console>:31


1. Donner le nombre total de naissances avec un prénom féminin (`filter`, `map`, `reduce` ou `sum`)

In [39]:
//TODO
                  
prenoms.filter(l=>l._1=='2').map(l=>l._5).reduce((x,y)=>x+y)



res20: Int = 208359


1. Donner l'effectif maximal et minimal par prénom (`map`, `aggregateByKey`)

In [60]:
//TODO
/*val resultat_effectif = prenoms.map(l=>(l._2,l._5)).aggregateByKey((Int.MaxValue,Int.MinValue))(
{case((m,n),c)=>(n min c, n max c)},{case((m,n),(m1,n1))=>(m min m1, n max n1)}
)*/


val resultat_effectif = lignes.map(l => (extractField(l, 1),extractField(l, 4).toDouble.toInt))
            .aggregateByKey((Int.MaxValue , Int.MinValue))(
               { case ((k,u),v) => (k min v, u max v)    
               },
                { case ((k,u),(k1, u1)) => (k min k1, u max u1)    
               },                
            )

resultat_effectif.take(10).foreach(println)

(BRICE,(5,16))
(LISANDRO,(3,3))
(WISSEM,(3,3))
(JOËLLE,(3,30))
(JENNYFER,(12,12))
(JOSUE,(10,13))
(NICOLLE,(4,7))
(DANIELE,(7,330))
(ANNE-SOPHIE,(7,12))
(EMMANUEL,(3,48))


resultat_effectif: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[101] at aggregateByKey at <console>:35


1. Sur le modèle des prénoms, charger les données des départements
1. Donner, pour chaque nom de département, le prénom le plus fréquent depuis l'année 2000

In [50]:
//TODO
val prenoms = sc.textFile("prenoms_sample.txt")
val departements = sc.textFile("dpts.txt")

val class_a = prenoms.map( z => (extractField(z, 3).toInt,extractField(z, 1),extractField(z, 4)))

val class_b = departements.map(z => (extractField1(z, 0),extractField1(z, 6)))

//val v_join = class_a.join(class_b)
val v_join = class_a.join(class_b, class_a("dpt") === class_b("dep"))

//class_b.take(10).foreach(println)
v_join.take(10).foreach(println)




<console>: 43: error: value join is not a member of org.apache.spark.rdd.RDD[(Int, String, String)]

In [18]:
//TODO

In [None]:
val class_b = departements.map(z => (extractField(z, 1)))
val class_a = d.map(z => (extractField(z, 1)))

val v_join = class_a.join(class_b).map({case(w,n)=>(n,w)}).sortByKey().top(1)