In [1]:
## Set Python - Spark environment.
import os
import sys

os.environ["SPARK_HOME"] = "/usr/hdp/current/spark2-client"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /home/2728B59/mysql-connector-java-5.1.2.jar  pyspark-shell'
sys.path.insert(0, os.environ["PYLIB"] + "/py4j-0.10.4-src.zip")
sys.path.insert(0, os.environ["PYLIB"] + "/pyspark.zip")

In [2]:
## Create SparkContext, SparkSession
from os.path import expanduser, join, abspath

from pyspark.sql import SparkSession
from pyspark.sql import Row

# warehouse_location points to the default location for managed databases and tables
warehouse_location = 'hdfs:///apps/hive/warehouse/'

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL using BI On Hadoop") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .enableHiveSupport() \
    .getOrCreate()

In [3]:
## Create SparkContext, SparkSession
from pyspark.sql import SparkSession
from pyspark import SparkContext
#sc = SparkContext()
#spark = SparkSession(sc)

#spark

In [4]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.functions import isnan, when, count, col, countDistinct

In [5]:
!pwd

/home/2728B59


In [6]:
## Define Schema
dataSchema = StructType([
    StructField("age_upon_outcome", StringType(), True),
    StructField("animal_id_outcome", StringType(), True),
    StructField("date_of_birth", TimestampType(), True),
    StructField("outcome_subtype", StringType(), True),
    StructField("outcome_type", StringType(), True),
    StructField("sex_upon_outcome", StringType(), True),
    StructField("age_upon_outcome_(days)", IntegerType(), True),
    StructField("age_upon_outcome_(years)", DoubleType(), True),
    StructField("age_upon_outcome_age_group", StringType(), True),
    StructField("outcome_datetime", TimestampType(), True),
    StructField("outcome_month", IntegerType(), True),
    StructField("outcome_year", IntegerType(), True),
    StructField("outcome_monthyear", StringType(), True),
    StructField("outcome_weekday", StringType(), True),
    StructField("outcome_hour", IntegerType(), True),
    StructField("outcome_number", DoubleType(), True),
    StructField("dob_year", IntegerType(), True),
    StructField("dob_month", IntegerType(), True),
    StructField("dob_monthyear", StringType(), True),
    StructField("age_upon_intake", StringType(), True),
    StructField("animal_id_intake", StringType(), True),
    StructField("animal_type", StringType(), True),
    StructField("breed", StringType(), True),
    StructField("color", StringType(), True),
    StructField("found_location", StringType(), True),
    StructField("intake_condition", StringType(), True),
    StructField("intake_type", StringType(), True),
    StructField("sex_upon_intake", StringType(), True),
    StructField("count", IntegerType(), True)
])

## .Load the animalFlumeData.csv

In [7]:
Flume_Data =spark.read.format("csv")\
        .option("header", "false")\
        .option("inferSchema", "true")\
        .load("file:///home/2728B59/animalFlumeData.csv",schema = dataSchema)

In [8]:
KafkaSchema = StructType([
    StructField("animal_id_outcome", StringType(), True),
    StructField("age_upon_intake_days", IntegerType(), True),
    StructField("age_upon_intake_years", DoubleType(), True),
    StructField("age_upon_intake_age_group", StringType(), True),
    StructField("intake_datetime", TimestampType(), True),
    StructField("intake_month", IntegerType(), True),
    StructField("intake_year", IntegerType(), True),
    StructField("intake_monthyear", StringType(), True),
    StructField("intake_weekday", StringType(), True),
    StructField("intake_hour", IntegerType(), True),
    StructField("intake_number", DoubleType(), True),
    StructField("time_in_shelter", StringType(), True),
    StructField("time_in_shelter_days", DoubleType(), True)
])


##  Load the animalKafkaTabData.csv

In [9]:
Kafka_Data = spark.read.format("csv")\
        .option("header", "false")\
        .option("inferSchema", "true")\
        .option ('delimiter','\t')\
        .load("file:///home/2728B59/animalKafkaTabData.csv",
              schema = KafkaSchema)

        

