-----------------------------
# <div style="text-align:center;">TP Spark 2 : notes perso</div>
-------------------------------

# <div style="text-align:center;">Début du projet et pre-processings</div>

## Chargement des données

L’ensemble des ressources nécessaires pour les prochaines questions se trouvent dans la documentation de Spark.

Chargez le fichier ***train_clean.csv*** dans un *DataFrame*. La première ligne du fichier donne le nom de chaque colonne (aka le header), on veut que cette ligne soit utilisée pour nommer les colonnes du dataFrame.

In [1]:
import org.apache.spark.sql.DataFrame

val df: DataFrame = spark
  .read
  .option("header", true) // utilise la première ligne du (des) fichier(s) comme header
  .option("inferSchema", "true") // pour inférer le type de chaque colonne (Int, String, etc.)
  .csv("/home/p5hngk/Downloads/GitHub/INF_729---Introduction_au_framework_Hadoop/cours-spark-telecom-master/data/train_clean.csv")

Intitializing Scala interpreter ...

Spark Web UI available at http://192.168.1.22:4040
SparkContext available as 'sc' (version = 2.4.4, master = local[*], app id = local-1573675034240)
SparkSession available as 'spark'


import org.apache.spark.sql.DataFrame
df: org.apache.spark.sql.DataFrame = [project_id: string, name: string ... 12 more fields]


Affichez le nombre de lignes et le nombre de colonnes dans le DataFrame :

In [2]:
println(s"Nombre de lignes : ${df.count}")
println(s"Nombre de colonnes : ${df.columns.length}")

Nombre de lignes : 108129
Nombre de colonnes : 14


Affichez un extrait du DataFrame sous forme de tableau :

In [3]:
df.show()

+--------------+--------------------+--------------------+-------+--------------------+---------------------+-------+--------+----------+----------------+----------+-----------+-------------+------------+
|    project_id|                name|                desc|   goal|            keywords|disable_communication|country|currency|  deadline|state_changed_at|created_at|launched_at|backers_count|final_status|
+--------------+--------------------+--------------------+-------+--------------------+---------------------+-------+--------+----------+----------------+----------+-----------+-------------+------------+
|kkst1451568084| drawing for dollars|I like drawing pi...|   20.0| drawing-for-dollars|                False|     US|     USD|1241333999|      1241334017|1240600507| 1240602723|            3|           1|
|kkst1474482071|Sponsor Dereck Bl...|I  Dereck Blackbu...|  300.0|sponsor-dereck-bl...|                False|     US|     USD|1242429000|      1242432018|1240960224| 1240975592|   

Affichez le schéma du DataFrame, à savoir le nom de chaque colonne avec son type :

In [4]:
df.printSchema()

root
 |-- project_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- desc: string (nullable = true)
 |-- goal: string (nullable = true)
 |-- keywords: string (nullable = true)
 |-- disable_communication: string (nullable = true)
 |-- country: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- deadline: string (nullable = true)
 |-- state_changed_at: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- launched_at: string (nullable = true)
 |-- backers_count: integer (nullable = true)
 |-- final_status: integer (nullable = true)



Assignez le type *Int* aux colonnes qui vous semblent contenir des entiers :

In [5]:
val dfCasted: DataFrame = df
  .withColumn("goal", $"goal".cast("Int"))
  .withColumn("deadline" , $"deadline".cast("Int"))
  .withColumn("state_changed_at", $"state_changed_at".cast("Int"))
  .withColumn("created_at", $"created_at".cast("Int"))
  .withColumn("launched_at", $"launched_at".cast("Int"))
  .withColumn("backers_count", $"backers_count".cast("Int"))
  .withColumn("final_status", $"final_status".cast("Int"))

dfCasted.printSchema()

root
 |-- project_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- desc: string (nullable = true)
 |-- goal: integer (nullable = true)
 |-- keywords: string (nullable = true)
 |-- disable_communication: string (nullable = true)
 |-- country: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- deadline: integer (nullable = true)
 |-- state_changed_at: integer (nullable = true)
 |-- created_at: integer (nullable = true)
 |-- launched_at: integer (nullable = true)
 |-- backers_count: integer (nullable = true)
 |-- final_status: integer (nullable = true)



dfCasted: org.apache.spark.sql.DataFrame = [project_id: string, name: string ... 12 more fields]


## Cleaning

Certaines opérations sur les colonnes sont déjà implémentées dans Spark, mais il est souvent nécessaire de faire appel à des fonctions plus complexes. Dans ce cas on peut créer des *UDFs (User Defined Functions)* qui permettent d’implémenter de nouvelles opérations sur les colonnes. Voir la partie User Defined Functions du fichier `spark_notes.md` pour comprendre comment ça fonctionne.

Affichez une description statistique des colonnes de type *Int* :

In [6]:
dfCasted
  .select("goal", "backers_count", "final_status")
  .describe()
  .show

