# Mô tả lại các thao tác trên lakehouse

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder \
    .appName("Lakehouse") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

In [0]:
# df1 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/21070601.phong@student.iuh.edu.vn/housing.csv")

## Import file housing.csv và Đọc dữ liệu

In [0]:
# Set up paths for Delta Lake
data_path = "/mnt/delta/lakehouse_data"
csv_path = "dbfs:/FileStore/shared_uploads/21070601.phong@student.iuh.edu.vn/housing.csv" 

In [0]:
# Read file csv
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load(csv_path)

display(df)

price,area,bedrooms,bathrooms,stories,mainroad,guestroom,basement,hotwaterheating,airconditioning,parking,prefarea,furnishingstatus
13300000,7420,4,2,3,yes,no,no,no,yes,2,yes,furnished
12250000,8960,4,4,4,yes,no,no,no,yes,3,no,furnished
12250000,9960,3,2,2,yes,no,yes,no,no,2,yes,semi-furnished
12215000,7500,4,2,2,yes,no,yes,no,yes,3,yes,furnished
11410000,7420,4,1,2,yes,yes,yes,no,yes,2,no,furnished
10850000,7500,3,3,1,yes,no,yes,no,yes,2,yes,semi-furnished
10150000,8580,4,3,4,yes,no,no,no,yes,2,yes,semi-furnished
10150000,16200,5,3,2,yes,no,no,no,no,0,no,unfurnished
9870000,8100,4,1,2,yes,yes,yes,no,yes,2,yes,furnished
9800000,5750,3,2,4,yes,yes,no,no,yes,1,yes,unfurnished


In [0]:
def initialize_delta_table_from_csv():
    """ Read data from a CSV file and create a Delta table."""
    # Read CSV into DataFrame
    df = spark.read.format("csv") \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .load(csv_path)

    df = df.dropna()

    # Save as Delta table
    df.write.format("delta").mode("overwrite").save(data_path)
    print("Delta table created from CSV.")

# Initialize Delta Table from CSV
initialize_delta_table_from_csv()

Delta table created from CSV.


### Thao tác SQL trên lakehouse

In [0]:
def query_with_optimizations():
    """Query the Delta table with optimized storage and indexing."""
    df = spark.read.format("delta").load(data_path)
    df.createOrReplaceTempView("lakehouse_table")
    
    # Add Z-order clustering for better query performance
    df.write.format("delta") \
        .mode("overwrite") \
        .option("overwriteSchema", "true") \
        .save(data_path)

    # Query the optimized Delta Table
    result = spark.sql("""
        SELECT * FROM lakehouse_table 
        WHERE price > 1000000 AND mainroad = 'yes'
    """)
    display(result)

query_with_optimizations()

price,area,bedrooms,bathrooms,stories,mainroad,guestroom,basement,hotwaterheating,airconditioning,parking,prefarea,furnishingstatus
13300000,7420,4,2,3,yes,no,no,no,yes,2,yes,furnished
12250000,8960,4,4,4,yes,no,no,no,yes,3,no,furnished
12250000,9960,3,2,2,yes,no,yes,no,no,2,yes,semi-furnished
12215000,7500,4,2,2,yes,no,yes,no,yes,3,yes,furnished
11410000,7420,4,1,2,yes,yes,yes,no,yes,2,no,furnished
10850000,7500,3,3,1,yes,no,yes,no,yes,2,yes,semi-furnished
10150000,8580,4,3,4,yes,no,no,no,yes,2,yes,semi-furnished
10150000,16200,5,3,2,yes,no,no,no,no,0,no,unfurnished
9870000,8100,4,1,2,yes,yes,yes,no,yes,2,yes,furnished
9800000,5750,3,2,4,yes,yes,no,no,yes,1,yes,unfurnished


In [0]:
# Read data from Delta Lake. Query price > 1000000 AND mainroad = 'yes'
def read_and_query_delta_table():
    """Read and query data from Delta table."""
    df = spark.read.format("delta").load(data_path)
    df.createOrReplaceTempView("lakehouse_table")
    
    # Perform SQL query
    result = spark.sql("SELECT * FROM lakehouse_table WHERE price > 1000000 AND mainroad = 'yes'")
    display(result)

read_and_query_delta_table()

