In [0]:
import json

# Mount configuration
# GCP project that owns the bucket, and the bucket to expose through DBFS.
project_id = "mentorsko-1723044085161"
gcs_bucket_name = "gbmecom"
# DBFS path under which the bucket contents will be reachable.
mount_point = "/mnt/mskltestmnt033"
# Location of the service-account key file (JSON); it is opened and parsed in a later cell.
service_account_key = "/dbfs/FileStore/tables/mentorsko_1723044085161_d092fa612a29.json"


In [0]:

# Read the service account key file (a JSON document) and parse it into a dict.
with open(service_account_key, 'r') as key_file:
    service_account_info = json.load(key_file)

# Hadoop GCS connector settings: authenticate as the service account using
# the e-mail, key id and private key taken from the parsed key file.
config = {
    "fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
    "fs.gs.auth.service.account.enable": "true",
    "fs.gs.auth.service.account.email": service_account_info["client_email"],
    "fs.gs.auth.service.account.private.key.id": service_account_info["private_key_id"],
    "fs.gs.auth.service.account.private.key": service_account_info["private_key"],
    "fs.gs.project.id": project_id
}

# Mount the GCS bucket only when the mount point is not in use yet.
# dbutils.fs.mount raises if the mount point already exists, which would make
# this cell fail on every re-run of the notebook.
already_mounted = any(m.mountPoint == mount_point for m in dbutils.fs.mounts())
if not already_mounted:
    dbutils.fs.mount(
        source=f"gs://{gcs_bucket_name}",
        mount_point=mount_point,
        extra_configs=config
    )

In [0]:
# List the entries under the mount point; as the last expression of the cell,
# the returned listing is shown as the cell output.
dbutils.fs.ls(mount_point)

In [0]:
# Read returns.csv through the mounted bucket. The path is built from
# mount_point so it cannot drift from the mount configuration, and the header /
# inferSchema options match how the other CSV files in this notebook are read
# (without "header" the first row would be loaded as data and columns named _c0, _c1, ...).
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(f"dbfs:{mount_point}/returns.csv")
)

In [0]:
# Render the DataFrame read from the mounted bucket as the cell output.
display(df)

## Enquiriuos-1. Part 2


In [0]:
def _read_table_csv(csv_path):
    """Load the CSV at `csv_path` into a DataFrame.

    The first row is treated as the header and column types are inferred by Spark.
    """
    return spark.read.option("header", True).option("inferSchema", True).csv(csv_path)


# Source file locations (all uploaded under dbfs:/FileStore/tables).
addresses_path = 'dbfs:/FileStore/tables/addresses.csv'
customers_path = 'dbfs:/FileStore/tables/customers.csv'
orders_path = 'dbfs:/FileStore/tables/orders.csv'
order_items_path = 'dbfs:/FileStore/tables/orders_items.csv'
products_path = 'dbfs:/FileStore/tables/products.csv'
suppliers_path = 'dbfs:/FileStore/tables/suppliers.csv'
payments_path = 'dbfs:/FileStore/tables/payments.csv'
payment_method_path = 'dbfs:/FileStore/tables/payment_methods.csv'
shipping_path = 'dbfs:/FileStore/tables/shipping_tier.csv'
returns_path = 'dbfs:/FileStore/tables/returns.csv'

# One DataFrame per source file, loaded in the same order as the paths above.
addresses_df = _read_table_csv(addresses_path)
customers_df = _read_table_csv(customers_path)
orders_df = _read_table_csv(orders_path)
orders_items_df = _read_table_csv(order_items_path)
products_df = _read_table_csv(products_path)
suppliers_df = _read_table_csv(suppliers_path)
payments_df = _read_table_csv(payments_path)
payment_method_df = _read_table_csv(payment_method_path)
shipping_df = _read_table_csv(shipping_path)
returns_df = _read_table_csv(returns_path)



In [0]:
# Optional inspection of the loaded DataFrames; uncomment a line to preview that table.
# orders_df.display()
# orders_items_df.display()
# payments_df.display()
# payment_method_df.display()

In [0]:
from pyspark.sql.functions import col

# Rank payment methods by how many payments used them.
# Step 1: attach the method details to every payment (default join type, keyed on PaymentMethodID).
payments_with_method = payments_df.join(payment_method_df, on='PaymentMethodID')
# Step 2: count payments per method name.
payments_per_method = payments_with_method.groupBy('MethodName').count()
# Step 3: most used method first.
payment_method_preference = payments_per_method.orderBy(col("count").desc())

payment_method_preference.display()

In [0]:
# Expose the order line items to %sql under the name the following query reads
# from (`order_items_df`). Registering orders_df as "Order_items" left that
# query without a matching view and pointed at a DataFrame that is not the
# line-item table.
orders_items_df.createOrReplaceTempView("order_items_df")

In [0]:
%sql
-- Total units sold per product, best sellers first.
-- `order_items_df` must be resolvable as a table or temp view in this session.
SELECT
    ProductID,
    SUM(Quantity) AS TotalQuantitySold
FROM order_items_df
GROUP BY ProductID
ORDER BY TotalQuantitySold DESC
;

In [0]:
# Publish each DataFrame as a global temp view so SQL cells can query it
# as global_temp.<view name>. Existing views with the same name are replaced.
global_views = {
    "orders": orders_df,
    "orders_items": orders_items_df,
    "payments": payments_df,
    "payment_methods": payment_method_df,
    "products": products_df,
    "suppliers": suppliers_df,
    "addresses": addresses_df,
    "customers": customers_df,
    "shipping": shipping_df,
    "returns": returns_df,
}
for view_name, frame in global_views.items():
    frame.createOrReplaceGlobalTempView(view_name)

