This notebook analyzes profile test data. The test data consists of 20 JSON files. The notebook loads them into a DataFrame and analyzes it using Spark SQL.

1. Import PySpark Library

In [1]:
from pyspark.sql import SparkSession

2. Create SparkSession

In [2]:
spark = SparkSession \
    .builder \
    .appName("Profile Analyse") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

3. Load JSON Files

In [3]:
df = spark.read.json("test_data/*.json")

4. Print DataFrame Schema

In [4]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- profile: struct (nullable = true)
 |    |-- firstName: string (nullable = true)
 |    |-- jobHistory: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- fromDate: string (nullable = true)
 |    |    |    |-- location: string (nullable = true)
 |    |    |    |-- salary: long (nullable = true)
 |    |    |    |-- title: string (nullable = true)
 |    |    |    |-- toDate: string (nullable = true)
 |    |-- lastName: string (nullable = true)



5. Number of Records in the Dataset

In [5]:
df.count()

17139693

6. Flatten Nested DataFrame (Data Preparation for Analysis)

First, select firstName, lastName and jobHistory from the nested profile struct as top-level columns.

In [6]:
nested_df = df.select("id", "profile.firstName", "profile.lastName", "profile.jobHistory")

Second, use the explode function to turn each element of the jobHistory array into its own row. Note: if jobHistory is an empty (or null) array, that profile produces no rows in the exploded DataFrame.

In [7]:
from pyspark.sql.functions import explode
exploded_df = nested_df.withColumn("jobHistory", explode(nested_df.jobHistory))
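
To make the note about empty arrays concrete, here is a minimal plain-Python sketch of the row-per-element behaviour. It is not Spark code: the records and the helper explode_rows are hypothetical and only mimic what explode does to a single array column.

```python
# Plain-Python sketch of explode semantics on hypothetical records (not Spark code).
records = [
    {"id": "a", "jobHistory": [{"title": "dentist"}, {"title": "trimmer"}]},
    {"id": "b", "jobHistory": []},    # empty array: yields no rows
    {"id": "c", "jobHistory": None},  # null array: also yields no rows
]


def explode_rows(rows, column):
    """Emit one output row per array element; skip empty or null arrays."""
    out = []
    for row in rows:
        for element in row[column] or []:
            new_row = dict(row)
            new_row[column] = element
            out.append(new_row)
    return out


exploded = explode_rows(records, "jobHistory")
print([(r["id"], r["jobHistory"]["title"]) for r in exploded])
```

Profiles "b" and "c" disappear from the result, so later counts and averages only cover profiles with at least one job. If those profiles need to be kept, pyspark.sql.functions also provides explode_outer, which emits a single row with a null jobHistory instead of dropping the profile.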

Third, promote fromDate, location, salary, title and toDate from the jobHistory struct to top-level columns.

In [8]:
flatten_df = exploded_df.select("id", "firstName", "lastName", "jobHistory.fromDate", "jobHistory.location", "jobHistory.salary", "jobHistory.title", "jobHistory.toDate")

7. Create a Temp View for the DataFrame

In [9]:
flatten_df.createOrReplaceTempView("people")

8. Average Salary for each profile

In [10]:
sqlDF = spark.sql("SELECT id, firstName, lastName, avg(salary) as avgSalary FROM people group by id, firstName, lastName order by lastName desc limit 10")
sqlDF.show()

+--------------------+---------+--------+------------------+
|                  id|firstName|lastName|         avgSalary|
+--------------------+---------+--------+------------------+
|82dab74c-3946-45b...|   Robert|  Zywiec| 66833.33333333333|
|5894afab-574f-429...|  Richard|  Zywiec|           69625.0|
|ba24222d-6e39-40d...|  Matthew|  Zywiec|           65500.0|
|58305fc8-a83c-4f2...|    Joyce| Zywicki|          123500.0|
|6cdb8e48-ee18-4b4...|     Jodi| Zywicki|138571.42857142858|
|13348815-753b-43d...|Stephanie| Zywicki|131888.88888888888|
|080d601f-774b-47f...| Jonathan| Zywicki|          120625.0|
|3462e81b-a9cc-4ac...|   Sandra| Zywicki|43333.333333333336|
|b8a6bf60-03da-4e0...|     Paul| Zywicki| 98166.66666666667|
|eeb15ed5-fb0d-4d6...|    Donna| Zywicki|          115000.0|
+--------------------+---------+--------+------------------+



9. Average Salary for the Whole Dataset

In [11]:
sqlDF = spark.sql("SELECT avg(salary) as avgSalary FROM people")
sqlDF.show()

+----------------+
|       avgSalary|
+----------------+
|97473.6229416272|
+----------------+



10. Top 5 Highest-Paying Jobs

If we rank jobs country-wide, the average pay of a job is calculated over all jobs with that title across all locations.

In [12]:
sqlDF = spark.sql("SELECT title, avg(salary) as avgSalary FROM people group by title order by avgSalary desc, title limit 5")
sqlDF.show()

+--------------------+-----------------+
|               title|        avgSalary|
+--------------------+-----------------+
|      internal sales|97555.94285236429|
|  service technician| 97539.8681255395|
|     Support Analyst|97515.95270974559|
|clinical psycholo...|97515.49312935937|
|             dentist| 97515.0922739562|
+--------------------+-----------------+



If we rank jobs per location, the average pay of a job is calculated over all jobs with that title in a specific location.

In [13]:
sqlDF = spark.sql("SELECT title, location, avg(salary) as avgSalary FROM people group by title, location order by avgSalary desc, title, location limit 5")
sqlDF.show()

+--------------------+---------+-----------------+
|               title| location|        avgSalary|
+--------------------+---------+-----------------+
|   cosmetic injector|Melbourne|97685.17451749711|
|safety superinten...|    Perth|97670.41450212519|
|  Multi Site Manager|Melbourne| 97646.6894018173|
|             trimmer| Brisbane|97643.56431163519|
|       store manager|   Hobart|97641.65395372489|
+--------------------+---------+-----------------+
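
The difference between the two groupings (by title only, and by title plus location) can be sketched in plain Python. The rows and the helper average_by are hypothetical; the sketch only illustrates how the group key changes the averages, not how Spark computes them.

```python
from collections import defaultdict

# Hypothetical rows: (title, location, salary)
rows = [
    ("dentist", "Perth", 100000),
    ("dentist", "Perth", 120000),
    ("dentist", "Hobart", 80000),
    ("trimmer", "Perth", 90000),
]


def average_by(rows, key):
    """Average salary per group; `key` maps a row to its group key."""
    totals = defaultdict(lambda: [0, 0])  # group -> [sum, count]
    for row in rows:
        bucket = totals[key(row)]
        bucket[0] += row[2]
        bucket[1] += 1
    return {group: total / count for group, (total, count) in totals.items()}


# GROUP BY title: one average per title across all locations.
by_title = average_by(rows, key=lambda r: r[0])
# GROUP BY title, location: one average per (title, location) pair.
by_title_location = average_by(rows, key=lambda r: (r[0], r[1]))

print(by_title)
print(by_title_location)
```

In this toy data the dentist average is 100000.0 country-wide, but splits into 110000.0 for Perth and 80000.0 for Hobart once location is part of the key.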



11. 5 Lowest-Paying Jobs

In [14]:
sqlDF = spark.sql("SELECT title, avg(salary) as avgSalary FROM people group by title order by avgSalary, title limit 5")
sqlDF.show()

+--------------------+-----------------+
|               title|        avgSalary|
+--------------------+-----------------+
|business developm...|97410.55035168272|
|    research analyst|97412.92759450486|
|retail sales cons...|97419.07190129737|
|Administration Of...|97423.83029048711|
|           paralegal|97432.43600056692|
+--------------------+-----------------+



12. The Person Who Makes the Most Money

To find the person who currently makes the most money, we keep only jobs that are still held (toDate is null) and then take the highest salary.

In [15]:
sqlDF = spark.sql(""" SELECT id, firstName, lastName, salary FROM people where toDate is null order by salary desc, lastName desc, fromDate desc limit 1   """)
sqlDF.show()

+--------------------+---------+--------+------+
|                  id|firstName|lastName|salary|
+--------------------+---------+--------+------+
|5b217f27-8f8d-4dc...|    Kevin|    Zyla|159000|
+--------------------+---------+--------+------+
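
As a rough illustration of the filter-then-rank logic in this query, here is a plain-Python sketch on hypothetical rows. The names and salaries are made up for the example; only the column names come from the notebook.

```python
# Hypothetical flattened rows; toDate is None for a job that is still held.
people = [
    {"lastName": "Abel", "salary": 159000, "fromDate": "2018-01-01", "toDate": None},
    {"lastName": "Zyla", "salary": 159000, "fromDate": "2017-06-01", "toDate": None},
    {"lastName": "Moss", "salary": 170000, "fromDate": "2015-03-01", "toDate": "2019-01-01"},
]

# WHERE toDate IS NULL: keep only current jobs.
current = [p for p in people if p["toDate"] is None]

# ORDER BY salary DESC, lastName DESC, fromDate DESC LIMIT 1:
# the maximum of the (salary, lastName, fromDate) tuple is the first row of that ordering.
top = max(current, key=lambda p: (p["salary"], p["lastName"], p["fromDate"]))

print(top["lastName"], top["salary"])
```

The 170000 salary is ignored because that job has ended, and the tie at 159000 is broken by lastName. The tie-break columns only make the single returned row deterministic; several people can share the top salary.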



13. Most Popular Title in 2019

To find the most popular title among jobs started in 2019, we extract the year from fromDate.

In [16]:
sqlDF = spark.sql(""" select title as MostPopularJob from ( SELECT title, count(*) as con  FROM people where SUBSTRING(fromDate, 1, 4) = '2019' group by title order by con desc limit 1) """)
sqlDF.show()

+-----------------+
|   MostPopularJob|
+-----------------+
|Sheetmetal Worker|
+-----------------+
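
The year extraction relies on fromDate being a yyyy-MM-dd string, as it appears in the outputs of this notebook. A plain-Python sketch of the same filter and count, on hypothetical rows:

```python
from collections import Counter

# Hypothetical rows; only title and fromDate matter here.
jobs = [
    {"title": "Sheetmetal Worker", "fromDate": "2019-02-11"},
    {"title": "Sheetmetal Worker", "fromDate": "2019-07-30"},
    {"title": "dentist", "fromDate": "2019-01-05"},
    {"title": "dentist", "fromDate": "2018-12-31"},
]

# SQL SUBSTRING(fromDate, 1, 4) is 1-based; the Python slice [:4] takes the same four characters.
started_2019 = [
    j["title"] for j in jobs if j["fromDate"] and j["fromDate"][:4] == "2019"
]

# GROUP BY title ORDER BY count DESC LIMIT 1
most_popular, count = Counter(started_2019).most_common(1)[0]
print(most_popular, count)
```

Note that the query counts job records that started in 2019, not distinct people, and that ties for first place are resolved arbitrarily unless a second sort key is added.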



14. How many people are currently working

In [17]:
sqlDF = spark.sql(""" SELECT count(distinct id) as NumOfWorkingPeople FROM people where toDate is null """)
sqlDF.show()

+------------------+
|NumOfWorkingPeople|
+------------------+
|           7710613|
+------------------+



15. List latest job for each profile

In [18]:
sqlDF = spark.sql(""" SELECT id, firstName, lastName, location, title, salary, fromDate, toDate from (SELECT *, 
ROW_NUMBER() OVER(PARTITION BY id ORDER BY fromDate DESC) AS row_number FROM people) where row_number = 1 order by lastName desc, firstName  limit 10 """)
sqlDF.show()

+--------------------+---------+--------+---------+--------------------+------+----------+----------+
|                  id|firstName|lastName| location|               title|salary|  fromDate|    toDate|
+--------------------+---------+--------+---------+--------------------+------+----------+----------+
|ba24222d-6e39-40d...|  Matthew|  Zywiec|    Perth|  Multi Site Manager| 67000|2017-04-23|      null|
|5894afab-574f-429...|  Richard|  Zywiec|   Sydney|           assembler| 83000|2018-07-23|      null|
|82dab74c-3946-45b...|   Robert|  Zywiec| Adelaide|registration officer| 85000|2016-08-08|2019-04-08|
|4e26c80a-8e84-46f...|    Bobby| Zywicki|    Perth| taxation accountant| 89000|2017-12-11|2019-04-11|
|f643f39c-e18a-430...|   Calvin| Zywicki| Adelaide|assistant operati...|144000|2015-04-24|2019-01-24|
|03aeca24-7be1-42a...|  Charles| Zywicki|   Sydney|    sales consultant| 95000|2016-06-10|2019-04-10|
|cc529ff4-2dbf-4ce...|  Cherryl| Zywicki|    Perth|             trimmer| 66000|201
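
The ROW_NUMBER() OVER (PARTITION BY id ORDER BY fromDate DESC) pattern keeps one row per profile: the one with the greatest fromDate. A minimal plain-Python sketch of that idea, using hypothetical rows and assuming fromDate is a yyyy-MM-dd string (so string comparison matches chronological order):

```python
# Hypothetical flattened rows, several per profile id.
jobs = [
    {"id": "p1", "title": "assembler", "fromDate": "2018-07-23"},
    {"id": "p1", "title": "trimmer", "fromDate": "2016-02-01"},
    {"id": "p2", "title": "dentist", "fromDate": "2017-04-23"},
]

latest = {}  # profile id -> row with the greatest fromDate seen so far
for job in jobs:
    best = latest.get(job["id"])
    # ISO yyyy-MM-dd strings compare in chronological order.
    if best is None or job["fromDate"] > best["fromDate"]:
        latest[job["id"]] = job

for profile_id, job in sorted(latest.items()):
    print(profile_id, job["title"], job["fromDate"])
```

The same partition-and-pick-first idea applies when ordering by salary instead of fromDate. If two jobs of one profile share the same fromDate, ROW_NUMBER picks one of them arbitrarily, so a second ORDER BY column is needed for a reproducible result.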

16. Highest Paying job for each profile

In [19]:
storeDF = spark.sql(""" SELECT title, location, id, firstName, lastName, salary, SUBSTRING(fromDate, 1, 4) as yearMade  from (SELECT *, 
ROW_NUMBER() OVER(PARTITION BY id ORDER BY salary desc) AS row_number FROM people) where row_number = 1  """)
storeDF.show(10)

+--------------------+---------+--------------------+---------+--------+------+--------+
|               title| location|                  id|firstName|lastName|salary|yearMade|
+--------------------+---------+--------------------+---------+--------+------+--------+
|           evaluator|Melbourne|00008a82-3345-419...|     Alan| Hawkins| 87000|    2016|
|     devops engineer| Adelaide|00008d2e-3760-452...|   Alfred|     Siu| 61000|    2015|
|  service technician| Adelaide|0000aa9b-7c17-489...|     Juan|    Moss| 85000|    2018|
|     Support Analyst| Adelaide|0000b622-2709-4ef...|     Mark|     Roy| 83000|    2018|
|     counter manager| Brisbane|0000c8c2-af21-48d...|    Wayne|   Hyman|139000|    2017|
|business developm...|   Hobart|00019089-c2f9-48d...|    Linda|  Gardea| 83000|    2016|
|medical radiation...|Melbourne|0001a9dc-3396-40e...|   Steven| Hilbert| 79000|    2018|
|     counter manager|    Perth|0001da88-d735-4b9...|     Fred|Bernhard| 98000|    2015|
|procurement speci...

17. Write Data to Parquet

In [22]:
storeDF.write.partitionBy("yearMade")\
        .mode("overwrite")\
        .option("compression", "snappy")\
        .parquet("output/people")
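
With partitionBy("yearMade"), Spark writes one subdirectory per distinct value, named in the key=value style, and stores the partition column in the directory name rather than inside the data files. The plain-Python sketch below only illustrates that directory layout on hypothetical rows; it writes nothing to disk and does not reproduce Spark's file names.

```python
from collections import defaultdict
from pathlib import PurePosixPath

# Hypothetical rows standing in for storeDF.
rows = [
    {"id": "p1", "salary": 87000, "yearMade": "2016"},
    {"id": "p2", "salary": 61000, "yearMade": "2015"},
    {"id": "p3", "salary": 83000, "yearMade": "2016"},
]

base = PurePosixPath("output/people")
partitions = defaultdict(list)  # directory -> rows stored under it
for row in rows:
    directory = base / f"yearMade={row['yearMade']}"
    # The partition column lives in the directory name, not in the stored rows.
    stored = {k: v for k, v in row.items() if k != "yearMade"}
    partitions[str(directory)].append(stored)

for directory in sorted(partitions):
    print(directory, len(partitions[directory]))
```

Because the year is encoded in the path, a later read that filters on yearMade can skip whole directories. Rows whose fromDate is null end up in a separate default partition directory.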