In [10]:
Flume_Data.printSchema()

root
 |-- age_upon_outcome: string (nullable = true)
 |-- animal_id_outcome: string (nullable = true)
 |-- date_of_birth: timestamp (nullable = true)
 |-- outcome_subtype: string (nullable = true)
 |-- outcome_type: string (nullable = true)
 |-- sex_upon_outcome: string (nullable = true)
 |-- age_upon_outcome_(days): integer (nullable = true)
 |-- age_upon_outcome_(years): double (nullable = true)
 |-- age_upon_outcome_age_group: string (nullable = true)
 |-- outcome_datetime: timestamp (nullable = true)
 |-- outcome_month: integer (nullable = true)
 |-- outcome_year: integer (nullable = true)
 |-- outcome_monthyear: string (nullable = true)
 |-- outcome_weekday: string (nullable = true)
 |-- outcome_hour: integer (nullable = true)
 |-- outcome_number: double (nullable = true)
 |-- dob_year: integer (nullable = true)
 |-- dob_month: integer (nullable = true)
 |-- dob_monthyear: string (nullable = true)
 |-- age_upon_intake: string (nullable = true)
 |-- animal_id_intake: string (nullab

In [11]:
Flume_Data.show()

+----------------+-----------------+-------------------+---------------+---------------+----------------+-----------------------+------------------------+--------------------------+-------------------+-------------+------------+-----------------+---------------+------------+--------------+--------+---------+-------------+---------------+----------------+-----------+--------------------+------------------+--------------------+----------------+---------------+---------------+-----+
|age_upon_outcome|animal_id_outcome|      date_of_birth|outcome_subtype|   outcome_type|sex_upon_outcome|age_upon_outcome_(days)|age_upon_outcome_(years)|age_upon_outcome_age_group|   outcome_datetime|outcome_month|outcome_year|outcome_monthyear|outcome_weekday|outcome_hour|outcome_number|dob_year|dob_month|dob_monthyear|age_upon_intake|animal_id_intake|animal_type|               breed|             color|      found_location|intake_condition|    intake_type|sex_upon_intake|count|
+----------------+------------

In [12]:
Kafka_Data.printSchema()

root
 |-- animal_id_outcome: string (nullable = true)
 |-- age_upon_intake_days: integer (nullable = true)
 |-- age_upon_intake_years: double (nullable = true)
 |-- age_upon_intake_age_group: string (nullable = true)
 |-- intake_datetime: timestamp (nullable = true)
 |-- intake_month: integer (nullable = true)
 |-- intake_year: integer (nullable = true)
 |-- intake_monthyear: string (nullable = true)
 |-- intake_weekday: string (nullable = true)
 |-- intake_hour: integer (nullable = true)
 |-- intake_number: double (nullable = true)
 |-- time_in_shelter: string (nullable = true)
 |-- time_in_shelter_days: double (nullable = true)



In [13]:
Kafka_Data.show()

+-----------------+--------------------+---------------------+-------------------------+-------------------+------------+-----------+----------------+--------------+-----------+-------------+--------------------+--------------------+
|animal_id_outcome|age_upon_intake_days|age_upon_intake_years|age_upon_intake_age_group|    intake_datetime|intake_month|intake_year|intake_monthyear|intake_weekday|intake_hour|intake_number|     time_in_shelter|time_in_shelter_days|
+-----------------+--------------------+---------------------+-------------------------+-------------------+------------+-----------+----------------+--------------+-----------+-------------+--------------------+--------------------+
|          A178569|                5475|                 15.0|             (12.5, 15.0]|2014-03-17 09:45:00|           3|       2014|         2014-03|        Monday|          9|          1.0|6 days 06:12:00.0...|   6.258333333333334|
|          A284639|                4015|                 11.0|  

In [14]:
Flume_Data.count()


43604

In [15]:
Kafka_Data.count()

71961

In [16]:
spark.sql("USE insofe_animal59_PHD_DB")


DataFrame[]

In [17]:
spark.sql("show tables").show()

+--------------------+----------+-----------+
|            database| tableName|isTemporary|
+--------------------+----------+-----------+
|insofe_animal59_p...|animaldata|      false|
+--------------------+----------+-----------+



In [18]:
Flume_Data.toPandas().head()

Unnamed: 0,age_upon_outcome,animal_id_outcome,date_of_birth,outcome_subtype,outcome_type,sex_upon_outcome,age_upon_outcome_(days),age_upon_outcome_(years),age_upon_outcome_age_group,outcome_datetime,...,age_upon_intake,animal_id_intake,animal_type,breed,color,found_location,intake_condition,intake_type,sex_upon_intake,count
0,1 month,A678753,2014-04-12,Partner,Transfer,Intact Female,30,0.082192,"(-0.025, 2.5]",2014-05-30 16:59:00,...,4 weeks,A678753,Cat,Domestic Shorthair Mix,Black/White,4101 Smith School Road in Austin (TX),Normal,Stray,Intact Female,1
1,1 month,A680216,2014-04-15,Partner,Transfer,Intact Male,30,0.082192,"(-0.025, 2.5]",2014-05-31 11:30:00,...,1 month,A680216,Dog,German Shepherd Mix,Brown/Black,Del Valle (TX),Normal,Owner Surrender,Intact Male,1
2,1 month,A684348,2014-05-31,Partner,Transfer,Intact Female,30,0.082192,"(-0.025, 2.5]",2014-07-22 15:54:00,...,1 month,A684348,Cat,Domestic Shorthair Mix,Tortie,4501 Leslie Ave in Austin (TX),Normal,Stray,Intact Female,1
3,1 month,A695315,2014-12-07,Foster,Adoption,Neutered Male,30,0.082192,"(-0.025, 2.5]",2015-01-27 18:00:00,...,1 month,A695315,Dog,Pit Bull Mix,Brown Brindle,10511 Brownie Dr in Austin (TX),Normal,Stray,Intact Male,1
4,1 month,A725965,2016-03-30,,Adoption,Intact Female,30,0.082192,"(-0.025, 2.5]",2016-05-22 12:29:00,...,5 weeks,A725965,Cat,Domestic Shorthair Mix,Blue/White,5800 Cherry Park in Austin (TX),Normal,Stray,Intact Female,1


In [19]:
Kafka_Data.toPandas().head()

Unnamed: 0,animal_id_outcome,age_upon_intake_days,age_upon_intake_years,age_upon_intake_age_group,intake_datetime,intake_month,intake_year,intake_monthyear,intake_weekday,intake_hour,intake_number,time_in_shelter,time_in_shelter_days
0,A178569,5475,15.0,"(12.5, 15.0]",2014-03-17 09:45:00,3,2014,2014-03,Monday,9,1.0,6 days 06:12:00.000000000,6.258333
1,A284639,4015,11.0,"(10.0, 12.5]",2014-01-27 13:05:00,1,2014,2014-01,Monday,13,1.0,1 days 00:12:00.000000000,1.008333
2,A540495,1825,5.0,"(2.5, 5.0]",2014-04-30 11:02:00,4,2014,2014-04,Wednesday,11,1.0,5 days 04:00:00.000000000,5.166667
3,A554674,4015,11.0,"(10.0, 12.5]",2017-09-29 21:52:00,9,2017,2017-09,Friday,21,1.0,1 days 07:08:00.000000000,1.297222
4,A556644,2555,7.0,"(5.0, 7.5]",2016-05-25 18:23:00,5,2016,2016-05,Wednesday,18,1.0,0 days 00:58:00.000000000,0.040278


In [20]:
spark.sql("select * from animaldata").show(3)

+-------+---------+-------------------+
|dept_no|dept_name|      last_modified|
+-------+---------+-------------------+
|1 month|  A745619|2017-01-21 00:00:00|
|1 month|  A755211|2017-06-16 00:00:00|
|1 month|  A763655|2017-10-21 00:00:00|
+-------+---------+-------------------+
only showing top 3 rows



In [21]:
import pymysql.cursors

In [22]:
connection = pymysql.connect(host='172.16.0.232',
                             user='insofeadmin',
                             password='insofe_password',
                             db='insofe_animal59_PHD_DB',
                             charset='utf8mb4',
                             cursorclass=pymysql.cursors.DictCursor)

try:
    with connection.cursor() as cursor:


        query = "SELECT * FROM animalData"
        cursor.execute(query)
        animalData = cursor.fetchall()
finally:
    connection.close()

In [23]:
ValuesSchema2 = StructType([
    StructField("age_upon_outcome", StringType(), True),
    StructField("animal_id_outcome", StringType(), True),
    StructField("date_of_birth", TimestampType(), True),
    StructField("outcome_subtype", StringType(), True),
    StructField("outcome_type", StringType(), True),
    StructField("sex_upon_outcome", StringType(), True),
    StructField("age_upon_outcome_(days)", IntegerType(), True),
    StructField("age_upon_outcome_(years)", DoubleType(), True),
    StructField("age_upon_outcome_age_group", StringType(), True),
    StructField("outcome_datetime", TimestampType(), True),
    StructField("outcome_month", IntegerType(), True),
    StructField("outcome_year", IntegerType(), True),
    StructField("outcome_monthyear", StringType(), True),
    StructField("outcome_weekday", StringType(), True),
    StructField("outcome_hour", IntegerType(), True),
    StructField("outcome_number", DoubleType(), True),
    StructField("dob_year", IntegerType(), True),
    StructField("dob_month", IntegerType(), True),
    StructField("dob_monthyear", StringType(), True),
    StructField("age_upon_intake", StringType(), True),
    StructField("animal_id_intake", StringType(), True),
    StructField("animal_type", StringType(), True),
    StructField("breed", StringType(), True),
    StructField("color", StringType(), True),
    StructField("found_location", StringType(), True),
    StructField("intake_condition", StringType(), True),
    StructField("intake_type", StringType(), True),
    StructField("sex_upon_intake", StringType(), True),
    StructField("count", IntegerType(), True)
])

## Using spark load the data from animalData table in
insofe_animal59_PHD_DB database to a spark dataframe with name
data_mysql 

In [24]:
data_mysql = spark.createDataFrame(animalData, schema = ValuesSchema2)

In [25]:
data_mysql.show(3)

+----------------+-----------------+-------------------+---------------+------------+----------------+-----------------------+------------------------+--------------------------+-------------------+-------------+------------+-----------------+---------------+------------+--------------+--------+---------+-------------+---------------+----------------+-----------+--------------------+--------------+--------------------+----------------+-----------+---------------+-----+
|age_upon_outcome|animal_id_outcome|      date_of_birth|outcome_subtype|outcome_type|sex_upon_outcome|age_upon_outcome_(days)|age_upon_outcome_(years)|age_upon_outcome_age_group|   outcome_datetime|outcome_month|outcome_year|outcome_monthyear|outcome_weekday|outcome_hour|outcome_number|dob_year|dob_month|dob_monthyear|age_upon_intake|animal_id_intake|animal_type|               breed|         color|      found_location|intake_condition|intake_type|sex_upon_intake|count|
+----------------+-----------------+----------------

##  Merge (rowbind) the dataframes loaded from flume (data_flume) ingestion
and mysql (data_mysql) data frame.
 

In [26]:
combined_data = data_mysql.union(Flume_Data)

In [27]:
combined_data.count()

71961

In [28]:
DF_value_cols = Kafka_Data.columns
DF_label_cols = combined_data.columns

[item for i, item in enumerate(DF_value_cols) if item in DF_label_cols]


['animal_id_outcome']

## Join the resultant spark dataframe in above step with the dataframe which has
data from the kafka on column “animal_id_outcome”. This is the final data
set.


In [29]:
final_Data = combined_data.join(Kafka_Data,on = 'animal_id_outcome' ,how="inner")

## Check the number of rows and columns in the final data set. Check the data
types of the columns with the Data Description 

In [30]:
final_Data.count()

71961

In [31]:
print('row counts :', final_Data.count())


('row counts :', 71961)


In [35]:
len(final_Data.columns)

41

## Print the top 5 rows from the final data set.

In [33]:
final_Data.show(5)

+-----------------+----------------+-------------------+---------------+------------+----------------+-----------------------+------------------------+--------------------------+-------------------+-------------+------------+-----------------+---------------+------------+--------------+--------+---------+-------------+---------------+----------------+-----------+--------------------+--------------+--------------------+----------------+-----------+---------------+-----+--------------------+---------------------+-------------------------+-------------------+------------+-----------+----------------+--------------+-----------+-------------+--------------------+--------------------+
|animal_id_outcome|age_upon_outcome|      date_of_birth|outcome_subtype|outcome_type|sex_upon_outcome|age_upon_outcome_(days)|age_upon_outcome_(years)|age_upon_outcome_age_group|   outcome_datetime|outcome_month|outcome_year|outcome_monthyear|outcome_weekday|outcome_hour|outcome_number|dob_year|dob_month|dob_mo

## Calculate Average age of animal when it comes out of shelter (ColumnName:
age_upon_outcome_(years)).


