In [0]:
%fs
ls /FileStore/tables

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Khởi tạo ứng dụng Spark
spark_apps = SparkSession.builder \
    .master("local[*]") \
        .appName("Delta Lake Spark Application") \
            .config("spark.executor.memory", "4g") \
                .config("spark.executor.instances", 2 ) \
                    .config("spark.driver.memory", "4g") \
                        .getOrCreate()



In [0]:
lego_sets_schema = StructType([
    StructField("set_id", StringType(), False),
    StructField("name", StringType(), True),
    StructField("year", IntegerType(), True),
    StructField("theme", StringType(), True),
    StructField("subtheme", StringType(), True),
    StructField("themeGroup", StringType(), True),
    StructField("category", StringType(), True),
    StructField("pieces", IntegerType(), True),
    StructField("minifigs", IntegerType(), True),
    StructField("agerange_min", IntegerType(), True),
    StructField("US_retailPrice", DecimalType(3,2), True),
    StructField("bricksetURL", StringType(), True),
    StructField("thumbnailURL", StringType(), True),
    StructField("imageURL", StringType(), True)
])



In [0]:
df = spark_apps.read \
    .option("header","true") \
        .option("inferSchema","true") \
            .csv("dbfs:/FileStore/tables/lego_sets.csv")

df_read_csv.show(5)

In [0]:
# Spark Write Dataframe
df_write_parquet = df.write \
    .mode("overwrite") \
        .partitionBy("themeGroup") \
            .format("parquet") \
                .save("dbfs:/FileStore/output/lego_sets_parquet")


In [0]:
%fs
ls /FileStore/output/lego_sets_parquet

In [0]:
%fs
ls dbfs:/FileStore/output/lego_sets_parquet/themeGroup=Vintage/

Khác nhau giữa
- spark (bản built-in trong Databricks)
- SparkSession
Cơ bản nó như nhau, spark là cái default còn SparkSession là cái mình tự config

In [0]:
df_write_delta = df.write \
    .mode("overwrite") \
        .format("delta") \
            .partitionBy("themeGroup") \
                .save("/FileStore/output/lego_sets_delta")

In [0]:
%fs
ls dbfs:/FileStore/output/lego_sets_delta/_delta_log/

In [0]:
spark.read.format("json").load("dbfs:/FileStore/output/lego_sets_delta/_delta_log/00000000000000000000.json").display()

In [0]:
%fs
ls dbfs:/FileStore/output/lego_sets_parquet/

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS mydb

In [0]:
%sql
CREATE TABLE IF NOT EXISTS mydb.lego_sets USING parquet LOCATION "dbfs:/FileStore/output/lego_sets_parquet/"

In [0]:
%sql
MSCK REPAIR TABLE mydb.lego_sets;
SELECT * FROM mydb.lego_sets LIMIT 10;
-- Cần có MSCK REPAIR TABLE vì sao ?
/*
Đây là cơ chế Catalog và Partition Discovery của Spark/Hive
Thiết kế này sinh ra nhằm sử dụng trong trường hợp
- Partition cần được chỉ định thủ công
- Spark tránh việc đọc toàn bộ dữ liệu trong mỗi lần query
Spark không quản lý metadata, câu lệnh CREATE TABLE chỉ tạo metadata trong metastore -> partition chỉ được tạo khi thực hiện MSCK REPAIR TABLE
*/

In [0]:
%sql
CREATE TABLE IF NOT EXISTS mydb.lego_sets_delta 
USING DELTA
LOCATION "dbfs:/FileStore/output/lego_sets_delta/";

SELECT * FROM mydb.lego_sets_delta LIMIT 10;
-- Tại sao khi tạo bảng DELTA thì không cần MSCK REPAIR TABLE ?
/*
Cơ chế Catalog và Partition discovery của tổ hợp Spark/Hive - sử dụng parquet khác so với cách quản lý catalog và partition của DELTA LAKE
Delta Lake = Parquet + Transaction Logs
Tệp Transaction Logs đó chính là metadata của Delta Lake, Delta Lake không dựa vào metastore để quản lý metadata mà nó tự quản lý bằng Transaction Logs của riêng mình
*/