# Data Wrangling Pipeline

### IMPORTS

In [1]:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.udf
import java.util.Calendar
import java.io
import java.text.SimpleDateFormat
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.ml.clustering.LDA
import org.apache.spark.mllib.linalg.{VectorUDT, Vectors}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.feature.{OneHotEncoderEstimator, StringIndexer, StringIndexerModel}
import org.apache.spark.ml.{Pipeline, PipelineModel}
import spark.implicits._

Intitializing Scala interpreter ...

Spark Web UI available at http://ca224e759b02:4040
SparkContext available as 'sc' (version = 2.4.3, master = local[*], app id = local-1559985801832)
SparkSession available as 'spark'


import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.udf
import java.util.Calendar
import java.io
import java.text.SimpleDateFormat
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.ml.clustering.LDA
import org.apache.spark.mllib.linalg.{VectorUDT, Vectors}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.feature.{OneHotEncoderEstimator, StringIndexer, StringIndexerModel}
import org.apache.spark.ml.{Pipeline, PipelineModel}
import spark.implicits._


### FUNCTIONS

##### Import and define schema

In [2]:
/**
 * Reads the intake and outcome CSV files (first row used as the header) and
 * replaces their header names, by position, with the pipeline's column names.
 *
 * @param intake  path to the intake CSV file
 * @param outcome path to the outcome CSV file
 * @return (intake DataFrame, outcome DataFrame), each renamed to 12 columns
 */
def import_data_set_schema(intake: String, outcome: String): (DataFrame, DataFrame) = {
    val intakeNames = Seq("animal_id", "name_in", "datetime_in", "monthYear_in", "location_found", "intake_type",
        "intake_condition", "species_in", "sex_in", "age_in", "breed_in", "color_in")
    val outcomeNames = Seq("animal_id", "name_out", "datetime_out", "monthYear_out", "dob", "outcome_type",
        "outcome_subtype", "species_out", "sex_out", "age_out", "breed_out", "color_out")

    // Both files are read before either is renamed.
    val headed     = spark.read.option("header", "true")
    val rawIntake  = headed.csv(intake)
    val rawOutcome = headed.csv(outcome)

    (rawIntake.toDF(intakeNames: _*), rawOutcome.toDF(outcomeNames: _*))
}

import_data_set_schema: (intake: String, outcome: String)(org.apache.spark.sql.DataFrame, org.apache.spark.sql.DataFrame)


##### Create Unique ID for each row based on animal_id and datetime

In [3]:
/**
 * Gives every intake row and every outcome row a per-visit key "<animal_id>_<n>",
 * so the n-th intake of an animal can be joined to its n-th outcome.
 *
 * Within each animal_id, rows are numbered with ROW_NUMBER() in descending date-time
 * order (n = 1 is the most recent record). The raw date-time strings are parsed to
 * timestamps for this ordering. Compared as text ("MM/dd/yyyy ...") they sort by month
 * first, so "12/20/2014" would rank above "01/10/2015" and intakes/outcomes belonging
 * to different visits could receive the same key. Values that match none of the
 * recognised layouts sort last and fall back to plain string order among themselves.
 *
 * Side effect: registers the temp views df1, df2, df1_unique and df2_unique.
 *
 * NOTE(review): numbering from the most recent record pairs rows correctly only when an
 * animal has the same number of intakes and outcomes; an animal with one extra intake
 * (e.g. still in care) gets every pair shifted by one. Confirm whether this is intended.
 *
 * @param df1 intake DataFrame (columns as produced by import_data_set_schema)
 * @param df2 outcome DataFrame (columns as produced by import_data_set_schema)
 * @return (intake rows keyed by animal_id_unique, outcome rows keyed by animal_id_unique)
 */
