# Étape 3 - Chargement, pré-traitement

In [9]:
import org.apache.spark.sql.functions._


case class Car(
  rownames: Int,
  mpg: Double,
  cylinders: Int,
  displacement: Double,
  horsepower: Double,
  weight: Double,
  acceleration: Double,
  year: Int,
  origin: Int,
  name: String
)

val rdd = spark.sparkContext.textFile("./data-car.csv")

val header = rdd.first()
val dataRDD = rdd.filter(_ != header)

val carRDD = dataRDD.map(line => {
  val cols = line.split(",")
  Car(
    cols(0).toInt,
    cols(1).toDouble,
    cols(2).toInt,
    cols(3).toDouble,
    cols(4).toInt,
    cols(5).toDouble,
    cols(6).toDouble,
    cols(7).toInt,
    cols(8).toInt,
    cols(9)
  )
})


lastException = null
defined class Car
rdd = ./data-car.csv MapPartitionsRDD[82] at textFile at <console>:49
header = rownames,mpg,cylinders,displacement,horsepower,weight,acceleration,year,origin,name
dataRDD = MapPartitionsRDD[83] at filter at <console>:52
carRDD = MapPartitionsRDD[84] at map at <console>:54


MapPartitionsRDD[84] at map at <console>:54

# Séance 2

## Étape 1 - RDD vers un dataframe

In [10]:
val carDF = carRDD.toDF()

carDF.show(10)

carDF = [rownames: int, mpg: double ... 8 more fields]


+--------+----+---------+------------+----------+------+------------+----+------+--------------------+
|rownames| mpg|cylinders|displacement|horsepower|weight|acceleration|year|origin|                name|
+--------+----+---------+------------+----------+------+------------+----+------+--------------------+
|       1|18.0|        8|       307.0|     130.0|3504.0|        12.0|  70|     1|chevrolet chevell...|
|       2|15.0|        8|       350.0|     165.0|3693.0|        11.5|  70|     1|   buick skylark 320|
|       3|18.0|        8|       318.0|     150.0|3436.0|        11.0|  70|     1|  plymouth satellite|
|       4|16.0|        8|       304.0|     150.0|3433.0|        12.0|  70|     1|       amc rebel sst|
|       5|17.0|        8|       302.0|     140.0|3449.0|        10.5|  70|     1|         ford torino|
|       6|15.0|        8|       429.0|     198.0|4341.0|        10.0|  70|     1|    ford galaxie 500|
|       7|14.0|        8|       454.0|     220.0|4354.0|         9.0|  70

[rownames: int, mpg: double ... 8 more fields]

In [11]:
carDF.printSchema()


root
 |-- rownames: integer (nullable = false)
 |-- mpg: double (nullable = false)
 |-- cylinders: integer (nullable = false)
 |-- displacement: double (nullable = false)
 |-- horsepower: double (nullable = false)
 |-- weight: double (nullable = false)
 |-- acceleration: double (nullable = false)
 |-- year: integer (nullable = false)
 |-- origin: integer (nullable = false)
 |-- name: string (nullable = true)



In [12]:
carDF.summary().show()


+-------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+------------------+--------------------+
|summary|          rownames|               mpg|        cylinders|      displacement|        horsepower|            weight|      acceleration|             year|            origin|                name|
+-------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+------------------+--------------------+
|  count|               392|               392|              392|               392|               392|               392|               392|              392|               392|                 392|
|   mean| 198.5204081632653|23.445918367346938|5.471938775510204|194.41198979591837|104.46938775510205|2977.5841836734694|15.541326530612247| 75.9795918367347|1.5765306122448979|                NULL|


In [13]:
println(s"Nombre de lignes dans le DataFrame : ${carDF.count()}")


Nombre de lignes dans le DataFrame : 392


In [14]:

carDF.select("origin").distinct().show()
carDF.select(min("mpg"), max("mpg")).show()
carDF.select(min("horsepower"), max("horsepower")).show()


+------+
|origin|
+------+
|     1|
|     3|
|     2|
+------+

+--------+--------+
|min(mpg)|max(mpg)|
+--------+--------+
|     9.0|    46.6|
+--------+--------+

+---------------+---------------+
|min(horsepower)|max(horsepower)|
+---------------+---------------+
|           46.0|          230.0|
+---------------+---------------+



## Étape 2 - Extraction de dimensions

In [None]:
import org.apache.spark.sql.functions.monotonically_increasing_id

val dfOrigin = carDF
  .select("origin")
  .distinct()
  .withColumn("id_origin", monotonically_increasing_id())

  dfOrigin.show(3)



dfOrigin = [origin: int, id_origin: bigint]