In [36]:
final_Data.agg({'age_upon_outcome_(years)': 'mean'}).show()

+-----------------------------+
|avg(age_upon_outcome_(years))|
+-----------------------------+
|           2.1071279286935076|
+-----------------------------+



## GET THE TOP 10 AGES_UPON_OUTCOME_(days) OF ANIMALS 

In [37]:
final_Data.sort(desc('age_upon_outcome_(years)')).show(10)

+-----------------+----------------+-------------------+---------------+---------------+----------------+-----------------------+------------------------+--------------------------+-------------------+-------------+------------+-----------------+---------------+------------+--------------+--------+---------+-------------+---------------+----------------+-----------+--------------------+-----------------+--------------------+----------------+---------------+---------------+-----+--------------------+---------------------+-------------------------+-------------------+------------+-----------+----------------+--------------+-----------+-------------+--------------------+--------------------+
|animal_id_outcome|age_upon_outcome|      date_of_birth|outcome_subtype|   outcome_type|sex_upon_outcome|age_upon_outcome_(days)|age_upon_outcome_(years)|age_upon_outcome_age_group|   outcome_datetime|outcome_month|outcome_year|outcome_monthyear|outcome_weekday|outcome_hour|outcome_number|dob_year|dob

## Print THE DIFFERENT TYPES OF outcome_type

