# Challenge - Kickstarter preprocessing
---
![](https://images.unsplash.com/photo-1530083727892-3c261661d7a4?ixlib=rb-1.2.1&ixid=eyJhcHBfaWQiOjEyMDd9&auto=format&fit=crop&w=1350&q=80)
Picture by [Steve Halama](https://unsplash.com/photos/GjSzvtZhMoA)

In this exercise, we will start working on the Kickstarter dataset, each record is about a specific campaign. Today, you will pre-process the dataset. 

During the next Spark course, you will apply machine learning to predict successful campaigns.

In [1]:
# Q0 - download csv file
!curl -O https://s3.eu-central-1.amazonaws.com/alex-image-hosting/train_clean.csv > train_clean.csv

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed

  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0

  0 31.1M    0 17046    0     0  13680      0  0:39:45  0:00:01  0:39:44 13702
curl: (23) Failed writing body (0 != 16384)


**Q1 - Have a look at our train_clean.csv file, with the linux ```head``` command.**

In [1]:
# Q1 - Unix command for file first lines
filename = "./train_clean.csv"
!head - n 2 $./arbres-paris.csv

'head' n'est pas reconnu en tant que commande interne
ou externe, un programme ex‚cutable ou un fichier de commandes.


**Q2 - Create the spark session variable, name it "preprocessing".**

In [2]:
# TODO
import findspark
findspark.init('C:/Spark/spark-2.4.4-bin-hadoop2/spark-2.4.4-bin-hadoop2.7')
from pyspark.sql import SparkSession

### STRIP_START

try:
    os.remove("metastore_db/db.lck")
    os.remove("metastore_db/dbex.lck")
except:
    pass

def build_spark_session(app_name, memory='4g', executors=4):
    return SparkSession.builder\
                      .appName(app_name)\
                      .config('spark.executor.memory', memory)\
                      .config('spark.executor.instances', executors)\
                      .getOrCreate()

spark_session = build_spark_session(app_name='ok-google')
### STRIP_END

## Loading & exploring data

**Q3 - Load the data from the train_clean.csv. We have seen at Q1 that this file has a header.**

In [3]:
filename = "./train_clean.csv"
!head - n 2 $filename

'head' n'est pas reconnu en tant que commande interne
ou externe, un programme ex‚cutable ou un fichier de commandes.


In [3]:
# TODO
df = spark_session.read.csv(filename, sep=",", \
                     header=True, \
                     inferSchema=True)

In [5]:
spark_session.sparkContext

In [101]:
df.columns

['project_id',
 'name',
 'desc',
 'goal',
 'keywords',
 'disable_communication',
 'country',
 'currency',
 'deadline',
 'state_changed_at',
 'created_at',
 'launched_at',
 'backers_count',
 'final_status']

**Q4 - Let's go for some exploration :**
	- 4.1) Number of lines and columns.
	- 4.2) Display the first 20  rows of the dataframe.
	- 4.3) Print the schema of the dataframe.

In [76]:
# Q4.1 Number of lines and columns.
df.count()

108129

In [107]:
# Q4.2 Display the first 20  rows of the dataframe.
df.show(20)

+--------------+--------------------+--------------------+-------+--------------------+---------------------+-------+--------+----------+----------------+----------+-----------+-------------+------------+
|    project_id|                name|                desc|   goal|            keywords|disable_communication|country|currency|  deadline|state_changed_at|created_at|launched_at|backers_count|final_status|
+--------------+--------------------+--------------------+-------+--------------------+---------------------+-------+--------+----------+----------------+----------+-----------+-------------+------------+
|kkst1451568084| drawing for dollars|I like drawing pi...|   20.0| drawing-for-dollars|                False|     US|     USD|1241333999|      1241334017|1240600507| 1240602723|            3|           1|
|kkst1474482071|Sponsor Dereck Bl...|I  Dereck Blackbu...|  300.0|sponsor-dereck-bl...|                False|     US|     USD|1242429000|      1242432018|1240960224| 1240975592|   

In [6]:
# Q4.3 Print the schema of the dataframe.
df.printSchema()