+-------+-----------------+-------------------+-------------------+
|summary|             goal|      backers_count|       final_status|
+-------+-----------------+-------------------+-------------------+
|  count|           107615|             108128|             108128|
|   mean|36839.03430748502|  6434187.413250962| 1052360.7834973366|
| stddev|974215.3015529736|9.324061726649426E7|3.776049940184165E7|
|    min|                0|                  0|                  0|
|    max|        100000000|         1430423170|         1428977971|
+-------+-----------------+-------------------+-------------------+



Observez les autres colonnes, posez-vous les bonnes questions : quel cleaning faire pour chaque colonne ? Y a-t-il des colonnes inutiles ? Comment traiter les valeurs manquantes ? A-t-on des données dupliquées ? Quelles sont les valeurs de mes colonnes ? Des répartitions intéressantes ? Des "fuites du futur" (vous entendrez souvent le terme *data leakage*) ??? Proposez des cleanings à faire sur les données : des `groupBy-count`, des `show`, des `dropDuplicates`, etc.

In [7]:
dfCasted.groupBy("disable_communication").count.orderBy($"count".desc).show(100)

+---------------------+------+
|disable_communication| count|
+---------------------+------+
|                False|107293|
|                 True|   322|
|               2500.0|     8|
|               1000.0|     7|
|               5000.0|     6|
|              10000.0|     5|
|               2000.0|     4|
|  The Artist s Pro...|     3|
|               8000.0|     3|
|              25000.0|     3|
|               3000.0|     3|
|               7500.0|     3|
|              15000.0|     3|
|                300.0|     2|
|               4000.0|     2|
|                500.0|     2|
|               5500.0|     2|
|              20000.0|     2|
| launching-chad-ma...|     1|
| whispers-of-never...|     1|
| book-when-life-gi...|     1|
| the-1st-internati...|     1|
| well-done-the-wel...|     1|
| pearl-a-short-unc...|     1|
| no-regrets-a-broo...|     1|
| chuck-lobstermani...|     1|
| help-life-the-uni...|     1|
| wild-river-out-be...|     1|
| precious-waters-r...|     1|
| everyt

In [8]:
dfCasted.groupBy("country").count.orderBy($"count".desc).show(100)

+--------------------+-----+
|             country|count|
+--------------------+-----+
|                  US|91545|
|                  GB| 8743|
|                  CA| 3733|
|                  AU| 1877|
|                  NL|  702|
|               False|  428|
|                  NZ|  354|
|                  SE|  240|
|                  DK|  196|
|                  NO|  113|
|                  IE|  111|
|               999.0|    2|
|steve-sabos-comed...|    1|
|the-nashville-ses...|    1|
|the-swirly-twirli...|    1|
|the-next-golden-a...|    1|
|here-without-art-...|    1|
| rofl-metaphor-56-ep|    1|
|              2500.0|    1|
|fire-fox-in-radic...|    1|
|seven-zero-eight-...|    1|
|up-until-now-reco...|    1|
|smile-its-not-tha...|    1|
|hannuka-story-the...|    1|
|kuya-ko-my-big-br...|    1|
|if-the-world-was-...|    1|
|jack-chaps-dog-de...|    1|
|bring-mary-mcdono...|    1|
|umeos-the-21st-ce...|    1|
|faith-struggle-vi...|    1|
|              5000.0|    1|
|dont-trust-yo

In [9]:
dfCasted.groupBy("currency").count.orderBy($"count".desc).show(100)

+--------------------+-----+
|            currency|count|
+--------------------+-----+
|                 USD|91545|
|                 GBP| 8743|
|                 CAD| 3733|
|                 AUD| 1877|
|                 EUR|  814|
|                  US|  406|
|                 NZD|  354|
|                 SEK|  240|
|                 DKK|  196|
|                 NOK|  113|
|               False|   73|
|                  GB|   13|
|                  AU|    3|
|                  CA|    3|
|                  NL|    2|
|the-artists-proce...|    1|
|             77750.0|    1|
|                null|    1|
|the-artists-proce...|    1|
|the-soloist-first...|    1|
|              1750.0|    1|
|flutter-and-there...|    1|
|final-day-a-drama...|    1|
|the-artists-proce...|    1|
|              6300.0|    1|
|jack-the-radio-lo...|    1|
|                  NO|    1|
|the-arrangement-a...|    1|
|                  NZ|    1|
+--------------------+-----+



In [10]:
dfCasted.select("deadline").dropDuplicates.show()

+----------+
|  deadline|
+----------+
|1254767400|
|1263597060|
|1274115540|
|1274145060|
|1280372820|
|1281730020|
|1281769200|
|1283292000|
|1286168724|
|1287701689|
|1288926510|
|1289102340|
|1290314164|
|1290704714|
|1291537500|
|1292387613|
|1292400000|
|1293256800|
|1297473364|
|1297726200|
+----------+
only showing top 20 rows



In [11]:
dfCasted.groupBy("state_changed_at").count.orderBy($"count".desc).show(100)

