# Analysis with DataFrames / SQL
Analysis with DataFrames / SQL

In [71]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

spark = SparkSession.builder \
         .master("local") \
         .appName("DataFrame") \
         .config("spark.some.config.option", "some-value") \
         .getOrCreate()
        
sc = spark.sparkContext

B.1.1 Which organization has the largest gender pay gap? Which the least?

In [72]:
from pyspark.sql.types import *

df = spark.read.csv('data_pp.csv', header=True, inferSchema=True)


Schema of dataframe

In [73]:
from pyspark.sql import functions as F
df.printSchema()

root
 |-- EmployerName: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- CompanyNumber: string (nullable = true)
 |-- SicCodes: decimal(35,0) (nullable = true)
 |-- DiffMeanHourlyPercent: double (nullable = true)
 |-- DiffMedianHourlyPercent: double (nullable = true)
 |-- DiffMeanBonusPercent: double (nullable = true)
 |-- DiffMedianBonusPercent: double (nullable = true)
 |-- MaleBonusPercent: double (nullable = true)
 |-- FemaleBonusPercent: double (nullable = true)
 |-- MaleLowerQuartile: double (nullable = true)
 |-- FemaleLowerQuartile: double (nullable = true)
 |-- MaleLowerMiddleQuartile: double (nullable = true)
 |-- FemaleLowerMiddleQuartile: double (nullable = true)
 |-- MaleUpperMiddleQuartile: double (nullable = true)
 |-- FemaleUpperMiddleQuartile: double (nullable = true)
 |-- MaleTopQuartile: double (nullable = true)
 |-- FemaleTopQuartile: double (nullable = true)
 |-- CompanyLinkToGPGInfo: string (nullable = true)
 |-- ResponsiblePerson: string (null

In [74]:
df1 = df.selectExpr("EmployerName","abs(DiffMeanHourlyPercent)")

#getting the max difference between hourlepercent
print(df1.agg({"abs(DiffMeanHourlyPercent)": "max"}).collect())
print(df1.agg({"abs(DiffMeanHourlyPercent)": "min"}).collect())

print(df1.where("abs(DiffMeanHourlyPercent)==400.0").collect())
print(df.where("abs(DiffMeanHourlyPercent)==0.0").count())

#for calculating avg of mean gender pay gap
df.groupBy().avg('DiffMeanHourlyPercent').collect()

[Row(max(abs(DiffMeanHourlyPercent))=400.0)]
[Row(min(abs(DiffMeanHourlyPercent))=0.0)]
[Row(EmployerName='COMPLETECAREHOLDINGSLIMITED', abs(DiffMeanHourlyPercent)=400.0)]
81


[Row(avg(DiffMeanHourlyPercent)=14.298103136021377)]

In [75]:
#getting the total no of rows
df.count()

10491

In [76]:
#df.createOrReplaceTempView("tallysheet")
df.createOrReplaceTempView("Table")

In [23]:
df.rdd.getNumPartitions()

1

In [77]:
my_map = dict([('AGRICULTURE FORESTRY AND FISHING', (1,3)), ('MINING AND QUARRYING', (5,9)), ('MANUFACTURING', (10,33)),
      ('ELECTRICITY GAS STEAM AND AIR CONDITIONING SUPPLY',(35,35)),('WATER SUPPLY SEWERAGE WASTE MANAGEMENT AND REMEDIATION ACTIVITIES',(36,39))
     ,('CONSTRUCTION',(41,43)),('WHOLESALE AND RETAIL TRADE REPAIR OF MOTOR VEHICLES AND MOTORCYCLES',(45,47)),
      ('TRANSPORTATION AND STORAGE',(49,53)),('ACCOMMODATION AND FOOD SERVICE ACTIVITIES',(55,56)),
     ('INFORMATION AND COMMUNICATION',(58,63)),('FINANCIAL AND INSURANCE ACTIVITIES',(64,66))
     ,('REAL ESTATE ACTIVITIES',(68,68)),('PROFESSIONAL SCIENTIFIC AND TECHNICAL ACTIVITIES',(69,75)),
     ('ADMINISTRATIVE AND SUPPORT SERVICE ACTIVITIES',(77,82)),('PUBLIC ADMINISTRATION AND DEFENCE COMPULSORY SOCIAL SECURITY',(84,84)),
     ('EDUCATION',(85,85)),('HUMAN HEALTH AND SOCIAL WORK ACTIVITIES',(86,88)),('ARTS ENTERTAINMENT AND RECREATION',(90,93)),
     ('OTHER SERVICE ACTIVITIES',(94,96)),
    ('ACTIVITIES OF HOUSEHOLDS AS EMPLOYERS UNDIFFERENTIATED GOODS-AND SERVICES-PRODUCING ACTIVITIES OF HOUSEHOLDS FOR OWN USE',(97,98)),
     ('ACTIVITIES OF EXTRATERRITORIAL ORGANISATIONS AND BODIES',(99,99))])
inv_map = {v: k for k, v in my_map.items()}
inv_map
# Immutable data for all workers
broadcast_variable = sc.broadcast(inv_map)

In [129]:
broadcast_variable.value[0]

KeyError: 0

In [78]:
inv_map.keys()

dict_keys([(97, 98), (5, 9), (36, 39), (1, 3), (99, 99), (86, 88), (69, 75), (85, 85), (90, 93), (68, 68), (45, 47), (58, 63), (49, 53), (41, 43), (77, 82), (10, 33), (84, 84), (94, 96), (55, 56), (64, 66), (35, 35)])

In [79]:
df2 = spark.sql("select * from Table where SicCodes != 1 ")
df2.count()

9575

In [80]:
#changing the column type of sicodes from decimal to string

from pyspark.sql.types import StringType

df2 = df2.withColumn("SicCodes_Str", df["SicCodes"].cast(StringType()))

#df5.select(df5.SicCodes.cast(StringType()).alias('SicCodes_String'))
df2.select('SicCodes_Str').show()

+---------------+
|   SicCodes_Str|
+---------------+
|          85310|
|          47730|
|          61900|
|          78300|
|          93110|
|     5621070229|
|931109313093290|
|     8690088100|
|          56290|
|      147010910|
|          10120|
|          10120|
|          10110|
|          82200|
|          56103|
|          52290|
|          78200|
|         186210|
|          81210|
|          52290|
+---------------+
only showing top 20 rows



In [81]:
#creating a new column where to keep the first two digits from the SicCodes as int
import pyspark
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType

def SicCodes_int(SicCodes_Str):
    if SicCodes_Str is not None:
        return SicCodes_Str[:2]
  

udf_SicCode = udf(SicCodes_int, StringType())


df2 = df2.withColumn("SicSector_No",udf_SicCode("SicCodes_Str"))

df2=df2.withColumn("SicSector_No", df2["SicSector_No"].cast(IntegerType()))
df2.select('SicCodes_Str','SicSector_No').show()

+---------------+------------+
|   SicCodes_Str|SicSector_No|
+---------------+------------+
|          85310|          85|
|          47730|          47|
|          61900|          61|
|          78300|          78|
|          93110|          93|
|     5621070229|          56|
|931109313093290|          93|
|     8690088100|          86|
|          56290|          56|
|      147010910|          14|
|          10120|          10|
|          10120|          10|
|          10110|          10|
|          82200|          82|
|          56103|          56|
|          52290|          52|
|          78200|          78|
|         186210|          18|
|          81210|          81|
|          52290|          52|
+---------------+------------+
only showing top 20 rows



In [82]:
#df_Sic_int is the last and final dataframe to have first two digits of SicCodes as int
df2 = df2.drop('SicCodes_Str')
df2.printSchema()

root
 |-- EmployerName: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- CompanyNumber: string (nullable = true)
 |-- SicCodes: decimal(35,0) (nullable = true)
 |-- DiffMeanHourlyPercent: double (nullable = true)
 |-- DiffMedianHourlyPercent: double (nullable = true)
 |-- DiffMeanBonusPercent: double (nullable = true)
 |-- DiffMedianBonusPercent: double (nullable = true)
 |-- MaleBonusPercent: double (nullable = true)
 |-- FemaleBonusPercent: double (nullable = true)
 |-- MaleLowerQuartile: double (nullable = true)
 |-- FemaleLowerQuartile: double (nullable = true)
 |-- MaleLowerMiddleQuartile: double (nullable = true)
 |-- FemaleLowerMiddleQuartile: double (nullable = true)
 |-- MaleUpperMiddleQuartile: double (nullable = true)
 |-- FemaleUpperMiddleQuartile: double (nullable = true)
 |-- MaleTopQuartile: double (nullable = true)
 |-- FemaleTopQuartile: double (nullable = true)
 |-- CompanyLinkToGPGInfo: string (nullable = true)
 |-- ResponsiblePerson: string (null

In [111]:
#creating a new column where to keep the first two digits from the SicCodes as int
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def SicCodes_name(SicSector_No):
    if SicSector_No >= 10 and SicSector_No <=33:
        return broadcast_variable.value[(10,33)]
    elif SicSector_No == 35:
        return broadcast_variable.value[(35,35)]
    elif SicSector_No >= 36 and SicSector_No <=39:
        return broadcast_variable.value[(36,39)]
    elif SicSector_No >= 41 and SicSector_No <=43:
        return broadcast_variable.value[(41,43)]
    elif SicSector_No >= 45 and SicSector_No <=47:
        return broadcast_variable.value[(45,47)]
    elif SicSector_No >= 49 and SicSector_No <=53:
        return broadcast_variable.value[(49,53)]
    elif SicSector_No >= 55 and SicSector_No <=56:
        return broadcast_variable.value[(55,56)]
    elif SicSector_No >= 58 and SicSector_No <=63:
        return broadcast_variable.value[(58,63)]
    elif SicSector_No >= 64 and SicSector_No <=66:
        return broadcast_variable.value[(64,66)]
    elif SicSector_No == 68:
        return broadcast_variable.value[(68,68)]
    elif SicSector_No >= 69 and SicSector_No <=75:
        return broadcast_variable.value[(69,75)]
    elif SicSector_No >= 77 and SicSector_No <=82:
        return broadcast_variable.value[(77,82)]
    elif SicSector_No == 84:
        return broadcast_variable.value[(84,84)]
    elif SicSector_No == 85:
        return broadcast_variable.value[(85,85)]
    elif SicSector_No >= 86 and SicSector_No <=88:
        return broadcast_variable.value[(86,88)]
    elif SicSector_No >= 90 and SicSector_No <=93:
        return broadcast_variable.value[(90,93)]
    elif SicSector_No >= 94 and SicSector_No <=96:
        return broadcast_variable.value[(94,96)]
    elif SicSector_No >= 97 and SicSector_No <=98:
        return broadcast_variable.value[(97,98)]
    elif SicSector_No == 99:
        return broadcast_variable.value[(99,99)]

udf_SicCodes_name = udf(SicCodes_name, StringType())


df2 = df2.withColumn("SicSector_Name",udf_SicCodes_name("SicSector_No"))

df2.select('SicSector_No','SicSector_Name').show()
df2.printSchema()

+------------+--------------------+
|SicSector_No|      SicSector_Name|
+------------+--------------------+
|          85|           EDUCATION|
|          47|WHOLESALE AND RET...|
|          61|INFORMATION AND C...|
|          78|ADMINISTRATIVE AN...|
|          93|ARTS ENTERTAINMEN...|
|          56|ACCOMMODATION AND...|
|          93|ARTS ENTERTAINMEN...|
|          86|HUMAN HEALTH AND ...|
|          56|ACCOMMODATION AND...|
|          14|       MANUFACTURING|
|          10|       MANUFACTURING|
|          10|       MANUFACTURING|
|          10|       MANUFACTURING|
|          82|ADMINISTRATIVE AN...|
|          56|ACCOMMODATION AND...|
|          52|TRANSPORTATION AN...|
|          78|ADMINISTRATIVE AN...|
|          18|       MANUFACTURING|
|          81|ADMINISTRATIVE AN...|
|          52|TRANSPORTATION AN...|
+------------+--------------------+
only showing top 20 rows

root
 |-- EmployerName: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- CompanyNumber: st

In [147]:
#counting genderpay gap for one industry sector and then we can do it for the rest of the industry
df3 = df2.filter(df2.SicSector_Name.isNotNull())
df3.groupBy(df3.SicSector_Name).avg('DiffMeanHourlyPercent').show()

+--------------------+--------------------------+
|      SicSector_Name|avg(DiffMeanHourlyPercent)|
+--------------------+--------------------------+
|ADMINISTRATIVE AN...|        11.247895622895628|
|ELECTRICITY GAS S...|        14.785185185185187|
|        CONSTRUCTION|        21.707570977917985|
|TRANSPORTATION AN...|        10.276178010471213|
|WHOLESALE AND RET...|        14.928056112224441|
|PUBLIC ADMINISTRA...|         9.176190476190477|
|ARTS ENTERTAINMEN...|         21.06199999999999|
|OTHER SERVICE ACT...|         12.46292682926829|
|FINANCIAL AND INS...|        26.421882951653938|
|           EDUCATION|        11.730219780219784|
|ACTIVITIES OF EXT...|         9.944444444444445|
|HUMAN HEALTH AND ...|         6.588755622188904|
|ACCOMMODATION AND...|        7.8681318681318615|
|WATER SUPPLY SEWE...|         7.499999999999998|
|PROFESSIONAL SCIE...|        18.491334250343872|
|       MANUFACTURING|        14.340364298724948|
|REAL ESTATE ACTIV...|        16.024799999999995|


In [148]:
#diff median
df3.groupBy(df3.SicSector_Name).avg('DiffMedianHourlyPercent').show()

+--------------------+----------------------------+
|      SicSector_Name|avg(DiffMedianHourlyPercent)|
+--------------------+----------------------------+
|ADMINISTRATIVE AN...|           9.256228956228956|
|ELECTRICITY GAS S...|          15.651851851851852|
|        CONSTRUCTION|          23.757413249211364|
|TRANSPORTATION AN...|           9.660732984293192|
|WHOLESALE AND RET...|           7.907314629258507|
|PUBLIC ADMINISTRA...|           9.785714285714286|
|ARTS ENTERTAINMEN...|           6.594666666666666|
|OTHER SERVICE ACT...|           9.334634146341465|
|FINANCIAL AND INS...|          22.449363867684472|
|           EDUCATION|          13.661538461538465|
|ACTIVITIES OF EXT...|           6.022222222222222|
|HUMAN HEALTH AND ...|           2.843178410794602|
|ACCOMMODATION AND...|           3.743589743589742|
|WATER SUPPLY SEWE...|           8.059420289855073|
|PROFESSIONAL SCIE...|          14.778541953232475|
|       MANUFACTURING|           13.76746812386155|
|REAL ESTATE

In [149]:
#diff mean bonus percent
df3.groupBy(df3.SicSector_Name).avg('DiffMeanBonusPercent').show()

+--------------------+-------------------------+
|      SicSector_Name|avg(DiffMeanBonusPercent)|
+--------------------+-------------------------+
|ADMINISTRATIVE AN...|        7.882744107744111|
|ELECTRICITY GAS S...|       27.094444444444434|
|        CONSTRUCTION|        27.95173501577289|
|TRANSPORTATION AN...|         13.2479057591623|
|WHOLESALE AND RET...|       -50.14178356713425|
|PUBLIC ADMINISTRA...|       18.576190476190476|
|ARTS ENTERTAINMEN...|                   27.685|
|OTHER SERVICE ACT...|        17.52731707317074|
|FINANCIAL AND INS...|        47.30254452926211|
|           EDUCATION|      -15.838461538461546|
|ACTIVITIES OF EXT...|       24.166666666666668|
|HUMAN HEALTH AND ...|      -7.9389805097451305|
|ACCOMMODATION AND...|       10.840659340659341|
|WATER SUPPLY SEWE...|        9.556521739130435|
|PROFESSIONAL SCIE...|        32.17345254470423|
|       MANUFACTURING|        8.211256830601098|
|REAL ESTATE ACTIV...|       23.070399999999996|
|ACTIVITIES OF HOU..

In [150]:
#diff median bonus percent
df3.groupBy(df3.SicSector_Name).avg('DiffMedianBonusPercent').show()

+--------------------+---------------------------+
|      SicSector_Name|avg(DiffMedianBonusPercent)|
+--------------------+---------------------------+
|ADMINISTRATIVE AN...|         1.1228956228956166|
|ELECTRICITY GAS S...|         22.235185185185188|
|        CONSTRUCTION|          -2.83312302839117|
|TRANSPORTATION AN...|         -35.23979057591623|
|WHOLESALE AND RET...|         -51.30641282565135|
|PUBLIC ADMINISTRA...|          18.89761904761905|
|ARTS ENTERTAINMEN...|         -21.37633333333334|
|OTHER SERVICE ACT...|         -4.016585365853658|
|FINANCIAL AND INS...|         12.706615776081417|
|           EDUCATION|        -13.298351648351646|
|ACTIVITIES OF EXT...|         12.266666666666667|
|HUMAN HEALTH AND ...|        -3.0916041979010487|
|ACCOMMODATION AND...|        -15.151648351648364|
|WATER SUPPLY SEWE...|         -42.89999999999999|
|PROFESSIONAL SCIE...|          13.30701513067399|
|       MANUFACTURING|        -45.433734061930814|
|REAL ESTATE ACTIV...|         

In [151]:
#male bonus percent
df3.groupBy(df3.SicSector_Name).avg('MaleBonusPercent').show()

+--------------------+---------------------+
|      SicSector_Name|avg(MaleBonusPercent)|
+--------------------+---------------------+
|ADMINISTRATIVE AN...|    34.22769360269361|
|ELECTRICITY GAS S...|    65.00185185185184|
|        CONSTRUCTION|    43.94132492113566|
|TRANSPORTATION AN...|    42.71963350785341|
|WHOLESALE AND RET...|    55.96883767535071|
|PUBLIC ADMINISTRA...|    20.68095238095238|
|ARTS ENTERTAINMEN...|   25.313000000000013|
|OTHER SERVICE ACT...|     33.9019512195122|
|FINANCIAL AND INS...|    67.39083969465649|
|           EDUCATION|    7.634340659340661|
|ACTIVITIES OF EXT...|    33.86666666666667|
|HUMAN HEALTH AND ...|   12.083358320839592|
|ACCOMMODATION AND...|   23.835897435897433|
|WATER SUPPLY SEWE...|   55.343478260869574|
|PROFESSIONAL SCIE...|    45.55103163686382|
|       MANUFACTURING|   29.409981785063792|
|REAL ESTATE ACTIV...|    38.63680000000001|
|ACTIVITIES OF HOU...|    4.666666666666667|
|INFORMATION AND C...|    60.89908883826881|
+---------

In [152]:
#female bonus percent
df3.groupBy(df3.SicSector_Name).avg('FemaleBonusPercent').show()

+--------------------+-----------------------+
|      SicSector_Name|avg(FemaleBonusPercent)|
+--------------------+-----------------------+
|ADMINISTRATIVE AN...|      33.87954545454545|
|ELECTRICITY GAS S...|      65.11296296296298|
|        CONSTRUCTION|      42.62208201892746|
|TRANSPORTATION AN...|      38.60104712041888|
|WHOLESALE AND RET...|     52.015531062124246|
|PUBLIC ADMINISTRA...|     18.788095238095238|
|ARTS ENTERTAINMEN...|     21.529999999999994|
|OTHER SERVICE ACT...|      32.64536585365854|
|FINANCIAL AND INS...|      65.39007633587785|
|           EDUCATION|      8.581318681318681|
|ACTIVITIES OF EXT...|     30.577777777777776|
|HUMAN HEALTH AND ...|     13.225037481259365|
|ACCOMMODATION AND...|     23.433699633699643|
|WATER SUPPLY SEWE...|      50.18405797101449|
|PROFESSIONAL SCIE...|      43.82420907840442|
|       MANUFACTURING|     27.862659380692214|
|REAL ESTATE ACTIV...|     36.521600000000014|
|ACTIVITIES OF HOU...|     0.6333333333333334|
|INFORMATION 

In [153]:
spark.stop()