In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, IntegerType, StringType

# Create (or reuse) the Spark session. SparkSession is imported in this cell so
# the cell runs on a fresh kernel instead of relying on an import made elsewhere.
spark = SparkSession.builder.appName('DateFrameOps').getOrCreate()

# Sample rows in (id, name, age) order
data = [
    (1, 'Alice', 25),
    (2, 'Bob', 30),
    (3, 'Charlie', 35),
]

# Explicit schema: the third StructField argument is `nullable`, so False marks
# every column as non-nullable (visible in the printSchema output further down).
schema = StructType([
    StructField('id', IntegerType(), False),
    StructField('name', StringType(), False),
    StructField('age', IntegerType(), False),
])

# Build the DataFrame from the rows and the explicit schema
df = spark.createDataFrame(data, schema)

# Display the rows
df.show()

25/07/10 14:13:45 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  1|  Alice| 25|
|  2|    Bob| 30|
|  3|Charlie| 35|
+---+-------+---+



In [3]:
# Preview only the first two rows
df.show(n=2)

+---+-----+---+
| id| name|age|
+---+-----+---+
|  1|Alice| 25|
|  2|  Bob| 30|
+---+-----+---+
only showing top 2 rows



In [4]:
# Print each column's name, type and nullability
df.printSchema()

root
 |-- id: integer (nullable = false)
 |-- name: string (nullable = false)
 |-- age: integer (nullable = false)



In [5]:
# Summary statistics (count, mean, stddev, min, max) for every column
summary_stats = df.describe()
summary_stats.show()

[Stage 6:>                                                          (0 + 1) / 1]

+-------+---+-------+----+
|summary| id|   name| age|
+-------+---+-------+----+
|  count|  3|      3|   3|
|   mean|2.0|   NULL|30.0|
| stddev|1.0|   NULL| 5.0|
|    min|  1|  Alice|  25|
|    max|  3|Charlie|  35|
+-------+---+-------+----+



                                                                                

In [6]:
# Selecting and filtering examples

# Project just the name and age columns
df.select(df.name, df.age).show()

+-------+---+
|   name|age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 35|
+-------+---+



In [7]:
# Keep only rows whose age is strictly greater than 25
df.filter(df['age'] > 25).show()

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  2|    Bob| 30|
|  3|Charlie| 35|
+---+-------+---+



In [8]:
# Keep only the row(s) whose name is exactly 'Alice'
df.where(df['name'] == 'Alice').show()

+---+-----+---+
| id| name|age|
+---+-----+---+
|  1|Alice| 25|
+---+-----+---+



In [9]:
# Drop fully duplicated rows (the sample data has none, so all rows remain)
unique_rows = df.distinct()
unique_rows.show()



+---+-------+---+
| id|   name|age|
+---+-------+---+
|  1|  Alice| 25|
|  2|    Bob| 30|
|  3|Charlie| 35|
+---+-------+---+



                                                                                

In [10]:
# Sorting: ascending by age
df.orderBy(df['age'].asc()).show()



+---+-------+---+
| id|   name|age|
+---+-------+---+
|  1|  Alice| 25|
|  2|    Bob| 30|
|  3|Charlie| 35|
+---+-------+---+



                                                                                

In [11]:
# Sorting: descending by age
df.orderBy('age', ascending=False).show()

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  3|Charlie| 35|
|  2|    Bob| 30|
|  1|  Alice| 25|
+---+-------+---+





In [12]:
# Adding a derived column

# new_age = age + 5; the result is only displayed, df is not reassigned
df.withColumn('new_age', df['age'] + 5).show()

+---+-------+---+-------+
| id|   name|age|new_age|
+---+-------+---+-------+
|  1|  Alice| 25|     30|
|  2|    Bob| 30|     35|
|  3|Charlie| 35|     40|
+---+-------+---+-------+



In [13]:
# Aggregation and grouping

# Number of rows per distinct name
name_counts = df.groupBy('name').count()
name_counts.show()



+-------+-----+
|   name|count|
+-------+-----+
|Charlie|    1|
|    Bob|    1|
|  Alice|    1|
+-------+-----+



                                                                                

In [14]:
# Average age over the whole DataFrame (a global, ungrouped aggregate)
df.groupBy().agg({'age':'avg'}).show()