In [38]:
final_Data.select('outcome_type').distinct().show()

+---------------+
|   outcome_type|
+---------------+
|Return to Owner|
|     Euthanasia|
|           null|
|      Rto-Adopt|
|       Transfer|
|       Disposal|
|       Relocate|
|       Adoption|
|        Missing|
|           Died|
+---------------+



## Get the count of different types of outcome_type

In [39]:
final_Data.select('outcome_type').distinct().count()

10

## Which year has seen more number of animal outcomes

In [40]:
final_Data.groupby('outcome_year').count().sort(desc('count')).show(1)

+------------+-----+
|outcome_year|count|
+------------+-----+
|        2014|16700|
+------------+-----+
only showing top 1 row



## Which year has seen more number of animal intakes

In [41]:
final_Data.groupby('intake_year').count().sort(desc('count')).show(1)

+-----------+-----+
|intake_year|count|
+-----------+-----+
|       2014|16754|
+-----------+-----+
only showing top 1 row



## Check if there are null values in any of the columns

In [43]:
final_Data.select([count(when(col(c).isNull(), c)).alias(c) for c in final_Data.columns]).show()

+-----------------+----------------+-------------+---------------+------------+----------------+-----------------------+------------------------+--------------------------+----------------+-------------+------------+-----------------+---------------+------------+--------------+--------+---------+-------------+---------------+----------------+-----------+-----+-----+--------------+----------------+-----------+---------------+-----+--------------------+---------------------+-------------------------+---------------+------------+-----------+----------------+--------------+-----------+-------------+---------------+--------------------+
|animal_id_outcome|age_upon_outcome|date_of_birth|outcome_subtype|outcome_type|sex_upon_outcome|age_upon_outcome_(days)|age_upon_outcome_(years)|age_upon_outcome_age_group|outcome_datetime|outcome_month|outcome_year|outcome_monthyear|outcome_weekday|outcome_hour|outcome_number|dob_year|dob_month|dob_monthyear|age_upon_intake|animal_id_intake|animal_type|bree