+------+---------+
|origin|id_origin|
+------+---------+
|     1|        0|
|     3|        1|
|     2|        2|
+------+---------+



[origin: int, id_origin: bigint]

In [44]:
val carDF_withOriginID = carDF
  .join(dfOrigin, Seq("origin"))
  .drop("origin")

  carDF_withOriginID.show(3)


+--------+----+---------+------------+----------+------+------------+----+--------------------+---------+
|rownames| mpg|cylinders|displacement|horsepower|weight|acceleration|year|                name|id_origin|
+--------+----+---------+------------+----------+------+------------+----+--------------------+---------+
|       1|18.0|        8|       307.0|     130.0|3504.0|        12.0|  70|chevrolet chevell...|        0|
|       2|15.0|        8|       350.0|     165.0|3693.0|        11.5|  70|   buick skylark 320|        0|
|       3|18.0|        8|       318.0|     150.0|3436.0|        11.0|  70|  plymouth satellite|        0|
+--------+----+---------+------------+----------+------+------------+----+--------------------+---------+
only showing top 3 rows



carDF_withOriginID = [rownames: int, mpg: double ... 8 more fields]


[rownames: int, mpg: double ... 8 more fields]

In [45]:
val dfCylinders = carDF
  .select("cylinders")
  .distinct()
  .withColumn("id_cylinders", monotonically_increasing_id())

dfCylinders.show(3)


+---------+------------+
|cylinders|id_cylinders|
+---------+------------+
|        6|           0|
|        3|           1|
|        4|           2|
+---------+------------+
only showing top 3 rows



dfCylinders = [cylinders: int, id_cylinders: bigint]


[cylinders: int, id_cylinders: bigint]

In [51]:
val carDF_enriched = carDF_withOriginID
  .join(dfCylinders, Seq("cylinders"))
  .drop("cylinders")

carDF_enriched.show(20)


+--------+----+------------+----------+------+------------+----+--------------------+---------+------------+
|rownames| mpg|displacement|horsepower|weight|acceleration|year|                name|id_origin|id_cylinders|
+--------+----+------------+----------+------+------------+----+--------------------+---------+------------+
|       1|18.0|       307.0|     130.0|3504.0|        12.0|  70|chevrolet chevell...|        0|           3|
|       2|15.0|       350.0|     165.0|3693.0|        11.5|  70|   buick skylark 320|        0|           3|
|       3|18.0|       318.0|     150.0|3436.0|        11.0|  70|  plymouth satellite|        0|           3|
|       4|16.0|       304.0|     150.0|3433.0|        12.0|  70|       amc rebel sst|        0|           3|
|       5|17.0|       302.0|     140.0|3449.0|        10.5|  70|         ford torino|        0|           3|
|       6|15.0|       429.0|     198.0|4341.0|        10.0|  70|    ford galaxie 500|        0|           3|
|       7|14.0|    

carDF_enriched = [rownames: int, mpg: double ... 8 more fields]


[rownames: int, mpg: double ... 8 more fields]

## Étape 3 - Tables Hive, SQL

In [18]:
import org.apache.spark.sql.hive.HiveContext

val hc = new HiveContext(sc)

carDF_enriched.write.mode("overwrite").saveAsTable("cars")
dfOrigin.write.mode("overwrite").saveAsTable("dim_origin")
dfCylinders.write.mode("overwrite").saveAsTable("dim_cylinders")

hc.sql("SELECT name, horsepower FROM cars ORDER BY horsepower DESC LIMIT 10").show(3)
hc.sql("SELECT COUNT(*) FROM dim_origin").show()


+--------------------+----------+
|                name|horsepower|
+--------------------+----------+
|  pontiac grand prix|     230.0|
|    pontiac catalina|     225.0|
|buick estate wago...|     225.0|
+--------------------+----------+
only showing top 3 rows

+--------+
|count(1)|
+--------+
|       3|
+--------+



lastException = null
hc = org.apache.spark.sql.hive.HiveContext@753d0b5b




org.apache.spark.sql.hive.HiveContext@753d0b5b

# Séance 3

## Étape 1 - Agrégations

In [25]:
val agg1 = hc.sql("""
  SELECT 
    do.origin,
    CASE 
      WHEN do.origin = 1 THEN 'USA'
      WHEN do.origin = 2 THEN 'Europe'  
      WHEN do.origin = 3 THEN 'Japan'
    END as region,
    ROUND(AVG(c.mpg), 2) as mpg_moyen,
    COUNT(*) as nb_modeles,
    ROUND(MIN(c.mpg), 2) as mpg_min,
    ROUND(MAX(c.mpg), 2) as mpg_max
  FROM cars c
  JOIN dim_origin do ON c.id_origin = do.id_origin
  GROUP BY do.origin
  ORDER BY mpg_moyen DESC
""")
agg1.show()


