In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

### **Data Reading**

In [0]:
df = spark.read.format("parquet")\
  .load("abfss://bronze@databricksete80.dfs.core.windows.net/products")

In [0]:
df.display()

In [0]:
df = df.drop("_rescued_data")

In [0]:
df.createOrReplaceTempView("products")

### **Functions**

In [0]:
%sql
create or replace function databricks_cata.bronze.discount_func(p_price double)
returns double
language sql
return p_price * 0.90

In [0]:
%sql
select product_id, price, databricks_cata.bronze.discount_func(price) as discounted_price
from products

In [0]:
df = df.withColumn("discounted_price", expr("databricks_cata.bronze.discount_func(price)"))
df.display()

In [0]:
%sql
create or replace function databricks_cata.bronze.upper_func(p_brand string)
returns string
language python
as
$$

return p_brand.upper()


$$

In [0]:
%sql
select product_id, brand, databricks_cata.bronze.upper_func(brand) as brand_upper
from products

In [0]:
df.write.format("delta").mode("append").option("path","abfss://silver@databricksete80.dfs.core.windows.net/products").save()

In [0]:
%sql
create table if not exists databricks_cata.silver.products_silver
using delta
location 'abfss://silver@databricksete80.dfs.core.windows.net/products'