root
 |-- project_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- desc: string (nullable = true)
 |-- goal: string (nullable = true)
 |-- keywords: string (nullable = true)
 |-- disable_communication: string (nullable = true)
 |-- country: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- deadline: string (nullable = true)
 |-- state_changed_at: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- launched_at: string (nullable = true)
 |-- backers_count: integer (nullable = true)
 |-- final_status: integer (nullable = true)



In [102]:
df.dtypes

[('project_id', 'string'),
 ('name', 'string'),
 ('desc', 'string'),
 ('goal', 'string'),
 ('keywords', 'string'),
 ('disable_communication', 'string'),
 ('country', 'string'),
 ('currency', 'string'),
 ('deadline', 'string'),
 ('state_changed_at', 'string'),
 ('created_at', 'string'),
 ('launched_at', 'string'),
 ('backers_count', 'int'),
 ('final_status', 'int')]

**Q5 - When printing the schema, we see that all columns are strings. Assign the integer type to columns you think appropriate. Have a look at the csv file. This new dataframe will be named dfCasted, print its schema.**

*Hint : Use the .withColumn(newColName, newColValue) to cast each column.*

In [21]:
import pyspark.sql.functions as f
dfCasted = df.withColumn("goal", df["goal"].cast("integer"))\
             .withColumn("deadline", df["deadline"].cast("integer"))\
             .withColumn("state_changed_at", df["state_changed_at"].cast("integer"))\
             .withColumn("created_at", df["created_at"].cast("integer"))\
             .withColumn("launched_at", df["launched_at"].cast("integer"))\
             .withColumn("gobackers_countal", df["backers_count"].cast("integer"))\
             .withColumn("final_status", df["final_status"].cast("integer"))
        

In [8]:
# TODO
dfCasted.printSchema()

root
 |-- project_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- desc: string (nullable = true)
 |-- goal: integer (nullable = true)
 |-- keywords: string (nullable = true)
 |-- disable_communication: string (nullable = true)
 |-- country: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- deadline: integer (nullable = true)
 |-- state_changed_at: integer (nullable = true)
 |-- created_at: integer (nullable = true)
 |-- launched_at: integer (nullable = true)
 |-- backers_count: integer (nullable = true)
 |-- final_status: integer (nullable = true)
 |-- gobackers_countal: integer (nullable = true)



You should get the following result : 

```
root
 |-- project_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- desc: string (nullable = true)
 |-- goal: integer (nullable = true)
 |-- keywords: string (nullable = true)
 |-- disable_communication: string (nullable = true)
 |-- country: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- deadline: integer (nullable = true)
 |-- state_changed_at: integer (nullable = true)
 |-- created_at: integer (nullable = true)
 |-- launched_at: integer (nullable = true)
 |-- backers_count: integer (nullable = true)
 |-- final_status: integer (nullable = true)
```

**Q6 - We could have done this much faster. Do you know how ?**

**Hint** : Have a look at parameters in cell n°4

In [109]:
# TODO : Write your answer
dfCasted.select("goal").show(3)

+----+
|goal|
+----+
|  20|
| 300|
|  30|
+----+
only showing top 3 rows



## Data Cleaning

**Q7 - Give a statistical description of these columns together : goal, backers_count, final_status**

In [111]:
# TODO
dfCasted.select("goal", "backers_count","final_status").describe().show()

+-------+-----------------+-------------------+-------------------+
|summary|             goal|      backers_count|       final_status|
+-------+-----------------+-------------------+-------------------+
|  count|           107615|             108128|             108128|
|   mean|36839.03430748502|  6434187.413250962| 1052360.7834973366|
| stddev|974215.3015529736|9.324061726649426E7|3.776049940184165E7|
|    min|                0|                  0|                  0|
|    max|        100000000|         1430423170|         1428977971|
+-------+-----------------+-------------------+-------------------+



**Q8 - Let's have a look at the **disable_communication** column. Group by values and display a descending value count. Show the top 10 values.**

*Hint : groupBy, count, orderBy, show*

What do you notice ? Considering the number of lines of our dataset, does this column provides information ?