def create_unique_id(df1: DataFrame, df2: DataFrame): (DataFrame, DataFrame) = {

    // SQL expression converting a raw date-time string column to a TIMESTAMP, trying the
    // layouts that Date_Time_Wrangler handles; a value matching none of them yields NULL.
    def chronological(column: String): String =
        s"""COALESCE(to_timestamp(`$column`, 'MM/dd/yyyy hh:mm:ss a'),
                     to_timestamp(`$column`, 'MM/dd/yyyy HH:mm:ss'),
                     to_timestamp(`$column`, 'M/d/yy HH:mm'))"""

    df1.createOrReplaceTempView("df1")
    df2.createOrReplaceTempView("df2")

    val df1_unique = spark.sql(
    s"""SELECT CONCAT(animal_id, '_', CAST(rownum AS STRING)) AS animal_id_unique
            ,name_in
            ,datetime_in
            ,location_found
            ,intake_type
            ,intake_condition
            ,species_in
            ,sex_in
            ,age_in
            ,breed_in
            ,color_in
    FROM (
        SELECT `animal_id`
            ,`name_in`
            ,`datetime_in`
            ,`location_found`
            ,`intake_type`
            ,`intake_condition`
            ,`species_in`
            ,`sex_in`
            ,`age_in`
            ,`breed_in`
            ,`color_in`
            ,ROW_NUMBER() OVER (PARTITION BY `animal_id`
                                ORDER BY ${chronological("datetime_in")} DESC, `datetime_in` DESC) AS rownum
        FROM df1
    )
    """)
    df1_unique.createOrReplaceTempView("df1_unique")

    val df2_unique = spark.sql(
    s"""SELECT CONCAT(animal_id, '_', CAST(rownum AS STRING)) AS animal_id_unique
            ,datetime_out
            ,name_out
            ,dob
            ,outcome_type
            ,outcome_subtype
            ,species_out
            ,sex_out
            ,age_out
            ,breed_out
            ,color_out
    FROM (
        SELECT `animal_id`
            ,`datetime_out`
            ,`name_out`
            ,`dob`
            ,`outcome_type`
            ,`outcome_subtype`
            ,`species_out`
            ,`sex_out`
            ,`age_out`
            ,`breed_out`
            ,`color_out`
            ,ROW_NUMBER() OVER (PARTITION BY `animal_id`
                                ORDER BY ${chronological("datetime_out")} DESC, `datetime_out` DESC) AS rownum
        FROM df2
    )
    """)
    df2_unique.createOrReplaceTempView("df2_unique")

    (df1_unique, df2_unique)
}

create_unique_id: (df1: org.apache.spark.sql.DataFrame, df2: org.apache.spark.sql.DataFrame)(org.apache.spark.sql.DataFrame, org.apache.spark.sql.DataFrame)


##### Convert the age column into a consistent format (age in decimal years)

In [4]:
/**
 * Adds a numeric age-in-years column parsed from a free-text age column
 * (values such as "2 years", "3 months", "4 weeks", "5 days").
 *
 * @param df          input DataFrame
 * @param sourceTable "in" reads `age_in` and writes `age_years_in`;
 *                    "out" reads `age_out` and writes `age_years_out`;
 *                    any other value returns `df` unchanged
 * @return `df` plus the new Float column; missing or unparseable ages become -1
 */
def convert_age(df: DataFrame, sourceTable: String): DataFrame = {
    // Leading integer of the text, scaled to years by its unit suffix ("month(s)",
    // "week(s)", "day(s)"; anything else is taken as years). Null, the literal "NULL"
    // and values with no leading number map to the -1 sentinel rather than throwing
    // (split("\\D") on an all-non-digit string returns an empty array).
    def ageInYears(input: String): Float = {
        val leadingNumber =
            if (input == null || input == "NULL") None
            else input.split("\\D").headOption.filter(_.nonEmpty)

        leadingNumber match {
            case None => -1f
            case Some(digits) =>
                val n = digits.toFloat
                if (input.matches(".*months?")) n / 12
                else if (input.matches(".*weeks?")) (n * 7) / 365
                else if (input.matches(".*days?")) n / 365
                else n
        }
    }

    import org.apache.spark.sql.functions.udf
    val convertAge = udf[Float, String](ageInYears)

    sourceTable match {
        case "in"  => df.withColumn("age_years_in", convertAge(scala.Symbol("age_in")))
        case "out" => df.withColumn("age_years_out", convertAge(scala.Symbol("age_out")))
        case _     => df
    }
}

convert_age: (df: org.apache.spark.sql.DataFrame, sourceTable: String)org.apache.spark.sql.DataFrame


##### Split the sex column into two columns: sex (male/female) and a desexed indicator ("true"/"false"/"Unknown")

