In [None]:
from pyspark.sql import SparkSession

def create_spark_session():
    """Create a local SparkSession configured for MinIO (via S3A) and Delta Lake.

    Connection settings are read from environment variables so that
    credentials do not have to be stored in the notebook. Each variable falls
    back to the value that used to be hard-coded, so a default local MinIO
    instance keeps working without any extra setup:

    * ``MINIO_ENDPOINT``   -- default ``http://localhost:9000``
    * ``MINIO_ACCESS_KEY`` -- default ``minioadmin``
    * ``MINIO_SECRET_KEY`` -- default ``minioadmin``

    Returns:
        The session returned by ``SparkSession.builder...getOrCreate()``.
    """
    import os  # function-scope import keeps this block self-contained

    # `or` (rather than a .get default) also covers variables set to "".
    endpoint = os.environ.get("MINIO_ENDPOINT") or "http://localhost:9000"
    access_key = os.environ.get("MINIO_ACCESS_KEY") or "minioadmin"
    secret_key = os.environ.get("MINIO_SECRET_KEY") or "minioadmin"

    # Only switch TLS on for https endpoints; the default http endpoint
    # yields "false", matching the previous fixed setting.
    ssl_enabled = "true" if endpoint.lower().startswith("https://") else "false"

    # Extra jars resolved at session start: S3A filesystem, AWS SDK, Delta Lake.
    packages = ",".join([
        "org.apache.hadoop:hadoop-aws:3.3.4",
        "com.amazonaws:aws-java-sdk-bundle:1.12.262",
        "io.delta:delta-spark_2.12:3.0.0",
    ])

    return (
        SparkSession.builder
        .appName("MinIO with Delta Lake")
        .master("local[*]")
        .config("spark.hadoop.fs.s3a.endpoint", endpoint)
        .config("spark.hadoop.fs.s3a.access.key", access_key)
        .config("spark.hadoop.fs.s3a.secret.key", secret_key)
        # MinIO serves buckets as URL paths, not as virtual-host subdomains.
        .config("spark.hadoop.fs.s3a.path.style.access", "true")
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        .config("spark.hadoop.fs.s3a.connection.ssl.enabled", ssl_enabled)
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .config("spark.jars.packages", packages)
        .getOrCreate()
    )

def suggest_fixes(error):
    """Return troubleshooting hint lines for an error raised while reading from MinIO.

    The checks run from most specific to least specific. The generic
    "delta" match is evaluated last among the known causes because Delta /
    Py4J error text routinely contains the word "delta" (class names, error
    codes) even when the real cause is a missing path or bad credentials.

    Args:
        error: The exception that was caught.

    Returns:
        A list of hint strings, ready to be printed one per line.
    """
    message = str(error)

    if "Connection refused" in message:
        return ["- MinIO chưa chạy. Chạy: docker-compose up -d minio"]
    if "No such file" in message or "Path does not exist" in message:
        return [
            "- Bucket hoặc path không tồn tại trong MinIO",
            "- Kiểm tra tại: http://localhost:9001",
        ]
    if "Access Denied" in message:
        return ["- Sai credentials. Kiểm tra MINIO_ACCESS_KEY và MINIO_SECRET_KEY"]
    if "delta" in message.lower():
        return ["- Thiếu Delta Lake package. Đang tải..."]
    return [f"- Lỗi không xác định: {type(error).__name__}"]


if __name__ == "__main__":
    # Start from a known state so the cleanup below never picks up a stale
    # `spark` left in the kernel namespace by an earlier run of this cell.
    spark = None
    try:
        print("Đang khởi tạo Spark Session...")
        spark = create_spark_session()
        print(f"✓ Spark Session đã khởi tạo thành công! Version: {spark.version}")

        # Read the Delta table from MinIO
        print("\nĐang đọc dữ liệu từ MinIO...")
        df = spark.read.format("delta").load("s3a://datn/raw/careerviet/company")

        print("✓ Kết nối MinIO thành công!")
        print(f"\nSố lượng records: {df.count()}")
        print("\nSchema:")
        df.printSchema()
        print("\nDữ liệu mẫu:")
        df.show(10, truncate=False)

    except Exception as e:
        # Deliberately broad: this is a diagnostic cell that reports the
        # failure together with a hint instead of a raw traceback.
        print(f"✗ Lỗi: {e}")
        print("\nGợi ý khắc phục:")
        for hint in suggest_fixes(e):
            print(hint)

    finally:
        # Stop only the session created by this run.
        if spark is not None:
            spark.stop()
            print("\n✓ Spark Session đã đóng")

Đang khởi tạo Spark Session...


25/10/04 17:17:51 WARN Utils: Your hostname, DESKTOP-HHC5U09 resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/10/04 17:17:51 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/root/.pyenv/versions/3.9.13/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
com.amazonaws#aws-java-sdk-bundle added as a dependency
io.delta#delta-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-45583c38-bf31-4d46-a7c2-fea1a0f26f43;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.3.4 in central
	found com.amazonaws#aws-java-sdk-bundle;1.12.262 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
	found io.delta#delta-spark_2.12;3.0.0 in central
	found io.delta#delta-storage;3.0.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 404ms :: artifacts dl 15ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.12.262 from central in [default]
	io.delta#delta-spark_2.12;3.0.0 from central in [default]
	io.delta#delta-storage;3.0.0 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 

✓ Spark Session đã khởi tạo thành công! Version: 3.5.6

Đang đọc dữ liệu từ MinIO...


25/10/04 17:18:26 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


In [None]:
from deltalake import DeltaTable

import os  # local import so this cell can resolve its own connection settings

# Connect to the Delta table stored in MinIO.
# The endpoint and credentials come from MINIO_ENDPOINT / MINIO_ACCESS_KEY /
# MINIO_SECRET_KEY when set (and non-empty); otherwise the local MinIO
# defaults are used, so the cell behaves as before in a default setup.
dt = DeltaTable(
    "s3://datn/raw/careerviet/recruit",
    storage_options={
        "AWS_ACCESS_KEY_ID": os.environ.get("MINIO_ACCESS_KEY") or "minioadmin",
        "AWS_SECRET_ACCESS_KEY": os.environ.get("MINIO_SECRET_KEY") or "minioadmin",
        "AWS_ENDPOINT_URL": os.environ.get("MINIO_ENDPOINT") or "http://localhost:9000",
        "AWS_REGION": "us-east-1",
        "AWS_ALLOW_HTTP": "true",
        "AWS_S3_FORCE_PATH_STYLE": "true"
    }
)

# Load into pandas, fetching only the job_id column
df = dt.to_pandas(columns=["job_id"])

# Convert the column to a plain Python list
job_ids = df["job_id"].tolist()



ModuleNotFoundError: No module named 'deltalake'