### 開發與測試 Notebook

本 Notebook 用於交互式地開發、測試和驗證 `data_pipeline` 中的核心 ETL 邏輯。執行前需有可連線的 Spark Connect 服務：位址由環境變數 `SPARK_CONNECT_URL` 指定，未設定時預設為 `sc://localhost:31002`。

In [None]:
# Cell 1: 魔法命令和導入
%load_ext autoreload
%autoreload 2


import os
from datetime import datetime, timedelta
from typing import Optional

from pyspark.sql import SparkSession

# The core task under test (process_features_for_single_day) is imported in Cell 3.





def get_spark_session_connect(url: Optional[str] = None) -> SparkSession:
    """
    Our replacement ``get_spark_session`` implementation, backed by Spark Connect.

    Connects with ``SparkSession.builder.remote(...).getOrCreate()``.

    Args:
        url: Spark Connect URL (``sc://host:port``). When omitted, the
            ``SPARK_CONNECT_URL`` environment variable is used, falling back to
            ``sc://localhost:31002``. Defaults to ``None`` so zero-argument
            callers keep working unchanged.

    Returns:
        The ``SparkSession`` returned by ``getOrCreate()``.

    Raises:
        Exception: whatever ``getOrCreate()`` raises when the connection
            fails; it is printed and then re-raised unchanged.
    """
    print("--- ✅ 正在调用 x.py 中【新定义的】get_spark_session_connect ---")
    # An explicit argument wins over the environment, so a different server can be
    # targeted without mutating os.environ inside the running kernel.
    connect_url = url if url is not None else os.getenv("SPARK_CONNECT_URL", "sc://localhost:31002")
    print(f"      尝试连接到: {connect_url}")
    try:
        spark = SparkSession.builder.remote(connect_url).getOrCreate()
        print(f"      ✅ 成功连接到 Spark Connect Server, 版本: {spark.version}")
        return spark
    except Exception as e:
        print(f"      ❌ 连接 Spark Connect Server 失败: {e}")
        # Re-raise so the cell fails visibly rather than continuing without a session.
        raise


# Optional: route pipeline code that calls data_pipeline.get_spark_session() to the
# Spark Connect session above by uncommenting the two lines below. Presumably this
# must happen before the pipeline task runs — TODO confirm against data_pipeline.
# import src.pipelines.data_pipeline as data_pipeline
# data_pipeline.get_spark_session = get_spark_session_connect

spark = get_spark_session_connect()

print("開發環境準備就緒！")

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload
--- ✅ 正在调用 x.py 中【新定义的】get_spark_session_connect ---
      尝试连接到: sc://localhost:15002
      ✅ 成功连接到 Spark Connect Server, 版本: 4.0.0
開發環境準備就緒！


In [3]:
# Cell 3: run the core task under test.
from src.pipelines.data_pipeline import process_features_for_single_day

# Defined at cell level (not inside the try) so Cell 4 can query the same day's output.
test_date = "2019-01-15"

if spark:
    try:
        print("\nStart testing process_features_for_single_day...")

        # `.fn` presumably invokes the plain function underneath a task decorator
        # (e.g. Prefect's Task.fn), bypassing the orchestrator — TODO confirm.
        process_features_for_single_day.fn(
            target_date_str=test_date,
            feature_groups_to_process=["user_rolling_ratings"],
            spark=spark,
        )

        print("\n 函數執行成功！")

    except Exception as e:
        print(f"\n 函數執行失敗: {e}")
        # Re-raise instead of swallowing. The cell then fails visibly and Run All stops
        # before Cell 4 inspects possibly stale table contents. `%debug` can also open a
        # post-mortem debugger on the preserved traceback.
        raise


  user_entity = Entity(name="user_id", description="The user entity for MovieLens")
  movie_entity = Entity(name="movie_id", description="The movie entity for MovieLens")



Start testing process_features_for_single_day...

--- [DEBUG] Schema before applying window function ---
root
 |-- user_id: integer (nullable = true)
 |-- timestamp: date (nullable = false)
 |-- timestamp_long: long (nullable = false)
 |-- rating: float (nullable = true)


--- [DEBUG] Sample data before applying window function (first 5 rows) ---
+-------+----------+--------------+------+
|user_id|timestamp |timestamp_long|rating|
+-------+----------+--------------+------+
|33019  |2019-01-15|1547510400    |5.0   |
|33019  |2019-01-15|1547510400    |4.5   |
|33019  |2019-01-15|1547510400    |5.0   |
|33019  |2019-01-15|1547510400    |5.0   |
|33019  |2019-01-15|1547510400    |4.0   |
+-------+----------+--------------+------+
only showing top 5 rows

 函數執行成功！


In [4]:
# Cell 4: verify the results.
# After the task succeeds, inspect the output table with Spark SQL for the same
# `test_date` used in Cell 3 (this cell depends on Cell 3 having run).

if spark:
    try:
        print("\n🔍 驗證輸出數據...")
        output_table = "feature_store.user_rolling_ratings_historical"
        result_df = spark.sql(f"""
            SELECT * FROM {output_table} 
            WHERE timestamp = to_date('{test_date}', 'yyyy-MM-dd') 
            ORDER BY user_id
            LIMIT 10
        """)

        result_df.show()
        # Fail loudly if nothing was written for this day. Otherwise an empty table
        # would look like a successful verification.
        assert result_df.take(1), f"{output_table} 中沒有 {test_date} 的數據"

    except Exception as e:
        print(f"❌ 驗證失敗: {e}")
        # Re-raise so a failed verification is not mistaken for a pass; `finally`
        # below still closes the session first.
        raise
    finally:
        # Close the session at the end of the notebook, on success and on failure alike.
        print("\n shutting down spark session...")
        spark.stop()



🔍 驗證輸出數據...
+-------+-------------------+---------------------+----------+
|user_id|avg_rating_past_30d|rating_count_past_30d| timestamp|
+-------+-------------------+---------------------+----------+
|  33019|                4.7|                    5|2019-01-15|
|  33183|               NULL|                    0|2019-01-15|
|  33294|  3.647727272727273|                   44|2019-01-15|
+-------+-------------------+---------------------+----------+


 shutting down spark session...