In [5]:
/**
 * Splits a combined sex column (e.g. "Neutered Male", "Intact Female") into a sex
 * column and a desexed indicator column.
 *
 * @param df          input DataFrame
 * @param sourceTable "in" reads `sex_in` and writes `sex_only` + `desexed_in`;
 *                    "out" reads `sex_out` and writes `sex_only` + `desexed_out`;
 *                    any other value returns `df` unchanged
 * @return `df` with the two new string columns. `sex_only` is "male"/"female"/"Unknown";
 *         the desexed column is "true"/"false"/"Unknown". Null inputs give "Unknown"
 *         instead of failing the job.
 */
def convert_sex(df: DataFrame, sourceTable: String): DataFrame = {
    // "male" / "female"; the character class before "male" requires a non-word character,
    // so "female" is not also recognised as "male".
    def extract_sex(input: String): String =
        Option(input).map(_.toLowerCase) match {
            case Some(s) if s.matches("(.*[^(\\w)]male)|^male.*")     => "male"
            case Some(s) if s.matches("(.*[^(\\w)]female)|^female.*") => "female"
            case _                                                     => "Unknown"
        }

    // "true" when neutered/spayed, "false" when intact, otherwise "Unknown".
    def extract_desexed(input: String): String =
        Option(input).map(_.toLowerCase) match {
            case Some(s) if s.matches("(.*neutered.*)|(.*spayed.*)") => "true"
            case Some(s) if s contains "intact"                      => "false"
            case _                                                    => "Unknown"
        }

    import org.apache.spark.sql.functions.udf
    val extractSex   = udf[String, String](extract_sex)
    val extractDesex = udf[String, String](extract_desexed)

    sourceTable match {
        case "in" =>
            df.withColumn("sex_only", extractSex(scala.Symbol("sex_in")))
              .withColumn("desexed_in", extractDesex(scala.Symbol("sex_in")))
        case "out" =>
            df.withColumn("sex_only", extractSex(scala.Symbol("sex_out")))
              .withColumn("desexed_out", extractDesex(scala.Symbol("sex_out")))
        case _ => df
    }
}

convert_sex: (df: org.apache.spark.sql.DataFrame, sourceTable: String)org.apache.spark.sql.DataFrame


##### Convert colour column into primary and secondary colour

In [7]:
/**
 * Derives lower-cased primary and secondary colour columns from a colour column.
 *
 * Primary colour: the first word of the part before "/" (or of the whole value).
 * Secondary colour: the first word after "/" if there is one; otherwise the second
 * space-separated word if there is one; otherwise null.
 * Null inputs, and values such as "Tan/" with nothing after the slash, yield null
 * instead of failing the job.
 *
 * @param df          input DataFrame
 * @param sourceTable "in" reads `color_in` and writes `prim_colour_in` / `sec_colour_in`;
 *                    "out" reads `color_out` and writes `prim_colour_out` / `sec_colour_out`;
 *                    any other value returns `df` unchanged
 * @return `df` with the two new string columns
 */
def convert_colour(df: DataFrame, sourceTable: String): DataFrame = {

    // First space-separated token of `s`, or null when there is none.
    def firstWord(s: String): String =
        if (s == null) null else s.split(" ").headOption.orNull

    def extract_prim(input: String): String =
        if (input == null) null
        else {
            val lower = input.toLowerCase
            val firstPart = if (input.matches(".*/.*")) lower.split("/").headOption.orNull else lower
            firstWord(firstPart)
        }

    def extract_sec(input: String): String =
        if (input == null) null
        else {
            val lower = input.toLowerCase
            if (input.matches(".*/.*")) firstWord(lower.split("/").drop(1).headOption.orNull)
            else if (input.matches(".+ .+")) lower.split(" ").drop(1).headOption.orNull
            else null
        }

    val extractPrim = udf[String, String](extract_prim)
    val extractSec  = udf[String, String](extract_sec)

    sourceTable match {
        case "in" =>
            df.withColumn("prim_colour_in", extractPrim(scala.Symbol("color_in")))
              .withColumn("sec_colour_in", extractSec(scala.Symbol("color_in")))
        case "out" =>
            df.withColumn("prim_colour_out", extractPrim(scala.Symbol("color_out")))
              .withColumn("sec_colour_out", extractSec(scala.Symbol("color_out")))
        case _ => df
    }
}

