In [2]:
import findspark
findspark.init('/home/ubuntu/spark-3.2.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('not basics').getOrCreate()

avg_df = spark.read.csv("./AvgSalary.csv", header=True)
salary_df = spark.read.csv("./Glassdoor Gender Pay Gap.csv", header=True, inferSchema=True)

salary_df = salary_df.na.drop()
salary_df = salary_df.withColumn("TotalSalary", salary_df.BasePay + salary_df.Bonus)
df = salary_df.join(avg_df, salary_df.JobTitle == avg_df.JobTitle, "inner")
df = df.withColumn("BelowAvg", df.TotalSalary < df.AvgSalary)

df_cols = df.columns
# get index of the duplicate columns
duplicate_col_index = list(set([df_cols.index(c) for c in df_cols if df_cols.count(c) == 2]))

# rename by adding suffix '_duplicated'
for i in duplicate_col_index:
    df_cols[i] = df_cols[i] + '_duplicated'

# rename the column in DF
df = df.toDF(*df_cols)

# remove flagged columns
cols_to_remove = [c for c in df_cols if '_duplicated' in c]
df = df.drop(*cols_to_remove)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/10/13 08:20:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
from IPython.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

In [4]:
salary_df.summary().show()

[Stage 3:>                                                          (0 + 1) / 1]

+-------+-------------------+------+------------------+------------------+---------+--------------+------------------+-----------------+------------------+------------------+
|summary|           JobTitle|Gender|               Age|          PerfEval|Education|          Dept|         Seniority|          BasePay|             Bonus|         CompanyID|
+-------+-------------------+------+------------------+------------------+---------+--------------+------------------+-----------------+------------------+------------------+
|  count|               1000|  1000|              1000|              1000|      989|           999|              1000|             1000|              1000|              1000|
|   mean|               null|  null|            41.393|             3.037|     null|          null|             2.971|        93994.653|          6506.161|           688.204|
| stddev|               null|  null|14.294855504477594|1.4239587980902595|     null|          null|1.3950287112117459|25530.7

                                                                                

In [1]:
salary_df.show()

NameError: name 'salary_df' is not defined

In [6]:
from pyspark.sql import functions as F
df_agg = salary_df.agg(*(F.count(F.when(F.isnull(c),c)).alias(c) for c in salary_df.columns))

df_agg.show()

+--------+------+---+--------+---------+----+---------+-------+-----+---------+
|JobTitle|Gender|Age|PerfEval|Education|Dept|Seniority|BasePay|Bonus|CompanyID|
+--------+------+---+--------+---------+----+---------+-------+-----+---------+
|       0|     0|  0|       0|       11|   1|        0|      0|    0|        0|
+--------+------+---+--------+---------+----+---------+-------+-----+---------+



In [12]:
salary_df.describe(['BasePay', 'Bonus']).show()

+-------+-----------------+------------------+
|summary|          BasePay|             Bonus|
+-------+-----------------+------------------+
|  count|             1000|              1000|
|   mean|        93994.653|          6506.161|
| stddev|25530.79670901769|2479.4433190236978|
|    min|            34208|               149|
|    max|           242105|             50000|
+-------+-----------------+------------------+



In [15]:
salary_df.filter("Gender=='Female'").show()

+-------------------+------+---+--------+-----------+--------------+---------+-------+-----+---------+
|           JobTitle|Gender|Age|PerfEval|  Education|          Dept|Seniority|BasePay|Bonus|CompanyID|
+-------------------+------+---+--------+-----------+--------------+---------+-------+-----+---------+
|            Manager|Female| 60|       4|        PhD|    Management|        3| 140614| 8354|      812|
|            Manager|Female| 62|       4|    Masters|         Sales|        4| 125203| 7808|        5|
|            Manager|Female| 45|       1|High School|   Engineering|        5| 129893| 5120|     1003|
|  Financial Analyst|Female| 47|       5|High School|         Sales|        5| 126190| 8961|      417|
|            Manager|Female| 62|       4|    Masters|Administration|        3| 106008| 6235|      884|
|            Manager|Female| 57|       1|    Masters|    Operations|        3| 145095| 3889|      590|
|            Manager|Female| 45|       2|High School|    Management|     

In [2]:
salary_df = salary_df.na.drop()
salary_df = salary_df.withColumn("TotalSalary", salary_df.BasePay + salary_df.Bonus)
df = salary_df.join(avg_df, salary_df.JobTitle == avg_df.JobTitle, "inner")
df = df.withColumn("BelowAvg", df.TotalSalary < df.AvgSalary)

df_cols = df.columns
# get index of the duplicate columns
duplicate_col_index = list(set([df_cols.index(c) for c in df_cols if df_cols.count(c) == 2]))

# rename by adding suffix '_duplicated'
for i in duplicate_col_index:
    df_cols[i] = df_cols[i] + '_duplicated'

# rename the column in DF
df = df.toDF(*df_cols)

# remove flagged columns
cols_to_remove = [c for c in df_cols if '_duplicated' in c]
df = df.drop(*cols_to_remove)

In [7]:
df.printSchema()

root
 |-- Age: integer (nullable = true)
 |-- PerfEval: integer (nullable = true)
 |-- Seniority: integer (nullable = true)
 |-- BelowAvg: boolean (nullable = true)
 |-- JobEncoded: vector (nullable = true)
 |-- EducationEncoded: vector (nullable = true)
 |-- DeptEncoded: vector (nullable = true)



In [3]:
from pyspark.ml.feature import StringIndexer
job_indexer = StringIndexer(inputCol="JobTitle", outputCol="JobIndex")
edu_indexer = StringIndexer(inputCol="Education", outputCol="EducationIndex")
dept_indexer = StringIndexer(inputCol="Dept", outputCol="DeptIndex")
#below_indexer = StringIndexer(inputCol="BelowAvg", outputCol="BelowIndex")
df = job_indexer.fit(df).transform(df)
df = edu_indexer.fit(df).transform(df)
df = dept_indexer.fit(df).transform(df)
#df = below_indexer.fit(df).transform(df)

from pyspark.ml.feature import OneHotEncoder
ohe = OneHotEncoder(inputCols=["JobIndex", "EducationIndex", "DeptIndex"], 
                    outputCols=["JobEncoded", "EducationEncoded", "DeptEncoded"])
df = ohe.fit(df).transform(df)

df = df.drop("Gender", "Education", "Dept", "BasePay", "Bonus", "CompanyID", "TotalSalary", "JobTitle", "AvgSalary", "EducationIndex", "JobIndex", "DeptIndex")
df = df.withColumn("BelowAvg",df.BelowAvg.cast('int'))
df.show()

                                                                                

+---+--------+---------+--------+-------------+----------------+-------------+
|Age|PerfEval|Seniority|BelowAvg|   JobEncoded|EducationEncoded|  DeptEncoded|
+---+--------+---------+--------+-------------+----------------+-------------+
| 58|       1|        4|       0|(9,[8],[1.0])|       (3,[],[])|(4,[2],[1.0])|
| 59|       4|        5|       0|(9,[8],[1.0])|       (3,[],[])|(4,[1],[1.0])|
| 65|       4|        5|       0|(9,[5],[1.0])|   (3,[1],[1.0])|(4,[1],[1.0])|
| 55|       1|        5|       0|(9,[8],[1.0])|   (3,[0],[1.0])|(4,[2],[1.0])|
| 60|       4|        3|       0|(9,[8],[1.0])|       (3,[],[])|(4,[2],[1.0])|
| 59|       1|        5|       0|(9,[1],[1.0])|   (3,[2],[1.0])|(4,[1],[1.0])|
| 65|       4|        5|       0|(9,[1],[1.0])|   (3,[0],[1.0])|(4,[2],[1.0])|
| 60|       4|        3|       0|(9,[8],[1.0])|   (3,[2],[1.0])|(4,[2],[1.0])|
| 62|       5|        5|       0|(9,[8],[1.0])|   (3,[2],[1.0])|    (4,[],[])|
| 64|       2|        5|       0|(9,[1],[1.0])|     

In [4]:
from pyspark.ml.linalg import Vectors 
from pyspark.ml.feature import VectorAssembler


assembler = VectorAssembler(inputCols=['Age','PerfEval','Seniority','JobEncoded','EducationEncoded','DeptEncoded'], outputCol='features')
output = assembler.transform(df)
final_data = output.select('features','BelowAvg')
final_data.show()

+--------------------+--------+
|            features|BelowAvg|
+--------------------+--------+
|(19,[0,1,2,11,17]...|       0|
|(19,[0,1,2,11,16]...|       0|
|(19,[0,1,2,8,13,1...|       0|
|(19,[0,1,2,11,12,...|       0|
|(19,[0,1,2,11,17]...|       0|
|(19,[0,1,2,4,14,1...|       0|
|(19,[0,1,2,4,12,1...|       0|
|(19,[0,1,2,11,14,...|       0|
|(19,[0,1,2,11,14]...|       0|
|(19,[0,1,2,4,16],...|       0|
|(19,[0,1,2,11,18]...|       0|
|(19,[0,1,2,4,13,1...|       0|
|(19,[0,1,2,11,13,...|       0|
|(19,[0,1,2,4,13,1...|       0|
|(19,[0,1,2,11,13,...|       0|
|(19,[0,1,2,4,13,1...|       0|
|(19,[0,1,2,11,14,...|       0|
|(19,[0,1,2,11,17]...|       0|
|(19,[0,1,2,11],[5...|       0|
|(19,[0,1,2,11,15]...|       0|
+--------------------+--------+
only showing top 20 rows



In [14]:
from pyspark.ml.classification import RandomForestClassifier 
rf = RandomForestClassifier(labelCol='BelowAvg', featuresCol='features')
model = rf.fit(final_data)
model.featureImportances

SparseVector(19, {0: 0.4742, 1: 0.0155, 2: 0.3773, 3: 0.0144, 4: 0.001, 5: 0.0108, 6: 0.0038, 7: 0.008, 8: 0.0081, 9: 0.0027, 10: 0.0047, 11: 0.0057, 12: 0.0253, 13: 0.0098, 14: 0.0041, 15: 0.0036, 16: 0.0093, 17: 0.0096, 18: 0.0124})

In [15]:
from pyspark.ml.feature import MinMaxScaler
scaler = MinMaxScaler(inputCol="features", outputCol='ScaledFeatures')
scaled_final_data = scaler.fit(final_data).transform(final_data)
scaled_final_data.show()

+--------------------+--------+--------------------+
|            features|BelowAvg|      ScaledFeatures|
+--------------------+--------+--------------------+
|(19,[0,1,2,11,17]...|       0|(19,[0,2,11,17],[...|
|(19,[0,1,2,11,16]...|       0|(19,[0,1,2,11,16]...|
|(19,[0,1,2,8,13,1...|       0|(19,[0,1,2,8,13,1...|
|(19,[0,1,2,11,12,...|       0|(19,[0,2,11,12,17...|
|(19,[0,1,2,11,17]...|       0|(19,[0,1,2,11,17]...|
|(19,[0,1,2,4,14,1...|       0|(19,[0,2,4,14,16]...|
|(19,[0,1,2,4,12,1...|       0|(19,[0,1,2,4,12,1...|
|(19,[0,1,2,11,14,...|       0|(19,[0,1,2,11,14,...|
|(19,[0,1,2,11,14]...|       0|(19,[0,1,2,11,14]...|
|(19,[0,1,2,4,16],...|       0|(19,[0,1,2,4,16],...|
|(19,[0,1,2,11,18]...|       0|(19,[0,1,2,11,18]...|
|(19,[0,1,2,4,13,1...|       0|(19,[0,1,2,4,13,1...|
|(19,[0,1,2,11,13,...|       0|(19,[0,2,11,13,18...|
|(19,[0,1,2,4,13,1...|       0|(19,[0,1,2,4,13,1...|
|(19,[0,1,2,11,13,...|       0|(19,[0,1,2,11,13,...|
|(19,[0,1,2,4,13,1...|       0|(19,[0,1,2,4,13

In [23]:
from pyspark.ml.classification import DecisionTreeClassifier 
dt = DecisionTreeClassifier(labelCol='BelowAvg', featuresCol='features', maxDepth=4)
model = dt.fit(final_data)
print(model.toDebugString)

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_5dbd2d9d9c6f, depth=4, numNodes=27, numClasses=2, numFeatures=19
  If (feature 0 <= 38.5)
   If (feature 2 <= 4.5)
    If (feature 2 <= 3.5)
     Predict: 1.0
    Else (feature 2 > 3.5)
     If (feature 0 <= 33.5)
      Predict: 1.0
     Else (feature 0 > 33.5)
      Predict: 0.0
   Else (feature 2 > 4.5)
    If (feature 0 <= 24.5)
     If (feature 0 <= 18.5)
      Predict: 1.0
     Else (feature 0 > 18.5)
      Predict: 0.0
    Else (feature 0 > 24.5)
     If (feature 3 in {1.0})
      Predict: 1.0
     Else (feature 3 not in {1.0})
      Predict: 0.0
  Else (feature 0 > 38.5)
   If (feature 2 <= 2.5)
    If (feature 0 <= 57.5)
     If (feature 5 in {1.0})
      Predict: 0.0
     Else (feature 5 not in {1.0})
      Predict: 1.0
    Else (feature 0 > 57.5)
     If (feature 3 in {1.0})
      Predict: 1.0
     Else (feature 3 not in {1.0})
      Predict: 0.0
   Else (feature 2 > 2.5)
    If (feature 2 <= 3.5)
     If (feature 0 

In [15]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
pred = model.transform(final_data)
evaluator = MulticlassClassificationEvaluator(labelCol="BelowAvg", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(pred)
print("Accuracy =", accuracy)



Accuracy = 0.8523761375126391


In [5]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol="BelowAvg",featuresCol='features')
model = lr.fit(final_data)
model.coefficients

DenseVector([-0.1696, -0.2355, -1.7528, 1.5016, 0.2503, -0.4482, 0.0423, 0.0861, -0.2131, 0.51, 0.4281, 1.3552, 1.1892, -0.307, 0.7566, 0.0761, -1.1407, -0.2897, -0.8304])

In [9]:
train,test = final_data.randomSplit([0.7,0.3])
print(train.count())
print(test.count())

695
294


In [11]:
lr = LogisticRegression(labelCol="BelowAvg",featuresCol='features')
model = lr.fit(train)
print(model)

LogisticRegressionModel: uid=LogisticRegression_e3a4df12510a, numClasses=2, numFeatures=19


In [16]:
evaluator = MulticlassClassificationEvaluator(labelCol="BelowAvg", predictionCol="prediction", metricName="accuracy")
test_pred = model.transform(test)
train_pred = model.transform(train)
print(f"The accuracy on the test set is: {evaluator.evaluate(test_pred)}")
print(f"The accuracy on the training set is: {evaluator.evaluate(train_pred)}")

The accuracy on the test set is:0.8163265306122449
The accuracy on the training set is:0.8676258992805755


In [21]:
names = ["Age", "PerfEval", "Seniority", "Dept_Engineering", "Dept_Management", "Dept_Sales", "Education_High School", "Education_Masters", "Education_PhD", "JobTitle_Driver", "JobTitle_Financial Analyst", "JobTitle_Graphic Designer", "JobTitle_It", "JobTitle_Manager", "JobTitle_Marketing Asscociate", "JobTitle_Sales Associate", "JobTitle_Warehouse Associate","JobTitle_Software Engineer"]
coef = list(model.coefficients)

for n,c in zip(names, coef):
    print(n,":",c)

Age : -0.18926959836765017
PerfEval : -0.3029330432877516
Seniority : -1.9726054105908277
Dept_Engineering : 2.197587022338662
Dept_Management : 0.5811025004982264
Dept_Sales : -0.5030752883003874
Education_High School : 0.10300295278590749
Education_Masters : 0.48796552156924455
Education_PhD : 0.3268424786353768
JobTitle_Driver : 0.7913239937730975
JobTitle_Financial Analyst : 0.6689851230098209
JobTitle_Graphic Designer : 1.495821633664943
JobTitle_It : 1.565697637003981
JobTitle_Manager : -0.4159940709273339
JobTitle_Marketing Asscociate : 0.862626211648865
JobTitle_Sales Associate : 0.3831818013533531
JobTitle_Warehouse Associate : -1.1297921617674394
JobTitle_Software Engineer : -0.39039605073062766


In [22]:
import pyspark.sql.functions as f
df = df.filter((f.col("Bonus") <= 15000) & (f.col("Bonus") >= 300))
df = df.filter((f.col("BasePay") <= 200000) & (f.col("BasePay") >= 30000))
df.count()


984

In [23]:
from pyspark.ml.feature import StringIndexer
job_indexer = StringIndexer(inputCol="JobTitle", outputCol="JobIndex")
edu_indexer = StringIndexer(inputCol="Education", outputCol="EducationIndex")
dept_indexer = StringIndexer(inputCol="Dept", outputCol="DeptIndex")
#below_indexer = StringIndexer(inputCol="BelowAvg", outputCol="BelowIndex")
df = job_indexer.fit(df).transform(df)
df = edu_indexer.fit(df).transform(df)
df = dept_indexer.fit(df).transform(df)
#df = below_indexer.fit(df).transform(df)

from pyspark.ml.feature import OneHotEncoder
ohe = OneHotEncoder(inputCols=["JobIndex", "EducationIndex", "DeptIndex"], 
                    outputCols=["JobEncoded", "EducationEncoded", "DeptEncoded"])
df = ohe.fit(df).transform(df)

df = df.drop("Gender", "Education", "Dept", "BasePay", "Bonus", "CompanyID", "TotalSalary", "JobTitle", "AvgSalary", "EducationIndex", "JobIndex", "DeptIndex")
df = df.withColumn("BelowAvg",df.BelowAvg.cast('int'))

from pyspark.ml.linalg import Vectors 
from pyspark.ml.feature import VectorAssembler


assembler = VectorAssembler(inputCols=['Age','PerfEval','Seniority','JobEncoded','EducationEncoded','DeptEncoded'], outputCol='features')
output = assembler.transform(df)
final_data = output.select('features','BelowAvg')

In [25]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
lr = LogisticRegression(labelCol="BelowAvg",featuresCol='features')
model = lr.fit(final_data)
print(model.coefficients)

train,test = final_data.randomSplit([0.7,0.3])
evaluator = MulticlassClassificationEvaluator(labelCol="BelowAvg", predictionCol="prediction", metricName="accuracy")
test_pred = model.transform(test)
train_pred = model.transform(train)
print(f"The accuracy on the test set is: {evaluator.evaluate(test_pred)}")
print(f"The accuracy on the training set is: {evaluator.evaluate(train_pred)}")

[-0.17418703715248107,-0.2483114398508007,-1.7910540743633994,1.5219083958336905,-0.4535251060704115,0.2560767748860016,0.04887466547857764,0.09203752017508361,-0.2178201990917076,0.5207930826946608,0.5461536044490644,1.405361589869495,1.203705680570424,-0.3201934023531408,0.8064174136074549,0.08589065707889364,-1.0992460046367987,-0.2958402613957552,-0.8287949842573207]
The accuracy on the test set is: 0.8277027027027027
The accuracy on the training set is: 0.8590116279069767


In [26]:
names = ["Age", "PerfEval", "Seniority", "Dept_Engineering", "Dept_Management", "Dept_Sales", "Education_High School", "Education_Masters", "Education_PhD", "JobTitle_Driver", "JobTitle_Financial Analyst", "JobTitle_Graphic Designer", "JobTitle_It", "JobTitle_Manager", "JobTitle_Marketing Asscociate", "JobTitle_Sales Associate", "JobTitle_Warehouse Associate","JobTitle_Software Engineer"]
coef = list(model.coefficients)

for n,c in zip(names, coef):
    print(n,":",c)

Age : -0.17418703715248107
PerfEval : -0.2483114398508007
Seniority : -1.7910540743633994
Dept_Engineering : 1.5219083958336905
Dept_Management : -0.4535251060704115
Dept_Sales : 0.2560767748860016
Education_High School : 0.04887466547857764
Education_Masters : 0.09203752017508361
Education_PhD : -0.2178201990917076
JobTitle_Driver : 0.5207930826946608
JobTitle_Financial Analyst : 0.5461536044490644
JobTitle_Graphic Designer : 1.405361589869495
JobTitle_It : 1.203705680570424
JobTitle_Manager : -0.3201934023531408
JobTitle_Marketing Asscociate : 0.8064174136074549
JobTitle_Sales Associate : 0.08589065707889364
JobTitle_Warehouse Associate : -1.0992460046367987
JobTitle_Software Engineer : -0.2958402613957552


In [34]:
avg_df = spark.read.csv("./AvgSalary.csv", header=True)
salary_df = spark.read.csv("./Glassdoor Gender Pay Gap.csv", header=True, inferSchema=True)

salary_df = salary_df.na.drop()
salary_df = salary_df.withColumn("TotalSalary", salary_df.BasePay + salary_df.Bonus)
df = salary_df.join(avg_df, salary_df.JobTitle == avg_df.JobTitle, "inner")
df = df.withColumn("BelowAvg", df.TotalSalary < df.AvgSalary)

df_cols = df.columns
# get index of the duplicate columns
duplicate_col_index = list(set([df_cols.index(c) for c in df_cols if df_cols.count(c) == 2]))

# rename by adding suffix '_duplicated'
for i in duplicate_col_index:
    df_cols[i] = df_cols[i] + '_duplicated'

# rename the column in DF
df = df.toDF(*df_cols)

# remove flagged columns
cols_to_remove = [c for c in df_cols if '_duplicated' in c]
df = df.drop(*cols_to_remove)

from pyspark.ml.feature import StringIndexer
job_indexer = StringIndexer(inputCol="JobTitle", outputCol="JobIndex")
edu_indexer = StringIndexer(inputCol="Education", outputCol="EducationIndex")
dept_indexer = StringIndexer(inputCol="Dept", outputCol="DeptIndex")
#below_indexer = StringIndexer(inputCol="BelowAvg", outputCol="BelowIndex")
df = job_indexer.fit(df).transform(df)
df = edu_indexer.fit(df).transform(df)
df = dept_indexer.fit(df).transform(df)
#df = below_indexer.fit(df).transform(df)

from pyspark.ml.feature import OneHotEncoder
ohe = OneHotEncoder(inputCols=["JobIndex", "EducationIndex", "DeptIndex"], 
                    outputCols=["JobEncoded", "EducationEncoded", "DeptEncoded"])
df = ohe.fit(df).transform(df)

df = df.drop("Gender", "Education", "Dept", "BasePay", "Bonus", "CompanyID", "TotalSalary", "JobTitle", "AvgSalary", "EducationIndex", "JobIndex", "DeptIndex")
df = df.withColumn("BelowAvg",df.BelowAvg.cast('int'))

from pyspark.ml.linalg import Vectors 
from pyspark.ml.feature import VectorAssembler


assembler = VectorAssembler(inputCols=['Age','PerfEval','Seniority','JobEncoded','EducationEncoded','DeptEncoded'], outputCol='features')
output = assembler.transform(df)
final_data = output.select('features','BelowAvg')
from pyspark.ml.feature import MinMaxScaler
scaler = MinMaxScaler(inputCol="features", outputCol='ScaledFeatures')
scaled_final_data = scaler.fit(final_data).transform(final_data)

In [38]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


train,test = scaled_final_data.randomSplit([0.7,0.3])

lr = LogisticRegression(labelCol="BelowAvg",featuresCol='features')
model = lr.fit(train)
names = ["Age", "PerfEval", "Seniority", "Dept_Engineering", "Dept_Management", "Dept_Sales", "Education_High School", "Education_Masters", "Education_PhD", "JobTitle_Driver", "JobTitle_Financial Analyst", "JobTitle_Graphic Designer", "JobTitle_It", "JobTitle_Manager", "JobTitle_Marketing Asscociate", "JobTitle_Sales Associate", "JobTitle_Warehouse Associate","JobTitle_Software Engineer"]
coef = list(model.coefficients)

for n,c in zip(names, coef):
    print(n,":",c)
evaluator = MulticlassClassificationEvaluator(labelCol="BelowAvg", predictionCol="prediction", metricName="accuracy")
test_pred = model.transform(test)
train_pred = model.transform(train)
print(f"The accuracy on the test set is: {evaluator.evaluate(test_pred)}")
print(f"The accuracy on the training set is: {evaluator.evaluate(train_pred)}")

Age : -0.17751162840019033
PerfEval : -0.27184385305719483
Seniority : -1.8301840705740096
Dept_Engineering : 1.537020727967001
Dept_Management : 0.19321483319751423
Dept_Sales : -0.5093446786848854
Education_High School : -0.1774488311750685
Education_Masters : -0.29366570145644455
Education_PhD : -0.18498759233879372
JobTitle_Driver : 0.3099080508006317
JobTitle_Financial Analyst : 0.07473101442377882
JobTitle_Graphic Designer : 1.3855850226937154
JobTitle_It : 1.2938076216934133
JobTitle_Manager : -0.2872826858526564
JobTitle_Marketing Asscociate : 0.7936543738339673
JobTitle_Sales Associate : -0.13651558644263465
JobTitle_Warehouse Associate : -1.1697282102055786
JobTitle_Software Engineer : -0.4816720054511909
The accuracy on the test set is: 0.8243243243243243
The accuracy on the training set is: 0.8571428571428571


In [3]:
avg_df = spark.read.csv("./AvgSalary.csv", header=True)
salary_df = spark.read.csv("./Glassdoor Gender Pay Gap.csv", header=True, inferSchema=True)

salary_df = salary_df.na.drop()
salary_df = salary_df.withColumn("TotalSalary", salary_df.BasePay + salary_df.Bonus)
df = salary_df.join(avg_df, salary_df.JobTitle == avg_df.JobTitle, "inner")
df = df.withColumn("BelowAvg", df.TotalSalary < df.AvgSalary)

df_cols = df.columns
# get index of the duplicate columns
duplicate_col_index = list(set([df_cols.index(c) for c in df_cols if df_cols.count(c) == 2]))

# rename by adding suffix '_duplicated'
for i in duplicate_col_index:
    df_cols[i] = df_cols[i] + '_duplicated'

# rename the column in DF
df = df.toDF(*df_cols)

# remove flagged columns
cols_to_remove = [c for c in df_cols if '_duplicated' in c]
df = df.drop(*cols_to_remove)

from pyspark.ml.feature import StringIndexer
job_indexer = StringIndexer(inputCol="JobTitle", outputCol="JobIndex")
edu_indexer = StringIndexer(inputCol="Education", outputCol="EducationIndex")
dept_indexer = StringIndexer(inputCol="Dept", outputCol="DeptIndex")
#below_indexer = StringIndexer(inputCol="BelowAvg", outputCol="BelowIndex")
df = job_indexer.fit(df).transform(df)
df = edu_indexer.fit(df).transform(df)
df = dept_indexer.fit(df).transform(df)
#df = below_indexer.fit(df).transform(df)

from pyspark.ml.feature import OneHotEncoder
ohe = OneHotEncoder(inputCols=["JobIndex", "EducationIndex", "DeptIndex"], 
                    outputCols=["JobEncoded", "EducationEncoded", "DeptEncoded"])
df = ohe.fit(df).transform(df)

df = df.drop("Gender", "Education", "Dept", "BasePay", "Bonus", "CompanyID", "TotalSalary", "JobTitle", "AvgSalary", "EducationIndex", "JobIndex", "DeptIndex")
df = df.withColumn("BelowAvg",df.BelowAvg.cast('int'))

from pyspark.ml.linalg import Vectors 
from pyspark.ml.feature import VectorAssembler


assembler = VectorAssembler(inputCols=['Age','PerfEval','Seniority','JobEncoded','EducationEncoded','DeptEncoded'], outputCol='features')
output = assembler.transform(df)
final_data = output.select('features','BelowAvg')

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


#####################################################################
# Fitting model #######################################################
train,test = scaled_final_data.randomSplit([0.7,0.3])

lr = LogisticRegression(labelCol="BelowAvg",featuresCol='features', elasticNetParam = 1)
model = lr.fit(train)
names = ["Age", "PerfEval", "Seniority", "Dept_Engineering", "Dept_Management", "Dept_Sales", "Education_High School", "Education_Masters", "Education_PhD", "JobTitle_Driver", "JobTitle_Financial Analyst", "JobTitle_Graphic Designer", "JobTitle_It", "JobTitle_Manager", "JobTitle_Marketing Asscociate", "JobTitle_Sales Associate", "JobTitle_Warehouse Associate","JobTitle_Software Engineer"]
coef = list(model.coefficients)

for n,c in zip(names, coef):
    print(n,":",c)
evaluator = MulticlassClassificationEvaluator(labelCol="BelowAvg", predictionCol="prediction", metricName="accuracy")
test_pred = model.transform(test)
train_pred = model.transform(train)
print(f"The accuracy on the test set is: {evaluator.evaluate(test_pred)}")
print(f"The accuracy on the training set is: {evaluator.evaluate(train_pred)}")

                                                                                

NameError: name 'scaled_final_data' is not defined

In [4]:
train,test = final_data.randomSplit([0.7,0.3])

lr = LogisticRegression(labelCol="BelowAvg",featuresCol='features', elasticNetParam = 1)
model = lr.fit(train)
names = ["Age", "PerfEval", "Seniority", "Dept_Engineering", "Dept_Management", "Dept_Sales", "Education_High School", "Education_Masters", "Education_PhD", "JobTitle_Driver", "JobTitle_Financial Analyst", "JobTitle_Graphic Designer", "JobTitle_It", "JobTitle_Manager", "JobTitle_Marketing Asscociate", "JobTitle_Sales Associate", "JobTitle_Warehouse Associate","JobTitle_Software Engineer"]
coef = list(model.coefficients)

for n,c in zip(names, coef):
    print(n,":",c)
evaluator = MulticlassClassificationEvaluator(labelCol="BelowAvg", predictionCol="prediction", metricName="accuracy")
test_pred = model.transform(test)
train_pred = model.transform(train)
print(f"The accuracy on the test set is: {evaluator.evaluate(test_pred)}")
print(f"The accuracy on the training set is: {evaluator.evaluate(train_pred)}")

Age : -0.1867751759489874
PerfEval : -0.21324745491626054
Seniority : -1.9171897773347268
Dept_Engineering : 1.6520902597095273
Dept_Management : 0.9590646461300305
Dept_Sales : 0.20493656336582022
Education_High School : 0.2099354423040738
Education_Masters : 0.3875036188793343
Education_PhD : -0.574548347325251
JobTitle_Driver : 1.401414843169298
JobTitle_Financial Analyst : 0.9895268399670551
JobTitle_Graphic Designer : 1.8651801124899978
JobTitle_It : 1.4038922866711758
JobTitle_Manager : -0.5266006622121671
JobTitle_Marketing Asscociate : 0.6419991938704519
JobTitle_Sales Associate : -0.263252152691717
JobTitle_Warehouse Associate : -1.3502737205885702
JobTitle_Software Engineer : -0.41435991111020803
The accuracy on the test set is: 0.822289156626506
The accuracy on the training set is: 0.867579908675799