+----------------+-----+
|state_changed_at|count|
+----------------+-----+
|            null|   85|
|      1414814340|   26|
|      1420088342|   20|
|      1425185942|   17|
|      1401483613|   14|
|      1427860741|   13|
|      1414800010|   12|
|      1401595142|   12|
|      1409543941|   11|
|      1414817940|   11|
|      1430452741|   10|
|      1372651141|   10|
|      1409544011|   10|
|      1359694743|   10|
|      1417496340|   10|
|      1343793542|   10|
|      1343879941|    9|
|      1383289142|    9|
|      1425078013|    9|
|      1349074740|    9|
|      1423976340|    9|
|      1346471944|    9|
|      1364788742|    9|
|      1404197943|    9|
|      1383278341|    8|
|      1420091941|    8|
|      1412135943|    8|
|      1412136011|    8|
|      1362113941|    7|
|      1388552340|    7|
|      1378011540|    7|
|      1414900740|    7|
|      1425272344|    7|
|      1409554741|    7|
|      1430452742|    7|
|      1370059144|    7|
|      1404187140|    7|


In [12]:
dfCasted.groupBy("backers_count").count.orderBy($"count".desc).show(100)

+-------------+-----+
|backers_count|count|
+-------------+-----+
|            0|12773|
|            1| 8788|
|            2| 5954|
|            3| 4202|
|            4| 3174|
|            5| 2665|
|            6| 2242|
|            7| 2000|
|            8| 1693|
|            9| 1453|
|           10| 1413|
|           11| 1258|
|           13| 1221|
|           12| 1191|
|           14| 1174|
|           15| 1084|
|           16| 1043|
|           18| 1041|
|           17| 1005|
|           19|  982|
|           20|  899|
|           21|  873|
|           22|  826|
|           23|  825|
|           24|  813|
|           26|  788|
|           25|  785|
|           27|  742|
|           28|  713|
|           29|  677|
|           34|  668|
|           30|  657|
|           32|  633|
|           33|  610|
|           38|  591|
|           31|  589|
|           35|  580|
|           36|  574|
|           40|  572|
|           37|  562|
|           39|  553|
|           41|  530|
|         

In [13]:
dfCasted.select("goal", "final_status").orderBy($"goal".desc).show(30)

+---------+------------+
|     goal|final_status|
+---------+------------+
|100000000|           0|
|100000000|           0|
|100000000|           0|
|100000000|           0|
|100000000|           0|
|100000000|           0|
|100000000|           0|
| 73000000|           0|
| 70000000|           0|
| 50000000|           0|
| 50000000|           0|
| 50000000|           0|
| 50000000|           0|
| 39023437|           0|
| 30000000|           0|
| 30000000|           0|
| 25000000|           0|
| 25000000|           0|
| 22000000|           0|
| 21474836|           0|
| 21474836|           0|
| 21000000|           0|
| 20000000|           0|
| 20000000|           0|
| 17400000|           0|
| 16250000|           0|
| 16000000|           0|
| 15000000|           0|
| 15000000|           0|
| 11000000|           0|
+---------+------------+
only showing top 30 rows



In [14]:
dfCasted.select("goal", "final_status").show(30)

+-----+------------+
| goal|final_status|
+-----+------------+
|   20|           1|
|  300|           0|
|   30|           0|
|  500|           1|
| 2000|           0|
|  700|           0|
|  250|           0|
| 1000|           1|
| 5000|           0|
| 3500|           0|
|30000|           0|
|  300|           0|
| 1500|           1|
|  500|           1|
|  500|           0|
| 1000|           1|
|  600|           0|
| 1500|           1|
| 3500|           0|
| 1000|           1|
|  365|           1|
|  500|           1|
|  400|           1|
|  100|           1|
|  250|           1|
| 3000|           1|
|  640|           0|
| 3500|           1|
|  300|           1|
| 1000|           1|
+-----+------------+
only showing top 30 rows



In [15]:
dfCasted.groupBy("country", "currency").count.orderBy($"count".desc).show(50)

+--------------------+--------------------+-----+
|             country|            currency|count|
+--------------------+--------------------+-----+
|                  US|                 USD|91545|
|                  GB|                 GBP| 8743|
|                  CA|                 CAD| 3733|
|                  AU|                 AUD| 1877|
|                  NL|                 EUR|  702|
|               False|                  US|  405|
|                  NZ|                 NZD|  354|
|                  SE|                 SEK|  240|
|                  DK|                 DKK|  196|
|                  NO|                 NOK|  113|
|                  IE|                 EUR|  111|
|               False|                  GB|   13|
|               False|                  AU|    3|
|               False|                  CA|    3|
|               False|                  NL|    2|
|noah-clean-prison...|               False|    1|
|hannuka-story-the...|               False|    1|


Enlevez la colonne *disable_communication*. Cette colonne est très largement majoritairement à *false*, il n'y a que 322 *true* (négligeable), le reste est non-identifié :

In [16]:
val df2: DataFrame = dfCasted.drop("disable_communication")

df2: org.apache.spark.sql.DataFrame = [project_id: string, name: string ... 11 more fields]


