In [0]:
# Create the employee DataFrame from the uploaded CSV file.
#   header=True      -> the first line of the file supplies the column names
#   inferSchema=True -> Spark picks column types from the data instead of
#                       reading every column as a string
df = (
    spark.read
    .format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load("dbfs:/FileStore/tables/emp.csv")
)

In [0]:
# Quick look at the first three rows (printed as a text table).
df.show(n=3)

+-----+-----+--------+----+----------+----+----+------+------------+----+-----+
|EMPNO|ENAME|     JOB| MGR|  HIREDATE| SAL|COMM|DEPTNO|UPDATED_DATE|YEAR|MONTH|
+-----+-----+--------+----+----------+----+----+------+------------+----+-----+
| 7369|SMITH|   CLERK|7902|1980-12-17| 800|null|    20|  2022-01-01|1980|   12|
| 7499|ALLEN|SALESMAN|7698|1981-02-20|1600| 300|    30|  2022-01-01|1981|   02|
| 7521| WARD|SALESMAN|7698|1981-02-22|1250| 500|    30|  2022-01-01|1981|   02|
+-----+-----+--------+----+----------+----+----+------+------------+----+-----+
only showing top 3 rows



In [0]:
# to_date: convert the HIREDATE string column into a DateType column.
# The input strings are parsed with the pattern below; Spark then renders
# DateType values as yyyy-MM-dd.
from pyspark.sql.functions import to_date

HIREDATE_INPUT_FORMAT = 'dd-MM-yyyy'
# Rows whose HIREDATE is null after conversion are filled with this fixed date.
# Such rows include missing values and, presumably with ANSI mode off, strings
# that do not match the pattern.
# NOTE(review): this is a made-up hire date, not real data. If YEAR/MONTH are
# later derived from HIREDATE, these rows end up in the 2023/07 partition.
HIREDATE_PLACEHOLDER = '2023-07-31'

parsed_hiredate = to_date("HIREDATE", HIREDATE_INPUT_FORMAT)
df = df.withColumn("HIREDATE", parsed_hiredate)
df = df.fillna({"HIREDATE": HIREDATE_PLACEHOLDER})

df.display()
df.printSchema()

EMPNO,ENAME,JOB,MGR,HIREDATE,SAL,COMM,DEPTNO,UPDATED_DATE
7369.0,SMITH,CLERK,7902.0,1980-12-17,800.0,,20.0,2022-01-01
7499.0,ALLEN,SALESMAN,7698.0,1981-02-20,1600.0,300.0,30.0,2022-01-01
7521.0,WARD,SALESMAN,7698.0,1981-02-22,1250.0,500.0,30.0,2022-01-01
7566.0,JONES,MANAGER,7839.0,1981-02-04,2975.0,,20.0,2022-01-05
7654.0,MARTIN,SALESMAN,7698.0,1981-09-21,1250.0,1400.0,30.0,2022-01-03
7698.0,SGR,MANAGER,7839.0,1981-01-05,2850.0,,30.0,2022-01-04
7782.0,RAVI,MANAGER,7839.0,1981-09-06,2450.0,,10.0,2022-01-02
7788.0,SCOTT,ANALYST,7566.0,1987-04-19,3000.0,,20.0,2022-01-02
7839.0,KING,PRESIDENT,,1981-11-01,5000.0,,10.0,2022-01-02
7844.0,TURNER,SALESMAN,7698.0,1981-08-09,1500.0,0.0,30.0,2022-01-02


root
 |-- EMPNO: string (nullable = true)
 |-- ENAME: string (nullable = true)
 |-- JOB: string (nullable = true)
 |-- MGR: string (nullable = true)
 |-- HIREDATE: date (nullable = true)
 |-- SAL: string (nullable = true)
 |-- COMM: string (nullable = true)
 |-- DEPTNO: string (nullable = true)
 |-- UPDATED_DATE: string (nullable = true)



In [0]:
# Extract YEAR ('yyyy') and MONTH ('MM', zero-padded) from HIREDATE; these
# become the partition columns of the Delta table.
# date_format returns strings, so YEAR and MONTH are string columns.
from pyspark.sql.functions import date_format

