In [2]:
import pickle
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.sql.functions import col
from pyspark.ml import PipelineModel

In [4]:
def preprocess_data(input_df, categorical_column):
    """Turn raw transaction rows into a DataFrame with an assembled ``features`` vector.

    The function selects a fixed set of columns, string-indexes and one-hot
    encodes ``categorical_column``, drops the raw and indexed categorical
    columns, and assembles every remaining column except ``isfraud`` into a
    single vector column named ``features``.

    Args:
        input_df: Spark DataFrame containing at least the columns listed in
            ``selected_columns`` below.
        categorical_column: Name of the categorical column to encode. It has
            to be one of the selected columns, otherwise the indexer cannot
            find it.

    Returns:
        DataFrame holding the selected columns (minus the raw categorical
        column), the ``encoded_<categorical_column>`` vector and ``features``.

    NOTE(review): the indexer and encoder are fitted on ``input_df`` itself,
    so the category-to-index mapping is derived from this batch. Confirm it
    matches the encoding the downstream model was trained with.
    """
    # Columns kept for modelling; everything else in the input is discarded.
    selected_columns = ["id","type","amount", "oldbalanceorg", "newbalanceorig", "oldbalancedest", "newbalancedest", "isfraud"]
    df_new = input_df.select(*selected_columns)

    # Step 1: map the categorical strings to numeric indices.
    indexer = StringIndexer(inputCol=categorical_column, outputCol="categoryIndex")
    indexed_df = indexer.fit(df_new).transform(df_new)

    # Step 2: one-hot encode the indices.
    encoder = OneHotEncoder(inputCol="categoryIndex", outputCol="encoded_" + categorical_column)
    # In Spark 3.x OneHotEncoder is an Estimator and has to be fitted before it
    # can transform; the legacy 2.x class is a plain Transformer without fit().
    # Supporting both keeps the notebook runnable on either version.
    encoder_model = encoder.fit(indexed_df) if hasattr(encoder, "fit") else encoder
    encoded_df = encoder_model.transform(indexed_df)

    # The raw string column and its intermediate index are no longer needed.
    encoded_df = encoded_df.drop(categorical_column).drop("categoryIndex")

    # Everything except the target column goes into the feature vector.
    # NOTE(review): this includes "id" — confirm the model was trained with it.
    feature_columns = [name for name in encoded_df.columns if name != "isfraud"]

    assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
    assembled_df = assembler.transform(encoded_df)

    return assembled_df

In [5]:
def create_predictions_table(spark, predictions):
    """Reduce model output to ``id``, ``isfraud`` and an integer ``prediction`` column.

    Args:
        spark: Not used by this function; kept so existing callers keep working.
        predictions: DataFrame containing ``id``, ``isfraud`` and ``prediction``.

    Returns:
        DataFrame with the columns ``id``, ``isfraud`` and ``prediction``,
        the latter cast to int.
    """
    prediction_as_int = col("prediction").cast("int").alias("prediction")
    return predictions.select("id", "isfraud", prediction_as_int)

In [None]:
def save_predictions_to_hive(predictions, database_name, table_name, spark, overwrite=False):
    """Write ``id``, ``isfraud`` and the integer prediction into a Hive table.

    The table is created if it does not exist yet and the rows are inserted by
    column position through a short-lived temporary view.

    Args:
        predictions: DataFrame with ``id``, ``isfraud`` and ``prediction`` columns.
        database_name: Database to switch to (via ``USE``) before writing. The
            session stays on this database after the call.
        table_name: Target table inside ``database_name``.
        spark: Active SparkSession used to run the SQL statements.
        overwrite: When True the table contents are replaced
            (``INSERT OVERWRITE``); the default False appends, so running the
            function twice stores the rows twice.

    Note:
        ``database_name`` and ``table_name`` are interpolated into SQL as-is;
        only pass trusted identifiers.
    """
    import uuid  # local import so this cell does not depend on the import cell

    # Keep only the columns that are persisted; the prediction is stored as int.
    result_df = predictions.select(
        "id", "isfraud", predictions["prediction"].cast("int").alias("prediction")
    )

    # A unique view name avoids clobbering an unrelated temp view in the session.
    staging_view = f"predictions_staging_{uuid.uuid4().hex}"
    result_df.createOrReplaceTempView(staging_view)
    try:
        spark.sql(f"USE {database_name}")

        # Create the table only if it is missing; an existing table is left as it is.
        # NOTE(review): "actua_values" looks like a typo for "actual_values". It is
        # kept because renaming would not alter an already-created table and could
        # break queries that read this column.
        spark.sql(f"""
            CREATE TABLE IF NOT EXISTS {table_name} (
                id INT,
                actua_values INT,
                predictions INT
            )
        """)

        # Columns are matched by position: id, isfraud, prediction.
        if overwrite:
            insert_clause = f"INSERT OVERWRITE TABLE {table_name}"
        else:
            insert_clause = f"INSERT INTO {table_name}"
        spark.sql(f"{insert_clause} SELECT * FROM {staging_view}")
    finally:
        # Always remove the staging view, even when one of the statements fails.
        spark.catalog.dropTempView(staging_view)

    print(f"Table '{table_name}' created successfully with predictions.")

In [18]:
# DATA FROM KAFKA
# The rows are read from a Hive table; presumably it is filled from Kafka upstream.
# getOrCreate() reuses the session if one is already running, and otherwise
# creates it, so this cell also works on a fresh kernel.
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
df_predict = spark.sql("SELECT * FROM fraud_project.lastapifraud")
categorical_column = "type"




In [None]:
# Build the feature vector for the incoming rows.
preprocessed_df = preprocess_data(
    input_df=df_predict,
    categorical_column=categorical_column,
)

In [None]:
# PREDICTIONS
# Score the preprocessed rows with the model, then print the schema of the result.
# NOTE(review): `model2` is not defined in any cell visible here — presumably a
# fitted model loaded in an earlier session; confirm it is loaded before this cell runs.
predictions = model2.transform(preprocessed_df)
predictions.printSchema()

In [None]:
# Reduce the model output to id, isfraud and the integer prediction.
result_table = create_predictions_table(spark=spark, predictions=predictions)

# Preview the first rows of the result.
result_table.show()

In [None]:
# Number of columns and rows in the predictions table.
num_columns = len(result_table.columns)
num_rows = result_table.count()
# The column count was computed but never reported; print both values.
print(f"Total Amount of Columns: {num_columns}")
print(f"Total Amount of Rows: {num_rows}")

In [None]:
# Saving predictions to Hive.
# Rows are appended, so re-running this cell stores the same predictions again.
database_name = "fraud_project"
table_name = "predictions_table"
save_predictions_to_hive(
    predictions=predictions,
    database_name=database_name,
    table_name=table_name,
    spark=spark,
)