In [None]:
import pickle

from pyspark.ml import PipelineModel
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, monotonically_increasing_id

In [None]:
def ground_truth(input_df):
    """Return the ``"isfraud"`` entry of ``input_df``.

    The lookup is a plain ``input_df["isfraud"]``. For a PySpark DataFrame,
    indexing by column name yields a ``Column`` expression rather than a
    DataFrame, so the result carries no rows by itself.
    """
    label_column = input_df["isfraud"]
    return label_column

In [None]:
def preprocess_data(input_df, categorical_column):
    """One-hot encode one categorical column and assemble a feature vector.

    Args:
        input_df: DataFrame holding the raw rows. It must contain
            ``categorical_column``; a column named ``"isfraud"`` is kept in
            the output but excluded from the feature vector.
        categorical_column: Name of the string column to index and encode.

    Returns:
        A DataFrame in which ``categorical_column`` has been replaced by
        ``"encoded_<categorical_column>"`` and a ``"features"`` vector column
        has been appended. ``"features"`` is assembled from every remaining
        column except ``"isfraud"``.
    """
    index_column = "categoryIndex"
    encoded_column = "encoded_" + categorical_column

    # Step 1: map each category value to a numeric index.
    # NOTE(review): the indexer is fitted on the data being scored, so the
    # value-to-index mapping may differ from the one used when the model was
    # trained -- confirm this matches the training pipeline.
    indexer = StringIndexer(inputCol=categorical_column, outputCol=index_column)
    indexed_df = indexer.fit(input_df).transform(input_df)

    # Step 2: one-hot encode the index.
    encoder = OneHotEncoder(inputCol=index_column, outputCol=encoded_column)
    # In Spark 3.x OneHotEncoder is an Estimator: it has no transform() and
    # must be fitted first. In Spark 2.x it is a plain Transformer without
    # fit(), so fall back to using it directly there.
    encoder_model = encoder.fit(indexed_df) if hasattr(encoder, "fit") else encoder
    encoded_df = encoder_model.transform(indexed_df)

    # The raw category and its intermediate index are no longer needed.
    encoded_df = encoded_df.drop(categorical_column, index_column)

    # Every remaining column except the target feeds the feature vector.
    # The loop variable is deliberately not called `col`, which is also the
    # name of a function imported from pyspark.sql.functions.
    feature_columns = [name for name in encoded_df.columns if name != "isfraud"]

    assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
    return assembler.transform(encoded_df)

In [None]:
# Load the saved model
# PipelineModel.load needs an active SparkContext, so create (or reuse) the
# session before loading. The builder settings match the app cell below, whose
# getOrCreate() call will simply return this same session.
spark = SparkSession.builder.appName("spark-predictions").enableHiveSupport().getOrCreate()

# NOTE(review): model_file_path is not defined in any visible cell -- it must
# be set to the saved pipeline's location before this cell runs.
loaded_model = PipelineModel.load(model_file_path)

In [None]:
# APP: read the source table, build features, and score it with the loaded model.
spark = SparkSession.builder.appName("spark-predictions").enableHiveSupport().getOrCreate()
df = spark.sql("SELECT * FROM fraud_project")
categorical_column = "transaction_type"

# Preprocessed DataFrame
preprocessed_df = preprocess_data(df, categorical_column)

# Make predictions using the loaded model
predictions = loaded_model.transform(preprocessed_df)

# Ground truth
# Stored under its own name: assigning the result back to `ground_truth` would
# overwrite the function, and re-running this cell would then fail because the
# name no longer refers to something callable.
ground_truth_column = ground_truth(df)

In [None]:
# Build the prediction / ground-truth table directly from the model output.
# `predictions` is already a DataFrame, so spark.createDataFrame() cannot be
# applied to it. Taking both values from the same row also avoids joining two
# frames on independently generated monotonically increasing IDs, which are
# unique but not guaranteed to line up row-for-row.
# NOTE(review): "prediction" is the Spark ML default predictionCol -- confirm
# the saved pipeline does not override it.
PREDICTION_COLUMN = "prediction"

# NOTE(review): assumes the model output still carries the "isfraud" column
# from the preprocessed input -- confirm against the saved pipeline's stages.
result_df = (
    predictions
    .select(
        col(PREDICTION_COLUMN).alias("predictions"),
        col("isfraud").alias("ground_truth"),
    )
    # Add an auto-incrementing ID column (unique, not necessarily consecutive)
    .withColumn("ID", monotonically_increasing_id())
    .select("ID", "predictions", "ground_truth")
)

# Show the resulting DataFrame
result_df.show()

# Save the DataFrame as a Hive table
result_df.write.mode("overwrite").saveAsTable("predictions_table")