Upload data into HDFS

In [1]:
! hadoop fs -rm    -r  /tmp/kickstarter*
! hadoop fs -mkdir -p  /tmp/kickstarter
! hadoop fs -put   -p  ks-projects-201801.csv     /tmp/kickstarter/
! hadoop fs -ls        /tmp/kickstarter/ks-projects-201801.csv

20/06/08 12:03:50 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.


Deleted /tmp/kickstarter


-rwxr-xr-x   1 root root   58030359 2020-05-29 11:04 /tmp/kickstarter/ks-projects-201801.csv




In [2]:
// Import necessary libraries

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.feature.QuantileDiscretizer
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
import org.apache.spark.ml.feature.{VectorAssembler, StringIndexer, OneHotEncoderEstimator}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.mllib.evaluation.MulticlassMetrics

// Set log level to ERROR (less verbose)
sc.setLogLevel("ERROR")

// read data from HDFS into a spark dataframe
var df = spark
    .read
    .format("csv")
    .option("header","true")
    .option("inferSchema","true")
    .option("quote", "\"")
    .option("escape","\"")
    .load("hdfs://localhost:9000/tmp/kickstarter/ks-projects-201801.csv")

//caching for execution speed
df.cache()

Intitializing Scala interpreter ...

Spark Web UI available at http://8139f9c065d1:4040
SparkContext available as 'sc' (version = 2.4.5, master = local[*], app id = local-1591617837926)
SparkSession available as 'spark'


import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.feature.QuantileDiscretizer
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
import org.apache.spark.ml.feature.{VectorAssembler, StringIndexer, OneHotEncoderEstimator}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.mllib.evaluation.MulticlassMetrics
df: org.apache.spark.sql.DataFrame = [ID: int, name: string ... 13 more fields]
res0: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [ID: int, name: string ... 13 more fields]


In [3]:
// This function creates a feature for the length of the project name sorted into discrete bins
def discretizeNameLength(df:org.apache.spark.sql.DataFrame):org.apache.spark.sql.DataFrame={
    // Get the number of characters of each name
    var tmpdf = df.withColumn("namelength", length($"name"))
    // set number of bins to discretize name length
    val bins = 5
    // Use the built in QuantileDiscretizer to discretize the lenghts into 5 bins
    val discretizerLength = new QuantileDiscretizer()
      .setInputCol("namelength")
      .setOutputCol("namelengthBinned")
      .setNumBuckets(bins)
    // Fit the discretizer to the data and transform the dataframe
    discretizerLength.fit(tmpdf).transform(tmpdf)
        .na.fill(0,Array("namelengthBinned")) // Any NULL will be filled with 0
        .drop("namelength") // Remove the namelength column as this will not be required and return the transformed DataFrame
}

// This function drops invalid dates
// Kickstarter started in 2009 and any dates before this should be dropped from the dataset
def dropBadDates(df:org.apache.spark.sql.DataFrame):org.apache.spark.sql.DataFrame = {
    df.filter((col("launched")>"2009-01-01 00:00:00")&&(col("deadline")>"2009-01-01 00:00:00"))
}

// This function formats the dates into a consistant format
// It also extracts features including the month and year from the dates as well as the project duration
def formatDates(df:org.apache.spark.sql.DataFrame):org.apache.spark.sql.DataFrame = {
    df
    // The specific time the project is uploaded is not relevant to the model, so only the dates are formated as "yyyy-MM-dd"
    .withColumn("launched", date_format(col("launched"), "yyyy-MM-dd"))
    .withColumn("deadline", date_format(col("deadline"), "yyyy-MM-dd"))
    // Extract the month from the dates as an additional feature
    .withColumn("launched_month", date_format(col("launched"), "MM").cast(IntegerType))
    .withColumn("deadline_month", date_format(col("deadline"), "MM").cast(IntegerType))
    // Extract the year from the dates as an additional feature
    .withColumn("launched_year", date_format(col("launched"), "yyyy").cast(IntegerType))
    .withColumn("deadline_year", date_format(col("deadline"), "yyyy").cast(IntegerType))
    // Create a feature to show how long the project was opened for
    .withColumn("Duration(Days)",datediff(col("deadline"),col("launched")).cast(DoubleType))
}

// This function creates a feature which converts the project state column to a binary outcome
// It drops live and undefined projects as the outcome is undefined
// a 1 is assigned for "Successful" and 0 for everything else
def binaryState(df:org.apache.spark.sql.DataFrame):org.apache.spark.sql.DataFrame={   
    df.filter((col("state")==="successful")||(col("state")==="failed"))
    .withColumn("stateBinary", when($"state" === "successful", 1).otherwise(0))  
}


