# Apache Spark Data Pipeline

## Imports

In [3]:
println(sc)
println(spark)
import org.apache.spark.sql.DataFrame
val sqlc = spark
import sqlc.sql
import sqlc.implicits._
import org.apache.spark.ml.feature.{HashingTF, StringIndexer, VectorAssembler}
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.tuning.{TrainValidationSplit, ParamGridBuilder, CrossValidator}
// import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.regression.GBTRegressor
// org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.Pipeline

import scala.math.abs

org.apache.spark.SparkContext@37120b93
org.apache.spark.sql.SparkSession@4986ea1f


## Pipeline Operators

In [4]:
implicit class RichPipes[Y](y: Y) {
  def |>[Z](f: Y => Z) = f(y)
  def &>[X, Z](f: (X, Y) => Z): (X => Z) = (x: X) => f(x, y)
}
implicit class RichFunction2[A,B,Z](f: (A,B) => Z) {
  def %(b: B): (A => Z) = (a: A) => f(a,b)
}

## Helper Functions

// 
def table(tbl_name: String) = sql(s"SELECT * FROM $tbl_name")

def cols_to_lower(df: DataFrame): Array[String] 
  = df.columns.map(upper => upper + " AS " + upper.toLowerCase)

def take_to_str(x: Array[String], take_amt: Int): String 
  = x.take(take_amt).mkString(", ")

def selectStmt1(df_str: String = "df"): String => DataFrame 
  = (cols: String) => sql(s"SELECT $cols FROM $df_str")

def selectStmt2(df: DataFrame, df_str: String = "df"): String => DataFrame 
  = (cols: String) => {
    df.createOrReplaceTempView(df_str);
    val result = sql(s"SELECT $cols FROM $df_str")
    sql(s"drop table $df_str")
    result
  }

def show(limit: Int): Unit = (df: DataFrame) => df.show(limit)
def take(limit: Int) = (df: DataFrame) => df.take(limit)
def grouped(size: Int) = (x: Array[String]) => x.grouped(size).toList

def map_cols(f: Array[String] => String) = (x: List[Array[String]]) => x.map(f)
def map_df(f: String => DataFrame) = (x: List[String]) => x.map(f)
def map_show(f: DataFrame => Unit) = (x: List[DataFrame]) => x.map(f)

def groupedSelects(take_amt: Int)
  = (df: DataFrame) => { 
    df |> cols_to_lower |> grouped(take_amt) |> 
      map_cols(take_to_str _ % take_amt) |> map_df(selectStmt2(df, "df")) 
}
      
def take_cols(df: DataFrame, take_amt: Int): DataFrame = {   
  val df_str = "df"; 
  val result = { 
    df.createOrReplaceTempView(df_str);
    df |> cols_to_lower |> take_to_str _ % take_amt; 
  } |> selectStmt1(df_str)
  sql(s"drop table $df_str")
  result
}

def display(cols: Int, rows: Int) 
  = (df: DataFrame) => { df |> groupedSelects(cols) } map(_.show(rows))

def createOrReplaceTempView(view_name: String) 
  = (df: DataFrame) => df.createOrReplaceTempView(view_name)

## Read "On-Time Performance" Dataset

In [55]:
val source = {
  spark.read.format("csv").option("header", "true").load("data/")
    .createOrReplaceTempView("source")
  val col_num = table("source").columns.size - 1
  table("source") |> take_cols _ % col_num
}
// val delay_group 
//   = spark.read.format("csv").option("header", "true").load("lookups/DelayGroups/")//.orderBy("code")
// delay_group.createOrReplaceTempView("delay_group")

"'On-Time Performance'" |> println 
"Record Count: "+source.count |> println 
source |> display(7,5)

'On-Time Performance'
Record Count: 495827
+----+-----+------------+-----------+----------+--------------+------+
|year|month|day_of_month|day_of_week|   fl_date|unique_carrier|fl_num|
+----+-----+------------+-----------+----------+--------------+------+
|2016|    7|          22|          5|2016-07-22|            F9|  1462|
|2016|    7|          22|          5|2016-07-22|            F9|  1595|
|2016|    7|          22|          5|2016-07-22|            F9|  1596|
|2016|    7|          22|          5|2016-07-22|            F9|  1465|
|2016|    7|          22|          5|2016-07-22|            F9|   504|
+----+-----+------------+-----------+----------+--------------+------+
only showing top 5 rows