### Les fuites du futur

Dans les datasets construits a posteriori des évènements, il arrive que des données ne pouvant être connues qu'après la résolution de chaque évènement soient insérées dans le dataset. On a des fuites depuis le futur ! Par exemple, on a ici le nombre de "backers" dans la colonne backers_count. Il s'agit du nombre total de personnes ayant investi dans chaque projet, or ce nombre n'est connu qu'après la fin de la campagne.

Il faut savoir repérer et traiter ces données pour plusieurs raisons :

* pendant l'entraînement (si on ne les a pas enlevées) elles facilitent le travail du modèle puisqu'elles contiennent des informations directement liées à ce qu'on veut prédire. Par exemple, si `backers_count = 0` on est sûr que la campagne a raté.
* au moment d'appliquer notre modèle, les données du futur ne sont pas présentes (puisqu'elles ne sont pas encore connues). On ne peut donc pas les utiliser comme input pour un modèle.


Ici, pour enlever les données du futur on retire les colonnes *backers_count* et *state_changed_at* :

In [17]:
val dfNoFutur: DataFrame = df2.drop("backers_count", "state_changed_at")

dfNoFutur: org.apache.spark.sql.DataFrame = [project_id: string, name: string ... 9 more fields]


### Colonnes currency et country

On pourrait penser que les colonnes *currency* et *country* sont redondantes, auquel cas on pourrait enlever une des colonnes. Mais c'est oublier par exemple que tous les pays de la zone euro ont la même monnaie ! Il faut donc garder les deux colonnes.

Il semble y avoir des inversions entre ces deux colonnes et du nettoyage à faire. On remarque en particulier que lorsque `country = "False"` le country à l'air d'être dans currency. On le voit avec la commande :

In [18]:
df.filter($"country" === "False")
  .groupBy("currency")
  .count
  .orderBy($"count".desc)
  .show(50)

+--------+-----+
|currency|count|
+--------+-----+
|      US|  405|
|      GB|   13|
|      AU|    3|
|      CA|    3|
|      NL|    2|
|      NZ|    1|
|      NO|    1|
+--------+-----+



Créez deux udfs nommées *udf_country* et *udf_currency* telles que :

* ***cleanCountry*** : si `country = "False"` prendre la valeur de currency, sinon si country est une chaîne de caractères de taille autre que 2 remplacer par *null*, et sinon laisser la valeur country actuelle. On veut les résultat dans une nouvelle colonne *country2*.
* ***cleanCurrency*** : si `currency.length != 3` currency prend la valeur `null`, sinon laisser la valeur currency actuelle. On veut les résultats dans une nouvelle colonne `currency2`.

In [19]:
def cleanCountry(country: String, currency: String): String = {
  if (country == "False")
    currency
  else
    country
}

def cleanCurrency(currency: String): String = {
  if (currency != null && currency.length != 3)
    null
  else
    currency
}

val cleanCountryUdf = udf(cleanCountry _)
val cleanCurrencyUdf = udf(cleanCurrency _)

val dfCountry: DataFrame = dfNoFutur
  .withColumn("country2", cleanCountryUdf($"country", $"currency"))
  .withColumn("currency2", cleanCurrencyUdf($"currency"))
  .drop("country", "currency")

cleanCountry: (country: String, currency: String)String
cleanCurrency: (currency: String)String
cleanCountryUdf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,StringType,Some(List(StringType, StringType)))
cleanCurrencyUdf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
dfCountry: org.apache.spark.sql.DataFrame = [project_id: string, name: string ... 9 more fields]


In [20]:
dfNoFutur.show(10)

+--------------+--------------------+--------------------+----+--------------------+-------+--------+----------+----------+-----------+------------+
|    project_id|                name|                desc|goal|            keywords|country|currency|  deadline|created_at|launched_at|final_status|
+--------------+--------------------+--------------------+----+--------------------+-------+--------+----------+----------+-----------+------------+
|kkst1451568084| drawing for dollars|I like drawing pi...|  20| drawing-for-dollars|     US|     USD|1241333999|1240600507| 1240602723|           1|
|kkst1474482071|Sponsor Dereck Bl...|I  Dereck Blackbu...| 300|sponsor-dereck-bl...|     US|     USD|1242429000|1240960224| 1240975592|           0|
| kkst183622197|       Mr. Squiggles|So I saw darkpony...|  30|        mr-squiggles|     US|     USD|1243027560|1242163613| 1242164398|           0|
| kkst597742710|Help me write my ...|Do your part to h...| 500|help-me-write-my-...|     US|     USD|12435

On peut également faire ce qu'on vient de faire en utilisant `sql.functions.when` :              
***Attention à remplacer les % par des \$***
```scala
dfNoFutur
  .withColumn("country2", when(%"country" === "False", %"currency").otherwise(%"country"))
  .withColumn("currency2", when(%"country".isNotNull && length(%"currency") =!= 3, null).otherwise(%"currency"))
  .drop("country", "currency")
```

