In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, expr
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# 1. Create the SparkSession and attach it to the standalone cluster.
# The master URL is read from the SPARK_MASTER environment variable when it is set
# and non-empty, otherwise it defaults to spark://spark-master:7077. It is passed to
# .master() explicitly so the connection does not depend on the builder picking the
# variable up on its own.
# NOTE: getOrCreate() returns an already-running session in this kernel unchanged,
# so edits here only take effect after a kernel restart.
# Packages resolved at session start:
#   - spark-sql-kafka: Kafka source for Structured Streaming (step 3).
#   - delta-spark: Delta Lake format plus the SQL extension/catalog configured below.
#   - elasticsearch-spark: NOTE(review): not used in this notebook, and its resolution
#     pulls spark-yarn 3.3.3 (see the Ivy log) next to the 3.5.x Kafka connector —
#     drop it if no other notebook relies on this session.
import os

SPARK_MASTER_URL = os.environ.get("SPARK_MASTER") or "spark://spark-master:7077"
SPARK_PACKAGES = ",".join([
    "org.elasticsearch:elasticsearch-spark-30_2.12:8.11.3",
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.3",
    "io.delta:delta-spark_2.12:3.3.0",
])

spark = (
    SparkSession.builder
    .appName("KafkaToDeltaLake")
    .master(SPARK_MASTER_URL)
    .config("spark.jars.packages", SPARK_PACKAGES)
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # Limit this application to 2 cores in total (presumably so other apps can
    # share the standalone cluster).
    .config("spark.executor.cores", "2")
    .config("spark.cores.max", "2")
    .config("spark.ui.reverseProxy", "true")
    .getOrCreate()
)

print("Spark Session Created and connected to cluster.")

# 2. Schema of the JSON document carried in each Kafka message's `value`.
# It has to match, field for field, the structure produced by producer.py.
# `timestamp` is read as a plain string here and converted to a real timestamp
# column while parsing (step 4). All fields are nullable.
json_schema = (
    StructType()
    .add("user_id", StringType(), True)
    .add("event_type", StringType(), True)
    .add("timestamp", StringType(), True)
    .add("page", StringType(), True)
    .add("value", DoubleType(), True)
)

# 3. Kafka source (read stream).
# Kafka is reached through two listeners: the Python producer on the host uses
# localhost:9094, while the Spark containers use the in-network address kafka:9092.
kafka_df = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "kafka:9092",
        "subscribe": "user_events",
        # For a brand-new query (no checkpoint yet) start from the newest offsets,
        # i.e. do not replay messages already in the topic.
        "startingOffsets": "latest",
        # Keep the query running instead of failing when data may have been lost
        # (e.g. offsets aged out of the topic).
        "failOnDataLoss": "false",
    })
    .load()
)

print("Kafka stream reader created.")

# 4. Parse the Kafka payload (transform).
# Kafka delivers `value` as binary, so it is cast to a string, parsed as JSON with
# json_schema, and the resulting struct is flattened into top-level columns.
# The string `timestamp` is then replaced by a TimestampType column `event_timestamp`.
# NOTE(review): payloads that are not valid JSON are not filtered out here; from_json
# yields nulls for them, so they surface as null-valued rows downstream.
parsed_df = (
    kafka_df
    .select(from_json(col("value").cast("string"), json_schema).alias("data"))
    .select("data.*")
    .withColumn("event_timestamp", expr("to_timestamp(timestamp)"))
    .drop("timestamp")
)

# 5. Delta Lake sink (write stream).
# Parsed rows are appended to the Delta table at delta_output_path in 15-second
# micro-batches; the streaming checkpoint in delta_checkpoint_path records progress
# so a restarted query continues from where it stopped.
# NOTE(review): earlier notes said this data is mounted to ./spark-data/user_events_delta
# on the host — confirm the volume mapping for /opt/spark/data.
delta_output_path = "/opt/spark/data/user_events_delta"
delta_checkpoint_path = "/opt/spark/data/checkpoints/user_events_delta"

delta_stream_query = (
    parsed_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", delta_checkpoint_path)
    .trigger(processingTime="15 seconds")
    .start(delta_output_path)
)

print("Delta Lake write stream started.")
# Report the directory the query actually writes to.
print(f"数据正在从Kafka实时流入，并写入到 {delta_output_path} 目录中。")

# The query keeps running in the background; stop it when done with:
# delta_stream_query.stop()