## How many animals are in shelter for more than a year

In [44]:
final_Data.where("time_in_shelter_days > 365").count()

162

## Which type of animals are most visited to animal_shelter


In [46]:
final_Data.groupby('animal_type').count().show()

+-----------+-----+
|animal_type|count|
+-----------+-----+
|       Bird|  339|
|        Cat|28303|
|      Other| 4410|
|        Dog|38909|
+-----------+-----+



## find out how many wild life animals are taken in to shelter till now 

In [47]:
final_Data[final_Data["intake_type"]== "Wildlife"].count()

3457

## Which year the stray type animals intake is high (Hint: use
“intake_type","intake_year” columns)


In [48]:
final_Data[final_Data["intake_type"]== "Stray"].groupby("intake_year").count().toPandas()

Unnamed: 0,intake_year,count
0,2018,1865
1,2015,12278
2,2013,2567
3,2014,11776
4,2016,11038
5,2017,10990


## Which month in which year has got the highest number of intakes

In [49]:
final_Data.groupBy('intake_monthyear').count().orderBy('count', ascending=False).show(1)

+----------------+-----+
|intake_monthyear|count|
+----------------+-----+
|         2015-06| 1996|
+----------------+-----+
only showing top 1 row



## Take 30% of the data using sample function. (Hint: use fraction=0.30)