On a montré ici l'utilisation d'udfs, mais de façon générale toujours privilégier les fonctions déjà codées dans Spark car elles sont optimisées.

Pour une classification, l’équilibrage entre les différentes classes cibles dans les données d’entraînement doit être contrôlé (et éventuellement corrigé). Affichez le nombre d’éléments de chaque classe (colonne ***final_status***). 

In [21]:
dfCountry.groupBy("final_status").count.orderBy($"count".desc).show(30)

+------------+-----+
|final_status|count|
+------------+-----+
|           0|73266|
|           1|34419|
|           2|   20|
|           3|   19|
|           5|   17|
|          11|   11|
|          14|   10|
|          22|   10|
|          23|   10|
|          18|   10|
|           6|    9|
|           7|    9|
|           9|    8|
|           4|    7|
|          13|    6|
|          21|    6|
|          19|    6|
|           8|    6|
|          49|    5|
|          32|    5|
|          17|    5|
|          31|    4|
|          25|    4|
|          42|    4|
|          52|    4|
|          57|    4|
|          41|    4|
|          28|    4|
|          33|    4|
|          15|    4|
+------------+-----+
only showing top 30 rows



In [22]:
dfCountry.printSchema

root
 |-- project_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- desc: string (nullable = true)
 |-- goal: integer (nullable = true)
 |-- keywords: string (nullable = true)
 |-- deadline: integer (nullable = true)
 |-- created_at: integer (nullable = true)
 |-- launched_at: integer (nullable = true)
 |-- final_status: integer (nullable = true)
 |-- country2: string (nullable = true)
 |-- currency2: string (nullable = true)



L'énoncé du TP nous propose de conservez uniquement les lignes qui nous intéressent pour le modèle, à savoir lorsque `final_status` vaut 0 (Fail) ou 1 (Success). Les autres valeurs n'étant pas définie, on les enlève : 

In [23]:
val dfCountry2: DataFrame = dfCountry
  .filter($"final_status" === 0 || $"final_status" === 1)

dfCountry2.groupBy("final_status").count.orderBy($"count".desc).show()

+------------+-----+
|final_status|count|
+------------+-----+
|           0|73266|
|           1|34419|
+------------+-----+



dfCountry2: org.apache.spark.sql.DataFrame = [project_id: string, name: string ... 9 more fields]


On aurait toutefois pu passer toutes les valeurs différentes de 1 à 0 en considérant que les campagnes qui ne sont pas un Success sont un Fail. Le code serait :
```scala
def cleanFinalStatus(final_status: Int): Int = {
  if (final_status >== 1)
    0
  else
    final_status
}

val cleanFinalStatusUdf = udf(cleanFinalStatus _)

val dfCountry2: DataFrame = dfCountry
  .withColumn("final_status2", cleanFinalStatusUdf($"final_status"))
  .drop("final_status")
```

## Ajouter et manipuler des colonnes

Il est parfois utile d’ajouter des *features* (colonnes dans un DataFrame) pour aider le modèle lors de son apprentissage. Ici nous allons créer de nouvelles features à partir de celles déjà présentes dans les données. Dans certains cas on peut ajouter des features en allant chercher des sources de données supplémentaires.

Les dates ne sont pas directement exploitables par un modèle sous leur forme initiale dans nos données : il s’agit de timestamps Unix (nombre de secondes depuis le 1er janvier 1970 0h00 UTC). Nous allons traiter ces données pour en extraire des informations pour aider les modèles. Nous allons, entre autres, nous servir des fonctions liées aux dates de l'objet [functions](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$) :          
```scala
def from_unixtime(ut: Column): Column

Converts the number of seconds from unix epoch (1970-01-01 00:00:00 UTC) to a string representing the timestamp of that moment in the current system time zone in the yyyy-MM-dd HH:mm:ss format.
```

```scala
def datediff(end: Column, start: Column): Column

Returns the number of days from start to end.
Only considers the date part of the input. For example:
dateddiff("2018-01-10 00:00:00", "2018-01-09 23:59:59")
// returns 1
```

Nous allons maintenant définir un nouveau *DataFrame* à partir de ***dfCountry2*** sur lequel nous allons effectuer les opérations suivantes :
* Ajoutez une colonne ***days_campaign*** qui représente la durée de la campagne en jours (le nombre de jours entre *launched_at* et *deadline*).
* Ajoutez une colonne ***hours_prepa*** qui représente le nombre d’heures de préparation de la campagne entre *created_at* et *launched_at*. On pourra arrondir le résultat à 3 chiffres après la virgule.

In [24]:
val dfCountry3: DataFrame = dfCountry2
    .withColumn("days_campaign", datediff(from_unixtime($"deadline") , from_unixtime($"launched_at")))
    .withColumn("hours_prepa", ((($"launched_at" - $"created_at")/3.6).cast("Int")/1000))

dfCountry3.select("project_id","days_campaign", "hours_prepa", "deadline", "launched_at", "created_at").orderBy($"hours_prepa".asc).show(10)