+--------+
|avg(age)|
+--------+
|    30.0|
+--------+



In [15]:
# Lookup rows pairing each id with a country
data_2 = [
    (1, 'usa'),
    (2, 'uk'),
    (3, 'india'),
]

# Both columns are declared nullable (third StructField argument is True)
schema_2 = StructType([
    StructField('id', IntegerType(), True),
    StructField('country', StringType(), True),
])

In [16]:
# Build the country lookup DataFrame from the rows and schema defined above
df_2 = spark.createDataFrame(data=data_2, schema=schema_2)

In [17]:
# Inner join on the shared id column; id appears once in the result
df.join(df_2, on='id', how='inner').show()

[Stage 27:>                                                         (0 + 2) / 2]

+---+-------+---+-------+
| id|   name|age|country|
+---+-------+---+-------+
|  1|  Alice| 25|    usa|
|  2|    Bob| 30|     uk|
|  3|Charlie| 35|  india|
+---+-------+---+-------+



[Stage 30:>                                                         (0 + 1) / 1]                                                                                

In [18]:
# df was never reassigned by the cells above, so it still holds the original three columns
df.show()

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  1|  Alice| 25|
|  2|    Bob| 30|
|  3|Charlie| 35|
+---+-------+---+



In [20]:
# Rows with inconsistent values to clean up later: mixed-case names, unparseable
# dates, a None amount, 'NaN'/'Nan' strings and several spellings of a boolean flag
data = [
    (1, 'john doe', 'Bangalore', '2023-01-15', '152.75', 'True'),
    (2, 'jane smith', 'delhi', '2023-05-20', '89.50', 'False'),
    (3, 'RObert Brown', 'Mumbai', 'invalidDate', None, 'yes'),
    (4, 'Lind White', 'Kolkata', '2023-02-29', 'NaN', '1'),
    (5, 'Mike Green', 'Chennai', '2023-08-29', 'Nan', '1'),
    (6, 'Sarah Blue', 'Hyderbad', 'InvalidDate', '300.40', 'No'),
]

# Column names only; Spark infers the types from the Python values
Columns = ['id', 'name', 'city', 'date', 'amount', 'is_active']

# Rebinds df: from here on it refers to this six-column DataFrame
df = spark.createDataFrame(data, Columns)

# Display the rows
df.show()

+---+------------+---------+-----------+------+---------+
| id|        name|     city|       date|amount|is_active|
+---+------------+---------+-----------+------+---------+
|  1|    john doe|Bangalore| 2023-01-15|152.75|     True|
|  2|  jane smith|    delhi| 2023-05-20| 89.50|    False|
|  3|RObert Brown|   Mumbai|invalidDate|  NULL|      yes|
|  4|  Lind White|  Kolkata| 2023-02-29|   NaN|        1|
|  5|  Mike Green|  Chennai| 2023-08-29|   Nan|        1|
|  6|  Sarah Blue| Hyderbad|InvalidDate|300.40|       No|
+---+------------+---------+-----------+------+---------+



In [21]:
df.printSchema()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- date: string (nullable = true)
 |-- amount: string (nullable = true)
 |-- is_active: string (nullable = true)



In [None]:
# Handle the integer column (id)

In [22]:
# Attribute access yields a Column expression object, not the column's data
df.id

Column<'id'>

In [23]:
# Bracket access yields the same kind of Column expression as df.id
df['id']

Column<'id'>

In [24]:
# Keep rows whose id is strictly greater than 3
df.filter(df['id'] > 3).show()

+---+----------+--------+-----------+------+---------+
| id|      name|    city|       date|amount|is_active|
+---+----------+--------+-----------+------+---------+
|  4|Lind White| Kolkata| 2023-02-29|   NaN|        1|
|  5|Mike Green| Chennai| 2023-08-29|   Nan|        1|
|  6|Sarah Blue|Hyderbad|InvalidDate|300.40|       No|
+---+----------+--------+-----------+------+---------+



In [25]:
# Show a derived column holding id * 2; df itself is not reassigned
df.withColumn('id_double', df['id'] * 2).show()

