In [0]:
%sql
-- Reset the watermark table so the setup cells below start clean. IF EXISTS lets
-- this succeed on a fresh workspace where the table has not been created yet.
DROP TABLE IF EXISTS incremental_load_mappings

#### Incremental load: CSV to SQL DB – single table using a watermark

#### Creating the watermark table

In [0]:

%sql

-- Watermark table: one row per target table, holding the column used for
-- incremental filtering and the last value already loaded into that table.
-- NOTE: the load cells below hard-code SalesDate rather than reading WaterMarkColumn.
CREATE TABLE Incremental_Load_Mappings (
  TableName       STRING,
  WaterMarkColumn STRING,
  WaterMarkValue  DATE
)
USING DELTA


#### Seeding the watermark table with the target table and a default start date

In [0]:
%sql
-- Seed the watermark for the sales target table with a default start date.
-- The load filters on SalesDate > WaterMarkValue (strictly greater), so rows
-- dated exactly 2015-01-01 would not be picked up on the first run.
INSERT INTO Incremental_Load_Mappings
  (TableName, WaterMarkColumn, WaterMarkValue)
VALUES
  ('dev.club_db.sales_delta', 'SalesDate', '2015-01-01');

SELECT *
FROM Incremental_Load_Mappings;

#### Creating the target Delta table

In [0]:
%sql
-- Target Delta table that receives the incrementally loaded sales rows.
-- Without IF NOT EXISTS this fails if the table already exists (e.g. a second Run All).
CREATE TABLE dev.club_db.sales_delta (
  SalesDate   DATE,
  Country     STRING,
  Product     STRING,
  SalesAmount INT
) USING DELTA;

#### Fetching the current watermark date from the watermark table and assigning it to a variable

In [0]:
# Look up the stored watermark row for the target table. `maxDate` stays a list of
# Rows (later cells read maxDate[0]["WaterMarkValue"]).
# Filter before projecting so the predicate does not depend on Spark resolving
# TableName after select() has dropped it.
# NOTE: Incremental_Load_Mappings is unqualified, so it resolves against the session's
# current catalog/schema; the USE CATALOG cells further down change that if this
# cell is re-run after them.
maxDate = (
    spark.table("Incremental_Load_Mappings")
    .where("TableName = 'dev.club_db.sales_delta'")
    .select("WaterMarkValue")
    .collect()
)

# Fail fast with a clear message instead of an IndexError (no row) or an arbitrary
# pick (duplicate rows, e.g. the seed INSERT run twice) in later cells.
if len(maxDate) != 1:
    raise ValueError(
        "Expected exactly one watermark row for 'dev.club_db.sales_delta', "
        f"found {len(maxDate)}"
    )

maxDate[0]["WaterMarkValue"]


#### Downloading the CSV file from GitHub and reading it into a DataFrame

In [0]:
import requests

# Source CSV (raw file on GitHub) and the Unity Catalog volume path it is staged to.
url = "https://raw.githubusercontent.com/esathya888-wq/delta_incremental_load_assets/refs/heads/main/Sales2.csv"
csv_path = "/Volumes/dev/club_db/data/gitSales.csv"

# Bound the request: without a timeout, a stalled connection hangs the cell forever.
response = requests.get(url, timeout=60)
response.raise_for_status()

# Stage the downloaded text in the volume so Spark can read it as a file.
dbutils.fs.put(csv_path, response.text, overwrite=True)

# Column types are inferred; the load below assumes a SalesDate column comparable
# to the DATE watermark.
df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(csv_path)
)

display(df)


In [0]:
# Alternative source: read a Sales2.csv file already uploaded to the /Volumes path
# below instead of downloading it from GitHub. Uncommenting this reassigns `df`.
 #df = (spark.read.format("csv")
 #     .option("header", "true")
 #     .option("inferSchema", "true")
 #     .load("/Volumes/dev/club_db/data/Sales2.csv")
 #   )

#display(df)