+--------------+-------------+-----------+--------+-----------+----------+
|    project_id|days_campaign|hours_prepa|deadline|launched_at|created_at|
+--------------+-------------+-----------+--------+-----------+----------+
|kkst1677718959|         null| -16651.746|    null| 1344222924|1404169212|
|kkst1985544974|         null|  -8018.505|    null| 1381313492|1410180112|
| kkst701794135|         null|   -3927.48|    null| 1357429366|1371568297|
|kkst1948088233|         null|  -3405.279|    null| 1370638345|1382897352|
| kkst877166482|         null|  -3118.938|    null| 1335298561|1346526738|
|kkst1022204726|         null|   -2447.99|    null| 1408807273|1417620039|
| kkst406922818|         null|  -2161.731|    null| 1271256372|1279038607|
| kkst286648046|         null|  -1871.667|    null| 1416696076|1423434079|
| kkst530098841|         null|  -1656.264|    null| 1335826917|1341789469|
| kkst507710890|         null|  -1648.251|    null| 1310694245|1316627951|
+--------------+---------

dfCountry3: org.apache.spark.sql.DataFrame = [project_id: string, name: string ... 11 more fields]


On constate ci-dessus que des dates des projets ont une **date de lancement antérieure à la date de création**, ce qui n'est pas cohérent. Nous allons donc tenter de nettoyer ces données. Pour cela, commençons par comptabiliser le nombre d'heures de prépa négatives :

In [25]:
dfCountry3.groupBy($"hours_prepa" < 0).count.orderBy($"count".desc).show()

+-----------------+------+
|(hours_prepa < 0)| count|
+-----------------+------+
|            false|107615|
|             true|    70|
+-----------------+------+



On constate donc qu'il n'y a que 70 projets dont le les dates de lancement sont antérieures aux dates de création. Aussi, nous avons deux possibilités pour taiter ces cas :
* Supprimer les lignes de ces projets ;
* Fixer la date de création égale à celle de lancement ;

Compte tenu du fait qu'il n'y a que 70 valeurs concernées, on peut choisir des les négliger et donc de supprimer les lignes correspondantes. Cependant, pour des raisons pédagogiques, nous allons choisir la deuxième option.

In [26]:
def cleanHoursPrepa(created_at: Int, launched_at: Int, hours_prepa: Int): Int = {
  if (hours_prepa < 0)
    launched_at
  else
    created_at
}

val cleanHoursPrepaUdf = udf(cleanHoursPrepa _)

val dfCountry4: DataFrame = dfCountry3
  .withColumn("created_at2", cleanHoursPrepaUdf($"created_at", $"launched_at", $"hours_prepa"))
  .withColumn("hours_prepa", ((($"launched_at" - $"created_at2")/3.6).cast("Int")/1000))
  .drop("created_at")


dfCountry4.groupBy($"hours_prepa" < 0).count.orderBy($"count".desc).show()

+-----------------+------+
|(hours_prepa < 0)| count|
+-----------------+------+
|            false|107685|
+-----------------+------+



cleanHoursPrepa: (created_at: Int, launched_at: Int, hours_prepa: Int)Int
cleanHoursPrepaUdf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function3>,IntegerType,Some(List(IntegerType, IntegerType, IntegerType)))
dfCountry4: org.apache.spark.sql.DataFrame = [project_id: string, name: string ... 11 more fields]


A titre de vérification avec le projet ayant l'id *kkst1677718959* qui était précédemment dans le cas que nous souhaitions traiter :

In [27]:
dfCountry4.filter($"project_id" === "kkst1677718959").show()

+--------------+--------------------+-----------+----+--------+--------+-----------+------------+--------+---------+-------------+-----------+-----------+
|    project_id|                name|       desc|goal|keywords|deadline|launched_at|final_status|country2|currency2|days_campaign|hours_prepa|created_at2|
+--------------+--------------------+-----------+----+--------+--------+-----------+------------+--------+---------+-------------+-----------+-----------+
|kkst1677718959|"Chris Bartick's ...| NY Harbor"|null|  1500.0|    null| 1344222924|           1|      US|     null|         null|        0.0| 1344222924|
+--------------+--------------------+-----------+----+--------+--------+-----------+------------+--------+---------+-------------+-----------+-----------+



* Supprimons les colonnes ***launched_at***, ***created_at2***, et ***deadline*** qui ne sont pas exploitables pour un modèle.

In [28]:
val dfCountry5: DataFrame = dfCountry4
 .drop("launched_at", "created_at2", "deadline")

dfCountry5.show(3)