In [0]:
%sql
-- Number of joined order/payment rows per (order channel, payment method) pair,
-- largest group first.
SELECT
    ord.OrderChannel,
    pay.PaymentMethodID,
    COUNT(ord.OrderID) AS NumberOfOrders
FROM global_temp.orders AS ord
INNER JOIN global_temp.payments AS pay
    ON ord.OrderID = pay.OrderID
GROUP BY ord.OrderChannel, pay.PaymentMethodID
ORDER BY NumberOfOrders DESC;

In [0]:
from pyspark.sql.functions import *

In [0]:
%sql
-- Top 10 customers by total sales value (discounted price x quantity over all their order lines).
WITH customer_lines AS (
    -- One row per order line, carrying the customer and the product's price text.
    SELECT c.CustomerID, c.FirstName, p.Product_ID, p.Discounted_Price, oi.Quantity
    FROM global_temp.customers c
    JOIN global_temp.orders o ON c.CustomerID = o.CustomerID
    JOIN global_temp.orders_items oi ON o.OrderID = oi.OrderID
    JOIN global_temp.products p ON oi.ProductID = p.Product_ID
)
SELECT
    CustomerID,
    FirstName,
    -- Parse the price as a decimal: drop thousands separators, then take the first
    -- number including its fractional part. Stripping every non-digit would also
    -- remove the decimal point and turn e.g. 176.63 into 17663.
    SUM(
        CAST(
            regexp_extract(regexp_replace(Discounted_Price, ',', ''), '[0-9]+([.][0-9]+)?', 0)
            AS DECIMAL(18, 2)
        ) * Quantity
    ) AS ttl_sales
FROM customer_lines
GROUP BY CustomerID, FirstName
ORDER BY ttl_sales DESC
LIMIT 10
;

In [0]:
%sql
-- Orders per shipping tier, most used tier first.
SELECT
    tier.TierName,
    COUNT(ord.OrderID) AS order_count
FROM global_temp.orders AS ord
INNER JOIN global_temp.shipping AS tier
    ON ord.ShippingTierID = tier.ShippingTierID
GROUP BY tier.TierName
ORDER BY order_count DESC
;

In [0]:
# Shipping-tier popularity as a DataFrame (original note: "Generated = DBX_helper").
# The query text is kept in its own variable so it can be inspected separately;
# the commented-out LIMIT 1 would keep only the single most popular tier.
shiptier_query = """
    SELECT s.TierName, COUNT(*) AS order_count
    FROM global_temp.orders o
    JOIN global_temp.shipping s
      ON o.ShippingTierID = s.ShippingTierID
    GROUP BY s.TierName
    ORDER BY order_count DESC
    -- LIMIT 1
"""
most_popular_shiptier = spark.sql(shiptier_query)
display(most_popular_shiptier)

In [0]:
%sql
-- Number of order lines per product (reported as total_sales), most frequent first.
SELECT
    prod.Product_ID,
    prod.Product_Name,
    COUNT(oi.orderID) AS total_sales
FROM global_temp.products AS prod
INNER JOIN global_temp.orders_items AS oi
    ON oi.ProductID = prod.Product_ID
GROUP BY prod.Product_ID, prod.Product_Name
ORDER BY total_sales DESC
;

In [0]:
%sql
-- Per-customer summary: order count, total sales, average quantity per order line,
-- and days between the first order date and the latest actual delivery date.
WITH customer_lines AS (
    -- One row per order line.
    SELECT
        c.CustomerID,
        o.OrderID,
        -- Parse the price as a decimal: drop thousands separators, then take the first
        -- number including its fractional part. Stripping every non-digit would also
        -- remove the decimal point and turn e.g. 176.63 into 17663.
        CAST(
            regexp_extract(regexp_replace(p.Discounted_Price, ',', ''), '[0-9]+([.][0-9]+)?', 0)
            AS DECIMAL(18, 2)
        ) * oi.Quantity AS sales,
        oi.Quantity,
        o.OrderDate,
        o.ActualDeliveryDate
    FROM global_temp.customers c
    JOIN global_temp.orders o ON c.CustomerID = o.CustomerID
    JOIN global_temp.orders_items oi ON o.OrderID = oi.OrderID
    JOIN global_temp.products p ON oi.ProductID = p.Product_ID
)
SELECT
    CustomerID,
    -- Rows are at order-line grain, so count each order once rather than once per line.
    COUNT(DISTINCT OrderID) AS total_orders,
    SUM(sales) AS ttl_sales,
    AVG(Quantity) AS avg_bucket_size,
    datediff(DAY, MIN(OrderDate), MAX(ActualDeliveryDate)) AS length_of_stay_days
FROM customer_lines
GROUP BY CustomerID
;

Lakehouse = Data Lake (cheap, can hold any kind of data, not ACID compliant) + Data Warehouse (OLAP, storage, compute, building reports/analytics).

Databricks stores the entire database in the data lake, so we use Delta Lake, which adds support for ACID properties.

**Delta Lake** : 
Delta Lake supports all ACID properties.
Parquet is the default file format in Delta Lake.
Parquet is a columnar format, whereas .csv is a row-oriented format.
If we run SQL commands on a .csv file, it is processed row-wise, so a query that needs only specific columns still has to read every full row, which takes more time.

Delta Lake keeps versions of the data as it is updated or otherwise modified.
Delta Lake writes a new Parquet file every time you change something in the data.


**Ways to build Bronze Layer**

INSERT INTO -- appends the new records to the data already in the destination table.
INSERT OVERWRITE -- if the destination has 500 records and the source has 20 records, it overwrites the 500 records with the 20 new ones, and we lose the previous 500 records.

So we use COPY INTO -- its source = cloud storage && destination = Delta table.
The COPY INTO command skips all the already processed files and loads only the new data, while keeping the old records in the table.
