In [0]:
library(readr)
library(fs)
library(sparklyr)
library(dplyr)
library(glue)
library(tidyr)
library(DBI)

### Step 1. Load data

Use Auto Loader to load all `.csv` files into a Delta table. This table serves as our bronze layer.

In [0]:
# Ensure the directory used by Auto Loader for both its schema location and its
# stream checkpoint exists inside the Unity Catalog Volume.
checkpoint_dir <- "/Volumes/dev/bronze/raw_data/checkpoint/autoloader"
dbutils.fs.mkdirs(checkpoint_dir)

In [0]:
 %python

 # Read files using Auto Loader with checkpoint
 # and schema location "/Volumes/dev/bronze/raw_data/checkpoint/autoloader"
 #.option("cloudFiles.schemaHints", "Quantity int, UnitPrice double")

df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("pathGlobFilter", "*.csv")
    .option("header", "true")
    .option("cloudFiles.schemaHints", "EvapA_Var1 int, EvapB_Var1 int, EvapC_Var1 int,EvapA_Var2 float, EvapB_Var2 float, EvapC_Var2 float, EvapA_State TinyInt, EvapB_State TinyInt, EvapC_State TinyInt")
    .option("cloudFiles.schemaLocation", "/Volumes/dev/bronze/raw_data/checkpoint/autoloader")
    .load("/Volumes/dev/bronze/raw_data/files/*")

)

In [0]:
%python

# Write data to delta table - dev.bronze.evap_raw_al

from pyspark.sql.functions import col
(
    df
    .withColumn("__file",col("_metadata.file_name"))
    .writeStream
    .option("checkpointLocation", "/Volumes/dev/bronze/raw_data/checkpoint/autoloader")
    .outputMode("append")
    .trigger(availableNow=True)
    .toTable("dev.bronze.evap_raw_al")
)

In [0]:
%sql
-- Preview all rows ingested into the bronze table.
SELECT *
FROM dev.bronze.evap_raw_al

In [0]:
%sql
-- Number of bronze rows per source file.
SELECT `__file`, count(1)
FROM dev.bronze.evap_raw_al
GROUP BY `__file`

### Step 2. Wide Format Data

Load the bronze data for the selected input date and reshape it into wide format.

In [0]:
# Create View to Wide Format Data

# 1. Text widget "Date" holding the analysis date as YYYY-MM-DD. If no date is
#    entered, the default is yesterday (Sys.Date() - 1 on the cluster).
default_date <- format(Sys.Date() - 1, "%Y-%m-%d")
dbutils.widgets.text("Date", default_date, "Date of Analysis")

In [0]:
# Read the analysis date from the "Date" widget (a YYYY-MM-DD string by default)
input_date <- dbutils.widgets.get("Date")
print(input_date)

In [0]:
# Connect sparklyr to the cluster's existing Spark session.
sc <- spark_connect(method = "databricks")

# input_date comes from a free-text widget and is pasted into the SQL below, so
# reject anything that is not a real YYYY-MM-DD date before building the query.
# This guards against malformed or injected SQL.
if (!grepl("^[0-9]{4}-[0-9]{2}-[0-9]{2}$", input_date) ||
    is.na(as.Date(input_date, format = "%Y-%m-%d"))) {
  stop("Widget 'Date' must be a valid date in YYYY-MM-DD format, got: ", input_date)
}

# Collect the bronze rows that came from files whose name starts with the date.
# NOTE(review): there is no ORDER BY, so Spark does not guarantee row order. The
# Task 3 state-change logic relies on chronological order -- confirm, or sort by Date_time.
query <- paste0("SELECT * FROM dev.bronze.evap_raw_al WHERE `__file` LIKE '", input_date, "%'")
df_raw <- dbGetQuery(sc, query)

In [0]:
df_raw %>% head()

#### Step 2.1 Task_3 (performed as part of this step)
Bring in the data from the previous day and convert it together with the current day's data.

The idea is to look at the previous day's data and find the last point at which the state of Evap A, B or C changed. The rows after the most recent of these three changes (i.e. the shortest trailing segment) are carried over: they are prepended to the current day's data, converted together with it, and saved in the Silver temporary table so that Task_2 can take them into account.

In [0]:
# The day before the analysis date; its trailing rows are carried over in Task 3
before_input_date <- as.Date(input_date) - 1
print(input_date)
print(before_input_date)

In [0]:
# Collect the previous day's bronze rows (files whose name starts with that date).
# NOTE(review): no ORDER BY here either; Task 3 assumes chronological row order.
query <- paste0(
  "SELECT * FROM dev.bronze.evap_raw_al WHERE `__file` LIKE '",
  format(before_input_date, "%Y-%m-%d"),
  "%'"
)
df_raw_before <- dbGetQuery(sc, query)

In [0]:
df_raw_before %>% head()