+--------------+--------------------+--------------------+----+--------------------+------------+--------+---------+-------------+-----------+
|    project_id|                name|                desc|goal|            keywords|final_status|country2|currency2|days_campaign|hours_prepa|
+--------------+--------------------+--------------------+----+--------------------+------------+--------+---------+-------------+-----------+
|kkst1451568084| drawing for dollars|I like drawing pi...|  20| drawing-for-dollars|           1|      US|      USD|            9|      0.615|
|kkst1474482071|Sponsor Dereck Bl...|I  Dereck Blackbu...| 300|sponsor-dereck-bl...|           0|      US|      USD|           17|      4.268|
| kkst183622197|       Mr. Squiggles|So I saw darkpony...|  30|        mr-squiggles|           0|      US|      USD|           10|      0.218|
+--------------+--------------------+--------------------+----+--------------------+------------+--------+---------+-------------+-----------+

dfCountry5: org.apache.spark.sql.DataFrame = [project_id: string, name: string ... 8 more fields]


Pour exploiter les données sous forme de texte, nous allons commencer par réunir toutes les colonnes textuelles en une seule. En faisant cela, on rend indiscernable le texte du nom de la campagne, de sa description et des keywords, ce qui peut avoir des conséquences sur la qualité du modèle. Mais on cherche à construire ici un premier benchmark de modèle, avec une solution simple qui pourra servir de référence pour des modèles plus évolués.


Mettons les colonnes ***name***, ***desc***, et ***keywords*** en minuscules avec la fonction :
```scala
def lower(e: Column): Column

Converts a string column to lower case.
```

In [29]:
val dfCountry6: DataFrame = dfCountry5
 .withColumn("name", lower($"name"))
 .withColumn("desc", lower($"desc"))
 .withColumn("keywords", lower($"keywords"))

dfCountry6.show(5)

+--------------+--------------------+--------------------+----+--------------------+------------+--------+---------+-------------+-----------+
|    project_id|                name|                desc|goal|            keywords|final_status|country2|currency2|days_campaign|hours_prepa|
+--------------+--------------------+--------------------+----+--------------------+------------+--------+---------+-------------+-----------+
|kkst1451568084| drawing for dollars|i like drawing pi...|  20| drawing-for-dollars|           1|      US|      USD|            9|      0.615|
|kkst1474482071|sponsor dereck bl...|i  dereck blackbu...| 300|sponsor-dereck-bl...|           0|      US|      USD|           17|      4.268|
| kkst183622197|       mr. squiggles|so i saw darkpony...|  30|        mr-squiggles|           0|      US|      USD|           10|      0.218|
| kkst597742710|help me write my ...|do your part to h...| 500|help-me-write-my-...|           1|      US|      USD|           30|      0.815|

dfCountry6: org.apache.spark.sql.DataFrame = [project_id: string, name: string ... 8 more fields]


Ajoutons alors une colonne ***text***, qui contient la concaténation des Strings des colonnes ***name***, ***desc***, et ***keywords***. ATTENTION à bien mettre des espaces entre les chaînes de caractères concaténées, car on fera par la suite un split en se servant des espaces entre les mots. Pour cela, on se sert de la fonction :
```scala
def concat_ws(sep: String, exprs: Column*): Column

Concatenates multiple input string columns together into a single string column, using the given separator.
```

In [30]:
dfCountry6.withColumn("text", concat_ws(" ", $"name", $"desc", $"keywords")).select($"text").head()

res25: org.apache.spark.sql.Row = [drawing for dollars i like drawing pictures. and then i color them too. so i thought i would suggest something for me to draw and then if someone wants... drawing-for-dollars]


In [31]:
val dfCountry7: DataFrame = dfCountry6
 .withColumn("text", concat_ws(" ", $"name", $"desc", $"keywords"))

dfCountry7.show(3)

+--------------+--------------------+--------------------+----+--------------------+------------+--------+---------+-------------+-----------+--------------------+
|    project_id|                name|                desc|goal|            keywords|final_status|country2|currency2|days_campaign|hours_prepa|                text|
+--------------+--------------------+--------------------+----+--------------------+------------+--------+---------+-------------+-----------+--------------------+
|kkst1451568084| drawing for dollars|i like drawing pi...|  20| drawing-for-dollars|           1|      US|      USD|            9|      0.615|drawing for dolla...|
|kkst1474482071|sponsor dereck bl...|i  dereck blackbu...| 300|sponsor-dereck-bl...|           0|      US|      USD|           17|      4.268|sponsor dereck bl...|
| kkst183622197|       mr. squiggles|so i saw darkpony...|  30|        mr-squiggles|           0|      US|      USD|           10|      0.218|mr. squiggles so ...|
+--------------+

dfCountry7: org.apache.spark.sql.DataFrame = [project_id: string, name: string ... 9 more fields]


## Valeurs nulles

Il y plusieurs façons de traiter les valeurs nulles pour les rendre exploitables par un modèle. Nous avons déjà vu que parfois les valeurs nulles peuvent être comblées en utilisant les valeurs d’une autre colonne (parce que le dataset a été mal préparé). On peut aussi décider de supprimer les exemples d’entraînement contenant des valeurs nulles, mais on risque de perdre beaucoup de données. On peut également les remplacer par la valeur moyenne ou médiane de la colonne. On peut enfin leur attribuer une valeur particulière, distincte des autres valeurs de la colonne.

