In [0]:
import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
spark = SparkSession.builder\
        .master("local[3]")\
        .appName("Demo")\
        .getOrCreate()

In [0]:
data_list = [("Ravi", "28", "1", "2002"),
             ("Abdul", "23", "5", "81"), 
             ("John", "12", "12", "6"), 
             ("Rosy", "7", "8", "63"), 
             ("Abdul", "23", "5", "81") 
            ]

### creating Dataframe from a python list


In [0]:
data=spark.createDataFrame(data_list)

In [0]:
data.show()

+-----+---+---+----+
|   _1| _2| _3|  _4|
+-----+---+---+----+
| Ravi| 28|  1|2002|
|Abdul| 23|  5|  81|
| John| 12| 12|   6|
| Rosy|  7|  8|  63|
|Abdul| 23|  5|  81|
+-----+---+---+----+



##### In the above row we don't have columns/ To add columns :


In [0]:
dataframe=data.toDF("name", "day", "month", "year").repartition(2)

In [0]:
dataframe.show()

+-----+---+-----+----+
| name|day|month|year|
+-----+---+-----+----+
| Ravi| 28|    1|2002|
|Abdul| 23|    5|  81|
| John| 12|   12|   6|
| Rosy|  7|    8|  63|
|Abdul| 23|    5|  81|
+-----+---+-----+----+



In [0]:
dataframe.printSchema()

root
 |-- name: string (nullable = true)
 |-- day: string (nullable = true)
 |-- month: string (nullable = true)
 |-- year: string (nullable = true)



In [0]:
df=dataframe.withColumn("Id", monotonically_increasing_id())

In [0]:
df.show()

+-----+---+-----+----+-----------+
| name|day|month|year|         Id|
+-----+---+-----+----+-----------+
| Ravi| 28|    1|2002|          0|
|Abdul| 23|    5|  81| 8589934592|
| John| 12|   12|   6|17179869184|
| Rosy|  7|    8|  63|25769803776|
|Abdul| 23|    5|  81|25769803777|
+-----+---+-----+----+-----------+



In [0]:
year_df=df.withColumn("year", expr("""
        case when year < 21 then year + 2000
        when year < 100 then year + 1900
        else year
        end"""))

In [0]:
year_df.show()

+-----+---+-----+------+-----------+
| name|day|month|  year|         Id|
+-----+---+-----+------+-----------+
| Ravi| 28|    1|  2002|          0|
|Abdul| 23|    5|1981.0| 8589934592|
| John| 12|   12|2006.0|17179869184|
| Rosy|  7|    8|1963.0|25769803776|
|Abdul| 23|    5|1981.0|25769803777|
+-----+---+-----+------+-----------+



###### as we have decimal values in the year column / Casting the column resolves the issues.
There are two types to change :
1. Cast ("col" as datatype) - in this case only values get changed to int but the schema remains as stringtype
2. df.withColumn("col", "col2").cast(IntegerType()) - in the both the values and the schema changes to integer.

In [0]:
year_df=df.withColumn("year", expr("""
        case when year < 21 then cast(year as int) + 2000
        when year < 100 then cast(year as int) + 1900
        else year
        end"""))

In [0]:
year_df.show()

+-----+---+-----+----+-----------+
| name|day|month|year|         Id|
+-----+---+-----+----+-----------+
| Ravi| 28|    1|2002|          0|
|Abdul| 23|    5|1981| 8589934592|
| John| 12|   12|2006|17179869184|
| Rosy|  7|    8|1963|25769803776|
|Abdul| 23|    5|1981|25769803777|
+-----+---+-----+----+-----------+



In [0]:
schema_year_df=df.withColumn("year", expr("""
                case when year < 21 then year + 2000
                when year < 100 then year + 1900
                else year
end""").cast(IntegerType()))

In [0]:
schema_year_df.printSchema()

root
 |-- name: string (nullable = true)
 |-- day: string (nullable = true)
 |-- month: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- Id: long (nullable = false)



#####inorder to cast both values and schema for a specific column

In [0]:
specific_df_col=df.withColumn("day", col("day").cast(IntegerType()))\
  .withColumn("month", col("month").cast(IntegerType()))\
  .withColumn("year", col("year").cast(IntegerType()))

##### when  and otherwise functions

In [0]:
when_func_df=specific_df_col.withColumn("year", \
                    when(col("year") < 21, col("year") + 2000) \
                    .when(col("year") < 100, col("year") + 1900) \
                    .otherwise(col("year")))
                           

##### concat day/ Month/ Year

In [0]:
dob_df=when_func_df.withColumn("dob",expr("to_date(concat(day,'/',month,'/',year),'d/m/y')"))
dob_df.show()

+-----+---+-----+----+-----------+----------+
| name|day|month|year|         Id|       dob|
+-----+---+-----+----+-----------+----------+
| Ravi| 28|    1|2002|          0|2002-01-28|
|Abdul| 23|    5|1981| 8589934592|1981-01-23|
| John| 12|   12|2006|17179869184|2006-01-12|
| Rosy|  7|    8|1963|25769803776|1963-01-07|
|Abdul| 23|    5|1981|25769803777|1981-01-23|
+-----+---+-----+----+-----------+----------+



In [0]:
final_df=dob_df.drop("day","month","year")\
    .drop_duplicates(["name","dob"])

In [0]:
final_df.show()

+-----+-----------+----------+
| name|         Id|       dob|
+-----+-----------+----------+
| Ravi|          0|2002-01-28|
|Abdul| 8589934592|1981-01-23|
| John|17179869184|2006-01-12|
| Rosy|25769803776|1963-01-07|
+-----+-----------+----------+