+------+------+---------+----------+-------+-------+
|origin|region|mpg_moyen|nb_modeles|mpg_min|mpg_max|
+------+------+---------+----------+-------+-------+
|     3| Japan|    30.45|        79|   18.0|   46.6|
|     2|Europe|     27.6|        68|   16.2|   44.3|
|     1|   USA|    20.03|       245|    9.0|   39.0|
+------+------+---------+----------+-------+-------+



agg1 = [origin: int, region: string ... 4 more fields]


[origin: int, region: string ... 4 more fields]

In [26]:
val agg2 = hc.sql("""
  SELECT 
    do.origin,
    CASE 
      WHEN do.origin = 1 THEN 'USA'
      WHEN do.origin = 2 THEN 'Europe'  
      WHEN do.origin = 3 THEN 'Japan'
    END as region,
    ROUND(AVG(c.horsepower / c.weight * 1000), 3) as ratio_puissance_poids_moyen,
    ROUND(MAX(c.horsepower / c.weight * 1000), 3) as ratio_max,
    COUNT(*) as nb_vehicules
  FROM cars c
  JOIN dim_origin do ON c.id_origin = do.id_origin
  GROUP BY do.origin
  ORDER BY ratio_puissance_poids_moyen DESC
""")
agg2.show()


agg2 = [origin: int, region: string ... 3 more fields]


+------+------+---------------------------+---------+------------+
|origin|region|ratio_puissance_poids_moyen|ratio_max|nb_vehicules|
+------+------+---------------------------+---------+------------+
|     3| Japan|                     35.705|   45.361|          79|
|     1|   USA|                     34.968|    72.91|         245|
|     2|Europe|                     33.336|   50.582|          68|
+------+------+---------------------------+---------+------------+



[origin: int, region: string ... 3 more fields]

In [52]:
val agg3 = hc.sql("""
  SELECT 
    (year + 1900) as annee,
    ROUND(AVG(horsepower), 1) as puissance_moyenne,
    ROUND(AVG(mpg), 1) as mpg_moyen,
    COUNT(*) as nb_modeles,
    ROUND(MIN(horsepower), 0) as puissance_min,
    ROUND(MAX(horsepower), 0) as puissance_max
  FROM cars
  GROUP BY year
  ORDER BY year ASC
""")
agg3.show()


+-----+-----------------+---------+----------+-------------+-------------+
|annee|puissance_moyenne|mpg_moyen|nb_modeles|puissance_min|puissance_max|
+-----+-----------------+---------+----------+-------------+-------------+
| 1970|            147.8|     17.7|        29|         46.0|        225.0|
| 1971|            107.0|     21.1|        27|         60.0|        180.0|
| 1972|            120.2|     18.7|        28|         54.0|        208.0|
| 1973|            130.5|     17.1|        40|         46.0|        230.0|
| 1974|             94.2|     22.8|        26|         52.0|        150.0|
| 1975|            101.1|     20.3|        30|         53.0|        170.0|
| 1976|            101.1|     21.6|        34|         52.0|        180.0|
| 1977|            105.1|     23.4|        28|         58.0|        190.0|
| 1978|             99.7|     24.1|        36|         48.0|        165.0|
| 1979|            101.2|     25.1|        29|         65.0|        155.0|
| 1980|             77.5|

agg3 = [annee: int, puissance_moyenne: double ... 4 more fields]


[annee: int, puissance_moyenne: double ... 4 more fields]

In [53]:
val agg4 = hc.sql("""
  SELECT 
    dc.cylinders as nb_cylindres,
    ROUND(AVG(c.acceleration), 2) as acceleration_moyenne,
    ROUND(AVG(c.horsepower), 1) as puissance_moyenne,
    ROUND(AVG(c.mpg), 1) as mpg_moyen,
    COUNT(*) as nb_vehicules,
    ROUND(AVG(c.displacement), 1) as cylindree_moyenne
  FROM cars c
  JOIN dim_cylinders dc ON c.id_cylinders = dc.id_cylinders
  GROUP BY dc.cylinders
  ORDER BY dc.cylinders ASC
""")
agg4.show()