price,area,bedrooms,bathrooms,stories,mainroad,guestroom,basement,hotwaterheating,airconditioning,parking,prefarea,furnishingstatus
13300000,7420,4,2,3,yes,no,no,no,yes,2,yes,furnished
12250000,8960,4,4,4,yes,no,no,no,yes,3,no,furnished
12250000,9960,3,2,2,yes,no,yes,no,no,2,yes,semi-furnished
12215000,7500,4,2,2,yes,no,yes,no,yes,3,yes,furnished
11410000,7420,4,1,2,yes,yes,yes,no,yes,2,no,furnished
10850000,7500,3,3,1,yes,no,yes,no,yes,2,yes,semi-furnished
10150000,8580,4,3,4,yes,no,no,no,yes,2,yes,semi-furnished
10150000,16200,5,3,2,yes,no,no,no,no,0,no,unfurnished
9870000,8100,4,1,2,yes,yes,yes,no,yes,2,yes,furnished
9800000,5750,3,2,4,yes,yes,no,no,yes,1,yes,unfurnished


In [0]:
# Read data from Delta Lake. 
def sum_area_delta_table():
    """Read and query data from Delta table."""
    df = spark.read.format("delta").load(data_path)
    df.createOrReplaceTempView("lakehouse_table")
    
    # Perform SQL query
    result = spark.sql("SELECT SUM(area) FROM lakehouse_table WHERE bedrooms = 4")
    display(result)

sum_area_delta_table()

sum(area)
530296


### Metadata Layers for Data Management

In [0]:
# Phân vùng dữ liệu theo cột "furnishingstatus"
df.write.format("delta") \
    .partitionBy("furnishingstatus") \
    .mode("overwrite") \
    .save("dbfs:/FileStore/partitioned/lakehouse_partitioned_by_furnishingstatus")

# Hiển thị thông tin về các thư mục phân vùng
files = dbutils.fs.ls("dbfs:/FileStore/partitioned/lakehouse_partitioned_by_furnishingstatus")
for file in files:
    print(file.name)


_delta_log/
furnishingstatus=furnished/
furnishingstatus=semi-furnished/
furnishingstatus=unfurnished/


In [0]:
from delta.tables import DeltaTable

# Kiểm tra metadata của Delta Table
delta_table = DeltaTable.forPath(spark, "dbfs:/FileStore/partitioned/lakehouse_partitioned_by_furnishingstatus")
display(delta_table.history())

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
3,2024-12-20T04:53:39.000+0000,5217458779593112,21070601.phong@student.iuh.edu.vn,WRITE,"Map(mode -> Overwrite, partitionBy -> [""furnishingstatus""])",,List(1802152467725744),1220-043928-ibta32zr,2.0,WriteSerializable,False,"Map(numFiles -> 3, numOutputRows -> 545, numOutputBytes -> 16056)",,Databricks-Runtime/12.2.x-scala2.12
2,2024-12-19T15:26:32.000+0000,5217458779593112,21070601.phong@student.iuh.edu.vn,WRITE,"Map(mode -> Overwrite, partitionBy -> [""furnishingstatus""])",,List(1802152467725744),1219-143300-y0ypcu7q,1.0,WriteSerializable,False,"Map(numFiles -> 3, numOutputRows -> 545, numOutputBytes -> 16056)",,Databricks-Runtime/12.2.x-scala2.12
1,2024-12-19T15:25:51.000+0000,5217458779593112,21070601.phong@student.iuh.edu.vn,WRITE,"Map(mode -> Overwrite, partitionBy -> [""furnishingstatus""])",,List(1802152467725744),1219-143300-y0ypcu7q,0.0,WriteSerializable,False,"Map(numFiles -> 3, numOutputRows -> 545, numOutputBytes -> 16056)",,Databricks-Runtime/12.2.x-scala2.12
0,2024-12-19T15:25:20.000+0000,5217458779593112,21070601.phong@student.iuh.edu.vn,WRITE,"Map(mode -> Overwrite, partitionBy -> [""furnishingstatus""])",,List(1802152467725744),1219-143300-y0ypcu7q,,WriteSerializable,False,"Map(numFiles -> 3, numOutputRows -> 545, numOutputBytes -> 16056)",,Databricks-Runtime/12.2.x-scala2.12


In [0]:
df_partitioned = spark.read.format("delta").load("dbfs:/FileStore/partitioned/lakehouse_partitioned_by_furnishingstatus")

# Hiển thị dữ liệu
df_partitioned.show(3)

+--------+----+--------+---------+-------+--------+---------+--------+---------------+---------------+-------+--------+----------------+
|   price|area|bedrooms|bathrooms|stories|mainroad|guestroom|basement|hotwaterheating|airconditioning|parking|prefarea|furnishingstatus|
+--------+----+--------+---------+-------+--------+---------+--------+---------------+---------------+-------+--------+----------------+
|13300000|7420|       4|        2|      3|     yes|       no|      no|             no|            yes|      2|     yes|       furnished|
|12250000|8960|       4|        4|      4|     yes|       no|      no|             no|            yes|      3|      no|       furnished|
|12215000|7500|       4|        2|      2|     yes|       no|     yes|             no|            yes|      3|     yes|       furnished|
+--------+----+--------+---------+-------+--------+---------+--------+---------------+---------------+-------+--------+----------------+
only showing top 3 rows