convert_colour: (df: org.apache.spark.sql.DataFrame, sourceTable: String)org.apache.spark.sql.DataFrame


##### Convert breed column into primary and secondary breed

In [8]:
/**
 * Adds a "mixed breed" flag and a lower-cased primary-breed column derived from a
 * breed column.
 *
 * A breed is flagged as mixed when it contains "mix" (any case) or lists breeds
 * separated by "/". The primary breed is the part before "/" (or the whole value)
 * with "mix" removed and surrounding whitespace trimmed.
 * Null inputs give flag = false and primary breed = null instead of failing the job.
 *
 * @param df         input DataFrame
 * @param sourceType "in" reads `breed_in` and writes `mix_bool` + `prim_breed_in`;
 *                   "out" reads `breed_out` and writes `mix_bool_out` + `prim_breed_out`;
 *                   any other value returns `df` unchanged
 * @return `df` with the two new columns
 */
def convert_breed(df: DataFrame, sourceType: String): DataFrame = {

    def extract_mix(input: String): Boolean =
        input != null && (input.toLowerCase.matches(".*mix.*") || input.matches(".*/.*"))

    def extract_prim_breed(input: String): String =
        if (input == null) null
        else {
            val lower = input.toLowerCase
            val primary = if (input.matches(".*/.*")) lower.split("/").headOption.getOrElse("") else lower
            primary.replaceAll("mix", "").trim
        }

    val extractMix  = udf[Boolean, String](extract_mix)
    val extractPrim = udf[String, String](extract_prim_breed)

    // Each branch chains both columns onto the same DataFrame; previously the "out"
    // branch rebuilt from `df` and lost mix_bool_out. The "in" flag keeps its
    // historical name mix_bool because later cells read it.
    sourceType match {
        case "in" =>
            df.withColumn("mix_bool", extractMix(scala.Symbol("breed_in")))
              .withColumn("prim_breed_in", extractPrim(scala.Symbol("breed_in")))
        case "out" =>
            df.withColumn("mix_bool_out", extractMix(scala.Symbol("breed_out")))
              .withColumn("prim_breed_out", extractPrim(scala.Symbol("breed_out")))
        case _ => df
    }
}

convert_breed: (df: org.apache.spark.sql.DataFrame, sourceType: String)org.apache.spark.sql.DataFrame


##### Normalise datetime columns to a consistent format and generate new date-related columns

In [9]:
/**
 * Normalises a date or date-time string column and derives calendar columns from it.
 *
 * Two modes, selected by `colName`:
 *  - Date-of-birth column ("Date of Birth", or "dob" as renamed by
 *    import_data_set_schema): the column is rewritten in place as "MM/dd/yyyy" and a
 *    short weekday column "BirthWeekDate_out" is added. `sourceType` is not used.
 *  - Any other column is treated as a date-time. It is rewritten as
 *    "MM/dd/yyyy hh:mm:ss a" and split into date_/time_/ampm_ columns, and the source
 *    column is dropped. Short month (month_) and weekday (weekday_) columns are then
 *    added. The suffix is "in" or "out" according to `sourceType` (case-insensitive);
 *    any other `sourceType` prints an error and returns `tgtDF` unchanged.
 *
 * Null values produce null outputs instead of failing the Spark job. Input layouts are
 * detected heuristically (trailing AM/PM marker, string length), so an unexpected
 * layout can still raise a ParseException when the job runs.
 *
 * @param tgtDF      DataFrame to transform
 * @param colName    name of the date or date-time column to process
 * @param sourceType "in" or "out"; selects the output column suffix for date-time columns
 * @return the transformed DataFrame
 */
