## Data Lakehouse Exploration

Kh√°m ph√° d·ªØ li·ªáu ·ªü c√°c t·∫ßng Bronze, Silver v√† Gold trong Data Lakehouse

In [1]:
# Setup Spark ƒë·ªÉ k·∫øt n·ªëi ƒë·∫øn MinIO v√† Delta Lake
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, desc, count, avg, sum, when
from pyspark.sql.types import *

# T·∫°o Spark Session v·ªõi Delta Lake v√† S3 (MinIO)
spark = SparkSession.builder \
    .appName("DataLakehouseExploration") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000") \
    .config("spark.hadoop.fs.s3a.access.key", "minio") \
    .config("spark.hadoop.fs.s3a.secret.key", "minio123") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
    .getOrCreate()

print("‚úÖ Spark Session created successfully!")
spark.version

‚úÖ Spark Session created successfully!


'3.5.6'

### 1. Bronze Layer - Raw Transaction Data

In [2]:
# ƒê·ªçc d·ªØ li·ªáu t·ª´ Bronze layer
try:
    bronze_df = spark.read.format("delta").load("s3a://lakehouse/bronze/transactions")
    
    print("üìä BRONZE LAYER OVERVIEW")
    print("=" * 50)
    print(f"Total records: {bronze_df.count():,}")
    print(f"Columns: {len(bronze_df.columns)}")
    print("\nüìã Schema:")
    bronze_df.printSchema()
    
    print("\nüìù Sample data:")
    bronze_df.show(5, truncate=False)
    
except Exception as e:
    print(f"‚ùå Bronze layer not available: {e}")
    print("üí° Make sure data-producer is running and Bronze layer job has been executed")

‚ùå Bronze layer not available: An error occurred while calling o42.load.
: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: delta. Please find packages at `https://spark.apache.org/third-party-projects.html`.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:725)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:697)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:208)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:186)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl

### 2. Silver Layer - Engineered Features

In [3]:
# ƒê·ªçc d·ªØ li·ªáu t·ª´ Silver layer
try:
    silver_df = spark.read.format("delta").load("s3a://lakehouse/silver/transactions")
    
    print("ü•à SILVER LAYER OVERVIEW")
    print("=" * 50)
    print(f"Total records: {silver_df.count():,}")
    print(f"Columns: {len(silver_df.columns)}")
    
    print("\nüìã Schema:")
    silver_df.printSchema()
    
    # Ki·ªÉm tra c√°c features ƒë∆∞·ª£c engineer
    engineered_features = [
        'distance_km', 'age', 'hour', 'day_of_week', 'is_weekend', 
        'hour_sin', 'hour_cos', 'log_amount', 'is_zero_amount', 
        'is_high_amount', 'amount_bin', 'gender_encoded', 
        'is_distant_transaction', 'is_late_night'
    ]
    
    print("\nüîß Engineered Features:")
    for feature in engineered_features:
        if feature in silver_df.columns:
            print(f"  ‚úÖ {feature}")
        else:
            print(f"  ‚ùå {feature} - Missing")
    
    print("\nüìä Feature Statistics:")
    silver_df.select(*engineered_features[:5]).describe().show()
    
    print("\nüìù Sample data (first 3 rows):")
    silver_df.select("trans_num", "amt", "is_fraud", *engineered_features[:5]).show(3)
    
except Exception as e:
    print(f"‚ùå Silver layer not available: {e}")
    print("üí° Make sure Silver layer job has been executed")

‚ùå Silver layer not available: An error occurred while calling o46.load.
: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: delta. Please find packages at `https://spark.apache.org/third-party-projects.html`.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:725)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:697)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:208)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:186)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl

### 3. Gold Layer - Dimensional Model

In [None]:
# ƒê·ªçc d·ªØ li·ªáu t·ª´ Gold layer (Dimensional Model)
gold_tables = [
    "dim_customer", "dim_merchant", "dim_time", "dim_location", "fact_transactions"
]

print("ü•á GOLD LAYER OVERVIEW (Dimensional Model)")
print("=" * 60)

for table in gold_tables:
    try:
        df = spark.read.format("delta").load(f"s3a://lakehouse/gold/{table}")
        
        print(f"\nüìä {table.upper()}")
        print("-" * 40)
        print(f"Records: {df.count():,}")
        print(f"Columns: {len(df.columns)}")
        print(f"Schema: {[col for col in df.columns]}")
        
        # Show sample data
        print("Sample data:")
        df.show(3, truncate=False)
        
    except Exception as e:
        print(f"\n‚ùå {table}: Not available - {e}")
        print("üí° Make sure Gold layer job has been executed")

