In [106]:
# spark.stop()

In [1]:
import findspark 
findspark.init()
findspark.find()
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, array_contains, split, when, regexp_replace

# creat SparkSession
spark = SparkSession.builder \
    .appName("ETL_media") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.cores", "4") \
    .config("spark.driver.cores", "1") \
    .getOrCreate()
print(f"Spark Executor Memory: {spark.conf.get('spark.executor.memory')}")
print(f"Spark Driver Memory: {spark.conf.get('spark.driver.memory')}")
print(f"Spark Executor Cores: {spark.conf.get('spark.executor.cores')}")
print(f"Spark Driver Cores: {spark.conf.get('spark.driver.cores')}")

#file_path = r'F:\申请，润\我去澳洲\university of adelaide\AWS\video_data.csv'
# load CSV 
# File layout - inferSchema 
# Field delimiter – pipe, tab, comma
# File encoding – ASCII, UTF-8, UTF-16,gb2312 中文字符
# API - read.csv
#df = spark.read.csv(file_path, header=True, inferSchema=True,encoding="gb2312")
df = spark.read.csv("video_data_new_id.csv", header=True, inferSchema=True,encoding="UTF-8") # automately recognize the type of data
df.show(1,truncate=False)



Spark Executor Memory: 8g
Spark Driver Memory: 4g
Spark Executor Cores: 4
Spark Driver Cores: 1
+--------+---------------------------------------------------------+-----------------------------------------------+------------------------------------+
|DateTime|VideoTitle                                               |events                                         |id                                  |
+--------+---------------------------------------------------------+-----------------------------------------------+------------------------------------+
|NULL    |App Web|Clips|a-current-affair;2016|William Tyrrell twist|157,120,160,104,162,000,000,000,000,000,000,000|6e37dd30-bc06-4541-b39a-62e07baa0204|
+--------+---------------------------------------------------------+-----------------------------------------------+------------------------------------+
only showing top 1 row



# initial check

## layout

In [109]:
from pyspark.sql.functions import trim
if df.columns != ['DateTime','VideoTitle','events','id']:
    spark.stop()
    print("The dataset is lack of necessary columns")

df = df.withColumn("VideoTitle", trim(col("VideoTitle")))
# 替换 VideoTitle 列中每个 | 符号后的空格
df = df.withColumn("VideoTitle", regexp_replace(col("VideoTitle"), r'\|\s+', '|'))
df.show(1,truncate=False)

+--------+---------------------------------------------------------+-----------------------------------------------+------------------------------------+
|DateTime|VideoTitle                                               |events                                         |id                                  |
+--------+---------------------------------------------------------+-----------------------------------------------+------------------------------------+
|NULL    |App Web|Clips|a-current-affair;2016|William Tyrrell twist|157,120,160,104,162,000,000,000,000,000,000,000|6e37dd30-bc06-4541-b39a-62e07baa0204|
+--------+---------------------------------------------------------+-----------------------------------------------+------------------------------------+
only showing top 1 row



## garbled characters or unreadable character

## None Value

In [110]:
from pyspark.sql.functions import size,lit

# None value
# 当您调用 df.filter(condition) 时，Spark 将应用这个条件到每一行数据上。
# 如果某行中任何一个列的值满足 condition 中的条件（即为空），
# 那么这一行就会被包含在 anomalies_df 中。
condition = None
for column in df.columns:
    #print(column)
    if condition is None:
        condition = col(column).isNull()
    else:
        condition = condition | col(column).isNull()
# 找到包含 None 值的行
anomalies_df = df.filter(condition) #.withColumn("AnomalyType", lit("Contains None value"))
# lit 常常用于为新列赋予静态值，比如标识数据中的异常值或者增加注释列。 


In [111]:
#异常值写入csv日志
def write_anomalies(df_fault, file_name,anomal_type,mode = "append"):
    df_fault= df_fault.withColumn("AnomalyType", lit(anomal_type))
    print(f"number of {anomal_type}:{df_fault.count()}")
    df_fault.coalesce(1).write.csv(file_name, header=True, mode=mode)
    log_message = f"Number of {anomal_type}: {df_fault.count()}"
    return log_message

#anomalies info record in txt
def write_log(log_message,log_file):
    from datetime import datetime
    log_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    with open(log_file, 'a') as log_file_obj:
        log_file_obj.write(log_time + ' , ' + log_message + "\n")

In [112]:
from datetime import datetime
date_today = datetime.now().strftime("%Y-%m-%d")
file_name = rf"F:\申请，润\我去澳洲\university of adelaide\AWS\ETL\anomalies_log_{date_today}"
log_file= rf"F:\申请，润\我去澳洲\university of adelaide\AWS\ETL\file_log_{date_today}"

log_message = write_anomalies(anomalies_df, file_name,"Contains None value")
write_log(log_message,log_file)

df = df.filter(~condition)
df.count()

number of Contains None value:16


1048559

## outliers
Data-type check: 1. Numerical value check. 2 String length check. 3 Date value check. 4. Boolean value check

### Date, string

In [113]:
from pyspark.sql.functions import to_date,length
from pyspark.sql.types import StringType, ArrayType
date_cols = ["DateTime", "VideoTitle","events"]
max_length = 150
for col_name in date_cols:
    if col_name == "DateTime":
        df = df.withColumn("DateTime_test", to_date(col_name, "yyyy-MM-dd HH:mm:ss"))
        df_fault = df.filter(~(col("DateTime_test").isNotNull()))# 如果不能转化为以上格式，就是Null
        if df_fault.count() > 0 :
            #df_fault = df_fault.withColumn("AnomalyType", lit("DateTime Fault"))
            log_message = write_anomalies(df_fault.drop("DateTime_test"), file_name,"DateTime Fault")
        
           # log_message = f"Number of DateTime anomalies: {df_fault.count()}"
            write_log(log_message,log_file)
        
            df = df.filter(col("DateTime_test").isNotNull()).drop("DateTime_test") #if df_fault == 0 then df = df
            print(df.count())
        else:
            df = df.drop("DateTime_test")

    elif col_name == "VideoTitle":
        df_fault = df.filter(~(length(col(col_name)) <= max_length))
        if df_fault.count() >0 :
            log_message = write_anomalies(df_fault, file_name,"VideoTitle Length Fault")
            write_log(log_message,log_file)
            df = df.filter(length(col(col_name)) <= max_length)

    elif col_name == "events":
        df_fault = df.filter(df.events.contains("206"))
        if df_fault.count() >0 :
            log_message = write_anomalies(df_fault, file_name,"Events contains 206")
            write_log(log_message,log_file)
            df = df.filter(~df.events.contains("206"))