:: loading settings :: url = jar:file:/opt/conda/lib/python3.13/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /opt/spark/.ivy2/cache
The jars for the packages stored in: /opt/spark/.ivy2/jars
org.elasticsearch#elasticsearch-spark-30_2.12 added as a dependency
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
io.delta#delta-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-54c2f959-97b7-4686-9450-0d2864d9c830;1.0
	confs: [default]
	found org.elasticsearch#elasticsearch-spark-30_2.12;8.11.3 in central
	found org.scala-lang#scala-reflect;2.12.8 in central
	found org.slf4j#slf4j-api;1.7.6 in central
	found commons-logging#commons-logging;1.1.1 in central
	found javax.xml.bind#jaxb-api;2.3.1 in central
	found com.google.protobuf#protobuf-java;2.5.0 in central
	found org.apache.spark#spark-yarn_2.12;3.3.3 in central
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.3 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.3 in central
	found org.apache.kafka#kafka-clients;3.4.1 in centra

Spark Session Created and connected to cluster.
Kafka stream reader created.
Delta Lake write stream started.
数据正在从Kafka实时流入，并写入到 /home/jovyan/data/user_events_delta 目录中。
这个单元格将保持'运行中'状态，这是正常的。


25/11/03 06:30:30 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/11/03 06:30:30 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


In [1]:
import great_expectations as gx
import datetime
import sys
from pyspark.sql import SparkSession
from great_expectations.exceptions import DataContextError
from great_expectations.checkpoint import UpdateDataDocsAction

# ==============================================================================
# Step 0: Initialize Spark Session (Reuse)
# ==============================================================================
# Reuse the `spark` session from the streaming cell when it exists in this kernel;
# otherwise build a local session. The fallback loads the delta-spark package
# itself: the extension/catalog settings alone do not make format("delta")
# available unless the Delta jars are already on the classpath.
# NOTE(review): the local[*] fallback reads /opt/spark/data/... from this container's
# own filesystem — confirm that path is mounted here as well.
try:
    # globals().get yields None when `spark` was never defined, so isinstance is False.
    if isinstance(globals().get("spark"), SparkSession):
        print("Reusing existing Spark Session.")
    else:
        print("Spark Session not found, creating a new one...")
        spark = (
            SparkSession.builder
            .appName("ReusableSparkSessionForGE")
            .master("local[*]")
            # Same Delta artifact as the streaming cell (assumes the same
            # Spark 3.5 / Scala 2.12 runtime).
            .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.3.0")
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
            .getOrCreate()
        )
        print("New Spark Session created.")
except Exception as e:
    print(f"Error initializing Spark Session: {e}")
    # A Spark Session is required for every later step.
    sys.exit("Spark Session is mandatory.")


# ==============================================================================
# Step 1: Get Great Expectations Context
# ==============================================================================
# A file-backed context keeps configuration, expectation suites and validation
# results on disk; Data Docs are generated from those stored artifacts.
print("Step 1: Getting Great Expectations Context...")
project_root_dir = "/home/jovyan/work/ge_spark_project"
try:
    context = gx.get_context(project_root_dir=project_root_dir, mode="file")
except Exception as e:
    print(f"Error getting Context: {e}")
    sys.exit("Could not initialize GE Context.")
else:
    print(f"Successfully got or created FileDataContext, project root: {project_root_dir}")

# ==============================================================================
# Step 2: Get or Update Data Source
# ==============================================================================
# A Spark data source registered under a fixed name; add_or_update makes the
# call safe to repeat on re-runs.
print("\nStep 2: Getting or Updating Data Source...")
data_source_name = "my_spark_delta_source"
try:
    data_source = context.data_sources.add_or_update_spark(name=data_source_name)
except Exception as e:
    print(f"Error adding/updating Data Source: {e}")
    sys.exit("Could not set up Data Source.")
else:
    print(f"Successfully got/updated Spark Data Source: {data_source_name}")

# ==============================================================================
# Step 3: Get or Update Data Asset
# ==============================================================================
print("\nStep 3: Getting or Updating Data Asset...")
# The asset is a named "slot" on the data source; the actual DataFrame is handed
# over at run time through batch_parameters.
data_asset_name = "my_delta_dataframe_asset"
try:
    # Get-or-add so the cell can be re-run: calling add_dataframe_asset with a name
    # the data source already holds is expected to raise a duplicate-name error.
    if data_asset_name in {asset.name for asset in data_source.assets}:
        data_asset = data_source.get_asset(data_asset_name)
    else:
        data_asset = data_source.add_dataframe_asset(name=data_asset_name)
    print(f"Successfully got/updated DataFrame Asset: {data_asset_name}")
except Exception as e:
    print(f"Error adding/updating Data Asset: {e}")
    sys.exit("Could not set up Data Asset.")