print("\n" + "=" * 60)

### 4. Data Quality Analysis

In [None]:
# Ph√¢n t√≠ch ch·∫•t l∆∞·ª£ng d·ªØ li·ªáu qua c√°c t·∫ßng
def analyze_data_quality(df, layer_name):
    print(f"\nüìä {layer_name} - Data Quality Analysis")
    print("-" * 50)
    
    total_rows = df.count()
    print(f"Total rows: {total_rows:,}")
    
    # Ki·ªÉm tra NULL values
    print("\nüîç NULL Values:")
    for col_name in df.columns:
        null_count = df.filter(col(col_name).isNull()).count()
        null_percentage = (null_count / total_rows) * 100 if total_rows > 0 else 0
        if null_count > 0:
            print(f"  {col_name}: {null_count:,} ({null_percentage:.2f}%)")
    
    # Ki·ªÉm tra fraud distribution n·∫øu c√≥ column is_fraud
    if 'is_fraud' in df.columns:
        print("\nüö® Fraud Distribution:")
        fraud_stats = df.groupBy("is_fraud").count().collect()
        for row in fraud_stats:
            fraud_percentage = (row['count'] / total_rows) * 100
            fraud_label = "Fraud" if row['is_fraud'] else "Normal"
            print(f"  {fraud_label}: {row['count']:,} ({fraud_percentage:.2f}%)")

# Analyze each layer if available
try:
    if 'bronze_df' in locals():
        analyze_data_quality(bronze_df, "BRONZE LAYER")
except:
    print("‚ùå Bronze layer not available for analysis")

try:
    if 'silver_df' in locals():
        analyze_data_quality(silver_df, "SILVER LAYER")
except:
    print("‚ùå Silver layer not available for analysis")

### 5. Feature Analysis for ML

In [None]:
# Ph√¢n t√≠ch features cho Machine Learning (n·∫øu Silver layer available)
if 'silver_df' in locals() and silver_df is not None:
    print("ü§ñ FEATURE ANALYSIS FOR MACHINE LEARNING")
    print("=" * 60)
    
    # ML Features ƒë∆∞·ª£c s·ª≠ d·ª•ng trong training
    ml_features = [
        'amt', 'hour', 'day_of_week', 'log_amount', 'amount_bin', 
        'is_zero_amount', 'is_high_amount', 'distance_km', 
        'is_distant_transaction', 'age', 'gender_encoded', 
        'is_weekend', 'is_late_night', 'hour_sin', 'hour_cos'
    ]
    
    available_features = [f for f in ml_features if f in silver_df.columns]
    missing_features = [f for f in ml_features if f not in silver_df.columns]
    
    print(f"\n‚úÖ Available ML Features ({len(available_features)}/{len(ml_features)}):")
    for feature in available_features:
        print(f"  ‚úÖ {feature}")
    
    if missing_features:
        print(f"\n‚ùå Missing ML Features ({len(missing_features)}):")
        for feature in missing_features:
            print(f"  ‚ùå {feature}")
    
    # Feature distributions by fraud class
    if available_features and 'is_fraud' in silver_df.columns:
        print(f"\nüìä Feature Statistics by Fraud Class:")
        print("-" * 50)
        
        # T√≠nh to√°n th·ªëng k√™ cho m·ªôt v√†i features quan tr·ªçng
        important_features = ['amt', 'distance_km', 'age', 'hour']
        
        for feature in important_features:
            if feature in silver_df.columns:
                stats = silver_df.groupBy("is_fraud") \
                    .agg(
                        avg(feature).alias(f"avg_{feature}"),
                        count(feature).alias("count")
                    ).collect()
                
                print(f"\n{feature.upper()}:")
                for row in stats:
                    fraud_label = "Fraud" if row['is_fraud'] else "Normal"
                    avg_val = row[f'avg_{feature}'] if row[f'avg_{feature}'] else 0
                    print(f"  {fraud_label}: avg={avg_val:.2f}, count={row['count']:,}")
    
else:
    print("‚ùå Silver layer not available for ML feature analysis")
    print("üí° Run Silver layer job first to see feature analysis")

In [None]:
# Cleanup
spark.stop()
print("‚úÖ Spark session stopped")