df.count()

# df = df.withColumn("event_array", split(col("events"), ",").cast("array<int>"))
# df.show(1)
# print(df.count())



number of DateTime Fault:16
1048543
number of VideoTitle Length Fault:35
number of Events contains 206:23


1048485

In [114]:
df = df.withColumn("VideoTitle_split", split(col("VideoTitle"), "\\|"))
df.show(1)
# 过滤 DataFrame
df_fault = df.filter(
    ~(((size(col("VideoTitle_split")) == 2) & (col("VideoTitle_split")[0] == "news")) |
    ((size(col("VideoTitle_split")) == 4) & (col("VideoTitle_split")[0] != "news")))
).drop("VideoTitle_split")

if df_fault.count() >0 :
# 在写入CSV文件时，PySpark不支持直接写入复杂的数据类型（如数组、结构体等）。
# CSV格式是纯文本格式，不支持嵌套或复杂的数据结构，因此需要将这些复杂类型转换为可以表示为纯文本的简单类型。
# 对于数组类型，可以将数组转换为字符串格式，然后再写入CSV文件。
# 你可以使用 pyspark.sql.functions.concat_ws 函数将数组元素连接成一个字符串。
    log_message = write_anomalies(df_fault, file_name,"VideoTitle_split Fault")
    write_log(log_message,log_file)
    df = df.filter(
        ((size(col("VideoTitle_split")) == 2) & (col("VideoTitle_split")[0] == "news")) |
        ((size(col("VideoTitle_split")) == 4) & (col("VideoTitle_split")[0] != "news"))
    )
df.count()
df.show(1)



