In [None]:
// Session 1, step 3

/** One line of the grouped Titanic data set.
  *
  * Each instance describes a group of people sharing the same attributes
  * rather than a single person; `freq` carries the size of that group.
  *
  * @param rownames row index read from the first CSV column
  * @param pClass   class label (e.g. "1st", "2nd", "3rd", "Crew")
  * @param sex      sex label (e.g. "Male", "Female")
  * @param age      age bracket label (e.g. "Adult", "Child")
  * @param survived survival label (e.g. "Yes", "No")
  * @param freq     number of people in the group
  */
case class TitanicGroup(
    rownames: Int,
    pClass: String,
    sex: String,
    age: String,
    survived: String,
    freq: Int
)


defined class TitanicGroup


In [None]:
// Session 2, step 1: load the raw CSV file and expose it as a typed DataFrame.

val rawRdd = sc.textFile("Titanic.csv")

// The first line carries the column names; every line equal to it is dropped.
val header = rawRdd.first()
val dataRdd = rawRdd.filter(line => line != header)

// Split each remaining line on commas and map the six fields, in file order,
// onto a TitanicGroup. The first and last fields are parsed as integers.
val titanicRdd = dataRdd.map { line =>
  val cols = line.split(",")
  TitanicGroup(
    rownames = cols(0).toInt,
    pClass = cols(1),
    sex = cols(2),
    age = cols(3),
    survived = cols(4),
    freq = cols(5).toInt
  )
}

val titanicDf = titanicRdd.toDF()

// Quick inspection: sample rows, schema, row count and summary statistics.
titanicDf.show()
titanicDf.printSchema()
println(s"Nombre total de lignes : ${titanicDf.count()}")
titanicDf.summary().show()

In [None]:
// Session 2, step 2: build the dimension tables and the fact table of a star schema.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
// Kept from the original cell so that any later cell relying on it still compiles.
import org.apache.spark.sql.functions.monotonically_increasing_id
import org.apache.spark.sql.functions.row_number

/** Builds a dimension table for one attribute of `source`.
  *
  * The result holds one row per distinct value of `valueColumn`, plus a
  * surrogate key stored in `idColumn` (type long, values 1..n following the
  * sort order of `valueColumn`).
  *
  * The key is derived from an ordered window instead of
  * `monotonically_increasing_id`: the latter depends on how rows are laid out
  * in partitions, and since DataFrames are lazy the dimension is re-evaluated
  * by every action that uses it (for example joining it into the fact table
  * and, separately, saving it). A layout-dependent id is not guaranteed to be
  * the same in both evaluations; a key derived from the sorted values is.
  *
  * @param source      DataFrame containing `valueColumn`
  * @param valueColumn name of the attribute column to turn into a dimension
  * @param idColumn    name of the surrogate key column to add
  * @return a DataFrame with the columns `valueColumn` and `idColumn`
  */
def buildDimension(source: DataFrame, valueColumn: String, idColumn: String): DataFrame =
  source
    .select(valueColumn)
    .distinct()
    // A window without partitioning gathers all rows in one partition; that is
    // fine here because only the distinct values of one column are involved.
    .withColumn(idColumn, row_number().over(Window.orderBy(valueColumn)).cast("long"))

val dfPClass = buildDimension(titanicDf, "pClass", "id_pclass")
val dfSex = buildDimension(titanicDf, "sex", "id_sex")
val dfAge = buildDimension(titanicDf, "age", "id_age")
val dfSurvived = buildDimension(titanicDf, "survived", "id_survived")

// Replace each descriptive column of the source data by its surrogate key,
// one dimension at a time. Left joins keep every source row.
val titanicWithPClass = titanicDf
  .join(dfPClass, Seq("pClass"), "left")
  .drop("pClass")

val titanicWithSex = titanicWithPClass
  .join(dfSex, Seq("sex"), "left")
  .drop("sex")

val titanicWithAge = titanicWithSex
  .join(dfAge, Seq("age"), "left")
  .drop("age")

// Fact table: rownames, freq and the four dimension keys.
val titanicFinalDf = titanicWithAge
  .join(dfSurvived, Seq("survived"), "left")
  .drop("survived")



In [None]:
// Session 2, step 3: persist the star schema as tables and query it with SQL.
import org.apache.spark.sql.hive.HiveContext

val hc = new HiveContext(sc)

// Save the four dimensions and the fact table, replacing any previous version.
dfPClass.write.mode("overwrite").saveAsTable("dim_pclass")
dfSex.write.mode("overwrite").saveAsTable("dim_sex")
dfAge.write.mode("overwrite").saveAsTable("dim_age")
dfSurvived.write.mode("overwrite").saveAsTable("dim_survived")
titanicFinalDf.write.mode("overwrite").saveAsTable("fact_titanic")