+---+------------+---------+-----------+------+---------+---------+
| id|        name|     city|       date|amount|is_active|id_double|
+---+------------+---------+-----------+------+---------+---------+
|  1|    john doe|Bangalore| 2023-01-15|152.75|     True|        2|
|  2|  jane smith|    delhi| 2023-05-20| 89.50|    False|        4|
|  3|RObert Brown|   Mumbai|invalidDate|  NULL|      yes|        6|
|  4|  Lind White|  Kolkata| 2023-02-29|   NaN|        1|        8|
|  5|  Mike Green|  Chennai| 2023-08-29|   Nan|        1|       10|
|  6|  Sarah Blue| Hyderbad|InvalidDate|300.40|       No|       12|
+---+------------+---------+-----------+------+---------+---------+



In [30]:
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import col


# Replace id with an IntegerType version of itself (it was inferred as long)
df = df.withColumn('id', df['id'].cast(IntegerType()))

In [31]:
# Display the rows after the id cast
df.show()

+---+------------+---------+-----------+------+---------+
| id|        name|     city|       date|amount|is_active|
+---+------------+---------+-----------+------+---------+
|  1|    john doe|Bangalore| 2023-01-15|152.75|     True|
|  2|  jane smith|    delhi| 2023-05-20| 89.50|    False|
|  3|RObert Brown|   Mumbai|invalidDate|  NULL|      yes|
|  4|  Lind White|  Kolkata| 2023-02-29|   NaN|        1|
|  5|  Mike Green|  Chennai| 2023-08-29|   Nan|        1|
|  6|  Sarah Blue| Hyderbad|InvalidDate|300.40|       No|
+---+------------+---------+-----------+------+---------+



In [32]:
# Print the schema to check the column types after the id cast
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- date: string (nullable = true)
 |-- amount: string (nullable = true)
 |-- is_active: string (nullable = true)



In [33]:
# String columns

# Import only the functions that are needed instead of `import *`: the wildcard
# brings in Spark's sum/min/max/abs/round, which shadow the Python builtins of
# the same name in every later cell. `lower` is used by a following cell.
from pyspark.sql.functions import lower, upper

# Add an upper-cased copy of the name column
df = df.withColumn('name_upper', upper(df.name))
df.show()

+---+------------+---------+-----------+------+---------+------------+
| id|        name|     city|       date|amount|is_active|  name_upper|
+---+------------+---------+-----------+------+---------+------------+
|  1|    john doe|Bangalore| 2023-01-15|152.75|     True|    JOHN DOE|
|  2|  jane smith|    delhi| 2023-05-20| 89.50|    False|  JANE SMITH|
|  3|RObert Brown|   Mumbai|invalidDate|  NULL|      yes|ROBERT BROWN|
|  4|  Lind White|  Kolkata| 2023-02-29|   NaN|        1|  LIND WHITE|
|  5|  Mike Green|  Chennai| 2023-08-29|   Nan|        1|  MIKE GREEN|
|  6|  Sarah Blue| Hyderbad|InvalidDate|300.40|       No|  SARAH BLUE|
+---+------------+---------+-----------+------+---------+------------+



In [35]:
# Add a lower-cased copy of the name column
df = df.withColumn('name_lower', lower(df['name']))
df.show()

+---+------------+---------+-----------+------+---------+------------+------------+
| id|        name|     city|       date|amount|is_active|  name_upper|  name_lower|
+---+------------+---------+-----------+------+---------+------------+------------+
|  1|    john doe|Bangalore| 2023-01-15|152.75|     True|    JOHN DOE|    john doe|
|  2|  jane smith|    delhi| 2023-05-20| 89.50|    False|  JANE SMITH|  jane smith|
|  3|RObert Brown|   Mumbai|invalidDate|  NULL|      yes|ROBERT BROWN|robert brown|
|  4|  Lind White|  Kolkata| 2023-02-29|   NaN|        1|  LIND WHITE|  lind white|
|  5|  Mike Green|  Chennai| 2023-08-29|   Nan|        1|  MIKE GREEN|  mike green|
|  6|  Sarah Blue| Hyderbad|InvalidDate|300.40|       No|  SARAH BLUE|  sarah blue|
+---+------------+---------+-----------+------+---------+------------+------------+



In [37]:
# Keep rows whose city begins with an upper-case 'B' (the match is case-sensitive)
df.filter(df['city'].startswith('B')).show()