# ==============================================================================
# Step 4: Add Batch Definition to Asset
# ==============================================================================
print("\nStep 4: Adding Batch Definition to Asset...")
# A whole-dataframe batch definition treats the entire DataFrame passed at run
# time as one batch.
batch_definition_name = "my_whole_dataframe_definition"
try:
    # Get-or-add so the cell can be re-run: adding a batch definition under a name
    # the asset already holds is expected to raise.
    if batch_definition_name in {bd.name for bd in data_asset.batch_definitions}:
        batch_definition = data_asset.get_batch_definition(batch_definition_name)
    else:
        batch_definition = data_asset.add_batch_definition_whole_dataframe(
            batch_definition_name
        )
    print(f"Successfully added Batch Definition: {batch_definition_name}")
except Exception as e:
    print(f"Error adding Batch Definition: {e}")
    sys.exit("Could not set up Batch Definition.")

# ==============================================================================
# Step 5: Read Delta Lake Data from Spark
# ==============================================================================
print("\nStep 5: Reading Delta Lake Data from Spark...")
# Same table the streaming cell writes to.
full_path = "/opt/spark/data/user_events_delta"
# Delta time travel: set to an int to validate that table version; None reads the
# latest snapshot (same as giving no version option at all).
delta_version = None
try:
    delta_reader = spark.read.format("delta")
    if delta_version is not None:
        delta_reader = delta_reader.option("versionAsOf", delta_version)
    df = delta_reader.load(full_path)

    print(f"Successfully loaded Delta Lake data from {full_path}.")
    print("Data Schema:")
    df.printSchema()
except Exception as e:
    print(f"Error reading Delta Lake data: {e}")
    sys.exit("Could not load source data.")

# ==============================================================================
# Step 6: Create Batch Parameters
# ==============================================================================
print("\nStep 6: Creating Batch Parameters...")
# A whole-dataframe batch definition receives the DataFrame itself at run time,
# under the "dataframe" key.
batch_parameters = dict(dataframe=df)
print("Batch Parameters are ready.")

# ==============================================================================
# Step 7: Define and Add Expectation Suite
# ==============================================================================
print("\nStep 7: Defining and Adding Expectation Suite...")
suite_name = "user_events_delta_suite"
try:
    suite = gx.ExpectationSuite(name=suite_name)

    # The rules, in the order they are added to the suite.
    for expectation in (
        # user_id: must exist and never be null.
        gx.expectations.ExpectColumnToExist(column="user_id"),
        gx.expectations.ExpectColumnValuesToNotBeNull(column="user_id"),
        # event_type: must exist and hold one of the known event kinds.
        gx.expectations.ExpectColumnToExist(column="event_type"),
        gx.expectations.ExpectColumnValuesToBeInSet(
            column="event_type",
            value_set=['click', 'page_view', 'purchase', 'add_to_cart'],
        ),
        # page: must exist and be one of the known site pages.
        gx.expectations.ExpectColumnToExist(column="page"),
        gx.expectations.ExpectColumnValuesToBeInSet(
            column="page",
            value_set=['/home', '/products/1', '/products/2', '/cart', '/checkout'],
        ),
        # event_timestamp: must exist, never be null, and be a Spark TimestampType column.
        gx.expectations.ExpectColumnToExist(column="event_timestamp"),
        gx.expectations.ExpectColumnValuesToNotBeNull(column="event_timestamp"),
        gx.expectations.ExpectColumnValuesToBeOfType(column="event_timestamp", type_="TimestampType"),
        # value: must exist; at least 95% of the checked values must lie in [5.0, 100.0].
        # NOTE(review): this column-map expectation does not count nulls as failures, so
        # `mostly` tolerates out-of-range values, not nulls. To check only purchases,
        # consider row_condition='col("event_type") == "purchase"'.
        gx.expectations.ExpectColumnToExist(column="value"),
        gx.expectations.ExpectColumnValuesToBeBetween(
            column="value",
            min_value=5.0,
            max_value=100.0,
            mostly=0.95,
        ),
    ):
        suite.add_expectation(expectation)

    # Persist the suite in the context (add, or update by name).
    context.suites.add_or_update(suite)
    print(f"Successfully defined and saved Expectation Suite: {suite_name}")
except Exception as e:
    print(f"Error processing Expectation Suite: {e}")
    sys.exit("Could not create Expectation Suite.")

