In [None]:
import mlflow
import mlflow.sklearn
import pandas as pd
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
from sklearn.metrics import mean_squared_error
import os

# Locations and model reference used by this job.
new_data_delta_path = "/Volumes/databricks_test_ws/main/test-volume/data/new/diabetes"
predictions_delta_path = "/Volumes/databricks_test_ws/main/test-volume/data/predictions/diabetes"
model_uri = "models:/random-forest-model/Production"  # registered model, Production stage

# Start the Spark session (reuses the active one if it already exists).
spark = (
    SparkSession.builder
    .appName("PredictionAndReportJob")
    .getOrCreate()
)

# Read the new data, stored in Delta Lake format, and pull it into pandas
# because the scikit-learn model below works on in-memory frames.
new_data_spark_df = spark.read.load(new_data_delta_path, format="delta")
new_data_pdf = new_data_spark_df.toPandas()

# Feature matrix: everything except the label. Data to be scored may not carry
# a "target" column at all, hence errors="ignore".
X_new = new_data_pdf.drop(columns="target", errors="ignore")

# Load the model saved in MLflow and score the new data.
rf_model = mlflow.sklearn.load_model(model_uri)
predictions = rf_model.predict(X_new)

# Attach the predictions to the input rows and persist the result as Delta,
# replacing whatever was written by a previous run.
new_data_pdf["predictions"] = predictions
predictions_spark_df = spark.createDataFrame(new_data_pdf)
predictions_spark_df.write.save(
    predictions_delta_path,
    format="delta",
    mode="overwrite",
)

# Evaluate the predictions. This is only possible when the scored data also
# carries the ground-truth "target" column; otherwise a notice is printed.
if 'target' in new_data_pdf.columns:
    y_true = new_data_pdf['target']
    y_pred = new_data_pdf['predictions']
    mse = mean_squared_error(y_true, y_pred)
    rmse = mse ** 0.5
    print(f"Mean Squared Error: {mse}")
    print(f"Root Mean Squared Error: {rmse}")

    # Log the evaluation results (metrics and plots) to a new MLflow run.
    with mlflow.start_run():
        mlflow.log_metric("mse", mse)
        mlflow.log_metric("rmse", rmse)

        # Residual plot: predicted minus actual, one point per observation.
        residuals = y_pred - y_true
        residuals_fig = plt.figure(figsize=(10, 6))
        plt.scatter(range(len(residuals)), residuals, color='blue', s=10)
        plt.axhline(y=0, color='r', linestyle='--')
        plt.xlabel("Observation", fontsize=14)
        plt.ylabel("Residual", fontsize=14)
        plt.title("Residuals", fontsize=16)
        plt.grid(True)
        plt.tight_layout()

        # Save the residual plot.
        # Unity Catalog volumes are reached through /Volumes/...; the
        # /dbfs/Volumes prefix is reserved and does not access the volume,
        # so the same root as the Delta paths in this job is used here.
        output_path = "/Volumes/databricks_test_ws/main/test-volume/results"
        os.makedirs(output_path, exist_ok=True)
        residuals_plot_path = f"{output_path}/residuals_plot_report.png"
        plt.savefig(residuals_plot_path, dpi=300)
        # Release the figure; pyplot keeps every figure alive until closed,
        # which leaks memory when the cell/job is executed repeatedly.
        plt.close(residuals_fig)
        mlflow.log_artifact(residuals_plot_path)

        # Actual-vs-predicted scatter plot with the y = x reference line.
        scatter_fig = plt.figure(figsize=(10, 6))
        plt.scatter(y_true, y_pred, color='blue', s=10)
        plt.plot([y_true.min(), y_true.max()], [y_true.min(), y_true.max()], 'r--', lw=2)
        plt.xlabel("Actual", fontsize=14)
        plt.ylabel("Predicted", fontsize=14)
        plt.title("Actual vs Predicted", fontsize=16)
        plt.grid(True)
        plt.tight_layout()

        # Save the scatter plot.
        scatter_plot_path = f"{output_path}/actual_vs_predicted_plot.png"
        plt.savefig(scatter_plot_path, dpi=300)
        plt.close(scatter_fig)
        mlflow.log_artifact(scatter_plot_path)

else:
    print("Target column not found in predictions data. Unable to evaluate performance.")

# Print the first rows of the prediction results for a quick visual check
# (20 rows, long cell values truncated: the defaults of DataFrame.show).
predictions_spark_df.show(n=20, truncate=True)