+---+--------+---------+----------+------+---------+----------+----------+
| id|    name|     city|      date|amount|is_active|name_upper|name_lower|
+---+--------+---------+----------+------+---------+----------+----------+
|  1|john doe|Bangalore|2023-01-15|152.75|     True|  JOHN DOE|  john doe|
+---+--------+---------+----------+------+---------+----------+----------+



In [38]:
# Convert amount from string to float, then inspect the schema and the rows
df = df.withColumn('amount', df['amount'].cast('float'))
df.printSchema()
df.show()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- date: string (nullable = true)
 |-- amount: float (nullable = true)
 |-- is_active: string (nullable = true)
 |-- name_upper: string (nullable = true)
 |-- name_lower: string (nullable = true)

+---+------------+---------+-----------+------+---------+------------+------------+
| id|        name|     city|       date|amount|is_active|  name_upper|  name_lower|
+---+------------+---------+-----------+------+---------+------------+------------+
|  1|    john doe|Bangalore| 2023-01-15|152.75|     True|    JOHN DOE|    john doe|
|  2|  jane smith|    delhi| 2023-05-20|  89.5|    False|  JANE SMITH|  jane smith|
|  3|RObert Brown|   Mumbai|invalidDate|  NULL|      yes|ROBERT BROWN|robert brown|
|  4|  Lind White|  Kolkata| 2023-02-29|   NaN|        1|  LIND WHITE|  lind white|
|  5|  Mike Green|  Chennai| 2023-08-29|   NaN|        1|  MIKE GREEN|  mike green|
|  6|  Sarah Blue

In [40]:
# Replace missing amount values with 0; df itself is left unchanged
df_filled = df.na.fill({'amount':0})
df_filled.show()

+---+------------+---------+-----------+------+---------+------------+------------+
| id|        name|     city|       date|amount|is_active|  name_upper|  name_lower|
+---+------------+---------+-----------+------+---------+------------+------------+
|  1|    john doe|Bangalore| 2023-01-15|152.75|     True|    JOHN DOE|    john doe|
|  2|  jane smith|    delhi| 2023-05-20|  89.5|    False|  JANE SMITH|  jane smith|
|  3|RObert Brown|   Mumbai|invalidDate|   0.0|      yes|ROBERT BROWN|robert brown|
|  4|  Lind White|  Kolkata| 2023-02-29|   0.0|        1|  LIND WHITE|  lind white|
|  5|  Mike Green|  Chennai| 2023-08-29|   0.0|        1|  MIKE GREEN|  mike green|
|  6|  Sarah Blue| Hyderbad|InvalidDate| 300.4|       No|  SARAH BLUE|  sarah blue|
+---+------------+---------+-----------+------+---------+------------+------------+



In [None]:
# Handle Date Column

In [41]:
from pyspark.sql import SparkSession


# Initialize the Spark session. getOrCreate() hands back the session that is
# already running (see the WARN line in this cell's output).
spark = (
    SparkSession.builder
    .appName('Handling Dates in Pyspark')
    .getOrCreate()
)

25/07/10 16:14:39 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [43]:
# Write a small CSV into the local working directory so it can be uploaded to HDFS

# CSV payload: a header line followed by four rows, each holding the same date
# in ISO, day/month/year and month/day/year form plus a timestamp
csv_data = '''id,date_iso,date_dmy,date_mdy,timestamp
1,2025-07-08,08/07/2025,07/08/2025,2025-07-08 14:30:00
2,2024-12-25,25/12/2024,12/25/2024,2024-12-25 09:00:00
3,2023-01-01,01/01/2023,01/01/2023,2023-01-01 00:00:00
4,2022-11-15,15/11/2022,11/15/2022,2022-11-15 18:45:00
'''

# Persist the payload to dates_data.csv (an existing file is overwritten)
with open('dates_data.csv','w') as csv_file:
    csv_file.write(csv_data)

In [44]:
!hadoop fs -put dates_data.csv /data/dates_data.csv

In [46]:
# Schema for the CSV written as a DDL string: id is an INT and every
# date/time field is read as a plain STRING
ddl_schema = '''
id INT,
date_iso STRING,
date_dmy STRING,
date_mdy STRING,
timestamp STRING'''