# ==============================================================================
# Step 8: Define and Add Validation Definition
# ==============================================================================
# A validation definition pairs "which data" (the batch definition) with
# "which rules" (the expectation suite).
print("\nStep 8: Defining and Adding Validation Definition...")
validation_definition_name = "validate_my_delta_dataframe"
try:
    validation_definition = gx.ValidationDefinition(
        data=batch_definition,
        suite=suite,
        name=validation_definition_name,
    )
    context.validation_definitions.add_or_update(validation_definition)
except Exception as e:
    print(f"Error defining Validation Definition: {e}")
    sys.exit("Could not create Validation Definition.")
else:
    print(f"Successfully defined and saved Validation Definition: {validation_definition_name}")

# ==============================================================================
# Step 9 & 10: Define Data Docs Site and Actions
# ==============================================================================
print("\nStep 9 & 10: Configuring Data Docs Site and Actions...")
site_name = "my_data_docs_site"
# Relative to the context's root directory. The site gets its own folder rather than
# uncommitted/validations/ (the directory GX's default project config uses for the
# validation results store), so generated HTML is not mixed into stored results.
base_directory = "uncommitted/data_docs/my_data_docs_site/"

try:
    site_config = {
        "class_name": "SiteBuilder",
        "site_index_builder": {"class_name": "DefaultSiteIndexBuilder"},
        "store_backend": {
            "class_name": "TupleFilesystemStoreBackend",
            "base_directory": base_directory,
        },
    }
    # update_data_docs_site only applies to an already-configured site, so a fresh
    # project needs add_data_docs_site; choosing by name lets the cell work both on
    # the first run and on re-runs.
    if site_name in context.get_site_names():
        context.update_data_docs_site(site_name=site_name, site_config=site_config)
    else:
        context.add_data_docs_site(site_name=site_name, site_config=site_config)
    print(f"Successfully configured Data Docs Site: {site_name}")

    # Rebuild this site after every checkpoint run.
    actions = [
        UpdateDataDocsAction(name="update_all_data_docs", site_names=[site_name])
    ]
    print("Successfully defined UpdateDataDocsAction.")

except Exception as e:
    print(f"Error configuring Data Docs Site or Actions: {e}")
    # Validation can still run without the action; only the docs won't refresh.
    actions = []

# ==============================================================================
# Step 11: Define Checkpoint
# ==============================================================================
# The checkpoint runs the validation definition from Step 8, then the actions
# from Step 10; COMPLETE result format includes the detailed per-expectation results.
print("\nStep 11: Defining and Adding Checkpoint...")
checkpoint_name = "my_delta_dataframe_checkpoint"
try:
    checkpoint = gx.Checkpoint(
        name=checkpoint_name,
        validation_definitions=[validation_definition],
        actions=actions,
        result_format={"result_format": "COMPLETE"},
    )
    context.checkpoints.add_or_update(checkpoint)
except Exception as e:
    print(f"Error defining Checkpoint: {e}")
    sys.exit("Could not create Checkpoint.")
else:
    print(f"Successfully defined and saved Checkpoint: {checkpoint_name}")

# ==============================================================================
# Step 12 & 13: Run Checkpoint and View Results
# ==============================================================================
print("\nStep 12 & 13: Running Checkpoint...")
try:
    # Take the clock once so run_name and run_time always describe the same instant
    # (two separate now() calls can straddle a second boundary).
    run_started_at = datetime.datetime.now()
    run_id = gx.core.run_identifier.RunIdentifier(
        run_name=f"delta_validation_run_{run_started_at:%Y%m%d_%H%M%S}",
        run_time=run_started_at,
    )
    print(f"Using Run ID: {run_id.run_name}")

    # The DataFrame from Step 5 reaches the whole-dataframe batch definition
    # through batch_parameters.
    validation_results = checkpoint.run(
        batch_parameters=batch_parameters,
        run_id=run_id,
    )

    print("\n================ Validation Results ================")
    print(validation_results)
    print("====================================================")
    print(f"Overall success: {validation_results.success}")

    # Ask the context where the site lives instead of joining project_root_dir and
    # base_directory by hand: the site's base_directory is resolved against the
    # context's root directory, which for a file context may be a subdirectory
    # (e.g. gx/) of project_root_dir.
    print("\nValidation complete. Check the Data Docs here:")
    try:
        for site in context.get_docs_sites_urls(site_name=site_name):
            print(site.get("site_url") or f"(site {site_name!r} has not been built yet)")
    except Exception as docs_error:
        # Best effort: failing to resolve a URL must not mark the run as failed.
        print(f"Could not resolve the Data Docs URL for site {site_name!r}: {docs_error}")

except Exception as e:
    print(f"Error running Checkpoint: {e}")
    sys.exit("Checkpoint run failed.")



ModuleNotFoundError: No module named 'great_expectations'