In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import *

# New API: the SparkSession is the entry point for DataFrame / SQL work.
# getOrCreate() reuses a session if one is already running in this kernel.
spark_session = (
    SparkSession.builder
    .master("spark://haol-spark:7077")
    .appName("Question_B1")
    .getOrCreate()
)

# Old API (RDD): the SparkContext that backs the session above.
spark_context = spark_session.sparkContext


In [3]:
# B.1
# Load the gender-equality CSV with a header row and inferred column types.
# multiLine + escape allow quoted fields that span lines or contain quotes.
data = spark_session.read.csv(
    "/home/ubuntu/LDSA/ass_2/gender_eq.csv",
    header=True,
    inferSchema=True,
    multiLine=True,
    escape="\"",
    ignoreLeadingWhiteSpace=True,
)

In [4]:
# Map each column name to its Spark type name (e.g. "string", "double").
colTypes = {name: type_name for name, type_name in data.dtypes}

In [5]:
# Strip embedded line breaks (LF and CR) from every string column.
# The loop variable is deliberately not called `col`: that name would shadow
# pyspark.sql.functions.col brought in by the star import at the top.
# data.dtypes is evaluated once, before `data` is rebound inside the loop.
for col_name, col_type in data.dtypes:
    if col_type == "string":
        # One pass with a character class replaces both "\n" and "\r".
        data = data.withColumn(col_name, regexp_replace(col_name, "[\\n\\r]", ""))

In [6]:
# Keep only the columns used in the rest of the notebook.
selected_columns = ["EmployerName", "DiffMeanHourlyPercent", "SicCodes"]
sqlData = data.select(*selected_columns)

In [36]:
# Preview the first 20 rows (the default) without shortening long values.
sqlData.show(n=20, truncate=False)

+------------------------------------+---------------------+-----------------+
|EmployerName                        |DiffMeanHourlyPercent|SicCodes         |
+------------------------------------+---------------------+-----------------+
|"Bryanston School",Incorporated     |18.0                 |85310            |
|"RED BAND" CHEMICAL COMPANY, LIMITED|2.3                  |47730            |
|118 LIMITED                         |1.7                  |61900            |
|123 EMPLOYEES LTD                   |41.0                 |78300            |
|1610 LIMITED                        |-22.0                |93110            |
|1879 EVENTS MANAGEMENT LIMITED      |13.4                 |56210,70229      |
|1LIFE MANAGEMENT SOLUTIONS LIMITED  |15.1                 |93110,93130,93290|
|1ST HOME CARE LTD.                  |15.0                 |86900,88100      |
|1STOP HALAL LIMITED                 |11.9                 |56290            |
|2 AGRICULTURE LIMITED               |13.4          

In [10]:
# Rank employers by the magnitude of their mean hourly pay gap, largest first.
# `abs` here is the Spark column function from the star import, not the builtin.
abs_mean_diff = abs(sqlData["DiffMeanHourlyPercent"]).alias("AbsMeanDiff")
dataSorted = (
    sqlData
    .select("EmployerName", "DiffMeanHourlyPercent", abs_mean_diff)
    .orderBy("AbsMeanDiff", ascending=False)
)
dataSorted.show(truncate=False)

+--------------------------------------------------------+---------------------+-----------+
|EmployerName                                            |DiffMeanHourlyPercent|AbsMeanDiff|
+--------------------------------------------------------+---------------------+-----------+
|COMPLETE CARE HOLDINGS LIMITED                          |-400.0               |400.0      |
|BARLOWS (U.K.) LIMITED                                  |-267.6               |267.6      |
|MILLWALL HOLDINGS PLC                                   |159.0                |159.0      |
|INBRELLA LIMITED                                        |-150.0               |150.0      |
|BAR 2010 LIMITED                                        |-148.0               |148.0      |
|Solent Academies Trust                                  |-107.0               |107.0      |
|STOKE CITY FOOTBALL CLUB LIMITED                        |92.5                 |92.5       |
|BURNLEY FOOTBALL & ATHLETIC COMPANY, LIMITED(THE)       |88.4        

In [11]:
# Flip the ordering: employers with the smallest absolute gap come first.
dataSorted = dataSorted.orderBy(dataSorted["AbsMeanDiff"].asc())
dataSorted.show(truncate=False)

