In [5]:
import logging
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, mean, stddev, abs
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType
import pandas as pd

# Setup logging.
# Log to a dedicated file: basicConfig opens `filename` in append mode, so it
# must never point at the input CSV (botnull.csv) read below — doing so appends
# log lines into the very dataset spark.read.csv loads, corrupting it each run.
logging.basicConfig(filename="C:\\Users\\jagadeesh02.TRN\\Desktop\\Project Details\\botnull_cleaning.log",
                    level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Start Spark session. getOrCreate() returns the already-active session when one
# exists, so re-running this notebook cell does not spawn a second session.
spark = (
    SparkSession.builder
    .appName("InteractionDataCleaning")
    .getOrCreate()
)

# Define schema as a (column name, Spark type) table; every field is declared
# nullable. NOTE(review): order presumably mirrors the CSV header — confirm.
_SCHEMA_COLUMNS = [
    ("interaction_id", StringType()),
    ("timestamp", StringType()),
    ("user_query", StringType()),
    ("intent_detected", StringType()),
    ("bot_response", StringType()),
    ("response_time_ms", LongType()),
    ("prediction_accuracy_percent", DoubleType()),
    ("entity_extraction_accuracy_percent", DoubleType()),
    ("user_sentiment", StringType()),
    ("user_feedback", StringType()),
    ("conversation_success", StringType()),
]
schema = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in _SCHEMA_COLUMNS]
)

# Load CSV file.
# With an explicit schema Spark only resolves the path here; rows are read
# lazily, so content problems surface at the first action (initial_count below).
# mode="DROPMALFORMED" discards rows whose values don't fit the schema types.
try:
    df = spark.read.csv("C:\\Users\\jagadeesh02.TRN\\Desktop\\Project Details\\botnull.csv",
                        header=True, schema=schema, mode="DROPMALFORMED")
    logging.info("CSV data loaded successfully.")
except Exception as e:
    # logging.exception logs at ERROR level *with* the traceback; a bare
    # `raise` re-raises the original exception without adding a frame.
    logging.exception(f"Error reading dataset: {e}")
    raise

# ---------- Data Cleaning Functions ----------

def remove_duplicates(df, subset=None):
    """Return *df* with duplicate rows removed.

    Args:
        df: Input Spark DataFrame.
        subset: Optional list of column names used to decide whether two rows
            are duplicates. ``None`` (the default) compares every column, which
            is the original behavior.

    Returns:
        A new DataFrame; *df* itself is not modified.
    """
    return df.dropDuplicates(subset)

def handle_nulls(df, strategy='drop', fill_value=0, subset=None):
    """Handle null values in *df* according to *strategy*.

    Args:
        df: Input Spark DataFrame.
        strategy: ``'drop'`` removes every row that has a null in any considered
            column; ``'fill'`` replaces nulls with *fill_value*. Any other value
            logs a warning and returns *df* unchanged.
        fill_value: Replacement used by ``'fill'``, passed to
            ``DataFrame.fillna``. A scalar only applies to columns of a matching
            type, so the default ``0`` fills numeric columns and leaves
            string/timestamp nulls in place. A ``{column: value}`` dict fills
            per column (``fillna`` then ignores *subset*).
        subset: Optional list of column names to restrict the drop/fill to.
            ``None`` (default) considers all columns.

    Returns:
        The resulting DataFrame (or *df* itself for an unknown strategy).
    """
    if strategy == 'drop':
        return df.dropna(subset=subset)
    if strategy == 'fill':
        return df.fillna(fill_value, subset=subset)
    logging.warning(f"Unknown null handling strategy: {strategy}")
    return df

def standardize_column(df, column_name, new_type="string"):
    """Cast *column_name* to *new_type*, keeping the column's name.

    *new_type* is handed to ``Column.cast`` (a Spark SQL type name such as
    ``"string"`` or a ``DataType`` instance). Returns a new DataFrame.
    """
    casted = col(column_name).cast(new_type)
    return df.withColumn(column_name, casted)

def standardize_timestamp(df, column_name, format="M/d/yyyy H:mm"):
    """Replace string column *column_name* with a timestamp parsed via *format*.

    *format* is a Spark datetime pattern. NOTE(review): under Spark's default
    (non-ANSI) settings, values that don't match the pattern become null rather
    than raising — callers check for that afterwards. Returns a new DataFrame.
    """
    parsed = to_timestamp(col(column_name), format)
    return df.withColumn(column_name, parsed)

def log_data_quality_issues(df):
    """Log a warning for each data-quality problem detected in *df*.

    Checks, in order, whether any ``interaction_id`` is null and whether any
    ``timestamp`` is null (the message assumes those nulls came from failed
    timestamp parsing). Each check runs one Spark count job; nothing is
    returned and *df* is not modified.
    """
    null_checks = (
        ("interaction_id", "There are null interaction IDs."),
        ("timestamp", "Some timestamps could not be parsed and are NULL."),
    )
    for column_name, message in null_checks:
        null_rows = df.filter(df[column_name].isNull()).count()
        if null_rows > 0:
            logging.warning(message)

