#### 1 - Please load the dataset into a Spark dataframe

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, desc

import util

spark = SparkSession.builder.appName("example").getOrCreate()

df = spark.read.option("inferSchema", "true").json("test_small/*.json")

# flatten the df to make analysis easier
df = util.get_flattened_job_profile_data(df)

df.show(10)

+--------------------+--------------------+
|                  id|             profile|
+--------------------+--------------------+
|da313df5-9613-450...|{Daniel, [{2016-0...|
|2238d6ef-ff70-4d5...|{Louis, [{2015-03...|
|11214286-41bb-4d0...|{Olive, [{2013-06...|
|3c035b6e-8483-49a...|{Joe, [{2014-09-2...|
|3c035b6e-8483-49a...|{Andrea, [{2014-0...|
|3c035b6e-8483-49a...|{George, [{2014-0...|
|3c035b6e-8483-49a...|{Bob1, [{2018-09-...|
|3c035b6e-8483-49a...|{Bob2, [{2019-09-...|
|3c035b6e-8483-49a...|{Bob3, [{2019-09-...|
|3c035b6e-8483-49a...|{Bob4, [{2019-11-...|
|3c035b6e-8483-49a...|{Bob5, [{2019-09-...|
|3c035b6e-8483-49a...|{Bob6, [{2019-04-...|
|3c035b6e-8483-49a...|{Bob7, [{2019-06-...|
|3c035b6e-8483-49a...|{Bob8, [{2020-09-...|
+--------------------+--------------------+

root
 |-- id: string (nullable = true)
 |-- profile: struct (nullable = true)
 |    |-- firstName: string (nullable = true)
 |    |-- jobHistory: array (nullable = true)
 |    |    |-- element: struct (conta

#### 2 - Print the schema

In [2]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- jobDetail: struct (nullable = true)
 |    |-- fromDate: string (nullable = true)
 |    |-- location: string (nullable = true)
 |    |-- salary: long (nullable = true)
 |    |-- title: string (nullable = true)
 |    |-- toDate: string (nullable = true)



#### 3 - How many records are there in the dataset?

In [3]:
df.count()

42

#### 4 - What is the average salary for each profile?
##### Display the first 10 results, ordered by lastName in descending order

In [4]:
util.get_average_salaries_by_profile(df) \
    .orderBy(desc('avgSalary')) \
    .limit(10) \
    .show()

+--------------------+---------+--------+---------+
|                  id|firstName|lastName|avgSalary|
+--------------------+---------+--------+---------+
|3c035b6e-8483-49a...|   George|Kastanza| 149000.0|
|3c035b6e-8483-49a...|      Joe| Johnson| 149000.0|
|3c035b6e-8483-49a...|     Bob2|   Barry| 139000.0|
|3c035b6e-8483-49a...|     Bob4|   Barry| 139000.0|
|3c035b6e-8483-49a...|     Bob3|   Barry| 139000.0|
|3c035b6e-8483-49a...|     Bob6|   Barry| 139000.0|
|3c035b6e-8483-49a...|   Andrea|Berryman| 139000.0|
|3c035b6e-8483-49a...|     Bob8|   Barry| 139000.0|
|3c035b6e-8483-49a...|     Bob1|   Barry| 139000.0|
|3c035b6e-8483-49a...|     Bob5|   Barry| 139000.0|
+--------------------+---------+--------+---------+



#### 5 - What is the average salary across the whole dataset?

In [5]:
util.get_average_salary_for_all_profiles(df).show()

+------------------+
|         avgSalary|
+------------------+
|109880.95238095238|
+------------------+



#### 6 - On average, what are the top 5 paying jobs? Bottom 5 paying jobs?
##### If there is a tie, please order by title, ~~location~~.

In [8]:
print('Top 5 paying jobs')
util.get_average_salaries_by_job_title(df) \
    .orderBy(desc('avg_salary'), 'jobTitle') \
    .limit(5) \
    .show()

print('Bottom 5 paying jobs')
util.get_average_salaries_by_job_title(df) \
    .orderBy('avg_salary', 'jobTitle') \
    .limit(5) \
    .show()

Top 5 paying jobs
+---------------+----------+
|       jobTitle|avg_salary|
+---------------+----------+
|        actuary|  164000.0|
|devops engineer|  154000.0|
|doctor engineer|  144000.0|
|   new engineer|  144000.0|
|   old engineer|  144000.0|
+---------------+----------+

Bottom 5 paying jobs
+--------------------+----------+
|            jobTitle|avg_salary|
+--------------------+----------+
|     support analyst|   42000.0|
|           evaluator|   44000.0|
|  service technician|   51000.0|
|          technician|   54000.0|
|corporate consultant|   60000.0|
+--------------------+----------+



#### 7 - Who is currently making the most money?
##### If there is a tie, please order in lastName descending, fromDate descending.

In [None]:
result = util.get_current_salaries_by_profile(df)
result = util.get_max_rows_for_column(result, 'currentSalary')
result.show()

#### 8 - What was the most popular job title that started in 2019?

In [None]:
from pyspark.sql.functions import min, year, count
df_flattened = df_small.select(
    'id',
    'profile.firstName',
    'profile.lastName', explode('profile.jobHistory').alias('jobDetail')
)

titles_started_in_2019 = df_flattened.groupBy('jobDetail.title') \
    .agg(min(col('jobDetail.fromDate')).alias('firstSeenDate')) \
    .where(year(col('firstSeenDate')) == 2019)

titles_started_in_2019_list = [row.title for row in titles_started_in_2019.collect()]

popular_titles_in_2019 = df_flattened.groupBy('jobDetail.title') \
    .agg(count('jobDetail.title').alias('titleCount')) \
    .where(col('jobDetail.title').isin(titles_started_in_2019_list)) \
    .orderBy(desc('titleCount'))
    

popular_titles_in_2019.collect()[0][0]


In [None]:
df_flattened = df_small.select(
    'id',
    'profile.firstName',
    'profile.lastName', explode('profile.jobHistory').alias('jobDetail')
)

titles_started_in_2019 = df_flattened.groupBy('jobDetail.title') \
    .agg(min(col('jobDetail.fromDate')).alias('firstSeenDate')) \
    .where(year(col('firstSeenDate')) == 2019) \
    .select('title')

title_counts = df_flattened.groupBy('jobDetail.title') \
    .agg(count('jobDetail.title').alias('titleCount'))

popular_titles_in_2019 = titles_started_in_2019 \
    .join(title_counts, titles_started_in_2019.title == title_counts.title, 'inner') \
    .select(titles_started_in_2019.title, 'titleCount') \
    .orderBy(desc('titleCount'))

popular_titles_in_2019.show(1)

#### 9 - How many people are currently working?

In [None]:
from pyspark.sql.functions import countDistinct
df_flattened = df_small.select(
    'id',
    'profile.firstName',
    'profile.lastName', explode('profile.jobHistory').alias('jobDetail')
)
df_flattened.select('jobDetail.toDate')

current_roles = df_flattened.where(isnull(col('jobDetail.toDate')))

current_roles.select(countDistinct('id').alias('count_of_current_people_working')).show()

#### 10 - For each person, list only their latest job
##### Display the first 10 results, ordered by lastName descending, firstName ascending order.

In [None]:
df_flattened = df_small.select(
    'id',
    'profile.firstName',
    'profile.lastName', explode('profile.jobHistory').alias('jobDetail')
)
df_flattened.select('jobDetail.toDate')

df = df_flattened.groupBy('id', 'firstName', 'lastName') \
    .agg(max('jobDetail.fromDate').alias('maxFromDate'))

df.show(truncate=False)

In [None]:
df_flattened = df_small.select(
    'id',
    'profile.firstName',
    'profile.lastName', explode('profile.jobHistory').alias('jobDetail')
)
df_max_dates = df_flattened.groupBy('id').agg(max('jobDetail.fromDate').alias('maxFromDate'))

df_result = df_flattened.join(df_max_dates, on=['id']) \
    .where(col('jobDetail.fromDate') == col('maxFromDate')) \
    .select('id', 'firstName', 'lastName', 'jobDetail')

df_result.show(truncate=False)

#### 11 - For each person, list their highest paying job along with their first name, last name, salary and the year they made this salary
##### Store the results in a dataframe, and then print out 10 results

In [None]:
df_flattened = df_small.select(
    'id',
    'profile.firstName',
    'profile.lastName', explode('profile.jobHistory').alias('jobDetail')
)
df_job_with_max_salary = df_flattened.groupBy('id').agg(max('jobDetail.salary').alias('salary'))
df_job_with_max_salary

df_result = df_flattened.join(df_max_dates, on=['id']) \
    .select('firstName', 'lastName', 'jobDetail.salary', year(col('jobDetail.fromDate')).alias('year'))

df_result.show(10)

#### 12 - Write out the last result (question 11) in parquet format, compressed, partitioned by the year of their highest paying job

In [None]:
df_result.write.partitionBy('year').parquet('output_data/', compression='gzip', mode='overwrite')