for part_col, pattern in (("YEAR", "yyyy"), ("MONTH", "MM")):
    df = df.withColumn(part_col, date_format("HIREDATE", pattern))

df.show()

+-----+------+---------+----+----------+----+----+------+------------+----+-----+
|EMPNO| ENAME|      JOB| MGR|  HIREDATE| SAL|COMM|DEPTNO|UPDATED_DATE|YEAR|MONTH|
+-----+------+---------+----+----------+----+----+------+------------+----+-----+
| 7369| SMITH|    CLERK|7902|1980-12-17| 800|null|    20|  2022-01-01|1980|   12|
| 7499| ALLEN| SALESMAN|7698|1981-02-20|1600| 300|    30|  2022-01-01|1981|   02|
| 7521|  WARD| SALESMAN|7698|1981-02-22|1250| 500|    30|  2022-01-01|1981|   02|
| 7566| JONES|  MANAGER|7839|1981-02-04|2975|null|    20|  2022-01-05|1981|   02|
| 7654|MARTIN| SALESMAN|7698|1981-09-21|1250|1400|    30|  2022-01-03|1981|   09|
| 7698|   SGR|  MANAGER|7839|1981-01-05|2850|null|    30|  2022-01-04|1981|   01|
| 7782|  RAVI|  MANAGER|7839|1981-09-06|2450|null|    10|  2022-01-02|1981|   09|
| 7788| SCOTT|  ANALYST|7566|1987-04-19|3000|null|    20|  2022-01-02|1987|   04|
| 7839|  KING|PRESIDENT|null|1981-11-01|5000|null|    10|  2022-01-02|1981|   11|
| 7844|TURNER| S

In [0]:
# Save df as the Delta table `emp_part`, partitioned by YEAR and MONTH.
# Each YEAR/MONTH combination gets its own directory,
# e.g. .../emp_part/YEAR=1980/MONTH=12/.
# mode("overwrite"): if emp_part already exists, its data is replaced;
# otherwise the table is created.
# NOTE(review): Delta may reject an overwrite that changes the table's schema
# or partitioning unless option("overwriteSchema", "true") is set. Confirm
# before re-running after such changes.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .partitionBy("YEAR", "MONTH")
    .saveAsTable("emp_part")
)

In [0]:
# List the data files of a single partition (YEAR=1980, MONTH=12) of emp_part.
# The table's storage location is read with DESCRIBE DETAIL instead of being
# hard-coded. This keeps working when the table does not live under the
# default /user/hive/warehouse directory. display(dbutils.fs.ls(...)) renders
# the same path/name/size/modificationTime table as `%fs ls`.
emp_part_location = spark.sql("DESCRIBE DETAIL emp_part").select("location").first()[0]
display(dbutils.fs.ls(f"{emp_part_location}/YEAR=1980/MONTH=12/"))

path,name,size,modificationTime
dbfs:/user/hive/warehouse/emp_part/YEAR=1980/MONTH=12/part-00000-a2d0171b-1ccb-4410-bd1d-dafd07609b72.c000.snappy.parquet,part-00000-a2d0171b-1ccb-4410-bd1d-dafd07609b72.c000.snappy.parquet,2779,1690771815000


In [0]:
%sql
-- YEAR is a string partition column (produced by date_format). Comparing it
-- with the integer literal 1980 makes Spark wrap the column in a cast; the plan
-- shows the partition filter as (cast(YEAR as int) = 1980).
-- To compare with a value of the column's own type, prefer a string literal:
-- WHERE year = '1980'
EXPLAIN SELECT * FROM emp_part WHERE year = 1980


plan
"== Physical Plan == *(1) ColumnarToRow +- FileScan parquet spark_catalog.default.emp_part[EMPNO#3506,ENAME#3507,JOB#3508,MGR#3509,HIREDATE#3510,SAL#3511,COMM#3512,DEPTNO#3513,UPDATED_DATE#3514,YEAR#3515,MONTH#3516] Batched: true, DataFilters: [], Format: Parquet, Location: PreparedDeltaFileIndex(1 paths)[dbfs:/user/hive/warehouse/emp_part], PartitionFilters: [isnotnull(YEAR#3515), (cast(YEAR#3515 as int) = 1980)], PushedFilters: [], ReadSchema: struct"