Remplaçons les valeurs nulles des colonnes ***days_campaign***, ***hours_prepa***, et ***goal*** par la valeur -1 et par "*unknown*" pour les colonnes ***country2*** et ***currency2***.

In [39]:
dfCountry7.filter("days_campaign is null").count

res34: Long = 70


In [40]:
dfCountry7.filter("days_campaign is null").show(70)

+--------------+--------------------+--------------------+----+--------+------------+--------+---------+-------------+-----------+--------------------+
|    project_id|                name|                desc|goal|keywords|final_status|country2|currency2|days_campaign|hours_prepa|                text|
+--------------+--------------------+--------------------+----+--------+------------+--------+---------+-------------+-----------+--------------------+
|kkst1096095848|"help """""""""""...|"""""""""""""""""...|null|   750.0|           0|      US|     null|         null|        0.0|"help """""""""""...|
|kkst2016848438|"""""""""""""""""...| receive the albu...|null|   100.0|           0|      US|     null|         null|        0.0|"""""""""""""""""...|
| kkst406922818|"""""""""""""""""...|   become a part ...|null|  1200.0|           1|      US|     null|         null|        0.0|"""""""""""""""""...|
|   kkst5141767|" """""""""""""""...|"""""""""""""""""...|null| 10000.0|           0|   

In [42]:
dfCountry7.filter("hours_prepa is null").count

res37: Long = 0


In [43]:
dfCountry7.filter("goal is null").count

res38: Long = 70


In [44]:
dfCountry7.filter("country2 is null").count

res39: Long = 0


In [45]:
dfCountry7.filter("currency2 is null").count

res40: Long = 70


In [96]:
dfCountry7.withColumn("currency2", when($"currency2".isNull, "unknown").otherwise("currency2")).filter($"currency2" === "unknown").show()

+--------------+--------------------+--------------------+----+--------+------------+--------+---------+-------------+-----------+--------------------+
|    project_id|                name|                desc|goal|keywords|final_status|country2|currency2|days_campaign|hours_prepa|                text|
+--------------+--------------------+--------------------+----+--------+------------+--------+---------+-------------+-----------+--------------------+
|kkst1096095848|"help """""""""""...|"""""""""""""""""...|null|   750.0|           0|      US|  unknown|         null|        0.0|"help """""""""""...|
|kkst2016848438|"""""""""""""""""...| receive the albu...|null|   100.0|           0|      US|  unknown|         null|        0.0|"""""""""""""""""...|
| kkst406922818|"""""""""""""""""...|   become a part ...|null|  1200.0|           1|      US|  unknown|         null|        0.0|"""""""""""""""""...|
|   kkst5141767|" """""""""""""""...|"""""""""""""""""...|null| 10000.0|           0|   

In [98]:
val dfCountry8: DataFrame = dfCountry7 
 .withColumn("days_campaign", when($"days_campaign".isNull, -1).otherwise("days_campaign"))
 .withColumn("hours_prepa", when($"hours_prepa".isNull, -1).otherwise("hours_prepa"))
 .withColumn("goal", when($"goal".isNull, -1).otherwise("goal"))
 .withColumn("country2", when($"country2".isNull, "unknown").otherwise("country2"))
 .withColumn("currency2", when($"currency2".isNull, "unknown").otherwise("currency2"))


dfCountry8.filter($"currency2".isNull).show()

+----------+----+----+----+--------+------------+--------+---------+-------------+-----------+----+
|project_id|name|desc|goal|keywords|final_status|country2|currency2|days_campaign|hours_prepa|text|
+----------+----+----+----+--------+------------+--------+---------+-------------+-----------+----+
+----------+----+----+----+--------+------------+--------+---------+-------------+-----------+----+



dfCountry8: org.apache.spark.sql.DataFrame = [project_id: string, name: string ... 9 more fields]


## Sauvegarder un DataFrame
Sauvegardons le DataFrame final au format parquet sur notre machine :
```scala
monDataFrameFinal.write.parquet("/path/ou/les/donnees/seront/sauvegardees")
```
Attention ! Lorsqu’on sauvegarde un output en Spark, le résultat est toujours un répertoire contenant un ou plusieurs fichiers. Cela est dû à la nature distribuée de Spark. Toutefois, pour écrire sur un seul et unique fichier, il est possible d'utiliser la commande suivante :
```scala
monDataFrameFinal.coalesce(1).write.parquet("/path/ou/les/donnees/seront/sauvegardees")
```
Cette commande fonctionne aussi pour d'autres formats comme *.csv* : `write.parquet`.

In [103]:
val monDataFrameFinal: DataFrame = dfCountry8

monDataFrameFinal.write.parquet("/home/p5hngk/Downloads/GitHub/INF_729---Introduction_au_framework_Hadoop/cours-spark-telecom-master/monDataFrameFinal")

monDataFrameFinal: org.apache.spark.sql.DataFrame = [project_id: string, name: string ... 9 more fields]


https://spark.apache.org/docs/2.2.0/sql-programming-guide.html

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$