In [0]:
# Task 3
# Work out how many trailing rows of the previous day's data (df_raw_before) are
# carried over: every row after the most recent state change of EvapA, EvapB or EvapC.
# NOTE(review): diff() assumes df_raw_before is in chronological order, but the query
# that produced it has no ORDER BY -- confirm the row order is reliable.

# 1. Row-to-row differences of each state column (position i compares rows i and i+1)
state_diff_A <- diff(df_raw_before$EvapA_State)
state_diff_B <- diff(df_raw_before$EvapB_State)
state_diff_C <- diff(df_raw_before$EvapC_State)

# 2. Position of the last state change in each series.
#    max(which(...)) would give -Inf (plus a warning) when a series never changes or
#    the previous day has no rows, and remaining_rows would become Inf. Use 0 for
#    "no change seen" so everything below stays finite.
last_state_change <- function(state_diff) {
  change_positions <- which(state_diff != 0)
  if (length(change_positions) == 0L) 0L else max(change_positions)
}
last_change_index_A <- last_state_change(state_diff_A)
last_change_index_B <- last_state_change(state_diff_B)
last_change_index_C <- last_state_change(state_diff_C)

# 3. The most recent change across the three evaporators
last_change_index <- max(last_change_index_A, last_change_index_B, last_change_index_C)

# 4. Number of rows from just after that change to the end of the data.
#    If no state changed at all, this is nrow(df_raw_before), i.e. the whole previous day;
#    if the previous day is empty, it is 0.
#    NOTE(review): confirm that carrying over the whole day in the no-change case is intended.
remaining_rows <- nrow(df_raw_before) - last_change_index

# output result
remaining_rows

In [0]:
# Test: delete the last row for testing
# df_raw_before <- df_raw_before[-nrow(df_raw_before), ]

In [0]:
# Keep only the last `remaining_rows` rows of the previous day's data
df_remaining_rows <- df_raw_before %>% tail(remaining_rows)
print(df_remaining_rows)

In [0]:
# Prepend the carried-over rows of the previous day to the current day's rows.
# NOTE(review): this overwrites df_raw, so re-running only this cell prepends the
# rows a second time; re-run from the query cell that builds df_raw instead.
df_raw <- df_remaining_rows %>% bind_rows(df_raw)

In [0]:
df_raw %>% display()

In [0]:
# Continue the wide-format transformation. For each evaporator (EvapA/B/C) and each
# measurement (Var1/Var2), split the value into two columns:
#   <Evap>_<Var>_State1 -- the value where that evaporator's state is 1, else NA
#   <Evap>_<Var>_State0 -- the value where that evaporator's state is 0, else NA
# Output columns: Date_time, then A, B, C, each as Var1_State1, Var1_State0,
# Var2_State1, Var2_State0. All other raw columns are dropped.
df_wide <- local({
  wide <- df_raw %>% select(Date_time)
  for (evap in c("EvapA", "EvapB", "EvapC")) {
    state <- df_raw[[paste0(evap, "_State")]]
    for (measure in c("Var1", "Var2")) {
      prefix <- paste0(evap, "_", measure)
      value <- df_raw[[prefix]]
      wide[[paste0(prefix, "_State1")]] <- ifelse(state == 1, value, NA)
      wide[[paste0(prefix, "_State0")]] <- ifelse(state == 0, value, NA)
    }
  }
  wide
})

# Preview the wide-format result
head(df_wide)

### Step 3. Store in a Spark Temporary Table

The data prepared for Task 2 could also be kept in a persistent Spark table. Because the calculation is done on a daily basis, it is instead written to a temporary staging table (`dev.silver.evap_daily_temp`) that is overwritten on every run. Rewriting the table therefore discards the data from the previous run.

In [0]:
%sql

-- Make sure the silver schema exists before the daily staging table is written.
create schema if not exists dev.silver;

In [0]:
# Upload the local wide-format data frame to Spark as "df_wide_temp", replacing any previous copy
df_wide_spark <- copy_to(dest = sc, df = df_wide, name = "df_wide_temp", overwrite = TRUE)

In [0]:
df_wide_spark %>% head()

In [0]:
# Persist the wide-format data as the silver staging table, replacing the previous run's contents
spark_write_table(x = df_wide_spark, name = "dev.silver.evap_daily_temp", mode = "overwrite")

In [0]:
%sql

-- Inspect the silver staging table that was just written.
SELECT *
FROM dev.silver.evap_daily_temp

In [0]:
# Count the rows of the wide-format Spark table and return the count as this
# notebook's exit value.
row_count <- df_wide_spark %>% sdf_nrow()

# as.character() on a double renders round counts in scientific notation
# (e.g. 100000 -> "1e+05"), which a caller parsing the exit value would misread.
# Format the count as plain digits instead.
dbutils.notebook.exit(format(row_count, scientific = FALSE, trim = TRUE))