In [142]:
dfCasted.groupBy("disable_communication").count().sort(f.desc("count")).show(10)

+---------------------+------+
|disable_communication| count|
+---------------------+------+
|                False|107293|
|                 True|   322|
|               2500.0|     8|
|               1000.0|     7|
|               5000.0|     6|
|              10000.0|     5|
|               2000.0|     4|
|               8000.0|     3|
|  The Artist s Pro...|     3|
|               3000.0|     3|
+---------------------+------+
only showing top 10 rows



**What shall you do with this **disable_communication** column ?**

In [22]:
# TODO
dfCasted = dfCasted.drop("disable_communication")

**Q9 - Houston, we have a problem ! We can see the future in our dataset ! Can you find it ? These informations must be removed.**

*Hint : There are two problematic columns, it has something to do with the supporters, and a change during the project.*

In [11]:
# TODO

## **Q10 - Country & Currency : Start with some exploration of these columns.**

- Try some groupBy and counting, just like *Q8*. Then, read below.

You may think that *country* and *currency* are redundant, in which case we could just delete one of the two columns. What about Euro ?

In [161]:
# TODO : Country value count
#dfCasted.count()
dfCasted.groupBy("country").count().sort(f.desc("count")).show(10)

+-------+-----+
|country|count|
+-------+-----+
|     US|91545|
|     GB| 8743|
|     CA| 3733|
|     AU| 1877|
|     NL|  702|
|  False|  428|
|     NZ|  354|
|     SE|  240|
|     DK|  196|
|     NO|  113|
+-------+-----+
only showing top 10 rows



In [144]:
# TODO : Currency value count
#dfCasted.count()
dfCasted.groupBy("currency").count().sort(f.desc("count")).show(10)

+--------+-----+
|currency|count|
+--------+-----+
|     USD|91545|
|     GBP| 8743|
|     CAD| 3733|
|     AUD| 1877|
|     EUR|  814|
|      US|  406|
|     NZD|  354|
|     SEK|  240|
|     DKK|  196|
|     NOK|  113|
+--------+-----+
only showing top 10 rows



In [137]:
dfCasted.select("currency", "country").count()

108129

- Try selecting *goal* and *final_status*, and show some values.

In [140]:
# TODO
dfCasted.select("goal", "final_status").show(5)

+----+------------+
|goal|final_status|
+----+------------+
|  20|           1|
| 300|           0|
|  30|           0|
| 500|           1|
|2000|           0|
+----+------------+
only showing top 5 rows



- Try showing value count for country and currency in the same table.

In [171]:
# TODO
#dfCasted.select("country", "currency").distinct().show()
#df.agg(*(f.count(f.col(c)).alias(c) for c in ["country", "currency"])).show()
dfCasted.groupBy("currency","country").count().sort(f.desc("count")).show(10)
#A rédiger le sql

+--------+-------+-----+
|currency|country|count|
+--------+-------+-----+
|     USD|     US|91545|
|     GBP|     GB| 8743|
|     CAD|     CA| 3733|
|     AUD|     AU| 1877|
|     EUR|     NL|  702|
|      US|  False|  405|
|     NZD|     NZ|  354|
|     SEK|     SE|  240|
|     DKK|     DK|  196|
|     NOK|     NO|  113|
+--------+-------+-----+
only showing top 10 rows



**Q11 - Now, there is something else : Some values for *country* have the value *False*. Display these records, and groupBy *currency*, descending.**

*Hint : The instruction chain is the following : dfCasted.filter().groupBy().count().orderBy().show(), fill 3 parentheses.*

In [11]:
# TODO
dfCasted.filter(f.col("country")=="False").groupBy("currency").count().sort(f.desc("count")).show(5, truncate=False, vertical=True)

-RECORD 0-------
 currency | NZ  
 count    | 1   
-RECORD 1-------
 currency | NO  
 count    | 1   
-RECORD 2-------
 currency | NL  
 count    | 2   
-RECORD 3-------
 currency | AU  
 count    | 3   
-RECORD 4-------
 currency | CA  
 count    | 3   
only showing top 5 rows