from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Load the uploaded CSV into a DataFrame, treating the first line as the header
df_file = spark.read.csv('/data/dates_data.csv', schema=ddl_schema, header=True)

# Display the rows without truncating cell values
df_file.show(truncate=False)

[Stage 53:>                                                         (0 + 1) / 1]

+---+----------+----------+----------+-------------------+
|id |date_iso  |date_dmy  |date_mdy  |timestamp          |
+---+----------+----------+----------+-------------------+
|1  |2025-07-08|08/07/2025|07/08/2025|2025-07-08 14:30:00|
|2  |2024-12-25|25/12/2024|12/25/2024|2024-12-25 09:00:00|
|3  |2023-01-01|01/01/2023|01/01/2023|2023-01-01 00:00:00|
|4  |2022-11-15|15/11/2022|11/15/2022|2022-11-15 18:45:00|
+---+----------+----------+----------+-------------------+



                                                                                

In [47]:
# Print the schema of the DataFrame read from the CSV
df_file.printSchema()

root
 |-- id: integer (nullable = true)
 |-- date_iso: string (nullable = true)
 |-- date_dmy: string (nullable = true)
 |-- date_mdy: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [49]:
# The same four records as the CSV, built in memory: each row carries one date
# in ISO, day/month/year and month/day/year form plus a timestamp string
data = [
    (1, '2025-07-08', '08/07/2025', '07/08/2025', '2025-07-08 14:30:00'),
    (2, '2024-12-25', '25/12/2024', '12/25/2024', '2024-12-25 09:00:00'),
    (3, '2023-01-01', '01/01/2023', '01/01/2023', '2023-01-01 00:00:00'),
    (4, '2022-11-15', '15/11/2022', '11/15/2022', '2022-11-15 18:45:00'),
]

# Column names only; Spark infers the types from the Python values
columns = ['id', 'date_iso', 'date_dmy', 'date_mdy', 'timestamp']

# Rebinds df: from here on it refers to this dates DataFrame
df = spark.createDataFrame(data, columns)

# Display the rows without truncating cell values
df.show(truncate=False)

+---+----------+----------+----------+-------------------+
|id |date_iso  |date_dmy  |date_mdy  |timestamp          |
+---+----------+----------+----------+-------------------+
|1  |2025-07-08|08/07/2025|07/08/2025|2025-07-08 14:30:00|
|2  |2024-12-25|25/12/2024|12/25/2024|2024-12-25 09:00:00|
|3  |2023-01-01|01/01/2023|01/01/2023|2023-01-01 00:00:00|
|4  |2022-11-15|15/11/2022|11/15/2022|2022-11-15 18:45:00|
+---+----------+----------+----------+-------------------+



In [50]:
# Print the inferred schema of the in-memory dates DataFrame
df.printSchema()

root
 |-- id: long (nullable = true)
 |-- date_iso: string (nullable = true)
 |-- date_dmy: string (nullable = true)
 |-- date_mdy: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [52]:
from pyspark.sql.functions import to_date

# Parse each string representation into a date column, giving to_date the
# pattern that matches that column's layout
df = (
    df
    .withColumn('parsed_date_iso', to_date(df['date_iso'], 'yyyy-MM-dd'))
    .withColumn('parsed_date_dmy', to_date(df['date_dmy'], 'dd/MM/yyyy'))
    .withColumn('parsed_date_mdy', to_date(df['date_mdy'], 'MM/dd/yyyy'))
)

In [53]:
# Display the original strings next to the parsed date columns, untruncated
df.show(truncate=False)

+---+----------+----------+----------+-------------------+---------------+---------------+---------------+
|id |date_iso  |date_dmy  |date_mdy  |timestamp          |parsed_date_iso|parsed_date_dmy|parsed_date_mdy|
+---+----------+----------+----------+-------------------+---------------+---------------+---------------+
|1  |2025-07-08|08/07/2025|07/08/2025|2025-07-08 14:30:00|2025-07-08     |2025-07-08     |2025-07-08     |
|2  |2024-12-25|25/12/2024|12/25/2024|2024-12-25 09:00:00|2024-12-25     |2024-12-25     |2024-12-25     |
|3  |2023-01-01|01/01/2023|01/01/2023|2023-01-01 00:00:00|2023-01-01     |2023-01-01     |2023-01-01     |
|4  |2022-11-15|15/11/2022|11/15/2022|2022-11-15 18:45:00|2022-11-15     |2022-11-15     |2022-11-15     |
+---+----------+----------+----------+-------------------+---------------+---------------+---------------+