+-----------------+------+---------------+----+------------+--------+---------+
|origin_airport_id|origin|dest_airport_id|dest|crs_dep_time|dep_time|dep_delay|
+-----------------+------+---------------+----+------------+--------+---------+
|            10397|   ATL|          12953| LGA|      

List((), (), (), (), (), (), ())

## Feature Engineering: 
##### Kept the use of UDFs to a minimum because their execution isn't optimized & I didn't want to spend \$ on AWS EMR

#### SQL Expression String Interpolations

In [26]:
// Delay Cause Indicator
def max_of_cols(cols: Seq[String]) 
  = (a: Double, b: Double, c: Double, d: Double, e: Double) => {
  cols.zip(List(a,b,c,d,e)).sortWith(_._2 > _._2).head._1
  }
val col_name_seq = Seq("carrier_delay", "weather_delay", "nas_delay", "security_delay", "late_aircraft_delay")
val max_of_cols_udf = spark.udf.register("max_of_cols", max_of_cols(col_name_seq))

// Weekday Indicator
val taxi_time = "ABS(taxi_in + taxi_out) AS taxi_time"
val delay_time = "arr_delay + dep_delay AS delay_time"
val air_delay_time = "actual_elapsed_time - crs_elapsed_time AS air_delay_time"

// Arrival/Departure Taxi & Delay Indicators
def time_ind(arr: String, del: String, alias: String) 
    = s"CASE WHEN $arr > $del THEN 'arr/in' WHEN $arr < $del THEN 'del/out' ELSE 'both' END AS $alias"
val taxi_ind = time_ind("taxi_in","taxi_out","taxi_ind")
val delay_ind = time_ind("arr_delay","dep_delay","delay_ind")

// Weekday Indicator
def day_of_year(date_str: String) = s"date_format($date_str, 'EEE')"
def weekpart(day_expr: String) 
  = s"CASE WHEN $day_expr NOT IN ('Sat', 'Sun') THEN 'wd' ELSE 'we' END"
def weekday(date_str: String) = { date_str |>  day_of_year |> weekpart } + " AS weekday"

// Cartesian Coordinate 
def cc(a: Int, z: Int) 
  = (v: String) => { 
    val i = List(a,z).map(_.toString); val A = i(0); val Z = i(1);
    val Pi = scala.math.Pi
    (s"sin(2*$Pi*($v-$A)/($Z-$A))", s"cos(2*$Pi*($v-$A)/($Z-$A))")
  }
def months_of_year_cc(date_str: String) = s"dayofmonth($date_str)" |> cc(0,12)
def week_of_year_cc(date_str: String) = s"weekofyear($date_str)" |> cc(0,52)
def day_of_week_cc(date_str: String) = s"date_format($date_str, 'D') + 1" |> cc(0,7)
def minute_of_day_cc(hhmm: String) = {
  s"(INT(SUBSTRING($hhmm,1,2)) * 60) + INT(SUBSTRING($hhmm,3,2))" |> cc(0,1440)
}
val (x_moy, y_moy) = "fl_date" |> months_of_year_cc
val (x_woy, y_woy) = "fl_date" |> week_of_year_cc
val (x_dow, y_dow) = "fl_date" |> day_of_week_cc
val (x_whls_on_mod, y_whls_on_mod) = "wheels_on" |> minute_of_day_cc
val (x_whls_off_mod, y_whls_off_mod) = "wheels_off" |> minute_of_day_cc

// UNUSED: Pre-Processing for Feature Hashing Trick
def stringify(include: Seq[String], exclude: Seq[String]) 
  =  include.diff(exclude
           ).map(x => s"CONCAT('$x=',$x)"
           ).zip(1 to include.size
           ).map(t => t._1+" AS str"+t._2
           ).mkString(", ")

// Create Bucket Case Statements
val splits: Array[Double] = Array(Double.NegativeInfinity) ++ (
    -15 to 180 by 15).map(_.toDouble).toSeq ++ Array(Double.PositiveInfinity)

def get_bucket_cases(x: String, y: String)(splits: Array[AnyVal]) = {
  val buckets = splits.sliding(2).toArray.map(_.mkString(" to "))
  val bucket_map = 1 to buckets.size map(_ - 1) zip buckets
  val cases = bucket_map.map(m => x+"_bkidx = "+m._1+" THEN '"+m._2+"'").mkString(" WHEN ")  
  "CASE WHEN "+cases+" END AS "+y
  }