In [0]:
# Truy vấn dữ liệu chỉ cho các bản ghi có `furnishingstatus` là 'Furnished' và mainroad là yes
df_furnished = df_partitioned.filter((col("furnishingstatus") == "furnished") & (col("mainroad") == "yes"))

display(df_furnished)

price,area,bedrooms,bathrooms,stories,mainroad,guestroom,basement,hotwaterheating,airconditioning,parking,prefarea,furnishingstatus
13300000,7420,4,2,3,yes,no,no,no,yes,2,yes,furnished
12250000,8960,4,4,4,yes,no,no,no,yes,3,no,furnished
12215000,7500,4,2,2,yes,no,yes,no,yes,3,yes,furnished
11410000,7420,4,1,2,yes,yes,yes,no,yes,2,no,furnished
9870000,8100,4,1,2,yes,yes,yes,no,yes,2,yes,furnished
9800000,13200,3,1,2,yes,no,yes,no,yes,2,yes,furnished
9240000,3500,4,2,2,yes,no,no,yes,no,2,no,furnished
8960000,8500,3,2,4,yes,no,no,no,yes,2,no,furnished
8890000,4600,3,2,2,yes,yes,no,no,yes,2,no,furnished
8645000,8050,3,1,1,yes,yes,yes,no,yes,1,no,furnished


In [0]:
# Truy vấn dữ liệu ở một phiên bản cụ thể
df_version = spark.read.format("delta").option("versionAsOf", 1).load("dbfs:/FileStore/partitioned/lakehouse_partitioned_by_furnishingstatus")

# Hoặc truy vấn dữ liệu theo thời gian (timestamp)
df_time = spark.read.format("delta").option("timestampAsOf", "2024-12-19 15:25:20").load("dbfs:/FileStore/partitioned/lakehouse_partitioned_by_furnishingstatus")

display(df_time)
display(df_version)


price,area,bedrooms,bathrooms,stories,mainroad,guestroom,basement,hotwaterheating,airconditioning,parking,prefarea,furnishingstatus
12250000,9960,3,2,2,yes,no,yes,no,no,2,yes,semi-furnished
10850000,7500,3,3,1,yes,no,yes,no,yes,2,yes,semi-furnished
10150000,8580,4,3,4,yes,no,no,no,yes,2,yes,semi-furnished
9681000,6000,4,3,2,yes,yes,yes,yes,no,2,no,semi-furnished
9310000,6550,4,2,2,yes,no,no,no,yes,1,yes,semi-furnished
9240000,7800,3,2,2,yes,no,no,no,no,0,yes,semi-furnished
9100000,6000,4,1,2,yes,no,yes,no,no,2,no,semi-furnished
8855000,6420,3,2,2,yes,no,no,no,yes,1,yes,semi-furnished
8750000,4320,3,1,2,yes,no,yes,yes,no,2,no,semi-furnished
8463000,6000,3,2,4,yes,yes,yes,no,yes,0,yes,semi-furnished


price,area,bedrooms,bathrooms,stories,mainroad,guestroom,basement,hotwaterheating,airconditioning,parking,prefarea,furnishingstatus
12250000,9960,3,2,2,yes,no,yes,no,no,2,yes,semi-furnished
10850000,7500,3,3,1,yes,no,yes,no,yes,2,yes,semi-furnished
10150000,8580,4,3,4,yes,no,no,no,yes,2,yes,semi-furnished
9681000,6000,4,3,2,yes,yes,yes,yes,no,2,no,semi-furnished
9310000,6550,4,2,2,yes,no,no,no,yes,1,yes,semi-furnished
9240000,7800,3,2,2,yes,no,no,no,no,0,yes,semi-furnished
9100000,6000,4,1,2,yes,no,yes,no,no,2,no,semi-furnished
8855000,6420,3,2,2,yes,no,no,no,yes,1,yes,semi-furnished
8750000,4320,3,1,2,yes,no,yes,yes,no,2,no,semi-furnished
8463000,6000,3,2,4,yes,yes,yes,no,yes,0,yes,semi-furnished


### Phân tích nâng cao

In [0]:
# Perform advanced analytics
def advanced_analytics():
    """Perform advanced analytics on Delta Lake data."""
    df = spark.read.format("delta").load(data_path)
    
    df = df.withColumn("income", (rand() * (100000 - 500) + 500).cast("int"))

    # These 2 columns are not saved to the delta table
    df = df.withColumn("land_tax", when(col("area") > 5000, col("price") * 0.2).otherwise(col("price") * 0.1))
    df = df.withColumn("land_tax", round(col("land_tax")).cast("int"))

    display(df)