+-------------------+--------------------+--------------------+--------------------+--------------------+
|           DateTime|          VideoTitle|              events|                  id|    VideoTitle_split|
+-------------------+--------------------+--------------------+--------------------+--------------------+
|2024-01-11 10:30:53|news|Shark attack...|127,157,120,160,1...|2555b587-4871-497...|[news, Shark atta...|
+-------------------+--------------------+--------------------+--------------------+--------------------+
only showing top 1 row

number of VideoTitle_split Fault:11
+-------------------+--------------------+--------------------+--------------------+--------------------+
|           DateTime|          VideoTitle|              events|                  id|    VideoTitle_split|
+-------------------+--------------------+--------------------+--------------------+--------------------+
|2024-01-11 10:30:53|news|Shark attack...|127,157,120,160,1...|2555b587-4871-497...|[news, S

# Build Data Warehouse

## DimDate

In [115]:
df.select("DateTime").show(1)

+-------------------+
|           DateTime|
+-------------------+
|2024-01-11 10:30:53|
+-------------------+
only showing top 1 row



In [116]:
from pyspark.sql.functions import col, hour, minute, dayofweek, month, quarter, year, to_timestamp

# 使用 alias() 方法创建 DataFrame 的别名
df_time = df

# 进行数据处理
df_time = df_time \
    .withColumn("DateTime", to_timestamp(col("DateTime"), "yyyy-MM-dd HH:mm:ss")) \
    .withColumn("Hour", hour("DateTime")) \
    .withColumn("Minute", minute("DateTime")) \
    .withColumn("DayOfWeek", dayofweek("DateTime")) \
    .withColumn("Month", month("DateTime")) \
    .withColumn("Quarter", quarter("DateTime")) \
    .withColumn("Year", year("DateTime"))

# 显示处理后的 DataFrame
df_time.show(1,truncate=False)


+-------------------+---------------------------------+-----------------------------------+------------------------------------+------------------------------------+----+------+---------+-----+-------+----+
|DateTime           |VideoTitle                       |events                             |id                                  |VideoTitle_split                    |Hour|Minute|DayOfWeek|Month|Quarter|Year|
+-------------------+---------------------------------+-----------------------------------+------------------------------------+------------------------------------+----+------+---------+-----+-------+----+
|2024-01-11 10:30:53|news|Shark attacks spearfisherman|127,157,120,160,104,000,000,000,000|2555b587-4871-4972-b629-7b8fe1804fb2|[news, Shark attacks spearfisherman]|10  |30    |5        |1    |1      |2024|
+-------------------+---------------------------------+-----------------------------------+------------------------------------+------------------------------------+----+--

In [117]:
from pyspark.sql.functions import concat, lit, row_number
from pyspark.sql.window import Window
df_time = df_time.dropDuplicates(["DateTime"])  # maybe several events coincide
# Generate a unique DateKey
windowSpec = Window.orderBy("DateTime")
df_time = df_time.withColumn("DateKey", row_number().over(windowSpec))

# Select required columns for DimDate
DimDate = df_time.select("DateKey", "DateTime", "Hour", "Minute", "DayOfWeek", "Month", "Quarter", "Year")

# Show the prepared DimDate DataFrame
DimDate.show(2,truncate=False)
DimDate.count()

+-------+-------------------+----+------+---------+-----+-------+----+
|DateKey|DateTime           |Hour|Minute|DayOfWeek|Month|Quarter|Year|
+-------+-------------------+----+------+---------+-----+-------+----+
|1      |2024-01-11 10:30:00|10  |30    |5        |1    |1      |2024|
|2      |2024-01-11 10:30:01|10  |30    |5        |1    |1      |2024|
+-------+-------------------+----+------+---------+-----+-------+----+
only showing top 2 rows



83867

## analysis DimPlatform, DimVideotype, DimVideoname

In [118]:
from pyspark.sql.functions import split, col
df_video = df
print(df.count())

# 根据拆分后的结果创建新列 DimPlatform, DimVideoType, DimVideoName, DimvideoTopic
df_video = df_video \
    .withColumn("DimPlatform", col("VideoTitle_split")[0]) \
    .withColumn("DimVideoType", when(col("VideoTitle_split")[0] == "news", "Unknow").otherwise(col("VideoTitle_split")[1])) \
    .withColumn("DimVideoName", when(col("VideoTitle_split")[0] == "news", "Unknow").otherwise(col("VideoTitle_split")[2])) \
    .withColumn("DimvideoTopic", when(col("VideoTitle_split")[0] == "news", col("VideoTitle_split")[1]).otherwise(col("VideoTitle_split")[3]))

df_video.show(1)


1048474
+-------------------+--------------------+--------------------+--------------------+--------------------+-----------+------------+------------+--------------------+
|           DateTime|          VideoTitle|              events|                  id|    VideoTitle_split|DimPlatform|DimVideoType|DimVideoName|       DimvideoTopic|
+-------------------+--------------------+--------------------+--------------------+--------------------+-----------+------------+------------+--------------------+
|2024-01-11 10:30:53|news|Shark attack...|127,157,120,160,1...|2555b587-4871-497...|[news, Shark atta...|       news|      Unknow|      Unknow|Shark attacks spe...|
+-------------------+--------------------+--------------------+--------------------+--------------------+-----------+------------+------------+--------------------+
only showing top 1 row



In [119]:
df_video.select("DimPlatform").distinct().show()

+-----------+
|DimPlatform|
+-----------+
|       news|
|    App Web|
|   App iPad|
| App iPhone|
|App Android|
+-----------+



In [120]:
df_video.select("DimVideoType").distinct().show()

+------------+
|DimVideoType|
+------------+
|       Clips|
|        Live|
|    Episodes|
|      Unknow|
|        Clip|
|     Episode|
+------------+



In [121]:
df_video.select("DimVideoName").distinct().show(2)
df_video.select("DimVideoName").distinct().count()

+--------------------+
|        DimVideoName|
+--------------------+
|stop-search-seize...|
|           hi-5;2024|
+--------------------+
only showing top 2 rows



217

In [122]:
df_video = df_video.withColumn("DimVideoType",
                               when(col("DimVideoType") == "Clips", "Clip")
                               .when(col("DimVideoType") == "Episodes", "Episode")
                               .otherwise(col("DimVideoType")))

In [123]:
df_video.select("DimvideoType").distinct().show()
df_video.show(1)

+------------+
|DimvideoType|
+------------+
|        Clip|
|        Live|
|      Unknow|
|     Episode|
+------------+

+-------------------+--------------------+--------------------+--------------------+--------------------+-----------+------------+------------+--------------------+
|           DateTime|          VideoTitle|              events|                  id|    VideoTitle_split|DimPlatform|DimVideoType|DimVideoName|       DimvideoTopic|
+-------------------+--------------------+--------------------+--------------------+--------------------+-----------+------------+------------+--------------------+
|2024-01-11 10:30:53|news|Shark attack...|127,157,120,160,1...|2555b587-4871-497...|[news, Shark atta...|       news|      Unknow|      Unknow|Shark attacks spe...|
+-------------------+--------------------+--------------------+--------------------+--------------------+-----------+------------+------------+--------------------+
only showing top 1 row



## build DimPlatform, DimVideotype, DimVideoname

In [124]:
df_platform = df_video.select("DimPlatform").distinct()
df_platform.show()


windowSpec = Window.orderBy("DimPlatform")
df_platform = df_platform.withColumn("platform_id", row_number().over(windowSpec))

# Select required columns for DimDate
DimPlatform = df_platform.select("platform_id", "DimPlatform") \
                        .withColumnRenamed("DimPlatform","platform")

# Show the prepared DimDate DataFrame
DimPlatform.show(truncate=False)

+-----------+
|DimPlatform|
+-----------+
|       news|
|    App Web|
|   App iPad|
| App iPhone|
|App Android|
+-----------+

+-----------+-----------+
|platform_id|platform   |
+-----------+-----------+
|1          |App Android|
|2          |App Web    |
|3          |App iPad   |
|4          |App iPhone |
|5          |news       |
+-----------+-----------+



In [125]:
from pyspark.sql.functions import asc
windowSpec = Window.orderBy(asc("DimVideoType"))
df_videotype = df_video.select("DimVideoType")\
                        .distinct()\
                        .withColumn("videotype_id", row_number().over(windowSpec))

# Select required columns for DimDate
DimVideoType = df_videotype.select("videotype_id", "DimVideoType") \
                        .withColumnRenamed("DimVideoType","videotype")

# Show the prepared DimDate DataFrame
DimVideoType.show(truncate=False)

+------------+---------+
|videotype_id|videotype|
+------------+---------+
|1           |Clip     |
|2           |Episode  |
|3           |Live     |
|4           |Unknow   |
+------------+---------+



In [126]:
windowSpec = Window.orderBy(asc("DimVideoName"))
df_videoname = df_video.select("DimVideoName")\
                        .distinct()\
                        .withColumn("videoname_id", row_number().over(windowSpec))

# Select required columns for DimDate
DimVideoName = df_videoname.select("videoname_id", "DimVideoName") \
                        .withColumnRenamed("DimVideoName","videoname")

# Show the prepared DimDate DataFrame
DimVideoName.show(5,truncate=False)

DimVideoName.count()

+------------+----------------------+
|videoname_id|videoname             |
+------------+----------------------+
|1           |2-broke-girls;season-5|
|2           |2020;2016             |
|3           |60 Minutes;2016       |
|4           |60-minutes;2014       |
|5           |60-minutes;2015       |
+------------+----------------------+
only showing top 5 rows



217

In [127]:
df_video.show(1)
windowSpec = Window.orderBy(asc("DimvideoTopic"))
df_topic = df_video.dropDuplicates(["DimvideoTopic"])\
                    .withColumn("topic_id", row_number().over(windowSpec))\
                    .select("topic_id","DimvideoTopic","DimVideoName","DimVideoType","DimPlatform")\
                    .withColumnRenamed("DimvideoTopic","videoTopic")\
                    .withColumnRenamed("DimVideoName","videoName")\
                    .withColumnRenamed("DimVideoType","videoType")\
                    .withColumnRenamed("DimPlatform","videoPlatform")
df_topic.show(3,truncate=False)

+-------------------+--------------------+--------------------+--------------------+--------------------+-----------+------------+------------+--------------------+
|           DateTime|          VideoTitle|              events|                  id|    VideoTitle_split|DimPlatform|DimVideoType|DimVideoName|       DimvideoTopic|
+-------------------+--------------------+--------------------+--------------------+--------------------+-----------+------------+------------+--------------------+
|2024-01-11 10:30:53|news|Shark attack...|127,157,120,160,1...|2555b587-4871-497...|[news, Shark atta...|       news|      Unknow|      Unknow|Shark attacks spe...|
+-------------------+--------------------+--------------------+--------------------+--------------------+-----------+------------+------------+--------------------+
only showing top 1 row

+--------+---------------+---------------------+---------+-------------+
|topic_id|videoTopic     |videoName            |videoType|videoPlatform|
+----

In [128]:

df_topic = df_topic.join(DimVideoName, df_topic.videoName == DimVideoName.videoname, "left") \
                 .join(DimVideoType, df_topic.videoType == DimVideoType.videotype, "left") \
                 .join(DimPlatform, df_topic.videoPlatform == DimPlatform.platform, "left")

df_topic.show(2,truncate=False)
DimVideoTopic = df_topic.select("topic_id","videoTopic","videoname_id","videotype_id","platform_id")
DimVideoTopic.show(2)
DimVideoTopic.count()

+--------+---------------+---------------------+---------+-------------+------------+---------------------+------------+---------+-----------+--------+
|topic_id|videoTopic     |videoName            |videoType|videoPlatform|videoname_id|videoname            |videotype_id|videotype|platform_id|platform|
+--------+---------------+---------------------+---------+-------------+------------+---------------------+------------+---------+-----------+--------+
|1       |#FindKaylaWeber|major-crimes;season-4|Episode  |App Web      |100         |major-crimes;season-4|2           |Episode  |2          |App Web |
|2       |#Roxy: Part one|60-minutes;2016      |Clip     |App Web      |6           |60-minutes;2016      |1           |Clip     |2          |App Web |
+--------+---------------+---------------------+---------+-------------+------------+---------------------+------------+---------+-----------+--------+
only showing top 2 rows

+--------+---------------+------------+------------+-----------

3079

## Fact Table

In [129]:
windowSpec = Window.orderBy(asc("DateTime"))
FactVideo = df_video.join(df_topic,df_video.DimvideoTopic == df_topic.videoTopic,"left")\
                    .withColumn("record_id", row_number().over(windowSpec))\
                    .withColumnRenamed("id","user_id")\
                    .select("record_id","user_id","DateTime","videoTopic","events")
            
FactVideo.show(10)


+---------+--------------------+-------------------+--------------------+--------------------+
|record_id|             user_id|           DateTime|          videoTopic|              events|
+---------+--------------------+-------------------+--------------------+--------------------+
|        1|8c27fa68-6cfb-498...|2024-01-11 10:30:00|Celebrity reactio...|157,120,160,104,1...|
|        2|92eee1ad-4480-42e...|2024-01-11 10:30:00|Celebrity reactio...|157,120,160,104,1...|
|        3|63d21351-285b-4e7...|2024-01-11 10:30:00|Australian couple...|127,157,120,160,1...|
|        4|e84969dd-bc03-454...|2024-01-11 10:30:00|                 VIC|157,120,160,104,1...|
|        5|4897e662-9886-499...|2024-01-11 10:30:00|    January 10, 2024|201,157,120,160,1...|
|        6|2f18aa82-5e56-4f0...|2024-01-11 10:30:00|Ep 7 The Courage ...|501,508,507,509,1...|
|        7|fd7d16ed-56a7-4bf...|2024-01-11 10:30:00|Ep 7 The Courage ...|501,508,507,509,1...|
|        8|0a563087-6c3f-478...|2024-01-11 10:30:0

In [130]:
mode = "append"
date_today = datetime.now().strftime("%Y-%m-%d")
data_warehouse_name = rf"F:\申请，润\我去澳洲\university of adelaide\AWS\ETL\DataWarehouse_{date_today}"
FactVideo.coalesce(1).write.csv(data_warehouse_name, header=True, mode="overwrite")
DimPlatform.coalesce(1).write.csv(data_warehouse_name, header=True, mode=mode)
DimDate.coalesce(1).write.csv(data_warehouse_name, header=True, mode=mode)
DimVideoType.coalesce(1).write.csv(data_warehouse_name, header=True, mode=mode)
DimVideoName.coalesce(1).write.csv(data_warehouse_name, header=True, mode=mode)
DimVideoTopic.coalesce(1).write.csv(data_warehouse_name, header=True, mode=mode)


# Json

In [12]:
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, LongType


# Define the schema for the JSON data
schema = StructType([
    StructField("id", StringType(), True),
    StructField("profile", StructType([
        StructField("firstName", StringType(), True),
        StructField("jobHistory", ArrayType(StructType([
            StructField("fromDate", StringType(), True),
            StructField("location", StringType(), True),
            StructField("salary", LongType(), True),
            StructField("title", StringType(), True),
            StructField("toDate", StringType(), True)
        ])), True),
        StructField("lastName", StringType(), True)
    ]), True)
])

# Read JSON data
#df_user = spark.read.json("user.json", schema=schema)
df_user = spark.read.json("user.json")

In [13]:
df_user.show(1,truncate =False)
df_user.printSchema()
df_user.count()

df_user = df_user.dropna(how="all").distinct()

df_user.show(1)
df_user.count()

# Broadcast the user IDs
# from pyspark.sql.functions import broadcast
# broadcast_user_ids = spark.sparkContext.broadcast(df_user_id.collect())

# 在PySpark中，使用广播变量（Broadcast Variables）时，
# 需要将数据从分布式的Executor节点传输到驱动程序节点（Driver节点）。
# 这是因为广播变量是在驱动程序上创建和管理的，然后将其分发到工作节点上的任务中使用。
# 具体来说，为了在广播变量中使用DataFrame或RDD的数据，需要使用collect()方法将数据收集到驱动程序节点。

+------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|id                                  |profile                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
+------------------------------------+

190000

In [None]:
# # Remove unnecessary fields (assuming we don't need the location field)
# df = df.withColumn("profile", col("profile").withField("jobHistory", 
#               col("profile.jobHistory").cast(ArrayType(
#                   StructType([
#                       StructField("fromDate", StringType(), True),
#                       StructField("salary", LongType(), True),
#                       StructField("title", StringType(), True),
#                       StructField("toDate", StringType(), True)
#                   ])
#               ))))

In [4]:
# Handle missing values: fill missing firstName and lastName with "Unknown"
df_user = df_user.withColumn("profile", col("profile").withField("firstName", 
              when(col("profile.firstName").isNull(), "Unknown").otherwise(col("profile.firstName"))))
df_user = df_user.withColumn("profile", col("profile").withField("lastName", 
              when(col("profile.lastName").isNull(), "Unknown").otherwise(col("profile.lastName"))))


In [14]:
# Type conversion: convert date strings to date type (assuming date format is "yyyy-MM-dd")
from pyspark.sql.functions import col, expr, struct
from pyspark.sql.functions import to_date
df_user = df_user.withColumn("profile",\
                  struct( 
                         col("profile.firstName").alias("firstName"),\
                         expr("transform(profile.jobHistory, \
                                       x -> struct(to_date(x.fromDate, 'yyyy-MM-dd') as fromDate, \
                                                 to_date(x.toDate, 'yyyy-MM-dd') as toDate, \
                                                 x.title, x.location, x.salary \
                                                 ) \
                                       )" \
                            ).alias("jobHistory"), \
                         col("profile.lastName").alias("lastName")\
                         ) \
                  )
df_user.show(1)

+--------------------+--------------------+
|                  id|             profile|
+--------------------+--------------------+
|6ff74c06-5a0f-4d1...|{Mary, [{2017-09-...|
+--------------------+--------------------+
only showing top 1 row



In [15]:
updated_schema = StructType([
    StructField("id", StringType(), True),
    StructField("profile", StructType([
        StructField("firstName", StringType(), True),
        StructField("jobHistory", ArrayType(
            StructType([
                StructField("fromDate", DateType(), True),
                StructField("toDate", DateType(), True),
                StructField("title", StringType(), True),
                StructField("location", StringType(), True),
                StructField("salary", LongType(), True)
            ]), containsNull=True), True),
        StructField("lastName", StringType(), True)
    ]), False)
])

df_json = df_user.toJSON()
df_user_updated = spark.read.schema(updated_schema).json(df_json)

# Show the updated schema and data
df_user_updated.printSchema()
df_user_updated.show(1, truncate=False)

root
 |-- id: string (nullable = true)
 |-- profile: struct (nullable = true)
 |    |-- firstName: string (nullable = true)
 |    |-- jobHistory: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- fromDate: date (nullable = true)
 |    |    |    |-- toDate: date (nullable = true)
 |    |    |    |-- title: string (nullable = true)
 |    |    |    |-- location: string (nullable = true)
 |    |    |    |-- salary: long (nullable = true)
 |    |-- lastName: string (nullable = true)

+------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|id                                  |profile                   

In [17]:
df_user_updated.show(1000,truncate=False)

+------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|id                                  |profile                                                                                                                                                                                                                                                                                          

In [18]:
# Q4 What is the average salary for each profile? Display the first 10 results, ordered by lastName in descending order
import warnings
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import *
array_mean = F.udf(lambda x: float(np.nanmean(x)), FloatType())
df_user_profile = df_user_updated.select('id','profile.lastName','profile.firstName'\
    ,transform("profile.jobHistory", lambda x: x.salary).alias("salary_list")\
    ,transform("profile.jobHistory", lambda x: x.fromDate).alias("fromDate_list")\
    ,transform("profile.jobHistory", lambda x: x.toDate).alias("toDate_list")\
    ,transform("profile.jobHistory", lambda x: x.title).alias("title_list")\
    ,transform("profile.jobHistory", lambda x: x.location).alias("location_list"))
    # .select('lastName',array_mean("salary_list").alias)\

df_user_profile.show(10)


+--------------------+---------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+
|                  id| lastName|firstName|         salary_list|       fromDate_list|         toDate_list|          title_list|       location_list|
+--------------------+---------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+
|6ff74c06-5a0f-4d1...|    Ferri|     Mary|[67000, 62000, 58...|[2017-09-23, 2017...|[NULL, 2017-09-04...|[dentist, busines...|[Brisbane, Brisba...|
|eef6f87f-3c19-40e...|Bourgeois|    Allen|            [115000]|        [2019-03-23]|              [NULL]|[service technician]|          [Brisbane]|
|0f6a46b3-1c6e-4a7...|   Durham|   Martha|[127000, 123000, ...|[2017-03-21, 2015...|[2019-04-21, 2017...|[assistant operat...|[Brisbane, Brisba...|
|7b3461ba-641f-426...|Hawkinson|   Joshua|[135000, 135000, ...|[2015-05-23, 2009...|[NULL, 2015-04-25...|[busine

In [19]:
df_user_profile.printSchema()

root
 |-- id: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- salary_list: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- fromDate_list: array (nullable = true)
 |    |-- element: date (containsNull = true)
 |-- toDate_list: array (nullable = true)
 |    |-- element: date (containsNull = true)
 |-- title_list: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- location_list: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [67]:
from datetime import date
def count_null(list_value):
    acc = 0
    for i in list_value:
        if i is None:
            acc += 1
    return acc
# UDF 中的 Spark 上下文（SparkContext）不可用。这是因为 UDF 在执行时不会自动访问活跃的 SparkContext。
# 要在 UDF 中使用当前日期，你可以在函数外部获取当前日期，并在函数中传递这个值。
# Get the current date outside the UDF
current_date_value = date.today()
def calculate_date_diff(from_dates, to_dates):
    if from_dates is None or to_dates is None:
        return None
    date_diffs = []
    for from_date, to_date in zip(from_dates, to_dates):
        if from_date is None and to_date is None:
            date_diffs.append(None)
        elif from_date is not None and to_date is None:
            to_date = current_date_value
            date_diffs.append((to_date - from_date).days)
        else:
            date_diffs.append((to_date - from_date).days)
    return date_diffs

count_listnull_udf = F.udf(count_null, IntegerType())

date_diff_udf = F.udf(calculate_date_diff, ArrayType(IntegerType()))    

array_mean = F.udf(lambda x: float(np.nanmean(x)), FloatType())
# Find the index of the maximum job duration, array_position function return bigint type, 
# but element_at function requires int type as input 
max_duration_index_expr = expr("cast(array_position(jobDuration_list, array_max(jobDuration_list)) as int)")
max_salary_index_expr = expr("cast(array_position(salary_list,array_max(salary_list)) as int)")
# toDate_list and FromDate_list should have at most one null value
df_user_profile = \
df_user_profile.withColumn("toDatelist_null",count_listnull_udf("toDate_list"))\
               .filter(F.col("toDatelist_null")<= 1)\
               .withColumn("fromDatelist_null",count_listnull_udf("FromDate_list"))\
               .filter(F.col("FromDatelist_null")<= 1)\
               .drop("toDatelist_null","fromDatelist_null")\
               .withColumn("jobDuration_list",date_diff_udf("FromDate_list","toDate_list"))\
               .withColumn("jobDuration_weeks_max", array_max(F.expr("transform(jobDuration_list, x -> ceil(x / 7))")))\
               .withColumn("avg_salary",array_mean("salary_list"))\
               .withColumn("max_salary",array_max("salary_list"))\
               .withColumn("max_job_duration_index", max_duration_index_expr)\
               .withColumn("max_job_duration_title", 
                            when(max_index_expr.isNull(), None) 
                            .otherwise(element_at("title_list", col("max_job_duration_index"))))\
               .withColumn("max_job_salary_index", max_salary_index_expr)\
               .withColumn("max_job_salary_title",
                            when(max_salary_index_expr.isNull(),None)
                            .otherwise(element_at("title_list", col("max_job_salary_index"))))\
               .withColumn("num_jobTitle",size(array_distinct(col("title_list"))))\
               .withColumn("num_jobLocation",size(array_distinct(col("location_list"))))\
               
df_user_profile.show(10)

+--------------------+---------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---------------------+----------+----------+----------------------+----------------------+--------------------+--------------------+------------+---------------+
|                  id| lastName|firstName|         salary_list|       fromDate_list|         toDate_list|          title_list|       location_list|    jobDuration_list|jobDuration_weeks_max|avg_salary|max_salary|max_job_duration_index|max_job_duration_title|max_job_salary_index|max_job_salary_title|num_jobTitle|num_jobLocation|
+--------------------+---------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---------------------+----------+----------+----------------------+----------------------+--------------------+--------------------+------------+---------------+
|6ff74c06-

In [None]:
from datetime import date
def count_null(list_value):
    acc = 0
    for i in list_value:
        if i is None:
            acc += 1
    return acc
# UDF 中的 Spark 上下文（SparkContext）不可用。这是因为 UDF 在执行时不会自动访问活跃的 SparkContext。
# 要在 UDF 中使用当前日期，你可以在函数外部获取当前日期，并在函数中传递这个值。
# Get the current date outside the UDF
current_date_value = date.today()
def calculate_date_diff(from_dates, to_dates):
    if from_dates is None or to_dates is None:
        return None
    date_diffs = []
    for from_date, to_date in zip(from_dates, to_dates):
        if from_date is None and to_date is None:
            date_diffs.append(None)
        elif from_date is not None and to_date is None:
            to_date = current_date_value
            date_diffs.append((to_date - from_date).days)
        else:
            date_diffs.append((to_date - from_date).days)
    return date_diffs

count_listnull_udf = F.udf(count_null, IntegerType())

date_diff_udf = F.udf(calculate_date_diff, ArrayType(IntegerType()))    

array_mean = F.udf(lambda x: float(np.nanmean(x)), FloatType())
# Find the index of the maximum job duration, array_position function return bigint type, 
# but element_at function requires int type as input 
max_duration_index_expr = expr("cast(array_position(jobDuration_list, array_max(jobDuration_list)) as int)")
max_salary_index_expr = expr("cast(array_position(salary_list,array_max(salary_list)) as int)")
# toDate_list and FromDate_list should have at most one null value
df_user_profile = \
df_user_profile.withColumn("toDatelist_null",count_listnull_udf("toDate_list"))\
               .filter(F.col("toDatelist_null")<= 1)\
               .withColumn("fromDatelist_null",count_listnull_udf("FromDate_list"))\
               .filter(F.col("FromDatelist_null")<= 1)\
               .drop("toDatelist_null","fromDatelist_null")\
               .withColumn("jobDuration_list",date_diff_udf("FromDate_list","toDate_list"))\
               .withColumn("jobDuration_weeks_max", array_max(F.expr("transform(jobDuration_list, x -> ceil(x / 7))")))\
               .withColumn("avg_salary",array_mean("salary_list"))\
               .withColumn("max_salary",array_max("salary_list"))\
               .withColumn("max_job_duration_index", max_duration_index_expr)\
               .withColumn("max_job_duration_title", 
                            when(max_index_expr.isNull(), None) 
                            .otherwise(element_at("title_list", col("max_job_duration_index"))))\
               .withColumn("max_job_salary_index", max_salary_index_expr)\
               .withColumn("max_job_salary_title",
                            when(max_salary_index_expr.isNull(),None)
                            .otherwise(element_at("title_list", col("max_job_salary_index"))))\
               .withColumn("num_jobTitle",size(array_distinct(col("title_list"))))\
               .withColumn("num_jobLocation",size(array_distinct(col("location_list"))))\
               
df_user_profile.show(10)

+--------------------+---------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---------------------+----------+----------+----------------------+----------------------+--------------------+--------------------+------------+---------------+
|                  id| lastName|firstName|         salary_list|       fromDate_list|         toDate_list|          title_list|       location_list|    jobDuration_list|jobDuration_weeks_max|avg_salary|max_salary|max_job_duration_index|max_job_duration_title|max_job_salary_index|max_job_salary_title|num_jobTitle|num_jobLocation|
+--------------------+---------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---------------------+----------+----------+----------------------+----------------------+--------------------+--------------------+------------+---------------+
|6ff74c06-

## Q4 What is the average salary across each persion

In [79]:
array_mean = udf(lambda x: float(np.nanmean(x)), FloatType())
       
df_user.select('id','profile.lastName',transform("profile.jobHistory", lambda x: x.salary)\
.alias("salary_list"))\
.select('lastName',array_mean("salary_list").alias("mean")).show(1) 

df_user.select(aggregate("profile.jobHistory.salary", lit(0.0), lambda acc, x: acc + x).alias("sum"), "profile.jobHistory").withColumn("size",size("jobHistory")).withColumn("mean",col("sum")/col("size")).show(1)

+--------+---------+
|lastName|     mean|
+--------+---------+
|   Ferri|56333.332|
+--------+---------+
only showing top 1 row

+--------+--------------------+----+------------------+
|     sum|          jobHistory|size|              mean|
+--------+--------------------+----+------------------+
|338000.0|[{2017-09-23, NUL...|   6|56333.333333333336|
+--------+--------------------+----+------------------+
only showing top 1 row



## Q5 What is the average salary across the whole dataset

In [93]:
from pyspark.sql import functions as F

# Explode jobHistory array to individual rows
df_exploded = df_user.withColumn("job_exploded", F.explode("profile.jobHistory")) \
                    .select("id", "job_exploded")

# Show the first 10 rows without truncating columns
df_exploded.show(10, truncate=False)

# Print the schema of the DataFrame
df_exploded.printSchema()

df_exploded.avg("job_exploded.salary")




+------------------------------------+-------------------------------------------------------------------------+
|id                                  |job_exploded                                                             |
+------------------------------------+-------------------------------------------------------------------------+
|6ff74c06-5a0f-4d11-9361-32d1fe57c95d|{2017-09-23, NULL, dentist, Brisbane, 67000}                             |
|6ff74c06-5a0f-4d11-9361-32d1fe57c95d|{2017-02-04, 2017-09-04, business analyst, Brisbane, 62000}              |
|6ff74c06-5a0f-4d11-9361-32d1fe57c95d|{2013-06-23, 2017-01-23, hr advisor, Brisbane, 58000}                    |
|6ff74c06-5a0f-4d11-9361-32d1fe57c95d|{2008-07-01, 2013-06-01, trimmer, Brisbane, 56000}                       |
|6ff74c06-5a0f-4d11-9361-32d1fe57c95d|{2004-01-25, 2008-06-25, business analyst, Brisbane, 52000}              |
|6ff74c06-5a0f-4d11-9361-32d1fe57c95d|{2001-09-13, 2004-01-13, medical radiation technologist, B

In [97]:
avg_salary_per_id = df_exploded.groupBy("id") \
                                .agg(F.avg("job_exploded.salary").alias("avg_salary")).show(3)
avg_salary = df_exploded.agg(F.avg("job_exploded.salary").alias("avg_salary")).show()
                    

+--------------------+------------------+
|                  id|        avg_salary|
+--------------------+------------------+
|c7abcf0b-a649-404...|          140000.0|
|ec497bd4-ac2b-4a0...|          152000.0|
|f91c071b-5f15-436...|109888.88888888889|
+--------------------+------------------+
only showing top 3 rows

+-----------------+
|       avg_salary|
+-----------------+
|97461.87312420631|
+-----------------+



## Q6 On average, what are the top 5 paying jobs? Bottom 5 paying jobs? If there is a tie, please order by title, location

In [None]:
top5_job 

In [119]:
from pyspark.sql.functions import col, explode, avg, desc

df_user.withColumn("job_exploded", explode("profile.jobHistory")) \
       .withColumn("salary", col("job_exploded.salary")) \
       .withColumn("title", col("job_exploded.title")) \
       .withColumn("location", col("job_exploded.location")) \
       .select("id", "title", "salary","location") \
       .groupby("title", "location") \
       .agg(avg("salary").alias("mean_salary")) \
       .withColumn("mean_salary", col("mean_salary").cast("int")) \
       .sort(desc("mean_salary")) \
       .show(5)

        



+--------------------+---------+-----------+
|               title| location|mean_salary|
+--------------------+---------+-----------+
|procurement speci...|Melbourne|      99246|
|financial counsellor|Melbourne|      99161|
|safety superinten...|   Hobart|      99085|
|             trimmer| Brisbane|      99022|
|admin support off...|    Perth|      98975|
+--------------------+---------+-----------+
only showing top 5 rows



## Q7 Who is currently making the most money

In [141]:
df_user.printSchema()
df_user.withColumn("job_explode",explode("profile.jobHistory"))\
       .withColumn("toDate",col("job_explode.toDate"))\
       .withColumn("salary",col("job_explode.salary"))\
       .withColumn("lastName",col("profile.lastName"))\
       .filter(col("toDate").isNull())\
       .sort(desc("salary"),desc('lastName')).show(100)



root
 |-- id: string (nullable = true)
 |-- profile: struct (nullable = false)
 |    |-- firstName: string (nullable = true)
 |    |-- jobHistory: array (nullable = true)
 |    |    |-- element: struct (containsNull = false)
 |    |    |    |-- fromDate: date (nullable = true)
 |    |    |    |-- toDate: date (nullable = true)
 |    |    |    |-- title: string (nullable = true)
 |    |    |    |-- location: string (nullable = true)
 |    |    |    |-- salary: long (nullable = true)
 |    |-- lastName: string (nullable = true)

+--------------------+--------------------+--------------------+------+------+------------+
|                  id|             profile|         job_explode|toDate|salary|    lastName|
+--------------------+--------------------+--------------------+------+------+------------+
|a57f3565-2c1f-45f...|{Ronda, [{2013-05...|{2013-05-23, NULL...|  NULL|159000|     Zuidema|
|7cd9a4e6-fe1a-4f1...|{Lori, [{2018-11-...|{2018-11-23, NULL...|  NULL|159000|     Zortman|
|5b275c

In [142]:
def q7_most_money(df):
    df.filter(F.udf(lambda x: any([y.toDate is None for y in x]), 'boolean')('profile.jobHistory'))\
        .withColumn('col2', F.udf(lambda x: [y for y in x if y.toDate == None][0], 'struct<fromDate:string,location:string,salary:long,title:string,toDate:string>')('profile.jobHistory'))\
            .select('id','profile.lastName','col2.fromDate','col2.salary')\
                .sort(desc('salary'),desc('lastName'))\
                    .show(100,truncate=False)
q7_most_money(df_user)


+------------------------------------+-----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+
|id                                  |lastName   |fromDate                                                                                                                                     


## Q8 What was the most popular job title started in 2019

In [145]:
df_user.withColumn("job_explode",explode("profile.jobHistory"))\
    .withColumn("fromDate",col("job_explode.fromDate"))\
    .filter(col("fromDate").contains("2019"))\
    .withColumn("title",col("job_explode.title"))\
        .groupby("title")\
            .agg(count("title").alias("num_title"))\
                .sort(desc("num_title")).show(20)



+--------------------+---------+
|               title|num_title|
+--------------------+---------+
|sales representative|      197|
|admin support off...|      189|
|           paralegal|      185|
|registration officer|      184|
|  enrolments officer|      184|
|     counter manager|      179|
|     project manager|      179|
|     physiotherapist|      177|
|       sales manager|      175|
|  pharmacy assistant|      174|
|   asp.net developer|      172|
|          specialist|      170|
|clinical psycholo...|      170|
|safety superinten...|      170|
|customer service ...|      168|
|Warehouse Storepe...|      167|
|           assembler|      167|
|corporate consultant|      165|
|Administration Of...|      165|
|  Multi Site Manager|      164|
+--------------------+---------+
only showing top 20 rows



In [None]:
from pyspark.sql import functions as F
from pyspark.sql.functions import desc
from datetime import datetime

def q8_popular_job_2019(df):
    # Define a UDF to filter jobHistory for entries starting from 2019
    filter_jobs_from_2019 = F.udf(lambda x: [y for y in x if datetime.strptime(y.fromDate, '%Y-%m-%d').date() >= datetime(2019, 1, 1).date()],
                                   'array<struct<fromDate:string,location:string,salary:long,title:string,toDate:string>>')
    
    # Apply UDF to create col2 containing job entries from 2019 onwards
    df_with_col2 = df.withColumn('col2', filter_jobs_from_2019('profile.jobHistory'))
    
    # Select id and col2, explode col2 to get individual job entries, select title, group by title and count occurrences
    popular_jobs_2019 = df_with_col2.select('id', 'col2') \
                                    .withColumn("job_exploded", F.explode("col2")) \
                                    .select("job_exploded.title") \
                                    .groupby('title').count() \
                                    .sort(desc('count'))
    
    # Show the results
    popular_jobs_2019.show()

# Example usage:
q8_popular_job_2019(df_user)


## Q10 For each person, list only their latest job. Display the first 10 results, ordered by lastName descending, firstName ascending order

dense_rank()函数也为每行返回一个唯一的整数，表示该行在分区中的顺序。与row_number()不同的是，当有多行具有相同的排序值时，它们会被分配相同的排名（排名连续），但下一个不同的值会跳过这些重复的排名。
row_number(),没有相同值。

Window.partitionBy("id")用于创建一个窗口规范，定义如何对数据进行分区。它通常与窗口函数（如row_number、rank、dense_rank、sum等）一起使用。窗口函数在每个分区内计算，并且不会像groupby那样汇总数据，而是保留每一行数据。

In [153]:
from pyspark.sql.window import Window
window_spec = Window.partitionBy("id").orderBy(desc("job_explode.fromDate"))

df_user.withColumn("job_explode",explode("profile.jobHistory"))\
    .withColumn("rank",dense_rank().over(window_spec))\
        .filter(col("rank")==1)\
            .show(10)

        

+--------------------+--------------------+--------------------+----+
|                  id|             profile|         job_explode|rank|
+--------------------+--------------------+--------------------+----+
|0000b622-2709-4ef...|{Mark, [{2018-01-...|{2018-01-23, NULL...|   1|
|0000efb1-9edc-45f...|{Ramon, [{2016-07...|{2016-07-11, 2019...|   1|
|00064571-86c8-415...|{Evangeline, [{20...|{2014-02-12, 2019...|   1|
|000660fb-6374-49d...|{Samuel, [{2018-0...|{2018-04-22, 2019...|   1|
|00082074-b7a0-4e0...|{Lloyd, [{2018-06...|{2018-06-23, NULL...|   1|
|000b9a49-a7fa-421...|{James, [{2018-02...|{2018-02-23, NULL...|   1|
|000e61a1-1504-4be...|{Ken, [{2015-10-2...|{2015-10-23, NULL...|   1|
|0014cd52-9750-404...|{Harold, [{2016-0...|{2016-08-16, 2019...|   1|
|0019d673-b1c0-4a1...|{Craig, [{2014-06...|{2014-06-22, 2019...|   1|
|001b6454-d1ae-407...|{Lois, [{2017-11-...|{2017-11-23, NULL...|   1|
+--------------------+--------------------+--------------------+----+
only showing top 10 

## Q11  For each person, list their highest paying job along with their first name, last name, salary and the year they made this salary. Store the results in a dataframe, and then print out 10 results

In [165]:
from datetime import date
from pyspark.sql.window import Window
window_spec = Window.partitionBy("id").orderBy(desc("job_explode.salary"))
current_date_value = date.today()
df_user.withColumn("job_explode",explode("profile.jobHistory"))\
    .withColumn("rank",dense_rank().over(window_spec))\
        .filter(col("rank") == 1)\
       .withColumn("toDate", to_date(col("job_explode.toDate"), 'yyyy-MM-dd')) \
       .withColumn("year", year(col("toDate")))\
       .withColumn("year", when(col("year").isNull(),year(lit(current_date_value))).otherwise(col("year"))).show(10) 

+--------------------+--------------------+--------------------+----+----------+----+
|                  id|             profile|         job_explode|rank|    toDate|year|
+--------------------+--------------------+--------------------+----+----------+----+
|0000b622-2709-4ef...|{Mark, [{2018-01-...|{2018-01-23, NULL...|   1|      NULL|2024|
|0000efb1-9edc-45f...|{Ramon, [{2016-07...|{2016-07-11, 2019...|   1|2019-04-11|2019|
|00064571-86c8-415...|{Evangeline, [{20...|{2014-02-12, 2019...|   1|2019-04-12|2019|
|000660fb-6374-49d...|{Samuel, [{2018-0...|{2018-04-22, 2019...|   1|2019-04-22|2019|
|00082074-b7a0-4e0...|{Lloyd, [{2018-06...|{2018-06-23, NULL...|   1|      NULL|2024|
|000b9a49-a7fa-421...|{James, [{2018-02...|{2018-02-23, NULL...|   1|      NULL|2024|
|000e61a1-1504-4be...|{Ken, [{2015-10-2...|{2015-10-23, NULL...|   1|      NULL|2024|
|0014cd52-9750-404...|{Harold, [{2016-0...|{2016-08-16, 2019...|   1|2019-02-16|2019|
|0019d673-b1c0-4a1...|{Craig, [{2014-06...|{2014-06-22