def detect_outliers(df, column_name, threshold=3.0):
    """Drop rows whose *column_name* value is a Z-score outlier.

    A value is an outlier when ``|x - mean| / stddev > threshold``, using the
    column mean and sample standard deviation. The outlier count is logged at
    INFO level. Rows where *column_name* is null have no Z-score: they are
    neither counted as outliers nor removed.

    Args:
        df: Input Spark DataFrame.
        column_name: Numeric column to test.
        threshold: Largest absolute Z-score that is kept (inclusive).

    Returns:
        A DataFrame with exactly the columns of *df*. If the standard deviation
        is zero, null (e.g. empty input) or NaN, a warning is logged and *df*
        is returned unchanged.
    """
    import math

    # Compute both statistics in a single aggregation job. A global agg always
    # yields exactly one row, even for empty input (values are then null).
    stats = df.agg(mean(column_name).alias("mean"),
                   stddev(column_name).alias("stddev")).first()
    mean_val, stddev_val = stats["mean"], stats["stddev"]

    # Depending on Spark version a single-row column yields a null or NaN
    # stddev; a NaN would make every Z-score NaN and filter out all rows.
    if stddev_val is None or stddev_val == 0 or math.isnan(stddev_val):
        logging.warning(f"Standard deviation is zero or None for column {column_name}. Skipping outlier detection.")
        return df

    # Use a Column expression instead of a temporary "z_score" column so that
    # an existing column with that name is never overwritten or dropped.
    z_score = (col(column_name) - mean_val) / stddev_val
    within_threshold = z_score.between(-threshold, threshold)

    # Null values give a null predicate here, so they are not counted.
    outlier_count = df.filter(~within_threshold).count()
    logging.info(f"{outlier_count} outliers detected in column '{column_name}' using Z-score threshold {threshold}.")

    return df.filter(within_threshold | col(column_name).isNull())

def generate_summary_report(initial_count, final_count):
    """Log the before/after record counts and the percentage of rows dropped.

    The percentage is reported as 0 when *initial_count* is not positive, which
    avoids dividing by zero. Everything is logged at INFO level; returns None.
    """
    if initial_count <= 0:
        dropped_percentage = 0
    else:
        dropped_percentage = 100 * (initial_count - final_count) / initial_count

    logging.info(f"Initial record count: {initial_count}")
    logging.info(f"Final record count: {final_count}")
    logging.info(f"Percentage of records dropped: {dropped_percentage:.2f}%")

# ---------- Cleaning Pipeline ----------

initial_count = df.count()
df_cleaned = remove_duplicates(df)
df_cleaned = handle_nulls(df_cleaned, strategy='drop')
df_cleaned = standardize_timestamp(df_cleaned, "timestamp", format="M/d/yyyy H:mm")
# interaction_id is already StringType in the read schema; this cast is a
# defensive no-op kept in case the schema changes.
df_cleaned = standardize_column(df_cleaned, "interaction_id", "string")

# Every step below triggers Spark actions (null counts, mean/stddev, outlier
# count, final count, and later show()/toPandas()). Without caching, each one
# re-reads and re-cleans the CSV from scratch, so materialize the cleaned data once.
df_cleaned = df_cleaned.cache()

# NOTE: handle_nulls(strategy='drop') already removed every row containing a
# null, so only the timestamp-parse check here can still report anything.
log_data_quality_issues(df_cleaned)
df_cleaned = detect_outliers(df_cleaned, "response_time_ms", threshold=2.5)
final_count = df_cleaned.count()

generate_summary_report(initial_count, final_count)

# ---------- Export Cleaned Data to CSV (Sorted by interaction_id) ----------

try:
    # Sort numerically so "10" follows "9". NOTE(review): with Spark's default
    # non-ANSI casting, non-integer IDs become null and sort first (ascending).
    df_cleaned_sorted = df_cleaned.orderBy(col("interaction_id").cast("int"))
    df_cleaned_sorted.show(truncate=False)
    # toPandas() collects every row onto the driver; this assumes the cleaned
    # data fits in driver memory.
    df_cleaned_pandas = df_cleaned_sorted.toPandas()
    df_cleaned_pandas.to_csv("C:\\Users\\jagadeesh02.TRN\\Desktop\\Project Details\\botnull_cleaned.csv", index=False)
    logging.info("Cleaned and sorted interaction data saved to CSV successfully.")
except Exception as e:
    # Record the traceback, then re-raise the original exception unchanged.
    logging.exception(f"Error exporting cleaned data: {e}")
    raise


+--------------+-------------------+---------------------------+--------------------+-----------------------------------------------------+----------------+---------------------------+----------------------------------+--------------+-----------------+--------------------+
|interaction_id|timestamp          |user_query                 |intent_detected     |bot_response                                         |response_time_ms|prediction_accuracy_percent|entity_extraction_accuracy_percent|user_sentiment|user_feedback    |conversation_success|
+--------------+-------------------+---------------------------+--------------------+-----------------------------------------------------+----------------+---------------------------+----------------------------------+--------------+-----------------+--------------------+
|1             |2025-05-01 09:00:00|Buy BTC now?               |Buy Order           |BTC is bullish, buying recommended.                  |150             |95.0                  