+------------+--------------------+-----------------+---------+------------+-----------------+
|nb_cylindres|acceleration_moyenne|puissance_moyenne|mpg_moyen|nb_vehicules|cylindree_moyenne|
+------------+--------------------+-----------------+---------+------------+-----------------+
|           3|               13.25|             99.3|     20.6|           4|             72.5|
|           4|               16.58|             78.3|     29.3|         199|            109.7|
|           5|               18.63|             82.3|     27.4|           3|            145.0|
|           6|               16.25|            101.5|     20.0|          83|            218.4|
|           8|               12.96|            158.3|     15.0|         103|            345.0|
+------------+--------------------+-----------------+---------+------------+-----------------+



agg4 = [nb_cylindres: int, acceleration_moyenne: double ... 4 more fields]


[nb_cylindres: int, acceleration_moyenne: double ... 4 more fields]

In [54]:
val agg5 = hc.sql("""
  SELECT 
    c.name as modele,
    (c.year + 1900) as annee,
    c.mpg as mpg,
    c.horsepower as puissance,
    ROUND(c.horsepower / c.weight * 1000, 3) as ratio_performance,
    CASE 
      WHEN do.origin = 1 THEN 'USA'
      WHEN do.origin = 2 THEN 'Europe'  
      WHEN do.origin = 3 THEN 'Japan'
    END as origine
  FROM cars c
  JOIN dim_origin do ON c.id_origin = do.id_origin
  ORDER BY c.mpg DESC
  LIMIT 10
""")
agg5.show()


agg5 = [modele: string, annee: int ... 4 more fields]


+--------------------+-----+----+---------+-----------------+-------+
|              modele|annee| mpg|puissance|ratio_performance|origine|
+--------------------+-----+----+---------+-----------------+-------+
|           mazda glc| 1980|46.6|     65.0|           30.806|  Japan|
| honda civic 1500 gl| 1980|44.6|     67.0|           36.216|  Japan|
|vw rabbit c (diesel)| 1980|44.3|     48.0|           23.022| Europe|
|           vw pickup| 1982|44.0|     52.0|           24.413| Europe|
|  vw dasher (diesel)| 1980|43.4|     48.0|           20.557| Europe|
|volkswagen rabbit...| 1978|43.1|     48.0|           24.181| Europe|
|           vw rabbit| 1980|41.5|     76.0|           35.448| Europe|
|          datsun 210| 1980|40.8|     65.0|           30.806|  Japan|
|      datsun b210 gx| 1978|39.4|     70.0|           33.816|  Japan|
|      toyota starlet| 1981|39.1|     58.0|           33.048|  Japan|
+--------------------+-----+----+---------+-----------------+-------+



[modele: string, annee: int ... 4 more fields]

In [30]:
val agg6 = hc.sql("""
  SELECT 
    CASE 
      WHEN do.origin = 1 THEN 'USA'
      WHEN do.origin = 2 THEN 'Europe'  
      WHEN do.origin = 3 THEN 'Japan'
    END as region,
    CASE 
      WHEN year BETWEEN 70 AND 74 THEN '1970-1974'
      WHEN year BETWEEN 75 AND 79 THEN '1975-1979'
      WHEN year BETWEEN 80 AND 82 THEN '1980-1982'
    END as periode,
    ROUND(AVG(c.displacement), 1) as cylindree_moyenne,
    ROUND(MIN(c.displacement), 1) as cylindree_min,
    ROUND(MAX(c.displacement), 1) as cylindree_max,
    COUNT(*) as nb_modeles
  FROM cars c
  JOIN dim_origin do ON c.id_origin = do.id_origin
  GROUP BY do.origin, 
    CASE 
      WHEN year BETWEEN 70 AND 74 THEN '1970-1974'
      WHEN year BETWEEN 75 AND 79 THEN '1975-1979'
      WHEN year BETWEEN 80 AND 82 THEN '1980-1982'
    END
  ORDER BY do.origin, periode
""")
agg6.show()


+------+---------+-----------------+-------------+-------------+----------+
|region|  periode|cylindree_moyenne|cylindree_min|cylindree_max|nb_modeles|
+------+---------+-----------------+-------------+-------------+----------+
|   USA|1970-1974|            293.8|         90.0|        455.0|       102|
|   USA|1975-1979|            237.2|         85.0|        400.0|       105|
|   USA|1980-1982|            151.9|         86.0|        350.0|        38|
|Europe|1970-1974|            102.5|         68.0|        121.0|        27|
|Europe|1975-1979|            115.8|         79.0|        183.0|        28|
|Europe|1980-1982|            111.2|         89.0|        146.0|        13|
| Japan|1970-1974|             96.6|         70.0|        156.0|        21|
| Japan|1975-1979|            104.3|         78.0|        156.0|        24|
| Japan|1980-1982|            105.4|         70.0|        168.0|        34|
+------+---------+-----------------+-------------+-------------+----------+



agg6 = [region: string, periode: string ... 4 more fields]


