In [1]:
import logging
from pathlib import Path
from typing import Dict, List, Optional

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import *

class ParquetReader:
    def __init__(self, spark_config: Dict[str, str] = None):
        """
        Khởi tạo ParquetReader với cấu hình Spark tùy chỉnh
        """
        self.logger = self._setup_logging()
        self.spark = self._create_spark_session(spark_config)
        
    def _setup_logging(self) -> logging.Logger:
        """Cấu hình logging"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        return logging.getLogger(__name__)

    def _create_spark_session(self, spark_config: Dict[str, str] = None) -> SparkSession:
        """Tạo SparkSession với cấu hình tùy chỉnh"""
        default_config = {
            "spark.sql.files.maxPartitionBytes": "128MB",
            "spark.sql.shuffle.partitions": "10",
            "spark.driver.memory": "2g",
            "spark.executor.memory": "2g"
        }
        
        builder = SparkSession.builder.appName("ParquetReader")
        
        # Áp dụng cấu hình mặc định
        for key, value in default_config.items():
            builder = builder.config(key, value)
            
        # Ghi đè bằng cấu hình tùy chỉnh nếu có
        if spark_config:
            for key, value in spark_config.items():
                builder = builder.config(key, value)
                
        return builder.getOrCreate()

    def read_parquet(
        self, 
        input_path: str,
        columns: List[str] = None,
        filters: List[str] = None,
        partition_filter: Dict[str, str] = None
    ) -> DataFrame:
        """
        Đọc dữ liệu từ file/thư mục Parquet
        
        Args:
            input_path: Đường dẫn đến file/thư mục Parquet
            columns: Danh sách các cột cần đọc
            filters: Các điều kiện lọc dạng SQL
            partition_filter: Filter theo partition columns
        """
        try:
            self.logger.info(f"Đọc dữ liệu từ: {input_path}")
            
            # Kiểm tra path tồn tại
            if not Path(input_path).exists():
                raise ValueError(f"Path không tồn tại: {input_path}")
            
            # Đọc dữ liệu
            df = self.spark.read.option("mergeSchema", "true").parquet(input_path)
            
            # Select các cột cần thiết
            if columns:
                df = df.select(columns)
            
            # Áp dụng partition filter nếu có
            if partition_filter:
                for column_name, value in partition_filter.items():
                    df = df.filter(col(column_name) == value)  # Đổi tên biến để tránh lỗi

            
            # Áp dụng các điều kiện lọc
            if filters:
                for filter_expr in filters:
                    df = df.filter(expr(filter_expr))  # Sử dụng expr() function
            
            # Log thông tin
            self.logger.info(f"Schema của DataFrame:")
            df.printSchema()
            
            count = df.count()
            self.logger.info(f"Số lượng records: {count}")
            
            return df
            
        except Exception as e:
            self.logger.error(f"Lỗi khi đọc Parquet: {str(e)}", exc_info=True)
            raise

    def analyze_data(self, df: DataFrame) -> Dict:
        """Phân tích cơ bản về DataFrame"""
        try:
            stats = {}
            
            # Thống kê cơ bản
            stats['record_count'] = df.count()
            stats['column_count'] = len(df.columns)
            
            # Đếm null values cho mỗi cột
            null_counts = {}
            for column in df.columns:
                null_count = df.filter(col(column).isNull()).count()
                null_counts[column] = null_count
            stats['null_counts'] = null_counts
            
            # Thống kê cho các cột số
            numeric_stats = df.describe().collect()
            stats['numeric_stats'] = {row['summary']: {c: row[c] for c in df.columns if c != 'summary'}
                                    for row in numeric_stats}
            
            return stats
            
        except Exception as e:
            self.logger.error(f"Lỗi khi phân tích dữ liệu: {str(e)}", exc_info=True)
            raise

    def cleanup(self):
        """Dọn dẹp tài nguyên"""
        if self.spark:
            self.spark.stop()

def main():
    # Cấu hình Spark
    spark_config = {
        "spark.driver.memory": "2g",
        "spark.executor.memory": "2g"
    }
    
    reader = ParquetReader(spark_config)
    
    try:
        # Đọc dữ liệu với filters
        df = reader.read_parquet(
            input_path="D:/Project/delta_lake/bronze/yelp_data/",
            columns=["business_id", "name", "city", "state", "stars"],
            filters=["stars >= 4.0"],  # Sử dụng string expression
            partition_filter={"state": "AZ"}
        )
        
        # Show một số rows
        print("\nMẫu dữ liệu:")
        df.show(5)
        
        # Phân tích dữ liệu
        stats = reader.analyze_data(df)
        
        print("\nThống kê:")
        print(f"Tổng số records: {stats['record_count']}")
        print(f"Số cột: {stats['column_count']}")
        
        print("\nSố lượng giá trị null theo cột:")
        for col, count in stats['null_counts'].items():
            print(f"{col}: {count}")
            
        print("\nThống kê cho các cột số:")
        for metric, values in stats['numeric_stats'].items():
            print(f"\n{metric}:")
            for col, val in values.items():
                print(f"{col}: {val}")
        
    except Exception as e:
        reader.logger.error(f"Lỗi trong main: {str(e)}", exc_info=True)
    finally:
        reader.cleanup()

if __name__ == "__main__":
    main()

2025-02-13 12:24:32,872 - __main__ - INFO - Đọc dữ liệu từ: D:/Project/delta_lake/bronze/yelp_data/
2025-02-13 12:24:32,873 - __main__ - ERROR - Lỗi khi đọc Parquet: Path không tồn tại: D:/Project/delta_lake/bronze/yelp_data/
Traceback (most recent call last):
  File "C:\Users\trieu\AppData\Local\Temp\ipykernel_13148\365934481.py", line 67, in read_parquet
    raise ValueError(f"Path không tồn tại: {input_path}")
ValueError: Path không tồn tại: D:/Project/delta_lake/bronze/yelp_data/
2025-02-13 12:24:32,874 - __main__ - ERROR - Lỗi trong main: Path không tồn tại: D:/Project/delta_lake/bronze/yelp_data/
Traceback (most recent call last):
  File "C:\Users\trieu\AppData\Local\Temp\ipykernel_13148\365934481.py", line 143, in main
    df = reader.read_parquet(
         ^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\trieu\AppData\Local\Temp\ipykernel_13148\365934481.py", line 67, in read_parquet
    raise ValueError(f"Path không tồn tại: {input_path}")
ValueError: Path không tồn tại: D:/Project/delta

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, trim, lower, to_timestamp, datediff, current_date, split, size, when, mean
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from pyspark.ml.feature import Imputer
from functools import reduce
import pyspark.sql.functions as F
from delta import configure_spark_with_delta_pip

builder = SparkSession.builder \
    .appName("Users to Silver") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:3.1.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

def load_users_data(spark, path):
    schema = StructType([
        StructField("user_id", StringType(), True),
        StructField("name", StringType(), True),
        StructField("review_count", IntegerType(), True),
        StructField("yelping_since", StringType(), True),
        StructField("useful", IntegerType(), True),
        StructField("funny", IntegerType(), True),
        StructField("cool", IntegerType(), True),
        StructField("elite", StringType(), True),
        StructField("friends", StringType(), True),
        StructField("fans", IntegerType(), True),
        StructField("average_stars", DoubleType(), True),
        StructField("compliment_hot", IntegerType(), True),
        StructField("compliment_more", IntegerType(), True),
        StructField("compliment_profile", IntegerType(), True),
        StructField("compliment_cute", IntegerType(), True),
        StructField("compliment_list", IntegerType(), True),
        StructField("compliment_note", IntegerType(), True),
        StructField("compliment_plain", IntegerType(), True),
        StructField("compliment_cool", IntegerType(), True),
        StructField("compliment_funny", IntegerType(), True),
        StructField("compliment_writer", IntegerType(), True),
        StructField("compliment_photos", IntegerType(), True)
    ])
    return spark.read.json(path, schema=schema)

# Đọc dữ liệu Bronze
df = load_users_data(spark, "D:/Project/delta_lake/bronze/yelp_academic_dataset_user.json")
df.show(5)

def handle_missing_values(df):
    # Xác định các cột số cần impute
    numeric_cols = ["review_count", "useful", "funny", "cool", "fans", "average_stars"] + \
                   [c for c in df.columns if c.startswith("compliment_")]
    
    # Impute giá trị thiếu bằng median
    imputer = Imputer(
        inputCols=numeric_cols,
        outputCols=[f"{c}_imputed" for c in numeric_cols]
    ).setStrategy("median")
    df = imputer.fit(df).transform(df)
    
    # Thay thế cột gốc bằng các giá trị đã được impute
    for c in numeric_cols:
        df = df.drop(c).withColumnRenamed(f"{c}_imputed", c)
    
    # Điền giá trị mặc định cho các cột phân loại
    df = df.na.fill({
        "name": "Unknown",
        "yelping_since": "1970-01-01",
        "elite": "",
        "friends": ""
    })
    return df

df = handle_missing_values(df)
df.show(5)

def standardize_data(df):
    # Chuẩn hóa dữ liệu: làm sạch tên, chuyển đổi kiểu ngày, tính tuổi tài khoản,...
    df = df.withColumn("name", trim(lower(col("name")))) \
           .withColumn("yelping_since", to_timestamp(col("yelping_since"))) \
           .withColumn("account_age_days", datediff(current_date(), col("yelping_since"))) \
           .withColumn("friends_array", split(col("friends"), ",")) \
           .withColumn("friends_count", size(col("friends_array"))) \
           .withColumn("elite_years", split(col("elite"), ","))
    return df

df = standardize_data(df)
df.show(5)

def feature_engineering(df):
    # Tính tổng số compliment sử dụng reduce để cộng các cột lại với nhau
    compliment_cols = [col(c) for c in df.columns if c.startswith("compliment_")]
    df = df.withColumn("total_compliments", reduce(lambda a, b: a + b, compliment_cols))
    
    # Tính điểm engagement dựa trên review, fans và số lượng bạn bè
    df = df.withColumn("engagement_score", 
                       (col("review_count") + col("fans") + col("friends_count")) / col("account_age_days"))
    
    # Xác định trạng thái người dùng dựa trên điều kiện
    df = df.withColumn("user_status",
        when(size(col("elite_years")) > 0, "Elite")
        .when(col("fans") > 10, "Popular")
        .when(col("review_count") > 50, "Active")
        .otherwise("Regular"))
    
    # Phân loại hành vi đánh giá
    df = df.withColumn("rating_behavior",
        when(col("average_stars") >= 4.0, "Positive")
        .when(col("average_stars") <= 2.0, "Critical")
        .otherwise("Neutral"))
    
    return df

df = feature_engineering(df)
df.show(5)

def validate_data(df):
    # Kiểm tra dữ liệu: lọc những dòng có giá trị không hợp lệ
    df = df.filter(
        (col("average_stars").between(1, 5)) &
        (col("review_count") >= 0) &
        (col("fans") >= 0) &
        (F.length(col("user_id")) > 0)
    )
    
    # Lọc các cột số khác để đảm bảo không có giá trị âm
    other_numeric_cols = ["useful", "funny", "cool"] + [c for c in df.columns if c.startswith("compliment_")]
    for c in other_numeric_cols:
        df = df.filter(col(c) >= 0)
    
    return df

df = validate_data(df)
df.show(5)

def quality_checks(df):
    # Kiểm tra số lượng giá trị null trên mỗi cột
    null_counts = df.select([F.sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])
    
    # Tính số dòng trùng lặp
    total_count = df.count()
    dedup_count = df.dropDuplicates().count()
    duplicate_count = total_count - dedup_count
    
    # Thống kê giá trị trung bình của một số cột quan trọng
    value_dist = df.select(
        mean("review_count").alias("mean_reviews"),
        mean("fans").alias("mean_fans"),
        mean("average_stars").alias("mean_rating"),
        mean("total_compliments").alias("mean_compliments"),
        mean("engagement_score").alias("mean_engagement")
    )
    
    print("Null Counts:")
    null_counts.show()
    print(f"Duplicate Count: {duplicate_count}")
    print("Value Distributions:")
    value_dist.show()
    
    return df

df = quality_checks(df)

# Ghi dữ liệu ra layer Silver dưới định dạng Delta, phân vùng theo user_status
df.write.format("delta") \
    .option("mergeSchema", "true") \
    .mode("overwrite") \
    .partitionBy("user_status") \
    .save("D:/Project/delta_lake/silver/users")


Py4JJavaError: An error occurred while calling o78.showString.
: org.apache.spark.SparkException: Cannot find catalog plugin class for catalog 'spark_catalog': org.apache.spark.sql.delta.catalog.DeltaCatalog.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.catalogPluginClassNotFoundForCatalogError(QueryExecutionErrors.scala:1925)
	at org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:70)
	at org.apache.spark.sql.connector.catalog.CatalogManager.loadV2SessionCatalog(CatalogManager.scala:67)
	at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$v2SessionCatalog$2(CatalogManager.scala:86)
	at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
	at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$v2SessionCatalog$1(CatalogManager.scala:86)
	at scala.Option.map(Option.scala:230)
	at org.apache.spark.sql.connector.catalog.CatalogManager.v2SessionCatalog(CatalogManager.scala:85)
	at org.apache.spark.sql.connector.catalog.CatalogManager.catalog(CatalogManager.scala:51)
	at org.apache.spark.sql.connector.catalog.CatalogManager.currentCatalog(CatalogManager.scala:122)
	at org.apache.spark.sql.connector.catalog.CatalogManager.currentNamespace(CatalogManager.scala:93)
	at org.apache.spark.sql.catalyst.optimizer.ReplaceCurrentLike.apply(finishAnalysis.scala:143)
	at org.apache.spark.sql.catalyst.optimizer.ReplaceCurrentLike.apply(finishAnalysis.scala:140)
	at org.apache.spark.sql.catalyst.optimizer.Optimizer$FinishAnalysis$.$anonfun$apply$1(Optimizer.scala:295)
	at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
	at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
	at scala.collection.immutable.List.foldLeft(List.scala:91)
	at org.apache.spark.sql.catalyst.optimizer.Optimizer$FinishAnalysis$.apply(Optimizer.scala:295)
	at org.apache.spark.sql.catalyst.optimizer.Optimizer$FinishAnalysis$.apply(Optimizer.scala:275)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
	at scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60)
	at scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68)
	at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:38)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:152)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:138)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:219)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:219)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:218)
	at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:148)
	at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:144)
	at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:162)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:182)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:179)
	at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:238)
	at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:284)
	at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:252)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:117)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4320)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3314)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3537)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.delta.catalog.DeltaCatalog
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:593)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:526)
	at org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:60)
	... 65 more