// Number of passengers by sex and survival status.
// - The dimension tables have no `label` column: each one stores its value
//   under the original attribute name (sex, survived, pClass, age), so those
//   names are used here.
// - Each fact row stands for a whole group of passengers whose size is in
//   `freq`; the passenger count is therefore SUM(freq), not COUNT(*), which
//   would only count group rows.
hc.sql("""
  SELECT s.sex AS sexe, surv.survived AS survie, SUM(f.freq) AS nb_passagers
  FROM fact_titanic f
  JOIN dim_sex s ON f.id_sex = s.id_sex
  JOIN dim_survived surv ON f.id_survived = surv.id_survived
  GROUP BY s.sex, surv.survived
""").show()

// Number of passengers by class and age bracket (same column and aggregation
// rules as the query above).
hc.sql("""
  SELECT c.pClass AS classe, a.age AS tranche_age, SUM(f.freq) AS nb
  FROM fact_titanic f
  JOIN dim_pclass c ON f.id_pclass = c.id_pclass
  JOIN dim_age a ON f.id_age = a.id_age
  GROUP BY c.pClass, a.age
""").show()


In [None]:
// Analysis queries run directly against the flat `titanic` table.
import org.apache.spark.sql.hive.HiveContext

val hc = new HiveContext(sc)

// Persist the flat data set so that it can be queried by name.
titanicDf.write.mode("overwrite").saveAsTable("titanic")

// ---------------------------------------------------------------------------
// Result sets kept as DataFrames for later use. Nothing is displayed here.
// ---------------------------------------------------------------------------

// Survival rate (percentage, two decimals) for each class.
val survieParClasse = hc.sql("""
  SELECT pClass,
         ROUND(SUM(CASE WHEN survived = 'Yes' THEN freq ELSE 0 END) * 100.0 / SUM(freq), 2) AS survival_rate
  FROM titanic
  GROUP BY pClass
""")

// Survival rate (percentage, two decimals) for each sex.
val survieParSexe = hc.sql("""
  SELECT sex,
         ROUND(SUM(CASE WHEN survived = 'Yes' THEN freq ELSE 0 END) * 100.0 / SUM(freq), 2) AS survival_rate
  FROM titanic
  GROUP BY sex
""")

// Death rate (percentage, two decimals) for each class / sex / age combination.
val tauxMortaliteCroise = hc.sql("""
  SELECT pClass, sex, age,
         ROUND(SUM(CASE WHEN survived = 'No' THEN freq ELSE 0 END) * 100.0 / SUM(freq), 2) AS death_rate
  FROM titanic
  GROUP BY pClass, sex, age
""")

// ---------------------------------------------------------------------------
// Queries whose result is only printed.
// ---------------------------------------------------------------------------

// Head count per class and sex.
hc.sql("""
  SELECT pClass, sex, SUM(freq) AS total_passengers
  FROM titanic
  GROUP BY pClass, sex
""").show()

// Number of survivors per age bracket.
hc.sql("""
  SELECT age, SUM(CASE WHEN survived = 'Yes' THEN freq ELSE 0 END) AS survivors
  FROM titanic
  GROUP BY age
""").show()

// Head count per survival status.
hc.sql("""
  SELECT survived, SUM(freq) AS total_passengers
  FROM titanic
  GROUP BY survived
""").show()


+------+------+----------------+
|pClass|   sex|total_passengers|
+------+------+----------------+
|   2nd|  Male|             179|
|   2nd|Female|             106|
|  Crew|Female|              23|
|  Crew|  Male|             862|
|   3rd|Female|             196|
|   1st|  Male|             180|
|   1st|Female|             145|
|   3rd|  Male|             510|
+------+------+----------------+

+-----+---------+
|  age|survivors|
+-----+---------+
|Adult|      654|
|Child|       57|
+-----+---------+

+--------+----------------+
|survived|total_passengers|
+--------+----------------+
|     Yes|             711|
|      No|            1490|
+--------+----------------+



lastException = null
hc = org.apache.spark.sql.hive.HiveContext@7c1167ac
survieParClasse = [pClass: string, survival_rate: decimal(27,2)]
survieParSexe = [sex: string, survival_rate: decimal(27,2)]
tauxMortaliteCroise = [pClass: string, sex: string ... 2 more fields]




[pClass: string, sex: string ... 2 more fields]

In [17]:
// Session 3, step 2: export the three analysis results as CSV.
// Every result is reduced to a single partition before writing, replaces any
// existing output at its target path, and is written with a header line.
Seq(
  survieParClasse -> "out/survival_rate_by_class",
  survieParSexe -> "out/survival_rate_by_sex",
  tauxMortaliteCroise -> "out/death_rate_class_sex_age"
).foreach { case (result, targetPath) =>
  result
    .coalesce(1)
    .write
    .mode("overwrite")
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .save(targetPath)
}