[region: string, periode: string ... 4 more fields]

In [None]:
val agg7 = hc.sql("""
  SELECT 
    CASE 
     WH EN name LIKE '%chevrolet%' THEN 'Chevrolet'
      WHEN name LIKE '%ford%' THEN 'Ford'
      WHEN name LIKE '%toyota%' THEN 'Toyota'
      WHEN name LIKE '%plymouth%' THEN 'Plymouth'
      WHEN name LIKE '%dodge%' THEN 'Dodge'
      WHEN name LIKE '%amc%' THEN 'AMC'
      WHEN name LIKE '%pontiac%' THEN 'Pontiac'
      WHEN name LIKE '%buick%' THEN 'Buick'
      WHEN name LIKE '%volkswagen%' OR name LIKE '%vw%' THEN 'Volkswagen'
      WHEN name LIKE '%datsun%' THEN 'Datsun'
      ELSE 'Autres'
    END as constructeur,
    COUNT(*) as nb_modeles,
    ROUND(AVG(mpg), 1) as mpg_moyen,
    ROUND(AVG(horsepower), 1) as puissance_moyenne,
    (MIN(year) + 1900) as premiere_annee,
    (MAX(year) + 1900) as derniere_annee
  FROM cars
  GROUP BY 
    CASE 
      WHEN name LIKE '%chevrolet%' THEN 'Chevrolet'
      WHEN name LIKE '%ford%' THEN 'Ford'
      WHEN name LIKE '%toyota%' THEN 'Toyota'
      WHEN name LIKE '%plymouth%' THEN 'Plymouth'
      WHEN name LIKE '%dodge%' THEN 'Dodge'
      WHEN name LIKE '%amc%' THEN 'AMC'
      WHEN name LIKE '%pontiac%' THEN 'Pontiac'
      WHEN name LIKE '%buick%' THEN 'Buick'
      WHEN name LIKE '%volkswagen%' OR name LIKE '%vw%' THEN 'Volkswagen'
      WHEN name LIKE '%datsun%' THEN 'Datsun'
      ELSE 'Autres'
    END
  HAVING nb_modeles >= 3
  ORDER BY nb_modeles DESC
""")
agg7.show()


+------------+----------+---------+-----------------+--------------+--------------+
|constructeur|nb_modeles|mpg_moyen|puissance_moyenne|premiere_annee|derniere_annee|
+------------+----------+---------+-----------------+--------------+--------------+
|      Autres|       113|     25.3|             97.4|          1970|          1982|
|        Ford|        48|     19.5|            112.2|          1970|          1982|
|   Chevrolet|        43|     20.5|            112.9|          1970|          1982|
|    Plymouth|        31|     21.7|            111.4|          1970|          1982|
|       Dodge|        28|     22.1|            117.2|          1970|          1982|
|         AMC|        27|     18.1|            114.7|          1970|          1980|
|      Toyota|        25|     28.4|             83.4|          1970|          1982|
|      Datsun|        23|     31.1|             83.8|          1970|          1982|
|  Volkswagen|        21|     31.9|             64.2|          1970|        

agg7 = [constructeur: string, nb_modeles: bigint ... 4 more fields]


[constructeur: string, nb_modeles: bigint ... 4 more fields]

In [55]:
val agg8 = hc.sql("""
  SELECT 
    CASE 
      WHEN do.origin = 1 THEN 'USA'
      WHEN do.origin = 2 THEN 'Europe'  
      WHEN do.origin = 3 THEN 'Japan'
    END as region,
    dc.cylinders,
    COUNT(*) as nb_vehicules_haute_perf,
    ROUND(AVG(c.horsepower), 1) as puissance_moyenne,
    ROUND(AVG(c.mpg), 1) as mpg_moyen,
    ROUND(AVG(c.weight), 0) as poids_moyen
  FROM cars c
  JOIN dim_origin do ON c.id_origin = do.id_origin
  JOIN dim_cylinders dc ON c.id_cylinders = dc.id_cylinders
  WHERE c.horsepower > 180
  GROUP BY do.origin, dc.cylinders
  ORDER BY puissance_moyenne DESC
""")
agg8.show()


+------+---------+-----------------------+-----------------+---------+-----------+
|region|cylinders|nb_vehicules_haute_perf|puissance_moyenne|mpg_moyen|poids_moyen|
+------+---------+-----------------------+-----------------+---------+-----------+
|   USA|        8|                     17|            208.6|     12.9|     4398.0|
+------+---------+-----------------------+-----------------+---------+-----------+



agg8 = [region: string, cylinders: int ... 4 more fields]


[region: string, cylinders: int ... 4 more fields]