In [0]:
# Keep only source rows strictly newer than the stored watermark, so rows dated
# exactly on the watermark are not reloaded.
delta_load = df.where(df["SalesDate"] > maxDate[0]["WaterMarkValue"])
display(delta_load)


##### Appending the new rows to the target Delta table

In [0]:
# Append the filtered rows to the target table; append mode leaves existing rows as they are.
(
    delta_load.write
    .format("delta")
    .mode("append")
    .saveAsTable("dev.club_db.sales_delta")
)


In [0]:
%sql
-- Spot-check the target table after the append.
SELECT * FROM dev.club_db.sales_delta LIMIT 10


##### Computing the new watermark from the newly loaded rows and updating the watermark table

In [0]:
import pyspark.sql.functions as F


In [0]:
# New watermark = latest SalesDate among the rows just loaded, cast to DATE to
# match the WaterMarkValue column.
New_WaterMark_Value = (
    delta_load
    .agg(F.max(F.col("SalesDate").cast("date")))
    .collect()[0][0]
)

# If no new rows were loaded, max() returns None. Formatting None into the UPDATE
# yields CAST('None' AS DATE): NULL or a cast error, depending on ANSI mode. A NULL
# watermark would make every later `SalesDate > NULL` filter drop all rows, so keep
# the current watermark instead.
if New_WaterMark_Value is None:
    New_WaterMark_Value = maxDate[0]["WaterMarkValue"]

print(New_WaterMark_Value)


In [0]:
# Refuse to build an UPDATE that would persist 'None' as the watermark. That would
# become CAST('None' AS DATE), i.e. NULL or a cast error depending on ANSI mode.
if New_WaterMark_Value is None:
    raise ValueError(
        "New_WaterMark_Value is None; refusing to overwrite the watermark for "
        "'dev.club_db.sales_delta'"
    )

# SQL that advances the stored watermark; it is executed in the next cell.
query = f"""
UPDATE incremental_load_mappings
SET WaterMarkValue = CAST('{New_WaterMark_Value}' AS DATE)
WHERE TableName = 'dev.club_db.sales_delta'
"""


In [0]:

# Run the watermark UPDATE assembled in the previous cell and show its result.
update_result = spark.sql(query)
update_result

In [0]:
%sql
-- Confirm the watermark row now holds the new value.
SELECT * FROM incremental_load_mappings

#### Unity Catalog practice

In [0]:
%sql 
-- Give the Hr_group principal read access to the payments table.
-- NOTE(review): to actually query it, the group presumably also needs USE CATALOG
-- and USE SCHEMA on the parent catalog/schema — confirm.
GRANT SELECT
  ON TABLE mysql_catalog.olistproject_detailnomy.olist_order_payments
  TO `Hr_group`;

In [0]:
%sql
-- Allow this user to enter the catalog. In Unity Catalog the privilege is named
-- USE CATALOG; a bare USE is not a valid privilege name.
GRANT USE CATALOG ON CATALOG mysql_catalog TO `esathya8888@gmail.com`;


In [0]:
%sql
-- Switch the session's current catalog. Unqualified table names resolve against
-- mysql_catalog from here on, including earlier cells if they are re-run afterwards.
USE CATALOG mysql_catalog

In [0]:
%sql
-- Show which principal this session runs as (useful when testing grants and masks).
SELECT current_user()

In [0]:
%sql
USE CATALOG main;
USE SCHEMA default;
-- Masking UDF: returns the real order_id only to the named user and a fixed
-- placeholder to everyone else. is_member() tests *group* membership, so passing a
-- user email there only matches if a group with that exact name exists; compare
-- against current_user() to target the user. CREATE OR REPLACE keeps the cell re-runnable.
CREATE OR REPLACE FUNCTION order_id_mask(order_id STRING)
  RETURNS STRING
  RETURN CASE
    WHEN current_user() = 'esathya8888@gmail.com' THEN order_id
    ELSE '***##***'
  END;

In [0]:
%sql
-- Spot-check the payments table as the current user.
SELECT *
FROM mysql_catalog.olistproject_detailnomy.olist_order_payments
LIMIT 10