def discretizeContinousVar(df:org.apache.spark.sql.DataFrame):org.apache.spark.sql.DataFrame = {
    // The below code is modified from an example posted by user jarias 
    // on stackoverflow:
    // https://stackoverflow.com/questions/43639252/how-to-use-spark-quantilediscretizer-on-multiple-columns

    // number of discrete bins
    val bins = 100

    // define columns with DoubleType as continuous
    val continuous = df.dtypes.filter(_._2 == "DoubleType").map (_._1)

    // apply QuantileDiscretizer on continuous columns, output as colname_discrete
    val discretizer = new QuantileDiscretizer()
      .setInputCols(continuous)
      .setOutputCols(continuous.map(x => s"${x}_discrete"))
      .setNumBuckets(bins) // set bins

    // apply discretizer to dataframe
    val pipeline = new Pipeline().setStages(Array(discretizer))
    val model = pipeline.fit(df)
    model.transform(df)
}


// This function checks that the currency and country columns are valid and drops invalid rows
def checkCurrencyCountry(df:org.apache.spark.sql.DataFrame):org.apache.spark.sql.DataFrame={ 
    // valid kickstarter currencies according to: 
    // https://www.kickstarter.com/blog/new-view-kickstarter-in-your-currency
    val currencyList = List("AUD","GBP","CAD","DKK","EUR","HKD",
                            "JPY","MXN","NZD","NOK","SGD","SEK",
                            "CHF","USD")
    // valid country codes for kickstarter based on: 
    // https://help.kickstarter.com/hc/en-us/articles/115005128594-Who-can-use-Kickstarter-
    // and
    // https://www.realifewebdesigns.com/web-marketing/abbreviations-countries.asp
    val validKsCountries = List("NL","MX","AT","HK","AU","CA","GB",
                                "DE","ES","US","FR","CH","SG","IT",
                                "SE","JP","NZ","IE","BE","NO","LU",
                                "DK") 

    // drops rows with invalid currencies or countries (if there are any)
    df.filter((col("currency").isin(currencyList:_*))&&(col("country").isin(validKsCountries:_*)))
}

// This function applys a series of functions to a dataframe and returns the transformed dataframe
def cleaningpipeline(df:org.apache.spark.sql.DataFrame, fns:org.apache.spark.sql.DataFrame=>org.apache.spark.sql.DataFrame*) = {
    var tmpdf = df
    for (fn<-fns) {
        tmpdf = fn(tmpdf)
    }
    tmpdf
}

discretizeNameLength: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
dropBadDates: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
formatDates: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
binaryState: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
discretizeContinousVar: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
checkCurrencyCountry: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
cleaningpipeline: (df: org.apache.spark.sql.DataFrame, fns: org.apache.spark.sql.DataFrame => org.apache.spark.sql.DataFrame*)org.apache.spark.sql.DataFrame


In [4]:
// Apply the pipeline fucntion to get a transformed and cleaned dataframe
var cleaneddf = cleaningpipeline(df,
                        dropBadDates,
                        formatDates,
                        discretizeContinousVar,
                        checkCurrencyCountry,
                        discretizeNameLength,
                        binaryState)

cleaneddf: org.apache.spark.sql.DataFrame = [ID: int, name: string ... 26 more fields]


In [5]:
// Drop features which will not be used by the ML model and rename the "stateBinary" column as the label
var dfSelected = cleaneddf.drop("ID", "name", "deadline","Duration(Days)",
                          "goal", "launched", "pledged", "state", "backers", "usd pledged", 
                          "usd_pledged_real", "usd_goal_real", "usd pledged_discrete", "goal_discrete", 
                          "pledged_discrete","usd_pledged_real_discrete")
                          .withColumnRenamed("stateBinary","label").cache() //caching for execution speed

// Create an array of column names which will be combined as features
var featureCols = dfSelected.drop("label").columns

// For each of the feature columns create a string indexer to transform them into a form that can be one-hot-encoded
val indexers = featureCols.map(name=>new StringIndexer()
    .setInputCol(name)
    .setOutputCol(name+"_indexed")
    .setHandleInvalid("keep"))

// One hot encode all of the feature columns
val encoder = new OneHotEncoderEstimator()
    .setInputCols(featureCols.map(_+"_indexed"))
    .setOutputCols(featureCols.map(_+"_encoded"))

// Make an assembler which takes the encoded output from the encoder and assembles them as vector
val assembler = (new VectorAssembler().setInputCols(encoder.getOutputCols).setOutputCol("features"))

