In [0]:
file_location = "/FileStore/tables/data/full_employee.csv"
file_type = "csv"

emp_df = spark.read.csv(file_location, header=True, inferSchema=True)

display(emp_df.limit(20))

emp_no,emp_level,birth_date,first_name,last_name,sex,hire_date,salary,department
10001,Staff,1953-07-25,Hideyuki,Zallocco,M,1990-04-28,60117,Development
10002,Engineer,1954-11-18,Byong,Delgrande,F,1991-09-07,65828,Sales
10003,Engineer,1958-01-30,Berry,Babb,F,1992-03-21,40006,Production
10004,Staff,1957-09-28,Xiong,Verhoeff,M,1987-11-26,40054,Production
10005,Senior Engineer,1952-10-28,Abdelkader,Baumann,F,1991-01-18,78228,Human Resources
10006,Senior Engineer,1959-10-30,Eran,Cusworth,M,1986-11-14,40000,Development
10007,Staff,1957-04-14,Christoph,Parfitt,M,1991-06-28,56724,Research
10008,Senior Staff,1964-11-17,Xudong,Samarati,M,1985-11-13,46671,Development
10009,Engineer,1962-12-18,Lihong,Magliocco,M,1993-10-23,60929,Quality Management
10010,Senior Staff,1956-04-24,Kwangyoen,Speek,F,1993-02-14,72488,Production


In [0]:
def shape(df):
    row_num = df.count()
    col_num = len(df.columns)
    return [row_num, col_num]

shape(emp_df)

Out[90]: [331603, 9]

In [0]:
emp_df.printSchema()

