In [2]:
import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *


In [3]:
# Start (or reuse, via getOrCreate) a SparkSession pointed at a local master.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('testing_misc_functions')
    .getOrCreate()
)

In [7]:
# Sample rows of (name, day, month, year), all stored as strings.
# The year values are inconsistent: one is 4-digit, the rest are 1- or 2-digit;
# the intended full year is noted beside each short one.
# "Abdul" is listed twice with identical values.
data_list = [
    ("Ravi", "28", "1", "2002"),
    ("Abdul", "23", "5", "81"),   # 1981
    ("John", "12", "12", "6"),    # 2006
    ("Rosy", "7", "8", "63"),     # 1963
    ("Abdul", "23", "5", "81"),   # 1981
]

# Build the frame first, then spread it over 3 partitions.
people_df = spark.createDataFrame(data_list, schema=['name', 'day', 'month', 'year'])
raw_df = people_df.repartition(3)

raw_df.show(n=10)
raw_df.printSchema()

+-----+---+-----+----+
| name|day|month|year|
+-----+---+-----+----+
| John| 12|   12|   6|
| Rosy|  7|    8|  63|
|Abdul| 23|    5|  81|
| Ravi| 28|    1|2002|
|Abdul| 23|    5|  81|
+-----+---+-----+----+

root
 |-- name: string (nullable = true)
 |-- day: string (nullable = true)
 |-- month: string (nullable = true)
 |-- year: string (nullable = true)



In [16]:
# Attach a generated 'id' column, then convert the string date parts to integers.
# The generated ids are unique but not consecutive: in the output below they
# jump from 2 to 17179869184 between rows.
df_with_id = (
    raw_df
    .withColumn('id', monotonically_increasing_id())
    .withColumn('year', col('year').cast('int'))
    .withColumn('day', col('day').cast('int'))
    .withColumn('month', col('month').cast('int'))
)

df_with_id.show()
df_with_id.printSchema()

+-----+---+-----+----+-----------+
| name|day|month|year|         id|
+-----+---+-----+----+-----------+
| John| 12|   12|   6|          0|
| Rosy|  7|    8|  63|          1|
|Abdul| 23|    5|  81|          2|
| Ravi| 28|    1|2002|17179869184|
|Abdul| 23|    5|  81|17179869185|
+-----+---+-----+----+-----------+

root
 |-- name: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- id: long (nullable = false)



In [None]:
# CASE statement written as a SQL expression string:
#   year < 21   -> treated as 20xx (year + 2000)
#   year < 100  -> treated as 19xx (year + 1900)
#   otherwise   -> already a full year, left unchanged
# The result is only displayed here; it is not assigned to a variable.
(
    df_with_id
    .withColumn(
        'year',
        expr('''
                                case when year < 21 then year + 2000
                                    when year < 100 then year + 1900
                                    else year
                                end
                            '''),
    )
    .show()
)

+-----+---+-----+----+-----------+
| name|day|month|year|         id|
+-----+---+-----+----+-----------+
| John| 12|   12|2006|          0|
| Rosy|  7|    8|1963|          1|
|Abdul| 23|    5|1981|          2|
| Ravi| 28|    1|2002|17179869184|
|Abdul| 23|    5|1981|17179869185|
+-----+---+-----+----+-----------+



In [24]:
# The same CASE logic expressed with Column objects (when / otherwise)
# instead of a SQL string:
#   year < 21   -> year + 2000
#   year < 100  -> year + 1900
#   otherwise   -> year unchanged
year_col = col('year')
expanded_year = (
    when(year_col < 21, year_col + 2000)
    .when(year_col < 100, year_col + 1900)
    .otherwise(year_col)
)
df_with_year = df_with_id.withColumn('year', expanded_year)

df_with_year.show()

+-----+---+-----+----+-----------+
| name|day|month|year|         id|
+-----+---+-----+----+-----------+
| John| 12|   12|2006|          0|
| Rosy|  7|    8|1963|          1|
|Abdul| 23|    5|1981|          2|
| Ravi| 28|    1|2002|17179869184|
|Abdul| 23|    5|1981|17179869185|
+-----+---+-----+----+-----------+



In [45]:
# Assemble a "day/month/year" string and parse it into a date column.
# The pattern given to to_date tells Spark how to parse the input string;
# it does not control how the resulting date is displayed. Use date_format
# when a specific output format is needed.
dob_col = expr("to_date(concat(day, '/', month, '/', year), 'd/M/yyyy')")
people_with_dob = (
    df_with_year
    .withColumn('dob', dob_col)
    .drop('day', 'month', 'year')
)

# dropDuplicates takes the subset of columns to compare; distinct() would also
# remove duplicates but offers no way to choose the columns.
# NOTE(review): which 'id' survives for a duplicated (name, dob) pair is not
# pinned by anything in this chain - confirm if a specific row is expected.
df_with_dob = (
    people_with_dob
    .dropDuplicates(['name', 'dob'])
    .orderBy(col('dob').desc())
)

df_with_dob.show()
df_with_dob.printSchema()

# Without de-duplication: newest dob first, and when several rows share a dob,
# break the tie by id in ascending order.
(
    people_with_dob
    .orderBy(col('dob').desc(), col('id').asc())
    .show()
)

+-----+-----------+----------+
| name|         id|       dob|
+-----+-----------+----------+
| John|          0|2006-12-12|
| Ravi|17179869184|2002-01-28|
|Abdul|          2|1981-05-23|
| Rosy|          1|1963-08-07|
+-----+-----------+----------+

root
 |-- name: string (nullable = true)
 |-- id: long (nullable = false)
 |-- dob: date (nullable = true)

+-----+-----------+----------+
| name|         id|       dob|
+-----+-----------+----------+
| John|          0|2006-12-12|
| Ravi|17179869184|2002-01-28|
|Abdul|          2|1981-05-23|
|Abdul|17179869185|1981-05-23|
| Rosy|          1|1963-08-07|
+-----+-----------+----------+



In [51]:
# Same ordering as orderBy(dob desc, id asc), written with sort() and a
# per-column 'ascending' list: dob descending, id ascending.
parsed_dob = expr("to_date(concat(day, '/', month, '/', year), 'd/M/yyyy')")
dob_frame = df_with_year.withColumn('dob', parsed_dob).drop('day', 'month', 'year')
(
    dob_frame
    .sort('dob', 'id', ascending=[False, True])
    .show()
)

+-----+-----------+----------+
| name|         id|       dob|
+-----+-----------+----------+
| John|          0|2006-12-12|
| Ravi|17179869184|2002-01-28|
|Abdul|          2|1981-05-23|
|Abdul|17179869185|1981-05-23|
| Rosy|          1|1963-08-07|
+-----+-----------+----------+