In [63]:
sample_df = final_Data.sample(fraction=0.30, seed=4646)
sample_df.count()

21545

## Remove the Rows whose sex_upon_intake is "UNKNOWN". 

In [64]:
data_after_remove = sample_df.filter("sex_upon_intake != 'UNKNOWN'")
data_after_remove.count()

21545

## Select the columns other than 

In [65]:
drop_cols = ['age_upon_outcome','date_of_birth','age_upon_outcome_(days)','outcome_datetime',
             'outcome_month','outcome_year','outcome_monthyear','dob_year','dob_month','dob_monthyear',
             'animal_id_intake','count','age_upon_intake_days','intake_datetime','intake_monthyear','intake_hour']
sample_drop_cols = data_after_remove.drop(*drop_cols)

In [67]:
sample_drop_cols.columns

['animal_id_outcome',
 'outcome_subtype',
 'outcome_type',
 'sex_upon_outcome',
 'age_upon_outcome_(years)',
 'age_upon_outcome_age_group',
 'outcome_weekday',
 'outcome_hour',
 'outcome_number',
 'age_upon_intake',
 'animal_type',
 'breed',
 'color',
 'found_location',
 'intake_condition',
 'intake_type',
 'sex_upon_intake',
 'age_upon_intake_years',
 'age_upon_intake_age_group',
 'intake_month',
 'intake_year',
 'intake_weekday',
 'intake_number',
 'time_in_shelter',
 'time_in_shelter_days']

In [68]:
sample_drop_cols_2 = sample_drop_cols.drop('time_in_shelter', 'age_upon_intake')

In [69]:
print sample_drop_cols_2.count()
sample_drop_cols_2.select('animal_id_outcome').distinct().count()

21545


21545

In [70]:
sample_drop_cols_2.select('age_upon_outcome_age_group').distinct().count()

9

In [71]:
sample_drop_cols_2.select([count(when(col(c).isNull(), c)).alias(c) for c in sample_drop_cols_2.columns]).show()

+-----------------+---------------+------------+----------------+------------------------+--------------------------+---------------+------------+--------------+-----------+-----+-----+--------------+----------------+-----------+---------------+---------------------+-------------------------+------------+-----------+--------------+-------------+--------------------+
|animal_id_outcome|outcome_subtype|outcome_type|sex_upon_outcome|age_upon_outcome_(years)|age_upon_outcome_age_group|outcome_weekday|outcome_hour|outcome_number|animal_type|breed|color|found_location|intake_condition|intake_type|sex_upon_intake|age_upon_intake_years|age_upon_intake_age_group|intake_month|intake_year|intake_weekday|intake_number|time_in_shelter_days|
+-----------------+---------------+------------+----------------+------------------------+--------------------------+---------------+------------+--------------+-----------+-----+-----+--------------+----------------+-----------+---------------+-----------------