*Definition - Custom functions :* Some column operations are already defined inside Spark, but we often need to apply more complex or more custom function. In this case, we can create User Defined Functions (UDF) and apply them on columns.

**Q12 - In this question, we will create two UDF.**
- **udfCountry(country, currency)** : If country=False, take the currency value, else, leave the country value.
- **udfCurrency(currency)** : If the length of currency is different than 3, assign a null value, else, leave the currency value.

In [6]:
from pyspark.sql.types import StringType, IntegerType, DoubleType

In [7]:
def f_country(country:str, currency:str) -> bool:
    return currency if country is False else country
from typing import Optional
def f_currency(currency:str)->Optional:
    return currency if len(currency)==3 else Null
udf_country = f.udf(f_country, StringType())
udf_currency = f.udf(f_currency, StringType())

In [None]:
#Q16

df = df\
     .withColumn('hours_prepa', datediff(col('dead_line_clean'), coll('lauched_at_clean')))\
     .#round au lieu datediff 3600,2 hours_prepa diff
def filter_udf(input_prepa, days_campagns, goal)->bool:
    def _gt(value)->booll:
        try return value==0
    exept:
        return False
    return _gt(input_prepa and _gt(days_campagns))

udf_filter = f.udf(filter_udf, BooleanType())

df.filter(udf_filter(col('hours_prepa'), col('days_campagns'), col('goal')))



In [None]:
#Q24
df\
.repartition(8*10)
.write\
.mode(overwrite)
.paquet(paquetquet_file_path)#fichier paquet

In [170]:
#check the input output
input_count = spark.read.csv(input_countut_path).count()
output_count = spark.read.csv(input_countut_pat).count()


NameError: name 'spark' is not defined

- BinaryType – Données binaires.
- BooleanType – Valeurs booléennes.
- ByteType – Valeur d'octet.
- DateType – Valeur d'horodatage.
- DoubleType – Valeur double à virgule flottante.
- IntegerType – Valeur d'entier.
- LongType – Valeur d'entier long.
- NullType – Valeur null.
- ShortType – Valeur d'entier court.
- StringType – Chaîne de texte.
- TimestampType – Valeur d'horodatage (généralement en secondes à partir du 01/01/1970).
- UnknownType – Valeur de type non identifié.

**Q13 - In this question we will apply our two UDF. Using the .withColumn operation, you can change a column, just like you did for type casting. withColumn will create two new columns : country2 and currency2.**

*Hint : df.withcolumn(country2, newColValue).withcolumn(currency2, newColValue)*

Also, you can add a drop statement (on country and currency) after the two withColumns, as we have created our new columns.

Check your dataframe once transformations are applied. Schema and first lines.

In [23]:
 dfCasted = dfCasted.withColumn("country2", udf_country(f.col("country"),f.col("currency")))\
   .withColumn("currency2", udf_currency(f.col("currency")))\
   .drop("country", "currency")
dfCasted.printSchema()
    

root
 |-- project_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- desc: string (nullable = true)
 |-- goal: integer (nullable = true)
 |-- keywords: string (nullable = true)
 |-- deadline: integer (nullable = true)
 |-- state_changed_at: integer (nullable = true)
 |-- created_at: integer (nullable = true)
 |-- launched_at: integer (nullable = true)
 |-- backers_count: integer (nullable = true)
 |-- final_status: integer (nullable = true)
 |-- gobackers_countal: integer (nullable = true)
 |-- country2: string (nullable = true)
 |-- currency2: string (nullable = true)



In [28]:
dfCasted.select("country2", "currency2").show(5)

+--------+---------+
|country2|currency2|
+--------+---------+
|      US|      USD|
|      US|      USD|
|      US|      USD|
|      US|      USD|
|      US|      USD|
+--------+---------+
only showing top 5 rows



In [None]:
# TODO

**Q14 - We will do one more cleanup on the column final_status**, which will be the label for our classification algorithm in next course.

First, count the number of elements for each values in final_status.

Finally, we need to delete records with **final_status** different than 0 or 1.

In [29]:
# TODO : final_status count
dfCasted.select("final_status").count()

108129

