# Spark Pipeline on Kickstarter Pledge Dataset

## 1. Overview

### 1.1. Instructions

- **Choose a sufficiently large open dataset** (datasets with fewer than 100,000 rows are not allowed)
- **Choose one variable to predict**
- **Implement at least two supervised learning models**: classification, regression, recommender system, etc. Unsupervised tasks (e.g. clustering, association rules) are not allowed
- **Use Apache Spark** (mandatory; e.g. on Google Cloud, as we did during our lab sessions)
- **Implement a full machine learning pipeline**, which includes:
    - Reading the data
    - Transforming the data (extracting features, handling missing values if any, etc.)
    - Building models (at least two models to compare)
    - Evaluating quality (using cross-validation or a train/test split)

### 1.2. Dataset

### 1.3. Summary & Conclusion

The notebook was also run locally, using the Spark installation steps described [here](https://sparkbyexamples.com/spark/spark-installation-on-linux-ubuntu/).

## 2. Environment Set-Up

We need the following libraries installed to set up the environment:

- kaggle (see documentation [here](https://github.com/Kaggle/kaggle-api#datasets))
- pyspark (see documentation [here](https://spark.apache.org/docs/latest/api/python/index.html))

In [1]:
!pip install kaggle
!pip install pyspark



## 3. Dataset Download

In [2]:
# Removes existing files that may have been downloaded locally
!rm -f kickstarter-projects.zip
!rm -f ks-projects-201612.csv ks-projects-201801.csv

<span style="color:red">**On Google Cloud**: To download the Kaggle dataset, we have to upload our kaggle.json file to /root/.kaggle. The kaggle.json file can be downloaded here:</span>

> ``https://www.kaggle.com/<username>/account``
 
<span style="color:red">**Locally**: To download the Kaggle dataset, we have to place our kaggle.json file in ~/.kaggle.</span>
    
<span style="color:red">Run the cell below when using Google Cloud:</span>

In [3]:
# Downloads the raw dataset from Kaggle
!kaggle datasets download -d kemical/kickstarter-projects

Downloading kickstarter-projects.zip to /home/qlr/Programming/kickstarter_pledge_prediction
 98%|█████████████████████████████████████▏| 36.0M/36.8M [00:03<00:00, 7.85MB/s]
100%|██████████████████████████████████████| 36.8M/36.8M [00:04<00:00, 9.64MB/s]


In [4]:
# Unzips the raw dataset and keeps only the most recent version (ks-projects-201801.csv)
!unzip kickstarter-projects.zip
!rm -f ks-projects-201612.csv kickstarter-projects.zip
!ls

Archive:  kickstarter-projects.zip
  inflating: ks-projects-201612.csv  
  inflating: ks-projects-201801.csv  
draft_run1_DataProc.ipynb  README.md
ks-projects-201801.csv	   spark_pipeline_vDef.ipynb


We only keep 'ks-projects-201801.csv', the most recent dataset available.

<span style="color:red">Run the cell below when using Google Cloud:</span>

## 4. Library Imports & Spark Variables

In [5]:
from pyspark.context import SparkContext
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql.functions import unix_timestamp, ceil, isnan, when, count, col
from pyspark.sql.session import SparkSession
from pyspark.sql.types import FloatType

In [6]:
dataset_path = "ks-projects-201801.csv" #if run on a local computer
#dataset_path = "/user/qlr/ks-projects-201801.csv" # if run on Google Cloud
dataset_format = "csv"
spark_context = "local" #if run on a local computer
#spark_context = "yarn" #if run on Google Cloud

## 5. Starting a Spark Session & Loading the Dataset

<span style="color:red">Comment out the following cell when running the notebook on Google Cloud, as a Spark session is instantiated automatically there.</span>

In [7]:
sc = SparkContext(spark_context)
spark = SparkSession(sc)

In [8]:
spark #Spark UI on Google Cloud should return v2.3.4 (version), yarn (Master), PySparkShell (AppName)

In [9]:
campaigns = (spark
             .read
             .format(dataset_format)
             .options(header=True)
             .load(dataset_path))

## 6. Data Preprocessing

In [10]:
raw_columns_to_keep = [
    "ID","name","category","deadline","launched","country","usd_goal_real", #features
    "state" # target
]

replace_start_end_dates_with_duration = [
    "ID","name","category","total_duration","country","usd_goal_real", #features
    "state" # target
]

kept_columns_for_decision_tree = [
    "total_duration","usd_goal_real","category","country", #features
    "state" # target
]

deadline_format = "yyyy-MM-dd"
launched_format = "yyyy-MM-dd HH:mm:ss"

In [11]:
# Checks the type of each column
campaigns.dtypes

[('ID', 'string'),
 ('name', 'string'),
 ('category', 'string'),
 ('main_category', 'string'),
 ('currency', 'string'),
 ('deadline', 'string'),
 ('goal', 'string'),
 ('launched', 'string'),
 ('pledged', 'string'),
 ('state', 'string'),
 ('backers', 'string'),
 ('country', 'string'),
 ('usd pledged', 'string'),
 ('usd_pledged_real', 'string'),
 ('usd_goal_real', 'string')]

In [12]:
# Drops rows containing nulls, as well as duplicate rows, since nulls can make PySpark raise errors during .fit()
campaigns = campaigns.dropna()
campaigns = campaigns.dropDuplicates()
# Explicit per-column null filter (redundant after dropna(), kept as a safeguard)
for column in campaigns.columns:
    campaigns = campaigns.where(col(column).isNotNull())

# Checks for N/A
campaigns.select([count(when(isnan(c), c)).alias(c) for c in campaigns.columns]).show()

+---+----+--------+-------------+--------+--------+----+--------+-------+-----+-------+-------+-----------+----------------+-------------+
| ID|name|category|main_category|currency|deadline|goal|launched|pledged|state|backers|country|usd pledged|usd_pledged_real|usd_goal_real|
+---+----+--------+-------------+--------+--------+----+--------+-------+-----+-------+-------+-----------+----------------+-------------+
|  0|   0|       0|            0|       0|       0|   0|       0|      0|    0|      0|      0|          0|               0|            0|
+---+----+--------+-------------+--------+--------+----+--------+-------+-----+-------+-------+-----------+----------------+-------------+



In [13]:
# Keeps only the relevant columns
campaigns = campaigns.select(raw_columns_to_keep)
# --
print("The dataset contains " + str(campaigns.count()) + " rows.")
campaigns.show(n=5)

The dataset contains 374855 rows.
+----------+--------------------+--------------+----------+-------------------+-------+-------------+----------+
|        ID|                name|      category|  deadline|           launched|country|usd_goal_real|     state|
+----------+--------------------+--------------+----------+-------------------+-------+-------------+----------+
|  10018239|             Borders|         Drama|2016-04-08|2016-02-25 17:40:34|     GB|      4926.39|    failed|
|1002250421|Spiele für iOS un...|  Mobile Games|2015-06-24|2015-06-03 17:12:38|     DE|      2240.39|  canceled|
|1002289150|Odyssey Skateboar...|Graphic Design|2014-12-20|2014-11-20 06:55:12|     US|       700.00|    failed|
|1005414218|Debut EP Album Pr...|           R&B|2014-11-26|2014-10-27 17:46:28|     US|      5500.00|    failed|
|1005696636|GBS Detroit Prese...|    Indie Rock|2013-06-08|2013-05-23 21:05:24|     US|      1200.00|successful|
+----------+--------------------+--------------+----------+-------------------+-------+-------------+----------+
only showing top 5 rows

In [14]:
# Computes the campaign duration (in days, rounded up) from the launched and deadline columns, then drops both columns
launch_times = unix_timestamp('launched', format = launched_format)
deadline_times = unix_timestamp('deadline', format = deadline_format)
time_difference = deadline_times - launch_times
campaigns = campaigns.\
    withColumn("total_duration",ceil(time_difference/(3600*24))).\
    select(replace_start_end_dates_with_duration)
# --
campaigns.show(n=5)

+----------+--------------------+--------------+--------------+-------+-------------+----------+
|        ID|                name|      category|total_duration|country|usd_goal_real|     state|
+----------+--------------------+--------------+--------------+-------+-------------+----------+
|  10018239|             Borders|         Drama|            43|     GB|      4926.39|    failed|
|1002250421|Spiele für iOS un...|  Mobile Games|            21|     DE|      2240.39|  canceled|
|1002289150|Odyssey Skateboar...|Graphic Design|            30|     US|       700.00|    failed|
|1005414218|Debut EP Album Pr...|           R&B|            30|     US|      5500.00|    failed|
|1005696636|GBS Detroit Prese...|    Indie Rock|            16|     US|      1200.00|successful|
+----------+--------------------+--------------+--------------+-------+-------------+----------+
only showing top 5 rows
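The day count produced by `ceil(time_difference/(3600*24))` can be checked outside Spark. The sketch below is a plain-Python illustration, not part of the pipeline: `duration_in_days` is a hypothetical helper, and it assumes naive timestamps, whereas Spark's `unix_timestamp` interprets them in the session time zone (a daylight-saving change between launch and deadline shifts the difference by one hour, which can occasionally change the rounded value).

```python
import math
from datetime import datetime

# Python equivalents of the Spark patterns "yyyy-MM-dd" and "yyyy-MM-dd HH:mm:ss"
DEADLINE_FORMAT = "%Y-%m-%d"
LAUNCHED_FORMAT = "%Y-%m-%d %H:%M:%S"


def duration_in_days(deadline: str, launched: str) -> int:
    """Hypothetical helper: days between launch and deadline, rounded up like ceil(seconds / 86400)."""
    deadline_time = datetime.strptime(deadline, DEADLINE_FORMAT)  # parsed as midnight, naive time
    launched_time = datetime.strptime(launched, LAUNCHED_FORMAT)
    seconds = (deadline_time - launched_time).total_seconds()
    return math.ceil(seconds / (3600 * 24))


# First campaign shown above: launched 2016-02-25 17:40:34, deadline 2016-04-08
print(duration_in_days("2016-04-08", "2016-02-25 17:40:34"))  # 43
```

Applied to the five rows shown above, it gives the same `total_duration` values (43, 21, 30, 30 and 16).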



In [15]:
# Cleans the target labels:
# 'undefined', 'live' -> dropped
# 'suspended', 'canceled' -> renamed to 'failed'
for condition in ['state!="undefined"', 'state!="live"']:
    campaigns = campaigns.where(condition)
campaigns = campaigns.\
    withColumn("state",when(col("state") == "canceled", "failed").\
    when(col("state") == "suspended", "failed").\
    when(col("state") == "failed", "failed").\
    otherwise("successful"))
campaigns.select("state").groupBy('state').count().orderBy(col("count").desc()).show()

+----------+------+
|     state| count|
+----------+------+
|    failed|237451|
|successful|134609|
+----------+------+
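The `when`/`otherwise` chain above is a catch-all: once 'undefined' and 'live' are filtered out, every state that is not 'canceled', 'suspended' or 'failed' becomes 'successful', including any malformed value the CSV reader may have produced. A plain-Python sketch of the same mapping (the helper `clean_states` and the sample list are hypothetical) makes that explicit:

```python
from collections import Counter

DROPPED_STATES = {"undefined", "live"}
FAILED_STATES = {"canceled", "suspended", "failed"}


def clean_states(states):
    """Hypothetical helper: drops undefined/live rows, then maps each remaining state to failed/successful."""
    kept = [s for s in states if s not in DROPPED_STATES]
    # Mirrors when(...).otherwise("successful"): any state not listed above becomes 'successful'
    return ["failed" if s in FAILED_STATES else "successful" for s in kept]


sample = ["failed", "canceled", "successful", "live", "suspended", "undefined", "successful"]
print(Counter(clean_states(sample)))  # Counter({'failed': 3, 'successful': 2})
```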



In [16]:
# Casts the relevant column(s) to their end types
for column in ["total_duration", "usd_goal_real"]:
    campaigns = campaigns.withColumn(column,col(column).cast(FloatType()))

### 6.1. Exploring the dataset

### 6.2. Finalizing our dataset for a decision tree classifier

In [17]:
# Removes ID and name from the dataset
decision_tree_dataset = campaigns.select(kept_columns_for_decision_tree)
# --
print(f"The dataset contains {decision_tree_dataset.count()} rows.")
decision_tree_dataset.show(n=5)
campaigns.dtypes

The dataset contains 372060 rows.
+--------------+-------------+--------------+-------+----------+
|total_duration|usd_goal_real|      category|country|     state|
+--------------+-------------+--------------+-------+----------+
|          43.0|      4926.39|         Drama|     GB|    failed|
|          21.0|      2240.39|  Mobile Games|     DE|    failed|
|          30.0|        700.0|Graphic Design|     US|    failed|
|          30.0|       5500.0|           R&B|     US|    failed|
|          16.0|       1200.0|    Indie Rock|     US|successful|
+--------------+-------------+--------------+-------+----------+
only showing top 5 rows



[('ID', 'string'),
 ('name', 'string'),
 ('category', 'string'),
 ('total_duration', 'float'),
 ('country', 'string'),
 ('usd_goal_real', 'float'),
 ('state', 'string')]

## 7. Running a Logistic Regression Pipeline

### 7.1. Creating a data processing pipeline

### 7.2. Creating a model pipeline

#### 7.2.1. Building and fitting the model

#### 7.2.2. Evaluating the model

## 8. Running a DecisionTreeClassifier Pipeline

### 8.1. Creating a data processing pipeline

We build the data processing pipeline from the following stages:
- **StringIndexer** for all categorical columns
- **OneHotEncoder** for all categorical index columns
- **VectorAssembler** for all feature columns into one vector column
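To make the first two stages concrete, here is a simplified plain-Python sketch of what they compute by default: `StringIndexer` orders values by descending frequency (`stringOrderType="frequencyDesc"`), so the most frequent value gets index 0, and `OneHotEncoder` drops the last category (`dropLast=True`), encoding it as the all-zero vector. The helpers `string_index` and `one_hot` are hypothetical; tie-breaking is assumed alphabetical here, the `handleInvalid` options are not modelled, and Spark stores the result as sparse vectors rather than lists.

```python
from collections import Counter


def string_index(values):
    """Hypothetical sketch of StringIndexer: the most frequent value gets index 0.

    Ties are broken alphabetically (an assumption of this sketch).
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: index for index, value in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=True):
    """Hypothetical sketch of OneHotEncoder: with drop_last, the last category is the all-zero vector."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if index < size:
        vector[index] = 1.0
    return vector


countries = ["US", "GB", "US", "DE", "US", "GB"]
mapping = string_index(countries)
print(mapping)                               # {'US': 0, 'GB': 1, 'DE': 2}
print(one_hot(mapping["GB"], len(mapping)))  # [0.0, 1.0]
print(one_hot(mapping["DE"], len(mapping)))  # [0.0, 0.0]
```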

In [64]:
# Declares the train/test split ratios and the maxDepth values searched by cross-validation
training_size = 0.7
test = 0.3
max_depth_grid = list(range(2,10))

In [19]:
# Creates pipeline stages to string index each categorical feature column, and the label column
categorical_feature_columns = decision_tree_dataset.columns[2:4]
string_indexing_feature_columns = [StringIndexer(inputCol=column, 
                                                 outputCol='strindexed_' + column,
                                                 handleInvalid="skip")
                                   for column in categorical_feature_columns]
string_indexing_label_column = [StringIndexer(inputCol='state', 
                                              outputCol='label',
                                              handleInvalid="skip")]

In [20]:
# Creates pipeline stages to one-hot encode each categorical feature column
if spark_context == "local":
    onehot_encoding_feature_columns = [OneHotEncoder(inputCol='strindexed_' + column,
                                                     outputCol='onehot_' + column,
                                                     handleInvalid="keep")
                                       for column in categorical_feature_columns]
else: #Google Cloud's version of PySpark does not support/need handleInvalid attributes
    onehot_encoding_feature_columns = [OneHotEncoder(inputCol='strindexed_' + column,
                                                     outputCol='onehot_' + column)
                                       for column in categorical_feature_columns]

In [21]:
# Creates a pipeline stage to assemble the one-hot encoded and numeric feature columns into one vector column
processed_feature_columns = list(map(lambda col_name: "onehot_" + col_name, categorical_feature_columns))
processed_feature_columns += ["total_duration", "usd_goal_real"]

if spark_context == "local":
    vectorassembler_stage = VectorAssembler(inputCols=processed_feature_columns, 
                                            outputCol='features',
                                            handleInvalid="skip")
else: #Google Cloud's version of PySpark does not support/need handleInvalid attributes
    vectorassembler_stage = VectorAssembler(inputCols=processed_feature_columns, 
                                            outputCol='features')

In [22]:
# Assembles the data processing pipeline
data_processing_pipeline = Pipeline(
    stages = string_indexing_feature_columns +
    string_indexing_label_column + 
    onehot_encoding_feature_columns + 
    [vectorassembler_stage]
)

In [23]:
# Fits the data processing pipeline
pipeline_model = data_processing_pipeline.fit(decision_tree_dataset)

In [24]:
final_columns = processed_feature_columns + ['features', 'label']
decision_tree_dataset_prepped = pipeline_model.transform(decision_tree_dataset).select(final_columns)
# --
print(f"The dataset contains {decision_tree_dataset_prepped.count()} rows.")
decision_tree_dataset_prepped.show(5)

The dataset contains 370775 rows.
+------------------+---------------+--------------+-------------+--------------------+-----+
|   onehot_category| onehot_country|total_duration|usd_goal_real|            features|label|
+------------------+---------------+--------------+-------------+--------------------+-----+
| (1429,[49],[1.0])|(225,[1],[1.0])|          43.0|      4926.39|(1656,[49,1430,16...|  0.0|
| (1429,[57],[1.0])|(225,[4],[1.0])|          21.0|      2240.39|(1656,[57,1433,16...|  0.0|
| (1429,[52],[1.0])|(225,[0],[1.0])|          30.0|        700.0|(1656,[52,1429,16...|  0.0|
|(1429,[107],[1.0])|(225,[0],[1.0])|          30.0|       5500.0|(1656,[107,1429,1...|  0.0|
| (1429,[20],[1.0])|(225,[0],[1.0])|          16.0|       1200.0|(1656,[20,1429,16...|  1.0|
+------------------+---------------+--------------+-------------+--------------------+-----+
only showing top 5 rows
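The `features` column is the concatenation of the assembled inputs in order: the two one-hot vectors, then the two numeric columns. This is why its size is 1429 + 225 + 2 = 1656 and why country index 1 in the first row appears at position 1429 + 1 = 1430. A hypothetical plain-Python `assemble` helper sketches that index shifting (Spark may store the result densely or sparsely; this sketch always returns the sparse form):

```python
def assemble(parts):
    """Hypothetical sketch of VectorAssembler: concatenates sparse vectors and scalars.

    Each part is either a (size, {index: value}) sparse vector or a plain number.
    Returns (total_size, {index: value}) with indices shifted by the sizes of earlier parts.
    """
    offset = 0
    entries = {}
    for part in parts:
        if isinstance(part, tuple):
            size, values = part
            for index, value in values.items():
                entries[offset + index] = value
            offset += size
        else:
            if part != 0.0:  # the sparse form only keeps non-zero entries
                entries[offset] = float(part)
            offset += 1
    return offset, entries


# First row above: category index 49 of 1429, country index 1 of 225, duration 43, goal 4926.39
size, entries = assemble([(1429, {49: 1.0}), (225, {1: 1.0}), 43.0, 4926.39])
print(size)             # 1656
print(sorted(entries))  # [49, 1430, 1654, 1655]
```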



### 8.2. Creating a model pipeline using cross-validation

#### 8.2.1. Building and fitting the model

In [26]:
# Builds the estimator
decision_tree_with_crossvalidation = DecisionTreeClassifier(featuresCol='features', labelCol='label')

In [27]:
# Builds a parameter grid
param_grid = ParamGridBuilder().\
    addGrid(decision_tree_with_crossvalidation.maxDepth, max_depth_grid).\
    build()

In [28]:
# Builds the evaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", metricName="areaUnderROC")
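Note that `areaUnderROC` measures how well the scores rank positives above negatives, not the share of correct predictions: 0.5 corresponds to a random ranking and 1.0 to a perfect one. For this binary classifier, the evaluator uses the class-1 entry of the `rawPrediction` vector as the score. A minimal sketch of the quantity, using the pairwise (Mann-Whitney) formulation with ties counted as one half, is shown below; `pairwise_auc` is a hypothetical helper, and Spark computes the area from an ROC curve that it may down-sample into bins, so its value can differ slightly.

```python
def pairwise_auc(scores, labels):
    """Hypothetical helper: probability that a random positive outscores a random negative (ties = 0.5)."""
    positives = [s for s, y in zip(scores, labels) if y == 1]
    negatives = [s for s, y in zip(scores, labels) if y == 0]
    if not positives or not negatives:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


print(pairwise_auc([0.1, 0.4, 0.35, 0.8], [0, 0, 1, 1]))  # 0.75
```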

In [29]:
# Builds the cross-validation model
cv = CrossValidator(estimator=decision_tree_with_crossvalidation, 
                    estimatorParamMaps=param_grid, 
                    evaluator=evaluator, 
                    numFolds=5)
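`CrossValidator` splits the data into `numFolds` folds, fits one model per parameter value on each training part, averages the metric over the validation folds, keeps the parameter value with the best average (the highest, for areaUnderROC) and finally refits that model on the whole dataset. The plain-Python sketch below illustrates the selection logic only; `k_fold_splits`, `select_best` and `toy_score` are hypothetical, the toy score arbitrarily peaks at a depth of 6, and Spark's own random fold assignment differs from this one.

```python
import random


def k_fold_splits(num_rows, num_folds, seed=0):
    """Hypothetical helper: randomly assigns each row to one fold, then yields (train, validation) indices."""
    rng = random.Random(seed)
    fold_of = [rng.randrange(num_folds) for _ in range(num_rows)]
    for fold in range(num_folds):
        validation = [i for i in range(num_rows) if fold_of[i] == fold]
        train = [i for i in range(num_rows) if fold_of[i] != fold]
        yield train, validation


def select_best(param_grid, num_rows, num_folds, score_fn, seed=0):
    """Averages score_fn(param, train, validation) over all folds and keeps the highest mean (higher is better)."""
    splits = list(k_fold_splits(num_rows, num_folds, seed))
    mean_scores = {}
    for param in param_grid:
        fold_scores = [score_fn(param, train, validation) for train, validation in splits]
        mean_scores[param] = sum(fold_scores) / len(fold_scores)
    best_param = max(mean_scores, key=mean_scores.get)
    return best_param, mean_scores


def toy_score(max_depth, train, validation):
    """Hypothetical stand-in for a validation AUC; ignores the data and peaks at max_depth = 6."""
    return 0.7 - 0.01 * abs(max_depth - 6)


best_depth, _ = select_best(range(2, 10), num_rows=100, num_folds=5, score_fn=toy_score)
print(best_depth)  # 6
```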

In [31]:
# Fits the cross-validation model
cv_model = cv.fit(decision_tree_dataset_prepped)

#### 8.2.2. Evaluating the model

In [32]:
# Predicts on training data
show_columns = ['features', 'label', 'prediction', 'rawPrediction', 'probability']
pred_training_cv = cv_model.transform(decision_tree_dataset_prepped)
pred_training_cv.select(show_columns).show(5)

+--------------------+-----+----------+-----------------+--------------------+
|            features|label|prediction|    rawPrediction|         probability|
+--------------------+-----+----------+-----------------+--------------------+
|(1656,[0,1429,165...|  1.0|       1.0|[16269.0,19840.0]|[0.45055249383810...|
|(1656,[0,1429,165...|  0.0|       0.0| [22014.0,9358.0]|[0.70170852989927...|
|(1656,[0,1429,165...|  1.0|       1.0|[16269.0,19840.0]|[0.45055249383810...|
|(1656,[0,1429,165...|  1.0|       0.0| [22014.0,9358.0]|[0.70170852989927...|
|(1656,[0,1429,165...|  1.0|       1.0|[16269.0,19840.0]|[0.45055249383810...|
+--------------------+-----+----------+-----------------+--------------------+
only showing top 5 rows



In [46]:
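# Provides a confusion matrix: countByKey() on the (Row, index) pairs
# counts how many rows share each distinct Row(label, prediction)
label_and_pred = cv_model.transform(decision_tree_dataset_prepped).select('label', 'prediction')
confusion_matrix = label_and_pred.rdd.zipWithIndex().countByKey()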

In [50]:
def process_confusion_matrix(matrix):
    # matrix maps Row(label, prediction) -> count; label 1.0 ('successful') is the positive class
    counts = {}
    for item in matrix:
        print(item, matrix[item])
        counts[(item.label, item.prediction)] = matrix[item]
    # Looks up each cell by (label, prediction) instead of relying on the dictionary order
    true_positives = counts.get((1.0, 1.0), 0)
    false_positives = counts.get((0.0, 1.0), 0)
    false_negatives = counts.get((1.0, 0.0), 0)
    precision = true_positives / (true_positives + false_positives)
    recall = true_positives / (true_positives + false_negatives)
    f1 = 2 * precision * recall / (precision + recall)  # harmonic mean of precision and recall
    print(f"\nPrecision score: {precision:.4f}")
    print(f"Recall score: {recall:.4f}")
    print(f"F1 score: {f1:.4f}")
    
process_confusion_matrix(confusion_matrix)

Row(label=0.0, prediction=0.0) 210815
Row(label=0.0, prediction=1.0) 26636
Row(label=1.0, prediction=1.0) 37338
Row(label=1.0, prediction=0.0) 95986

Precision score: 0.5836
Recall score: 0.2801
F1 score: 0.3785
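The scores can also be recomputed directly from the four counts printed above, with label 1.0 ('successful', the less frequent state, hence index 1 under `StringIndexer`'s frequency ordering) as the positive class. The F1 score is the harmonic mean of precision and recall, 2PR / (P + R). `binary_metrics` is a hypothetical helper; the values in the comment were derived by hand from these counts.

```python
def binary_metrics(tp, fp, fn, tn):
    """Hypothetical helper: precision, recall, F1 (harmonic mean) and accuracy from confusion counts."""
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    f1 = 2 * precision * recall / (precision + recall)
    accuracy = (tp + tn) / (tp + fp + fn + tn)
    return precision, recall, f1, accuracy


# Counts printed above, with label 1.0 ('successful') as the positive class
p, r, f1_value, acc = binary_metrics(tp=37338, fp=26636, fn=95986, tn=210815)
print(f"precision={p:.4f} recall={r:.4f} f1={f1_value:.4f} accuracy={acc:.4f}")
# precision=0.5836 recall=0.2801 f1=0.3785 accuracy=0.6693
```

Accuracy is hard to read on its own here: 237,451 of the 370,775 evaluated rows (about 64%) have label 0.0, so always predicting 'failed' would already score about 0.64.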


In [35]:
print('The best MaxDepth is:', cv_model.bestModel._java_obj.getMaxDepth())

The best MaxDepth is: 8


In [83]:
print(cv_model.bestModel)

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_a85fc1ee317d, depth=8, numNodes=291, numClasses=2, numFeatures=1656


### 8.3. Creating a model pipeline using Train-Test split

#### 8.3.1. Building and fitting the model

In [75]:
# Splits the dataset between training and test sets; the test share is derived from training_size
# so that re-running this cell never passes the `test` DataFrame itself as a split weight
test_size = 1.0 - training_size
training, test = decision_tree_dataset_prepped.randomSplit([training_size, test_size], seed=0)
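`randomSplit` normalizes the weights and draws each row independently, so the resulting sizes are only approximately 70/30 and depend on the seed. A plain-Python sketch of that behaviour follows; `random_split` is a hypothetical helper and does not reproduce Spark's actual random draws or its partition-wise sampling.

```python
import random


def random_split(rows, weights, seed=0):
    """Hypothetical helper: assigns each row to one split by a uniform draw against cumulative weights."""
    total = float(sum(weights))
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total  # normalized cumulative upper bound of this split
        bounds.append(running)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for index, bound in enumerate(bounds):
            if draw < bound:
                splits[index].append(row)
                break
        else:  # guards against floating-point round-off in the last bound
            splits[-1].append(row)
    return splits


train_rows, test_rows = random_split(range(10_000), [0.7, 0.3], seed=0)
print(len(train_rows), len(test_rows))  # close to, but usually not exactly, 7000 and 3000
```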

In [84]:
# Builds the estimator
decision_tree_with_traintestsplit = DecisionTreeClassifier(featuresCol='features', labelCol='label')

In [85]:
# Builds the evaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", metricName="areaUnderROC")

In [86]:
# Fits the decision tree on the training set
traintest_model = decision_tree_with_traintestsplit.fit(training)

#### 8.3.2. Evaluating the model

In [87]:
# Predicts on the test set
show_columns = ['features', 'label', 'prediction', 'rawPrediction', 'probability']
pred_test = traintest_model.transform(test)
pred_test.select(show_columns).show(5)

+--------------------+-----+----------+-----------------+--------------------+
|            features|label|prediction|    rawPrediction|         probability|
+--------------------+-----+----------+-----------------+--------------------+
|(1656,[0,1429,165...|  1.0|       1.0|[11984.0,14733.0]|[0.44855335554141...|
|(1656,[0,1429,165...|  1.0|       1.0|[11984.0,14733.0]|[0.44855335554141...|
|(1656,[0,1429,165...|  1.0|       0.0|[76302.0,24381.0]|[0.75784392598551...|
|(1656,[0,1429,165...|  0.0|       0.0|[29035.0,15749.0]|[0.64833422650946...|
|(1656,[0,1429,165...|  0.0|       0.0|[29035.0,15749.0]|[0.64833422650946...|
+--------------------+-----+----------+-----------------+--------------------+
only showing top 5 rows



In [88]:
area_under_roc = evaluator.evaluate(pred_test)  # the evaluator returns areaUnderROC, not accuracy
print("1 - areaUnderROC on the test set:", 1.0 - area_under_roc)

1 - areaUnderROC on the test set: 0.629766864951931


In [89]:
print(traintest_model)

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_8e97afc23f0e, depth=5, numNodes=21, numClasses=2, numFeatures=1656
