### How to partition a table by year and month


#### PySpark: write a DataFrame partitioned into year/month sub-directories


In [0]:
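# Legacy source date format: dd-MM-yyyy (e.g. 17-12-1980 in emp.csv)

# Spark's default date format: yyyy-MM-dd (e.g. 1980-12-17); pattern letters are case-sensitive, MM = month, mm = minute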

In [0]:
from pyspark.sql import SparkSession

# Build the session that every later cell reads and writes through.
# getOrCreate() hands back an already-running session when one exists.
spark = (
    SparkSession.builder
    .appName("Spark_df_1")
    .master("local[*]")
    .getOrCreate()
)

In [0]:
# Raw look at the legacy extract. No header option is set, so Spark names the columns
# _c0.._c8 and the file's header line shows up as the first data row.
# The literal text "null" is read as a SQL NULL.
df = spark.read.csv("dbfs:/FileStore/tables/emp.csv", nullValue="null")
df.show()

+-----+------+---------+----+----------+----+----+------+------------+
|  _c0|   _c1|      _c2| _c3|       _c4| _c5| _c6|   _c7|         _c8|
+-----+------+---------+----+----------+----+----+------+------------+
|EMPNO| ENAME|      JOB| MGR|  HIREDATE| SAL|COMM|DEPTNO|UPDATED_DATE|
| 7369| SMITH|    CLERK|7902|17-12-1980| 800|null|    20|  2022-01-01|
| 7499| ALLEN| SALESMAN|7698|20-02-1981|1600| 300|    30|  2022-01-01|
| 7521|  WARD| SALESMAN|7698|22-02-1981|1250| 500|    30|  2022-01-01|
| 7566| JONES|  MANAGER|7839|04-02-1981|2975|null|    20|  2022-01-05|
| 7654|MARTIN| SALESMAN|7698|21-09-1981|1250|1400|    30|  2022-01-03|
| 7698|   SGR|  MANAGER|7839|05-01-1981|2850|null|    30|  2022-01-04|
| 7782|  RAVI|  MANAGER|7839|06-09-1981|2450|null|    10|  2022-01-02|
| 7788| SCOTT|  ANALYST|7566|19-04-1987|3000|null|    20|  2022-01-02|
| 7839|  KING|PRESIDENT|null|01-11-1981|5000|null|    10|  2022-01-02|
| 7844|TURNER| SALESMAN|7698|09-08-1981|1500|   0|    30|  2022-01-02|
| 7876

In [0]:
# Read emp-1.csv using its first line as column names and let Spark infer column types.
# printSchema below shows HIREDATE and UPDATED_DATE come back as dates.
# The literal text "null" is read as a SQL NULL.
df = spark.read.csv(
    "dbfs:/FileStore/tables/emp-1.csv",
    header="true",
    inferSchema="True",
    nullValue="null",
)


df.show()

+-----+------+---------+----+----------+----+----+------+------------+
|EMPNO| ENAME|      JOB| MGR|  HIREDATE| SAL|COMM|DEPTNO|UPDATED_DATE|
+-----+------+---------+----+----------+----+----+------+------------+
| 7369| SMITH|    CLERK|7902|1980-12-17| 800|null|    20|  2022-01-01|
| 7499| ALLEN| SALESMAN|7698|1981-02-20|1600| 300|    30|  2022-01-01|
| 7521|  WARD| SALESMAN|7698|1981-02-22|1250| 500|    30|  2022-01-01|
| 7566| JONES|  MANAGER|7839|1981-02-04|2975|null|    20|  2022-01-05|
| 7654|MARTIN| SALESMAN|7698|1981-09-21|1250|1400|    30|  2022-01-03|
| 7698|   SGR|  MANAGER|7839|1981-01-05|2850|null|    30|  2022-01-04|
| 7782|  RAVI|  MANAGER|7839|1981-09-06|2450|null|    10|  2022-01-02|
| 7788| SCOTT|  ANALYST|7566|1987-04-19|3000|null|    20|  2022-01-02|
| 7839|  KING|PRESIDENT|null|1981-11-01|5000|null|    10|  2022-01-02|
| 7844|TURNER| SALESMAN|7698|1981-08-09|1500|   0|    30|  2022-01-02|
| 7876| ADAMS|    CLERK|7788|1987-05-23|1100|null|    20|  2022-01-03|
| 7900

In [0]:
# Confirm what inferSchema produced: the ids and amounts are integers, and
# HIREDATE / UPDATED_DATE are already typed as date rather than string.
df.printSchema()

root
 |-- EMPNO: integer (nullable = true)
 |-- ENAME: string (nullable = true)
 |-- JOB: string (nullable = true)
 |-- MGR: integer (nullable = true)
 |-- HIREDATE: date (nullable = true)
 |-- SAL: integer (nullable = true)
 |-- COMM: integer (nullable = true)
 |-- DEPTNO: integer (nullable = true)
 |-- UPDATED_DATE: date (nullable = true)



In [0]:
# Render the same DataFrame with the notebook's table viewer instead of plain-text show().
display(df)

EMPNO,ENAME,JOB,MGR,HIREDATE,SAL,COMM,DEPTNO,UPDATED_DATE
7369.0,SMITH,CLERK,7902.0,1980-12-17,800.0,,20.0,2022-01-01
7499.0,ALLEN,SALESMAN,7698.0,1981-02-20,1600.0,300.0,30.0,2022-01-01
7521.0,WARD,SALESMAN,7698.0,1981-02-22,1250.0,500.0,30.0,2022-01-01
7566.0,JONES,MANAGER,7839.0,1981-02-04,2975.0,,20.0,2022-01-05
7654.0,MARTIN,SALESMAN,7698.0,1981-09-21,1250.0,1400.0,30.0,2022-01-03
7698.0,SGR,MANAGER,7839.0,1981-01-05,2850.0,,30.0,2022-01-04
7782.0,RAVI,MANAGER,7839.0,1981-09-06,2450.0,,10.0,2022-01-02
7788.0,SCOTT,ANALYST,7566.0,1987-04-19,3000.0,,20.0,2022-01-02
7839.0,KING,PRESIDENT,,1981-11-01,5000.0,,10.0,2022-01-02
7844.0,TURNER,SALESMAN,7698.0,1981-08-09,1500.0,0.0,30.0,2022-01-02


In [0]:
# Normalise HIREDATE to a DATE column and give rows without a hire date a far-future sentinel.
#
# The legacy extract (emp.csv) stores hire dates as dd-MM-yyyy text (e.g. 17-12-1980), so that
# is the pattern used when HIREDATE arrives as a string. When inferSchema has already typed the
# column as date (as printSchema shows for emp-1.csv), the existing values pass through unchanged.

from pyspark.sql.functions import col, to_date

HIREDATE_PATTERN = "dd-MM-yyyy"
HIREDATE_SENTINEL = "9999-12-31"  # stands in for a missing HIREDATE

with_parsed = df.withColumn("HIREDATE_PARSED", to_date("HIREDATE", HIREDATE_PATTERN))

# A value that exists but does not match the pattern parses to null. Without this check the
# fillna below would silently rewrite it as the 9999-12-31 sentinel.
unparsed_count = with_parsed.filter(
    col("HIREDATE").isNotNull() & col("HIREDATE_PARSED").isNull()
).count()
if unparsed_count:
    raise ValueError(
        f"{unparsed_count} HIREDATE value(s) could not be parsed with pattern {HIREDATE_PATTERN!r}"
    )

# Replace HIREDATE in place (its column position is kept), drop the helper column,
# then fill genuinely missing hire dates with the sentinel.
df_emp = (
    with_parsed
    .withColumn("HIREDATE", col("HIREDATE_PARSED"))
    .drop("HIREDATE_PARSED")
    .fillna({"HIREDATE": HIREDATE_SENTINEL})
)
df_emp.show()

+-----+------+---------+----+----------+----+----+------+------------+
|EMPNO| ENAME|      JOB| MGR|  HIREDATE| SAL|COMM|DEPTNO|UPDATED_DATE|
+-----+------+---------+----+----------+----+----+------+------------+
| 7369| SMITH|    CLERK|7902|1980-12-17| 800|null|    20|  2022-01-01|
| 7499| ALLEN| SALESMAN|7698|1981-02-20|1600| 300|    30|  2022-01-01|
| 7521|  WARD| SALESMAN|7698|1981-02-22|1250| 500|    30|  2022-01-01|
| 7566| JONES|  MANAGER|7839|1981-02-04|2975|null|    20|  2022-01-05|
| 7654|MARTIN| SALESMAN|7698|1981-09-21|1250|1400|    30|  2022-01-03|
| 7698|   SGR|  MANAGER|7839|1981-01-05|2850|null|    30|  2022-01-04|
| 7782|  RAVI|  MANAGER|7839|1981-09-06|2450|null|    10|  2022-01-02|
| 7788| SCOTT|  ANALYST|7566|1987-04-19|3000|null|    20|  2022-01-02|
| 7839|  KING|PRESIDENT|null|1981-11-01|5000|null|    10|  2022-01-02|
| 7844|TURNER| SALESMAN|7698|1981-08-09|1500|   0|    30|  2022-01-02|
| 7876| ADAMS|    CLERK|7788|1987-05-23|1100|null|    20|  2022-01-03|
| 7900

In [0]:
# Derive the partition keys from HIREDATE with date_format, as zero-padded strings
# ("1981", "02"), so partition directories are named like YEAR=1980/MONTH=12.

from pyspark.sql.functions import date_format

hire_year = date_format("HIREDATE", "yyyy")
hire_month = date_format("HIREDATE", "MM")

# withColumn replaces an existing column of the same name, so re-running this cell is safe.
df_emp = df_emp.withColumn("YEAR", hire_year).withColumn("MONTH", hire_month)
df_emp.show()

+-----+------+---------+----+----------+----+----+------+------------+----+-----+
|EMPNO| ENAME|      JOB| MGR|  HIREDATE| SAL|COMM|DEPTNO|UPDATED_DATE|YEAR|MONTH|
+-----+------+---------+----+----------+----+----+------+------------+----+-----+
| 7369| SMITH|    CLERK|7902|1980-12-17| 800|null|    20|  2022-01-01|1980|   12|
| 7499| ALLEN| SALESMAN|7698|1981-02-20|1600| 300|    30|  2022-01-01|1981|   02|
| 7521|  WARD| SALESMAN|7698|1981-02-22|1250| 500|    30|  2022-01-01|1981|   02|
| 7566| JONES|  MANAGER|7839|1981-02-04|2975|null|    20|  2022-01-05|1981|   02|
| 7654|MARTIN| SALESMAN|7698|1981-09-21|1250|1400|    30|  2022-01-03|1981|   09|
| 7698|   SGR|  MANAGER|7839|1981-01-05|2850|null|    30|  2022-01-04|1981|   01|
| 7782|  RAVI|  MANAGER|7839|1981-09-06|2450|null|    10|  2022-01-02|1981|   09|
| 7788| SCOTT|  ANALYST|7566|1987-04-19|3000|null|    20|  2022-01-02|1987|   04|
| 7839|  KING|PRESIDENT|null|1981-11-01|5000|null|    10|  2022-01-02|1981|   11|
| 7844|TURNER| S

In [0]:
# Save the data as the Delta table emp_part_1, laid out in YEAR/MONTH partition directories.
# mode("overwrite") replaces the table's contents each time the cell is run.
(
    df_emp.write
    .format("delta")
    .mode("overwrite")
    .partitionBy("YEAR", "MONTH")
    .saveAsTable('emp_part_1')
)

In [0]:
# Look up where emp_part_1 actually lives instead of assuming the default Hive warehouse
# path (/user/hive/warehouse/...), which differs when the warehouse directory or catalog changes.
table_location = spark.sql("DESCRIBE DETAIL emp_part_1").first()["location"]

# List the data files written for one partition: hire year 1980, month 12.
display(dbutils.fs.ls(f"{table_location}/YEAR=1980/MONTH=12/"))

path,name,size,modificationTime
dbfs:/user/hive/warehouse/emp_part_1/YEAR=1980/MONTH=12/part-00000-3bc39be1-773a-455b-a49f-3030d7a278d4.c000.snappy.parquet,part-00000-3bc39be1-773a-455b-a49f-3030d7a278d4.c000.snappy.parquet,2687,1711719308000


In [0]:
%sql

-- YEAR is a string partition column (built with date_format), so compare it with a string
-- literal: no implicit cast is needed, and the filter maps directly onto the YEAR=1980 directory.
SELECT * FROM emp_part_1 WHERE YEAR = '1980'

EMPNO,ENAME,JOB,MGR,HIREDATE,SAL,COMM,DEPTNO,UPDATED_DATE,YEAR,MONTH
7369,SMITH,CLERK,7902,1980-12-17,800,,20,2022-01-01,1980,12
7369,SMITH,CLERK,7902,1980-12-17,800,,20,2022-01-04,1980,12