def Date_Time_Wrangler(tgtDF: DataFrame, colName: String, sourceType: String): DataFrame = {

    // Name of the date-of-birth column in the raw outcome CSV, plus the name
    // import_data_set_schema renames it to. Matching only "Date of Birth" sent the
    // pipeline's "dob" call into the date-time branch, which overwrote date_out,
    // time_out, ... (derived from datetime_out) and dropped dob.
    val DBColName   = "Date of Birth"
    val dobColNames = Set(DBColName, "dob")
    val DBweekDate  = "BirthWeekDate_out"

    // All SimpleDateFormat instances are created inside each UDF call because they are
    // not thread-safe.

    // Date-time -> "MM/dd/yyyy hh:mm:ss a". A trailing AM/PM marker selects the 12-hour
    // pattern: the 24-hour patterns ignore the marker, turning "04:19:00 PM" into 04:19 AM.
    // Otherwise length >= 18 selects "MM/dd/yyyy HH:mm:ss", and anything shorter "M/d/yy HH:mm".
    val unifyTimeString = udf { (dt: String) =>
        if (dt == null) null
        else {
            // Collapse runs of spaces so the marker / length checks are stable.
            val cleanStr = dt.trim().replaceAll(" +", " ")
            val parser =
                if (cleanStr.matches("(?i).*\\s[ap]m")) new SimpleDateFormat("M/d/yy hh:mm:ss a")
                else if (cleanStr.size >= 18) new SimpleDateFormat("MM/dd/yyyy HH:mm:ss")
                else new SimpleDateFormat("M/d/yy HH:mm")
            new SimpleDateFormat("MM/dd/yyyy hh:mm:ss a").format(parser.parse(cleanStr))
        }
    }

    // Date -> "MM/dd/yyyy". 10-character values are assumed to be in that layout already;
    // anything else is parsed as "M/d/yy".
    val unifyDateFormat = udf { (dt: String) =>
        if (dt == null || dt.size == 10) dt
        else new SimpleDateFormat("MM/dd/yyyy").format(new SimpleDateFormat("M/d/yy").parse(dt))
    }

    // Formats an "MM/dd/yyyy" string with `pattern` ("E" = short weekday, "MMM" = short month).
    def dateFieldUdf(pattern: String) = udf { (dt: String) =>
        if (dt == null) null
        else new SimpleDateFormat(pattern).format(new SimpleDateFormat("MM/dd/yyyy").parse(dt))
    }
    val getWeekDate = dateFieldUdf("E")
    val getMonth    = dateFieldUdf("MMM")

    if (dobColNames.contains(colName)) {
        tgtDF.withColumn(colName, unifyDateFormat(col(colName)))
             .withColumn(DBweekDate, getWeekDate(col(colName)))
    } else {
        val suffix = sourceType.toLowerCase match {
            case "in"  => Some("in")
            case "out" => Some("out")
            case _     => None
        }
        suffix match {
            case None =>
                println("Error in DateTimeWrangle: Please provid data source type as \"In\" or \"Out\"")
                tgtDF
            case Some(s) =>
                // col(colName) is resolved by name against each intermediate plan, so after the
                // first withColumn this splits the unified "date time AM/PM" string.
                val parts = split(col(colName), "\\s")
                tgtDF.withColumn(colName, unifyTimeString(col(colName)))
                     .withColumn(s"date_$s", parts.getItem(0))
                     .withColumn(s"time_$s", parts.getItem(1))
                     .withColumn(s"ampm_$s", parts.getItem(2))
                     .drop(colName)
                     .withColumn(s"month_$s", getMonth(col(s"date_$s")))
                     .withColumn(s"weekday_$s", getWeekDate(col(s"date_$s")))
        }
    }
}

import java.util.Calendar
import java.io
import java.text.SimpleDateFormat
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.ml.clustering.LDA
import org.apache.spark.mllib.linalg.{VectorUDT, Vectors}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession
Date_Time_Wrangler: (tgtDF: org.apache.spark.sql.DataFrame, colName: String, sourceType: String)org.apache.spark.sql.DataFrame


In [10]:
/**
 * Replaces nulls with the string "unknown" in the listed columns of the joined data.
 *
 * Spark's `na.fill(String, cols)` only fills string-typed columns, so numeric entries in
 * the list (e.g. the Float age columns) are left as they are.
 *
 * @param df joined intake/outcome DataFrame
 * @return `df` with nulls in the listed string columns replaced by "unknown"
 */