// Split the original data into training and test sets using a 80/20% splt
val Array(training, test) = dfSelected.randomSplit(Array(0.8, 0.2),seed=42)

//caching for execution speed
training.cache()
test.cache()

// Create Logistic Regression model
// Parameters below were identified as optimal during the tuning phase of the project
val lr = new LogisticRegression().
        setFitIntercept(true).
        setMaxIter(100).
        setElasticNetParam(0).
        setTol(0).
        setStandardization(true).
        setRegParam(0.01).
        setThreshold(0.6).
        setElasticNetParam(0).
        setLabelCol("label").
        setFeaturesCol("features")

// Put everything into the pipeline
// This pipeline will take in the training dataset, index and encode features and add
// then assemble the required variables into the 'features' column, then run the logistic
// regression on it.
val pipeline = new Pipeline().setStages(indexers++
                                        Array(encoder,
                                             assembler,
                                             lr)) 

dfSelected: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [category: string, main_category: string ... 10 more fields]
featureCols: Array[String] = Array(category, main_category, currency, country, launched_month, deadline_month, launched_year, deadline_year, Duration(Days)_discrete, usd_goal_real_discrete, namelengthBinned)
indexers: Array[org.apache.spark.ml.feature.StringIndexer] = Array(strIdx_02fc6f34599a, strIdx_545d7e3677e4, strIdx_67311c2c531a, strIdx_1946a039d494, strIdx_0cc5f3ce2c80, strIdx_16865631b86c, strIdx_d94343450b2e, strIdx_b3f9908ff06b, strIdx_748a92592383, strIdx_f6b5e4daad61, strIdx_8701264b898b)
encoder: org.apache.spark.ml.feature.OneHotEncoderEstimator = oneHotEncoder_991b1e5481b8
assembler: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_77...

In [6]:
// Fit the pipleine to the training set 
val model = pipeline.fit(training)

model: org.apache.spark.ml.PipelineModel = pipeline_66d9e7bc5226


In [7]:
// Transform the test data using the model to predict whether a project will be successful or not
val results = model.transform(test)

// Put the actual and predicted in a RDD for analysis
val evaluation_results_only = results
                                    .select("label", "prediction")
                                    .as[(Double, Double)]
                                    .rdd.cache()

// Get the metrics of how the model performed on the test data
val metrics = new MulticlassMetrics(evaluation_results_only)

// Print confusion matrix
println("Confusion matrix:")
println(metrics.confusionMatrix)

// Print Overall Statistics
val accuracy = metrics.accuracy
println("\nSummary Statistics:")
println(s"Accuracy = $accuracy")

// Precision by label
val labels = metrics.labels
labels.foreach { l =>
println(s"Precision($l) = " + metrics.precision(l))
}

// Recall by label
labels.foreach { l =>
println(s"Recall($l) = " + metrics.recall(l))
}

// False positive rate by label
labels.foreach { l =>
println(s"FPR($l) = " + metrics.falsePositiveRate(l))
}

// F-measure by label
labels.foreach { l =>
println(s"F1-Score($l) = " + metrics.fMeasure(l))
}

// Weighted stats
println(s"Weighted precision: ${metrics.weightedPrecision}")
println(s"Weighted recall: ${metrics.weightedRecall}")
println(s"Weighted F1 score: ${metrics.weightedFMeasure}")
println(s"Weighted false positive rate: ${metrics.weightedFalsePositiveRate}")
       

Confusion matrix:
35678.0  17886.0  
3832.0   8816.0   

Summary Statistics:
Accuracy = 0.6719929922068507
Precision(0.0) = 0.9030118957226019
Precision(1.0) = 0.3301625346415999
Recall(0.0) = 0.6660816966619372
Recall(1.0) = 0.6970271979759646
FPR(0.0) = 0.3029728020240354
FPR(1.0) = 0.3339183033380629
F1-Score(0.0) = 0.7666587876313472
F1-Score(1.0) = 0.4480813214739517
Weighted precision: 0.793584620924189
Weighted recall: 0.6719929922068507
Weighted F1 score: 0.705803235889084
Weighted false positive rate: 0.308884097568949


results: org.apache.spark.sql.DataFrame = [category: string, main_category: string ... 36 more fields]
evaluation_results_only: org.apache.spark.rdd.RDD[(Double, Double)] = MapPartitionsRDD[314] at rdd at <console>:51
metrics: org.apache.spark.mllib.evaluation.MulticlassMetrics = org.apache.spark.mllib.evaluation.MulticlassMetrics@3716b2dd
accuracy: Double = 0.6719929922068507
labels: Array[Double] = Array(0.0, 1.0)