In [36]:
dfCasted.filter(f.col("final_status")==1).count(),dfCasted.filter(f.col("final_status")==0).count()

(34419, 73266)

In [34]:
# TODO : filter
dfCasted.filter((f.col("final_status")==1)|(f.col("final_status")==0))\
        .select("final_status")\
        .show(5)

+------------+
|final_status|
+------------+
|           1|
|           0|
|           0|
|           1|
|           0|
+------------+
only showing top 5 rows



In [None]:
# TODO : Check processing

In [24]:
def is_valid_label(label:int)->bool:
    return label in [0,1]
udf_is_valide = f.udf(is_valid_label, BooleanType())
dfCasted = dfCasted.filter(udf_is_valide(f.col('final_status')))

In [66]:
dfCasted.select('final_status').show(5)

+------------+
|final_status|
+------------+
|           1|
|           0|
|           0|
|           1|
|           0|
+------------+
only showing top 5 rows



## Feature engineering

It's sometimes useful to add features to our dataframe, to help our model learning. We will work with the time data.

**Q15 - Our dates columns are in unix timestamps. We first need to convert them to dates.**

In [70]:
dfCasted.printSchema()

root
 |-- project_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- desc: string (nullable = true)
 |-- goal: integer (nullable = true)
 |-- keywords: string (nullable = true)
 |-- deadline: integer (nullable = true)
 |-- state_changed_at: integer (nullable = true)
 |-- created_at: integer (nullable = true)
 |-- launched_at: integer (nullable = true)
 |-- backers_count: integer (nullable = true)
 |-- final_status: integer (nullable = true)
 |-- gobackers_countal: integer (nullable = true)
 |-- country2: string (nullable = true)
 |-- currency2: string (nullable = true)



In [92]:
dfCasted.select("deadline","state_changed_at","created_at", "launched_at").show(2)

+----------+----------------+----------+-----------+
|  deadline|state_changed_at|created_at|launched_at|
+----------+----------------+----------+-----------+
|1241333999|      1241334017|1240600507| 1240602723|
|1242429000|      1242432018|1240960224| 1240975592|
+----------+----------------+----------+-----------+
only showing top 2 rows



In [25]:
# TODO
from pyspark.sql.functions import to_date
dfCasted = dfCasted.withColumn("deadline",f.from_unixtime('deadline').cast(DateType()) )\
        .withColumn("state_changed_at",f.from_unixtime('state_changed_at').cast(DateType()) )\
        .withColumn("created_at",f.from_unixtime('created_at').cast(DateType()) )\
        .withColumn("launched_at",f.from_unixtime('launched_at').cast(DateType())) \

dfCasted.select("deadline","state_changed_at","created_at", "launched_at")\
        .show(5)

+----------+----------------+----------+-----------+
|  deadline|state_changed_at|created_at|launched_at|
+----------+----------------+----------+-----------+
|2009-05-03|      2009-05-03|2009-04-24| 2009-04-24|
|2009-05-15|      2009-05-16|2009-04-28| 2009-04-29|
|2009-05-22|      2009-05-22|2009-05-12| 2009-05-12|
|2009-05-29|      2009-05-29|2009-04-29| 2009-04-29|
|2009-05-31|      2009-05-31|2009-05-01| 2009-05-01|
+----------+----------------+----------+-----------+
only showing top 5 rows



**Q16 - Add a **days_campaign** column, which represents the duration of the campaign, in days. This is the difference between *launched_at* and *deadline*. Here we work with a date difference.**

Add a **hours_prep**, which represents the number of hours of preparation. This is the difference between *created_at* and *launched_at*. You may round to 2 digits after comma. Here we work with a timestamp difference.

Finally, apply a filter : we want to delete records with **days_campaign** AND **hours_prep** equal to zero, and we want the records with **goal** greater than zero

In [None]:
df = df\
     .withColumn('hours_prepa', datediff(col('dead_line_clean'), coll('lauched_at_clean')))\
     .#round au lieu datediff 3600,2 hours_prepa diff

In [26]:
from pyspark.sql.functions import round, datediff