def convert_nulls(df: DataFrame): DataFrame = {
    val placeholder = "unknown"
    val fillable = Seq(
        "name_in", "location_found", "intake_type", "intake_condition", "species_in",
        "age_years_in", "sex_only", "desexed_in",
        "prim_colour_in", "prim_colour_out", "sec_colour_in", "sec_colour_out",
        "prim_breed_in", "sec_breed_in", "prim_breed_out", "sec_breed_out",
        "date_in", "time_in", "time_out", "ampm_in", "month_in", "id_in", "weekday_in",
        "outcome_type", "outcome_subtype", "age_years_out", "desexed_out",
        "date_out", "month_out", "ampm_out", "weekday_out"
    )
    df.na.fill(placeholder, fillable)
}

convert_nulls: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame


In [11]:
/**
 * One-hot encodes the categorical columns of `df`, storing each encoding as JSON.
 *
 * Columns whose lower-cased name contains "name", "date", "time", "dob", "age" or "id"
 * are left as they are. Every other column `c` gets a `c_vector` column holding the
 * JSON of its one-hot vector (StringIndexer then OneHotEncoderEstimator with
 * dropLast = false). The intermediate `*_index` columns are removed.
 *
 * Boolean columns (e.g. mix_bool) are cast to "true"/"false" strings first, because
 * StringIndexer accepts only string or numeric input. Nulls are indexed as an extra
 * category (handleInvalid = "keep") instead of failing the job.
 *
 * @param df input DataFrame
 * @return `df` (boolean columns now strings) plus one `*_vector` JSON column per
 *         encoded column
 */
def encode_features(df: DataFrame): DataFrame = {
    val excludedFragments = Seq("name", "date", "time", "dob", "age", "id")
    val feats = df.columns.filterNot(c => excludedFragments.exists(c.toLowerCase().contains(_)))

    val prepared = feats.foldLeft(df) { (acc, c) =>
        if (acc.schema(c).dataType == org.apache.spark.sql.types.BooleanType)
            acc.withColumn(c, acc(c).cast("string"))
        else acc
    }

    // One indexer + encoder pair per categorical column.
    val encoded_feats = feats.flatMap { name =>
        val indexer = new StringIndexer()
                        .setInputCol(name)
                        .setOutputCol(name + "_index")
                        .setHandleInvalid("keep")
        val encoder = new OneHotEncoderEstimator()
                        .setInputCols(Array(name + "_index"))
                        .setOutputCols(Array(name + "_vector"))
                        .setDropLast(false)
        Array(indexer, encoder)
    }

    val df_transformed = new Pipeline().setStages(encoded_feats).fit(prepared).transform(prepared)
    val non_index_cols = df_transformed.columns.filterNot(_.contains("_index")).toList
    val vector_cols    = df_transformed.columns.filter(_.contains("_vector")).toSeq
    val selected       = df_transformed.select(non_index_cols.head, non_index_cols.tail: _*)

    // Replace each vector column with a JSON string of the vector.
    vector_cols.foldLeft(selected)((acc, c) => acc.withColumn(c, to_json(struct(c))))
}

encode_features: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame


### PIPELINE

# ALL


In [12]:
// Change these to your local path
val intake_file_path = "Austin_Animal_Center_Intakes.csv"
val outcomes_file_path = "Austin_Animal_Center_Outcomes.csv"

// Load both CSVs and key every row by "<animal_id>_<visit number>".
val (in_df, out_df) = import_data_set_schema(intake_file_path, outcomes_file_path)
val (in_keyed, out_keyed) = create_unique_id(in_df, out_df)

// Per-side feature steps, applied left to right: age, sex, colour, breed.
val featureSteps: Seq[(DataFrame, String) => DataFrame] =
    Seq(convert_age _, convert_sex _, convert_colour _, convert_breed _)
val in_features  = featureSteps.foldLeft(in_keyed)((df, step) => step(df, "in"))
val out_features = featureSteps.foldLeft(out_keyed)((df, step) => step(df, "out"))

// Date/time wrangling: intake date-time, then outcome date-time and date of birth.
val in  = Date_Time_Wrangler(in_features, "datetime_in", "in")
val out = Date_Time_Wrangler(Date_Time_Wrangler(out_features, "datetime_out", "out"), "dob", "out")

// Drop columns not carried into the join, then give each side its own key name.
val in_dropped = in
    .drop("sex_in", "breed_in", "color_in", "age_in")
    .withColumnRenamed("animal_id_unique", "id_in")