In [54]:
# Timestamp

from pyspark.sql.functions import to_timestamp, year, month, dayofmonth, hour, minute

# Parse the timestamp string into a new parsed_timestamp column, then show the
# rows and the resulting schema
df = df.withColumn(
    'parsed_timestamp',
    to_timestamp(df['timestamp'], 'yyyy-MM-dd HH:mm:ss'),
)
df.show()
df.printSchema()

+---+----------+----------+----------+-------------------+---------------+---------------+---------------+-------------------+
| id|  date_iso|  date_dmy|  date_mdy|          timestamp|parsed_date_iso|parsed_date_dmy|parsed_date_mdy|   parsed_timestamp|
+---+----------+----------+----------+-------------------+---------------+---------------+---------------+-------------------+
|  1|2025-07-08|08/07/2025|07/08/2025|2025-07-08 14:30:00|     2025-07-08|     2025-07-08|     2025-07-08|2025-07-08 14:30:00|
|  2|2024-12-25|25/12/2024|12/25/2024|2024-12-25 09:00:00|     2024-12-25|     2024-12-25|     2024-12-25|2024-12-25 09:00:00|
|  3|2023-01-01|01/01/2023|01/01/2023|2023-01-01 00:00:00|     2023-01-01|     2023-01-01|     2023-01-01|2023-01-01 00:00:00|
|  4|2022-11-15|15/11/2022|11/15/2022|2022-11-15 18:45:00|     2022-11-15|     2022-11-15|     2022-11-15|2022-11-15 18:45:00|
+---+----------+----------+----------+-------------------+---------------+---------------+---------------+-----

In [57]:
from pyspark.sql.functions import datediff

# Days between the two parsed dates (end = parsed_date_mdy, start = parsed_date_iso);
# both columns hold the same date in every row, so the difference is 0 throughout
df = df.withColumn(
    'days_difference',
    datediff(df['parsed_date_mdy'], df['parsed_date_iso']),
)
df.select(['parsed_date_mdy', 'parsed_date_iso', 'days_difference']).show(truncate=False)

+---------------+---------------+---------------+
|parsed_date_mdy|parsed_date_iso|days_difference|
+---------------+---------------+---------------+
|2025-07-08     |2025-07-08     |0              |
|2024-12-25     |2024-12-25     |0              |
|2023-01-01     |2023-01-01     |0              |
|2022-11-15     |2022-11-15     |0              |
+---------------+---------------+---------------+



In [58]:
# Shut down the Spark session; cells that use `spark` cannot run after this
# until a new session is created
spark.stop()

25/07/10 17:20:23 WARN ApplicationMaster: Reporter thread fails 1 time(s) in a row.
java.io.InterruptedIOException: Call interrupted
	at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1557) ~[hadoop-client-api-3.3.6.jar:?]
	at org.apache.hadoop.ipc.Client.call(Client.java:1509) ~[hadoop-client-api-3.3.6.jar:?]
	at org.apache.hadoop.ipc.Client.call(Client.java:1406) ~[hadoop-client-api-3.3.6.jar:?]
	at org.apache.hadoop.ipc.ProtobufRpcEngine2$Invoker.invoke(ProtobufRpcEngine2.java:258) ~[hadoop-client-api-3.3.6.jar:?]
	at org.apache.hadoop.ipc.ProtobufRpcEngine2$Invoker.invoke(ProtobufRpcEngine2.java:139) ~[hadoop-client-api-3.3.6.jar:?]
	at com.sun.proxy.$Proxy40.allocate(Unknown Source) ~[?:?]
	at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate(ApplicationMasterProtocolPBClientImpl.java:78) ~[hadoop-client-api-3.3.6.jar:?]
	at jdk.internal.reflect.GeneratedMethodAccessor34.invoke(Unknown Source) ~[?:?]
	at java.base/jdk.internal.reflec