In [72]:
sample_drop_3 = sample_drop_cols_2.drop('outcome_subtype')

In [73]:
sample_drop_3.select([count(when(col(c).isNull(), c)).alias(c) for c in sample_drop_3.columns]).show()

+-----------------+------------+----------------+------------------------+--------------------------+---------------+------------+--------------+-----------+-----+-----+--------------+----------------+-----------+---------------+---------------------+-------------------------+------------+-----------+--------------+-------------+--------------------+
|animal_id_outcome|outcome_type|sex_upon_outcome|age_upon_outcome_(years)|age_upon_outcome_age_group|outcome_weekday|outcome_hour|outcome_number|animal_type|breed|color|found_location|intake_condition|intake_type|sex_upon_intake|age_upon_intake_years|age_upon_intake_age_group|intake_month|intake_year|intake_weekday|intake_number|time_in_shelter_days|
+-----------------+------------+----------------+------------------------+--------------------------+---------------+------------+--------------+-----------+-----+-----+--------------+----------------+-----------+---------------+---------------------+-------------------------+------------+----

## Remove any null values present in the Data. Check the nulls in all columns. All of them should be equal to zero. 

In [74]:
print sample_drop_3.count()
sample_drop_4 = sample_drop_3.na.drop()
print sample_drop_4.count()

21545
21544


## Drop the animal_id_outcome column. Check the number of columns in the data.

In [75]:
sample_drop_5 = sample_drop_4.drop('animal_id_outcome')
len(sample_drop_5.columns)

21

## Split the Data into train and test

In [76]:
train, validation = sample_drop_5.randomSplit([0.7, 0.3])
print("There are %d training examples and %d test examples." %
      (train.count(), validation.count()))

There are 14957 training examples and 6587 test examples.


In [80]:
 target_col = "time_in_shelter"

In [81]:
dtypes_list = final_Data.dtypes
dtypes_list


[('animal_id_outcome', 'string'),
 ('age_upon_outcome', 'string'),
 ('date_of_birth', 'timestamp'),
 ('outcome_subtype', 'string'),
 ('outcome_type', 'string'),
 ('sex_upon_outcome', 'string'),
 ('age_upon_outcome_(days)', 'int'),
 ('age_upon_outcome_(years)', 'double'),
 ('age_upon_outcome_age_group', 'string'),
 ('outcome_datetime', 'timestamp'),
 ('outcome_month', 'int'),
 ('outcome_year', 'int'),
 ('outcome_monthyear', 'string'),
 ('outcome_weekday', 'string'),
 ('outcome_hour', 'int'),
 ('outcome_number', 'double'),
 ('dob_year', 'int'),
 ('dob_month', 'int'),
 ('dob_monthyear', 'string'),
 ('age_upon_intake', 'string'),
 ('animal_id_intake', 'string'),
 ('animal_type', 'string'),
 ('breed', 'string'),
 ('color', 'string'),
 ('found_location', 'string'),
 ('intake_condition', 'string'),
 ('intake_type', 'string'),
 ('sex_upon_intake', 'string'),
 ('count', 'int'),
 ('age_upon_intake_days', 'int'),
 ('age_upon_intake_years', 'double'),
 ('age_upon_intake_age_group', 'string'),
 ('i

In [83]:
cat_cols = [each_val[0] for each_val in dtypes_list
            if each_val[1] == 'string' and
               each_val[0] not in   target_col]


## Using Spark ML pipeline, build regression models and Tune the models.
HINT: Use setHandleInvalid(“Keep”). 

In [None]:
from pyspark.ml.feature import VectorAssembler

num_cols_assembler = VectorAssembler(inputCols = num_cols,
                                     outputCol = 'num_cols_vector')