root
 |-- emp_no: integer (nullable = true)
 |-- emp_level: string (nullable = true)
 |-- birth_date: date (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- hire_date: date (nullable = true)
 |-- salary: integer (nullable = true)
 |-- department: string (nullable = true)



In [0]:
from pyspark.sql.functions import *

null_counter = emp_df.select([
    sum(when(col(coll).isNull(), 1).otherwise(0)).alias(coll) for coll in emp_df.columns
])

null_counter.show()

+------+---------+----------+----------+---------+---+---------+------+----------+
|emp_no|emp_level|birth_date|first_name|last_name|sex|hire_date|salary|department|
+------+---------+----------+----------+---------+---+---------+------+----------+
|     0|        0|         0|         0|        0|  0|        0|     0|         0|
+------+---------+----------+----------+---------+---+---------+------+----------+



BASIC ML

In [0]:
from pyspark.ml.feature import VectorAssembler, StringIndexer
#transform strings to int

ind = StringIndexer(inputCols =['emp_level','sex', "department"], outputCols = ['level_indexed', 'sex_indexed', 'dept_indexed'])

emp_df = ind.fit(emp_df).transform(emp_df)

In [0]:
display(emp_df.limit(20))

emp_no,emp_level,birth_date,first_name,last_name,sex,hire_date,salary,department,level_indexed,sex_indexed,dept_indexed
10001,Staff,1953-07-25,Hideyuki,Zallocco,M,1990-04-28,60117,Development,0.0,0.0,0.0
10002,Engineer,1954-11-18,Byong,Delgrande,F,1991-09-07,65828,Sales,2.0,1.0,2.0
10003,Engineer,1958-01-30,Berry,Babb,F,1992-03-21,40006,Production,2.0,1.0,1.0
10004,Staff,1957-09-28,Xiong,Verhoeff,M,1987-11-26,40054,Production,0.0,0.0,1.0
10005,Senior Engineer,1952-10-28,Abdelkader,Baumann,F,1991-01-18,78228,Human Resources,1.0,1.0,7.0
10006,Senior Engineer,1959-10-30,Eran,Cusworth,M,1986-11-14,40000,Development,1.0,0.0,0.0
10007,Staff,1957-04-14,Christoph,Parfitt,M,1991-06-28,56724,Research,0.0,0.0,4.0
10008,Senior Staff,1964-11-17,Xudong,Samarati,M,1985-11-13,46671,Development,3.0,0.0,0.0
10009,Engineer,1962-12-18,Lihong,Magliocco,M,1993-10-23,60929,Quality Management,2.0,0.0,6.0
10010,Senior Staff,1956-04-24,Kwangyoen,Speek,F,1993-02-14,72488,Production,3.0,1.0,1.0


In [0]:
emp_df.dtypes

Out[95]: [('emp_no', 'int'),
 ('emp_level', 'string'),
 ('birth_date', 'date'),
 ('first_name', 'string'),
 ('last_name', 'string'),
 ('sex', 'string'),
 ('hire_date', 'date'),
 ('salary', 'int'),
 ('department', 'string'),
 ('level_indexed', 'double'),
 ('sex_indexed', 'double'),
 ('dept_indexed', 'double')]

In [0]:
emp_df = emp_df.select(
    "*",
    year(col('birth_date')).alias("birth_year"),
    year(col('hire_date')).alias("hire_year")    
)

In [0]:
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.functions import vector_to_array
#apply MinMax Scaler for year figures

year_assembler = VectorAssembler(inputCols =["birth_year", "hire_year"], outputCol='year_union')
emp_df = year_assembler.transform(emp_df)

sc = MinMaxScaler(max=5.0, inputCol='year_union', outputCol = 'scaled_year')
emp_df = sc.fit(emp_df).transform(emp_df)

display(emp_df.limit(20))

emp_no,emp_level,birth_date,first_name,last_name,sex,hire_date,salary,department,level_indexed,sex_indexed,dept_indexed,birth_year,hire_year,year_union,scaled_year
10001,Staff,1953-07-25,Hideyuki,Zallocco,M,1990-04-28,60117,Development,0.0,0.0,0.0,1953,1990,"Map(vectorType -> dense, length -> 2, values -> List(1953.0, 1990.0))","Map(vectorType -> dense, length -> 2, values -> List(0.38461538461538464, 1.6666666666666665))"
10002,Engineer,1954-11-18,Byong,Delgrande,F,1991-09-07,65828,Sales,2.0,1.0,2.0,1954,1991,"Map(vectorType -> dense, length -> 2, values -> List(1954.0, 1991.0))","Map(vectorType -> dense, length -> 2, values -> List(0.7692307692307693, 2.0))"
10003,Engineer,1958-01-30,Berry,Babb,F,1992-03-21,40006,Production,2.0,1.0,1.0,1958,1992,"Map(vectorType -> dense, length -> 2, values -> List(1958.0, 1992.0))","Map(vectorType -> dense, length -> 2, values -> List(2.307692307692308, 2.333333333333333))"
10004,Staff,1957-09-28,Xiong,Verhoeff,M,1987-11-26,40054,Production,0.0,0.0,1.0,1957,1987,"Map(vectorType -> dense, length -> 2, values -> List(1957.0, 1987.0))","Map(vectorType -> dense, length -> 2, values -> List(1.9230769230769231, 0.6666666666666666))"
10005,Senior Engineer,1952-10-28,Abdelkader,Baumann,F,1991-01-18,78228,Human Resources,1.0,1.0,7.0,1952,1991,"Map(vectorType -> dense, length -> 2, values -> List(1952.0, 1991.0))","Map(vectorType -> dense, length -> 2, values -> List(0.0, 2.0))"
10006,Senior Engineer,1959-10-30,Eran,Cusworth,M,1986-11-14,40000,Development,1.0,0.0,0.0,1959,1986,"Map(vectorType -> dense, length -> 2, values -> List(1959.0, 1986.0))","Map(vectorType -> dense, length -> 2, values -> List(2.6923076923076925, 0.3333333333333333))"
10007,Staff,1957-04-14,Christoph,Parfitt,M,1991-06-28,56724,Research,0.0,0.0,4.0,1957,1991,"Map(vectorType -> dense, length -> 2, values -> List(1957.0, 1991.0))","Map(vectorType -> dense, length -> 2, values -> List(1.9230769230769231, 2.0))"
10008,Senior Staff,1964-11-17,Xudong,Samarati,M,1985-11-13,46671,Development,3.0,0.0,0.0,1964,1985,"Map(vectorType -> dense, length -> 2, values -> List(1964.0, 1985.0))","Map(vectorType -> dense, length -> 2, values -> List(4.615384615384616, 0.0))"
10009,Engineer,1962-12-18,Lihong,Magliocco,M,1993-10-23,60929,Quality Management,2.0,0.0,6.0,1962,1993,"Map(vectorType -> dense, length -> 2, values -> List(1962.0, 1993.0))","Map(vectorType -> dense, length -> 2, values -> List(3.8461538461538463, 2.6666666666666665))"
10010,Senior Staff,1956-04-24,Kwangyoen,Speek,F,1993-02-14,72488,Production,3.0,1.0,1.0,1956,1993,"Map(vectorType -> dense, length -> 2, values -> List(1956.0, 1993.0))","Map(vectorType -> dense, length -> 2, values -> List(1.5384615384615385, 2.6666666666666665))"


In [0]:
emp_df = emp_df.withColumn(
    "scaled_birth",
    vector_to_array(col("scaled_year"))[0]
).withColumn(
    "scaled_hire",
    vector_to_array(col("scaled_year"))[0]
)

In [0]:
emp_df.columns

Out[99]: ['emp_no',
 'emp_level',
 'birth_date',
 'first_name',
 'last_name',
 'sex',
 'hire_date',
 'salary',
 'department',
 'level_indexed',
 'sex_indexed',
 'dept_indexed',
 'birth_year',
 'hire_year',
 'year_union',
 'scaled_year',
 'scaled_birth',
 'scaled_hire']

In [0]:
#assemble all the prediction features

assembler = VectorAssembler(inputCols = ['level_indexed', "sex_indexed", "dept_indexed", "scaled_birth", "scaled_hire"],
                            outputCol = "predictors")


emp_df = assembler.transform(emp_df)

In [0]:
emp_df = emp_df.sort(["hire_date","birth_date"], asc=False)  #sort the data by hire date and birth date

print("Shape before duplicates dropped", shape(emp_df))

Shape before duplicates dropped [331603, 19]


In [0]:
display(emp_df.sort(["hire_date","birth_date"], asc=False).limit(20))

emp_no,emp_level,birth_date,first_name,last_name,sex,hire_date,salary,department,level_indexed,sex_indexed,dept_indexed,birth_year,hire_year,year_union,scaled_year,scaled_birth,scaled_hire,predictors
217245,Manager,1953-06-24,Shirish,Ossenbruggen,F,1985-01-01,68318,Finance,6.0,1.0,8.0,1953,1985,"Map(vectorType -> dense, length -> 2, values -> List(1953.0, 1985.0))","Map(vectorType -> dense, length -> 2, values -> List(0.38461538461538464, 0.0))",0.3846153846153846,0.3846153846153846,"Map(vectorType -> dense, length -> 5, values -> List(6.0, 1.0, 8.0, 0.38461538461538464, 0.38461538461538464))"
415921,Manager,1954-10-05,Tonny,Butterworth,F,1985-01-01,40000,Research,6.0,1.0,4.0,1954,1985,"Map(vectorType -> dense, length -> 2, values -> List(1954.0, 1985.0))","Map(vectorType -> dense, length -> 2, values -> List(0.7692307692307693, 0.0))",0.7692307692307693,0.7692307692307693,"Map(vectorType -> dense, length -> 5, values -> List(6.0, 1.0, 4.0, 0.7692307692307693, 0.7692307692307693))"
276153,Manager,1956-06-08,Krassimir,Wegerle,F,1985-01-01,41491,Finance,6.0,1.0,8.0,1956,1985,"Map(vectorType -> dense, length -> 2, values -> List(1956.0, 1985.0))","Map(vectorType -> dense, length -> 2, values -> List(1.5384615384615385, 0.0))",1.5384615384615383,1.5384615384615383,"Map(vectorType -> dense, length -> 5, values -> List(6.0, 1.0, 8.0, 1.5384615384615385, 1.5384615384615385))"
276153,Manager,1956-06-08,Krassimir,Wegerle,F,1985-01-01,41491,Sales,6.0,1.0,2.0,1956,1985,"Map(vectorType -> dense, length -> 2, values -> List(1956.0, 1985.0))","Map(vectorType -> dense, length -> 2, values -> List(1.5384615384615385, 0.0))",1.5384615384615383,1.5384615384615383,"Map(vectorType -> dense, length -> 5, values -> List(6.0, 1.0, 2.0, 1.5384615384615385, 1.5384615384615385))"
69075,Manager,1956-09-12,Margareta,Markovitch,M,1985-01-01,44154,Finance,6.0,0.0,8.0,1956,1985,"Map(vectorType -> dense, length -> 2, values -> List(1956.0, 1985.0))","Map(vectorType -> dense, length -> 2, values -> List(1.5384615384615385, 0.0))",1.5384615384615383,1.5384615384615383,"Map(vectorType -> dense, length -> 5, values -> List(6.0, 0.0, 8.0, 1.5384615384615385, 1.5384615384615385))"
69075,Manager,1956-09-12,Margareta,Markovitch,M,1985-01-01,44154,Human Resources,6.0,0.0,7.0,1956,1985,"Map(vectorType -> dense, length -> 2, values -> List(1956.0, 1985.0))","Map(vectorType -> dense, length -> 2, values -> List(1.5384615384615385, 0.0))",1.5384615384615383,1.5384615384615383,"Map(vectorType -> dense, length -> 5, values -> List(6.0, 0.0, 7.0, 1.5384615384615385, 1.5384615384615385))"
94810,Manager,1957-07-08,DeForest,Hagimont,M,1985-01-01,64508,Production,6.0,0.0,1.0,1957,1985,"Map(vectorType -> dense, length -> 2, values -> List(1957.0, 1985.0))","Map(vectorType -> dense, length -> 2, values -> List(1.9230769230769231, 0.0))",1.9230769230769231,1.9230769230769231,"Map(vectorType -> dense, length -> 5, values -> List(6.0, 0.0, 1.0, 1.9230769230769231, 1.9230769230769231))"
94810,Manager,1957-07-08,DeForest,Hagimont,M,1985-01-01,64508,Development,6.0,0.0,0.0,1957,1985,"Map(vectorType -> dense, length -> 2, values -> List(1957.0, 1985.0))","Map(vectorType -> dense, length -> 2, values -> List(1.9230769230769231, 0.0))",1.9230769230769231,1.9230769230769231,"Map(vectorType -> dense, length -> 5, values -> List(6.0, 0.0, 0.0, 1.9230769230769231, 1.9230769230769231))"
469466,Manager,1959-10-28,Ebru,Alpin,M,1985-01-01,40000,Development,6.0,0.0,0.0,1959,1985,"Map(vectorType -> dense, length -> 2, values -> List(1959.0, 1985.0))","Map(vectorType -> dense, length -> 2, values -> List(2.6923076923076925, 0.0))",2.6923076923076925,2.6923076923076925,"Map(vectorType -> dense, length -> 5, values -> List(6.0, 0.0, 0.0, 2.6923076923076925, 2.6923076923076925))"
424772,Manager,1959-11-09,Arie,Staelin,M,1985-01-01,48747,Development,6.0,0.0,0.0,1959,1985,"Map(vectorType -> dense, length -> 2, values -> List(1959.0, 1985.0))","Map(vectorType -> dense, length -> 2, values -> List(2.6923076923076925, 0.0))",2.6923076923076925,2.6923076923076925,"Map(vectorType -> dense, length -> 5, values -> List(6.0, 0.0, 0.0, 2.6923076923076925, 2.6923076923076925))"


In [0]:
display(emp_df.limit(10))

emp_no,emp_level,birth_date,first_name,last_name,sex,hire_date,salary,department,level_indexed,sex_indexed,dept_indexed,birth_year,hire_year,year_union,scaled_year,scaled_birth,scaled_hire,predictors
217245,Manager,1953-06-24,Shirish,Ossenbruggen,F,1985-01-01,68318,Finance,6.0,1.0,8.0,1953,1985,"Map(vectorType -> dense, length -> 2, values -> List(1953.0, 1985.0))","Map(vectorType -> dense, length -> 2, values -> List(0.38461538461538464, 0.0))",0.3846153846153846,0.3846153846153846,"Map(vectorType -> dense, length -> 5, values -> List(6.0, 1.0, 8.0, 0.38461538461538464, 0.38461538461538464))"
415921,Manager,1954-10-05,Tonny,Butterworth,F,1985-01-01,40000,Research,6.0,1.0,4.0,1954,1985,"Map(vectorType -> dense, length -> 2, values -> List(1954.0, 1985.0))","Map(vectorType -> dense, length -> 2, values -> List(0.7692307692307693, 0.0))",0.7692307692307693,0.7692307692307693,"Map(vectorType -> dense, length -> 5, values -> List(6.0, 1.0, 4.0, 0.7692307692307693, 0.7692307692307693))"
276153,Manager,1956-06-08,Krassimir,Wegerle,F,1985-01-01,41491,Sales,6.0,1.0,2.0,1956,1985,"Map(vectorType -> dense, length -> 2, values -> List(1956.0, 1985.0))","Map(vectorType -> dense, length -> 2, values -> List(1.5384615384615385, 0.0))",1.5384615384615383,1.5384615384615383,"Map(vectorType -> dense, length -> 5, values -> List(6.0, 1.0, 2.0, 1.5384615384615385, 1.5384615384615385))"
276153,Manager,1956-06-08,Krassimir,Wegerle,F,1985-01-01,41491,Finance,6.0,1.0,8.0,1956,1985,"Map(vectorType -> dense, length -> 2, values -> List(1956.0, 1985.0))","Map(vectorType -> dense, length -> 2, values -> List(1.5384615384615385, 0.0))",1.5384615384615383,1.5384615384615383,"Map(vectorType -> dense, length -> 5, values -> List(6.0, 1.0, 8.0, 1.5384615384615385, 1.5384615384615385))"
69075,Manager,1956-09-12,Margareta,Markovitch,M,1985-01-01,44154,Human Resources,6.0,0.0,7.0,1956,1985,"Map(vectorType -> dense, length -> 2, values -> List(1956.0, 1985.0))","Map(vectorType -> dense, length -> 2, values -> List(1.5384615384615385, 0.0))",1.5384615384615383,1.5384615384615383,"Map(vectorType -> dense, length -> 5, values -> List(6.0, 0.0, 7.0, 1.5384615384615385, 1.5384615384615385))"
69075,Manager,1956-09-12,Margareta,Markovitch,M,1985-01-01,44154,Finance,6.0,0.0,8.0,1956,1985,"Map(vectorType -> dense, length -> 2, values -> List(1956.0, 1985.0))","Map(vectorType -> dense, length -> 2, values -> List(1.5384615384615385, 0.0))",1.5384615384615383,1.5384615384615383,"Map(vectorType -> dense, length -> 5, values -> List(6.0, 0.0, 8.0, 1.5384615384615385, 1.5384615384615385))"
94810,Manager,1957-07-08,DeForest,Hagimont,M,1985-01-01,64508,Production,6.0,0.0,1.0,1957,1985,"Map(vectorType -> dense, length -> 2, values -> List(1957.0, 1985.0))","Map(vectorType -> dense, length -> 2, values -> List(1.9230769230769231, 0.0))",1.9230769230769231,1.9230769230769231,"Map(vectorType -> dense, length -> 5, values -> List(6.0, 0.0, 1.0, 1.9230769230769231, 1.9230769230769231))"
94810,Manager,1957-07-08,DeForest,Hagimont,M,1985-01-01,64508,Development,6.0,0.0,0.0,1957,1985,"Map(vectorType -> dense, length -> 2, values -> List(1957.0, 1985.0))","Map(vectorType -> dense, length -> 2, values -> List(1.9230769230769231, 0.0))",1.9230769230769231,1.9230769230769231,"Map(vectorType -> dense, length -> 5, values -> List(6.0, 0.0, 0.0, 1.9230769230769231, 1.9230769230769231))"
469466,Manager,1959-10-28,Ebru,Alpin,M,1985-01-01,40000,Development,6.0,0.0,0.0,1959,1985,"Map(vectorType -> dense, length -> 2, values -> List(1959.0, 1985.0))","Map(vectorType -> dense, length -> 2, values -> List(2.6923076923076925, 0.0))",2.6923076923076925,2.6923076923076925,"Map(vectorType -> dense, length -> 5, values -> List(6.0, 0.0, 0.0, 2.6923076923076925, 2.6923076923076925))"
424772,Manager,1959-11-09,Arie,Staelin,M,1985-01-01,48747,Development,6.0,0.0,0.0,1959,1985,"Map(vectorType -> dense, length -> 2, values -> List(1959.0, 1985.0))","Map(vectorType -> dense, length -> 2, values -> List(2.6923076923076925, 0.0))",2.6923076923076925,2.6923076923076925,"Map(vectorType -> dense, length -> 5, values -> List(6.0, 0.0, 0.0, 2.6923076923076925, 2.6923076923076925))"


In [0]:
emp_df = emp_df.distinct() #remove any duplicates
print("Shape after duplicates dropped", shape(emp_df))

display(emp_df.limit(20))

Shape after duplicates dropped [331603, 19]


emp_no,emp_level,birth_date,first_name,last_name,sex,hire_date,salary,department,level_indexed,sex_indexed,dept_indexed,birth_year,hire_year,year_union,scaled_year,scaled_birth,scaled_hire,predictors
10010,Senior Staff,1956-04-24,Kwangyoen,Speek,F,1993-02-14,72488,Production,3.0,1.0,1.0,1956,1993,"Map(vectorType -> dense, length -> 2, values -> List(1956.0, 1993.0))","Map(vectorType -> dense, length -> 2, values -> List(1.5384615384615385, 2.6666666666666665))",1.5384615384615383,1.5384615384615383,"Map(vectorType -> dense, length -> 5, values -> List(3.0, 1.0, 1.0, 1.5384615384615385, 1.5384615384615385))"
10011,Engineer,1961-12-31,Shuichi,Tyugu,F,1995-01-17,42365,Customer Service,2.0,1.0,3.0,1961,1995,"Map(vectorType -> dense, length -> 2, values -> List(1961.0, 1995.0))","Map(vectorType -> dense, length -> 2, values -> List(3.4615384615384617, 3.333333333333333))",3.4615384615384617,3.4615384615384617,"Map(vectorType -> dense, length -> 5, values -> List(2.0, 1.0, 3.0, 3.4615384615384617, 3.4615384615384617))"
10013,Senior Engineer,1961-08-30,Perry,Lorho,F,1991-08-03,40000,Human Resources,1.0,1.0,7.0,1961,1991,"Map(vectorType -> dense, length -> 2, values -> List(1961.0, 1991.0))","Map(vectorType -> dense, length -> 2, values -> List(3.4615384615384617, 2.0))",3.4615384615384617,3.4615384615384617,"Map(vectorType -> dense, length -> 5, values -> List(1.0, 1.0, 7.0, 3.4615384615384617, 3.4615384615384617))"
10016,Technique Leader,1963-07-12,Bilhanan,Wuwongse,M,1993-10-06,70889,Sales,4.0,0.0,2.0,1963,1993,"Map(vectorType -> dense, length -> 2, values -> List(1963.0, 1993.0))","Map(vectorType -> dense, length -> 2, values -> List(4.230769230769231, 2.6666666666666665))",4.230769230769231,4.230769230769231,"Map(vectorType -> dense, length -> 5, values -> List(4.0, 0.0, 2.0, 4.230769230769231, 4.230769230769231))"
10004,Staff,1957-09-28,Xiong,Verhoeff,M,1987-11-26,40054,Production,0.0,0.0,1.0,1957,1987,"Map(vectorType -> dense, length -> 2, values -> List(1957.0, 1987.0))","Map(vectorType -> dense, length -> 2, values -> List(1.9230769230769231, 0.6666666666666666))",1.9230769230769231,1.9230769230769231,"Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.0, 1.0, 1.9230769230769231, 1.9230769230769231))"
10018,Engineer,1957-12-11,Naftali,Dulli,M,1993-06-06,55881,Production,2.0,0.0,1.0,1957,1993,"Map(vectorType -> dense, length -> 2, values -> List(1957.0, 1993.0))","Map(vectorType -> dense, length -> 2, values -> List(1.9230769230769231, 2.6666666666666665))",1.9230769230769231,1.9230769230769231,"Map(vectorType -> dense, length -> 5, values -> List(2.0, 0.0, 1.0, 1.9230769230769231, 1.9230769230769231))"
10003,Engineer,1958-01-30,Berry,Babb,F,1992-03-21,40006,Production,2.0,1.0,1.0,1958,1992,"Map(vectorType -> dense, length -> 2, values -> List(1958.0, 1992.0))","Map(vectorType -> dense, length -> 2, values -> List(2.307692307692308, 2.333333333333333))",2.307692307692308,2.307692307692308,"Map(vectorType -> dense, length -> 5, values -> List(2.0, 1.0, 1.0, 2.307692307692308, 2.307692307692308))"
10002,Engineer,1954-11-18,Byong,Delgrande,F,1991-09-07,65828,Sales,2.0,1.0,2.0,1954,1991,"Map(vectorType -> dense, length -> 2, values -> List(1954.0, 1991.0))","Map(vectorType -> dense, length -> 2, values -> List(0.7692307692307693, 2.0))",0.7692307692307693,0.7692307692307693,"Map(vectorType -> dense, length -> 5, values -> List(2.0, 1.0, 2.0, 0.7692307692307693, 0.7692307692307693))"
10015,Engineer,1959-05-19,Bojan,Zallocco,M,1986-10-14,40000,Research,2.0,0.0,4.0,1959,1986,"Map(vectorType -> dense, length -> 2, values -> List(1959.0, 1986.0))","Map(vectorType -> dense, length -> 2, values -> List(2.6923076923076925, 0.3333333333333333))",2.6923076923076925,2.6923076923076925,"Map(vectorType -> dense, length -> 5, values -> List(2.0, 0.0, 4.0, 2.6923076923076925, 2.6923076923076925))"
10006,Senior Engineer,1959-10-30,Eran,Cusworth,M,1986-11-14,40000,Development,1.0,0.0,0.0,1959,1986,"Map(vectorType -> dense, length -> 2, values -> List(1959.0, 1986.0))","Map(vectorType -> dense, length -> 2, values -> List(2.6923076923076925, 0.3333333333333333))",2.6923076923076925,2.6923076923076925,"Map(vectorType -> dense, length -> 5, values -> List(1.0, 0.0, 0.0, 2.6923076923076925, 2.6923076923076925))"


MODEL DEVELOPEMENT

In [0]:
from pyspark.ml.regression import LinearRegression, RandomForestRegressor

In [0]:
emp_df.groupBy("hire_year").count().orderBy('count', ascending=False).show()

+---------+-----+
|hire_year|count|
+---------+-----+
|     1986|40004|
|     1985|39069|
|     1987|36984|
|     1988|34791|
|     1989|31343|
|     1990|28295|
|     1991|24921|
|     1992|22563|
|     1993|19657|
|     1994|16303|
|     1995|13403|
|     1996|10615|
|     1997| 7371|
|     1998| 4611|
|     1999| 1658|
|     2000|   15|
+---------+-----+



In [0]:
emp_df.groupBy("birth_year").count().orderBy('count', ascending=False).show()

+----------+-----+
|birth_year|count|
+----------+-----+
|      1959|25785|
|      1954|25756|
|      1958|25706|
|      1960|25608|
|      1955|25522|
|      1961|25487|
|      1963|25484|
|      1956|25478|
|      1962|25398|
|      1964|25343|
|      1953|25294|
|      1957|25215|
|      1952|23401|
|      1965| 2126|
+----------+-----+



In [0]:
#To prevent bias and data leakages
train_data = emp_df.filter(col("hire_year") < 1996)
train_data = train_data.select("predictors", 'salary')

train_data.show()

+--------------------+------+
|          predictors|salary|
+--------------------+------+
|[3.0,1.0,1.0,1.53...| 72488|
|[2.0,1.0,3.0,3.46...| 42365|
|[1.0,1.0,7.0,3.46...| 40000|
|[4.0,0.0,2.0,4.23...| 70889|
|[0.0,0.0,1.0,1.92...| 40054|
|[2.0,0.0,1.0,1.92...| 55881|
|[2.0,1.0,1.0,2.30...| 40006|
|[2.0,1.0,2.0,0.76...| 65828|
|[2.0,0.0,4.0,2.69...| 40000|
| (5,[0,2],[4.0,4.0])| 44276|
|[1.0,0.0,0.0,2.69...| 40000|
|[3.0,1.0,6.0,1.53...| 72488|
|[0.0,1.0,0.0,2.30...| 46168|
|[0.0,0.0,4.0,1.92...| 56724|
|[2.0,0.0,6.0,3.84...| 60929|
|[3.0,0.0,0.0,4.61...| 46671|
|[1.0,1.0,7.0,0.0,...| 78228|
|[1.0,0.0,0.0,3.46...| 40000|
|[0.0,0.0,5.0,4.61...| 71380|
|[2.0,0.0,0.0,1.92...| 55881|
+--------------------+------+
only showing top 20 rows



In [0]:
test_data = emp_df.filter(col("hire_year") >= 1996)
test_data = test_data.select("predictors", "salary")
test_data.show()

+--------------------+------+
|          predictors|salary|
+--------------------+------+
|[2.0,1.0,0.0,0.76...| 44978|
|[2.0,0.0,0.0,2.69...| 42284|
|[2.0,0.0,8.0,0.38...| 48041|
|       (5,[0],[3.0])| 54615|
|(5,[3,4],[3.46153...| 70303|
|[1.0,1.0,1.0,3.46...| 40000|
|[0.0,1.0,6.0,1.53...| 40000|
|[2.0,1.0,7.0,2.30...| 46836|
|[1.0,1.0,1.0,3.07...| 47585|
|[2.0,0.0,6.0,2.69...| 40000|
|(5,[3,4],[3.84615...| 53977|
|[2.0,0.0,2.0,3.84...| 66415|
| (5,[0,2],[3.0,4.0])| 54615|
|[5.0,0.0,0.0,3.84...| 40000|
|[3.0,0.0,6.0,3.46...| 40000|
|(5,[3,4],[3.84615...| 40000|
|[2.0,0.0,0.0,4.61...| 47280|
|[2.0,0.0,0.0,2.69...| 40000|
|[0.0,1.0,1.0,1.53...| 40000|
|[2.0,1.0,0.0,1.92...| 48857|
+--------------------+------+
only showing top 20 rows



In [0]:
rnd_reg = RandomForestRegressor(featuresCol='predictors', labelCol='salary')
rnd_reg = rnd_reg.fit(train_data)

In [0]:
rnd_preds = rnd_reg.transform(test_data)
rnd_preds.show()

+--------------------+------+------------------+
|          predictors|salary|        prediction|
+--------------------+------+------------------+
|[4.0,1.0,2.0,0.0,...| 85292| 67895.47119197769|
|[1.0,0.0,0.0,0.38...| 40000| 48973.83611162548|
|[4.0,1.0,4.0,0.76...| 40000|48803.166496493024|
|[2.0,1.0,0.0,0.76...| 40000| 49059.91280575292|
|[0.0,1.0,1.0,0.76...| 41716| 48483.99061025144|
|[0.0,1.0,0.0,0.76...| 41716| 48483.99061025144|
|[0.0,0.0,2.0,1.15...| 76801| 68426.60490837465|
|[2.0,0.0,4.0,1.15...| 74934| 48843.58123945023|
|[1.0,1.0,2.0,1.15...| 80779| 68537.42626531437|
|[0.0,1.0,2.0,1.53...|108341| 69001.59514485208|
|[3.0,1.0,1.0,1.53...| 44737| 48704.86431561297|
|[0.0,0.0,4.0,1.92...| 53048|48731.088173065174|
|[0.0,1.0,1.0,1.92...| 60157|48612.845065926755|
|[0.0,1.0,2.0,2.30...| 53882| 68888.21168683951|
|[2.0,0.0,1.0,2.30...| 44055| 49160.76418723038|
|[1.0,0.0,0.0,2.30...| 43549| 48896.23947559446|
|[2.0,1.0,4.0,2.69...| 40164| 49239.56492029775|
|[0.0,0.0,2.0,2.69..

Train with Linear Regression

In [0]:
linear = LinearRegression(featuresCol='predictors', labelCol='salary')
linear = linear.fit(train_data)

In [0]:
linear_preds = linear.transform(test_data)
linear_preds.show()

+--------------------+------+------------------+
|          predictors|salary|        prediction|
+--------------------+------+------------------+
|[4.0,1.0,2.0,0.0,...| 85292| 52753.78109176191|
|[1.0,0.0,0.0,0.38...| 40000|52137.733613673656|
|[4.0,1.0,4.0,0.76...| 40000| 53417.16714255408|
|[2.0,1.0,0.0,0.76...| 40000|  52130.9365282247|
|[0.0,1.0,1.0,0.76...| 41716| 52478.36470837124|
|[0.0,1.0,0.0,0.76...| 41716| 52151.63294947606|
|[0.0,0.0,2.0,1.15...| 76801|  52811.4678750915|
|[2.0,0.0,4.0,1.15...| 74934|  53444.2349716305|
|[1.0,1.0,2.0,1.15...| 80779|52799.709523141646|
|[0.0,1.0,2.0,1.53...|108341| 52815.01900026822|
|[3.0,1.0,1.0,1.53...| 44737|   52457.242609496|
|[0.0,0.0,4.0,1.92...| 53048| 53474.85392588366|
|[0.0,1.0,1.0,1.92...| 60157|52493.248507873934|
|[0.0,1.0,2.0,2.30...| 53882|52824.941533270015|
|[2.0,0.0,1.0,2.30...| 44055| 52478.92349444765|
|[1.0,0.0,0.0,2.30...| 43549| 52162.53994617815|
|[2.0,1.0,4.0,2.69...| 40164|53462.669896309926|
|[0.0,0.0,2.0,2.69..

METRICS AND EVALUATION

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator

mse_evaluator = RegressionEvaluator(labelCol="salary", predictionCol="prediction", metricName="mse")
mae_evaluator = RegressionEvaluator(labelCol="salary", predictionCol="prediction", metricName="mae")

rnd_mse = mse_evaluator.evaluate(rnd_preds)
rnd_mae = mae_evaluator.evaluate(rnd_preds)

print(f"Random Forest Mean Squared Error:, {int(rnd_mse):,}")
print("Random Forest Mean Absolute Error: {:,}".format(int(rnd_mae)))

Random Forest Mean Squared Error:, 136,852,192
Random Forest Mean Absolute Error: 9,412


In [0]:
mse_evaluator = RegressionEvaluator(labelCol="salary", predictionCol="prediction", metricName="mse")
mae_evaluator = RegressionEvaluator(labelCol="salary", predictionCol="prediction", metricName="mae")

linear_mse = mse_evaluator.evaluate(linear_preds)
linear_mae = mae_evaluator.evaluate(linear_preds)

print(f"Linear Regression Mean Squared Error:, {int(linear_mse):,}")
print("Linear Regression Mean Absolute Error: {:,}".format(int(linear_mae)))

Linear Regression Mean Squared Error:, 206,447,368
Linear Regression Mean Absolute Error: 11,799


random forest regressor model perfoms better than the linear regression

Testing other pyspark regression models

In [0]:
from pyspark.ml.regression import AFTSurvivalRegression, GBTRegressor, FMRegressor, DecisionTreeRegressor

In [0]:
aft_model = AFTSurvivalRegression(featuresCol='predictors', labelCol='salary')
aft_model = aft_model.fit(train_data)

aft_preds = aft_model.transform(test_data)
aft_preds.show()

[0;31m---------------------------------------------------------------------------[0m
[0;31mIllegalArgumentException[0m                  Traceback (most recent call last)
File [0;32m<command-4369324742592617>:2[0m
[1;32m      1[0m aft_model [38;5;241m=[39m AFTSurvivalRegression(featuresCol[38;5;241m=[39m[38;5;124m'[39m[38;5;124mpredictors[39m[38;5;124m'[39m, labelCol[38;5;241m=[39m[38;5;124m'[39m[38;5;124msalary[39m[38;5;124m'[39m)
[0;32m----> 2[0m aft_model [38;5;241m=[39m aft_model[38;5;241m.[39mfit(train_data)
[1;32m      4[0m aft_preds [38;5;241m=[39m aft_model[38;5;241m.[39mtransform(test_data)
[1;32m      5[0m aft_preds[38;5;241m.[39mshow()

File [0;32m/databricks/python_shell/dbruntime/MLWorkloadsInstrumentation/_pyspark.py:30[0m, in [0;36m_create_patch_function.<locals>.patched_method[0;34m(self, *args, **kwargs)[0m
[1;32m     28[0m call_succeeded [38;5;241m=[39m [38;5;28;01mFalse[39;00m
[1;32m     29[0m [38;5;28;01mtry[39

In [0]:
gb_model = GBTRegressor(featuresCol='predictors', labelCol='salary')
gb_model = gb_model.fit(train_data)

gb_preds = gb_model.transform(test_data)
gb_preds.show()

+--------------------+------+------------------+
|          predictors|salary|        prediction|
+--------------------+------+------------------+
|[4.0,1.0,2.0,0.0,...| 85292| 70473.75006753167|
|[1.0,0.0,0.0,0.38...| 40000|48648.794502646044|
|[4.0,1.0,4.0,0.76...| 40000|48437.098027414504|
|[2.0,1.0,0.0,0.76...| 40000|49008.263003382745|
|[0.0,1.0,1.0,0.76...| 41716|48685.481707777086|
|[0.0,1.0,0.0,0.76...| 41716| 48524.33836808741|
|[0.0,0.0,2.0,1.15...| 76801| 69768.68021143017|
|[2.0,0.0,4.0,1.15...| 74934| 48953.55134759175|
|[1.0,1.0,2.0,1.15...| 80779| 69711.43109851463|
|[0.0,1.0,2.0,1.53...|108341| 69996.44574838014|
|[3.0,1.0,1.0,1.53...| 44737| 48926.93556549842|
|[0.0,0.0,4.0,1.92...| 53048|48694.737439996345|
|[0.0,1.0,1.0,1.92...| 60157|48697.206326061234|
|[0.0,1.0,2.0,2.30...| 53882| 69886.12822589024|
|[2.0,0.0,1.0,2.30...| 44055| 48310.45747837725|
|[1.0,0.0,0.0,2.30...| 43549|  48772.2276804477|
|[2.0,1.0,4.0,2.69...| 40164| 49090.34988050985|
|[0.0,0.0,2.0,2.69..

In [0]:
fm_model = FMRegressor(featuresCol='predictors', labelCol='salary')
fm_model = fm_model.fit(train_data)

fm_preds = fm_model.transform(test_data)
fm_preds.show()

+--------------------+------+------------------+
|          predictors|salary|        prediction|
+--------------------+------+------------------+
|[4.0,1.0,2.0,0.0,...| 85292|  9639.47446786482|
|[1.0,0.0,0.0,0.38...| 40000|1151.5367331463622|
|[4.0,1.0,4.0,0.76...| 40000|27098.862189138526|
|[2.0,1.0,0.0,0.76...| 40000| 9724.479254843582|
|[0.0,1.0,1.0,0.76...| 41716| 5996.174336819629|
|[0.0,1.0,0.0,0.76...| 41716| 4462.357944149711|
|[0.0,0.0,2.0,1.15...| 76801| 5135.482425173914|
|[2.0,0.0,4.0,1.15...| 74934|14851.179371093123|
|[1.0,1.0,2.0,1.15...| 80779|15363.958735207645|
|[0.0,1.0,2.0,1.53...|108341|15610.992676347785|
|[3.0,1.0,1.0,1.53...| 44737| 26471.21785448063|
|[0.0,0.0,4.0,1.92...| 53048|15185.344072978198|
|[0.0,1.0,1.0,1.92...| 60157|17788.450634967547|
|[0.0,1.0,2.0,2.30...| 53882|25907.926698481268|
|[2.0,0.0,1.0,2.30...| 44055|21850.978354083138|
|[1.0,0.0,0.0,2.30...| 43549| 14503.23309816224|
|[2.0,1.0,4.0,2.69...| 40164| 53995.69132552551|
|[0.0,0.0,2.0,2.69..

In [0]:
tree_model = DecisionTreeRegressor(featuresCol='predictors', labelCol='salary')
tree_model = tree_model.fit(train_data)

tree_preds = tree_model.transform(test_data)
tree_preds.show()

+--------------------+------+------------------+
|          predictors|salary|        prediction|
+--------------------+------+------------------+
|[4.0,1.0,2.0,0.0,...| 85292| 70639.92797783934|
|[1.0,0.0,0.0,0.38...| 40000|48777.889420034204|
|[4.0,1.0,4.0,0.76...| 40000| 48671.15149802891|
|[2.0,1.0,0.0,0.76...| 40000|48777.889420034204|
|[0.0,1.0,1.0,0.76...| 41716| 48671.15149802891|
|[0.0,1.0,0.0,0.76...| 41716| 48671.15149802891|
|[0.0,0.0,2.0,1.15...| 76801| 69920.29964252486|
|[2.0,0.0,4.0,1.15...| 74934| 48986.27496483825|
|[1.0,1.0,2.0,1.15...| 80779| 69936.88119993183|
|[0.0,1.0,2.0,1.53...|108341| 69920.29964252486|
|[3.0,1.0,1.0,1.53...| 44737| 48671.15149802891|
|[0.0,0.0,4.0,1.92...| 53048| 48671.15149802891|
|[0.0,1.0,1.0,1.92...| 60157| 48671.15149802891|
|[0.0,1.0,2.0,2.30...| 53882| 69920.29964252486|
|[2.0,0.0,1.0,2.30...| 44055|48777.889420034204|
|[1.0,0.0,0.0,2.30...| 43549|48777.889420034204|
|[2.0,1.0,4.0,2.69...| 40164| 48986.27496483825|
|[0.0,0.0,2.0,2.69..

Evaluation and Metric

In [0]:
gb_mse = mse_evaluator.evaluate(gb_preds)
gb_mae = mae_evaluator.evaluate(gb_preds)

print(f"Gradient Boosted Tree Mean Squared Error:, {int(gb_mse):,}")
print("Gradient Boosted Tree Mean Absolute Error: {:,}".format(int(gb_mae)))

Gradient Boosted Tree Mean Squared Error:, 136,010,392
Gradient Boosted Tree Mean Absolute Error: 9,322


In [0]:
fm_mse = mse_evaluator.evaluate(fm_preds)
fm_mae = mae_evaluator.evaluate(fm_preds)

print(f"Factorization Algorithm Mean Squared Error:, {int(fm_mse):,}")
print("Factorization Algorithm Mean Absolute Error: {:,}".format(int(fm_mae)))

Factorization Algorithm Mean Squared Error:, 1,324,512,142
Factorization Algorithm Mean Absolute Error: 30,712


In [0]:
tree_mse = mse_evaluator.evaluate(tree_preds)
tree_mae = mae_evaluator.evaluate(tree_preds)

print(f"Decision Tree Mean Squared Error:, {int(tree_mse):,}")
print("Decision Tree Mean Absolute Error: {:,}".format(int(tree_mae)))

Decision Tree Mean Squared Error:, 135,881,360
Decision Tree Mean Absolute Error: 9,318


Above all and all, Decision Tree Model produced predictions that are more accurate than the other models