# Creating Schema Layers for Data Warehousing

First, let's create a schema for each stage. Don't be confused by the names: the structure mirrors the Medallion architecture, with Landing, Conformed, and Consumption corresponding roughly to bronze, silver, and gold.

The stage layers we create are:
* Landing - where raw data lands, already partitioned and optimized so downstream layers can query it efficiently.
* Conformed - where cleaned, de-duplicated data is stored. It holds only the subset of data that is queried frequently.
* Consumption - where views, and materialized reporting tables if needed, are created and exposed to dashboarding tools.

```sql
%sql

-- Create schema for the Landing layer
CREATE SCHEMA IF NOT EXISTS LANDING;

-- Create schema for the Conformed layer
CREATE SCHEMA IF NOT EXISTS CONFORMED;

-- Create schema for the Consumption layer
CREATE SCHEMA IF NOT EXISTS CONSUMPTION;
```


# Reading, Transforming and Storing data

In this section, data is read from an external location into Databricks managed storage, where it is transformed and promoted to the other layers.

## Read data from external location

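Here we read the data from the external location we created previously. In this run, the number of read partitions (12) equals the number of CPU cores available. This is not a general rule: Spark sizes file splits from the total input size and settings such as `spark.sql.files.maxPartitionBytes` and the default parallelism. We also print `spark.sql.files.maxPartitionBytes`, which is the maximum number of bytes Spark packs into one read partition, not the measured size of each partition.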

```python
%python

file_path = 'gs://partition-store/output.csv'

# Read the CSV. Spark decides the number of read partitions from the file
# size, spark.sql.files.maxPartitionBytes and the default parallelism.
df = (spark.read
      .format("csv")
      .option("header", "true")
      .option("inferSchema", "true")  # costs an extra pass over the data
      .option("delimiter", ",")
      .load(file_path)
      )

print('[INFO] Number of partitions : ', df.rdd.getNumPartitions())
# In this environment the setting comes back as a byte string ending in "b";
# the value is an upper bound per partition, not a measured size.
print('[INFO] Size Per Partition (MB) : ', int(spark.conf.get("spark.sql.files.maxPartitionBytes").replace('b',''))/(1024*1024) )

df.printSchema()
```

```
[INFO] Number of partitions :  12
[INFO] Size Per Partition (MB) :  128.0
root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Day: integer (nullable = true)
 |-- UserID: string (nullable = true)
 |-- Clicks: integer (nullable = true)
```
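To show why 12 partitions is tied to this file's size rather than guaranteed, here is a plain-Python model of the split-size rule used by Spark 3.x file sources (`FilePartition.maxSplitBytes`). It is a sketch under stated assumptions: one uncompressed, splittable CSV file; the default `spark.sql.files.maxPartitionBytes` (128 MB) and `spark.sql.files.openCostInBytes` (4 MB); and a default parallelism of 12 standing in for the cluster's cores. The helper names are hypothetical, and Databricks runtimes may tune this further, so treat the results as estimates.

```python
import math

MB = 1024 * 1024


def max_split_bytes(file_sizes, default_parallelism,
                    max_partition_bytes=128 * MB, open_cost_bytes=4 * MB):
    """Largest number of bytes packed into one read partition."""
    # Each file is charged its length plus a fixed "open cost".
    total_bytes = sum(size + open_cost_bytes for size in file_sizes)
    # Spread the input over the default parallelism (minimum partition count).
    bytes_per_core = total_bytes // default_parallelism
    return min(max_partition_bytes, max(open_cost_bytes, bytes_per_core))


def estimate_num_partitions(file_size, default_parallelism, **conf):
    """Approximate read partitions for ONE splittable file."""
    split = max_split_bytes([file_size], default_parallelism, **conf)
    # A single file is cut into split-sized chunks, one chunk per partition.
    return math.ceil(file_size / split)


for size_mb in (10, 500, 2048):
    n = estimate_num_partitions(size_mb * MB, default_parallelism=12)
    print(f"{size_mb:>5} MB file -> {n} partitions")
# Prints 3, 12 and 16 partitions respectively
```

Under this model, mid-sized files land on exactly one partition per core, while a small file yields fewer partitions than cores and a file larger than about 1.5 GB yields more.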



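## Transform Columns and Order by Columns to Optimize the Write Operation

Column names are lower-cased, the `year`, `month` and `day` columns are combined into a single `date` column, and the rows are sorted. Sorting by `date` before the partitioned write means each write task holds a narrow, contiguous range of dates, so it writes fewer, larger files into each `date` partition.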

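```python
%python
from pyspark.sql.functions import col, concat, to_date, lit

# Map every column name to its lower-case form, e.g. {"UserID": "userid"}
new_cols = {c: c.lower() for c in df.columns}

transformed_df = (
    df
    .withColumnsRenamed(new_cols)  # available in PySpark 3.4+
    # Build "yyyy-M-d" strings and cast them to DATE, e.g. 2024-1-5 -> 2024-01-05
    .withColumn("date", to_date(concat(col("year"), lit('-'), col("month"), lit('-'), col("day"))))
    .drop("year", "month", "day")
    # Global sort: rows for the same date land in the same or adjacent partitions
    .orderBy("date", "userid", "clicks")
    )

transformed_df.printSchema()
```

```
root
 |-- userid: string (nullable = true)
 |-- clicks: integer (nullable = true)
 |-- date: date (nullable = true)
```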
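A toy model of why the sort helps, under deliberately simplified assumptions: each row is represented only by its date; unsorted data is dealt to 12 write tasks round-robin; sorted data is cut into equal contiguous chunks (Spark's range partitioning samples its boundaries, so real chunks are only approximately equal); and every task writes exactly one file per distinct date it holds. The data and all function names are hypothetical.

```python
import random
from datetime import date, timedelta


def files_written(rows_by_task):
    """Toy model: each task writes one file per distinct date it holds."""
    return sum(len(set(task_rows)) for task_rows in rows_by_task)


def round_robin(rows, n_tasks):
    """Rows dealt out to tasks without regard to date (no orderBy)."""
    return [rows[i::n_tasks] for i in range(n_tasks)]


def range_partition(rows, n_tasks):
    """Rows sorted by date, then cut into contiguous chunks (like orderBy)."""
    ordered = sorted(rows)
    chunk = -(-len(ordered) // n_tasks)  # ceiling division
    return [ordered[i:i + chunk] for i in range(0, len(ordered), chunk)]


# Hypothetical data: 30 days x 100 rows per day (each row is just its date),
# shuffled, then written by 12 tasks
start = date(2024, 1, 1)
rows = [start + timedelta(days=d) for d in range(30) for _ in range(100)]
random.Random(42).shuffle(rows)

unsorted_files = files_written(round_robin(rows, 12))
sorted_files = files_written(range_partition(rows, 12))
print("unsorted:", unsorted_files, "files")
print("sorted:", sorted_files, "files")  # sorted: 36 files
```

In the sorted case every date gets one file, plus one extra wherever a task boundary falls inside a day. The round-robin case approaches one file per task per date.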



## Writing the transformed data to Databricks

The data is written to managed storage as a Delta table. It is partitioned by `date` because the ETL job will append one day of data at a time to this table.

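```python
%python

# saveAsTable() returns None, so the result is not assigned to a variable.
# The default save mode raises an error if landing.raw_clicks already exists.
(transformed_df
 .select("date", "userid", "clicks")
 .write
 .format("delta")
 .partitionBy("date")  # one partition per day; each daily load adds a new one
 .saveAsTable("landing.raw_clicks")
 )
```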
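Partitioning by `date` typically lays data files out in Hive-style directories named after the partition value (Delta itself tracks partition values in its transaction log). A minimal sketch of that naming, assuming a hypothetical table location because Databricks chooses the actual location for managed tables. It shows why a daily append fits this layout: a batch containing a single day writes into a single directory and leaves the others untouched. The helper names are hypothetical.

```python
from datetime import date


def partition_dir(table_root, column, value):
    """Hive-style directory name for one partition value, e.g. date=2024-12-31."""
    return f"{table_root}/{column}={value.isoformat()}"


def dirs_touched(table_root, batch_dates):
    """Distinct partition directories a batch of rows is written into."""
    return sorted({partition_dir(table_root, "date", d) for d in batch_dates})


# Hypothetical location; managed tables get a location chosen by Databricks
root = "/hypothetical/landing/raw_clicks"

# A daily load only contains rows for one day, so it adds files to one directory
print(dirs_touched(root, [date(2025, 1, 1)] * 3))
# ['/hypothetical/landing/raw_clicks/date=2025-01-01']

# A backfill touches one directory per distinct day
print(len(dirs_touched(root, [date(2024, 12, 30), date(2024, 12, 31), date(2024, 12, 31)])))  # 2
```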



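## Read and check the partitioning of the landing table

Here we read the table back and check how many partitions Spark uses to read it. Note that `getNumPartitions()` reports in-memory read partitions, not the number of `date` partitions stored in the Delta table: Spark packs the many small per-date files into a smaller number of read partitions.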

```python
landing_df = spark.read.table('landing.raw_clicks')
print('[INFO] Number of Partitions : ', landing_df.rdd.getNumPartitions())
```

```
[INFO] Number of Partitions :  22
```


```python
from pyspark.sql import functions as F

# Aliasing the module as F avoids shadowing Python's built-in min() and max()
landing_df \
    .select(F.min("date").alias("min_date"), F.max("date").alias("max_date")) \
    .show()
```

```
+----------+----------+
|  min_date|  max_date|
+----------+----------+
|2023-01-01|2024-12-31|
+----------+----------+
```
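To contrast the 22 read partitions with the stored layout, the date range above bounds how many `date` partitions can exist. This sketch assumes at most one partition per calendar day. The real count is the number of distinct dates, which can be checked on the cluster with `landing_df.select("date").distinct().count()`. The helper name is hypothetical.

```python
from datetime import date


def max_date_partitions(min_date, max_date):
    """Upper bound on `date` partitions: one per calendar day, inclusive."""
    return (max_date - min_date).days + 1


# Range reported by the min/max query above
upper_bound = max_date_partitions(date(2023, 1, 1), date(2024, 12, 31))
print(upper_bound)  # 731 (365 days in 2023 + 366 in leap year 2024)
```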



## Transform Landing data for the Conformed layer

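```python
from pyspark.sql.functions import year, month

clean_landing_df = (landing_df
 .where(year("date") == 2024)           # keep the cut-off year only
 .dropDuplicates()                      # drops rows identical in every column
 .withColumn("month", month("date"))    # partition column for the Conformed table
 .select("date", "userid", "clicks", "month")
 .orderBy("date", "userid", "clicks")   # groups rows by month before the partitioned write
)

clean_landing_df.printSchema()
```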

Only data for 2024, the cut-off year, is kept, and exact duplicate rows are dropped. An additional `month` column is added and used as the partition column when the table is stored in Delta format.
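The same steps can be modelled in plain Python to make their semantics concrete. This is a sketch, assuming rows are `(date, userid, clicks)` tuples standing in for `landing_df`. The set comprehension plays the role of `dropDuplicates()` (whole-row equality), and sorting the output tuples matches `orderBy("date", "userid", "clicks")` because `month` is derived from `date`. The function name and sample rows are hypothetical.

```python
from datetime import date


def conform(rows, cutoff_year=2024):
    """Plain-Python model of the Conformed transform.

    rows: iterable of (date, userid, clicks) tuples, a hypothetical stand-in
    for landing_df. Keeps the cut-off year, drops exact duplicate rows,
    adds a month column and sorts by date, userid, clicks.
    """
    kept = {row for row in rows if row[0].year == cutoff_year}  # set drops duplicates
    return sorted((d, user, clicks, d.month) for d, user, clicks in kept)


sample = [
    (date(2023, 12, 31), "user1", 5),  # outside the cut-off year
    (date(2024, 1, 2), "user2", 3),
    (date(2024, 1, 2), "user2", 3),    # exact duplicate
    (date(2024, 2, 1), "user1", 7),
]
for row in conform(sample):
    print(row)
```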

```python
# Write the Conformed table as Delta, partitioned by month
clean_landing_df \
    .write \
    .format("delta") \
    .partitionBy("month") \
    .saveAsTable("conformed.clicks_2024")
```

# Creating Views for Reporting

```sql
%sql

CREATE OR REPLACE VIEW consumption.clicks_user1
AS
SELECT *
FROM conformed.clicks_2024
WHERE userid = 'user1';

CREATE OR REPLACE VIEW consumption.clicks_user2
AS
SELECT *
FROM conformed.clicks_2024
WHERE userid = 'user2';

-- No trailing space inside the literal: 'user100 ' would match no rows
CREATE OR REPLACE VIEW consumption.clicks_user100
AS
SELECT *
FROM conformed.clicks_2024
WHERE userid = 'user100';
```
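These view definitions differ only in the user ID, and copying the statement by hand is error-prone: a stray space inside the quoted ID (for example `'user100 '`) silently produces an empty view. A minimal sketch, assuming you prefer to generate the statements in Python, uses a hypothetical helper `user_view_sql` that strips whitespace and rejects IDs that are not simple alphanumeric names. The check is a basic allow-list, not general SQL escaping. Each statement can then be run from a Python cell with `spark.sql`.

```python
def user_view_sql(user_id, source="conformed.clicks_2024", schema="consumption"):
    """Build the CREATE VIEW statement for one user (hypothetical helper)."""
    user_id = user_id.strip()  # guards against typos such as 'user100 '
    # Simple allow-list check; this is not general-purpose SQL escaping
    if not user_id.replace("_", "").isalnum():
        raise ValueError(f"unexpected user id: {user_id!r}")
    return (
        f"CREATE OR REPLACE VIEW {schema}.clicks_{user_id}\n"
        f"AS\n"
        f"SELECT * FROM {source}\n"
        f"WHERE userid = '{user_id}'"
    )


for uid in ["user1", "user2", "user100 "]:
    statement = user_view_sql(uid)
    print(statement, end="\n\n")
    # In the notebook each statement could be run with: spark.sql(statement)
```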