val Seq(taxi_time_btk, air_delay_time_btk, delay_time_btk) 
  = Seq("taxi_time", "air_delay_time", "delay_time"
       ).map(x => splits |> get_bucket_cases(x,x+"_bkt"))

// UNUSED: Creat Java SQL Timestamp
def create_ts(date_str: String) = s"date_format($date_str, 'yyyy-MM-dd 00:00:00')"

## Interpolated SQL Expressions 

In [27]:
"\n"+"** Potential Labels **"  |> println
s"$taxi_time"                  |> println
s"$delay_time"                 |> println
"\n"+"** Time Indicators **"   |> println
s"$taxi_ind"                   |> println
s"$delay_ind"                  |> println
"\n"+"** Weekday **"           |> println
val wd = weekday("date_str")
s"$wd"                         |> println
"** Cartesian Coordinates **"  |> println
"\n"+"** months_of_year_cc **" |> println
s"x = $x_moy"+s", y = $y_moy"  |> println
"\n"+"** week_of_year_cc **"   |> println
s"x = $x_woy"+s", y = $y_woy"  |> println
"\n"+"** day_of_week_cc **"    |> println
s"x = $x_dow"+s", y = $y_dow"  |> println
"\n"+"** minite_of_day_cc **"  |> println
s"x = $x_whls_on_mod"+s", y = $y_whls_on_mod"   |> println
s"x = $x_whls_off_mod"+s", y = $y_whls_off_mod" |> println
"\n"+"** bucket case statement **"              |> println
"- taxi_time_btk"                               |> println
taxi_time_btk                                   |> println
"\n"+"- air_delay_time_btk"                     |> println
air_delay_time_btk                              |> println
"\n"+"- delay_time_btk"                         |> println
delay_time_btk                                  |> println

"\n"+"** Stringify **" |> println
val (xSeq, ySeq) = (Seq("a","b","c","d","e","f","g"), Seq("e","f","g"))
stringify(xSeq, ySeq)  |> println


** Potential Labels **
ABS(taxi_in + taxi_out) AS taxi_time
arr_delay + dep_delay AS delay_time

** Time Indicators **
CASE WHEN taxi_in > taxi_out THEN 'arr/in' WHEN taxi_in < taxi_out THEN 'del/out' ELSE 'both' END AS taxi_ind
CASE WHEN arr_delay > dep_delay THEN 'arr/in' WHEN arr_delay < dep_delay THEN 'del/out' ELSE 'both' END AS delay_ind

** Weekday **
CASE WHEN date_format(date_str, 'EEE') NOT IN ('Sat', 'Sun') THEN 'wd' ELSE 'we' END AS weekday
** Cartesian Coordinates **

** months_of_year_cc **
x = sin(2*3.141592653589793*(dayofmonth(fl_date)-0)/(12-0)), y = cos(2*3.141592653589793*(dayofmonth(fl_date)-0)/(12-0))

** week_of_year_cc **
x = sin(2*3.141592653589793*(weekofyear(fl_date)-0)/(52-0)), y = cos(2*3.141592653589793*(weekofyear(fl_date)-0)/(52-0))

** day_of_week_cc **
x = sin(2*3.141592653589793*(date_format(fl_date, 'D') + 1-0)/(7-0)), y = cos(2*3.141592653589793*(date_format(fl_date, 'D') + 1-0)/(7-0))

