## PySpark Dynamic Partition (Part 2)

In [0]:
# Restart the Python process first so the rest of the notebook runs from a clean interpreter state.
dbutils.library.restartPython() # Removes Python state, but some libraries might not work without calling this command.

#### Load libraries

In [0]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import IntegerType, DateType, StringType, StructType, StructField, ArrayType, MapType, DoubleType
from pyspark.sql.functions import lit, col, expr, when, sum, avg, max, min, mean, count, udf, explode, concat_ws

#### Create Spark session

In [0]:
# Obtain the active Spark session, or create one named after this notebook if none exists.
spark = (SparkSession
  .builder
  .appName('PySpark Dynamic Partitions')
  .getOrCreate())

#### Create Dataframe

In [0]:
from datetime import datetime

# Initial inventory as (item, quantity, ISO date) triples; the date strings are parsed below.
initial_rows = [
  ('item1', 5, '2021-06-15'),
  ('item2', 1, '2021-06-20'),
  ('item8', 9, '2021-06-20'),
  ('item3', 2, '2021-06-20'),
  ('item1', 3, '2021-07-05'),
  ('item3', 4, '2021-07-25'),
  ('item2', 1, '2021-07-30'),
  ('item4', 6, '2021-08-01'),
  ('item2', 8, '2021-08-01'),
  ('item5', 8, '2021-08-03'),
]
data = [
  (item, quantity, datetime.strptime(day, '%Y-%m-%d'))
  for item, quantity, day in initial_rows
]

# Three nullable columns; `date` is the column later used for partitioning.
schema = (StructType()
  .add('item', StringType(), True)
  .add('quantity', IntegerType(), True)
  .add('date', DateType(), True))

df = spark.createDataFrame(data, schema)
df.printSchema()
df.show()

In [0]:
# Name of the demo table; its Delta files are stored under /mnt/<table_name>.
table_name = 'temp.partitions_testing'
table_path = '/mnt/' + table_name

#### Create a delta table

In [0]:
# Initial full write: overwrite whatever is at table_path with df, partitioned by date.
delta_writer = df.write.format('delta').mode('overwrite').partitionBy('date')
delta_writer.save(table_path)

In [0]:
%sh
# List the files of the date=2021-06-20 partition after the initial write.
ls -lah /dbfs/mnt/temp.partitions_testing/date=2021-06-20

In [0]:
%sh
# List the files of the date=2021-07-05 partition after the initial write.
ls -lah /dbfs/mnt/temp.partitions_testing/date=2021-07-05

#### Create a new dataset with the data that should be written to the table

In [0]:
from datetime import datetime

# July update as (item, quantity, ISO date) triples.
# Compared with the initial data, the ('item3', 4, '2021-07-25') row is intentionally left out.
july_rows = [
  ('item1', 3, '2021-07-05'),  # unchanged
  ('item2', 7, '2021-07-30'),  # quantity changed (was 1)
  ('item5', 9, '2021-07-16'),  # new row
]
jul_data = [
  (item, quantity, datetime.strptime(day, '%Y-%m-%d'))
  for item, quantity, day in july_rows
]

# Three nullable columns, matching the layout of the table being updated.
schema = (StructType()
  .add('item', StringType(), True)
  .add('quantity', IntegerType(), True)
  .add('date', DateType(), True))

jul_df = spark.createDataFrame(jul_data, schema)
jul_df.printSchema()
jul_df.show()

In [0]:
# Show the table contents before the selective overwrite. The format is given explicitly so
# the read does not depend on the session's default data source.
spark.read.format('delta').load(table_path).orderBy('date').show()

In [0]:
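# Alternative (not used): build a date-range predicate from the min/max date in jul_df.
# A range covers every date between the two bounds, including dates that are absent from jul_df.
#dt = jul_df.agg(min('date').alias('st_dt'), max('date').alias('end_dt')).collect()
#st_dt = dt[0][0]
#end_dt = dt[0][1]
#
#print(f'{st_dt} - {end_dt}')
# query = f'(date >= "{st_dt}") and (date <= "{end_dt}")'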

In [0]:
# Build a replaceWhere predicate that targets exactly the date partitions present in jul_df.
dt = jul_df.select('date').distinct().collect()

# Sort so the generated predicate is deterministic; distinct() gives no ordering guarantee.
partition_dates = sorted(row['date'] for row in dt)
if not partition_dates:
    # An empty IN () list is not valid SQL, so fail early with a clear message instead.
    raise ValueError('jul_df contains no dates; cannot build a replaceWhere predicate')

# Quote each date as a SQL string literal and join them, rather than reshaping a list repr.
date_literals = ', '.join(f"'{d:%Y-%m-%d}'" for d in partition_dates)
query = f'(date in ({date_literals}))'
print(query)

In [0]:
# Selective overwrite: only rows matching `query` are replaced by the contents of jul_df.
selective_writer = (jul_df.write
  .format('delta')
  .mode('overwrite')
  .option('replaceWhere', query)
  .partitionBy('date'))
selective_writer.save(table_path)

In [0]:
%sh
# date=2021-06-20 is not among the dates in jul_df; compare this listing with the one taken
# before the replaceWhere write.
ls -lah /dbfs/mnt/temp.partitions_testing/date=2021-06-20

In [0]:
%sh
# date=2021-07-05 is among the dates in jul_df; compare this listing with the one taken
# before the replaceWhere write.
ls -lah /dbfs/mnt/temp.partitions_testing/date=2021-07-05

In [0]:
# Show the table after the selective overwrite. The format is given explicitly so the read
# does not depend on the session's default data source.
spark.read.format('delta').load(table_path).orderBy('date').show()
# Expected result with the IN-list predicate built from the dates in jul_df
# (the same outcome as a Parquet write with dynamic partition overwrite):
#   - the 2021-07-25 row (item3), which is absent from jul_df, is still there
#   - item2 on 2021-07-30 now has quantity 7 (was 1)
#   - the new item5 row on 2021-07-16 is added
#
# With a min/max date-range predicate instead, all July rows would be overwritten,
# because the removed 2021-07-25 row falls inside that interval.

In [0]:
%sh
# List the top level of the table directory (partition directories are named date=<value>).
ls -lah /dbfs/mnt/temp.partitions_testing

In [0]:
%sh
# Clean up only the Delta table created by this notebook. Do not remove /dbfs/mnt itself:
# that would delete everything under the mount root, not just this demo's files.
rm -rf /dbfs/mnt/temp.partitions_testing

#### Same with Parquet

In [0]:
# Target directory for the plain-Parquet version of the same experiment.
parquet_path = '/mnt/items'

In [0]:
# Initial full write of df as Parquet, partitioned by date.
parquet_writer = df.write.mode('overwrite').partitionBy('date')
parquet_writer.parquet(parquet_path)

In [0]:
%sh
# List the top level of the Parquet output directory.
ls -lah /dbfs/mnt/items

In [0]:
# Show the Parquet data before the dynamic partition overwrite.
items_before = spark.read.parquet(parquet_path)
items_before.orderBy('date').show()

In [0]:
# Dynamic partition overwrite is requested for this write only, via the writer option below.
# Session-wide alternative (not used here):
#   spark.conf.set('spark.sql.sources.partitionOverwriteMode', 'dynamic')
dynamic_writer = (jul_df.write
  .option('partitionOverwriteMode', 'dynamic')
  .mode('overwrite')
  .partitionBy('date'))
dynamic_writer.parquet(parquet_path)

In [0]:
# Show the Parquet data after the dynamic partition overwrite.
items_after = spark.read.parquet(parquet_path)
items_after.orderBy('date').show()
# Expected result (only the date partitions present in jul_df are replaced):
#   - the 2021-07-25 row (item3), which is absent from jul_df, is still there
#   - item2 on 2021-07-30 now has quantity 7 (was 1)
#   - the new item5 row on 2021-07-16 is added

In [0]:
%sh
# Clean up only the Parquet directory created by this notebook. Do not remove /dbfs/mnt itself:
# that would delete everything under the mount root, not just this demo's files.
rm -rf /dbfs/mnt/items

#### The end of the notebook