# Date difference : datediff(Col1, Col2)
dfCasted = dfCasted.withColumn("days_campaign",datediff(f.col("deadline"),f.col("launched_at")))
dfCasted.printSchema()

root
 |-- project_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- desc: string (nullable = true)
 |-- goal: integer (nullable = true)
 |-- keywords: string (nullable = true)
 |-- deadline: date (nullable = true)
 |-- state_changed_at: date (nullable = true)
 |-- created_at: date (nullable = true)
 |-- launched_at: date (nullable = true)
 |-- backers_count: integer (nullable = true)
 |-- final_status: integer (nullable = true)
 |-- gobackers_countal: integer (nullable = true)
 |-- country2: string (nullable = true)
 |-- currency2: string (nullable = true)
 |-- days_campaign: integer (nullable = true)



In [28]:
?f.round

In [31]:
dfCasted = dfCasted.withColumn("hours_prep",f.round(datediff(f.col("launched_at"),f.col("created_at")),2))
dfCasted.printSchema()

root
 |-- project_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- desc: string (nullable = true)
 |-- goal: integer (nullable = true)
 |-- keywords: string (nullable = true)
 |-- deadline: date (nullable = true)
 |-- state_changed_at: date (nullable = true)
 |-- created_at: date (nullable = true)
 |-- launched_at: date (nullable = true)
 |-- backers_count: integer (nullable = true)
 |-- final_status: integer (nullable = true)
 |-- gobackers_countal: integer (nullable = true)
 |-- country2: string (nullable = true)
 |-- currency2: string (nullable = true)
 |-- days_campaign: integer (nullable = true)
 |--  hours_prep: integer (nullable = true)
 |-- hours_prep: integer (nullable = true)



In [35]:
# TODO : Filter
#Finally, apply a filter : we want to delete records with days_campaign AND hours_prep equal to zero, and we want the records with goal greater than zero

dfCasted.filter((f.col("days_campaign")==0)&(f.col("hours_prep")==0)&(f.col("goal")>0)).show(2)

+----------+----+----+----+--------+--------+----------------+----------+-----------+-------------+------------+-----------------+--------+---------+-------------+-----------+----------+
|project_id|name|desc|goal|keywords|deadline|state_changed_at|created_at|launched_at|backers_count|final_status|gobackers_countal|country2|currency2|days_campaign| hours_prep|hours_prep|
+----------+----+----+----+--------+--------+----------------+----------+-----------+-------------+------------+-----------------+--------+---------+-------------+-----------+----------+
+----------+----+----+----+--------+--------+----------------+----------+-----------+-------------+------------+-----------------+--------+---------+-------------+-----------+----------+



**Q17 - At this point, we don't need these columns anymore : *created_at*, *launched_at*, *deadline*.**

In [None]:
# TODO : Drops

We will now work on text data, we will gather every text values into one.

**Q18 - Pass the columns *name*, *desc* and *keywords* into lowercase.**

In [None]:
# A little search for passing strings to lower case ?

**Q19 - Create a new column called *text* which contains the three previous columns. Be careful to include a space between them so that we can split them later.**

In [None]:
from pyspark.sql.functions import concat_ws

# Hint : Google("pyspark concat_ws"), don't forget the separator parameter

**Q20 - You can now delete these three text columns.**

In [None]:
# TODO

## Processing null values

**Q21 - There are various techniques to handle null values to make them usable by an algorithm. Can you find 3 different methods ?**

In [None]:
# TODO : Write your answer

**Q22 - For the columns *days_campaign*, *hours_prep* and *goal* : replace null values by **-1**.**

In [None]:
# Look for na.fill at the following adress :
# https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html

# TODO

**Q23 - For the columns **country2** and **currency2** : replace null values by **"unknown"**.**

In [None]:
# TODO

## Exporting Dataframe

Well done, you have done a pretty good pipeline for pre-processing your dataset.

**Q24 - Finally, export your dataframe to the *parquet* format.**

*parquet* always exports a folder that may contain multiple files, this is due to the distributed nature of Spark.

The export function creates a directory with the name given in parameter. Give it **"kickstarter.parquet"**.

In [None]:
# TODO