** minite_of_day_cc **
x = sin(2*3.141592653589793*((INT(SUBST

#### Transfromers

In [28]:
// Bucketizer
def bcktzr(splits: Array[Double])(x: String, y: String)
  = (df: DataFrame) => {    
    new Bucketizer()
      .setInputCol(x)
      .setOutputCol(y)
      .setSplits(splits)
      .transform(df) 
}
// String Indexer
def strIndxr(cols_for_idx: Array[String]) 
  = (df: DataFrame) => {
    def strIndxr(col: String) 
      = new StringIndexer().setInputCol(col).setOutputCol(col+"_idx")
    val strIndxr_stages = cols_for_idx.map(strIndxr)
    val strIndxr_pipeline = new Pipeline().setStages(strIndxr_stages)
    strIndxr_pipeline.fit(df).transform(df)
  }

## Pre-Processing & Feature Engineering

In [56]:
def pre_process(df: DataFrame) = {
  def filter_null(cols: Seq[String]) 
    = cols map{(col_str: String) => s"$col_str IS NOT null"} mkString(" AND ") 
  val delay_causes = Seq("carrier_delay", "weather_delay", "nas_delay", "security_delay", "late_aircraft_delay")
  
  val stage1 
    = df.selectExpr(taxi_time, air_delay_time, delay_time, "cancelled",
        "DOUBLE(year)", "DOUBLE(month)", "DOUBLE(day_of_month)", "DOUBLE(day_of_week)", "fl_date", weekday("fl_date"),
        x_moy+" AS x_fl_date_moy", y_moy+" AS y_fl_date_moy", 
        x_woy+" AS x_fl_date_woy", y_woy+" AS y_fl_date_woy", 
        x_dow+" AS x_fl_date_dow", y_dow+" AS y_fl_date_dow", 
        "unique_carrier", "fl_num", taxi_ind, delay_ind,
        "origin", "dest", "dep_del15", "arr_del15", "arr_delay_group", "dep_delay_group", 
        x_whls_on_mod+" AS x_whls_on_mod", y_whls_on_mod+" AS y_whls_on_mod",
        x_whls_off_mod+" AS x_whls_off_mod", y_whls_off_mod+" AS y_whls_off_mod",
        "flights", "distance", "distance_group", 
        "DOUBLE(carrier_delay)", "DOUBLE(weather_delay)", "DOUBLE(nas_delay)", 
        "DOUBLE(security_delay)", "DOUBLE(late_aircraft_delay)"
    ).where(delay_causes |> filter_null
    ).selectExpr("*", 
      "max_of_cols(carrier_delay, weather_delay, nas_delay, security_delay, late_aircraft_delay) AS delay_cause")
    stage1.createOrReplaceTempView("stage1")
    val stage2 = { 
      stage1 |> bcktzr(splits)("taxi_time", "taxi_time_bkidx") |> bcktzr(splits)("delay_time", "delay_time_bkidx") |>
         bcktzr(splits)("air_delay_time", "air_delay_time_bkidx") 
    }.selectExpr("*", taxi_time_btk, delay_time_btk, air_delay_time_btk)
    stage2.createOrReplaceTempView("stage2")
    stage2
}

In [60]:
{ source |> pre_process }.count()

103485

In [31]:
{ source |> pre_process }.select("taxi_time_bkidx", "delay_time_bkidx", "air_delay_time_bkidx", 
                                 "taxi_time_bkt", "delay_time_bkt", "air_delay_time_bkt") |> display(6,5)

+---------------+----------------+--------------------+-------------+--------------+------------------+
|taxi_time_bkidx|delay_time_bkidx|air_delay_time_bkidx|taxi_time_bkt|delay_time_bkt|air_delay_time_bkt|
+---------------+----------------+--------------------+-------------+--------------+------------------+
|            3.0|             4.0|                 2.0|     15 to 30|      30 to 45|           0 to 15|
|            3.0|             4.0|                 2.0|     15 to 30|      30 to 45|           0 to 15|
|            9.0|             8.0|                 7.0|   105 to 120|     90 to 105|          75 to 90|
|            5.0|            13.0|                 3.0|     45 to 60|    165 to 180|          15 to 30|
|            4.0|            12.0|                 0.0|     30 to 45|    150 to 165|  -Infinity to -15|
+---------------+----------------+--------------------+-------------+--------------+------------------+
only showing top 5 rows



List(())

In [53]:
pre_processed_df.count

103485

## Save tables for faster model runs by reducing Directed Acyclic Graph Complexity

In [37]:
val pre_processed = source |> pre_process
pre_processed.write.format("parquet").mode("overwrite").save("tables/pre_process/pre_process.parquet")
val pre_processed_df = spark.read.format("parquet").load("tables/pre_process/pre_process.parquet")
pre_processed_df.write.format("csv").mode("overwrite").save("tables/pre_process/pre_process.csv")

// Indexing Categorical Features
val cols_for_idx = Array(
  "weekday", "unique_carrier", "taxi_ind", "delay_ind", "origin", "dest", 
  "dep_del15", "arr_del15", "arr_delay_group", "dep_delay_group", 
  "distance", "flights", "distance_group"
)
val strIndxed = pre_processed_df |> strIndxr(cols_for_idx)
strIndxed.write.format("parquet").mode("overwrite").save("tables/strIndxed/strIndxed.parquet")
val strIndxed_df = spark.read.format("parquet").load("tables/strIndxed/strIndxed.parquet")

In [61]:
strIndxed_df.count

103485

In [38]:
strIndxed_df |> display(7,5)

+---------+--------------+----------+---------+------+-----+------------+
|taxi_time|air_delay_time|delay_time|cancelled|  year|month|day_of_month|
+---------+--------------+----------+---------+------+-----+------------+
|     29.0|           2.0|      32.0|     0.00|2016.0|  7.0|        22.0|
|     23.0|           1.0|      31.0|     0.00|2016.0|  7.0|        22.0|
|    118.0|          79.0|      97.0|     0.00|2016.0|  7.0|        23.0|
|     46.0|          28.0|     168.0|     0.00|2016.0|  7.0|        23.0|
|     32.0|         -33.0|     155.0|     0.00|2016.0|  7.0|        23.0|
+---------+--------------+----------+---------+------+-----+------------+
only showing top 5 rows

+-----------+----------+-------+-------------------+------------------+--------------------+-------------------+
|day_of_week|   fl_date|weekday|      x_fl_date_moy|     y_fl_date_moy|       x_fl_date_woy|      y_fl_date_woy|
+-----------+----------+-------+-------------------+------------------+------------

List((), (), (), (), (), (), (), (), ())

## Feature Counts For GBTRegressor MaxBin tuning

In [39]:
strIndxed_df |> createOrReplaceTempView("test")
val idxed_cols = cols_for_idx.map(x => x+"_idx")
val feature_cols = Array("year", "month", "day_of_month", "day_of_week",
    "x_fl_date_moy", "y_fl_date_moy", "x_fl_date_woy", "y_fl_date_woy", "x_fl_date_dow", "y_fl_date_dow"
    ) ++ idxed_cols ++ Array("taxi_time_bkt", "delay_time_bkt", "air_delay_time_bkt", 
                             "carrier_delay", "weather_delay", "nas_delay", "security_delay", "late_aircraft_delay")
val count_cols = feature_cols.map(x => s"COUNT(DISTINCT($x)) AS $x"+"_count").mkString(", ")
val df = sql(s"SELECT $count_cols FROM test")
df.columns zip df.collect()(0).toSeq map println

(year_count,2)
(month_count,12)
(day_of_month_count,31)
(day_of_week_count,7)
(x_fl_date_moy_count,28)
(y_fl_date_moy_count,28)
(x_fl_date_woy_count,50)
(y_fl_date_woy_count,50)
(x_fl_date_dow_count,326)
(y_fl_date_dow_count,231)
(weekday_idx_count,2)
(unique_carrier_idx_count,13)
(taxi_ind_idx_count,3)
(delay_ind_idx_count,3)
(origin_idx_count,106)
(dest_idx_count,105)
(dep_del15_idx_count,2)
(arr_del15_idx_count,1)
(arr_delay_group_idx_count,12)
(dep_delay_group_idx_count,15)
(distance_idx_count,191)
(flights_idx_count,1)
(distance_group_idx_count,11)
(taxi_time_bkt_count,13)
(delay_time_bkt_count,14)
(air_delay_time_bkt_count,15)
(carrier_delay_count,699)
(weather_delay_count,382)
(nas_delay_count,423)
(security_delay_count,101)
(late_aircraft_delay_count,466)


Array((), (), (), (), (), (), (), (), (), (), (), (), (), (), (), (), (), (), (), (), (), (), (), (), (), (), (), (), (), (), ())

## Cross-Validated Gradient Boosted Tree Regressor

In [40]:
cols_for_idx.map(x => x+"_idx")

Array(weekday_idx, unique_carrier_idx, taxi_ind_idx, delay_ind_idx, origin_idx, dest_idx, dep_del15_idx, arr_del15_idx, arr_delay_group_idx, dep_delay_group_idx, distance_idx, flights_idx, distance_group_idx)

In [64]:
def cv_gbt(df: DataFrame) = {
  val idxed_cols = cols_for_idx.map(x => x+"_idx")
  val feature_cols = Array("year", "month", "day_of_month", "day_of_week",
    "x_fl_date_moy", "y_fl_date_moy", "x_fl_date_woy", "y_fl_date_woy", "x_fl_date_dow", "y_fl_date_dow"
    ) ++ idxed_cols ++ Array("x_whls_on_mod", "y_whls_on_mod", "x_whls_off_mod", "y_whls_off_mod",
    "carrier_delay", "weather_delay", "nas_delay", "security_delay", "late_aircraft_delay")
    
  val featAssembler = new VectorAssembler(
    ).setInputCols(feature_cols
    ).setOutputCol("feat_vec")
    
  var df_wFeatVec = featAssembler.transform(df).drop(
    "weekday_idx", "unique_carrier_idx", "taxi_ind_idx", "delay_ind_idx", 
    "origin_idx", "dest_idx", "dep_del15_idx", "arr_del15_idx", "arr_delay_group_idx", "dep_delay_group_idx", 
    "distance_idx", "flights_idx", "distance_group_idx")
    
  println(df_wFeatVec.count())
    
  val Array(training, test) 
    = df_wFeatVec.randomSplit(Array(0.8, 0.2), seed = 12345)
        .map(_.withColumnRenamed("taxi_time", "label").withColumnRenamed("feat_vec", "features"))
    
  val gbt = new GBTRegressor()

  val paramGrid = new ParamGridBuilder() // Expensive
    .addGrid(gbt.labelCol, Array("label"))
    .addGrid(gbt.featuresCol, Array("features"))
    .addGrid(gbt.maxIter, Array(10))
    .addGrid(gbt.maxBins, Array(250))
    .build()
    
  val cv = new CrossValidator() // Expensive
    .setEstimator(gbt)
    .setEvaluator(new RegressionEvaluator)
    .setEstimatorParamMaps(paramGrid)
    .setNumFolds(3) 

  val cvModel = cv.fit(training)
  // TODO: Select more for EDA
  cvModel.transform(test).drop("features") //.select("features", "label", "prediction")
}
{ strIndxed_df |> cv_gbt }.write.format("parquet").mode("overwrite").save("tables/gbt/gbt.parquet")

103485


## Save For Exploratory Data Analysis in a Python notebook
##### Note: Although the ipynb can run Apache Toree's PySpark kernel as well as Spark's (Scala), there was a matplotlib dependency issue I didnt wan't to solve at the time

In [67]:
val gbt_res_df = spark.read.format("parquet").load("tables/gbt/gbt.parquet/")
// gbt_res_df.write.format("csv").mode("overwrite").save("tables/gbt/gbt.csv")

Name: Syntax Error.
Message: 
StackTrace: 

In [68]:
gbt_res_df.count()

20870

In [62]:
gbt_res_df.count

20870

## Regression Metrics

In [49]:
val reg_eval = new RegressionEvaluator(
  ).setLabelCol("label"
  ).setPredictionCol("prediction")

val List(rmse, mse, r2, mae) 
  = List("rmse", "mse", "r2","mae"
      ).map(reg_eval.setMetricName(_).evaluate(gbt_res_df))

s"Root Mean Squared Error (rmse) - $rmse" |> println
s"      Mean Squared Error (mse) - $mse"  |> println
s"   Median Absolute Error (mae) - $mae"  |> println
s"Coef of Determination     (r2) - $r2"   |> println

Root Mean Squared Error (rmse) - 12.060027132075415
      Mean Squared Error (mse) - 145.44425442639516
   Median Absolute Error (mae) - 8.552334236413296
Coef of Determination     (r2) - 0.6533716108894303


## Backlog

In [25]:
// UNUSED: Hashing Trick
def hashing_trick(numFeatures: Int, binary: Boolean) 
    = (df: DataFrame) => {
      val include = df.drop("taxi_time", "delay_time", "cancelled").columns
      val exclude = df.select("taxi_time", "delay_time", "cancelled").columns
      val stringed_exprs = stringify(include, exclude)
      val stringed_cols = (1 to include.size).map("str"+_.toString).mkString(", ")
      val stringed_array = "ARRAY("+stringed_cols+") AS raw_feats"
      df.createOrReplaceTempView("df")
      val all_cols_df = sql(s"SELECT *, $stringed_exprs FROM df").selectExpr("*", stringed_array)
      all_cols_df.createOrReplaceTempView("all_cols_df")
      val selected_cols = all_cols_df.columns.take(34) ++ all_cols_df.columns.takeRight(1) mkString(", ")
      val df2 = sql(s"SELECT $selected_cols FROM all_cols_df")

      val hashingTF = new HashingTF()
       .setInputCol("raw_feats")
       .setOutputCol("hashed_feats")
       .setNumFeatures(include.size)
       .setBinary(binary)

      hashingTF.transform(df2)
  }