In [0]:
%sql
-- Point the session at the project catalog/schema so unqualified table names resolve there.
-- NOTE: on a fresh workspace, run the "Creating Catalog and Schemas" cells below first.
USE CATALOG db_academy;
USE SCHEMA production;

SELECT current_catalog(), current_schema()

### Environment Initialization


##### Creating Catalog and Schemas

In [0]:
%sql
-- Create the project catalog if it does not exist yet, with an explicit managed storage location.
-- The name is written in lowercase (db_academy) to match every other reference in this notebook;
-- Unity Catalog stores identifiers in lowercase anyway.
CREATE CATALOG IF NOT EXISTS db_academy
MANAGED LOCATION 'abfss://unity-catalog-storage@dbstoragertkgqhhy66nu6.dfs.core.windows.net/1290381263006333'

In [0]:
%sql
-- Schema that holds the bronze tables built in this notebook.
CREATE SCHEMA IF NOT EXISTS db_academy.production

In [0]:
%sql
-- List the schemas of the project catalog.
SHOW SCHEMAS IN db_academy

In [0]:
%sql
-- Show the extended metadata of the production schema.
DESCRIBE SCHEMA EXTENDED db_academy.production

##### Data Source</br>
The source volumes come from this Databricks Marketplace listing:
[Link to Data Source](https://adb-1290381263006333.13.azuredatabricks.net/marketplace/consumer/listings/ca87a8be-9a17-4469-acee-d992d6cd16ca?o=1290381263006333) </br>
They are saved as the catalog `db_academy_ecomerce` under the "Delta Shares Received" group.

In [0]:
%sql
-- Inspect the raw volume received through the Delta Share.
DESCRIBE VOLUME db_academy_ecomerce.v01.raw

In [0]:
# List the files in the users-historical folder of the shared raw volume.
display(spark.sql("LIST '/Volumes/db_academy_ecomerce/v01/raw/users-historical/'"))

### Lakeflow Connect

##### Batch Ingestion

In [0]:
%sql
-- Batch-load the historical users parquet files into a bronze Delta table.
-- CREATE OR REPLACE rebuilds the table atomically on every run (keeping its Delta history)
-- instead of the non-atomic DROP + CREATE pair; the name is fully qualified so the cell
-- does not depend on the session's current catalog/schema.
CREATE OR REPLACE TABLE db_academy.production.historical_users_bronze_ctas AS
SELECT *
FROM read_files(
  '/Volumes/db_academy_ecomerce/v01/raw/users-historical/',
  format => 'parquet'
)


In [0]:
%sql
-- Preview a few rows of the CTAS bronze table.
SELECT *
FROM db_academy.production.historical_users_bronze_ctas
LIMIT 5

In [0]:
%sql
-- Show the schema and detailed metadata of the CTAS bronze table
-- (fully qualified, consistent with the preview cell above).
DESCRIBE TABLE EXTENDED db_academy.production.historical_users_bronze_ctas

##### Incremental Ingestion with COPY INTO </br>
- This is the legacy option; the recommended way to ingest files incrementally is Auto Loader.

In [0]:
%sql
-- Recreate an empty, schemaless placeholder table; COPY INTO below infers and sets its schema.
-- Dropping it first lets the COPY INTO demo be rerun from a clean state.
DROP TABLE IF EXISTS db_academy.production.historical_users_bronze_copy_into;
CREATE TABLE db_academy.production.historical_users_bronze_copy_into;

SELECT * FROM db_academy.production.historical_users_bronze_copy_into

In [0]:
%sql
-- Incremental, idempotent file ingestion: COPY INTO skips files it has already loaded, so
-- rerunning this cell ends in the same final state without duplicate rows.
-- FORMAT_OPTIONS mergeSchema merges the schemas of the source files during inference;
-- COPY_OPTIONS mergeSchema lets that inferred schema evolve the (initially schemaless) target.
COPY INTO db_academy.production.historical_users_bronze_copy_into
FROM '/Volumes/db_academy_ecomerce/v01/raw/users-historical/'
FILEFORMAT = PARQUET
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')

In [0]:
%sql
-- Preview the table populated by COPY INTO.
SELECT * FROM db_academy.production.historical_users_bronze_copy_into LIMIT 10

##### Incremental Ingestion with Auto Loader (Basic) </br>
- This is the newer, recommended way of ingesting files incrementally.

In [0]:
%sql
-- Landing area for raw files. Names are lowercase to match the
-- /Volumes/db_academy/data_landing/data/... paths used in the cells below.
CREATE SCHEMA IF NOT EXISTS db_academy.data_landing;
CREATE VOLUME IF NOT EXISTS db_academy.data_landing.data;

In [0]:
# Directory used by the Auto Loader cell below for its schema and checkpoint state.
dbutils.fs.mkdirs("/Volumes/db_academy/data_landing/data/autoloader")

In [0]:
# Land the first sales file from the Delta Share volume in the folder that
# Auto Loader watches (.../data_landing/data/sales/).
dbutils.fs.cp(
    "/Volumes/db_academy_ecomerce/v01/raw/sales-csv/000.csv",
    "/Volumes/db_academy/data_landing/data/sales/000.csv",
)

In [0]:
# Incrementally ingest the pipe-delimited sales CSVs from the landing folder with Auto Loader.
# - header: the files start with a header row (the read_files cells below rely on it too);
#   without it the columns are named _c0, _c1, ... and the header line is loaded as data.
# - cloudFiles.inferColumnTypes: Auto Loader's switch for typed CSV inference
#   (otherwise CSV columns are inferred as STRING); it replaces the plain `inferSchema` option.
# - schemaLocation and checkpointLocation share the autoloader/ directory.
# availableNow processes the files present at start and then stops; awaitTermination()
# blocks this cell until that run finishes so the following cells see the written rows.
(spark.readStream.format("cloudFiles")
     .option("cloudFiles.format", "csv")
     .option("cloudFiles.schemaLocation", "/Volumes/db_academy/data_landing/data/autoloader/")
     .option("cloudFiles.inferColumnTypes", "true")
     .option("sep", "|")
     .option("header", "true")
     .load("/Volumes/db_academy/data_landing/data/sales/")
     .writeStream
     .option("checkpointLocation", "/Volumes/db_academy/data_landing/data/autoloader/")
     .trigger(availableNow=True)
     .toTable("db_academy.production.sales_autoloader_python")
     .awaitTermination()
)

In [0]:
%sql
-- Inspect the rows ingested by Auto Loader so far.
SELECT * FROM db_academy.production.sales_autoloader_python

In [0]:
# Land the second sales file. It is only ingested when the Auto Loader cell above is
# run again (its availableNow trigger processes pending files and then stops).
dbutils.fs.cp(
    "/Volumes/db_academy_ecomerce/v01/raw/sales-csv/001.csv",
    "/Volumes/db_academy/data_landing/data/sales/001.csv",
)

In [0]:
%sql
-- One history entry per write, i.e. per Auto Loader run that processed new files.
DESCRIBE HISTORY db_academy.production.sales_autoloader_python

###### Documentation

**Links to documentation:**
</br>
[Streaming tables](https://docs.databricks.com/gcp/en/ldp/streaming-tables)</br>
[CREATE STREAMING TABLE](https://docs.databricks.com/aws/en/sql/language-manual/sql-ref-syntax-ddl-create-streaming-table#syntax)</br>
[Use streaming tables in Databricks SQL](https://docs.databricks.com/aws/en/ldp/dbsql/streaming)

###### Cleaning Up Resources

In [0]:
%sql
-- Drop the Delta table written by the Auto Loader demo.
DROP TABLE IF EXISTS db_academy.production.sales_autoloader_python

In [0]:
# Delete everything Auto Loader wrote to autoloader/ (inferred schema and checkpoint)
# so the stream can start from scratch next time.
dbutils.fs.rm("/Volumes/db_academy/data_landing/data/autoloader/", recurse=True)

##### Reading Data with Metadata </br>
- Options for adding file metadata (and ingestion time) to the data being read.

In [0]:
import pyspark.sql.functions as sf

In [0]:
# Read the historical users and enrich them with a derived first-touch date,
# the ingestion time, and file-level metadata from the hidden _metadata column.
# user_first_touch_timestamp is divided by 1_000_000 before from_unixtime (epoch microseconds).
df = spark.read.parquet("/Volumes/db_academy_ecomerce/v01/raw/users-historical/")

(
    df.select(
        "*",
        sf.to_date(
            sf.from_unixtime(sf.col("user_first_touch_timestamp") / 1_000_000)
        ).alias("User_first_touch"),
        sf.current_timestamp().alias("Ingestion Time"),
        "_metadata.file_modification_time",
        "_metadata.file_name",
    )
    .limit(5)
    .display()
)

In [0]:
%sql
-- File metadata is exposed through the hidden _metadata column and must be selected explicitly.
-- The landed source CSVs are queried directly: _metadata on a Delta table describes the table's
-- own data files, not the original CSVs, and sales_autoloader_python is dropped in the cleanup above.
SELECT
  _metadata.file_name AS file_name,
  _metadata.file_modification_time AS modif_time,
  count(*) AS row_count
FROM read_files(
  '/Volumes/db_academy/data_landing/data/sales/',
  format => 'csv',
  sep => '|',
  header => true
)
GROUP BY file_name, modif_time

  

###### Converting Unix Time in SQL

In [0]:
%sql
-- user_first_touch_timestamp is divided by 1,000,000 before from_unixtime (epoch microseconds);
-- the format is stated explicitly, as in the CTAS cell, instead of relying on inference.
SELECT
  user_first_touch_timestamp,
  from_unixtime(user_first_touch_timestamp / 1000000) :: DATE AS date_formated,
  from_unixtime(user_first_touch_timestamp / 1000000) :: TIMESTAMP AS time_formated
FROM read_files(
  '/Volumes/db_academy_ecomerce/v01/raw/users-historical/',
  format => 'parquet'
)
LIMIT 5

##### Rescued Data Column </br>

In [0]:
%sql
-- Quick exploration: read one sales CSV as plain text lines to inspect its raw layout.
SELECT * FROM text.`/Volumes/db_academy_ecomerce/v01/raw/sales-csv/000.csv` LIMIT 5

In [0]:
%sql
-- Read the sales CSVs with a deliberately partial schema (order_id, email only).
-- Data that does not fit the schema is captured in the _rescued column, and the
-- `:` path syntax extracts transactions_timestamp from it.
SELECT
  _rescued:transactions_timestamp AS time_stamp,
  *
FROM read_files(
  '/Volumes/db_academy_ecomerce/v01/raw/sales-csv/',
  format => 'CSV',
  header => true,
  sep => '|',
  schema => 'order_id INT, email STRING',
  rescueddatacolumn => '_rescued'
)
LIMIT 2

##### Working With JSON Data </br>

###### Reading JSON using SQL

In [0]:
%sql
-- Decode the base64-encoded Kafka key/value of one raw JSON file into a bronze table.
-- `timestamp` is divided by 1000 before from_unixtime (epoch milliseconds).
CREATE OR REPLACE TABLE db_academy.production.kafka_bronze_decoded AS
SELECT
  unbase64(key) :: STRING AS decoded_key,
  from_unixtime(timestamp / 1000) :: DATE AS date_formated,
  unbase64(value) :: STRING AS value_decoded
FROM read_files(
  '/Volumes/db_academy_ecomerce/v01/raw/events-kafka/000.json',
  format => 'JSON'
)

In [0]:
%sql
-- Preview the decoded Kafka records.
SELECT *
FROM db_academy.production.kafka_bronze_decoded
LIMIT 5

###### Reading JSON Using Python

In [0]:
import pyspark.sql.functions as sf

In [0]:
# Infer the JSON payload schema from one decoded sample and use it to parse every record.
# schema_of_json requires a constant (foldable) JSON string, so a sample value is collected
# first from the kafka_bronze_decoded table built above and passed in as a literal.
# NOTE(review): a single sample may miss fields that only appear in other records.
sample_json = (
    spark.table("db_academy.production.kafka_bronze_decoded")
    .where(sf.col("value_decoded").isNotNull())
    .select("value_decoded")
    .first()["value_decoded"]
)
schema = spark.range(1).select(sf.schema_of_json(sf.lit(sample_json))).first()[0]

df = spark.read.format("json").load("/Volumes/db_academy_ecomerce/v01/raw/events-kafka/000.json")

# Decode key/value, parse the value with the inferred schema, and flatten the struct fields.
(
    df.select(
        sf.unbase64("key").cast("STRING").alias("key_decoded"),
        sf.from_unixtime(sf.col("timestamp") / 1000).cast("date").alias("date_decoded"),
        sf.unbase64("value").cast("STRING").alias("value_decoded"),
    )
    .withColumn("json_struct", sf.from_json(sf.col("value_decoded"), schema))
    .drop("value_decoded")
    .select("*", "json_struct.*")
    .limit(5)
    .display()
)