In [2]:
from pyspark.sql import SparkSession

# (8 cores, 16gb per machine) x 5 = 40 cores

# New API
spark_session = SparkSession\
        .builder\
        .master("local") \
        .appName("simple_example")\
        .getOrCreate()

# Old API (RDD)
spark_context = spark_session.sparkContext

dataframe = spark_session.read\
    .csv("files/gender_data_2017_2018.csv",\
    header=True, multiLine=True, \
    inferSchema=True, escape='"').cache()


In [3]:
#Counting for testing
dataframe.count()

10504

In [4]:
#Dataframe schema
dataframe.printSchema()

root
 |-- EmployerName: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- CompanyNumber: string (nullable = true)
 |-- SicCodes: string (nullable = true)
 |-- DiffMeanHourlyPercent: double (nullable = true)
 |-- DiffMedianHourlyPercent: double (nullable = true)
 |-- DiffMeanBonusPercent: double (nullable = true)
 |-- DiffMedianBonusPercent: double (nullable = true)
 |-- MaleBonusPercent: double (nullable = true)
 |-- FemaleBonusPercent: double (nullable = true)
 |-- MaleLowerQuartile: double (nullable = true)
 |-- FemaleLowerQuartile: double (nullable = true)
 |-- MaleLowerMiddleQuartile: double (nullable = true)
 |-- FemaleLowerMiddleQuartile: double (nullable = true)
 |-- MaleUpperMiddleQuartile: double (nullable = true)
 |-- FemaleUpperMiddleQuartile: double (nullable = true)
 |-- MaleTopQuartile: double (nullable = true)
 |-- FemaleTopQuartile: double (nullable = true)
 |-- CompanyLinkToGPGInfo: string (nullable = true)
 |-- ResponsiblePerson: string (nullable = 

In [6]:
# Messy look of dataframe
dataframe.show()

+--------------------+--------------------+-------------+--------------------+---------------------+-----------------------+--------------------+----------------------+----------------+------------------+-----------------+-------------------+-----------------------+-------------------------+-----------------------+-------------------------+---------------+-----------------+--------------------+--------------------+--------------+--------------------+--------------------------+
|        EmployerName|             Address|CompanyNumber|            SicCodes|DiffMeanHourlyPercent|DiffMedianHourlyPercent|DiffMeanBonusPercent|DiffMedianBonusPercent|MaleBonusPercent|FemaleBonusPercent|MaleLowerQuartile|FemaleLowerQuartile|MaleLowerMiddleQuartile|FemaleLowerMiddleQuartile|MaleUpperMiddleQuartile|FemaleUpperMiddleQuartile|MaleTopQuartile|FemaleTopQuartile|CompanyLinkToGPGInfo|   ResponsiblePerson|  EmployerSize|         CurrentName|SubmittedAfterTheDeadline|
+--------------------+--------------

In [7]:
#Receiving the first data for example
dataframe.rdd.take(1)

[Row(EmployerName='"Bryanston School",Incorporated', Address='Bryanston House,\r\nBlandford,\r\nDorset,\r\nUnited Kingdom,\r\nDT11 0PX', CompanyNumber='00226143', SicCodes='85310', DiffMeanHourlyPercent=18.0, DiffMedianHourlyPercent=28.2, DiffMeanBonusPercent=0.0, DiffMedianBonusPercent=0.0, MaleBonusPercent=0.0, FemaleBonusPercent=0.0, MaleLowerQuartile=24.4, FemaleLowerQuartile=75.6, MaleLowerMiddleQuartile=50.8, FemaleLowerMiddleQuartile=49.2, MaleUpperMiddleQuartile=49.2, FemaleUpperMiddleQuartile=50.8, MaleTopQuartile=51.5, FemaleTopQuartile=48.5, CompanyLinkToGPGInfo='https://www.bryanston.co.uk/employment', ResponsiblePerson='Nick McRobb (Bursar and Clerk to the Governors)', EmployerSize='500 to 999', CurrentName='"Bryanston School",Incorporated', SubmittedAfterTheDeadline
 ='False\r')]

In [9]:
# Number of partitions, only one!
dataframe.rdd.getNumPartitions()

1

In [13]:
#Number of lines
dataframe.rdd.count()

10504

In [16]:
# Some testing of quering
dataframe.select('EmployerName').distinct().take(10)

[Row(EmployerName='ALLERGAN LIMITED'),
 Row(EmployerName='ATOS CONSULTING LIMITED'),
 Row(EmployerName='BAE SYSTEMS GLOBAL COMBAT SYSTEMS MUNITIONS LIMITED'),
 Row(EmployerName='BIRDS (DERBY) LIMITED'),
 Row(EmployerName='Blades Restaurants Ltd'),
 Row(EmployerName='BOTT LIMITED'),
 Row(EmployerName='BRIDGEWAY CONSULTING LIMITED'),
 Row(EmployerName='CAPITA MANAGED IT SOLUTIONS LIMITED'),
 Row(EmployerName='CITIGROUP GLOBAL MARKETS LIMITED'),
 Row(EmployerName='Cornwall Council')]

In [54]:
#-------B.1.1 Which organization has the largest gender pay gap?-----------------------
#It seems fair to only chech the differences in mean value and showing the top 1 result 
from  pyspark.sql.functions import abs
dataframe.withColumn('DiffMeanHourlyPercent', abs(dataframe.DiffMeanHourlyPercent))\
    .select('EmployerName', 'DiffMeanHourlyPercent')\
    .orderBy('DiffMeanHourlyPercent', ascending=False)\
    .show(1)
    


+--------------------+---------------------+
|        EmployerName|DiffMeanHourlyPercent|
+--------------------+---------------------+
|COMPLETE CARE HOL...|                400.0|
+--------------------+---------------------+
only showing top 1 row



In [60]:
#-------B.1.1 Which the least?---------
#It seems fair to only chech the differences in mean value and showing the 20 last result 
dataframe.withColumn('DiffMeanHourlyPercent', abs(dataframe.DiffMeanHourlyPercent))\
    .select('EmployerName', 'DiffMeanHourlyPercent')\
    .orderBy('DiffMeanHourlyPercent', ascending=True)\
    .show(20)

#Since there seems to be a lot with 0.0 in difference, we will count them as well
dataframe.withColumn('DiffMeanHourlyPercent1', abs(dataframe.DiffMeanHourlyPercent))\
    .filter(dataframe.DiffMeanHourlyPercent == 0.0)\
    .select('EmployerName', 'DiffMeanHourlyPercent')\
    .count()

+--------------------+---------------------+
|        EmployerName|DiffMeanHourlyPercent|
+--------------------+---------------------+
|      D.G.F. LIMITED|                  0.0|
|      BOREAL LIMITED|                  0.0|
|CRAIGTON FOODS LI...|                  0.0|
|CYCLE TRAINING UK...|                  0.0|
|         725 LIMITED|                  0.0|
|        ACCA LIMITED|                  0.0|
|ANGEL HUMAN RESOU...|                  0.0|
|CARE AT HOME SERV...|                  0.0|
|BRAYBORNE FACILIT...|                  0.0|
|CAVITY DENTAL STA...|                  0.0|
|  Avenues South East|                  0.0|
|Choices Housing A...|                  0.0|
|BESPOKE CLEANING ...|                  0.0|
|CINNAMON CARE COL...|                  0.0|
|24-7 EMPLOYMENT S...|                  0.0|
|CMD RECRUITMENT L...|                  0.0|
|ARRIVA DURHAM COU...|                  0.0|
|COMFORT CALL LIMITED|                  0.0|
|BANBURY HEATH LIM...|                  0.0|
|COOPER TO

82

In [68]:
#----------B.1.2 What is the mean gender pay gap across all organization?-------------

dataframe.withColumn('DiffMeanHourlyPercent', abs(dataframe.DiffMeanHourlyPercent))\
    .select('EmployerName', 'DiffMeanHourlyPercent')\
    .orderBy('DiffMeanHourlyPercent', ascending=False)\
    .summary()\
    .show()
    
# The mean of DiffMeanHourlyPercent is 16.143545316070075

+-------+--------------------+---------------------+
|summary|        EmployerName|DiffMeanHourlyPercent|
+-------+--------------------+---------------------+
|  count|               10504|                10504|
|   mean|                null|   16.143545316070075|
| stddev|                null|   13.200863586376073|
|    min|"Bryanston School...|                  0.0|
|    25%|                null|                  6.5|
|    50%|                null|                 14.0|
|    75%|                null|                 23.1|
|    max|        ward hadaway|                400.0|
+-------+--------------------+---------------------+



In [74]:
#-----------B.1.3-----Export the results of B.1.2 to a CSV file.-----------

#Creating the summary
result_summary = \
    dataframe.withColumn('DiffMeanHourlyPercent', abs(dataframe.DiffMeanHourlyPercent))\
    .select('EmployerName', 'DiffMeanHourlyPercent')\
    .orderBy('DiffMeanHourlyPercent', ascending=False)\
    .summary()

#Exporting result into csv file
result_summary.write.format("csv")\
.mode("overwrite")\
.options(header="true")\
.save("result_summary.csv")

In [85]:
#-------------Inspect the output file to check it looks---reasonable.-------------
# My answer: Yes, it looks reasonable
dataframe = spark_session.read\
    .csv("result_summary.csv",\
    header=True, multiLine=True, \
    inferSchema=True, escape='"').show()

+-------+--------------------+---------------------+
|summary|        EmployerName|DiffMeanHourlyPercent|
+-------+--------------------+---------------------+
|  count|               10504|                10504|
|   mean|                null|   16.143545316070075|
| stddev|                null|   13.200863586376073|
|    min|"\"Bryanston Scho...|        Incorporated"|
|    25%|                null|                  6.5|
|    50%|                null|                 14.0|
|    75%|                null|                 23.1|
|    max|        ward hadaway|                400.0|
+-------+--------------------+---------------------+



In [94]:
#----------B.1.4 What proportion of organizations pay women more than men on average?
#Explain your calculation.
dataframe = spark_session.read\
    .csv("files/gender_data_2017_2018.csv",\
    header=True, multiLine=True, \
    inferSchema=True, escape='"').cache()

# Calculating the amount of negative results
dataframe.withColumn('DiffMeanHourlyPercent1', abs(dataframe.DiffMeanHourlyPercent))\
    .filter(dataframe.DiffMeanHourlyPercent < 0.0)\
    .select('EmployerName', 'DiffMeanHourlyPercent')\
    .count()



1225

In [95]:
# Calculating all the results
dataframe.count()

10504

In [96]:
1225/10504

#Final answer: 0.11662223914699162

0.11662223914699162

In [122]:
#----------B.2.1 Create a new column for the industry sector 
#----------(for each company) using the SIC code

#Importing useful functions, etc...
import pyspark
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

#Since the sector group ranges (the first two digits) we start at C

#Creating and appending first in dictionary
dictionary_sic = dict.fromkeys(list(range(10,34)), 'C')

#Can only update with one argument at time
dictionary_sic.update(dict.fromkeys([35], 'D'))
dictionary_sic.update(dict.fromkeys(list(range(36,40)), 'E'))
dictionary_sic.update(dict.fromkeys(list(range(41,44)), 'F'))
dictionary_sic.update(dict.fromkeys(list(range(45,48)), 'G'))
dictionary_sic.update(dict.fromkeys(list(range(49,54)), 'H'))
dictionary_sic.update(dict.fromkeys(list(range(55,57)), 'I'))
dictionary_sic.update(dict.fromkeys(list(range(58,64)), 'J'))
dictionary_sic.update(dict.fromkeys(list(range(64,67)), 'K'))
dictionary_sic.update(dict.fromkeys([68], 'L'))
dictionary_sic.update(dict.fromkeys(list(range(69,76)), 'M'))
dictionary_sic.update(dict.fromkeys(list(range(77,83)), 'N'))
dictionary_sic.update(dict.fromkeys([84], 'O'))
dictionary_sic.update(dict.fromkeys([85], 'P'))
dictionary_sic.update(dict.fromkeys(list(range(86,89)), 'Q'))
dictionary_sic.update(dict.fromkeys(list(range(90,94)), 'R'))
dictionary_sic.update(dict.fromkeys(list(range(94,97)), 'S'))
dictionary_sic.update(dict.fromkeys(list(range(97,99)), 'T'))
dictionary_sic.update(dict.fromkeys([99], 'U'))

#Broadcasting the dictionary to represent the SIC code mapping   
dictionary_broadcast = spark_context.broadcast(dictionary_sic)

print(dictionary_sic)

{10: 'C', 11: 'C', 12: 'C', 13: 'C', 14: 'C', 15: 'C', 16: 'C', 17: 'C', 18: 'C', 19: 'C', 20: 'C', 21: 'C', 22: 'C', 23: 'C', 24: 'C', 25: 'C', 26: 'C', 27: 'C', 28: 'C', 29: 'C', 30: 'C', 31: 'C', 32: 'C', 33: 'C', 35: 'D', 36: 'E', 37: 'E', 38: 'E', 39: 'E', 41: 'F', 42: 'F', 43: 'F', 45: 'G', 46: 'G', 47: 'G', 49: 'H', 50: 'H', 51: 'H', 52: 'H', 53: 'H', 55: 'I', 56: 'I', 58: 'J', 59: 'J', 60: 'J', 61: 'J', 62: 'J', 63: 'J', 64: 'K', 65: 'K', 66: 'K', 68: 'L', 69: 'M', 70: 'M', 71: 'M', 72: 'M', 73: 'M', 74: 'M', 75: 'M', 77: 'N', 78: 'N', 79: 'N', 80: 'N', 81: 'N', 82: 'N', 84: 'O', 85: 'P', 86: 'Q', 87: 'Q', 88: 'Q', 90: 'R', 91: 'R', 92: 'R', 93: 'R', 94: 'S', 95: 'S', 96: 'S', 97: 'T', 98: 'T', 99: 'U'}


In [123]:
dictionary_broadcast.value[10]

'C'

In [124]:
#We do not have the need to check for code beginning with 1 because of the try clause
def dictionary_value(sic_code_string):
    try:
        array = sic_code_string.split(',')
        return dictionary_broadcast.value[int(array[0][0:2])]
    except:
        return

In [127]:
udf_dictionary_value = udf(dictionary_value, StringType())
dataframe.select('SicCodes').withColumn("IndustrySector", \
    udf_dictionary_value(dataframe.SicCodes))\
    .show(5)

+--------+--------------+
|SicCodes|IndustrySector|
+--------+--------------+
|   85310|             P|
|   47730|             G|
|   61900|             J|
|   78300|             N|
|   93110|             R|
+--------+--------------+
only showing top 5 rows

