In [150]:
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession,functions}

val conf = {new SparkConf().setAll(Map("spark.scheduler.mode" -> "FIFO",
      "spark.speculation" -> "false",
      "spark.reducer.maxSizeInFlight" -> "48m",
      "spark.serializer" -> "org.apache.spark.serializer.KryoSerializer",
      "spark.kryoserializer.buffer.max" -> "1g",
      "spark.shuffle.file.buffer" -> "32k",
      "spark.default.parallelism" -> "12",
      "spark.sql.shuffle.partitions" -> "12"
    ))}

    // Initialisation du SparkSession qui est le point d'entrée vers Spark SQL (donne accès aux dataframes, aux RDD,
    // création de tables temporaires, etc., et donc aux mécanismes de distribution des calculs)
val spark = {SparkSession
  .builder
  .config(conf)
  .appName("TP Spark : Preprocessor")
  .getOrCreate}

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession, functions}
conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@c15c311
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@64a23c0c


In [130]:
val df:DataFrame = spark
      .read
      .option("header", true) 
      .option("inferSchema", "true") // pour inférer le type de chaque colonne (Int, String, etc.)
      .csv("/home/jorge/Documents/Cours/Spark/RepoAdotTPs/data/train_clean.csv")

println(s"Nombre de lignes : ${df.count}")
println(s"Nombre de colonnes : ${df.columns.length}")
df.columns
df.printSchema()
println("\n")
println("Hello World ! from Preprocessor")
println("\n")

// val dfcasted:DataFrame = df
//     .withColumn("goal",$"goal".cast("Int"))
//     .withColumn("deadline",$"deadline".cast("Int"))
//     .withColumn("state_changed_at",$"state_changed_at".cast("Int"))
//     .withColumn("created_at",$"created_at".cast("Int"))
//     .withColumn("launched_at", $"launched_at".cast("Int"))
//     .withColumn("backers_count", $"backers_count".cast("Int"))
//     .withColumn("final_status", $"final_status".cast("Int"))

val dfCasted: DataFrame = df
  .withColumn("goal", $"goal".cast("Int"))
  .withColumn("deadline" , $"deadline".cast("Int"))
  .withColumn("state_changed_at", $"state_changed_at".cast("Int"))
  .withColumn("created_at", $"created_at".cast("Int"))
  .withColumn("launched_at", $"launched_at".cast("Int"))
  .withColumn("backers_count", $"backers_count".cast("Int"))
  .withColumn("final_status", $"final_status".cast("Int"))

dfCasted.printSchema()



Nombre de lignes : 108129
Nombre de colonnes : 14
root
 |-- project_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- desc: string (nullable = true)
 |-- goal: string (nullable = true)
 |-- keywords: string (nullable = true)
 |-- disable_communication: string (nullable = true)
 |-- country: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- deadline: string (nullable = true)
 |-- state_changed_at: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- launched_at: string (nullable = true)
 |-- backers_count: integer (nullable = true)
 |-- final_status: integer (nullable = true)



Hello World ! from Preprocessor