+------------------------------------------+---------------------+-----------+
|EmployerName                              |DiffMeanHourlyPercent|AbsMeanDiff|
+------------------------------------------+---------------------+-----------+
|COOPER TOPCO LIMITED                      |0.0                  |0.0        |
|BOREAL LIMITED                            |0.0                  |0.0        |
|COMFORT CALL LIMITED                      |0.0                  |0.0        |
|AVENUE CARE SERVICES LIMITED              |0.0                  |0.0        |
|BLUES AGENCY LIMITED                      |0.0                  |0.0        |
|CAVITY DENTAL STAFF AGENCY LTD            |0.0                  |0.0        |
|CMD RECRUITMENT LIMITED                   |0.0                  |0.0        |
|ACCA LIMITED                              |0.0                  |0.0        |
|ARRIVA DURHAM COUNTY LIMITED              |0.0                  |0.0        |
|BANBURY HEATH LIMITED                     |0.0     

In [12]:
# Average DiffMeanHourlyPercent over all rows: an empty groupBy() treats the
# whole DataFrame as a single group.
all_rows = sqlData.groupBy()
sqlDataAvg = all_rows.avg("DiffMeanHourlyPercent")

In [13]:
# Display the single-row result (defaults spelled out: 20 rows, truncated cells).
sqlDataAvg.show(n=20, truncate=True)

+--------------------------+
|avg(DiffMeanHourlyPercent)|
+--------------------------+
|          14.2997905559787|
+--------------------------+



In [None]:
# Persist the overall mean pay gap in CSV format at the path "mean_pay_gap.csv".
# mode="overwrite" keeps this cell re-runnable: the default save mode raises
# an error when the target path already exists.
sqlDataAvg.write.save("mean_pay_gap.csv", format="csv", mode="overwrite")

In [20]:
# Rows whose mean hourly pay difference is below zero.
is_negative_gap = sqlData["DiffMeanHourlyPercent"] < 0
dataNeg = sqlData.where(is_negative_gap)

In [23]:
# Number of rows with a negative DiffMeanHourlyPercent.
dataNeg.count()

1225

In [24]:
# Total number of rows in the selected data, for comparison with the count above.
sqlData.count()

10504

In [25]:
# Fraction of rows with a negative mean hourly pay difference.
# Computed from the DataFrames rather than from hand-copied counts, so the
# figure stays correct if the input file or the filters change.
negative_rows = dataNeg.count()
total_rows = sqlData.count()
print(negative_rows / total_rows)

0.11662223914699162


## B.2

In [26]:
# Load the SIC lookup table; later cells read its Min, Max and Sector columns.
sic = spark_session.read.csv(
    '/home/ubuntu/LDSA/ass_2/sic.csv',
    header="true",
    inferSchema="true",
    multiLine="true",
    escape="\"",
)

In [27]:
# Drop rows without a SIC code and, where several codes are listed,
# keep only the first one.
import re
# Rows whose SicCodes field is the literal string "None" carry no code.
dataSIC = sqlData.filter(sqlData["SicCodes"] != "None")
# Remove every ",<digits>" suffix so "56210,70229" becomes "56210".
# Raw string: "\d" in a normal string literal is an invalid escape sequence.
dataSIC = dataSIC.withColumn("SicCodes", regexp_replace("SicCodes", r",\d*", ""))
# Cast the remaining single code to an integer for the numeric sector lookup.
dataSIC = dataSIC.withColumn("SicCodes", dataSIC.SicCodes.cast("int"))

In [28]:
# Expand each (Min, Max, Sector) row of the SIC table into one
# (key, sector) pair per integer in the inclusive range Min..Max.
codes = [
    (key, line.Sector)
    for line in sic.collect()
    for key in range(line.Min, line.Max + 1)
]

# Broadcast the lookup table to the executors. The pairs already live on the
# driver, so the dict is built locally instead of going through a
# parallelize(...).collectAsMap() round trip on the cluster.
codesBr = spark_context.broadcast(dict(codes))

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType


def _sector_for_code(sic_code):
    """Return the sector name for an integer SIC code.

    The lookup key is the code floor-divided by 1000 (e.g. 85310 -> 85).
    Raises KeyError if that key is not present in the broadcast table.
    """
    return codesBr.value[sic_code // 1000]


# Column-level wrapper around the lookup; the result column is a string.
sicFit = udf(_sector_for_code, StringType())

# SIC code 1 floor-divides to key 0; it is excluded before the lookup runs.
# NOTE(review): presumably key 0 has no entry in the sector table - confirm.
dataSIC = (
    dataSIC
    .filter(dataSIC["SicCodes"] != 1)
    .withColumn("Sector", sicFit("SicCodes"))
)

In [30]:
# Preview the data with its new Sector column (defaults spelled out).
dataSIC.show(n=20, truncate=True)

+--------------------+---------------------+--------+--------------------+
|        EmployerName|DiffMeanHourlyPercent|SicCodes|              Sector|
+--------------------+---------------------+--------+--------------------+
|"Bryanston School...|                 18.0|   85310|           EDUCATION|
|"RED BAND" CHEMIC...|                  2.3|   47730|WHOLESALE AND RET...|
|         118 LIMITED|                  1.7|   61900|INFORMATION AND C...|
|   123 EMPLOYEES LTD|                 41.0|   78300|ADMINISTRATIVE AN...|
|        1610 LIMITED|                -22.0|   93110|ARTS ENTERTAINMEN...|
|1879 EVENTS MANAG...|                 13.4|   56210|ACCOMMODATION AND...|
|1LIFE MANAGEMENT ...|                 15.1|   93110|ARTS ENTERTAINMEN...|
|  1ST HOME CARE LTD.|                 15.0|   86900|HUMAN HEALTH AND ...|
| 1STOP HALAL LIMITED|                 11.9|   56290|ACCOMMODATION AND...|
|2 AGRICULTURE LIM...|                 13.4|    1470|AGRICULTURE FORES...|
|2 SISTERS FOOD GR...|   

In [31]:
# Mean of DiffMeanHourlyPercent per sector, with the generated column name
# replaced by the shorter "MeanDiff".
sectorMean = (
    dataSIC
    .groupBy("Sector")
    .avg("DiffMeanHourlyPercent")
    .withColumnRenamed("avg(DiffMeanHourlyPercent)", "MeanDiff")
)

In [32]:
# Sectors ordered from the smallest to the largest mean difference.
sectorMean.orderBy("MeanDiff").show(n=20, truncate=True)

+--------------------+------------------+
|              Sector|          MeanDiff|
+--------------------+------------------+
|ACTIVITIES OF HOU...| 3.133333333333333|
|HUMAN HEALTH AND ...| 6.588755622188904|
|WATER SUPPLY SEWE...| 7.499999999999998|
|ACCOMMODATION AND...| 7.934003656307122|
|PUBLIC ADMINISTRA...| 9.176190476190477|
|ACTIVITIES OF EXT...| 9.944444444444445|
|TRANSPORTATION AN...|10.213910761154864|
|ADMINISTRATIVE AN...|11.274045801526723|
|           EDUCATION|11.714520547945208|
|OTHER SERVICE ACT...| 12.46292682926829|
|AGRICULTURE FORES...|              12.8|
|       MANUFACTURING|13.301317715959009|
|WHOLESALE AND RET...| 14.92169999999999|
|ELECTRICITY GAS S...| 15.27818181818182|
|REAL ESTATE ACTIV...|16.024799999999995|
|PROFESSIONAL SCIE...|18.477335164835157|
|INFORMATION AND C...|19.500473933649314|
|ARTS ENTERTAINMEN...|20.689416058394166|
|MINING AND QUARRYING|20.729999999999997|
|        CONSTRUCTION|21.707570977917985|
+--------------------+------------

In [33]:
# B.2.3 Calculate variance
# Sample variance of DiffMeanHourlyPercent per sector. The aggregate is
# aliased directly: renaming by the auto-generated column name silently does
# nothing if that name ever differs from "var_samp(DiffMeanHourlyPercent)".
sectorVar = (
    dataSIC
    .groupBy("Sector")
    .agg(variance(dataSIC.DiffMeanHourlyPercent).alias("VarDiff"))
)
# Combine mean and variance into one table, one row per sector.
sectorStats = sectorMean.join(sectorVar, ["Sector"])

In [35]:
# Per-sector mean and variance, ordered by ascending mean difference.
sectorStats.orderBy("MeanDiff").show(n=20, truncate=True)

+--------------------+------------------+------------------+
|              Sector|          MeanDiff|           VarDiff|
+--------------------+------------------+------------------+
|ACTIVITIES OF HOU...| 3.133333333333333| 4.853333333333333|
|HUMAN HEALTH AND ...| 6.588755622188904| 382.6790775783276|
|WATER SUPPLY SEWE...| 7.499999999999998|182.80441176470592|
|ACCOMMODATION AND...| 7.934003656307122| 92.24821892306353|
|PUBLIC ADMINISTRA...| 9.176190476190477|153.89112659698029|
|ACTIVITIES OF EXT...| 9.944444444444445| 53.88527777777777|
|TRANSPORTATION AN...|10.213910761154864| 205.6516217709626|
|ADMINISTRATIVE AN...|11.274045801526723|209.92382495885113|
|           EDUCATION|11.714520547945208| 146.8952006623513|
|OTHER SERVICE ACT...| 12.46292682926829|181.57783452893355|
|AGRICULTURE FORES...|              12.8|156.11877551020405|
|       MANUFACTURING|13.301317715959009| 157.9409213392759|
|WHOLESALE AND RET...| 14.92169999999999|198.42528439439434|
|ELECTRICITY GAS S...| 1

In [37]:
# Stop the SparkContext obtained from spark_session at the top of the notebook.
spark_context.stop()