In [37]:
from pprint import pprint
from pyspark.sql import SparkSession
from  pyspark.sql.functions import abs
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# New API
spark_session = SparkSession\
        .builder\
        .master("local") \
        .appName("a3")\
        .getOrCreate()

# Old API (RDD)
spark_context = spark_session.sparkContext

dataframe = spark_session.read\
    .csv("gender.csv", header=True, multiLine=True, inferSchema=True, escape='"').cache()



In [None]:
dataframe.printSchema()

In [104]:
dataframe\
    .withColumn('AbsDiffMeanHourlyPercent', abs(dataframe.DiffMeanHourlyPercent))\
    .select('EmployerName', 'AbsDiffMeanHourlyPercent')\
    .orderBy('AbsDiffMeanHourlyPercent', ascending=False)\
    .show()

+--------------------+------------------------+
|        EmployerName|AbsDiffMeanHourlyPercent|
+--------------------+------------------------+
|COMPLETE CARE HOL...|                   400.0|
|BARLOWS (U.K.) LI...|                   267.6|
|MILLWALL HOLDINGS...|                   159.0|
|    INBRELLA LIMITED|                   150.0|
|    BAR 2010 LIMITED|                   148.0|
|Solent Academies ...|                   107.0|
|STOKE CITY FOOTBA...|                    92.5|
|BURNLEY FOOTBALL ...|                    88.4|
|SWANSEA CITY ASSO...|                    87.8|
|MANCHESTER CITY F...|                    87.7|
|WEST HAM UNITED F...|                    87.4|
|WATFORD ASSOCIATI...|                    87.0|
|SUNDERLAND ASSOCI...|                    85.1|
|WEST BROMWICH ALB...|                    84.4|
|SOUTHAMPTON FOOTB...|                    84.4|
|        CPFC LIMITED|                    84.0|
|NEWCASTLE UNITED ...|                    83.3|
|AFC BOURNEMOUTH L...|                  

In [None]:
summary = dataframe.withColumn('AbsDiffMeanHourlyPercent', abs(dataframe.DiffMeanHourlyPercent))\
    .filter(dataframe.DiffMeanHourlyPercent == 0.0)\
    .select('EmployerName', 'AbsDiffMeanHourlyPercent')\
    .summary()

In [110]:
dataframe\
    .withColumn('AbsDiffMeanHourlyPercent', abs(dataframe.DiffMeanHourlyPercent))\
    .filter(abs(dataframe.DiffMeanHourlyPercent) == '0.0')\
    .select('EmployerName', 'AbsDiffMeanHourlyPercent')\
    .summary()\
    .show()

+-------+--------------------+------------------------+
|summary|        EmployerName|AbsDiffMeanHourlyPercent|
+-------+--------------------+------------------------+
|  count|                  82|                      82|
|   mean|                null|                     0.0|
| stddev|                null|                     0.0|
|    min|24-7 EMPLOYMENT S...|                     0.0|
|    25%|                null|                     0.0|
|    50%|                null|                     0.0|
|    75%|                null|                     0.0|
|    max|WISE SECURITY SER...|                     0.0|
+-------+--------------------+------------------------+



In [None]:
summary.write.format("csv").mode("overwrite").options(header="true").save("summary.csv")

In [None]:
summary = spark_session.read\
    .csv("summary.csv", header=True)\
    .show()

In [115]:
dataframe\
    .count()

10503

In [116]:
1225/10503

0.11663334285442255

In [17]:
my_dict = dict.fromkeys(list(range(10,34)), 'C')
my_dict.update(dict.fromkeys([35], 'D'))
my_dict.update(dict.fromkeys(list(range(36,40)), 'E'))
my_dict.update(dict.fromkeys(list(range(41,44)), 'F'))
my_dict.update(dict.fromkeys(list(range(45,48)), 'G'))
my_dict.update(dict.fromkeys(list(range(49,54)), 'H'))
my_dict.update(dict.fromkeys(list(range(55,57)), 'I'))
my_dict.update(dict.fromkeys(list(range(58,64)), 'J'))
my_dict.update(dict.fromkeys(list(range(64,67)), 'K'))
my_dict.update(dict.fromkeys([68], 'L'))
my_dict.update(dict.fromkeys(list(range(69,76)), 'M'))
my_dict.update(dict.fromkeys(list(range(77,83)), 'N'))
my_dict.update(dict.fromkeys([84], 'O'))
my_dict.update(dict.fromkeys([85], 'P'))
my_dict.update(dict.fromkeys(list(range(86,89)), 'Q'))
my_dict.update(dict.fromkeys(list(range(90,94)), 'R'))
my_dict.update(dict.fromkeys(list(range(94,97)), 'S'))
my_dict.update(dict.fromkeys(list(range(97,99)), 'T'))
my_dict.update(dict.fromkeys([99], 'U'))
dict = spark_context.broadcast(my_dict)

In [51]:
dataframe.printSchema()

root
 |-- EmployerName: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- CompanyNumber: string (nullable = true)
 |-- SicCodes: string (nullable = true)
 |-- DiffMeanHourlyPercent: double (nullable = true)
 |-- DiffMedianHourlyPercent: double (nullable = true)
 |-- DiffMeanBonusPercent: double (nullable = true)
 |-- DiffMedianBonusPercent: double (nullable = true)
 |-- MaleBonusPercent: double (nullable = true)
 |-- FemaleBonusPercent: double (nullable = true)
 |-- MaleLowerQuartile: double (nullable = true)
 |-- FemaleLowerQuartile: double (nullable = true)
 |-- MaleLowerMiddleQuartile: double (nullable = true)
 |-- FemaleLowerMiddleQuartile: double (nullable = true)
 |-- MaleUpperMiddleQuartile: double (nullable = true)
 |-- FemaleUpperMiddleQuartile: double (nullable = true)
 |-- MaleTopQuartile: double (nullable = true)
 |-- FemaleTopQuartile: double (nullable = true)
 |-- CompanyLinkToGPGInfo: string (nullable = true)
 |-- ResponsiblePerson: string (nullable = 

In [77]:
def code_lookup(codeString):
    try:
        if not codeString.split(',')[0] == '1': 
            return dict.value[int(codeString.split(',')[0][0:2])]
        else:
            return 
    except:
        return
    
udf_code_lookup = udf(code_lookup, StringType())

dataframe.select('SicCodes').withColumn("Sector", udf_code_lookup(dataframe.SicCodes))

#dict.value[10]

DataFrame[SicCodes: string, Sector: string]

In [87]:
updatedFrame = dataframe.select('SicCodes', 'DiffMeanHourlyPercent').withColumn("Sector", udf_code_lookup(dataframe.SicCodes))
updatedFrame.groupby('Sector')\
    .agg({'DiffMeanHourlyPercent': 'min'})\
    .filter(updatedFrame.Sector.isNotNull())\
    .show()


+------+--------------------------+
|Sector|min(DiffMeanHourlyPercent)|
+------+--------------------------+
|     K|                     -24.2|
|     F|                    -267.6|
|     Q|                    -400.0|
|     E|                     -30.0|
|     T|                       0.6|
|     M|                    -150.0|
|     L|                     -15.1|
|     U|                      -6.2|
|     O|                     -25.0|
|     D|                     -19.4|
|     C|                     -51.5|
|     J|                     -37.4|
|     N|                     -64.4|
|     S|                     -47.2|
|     R|                     -32.6|
|     G|                     -41.4|
|     I|                     -25.2|
|     P|                     -51.0|
|     H|                     -36.0|
+------+--------------------------+