val out_dropped = out
    .drop("sex_out", "sex_only", "color_out", "breed_out", "age_out", "ampm_out", "month_out",
          "weekday_out", "species_out", "name_out", "time_out", "date_out")
    .withColumnRenamed("animal_id_unique", "id_out")

// Pair each intake with the outcome carrying the same per-visit key.
val joined = in_dropped.join(out_dropped, $"id_in" === $"id_out").drop("id_out")

//remove nulls
val nulls_removed = convert_nulls(joined)

//print schema
println("joined schema")
joined.printSchema

joined schema
root
 |-- id_in: string (nullable = true)
 |-- name_in: string (nullable = true)
 |-- location_found: string (nullable = true)
 |-- intake_type: string (nullable = true)
 |-- intake_condition: string (nullable = true)
 |-- species_in: string (nullable = true)
 |-- age_years_in: float (nullable = false)
 |-- sex_only: string (nullable = true)
 |-- desexed_in: string (nullable = true)
 |-- prim_colour_in: string (nullable = true)
 |-- sec_colour_in: string (nullable = true)
 |-- mix_bool: boolean (nullable = false)
 |-- prim_breed_in: string (nullable = true)
 |-- date_in: string (nullable = true)
 |-- time_in: string (nullable = true)
 |-- ampm_in: string (nullable = true)
 |-- month_in: string (nullable = true)
 |-- weekday_in: string (nullable = true)
 |-- outcome_type: string (nullable = true)
 |-- outcome_subtype: string (nullable = true)
 |-- age_years_out: float (nullable = false)
 |-- desexed_out: string (nullable = true)
 |-- prim_colour_out: string (nullable = tru

intake_file_path: String = Austin_Animal_Center_Intakes.csv
outcomes_file_path: String = Austin_Animal_Center_Outcomes.csv
in_df: org.apache.spark.sql.DataFrame = [animal_id: string, name_in: string ... 10 more fields]
out_df: org.apache.spark.sql.DataFrame = [animal_id: string, name_out: string ... 10 more fields]
in: org.apache.spark.sql.DataFrame = [animal_id_unique: string, name_in: string ... 20 more fields]
out: org.apache.spark.sql.DataFrame = [animal_id_unique: string, name_out: string ... 18 more fields]
in: org.apache.spark.sql.DataFrame = [animal_id_unique: string, name_in: string ... 20 more fields]
out: org.apache.spark.sql.DataFrame = [animal_id_unique: string, name_out: string ... 18 more fields]
in: org.apache.spark.sql.DataFrame = [animal_id_unique: string, name_in: str...

In [13]:
// Distinct values in the raw intake columns...
for ((column, label) <- Seq("breed_in" -> "Unique breed_in before wrangle: ",
                            "color_in" -> "Unique color_in before wrangle: "))
    println(label + in.select(column).distinct.count())
println("-------------------------------")
// ...versus the derived columns after wrangling and joining.
for ((column, label) <- Seq("prim_breed_in"  -> "Unique prim_breed AFTER: ",
                            "prim_colour_in" -> "Unique prim_colour AFTER: ",
                            "sec_colour_in"  -> "Unique sec_colour AFTER: "))
    println(label + joined.select(column).distinct.count())

Unique breed_in before wrangle: 2397
Unique color_in before wrangle: 569
-------------------------------
Unique prim_breed AFTER: 356
Unique prim_colour AFTER: 30
Unique sec_colour AFTER: 35


In [14]:
// Row count for each value of the mix_bool flag added by convert_breed.
val mixBoolCounts = joined.groupBy("mix_bool").count()
mixBoolCounts.show()

+--------+-----+
|mix_bool|count|
+--------+-----+
|    true|92421|
|   false| 6803|
+--------+-----+



In [12]:
// Encode features
//val result = encode_features(nulls_removed)

result: org.apache.spark.sql.DataFrame = [id_in: string, name_in: string ... 44 more fields]


In [15]:
// Write the null-filled joined data (not the one-hot encoded result; that step is
// commented out above) as CSV into the phase2_output directory.
// NOTE(review): no header row is written, and Spark's default save mode fails if
// phase2_output already exists; confirm both are intended.
nulls_removed.write.format("csv").save("phase2_output")