In [32]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, monotonically_increasing_id, when, expr, to_date
from pyspark.sql.types import *

In [3]:
if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("Misc Demo") \
        .master("local[2]") \
        .getOrCreate()

In [4]:
data_list = [("Ravi", "28", "1", "2002"),
                 ("Abdul", "23", "5", "81"),  # 1981
                 ("John", "12", "12", "6"),  # 2006
                 ("Rosy", "7", "8", "63"),  # 1963
                 ("Abdul", "23", "5", "81")]  # 1981

In [6]:
columns=["name", "day", "month", "year"]
raw_df = spark.createDataFrame(data_list,columns)
raw_df.show()

+-----+---+-----+----+
| name|day|month|year|
+-----+---+-----+----+
| Ravi| 28|    1|2002|
|Abdul| 23|    5|  81|
| John| 12|   12|   6|
| Rosy|  7|    8|  63|
|Abdul| 23|    5|  81|
+-----+---+-----+----+



In [7]:
raw_df = spark.createDataFrame(data_list).toDF("name", "day", "month", "year").repartition(3)
raw_df.show()

+-----+---+-----+----+
| name|day|month|year|
+-----+---+-----+----+
| John| 12|   12|   6|
| Ravi| 28|    1|2002|
| Rosy|  7|    8|  63|
|Abdul| 23|    5|  81|
|Abdul| 23|    5|  81|
+-----+---+-----+----+



In [8]:
raw_df.printSchema()

root
 |-- name: string (nullable = true)
 |-- day: string (nullable = true)
 |-- month: string (nullable = true)
 |-- year: string (nullable = true)



In [10]:
df1=raw_df.withColumn("id",monotonically_increasing_id())
df1.show()

+-----+---+-----+----+-----------+
| name|day|month|year|         id|
+-----+---+-----+----+-----------+
| John| 12|   12|   6|          0|
| Ravi| 28|    1|2002| 8589934592|
| Rosy|  7|    8|  63| 8589934593|
|Abdul| 23|    5|  81|17179869184|
|Abdul| 23|    5|  81|17179869185|
+-----+---+-----+----+-----------+



In [22]:
df2=df1.withColumn("year", expr("""
                                case when year < 21 then year + 2000
                                when year < 100 then year + 1900 
                                else year 
                                  end 
                               """))
df2.show()

+-----+---+-----+------+-----------+
| name|day|month|  year|         id|
+-----+---+-----+------+-----------+
| John| 12|   12|2006.0|          0|
| Ravi| 28|    1|  2002| 8589934592|
| Rosy|  7|    8|1963.0| 8589934593|
|Abdul| 23|    5|1981.0|17179869184|
|Abdul| 23|    5|1981.0|17179869185|
+-----+---+-----+------+-----------+



In [24]:
df3=df1.withColumn("year", expr("""
                                case when year < 21 then cast(year as int) + 2000
                                when year < 100 then cast(year as int) + 1900 
                                else year 
                                end 
                               """)) #Inline Cast
df3.show()

+-----+---+-----+----+-----------+
| name|day|month|year|         id|
+-----+---+-----+----+-----------+
| John| 12|   12|2006|          0|
| Ravi| 28|    1|2002| 8589934592|
| Rosy|  7|    8|1963| 8589934593|
|Abdul| 23|    5|1981|17179869184|
|Abdul| 23|    5|1981|17179869185|
+-----+---+-----+----+-----------+



In [25]:
df4=df1.withColumn("year", expr("""
                                case when year < 21 then year + 2000
                                when year < 100 then year + 1900 
                                else year 
                                  end 
                               """).cast(IntegerType())) #Change the Schema
df4.show()
df4.printSchema()

+-----+---+-----+----+-----------+
| name|day|month|year|         id|
+-----+---+-----+----+-----------+
| John| 12|   12|2006|          0|
| Ravi| 28|    1|2002| 8589934592|
| Rosy|  7|    8|1963| 8589934593|
|Abdul| 23|    5|1981|17179869184|
|Abdul| 23|    5|1981|17179869185|
+-----+---+-----+----+-----------+

root
 |-- name: string (nullable = true)
 |-- day: string (nullable = true)
 |-- month: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- id: long (nullable = false)



In [27]:
df5=df1.withColumn("day",col("day").cast(IntegerType()))\
       .withColumn("month",col("month").cast(IntegerType()))\
       .withColumn("year",col("year").cast(IntegerType()))

df6=df5.withColumn("year", expr("""
                                case when year < 21 then year + 2000
                                when year < 100 then year + 1900 
                                else year 
                                  end 
                               """))
df6.show()

+-----+---+-----+----+-----------+
| name|day|month|year|         id|
+-----+---+-----+----+-----------+
| John| 12|   12|2006|          0|
| Ravi| 28|    1|2002| 8589934592|
| Rosy|  7|    8|1963| 8589934593|
|Abdul| 23|    5|1981|17179869184|
|Abdul| 23|    5|1981|17179869185|
+-----+---+-----+----+-----------+



In [30]:
df7=df5.withColumn("year",\
                 when(col("year") < 21, col("year") + 2000)\
                 .when(col("year") < 100, col("year") + 1900)\
                 .otherwise(col("year")))
df7.show()

+-----+---+-----+----+-----------+
| name|day|month|year|         id|
+-----+---+-----+----+-----------+
| John| 12|   12|2006|          0|
| Ravi| 28|    1|2002| 8589934592|
| Rosy|  7|    8|1963| 8589934593|
|Abdul| 23|    5|1981|17179869184|
|Abdul| 23|    5|1981|17179869185|
+-----+---+-----+----+-----------+



In [39]:
df8=df7.withColumn("dob",to_date(expr("concat(day,'/',month,'/',year)"),'d/M/y'))\
       .drop("day","month","year")\
       .dropDuplicates(["name","dob"])\
       .sort(expr("dob desc"))
df8.show()

+-----+-----------+----------+
| name|         id|       dob|
+-----+-----------+----------+
| Rosy| 8589934593|1963-08-07|
|Abdul|17179869184|1981-05-23|
| Ravi| 8589934592|2002-01-28|
| John|          0|2006-12-12|
+-----+-----------+----------+