advanced_analytics()

price,area,bedrooms,bathrooms,stories,mainroad,guestroom,basement,hotwaterheating,airconditioning,parking,prefarea,furnishingstatus,income,land_tax
13300000,7420,4,2,3,yes,no,no,no,yes,2,yes,furnished,65351,2660000
12250000,8960,4,4,4,yes,no,no,no,yes,3,no,furnished,37401,2450000
12250000,9960,3,2,2,yes,no,yes,no,no,2,yes,semi-furnished,60144,2450000
12215000,7500,4,2,2,yes,no,yes,no,yes,3,yes,furnished,30925,2443000
11410000,7420,4,1,2,yes,yes,yes,no,yes,2,no,furnished,99740,2282000
10850000,7500,3,3,1,yes,no,yes,no,yes,2,yes,semi-furnished,62097,2170000
10150000,8580,4,3,4,yes,no,no,no,yes,2,yes,semi-furnished,11985,2030000
10150000,16200,5,3,2,yes,no,no,no,no,0,no,unfurnished,30491,2030000
9870000,8100,4,1,2,yes,yes,yes,no,yes,2,yes,furnished,76381,1974000
9800000,5750,3,2,4,yes,yes,no,no,yes,1,yes,unfurnished,82851,1960000


### ML

In [0]:
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder, StandardScaler, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml import Pipeline


# Encode binary categorical columns
binary_columns = ["mainroad", "guestroom", "basement", "hotwaterheating", "airconditioning", "prefarea"]
for col_name in binary_columns:
    df = df.withColumn(f"{col_name}_index", when(col(col_name) == "yes", 1).otherwise(0))

# Drop original binary columns after encoding
df = df.drop(*binary_columns)

display(df)

price,area,bedrooms,bathrooms,stories,parking,furnishingstatus,mainroad_index,guestroom_index,basement_index,hotwaterheating_index,airconditioning_index,prefarea_index
13300000,7420,4,2,3,2,furnished,1,0,0,0,1,1
12250000,8960,4,4,4,3,furnished,1,0,0,0,1,0
12250000,9960,3,2,2,2,semi-furnished,1,0,1,0,0,1
12215000,7500,4,2,2,3,furnished,1,0,1,0,1,1
11410000,7420,4,1,2,2,furnished,1,1,1,0,1,0
10850000,7500,3,3,1,2,semi-furnished,1,0,1,0,1,1
10150000,8580,4,3,4,2,semi-furnished,1,0,0,0,1,1
10150000,16200,5,3,2,0,unfurnished,1,0,0,0,0,0
9870000,8100,4,1,2,2,furnished,1,1,1,0,1,1
9800000,5750,3,2,4,1,unfurnished,1,1,0,0,1,1


In [0]:
# Add new features

df = df.withColumn("price_per_area", col("price") / col("area"))
df = df.withColumn("price_per_bedroom", col("price") / col("bedrooms"))

In [0]:
# Encode multi-class categorical column "furnishingstatus"
indexer = StringIndexer(inputCol="furnishingstatus", outputCol="furnishingstatus_index")
df = indexer.fit(df).transform(df)

# Select final features
feature_columns = [
    "area", "bedrooms", "bathrooms", "stories", "parking", "price_per_area", "price_per_bedroom"
] + [f"{col}_index" for col in binary_columns] + ["furnishingstatus_index"]


assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
df = assembler.transform(df)

scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
scaler_model = scaler.fit(df)
df = scaler_model.transform(df)

# Split data into training and test sets
train_df, test_df = df.randomSplit([0.8, 0.2], seed=100)

In [0]:
def train_and_evaluate_model(train_df, test_df):
    """ Train and evaluate the regression model."""
    # Initialize Linear Regression model
    lr = LinearRegression(featuresCol="features", labelCol="price")

    # Train the model
    lr_model = lr.fit(train_df)

    # Evaluate the model
    predictions = lr_model.transform(test_df)

    evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="rmse")
    rmse = evaluator.evaluate(predictions)

    evaluator_r2 = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="r2")
    r2 = evaluator_r2.evaluate(predictions, {evaluator.metricName: "r2"})

    evaluator_mae = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="mae")
    mae = evaluator_mae.evaluate(predictions)

    print(f"Root Mean Squared Error (RMSE): {rmse}")
    print(f"R2 Score: {r2}")
    print(f"Mean Absolute Error (MAE): {mae}")

    return lr_model

# Train and evaluate the model
model = train_and_evaluate_model(train_df, test_df)

Root Mean Squared Error (RMSE): 462704.39603311825
R2 Score: 0.9546869448044272
Mean Absolute Error (MAE): 292131.31291530747