root
 |-- project_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- desc: string (nullable = true)
 |-- goal: integer (nullable = true)
 |-- keywords: string (nullable = true)
 |-- disable_communication: string (nullable = true)
 |-- country: string (nullable = true)
 |-- currency: string (nullable = 

df: org.apache.spark.sql.DataFrame = [project_id: string, name: string ... 12 more fields]
dfCasted: org.apache.spark.sql.DataFrame = [project_id: string, name: string ... 12 more fields]


In [4]:
dfCasted
  .select("goal", "backers_count", "final_status")
  .describe()
  .show

+-------+-----------------+-------------------+-------------------+
|summary|             goal|      backers_count|       final_status|
+-------+-----------------+-------------------+-------------------+
|  count|           107615|             108128|             108128|
|   mean|36839.03430748502|  6434187.413250962| 1052360.7834973366|
| stddev|974215.3015529736|9.324061726649426E7|3.776049940184165E7|
|    min|                0|                  0|                  0|
|    max|        100000000|         1430423170|         1428977971|
+-------+-----------------+-------------------+-------------------+



In [9]:
val n = 5

dfCasted
    .select("name","goal", "backers_count", "final_status")
    .show(n)

dfCasted
    .select("country","keywords","disable_communication","currency")
    .show(n)

dfCasted
    .select("deadline","state_changed_at","created_at","launched_at")
    .show(n)

+--------------------+----+-------------+------------+
|                name|goal|backers_count|final_status|
+--------------------+----+-------------+------------+
| drawing for dollars|  20|            3|           1|
|Sponsor Dereck Bl...| 300|            2|           0|
|       Mr. Squiggles|  30|            0|           0|
|Help me write my ...| 500|           18|           1|
|Support casting m...|2000|            1|           0|
+--------------------+----+-------------+------------+
only showing top 5 rows

+-------+--------------------+---------------------+--------+
|country|            keywords|disable_communication|currency|
+-------+--------------------+---------------------+--------+
|     US| drawing-for-dollars|                False|     USD|
|     US|sponsor-dereck-bl...|                False|     USD|
|     US|        mr-squiggles|                False|     USD|
|     US|help-me-write-my-...|                False|     USD|
|     US|support-casting-m...|                

n: Int = 5


In [129]:
// dfCasted.groupBy("disable_communication").count.orderBy($"count".desc).show(100)
// dfCasted.groupBy("disable_communication").count.orderBy($"count".desc).show
// dfCasted.groupBy("country").count.orderBy($"count".desc).show(100)
// dfClean.groupBy("currency").count.orderBy($"count".desc).show(100)
// dfCasted.select("deadline").dropDuplicates.show()

// (dfCasted.select("deadline").dropDuplicates.count()
//  ,dfCasted.select("deadline").count())

// dfCasted.groupBy("state_changed_at").count.orderBy($"count".desc).show(100)
// dfCasted.groupBy("backers_count").count.orderBy($"count".desc).show(100)
// dfCasted.select("goal", "final_status").show(30)
// dfCasted.groupBy("country", "currency").count.orderBy($"count".desc).show(50)

// Cleaning à faire: 
// Only keep rows with "True" or "False" in disable_communication
// drop disable_communication
// drop rows where regex or different from the main countries (after
// trying to fill with currency stage later)

// same than above with the country culumn
// dropduplicates in id column --done
// filter rows where state_changed_at null
// infer country from / currency if country is null (befre dropping countries)
// US -> US , GB ->GB, CA->CA, AU-AU, NL->NL
// 

In [128]:
val dfClean:DataFrame = dfCasted
    .dropDuplicates("deadline")
    .filter(!isnull($"state_changed_at"))
    .withColumn("country",when($"country" === "False",$"currency").otherwise($"country"))
    .filter(($"disable_communication"==="True") || ($"disable_communication"==="False"))
    .drop("disable_communication")
    .filter($"country" rlike ".{2}")
    .filter($"currency" rlike ".{3}")
    .drop("backers_count","state_changed_at")

dfClean
    .select("name","goal","final_status")
    .show(n)

dfClean
    .select("country","keywords","currency")
    .show(n)

dfClean
    .select("deadline","created_at","launched_at")
    .show(n)

// df.filter($"country" === "False")
//   .groupBy("currency")
//   .count
//   .orderBy($"count".desc)
//   .show(50)

+--------------------+-----+------------+
|                name| goal|final_status|
+--------------------+-----+------------+
|"""""""""""""""""...| 5000|           0|
|Anatomy of a Cred...|30000|           0|
|Gnash Open Source...| 5000|           0|
|Metaphysical Arch...| 2000|           0|
|Help Me Finish My...| 1600|           0|
+--------------------+-----+------------+
only showing top 5 rows

+-------+--------------------+--------+
|country|            keywords|currency|
+-------+--------------------+--------+
|     US|lostles-at-tinys-...|     USD|
|     US|anatomy-of-a-cred...|     USD|
|     US|gnash-open-source...|     USD|
|     US|metaphysical-arch...|     USD|
|     US|help-me-finish-my...|     USD|
+-------+--------------------+--------+
only showing top 5 rows

+----------+----------+-----------+
|  deadline|created_at|launched_at|
+----------+----------+-----------+
|1244264400|1241415164| 1241480901|
|1245026160|1241829376| 1242056094|
|1247607960|1240955180| 12421618

dfClean: org.apache.spark.sql.DataFrame = [project_id: string, name: string ... 9 more fields]


In [None]:
// Important:  col equivalent to $

In [14]:
// Solution proposée

// def cleanCountry(country: String, currency: String): String = {
//   if (country == "False")
//     currency
//   else
//     country
// }

// def cleanCurrency(currency: String): String = {
//   if (currency != null && currency.length != 3)
//     null
//   else
//     currency
// }

// val cleanCountryUdf = udf(cleanCountry _)
// val cleanCurrencyUdf = udf(cleanCurrency _)

// val dfCountry: DataFrame = dfNoFutur
//   .withColumn("country2", cleanCountryUdf($"country", $"currency"))
//   .withColumn("currency2", cleanCurrencyUdf($"currency"))
//   .drop("country", "currency")

// // ou encore, en utilisant sql.functions.when:
// dfNoFutur
//   .withColumn("country2", when($"country" === "False", $"currency").otherwise($"country"))
//   .withColumn("currency2", when($"country".isNotNull && length($"currency") =!= 3, null).otherwise($"currency"))
//   .drop("country", "currency")

df2: org.apache.spark.sql.DataFrame = [project_id: string, name: string ... 11 more fields]


In [224]:
val df2:DataFrame = dfClean
    .withColumn("days_campaign",datediff(from_unixtime($"deadline"),from_unixtime($"launched_at")))
    .withColumn("hours_prepa",(($"launched_at"-$"created_at")/60).cast("Int"))
    .drop("launched_at","deadline","created_at")
    .withColumn("name",lower($"name"))
    .withColumn("desc",lower($"desc"))
    .withColumn("keywords",lower($"keywords"))
    

val df3:DataFrame = df2
    .withColumn("text",concat($"name",lit(" "),$"desc",lit(" "),$"keywords"))
    .withColumn("days_campaign",when(isnull($"days_campaign"),-1).otherwise($"days_campaign"))
    .withColumn("hours_prepa",when(isnull($"hours_prepa"),-1).otherwise($"hours_prepa"))
    .withColumn("goal",when(isnull($"goal"),-1).otherwise($"goal"))
    .withColumn("country",when(isnull($"country")," ").otherwise($"country"))
    .withColumn("currency",when(isnull($"currency")," ").otherwise($"currency"))

df3.columns

df2: org.apache.spark.sql.DataFrame = [project_id: string, name: string ... 8 more fields]
df3: org.apache.spark.sql.DataFrame = [project_id: string, name: string ... 9 more fields]
res196: Array[String] = Array(project_id, name, desc, goal, keywords, country, currency, final_status, days_campaign, hours_prepa, text)


In [226]:
df3
    .select("name","goal","final_status")
    .show(n)

df3
    .select("country","keywords","currency")
    .show(n)


df3
    .select("days_campaign","hours_prepa","text")
    .show(n)

+--------------------+-----+------------+
|                name| goal|final_status|
+--------------------+-----+------------+
|"""""""""""""""""...| 5000|           0|
|anatomy of a cred...|30000|           0|
|gnash open source...| 5000|           0|
|metaphysical arch...| 2000|           0|
|help me finish my...| 1600|           0|
+--------------------+-----+------------+
only showing top 5 rows

+-------+--------------------+--------+
|country|            keywords|currency|
+-------+--------------------+--------+
|     US|lostles-at-tinys-...|     USD|
|     US|anatomy-of-a-cred...|     USD|
|     US|gnash-open-source...|     USD|
|     US|metaphysical-arch...|     USD|
|     US|help-me-finish-my...|     USD|
+-------+--------------------+--------+
only showing top 5 rows

+-------------+-----------+--------------------+
|days_campaign|hours_prepa|                text|
+-------------+-----------+--------------------+
|           32|       1095|"""""""""""""""""...|
|           35| 

In [229]:
df3.write.parquet("/home/jorge/Documents/Git/spark_project_kickstarter_2019_2020/cleanData.parquet")

In [195]:
df.select("country").map(line => (line.toString(),line.toString.length())).orderBy($"_2".desc).show(100)

+--------------------+---+
|                  _1| _2|
+--------------------+---+
|[A compendium of ...|132|
|[Real life storie...|129|
|[A blend of Melod...| 89|
|[feature-film-som...| 54|
|[steve-sabos-come...| 52|
|[dont-hang-up-on-...| 52|
|[dont-trust-your-...| 52|
|[the-nashville-se...| 52|
|[hannuka-story-th...| 52|
|[bring-mary-mcdon...| 52|
|[i-speak-fluent-m...| 52|
|[if-the-world-was...| 52|
|[sexy-krazy-raw-a...| 52|
|[promote-validate...| 52|
|[ice-will-reveal-...| 52|
|[the-swirly-twirl...| 52|
|[randys-vision-fo...| 52|
|[faith-struggle-v...| 52|
|[umeos-the-21st-c...| 52|
|[intimacy-with-tr...| 52|
|[the-bowhunter-a-...| 52|
|[sexisgoodtv-web-...| 52|
|[ms-groundhog-ms-...| 52|
|[the-next-golden-...| 52|
|[going-home-what-...| 52|
|[chadd-thomas-unf...| 52|
|[jason-blum-weste...| 52|
|[in-the-grass-a-s...| 52|
|[no-pride-no-sham...| 52|
|[way-up-in-vermon...| 52|
|[together-alone-t...| 52|
|[the-foxhole-an-a...| 52|
|[esp-intuition-an...| 52|
|[up-until-now-rec...| 52|
|