In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
# Get (or create) the Spark session used by the rest of this script.
spark = (
    SparkSession.builder
    .appName("Bulk file Handling")
    .getOrCreate()
)

# Read the credit-card CSV: the first line is treated as the header and the
# column types are inferred from the data.
credit_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("/FileStore/tables/credit_card.csv/creditcard.csv")
)

# Remove duplicate rows, then continue under the short name `df`.
deduplicated_df = credit_df.dropDuplicates()
df = deduplicated_df

# Shape of the dataset: number of rows and number of columns.
num_rows, num_columns = df.count(), len(df.columns)
print(f"Number of rows :{num_rows}")
print(f"Number of columns : {num_columns}")

# Preview the first five rows.
df.show(n=5)

# Descriptive statistics as produced by DataFrame.describe().
summary_df = df.describe()
summary_df.show()

# Column names and their types.
df.printSchema()

# Count the null values in each column (one output column per input column).
# NOTE: `sum` here is pyspark.sql.functions.sum, brought in by the wildcard
# import, not the Python builtin.
missing_values = df.select(
    [sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]
)
missing_values.show()

# Drop the rows that contain missing values and rebind `df` to the cleaned
# frame, so that every step that follows (derived columns, Parquet export,
# SQL queries) operates on the cleaned data rather than the raw frame.
df_cleaned = df.dropna()
df = df_cleaned

# Compute the mean and standard deviation of "Amount" in one aggregation,
# before the derived column is added.
mean_std = df.select(
    mean("Amount").alias("mean"),
    stddev("Amount").alias("stddev"),
).collect()

# Extract the two scalars from the single result row. They are stored under
# names that do not shadow the `mean` / `stddev` functions imported from
# pyspark.sql.functions, so those functions stay callable afterwards.
amount_mean = mean_std[0]["mean"]
amount_stddev = mean_std[0]["stddev"]

# A null or zero standard deviation makes the normalization undefined;
# fail early with an explicit message instead of an obscure arithmetic error.
if not amount_stddev:
    raise ValueError(
        'Cannot normalize "Amount": standard deviation is null or zero'
    )

# Add "NormalizedAmount" = (Amount - mean) / stddev.
# The parentheses are required: without them the division binds tighter and
# the expression becomes Amount - (mean / stddev), which merely shifts every
# value by a constant instead of standardizing it.
df = df.withColumn(
    "NormalizedAmount",
    (col("Amount") - amount_mean) / amount_stddev,
)

df.show()

# Add "AmountLog" = ln(Amount + 1). The +1 offset keeps zero amounts defined
# (ln(1) = 0), and the logarithmic scale reduces the influence of extreme
# values or outliers.
shifted_amount = col("Amount") + 1
df = df.withColumn("AmountLog", log(shifted_amount))
df.show()

# File conversion: write the DataFrame out in Parquet format to a
# local-filesystem URI. The save mode is "append".
# NOTE(review): with "append", re-running this step presumably adds the same
# rows to the existing output again - confirm that this is intended.
parquet_output_uri = "file:///C:/Users/saikumar/Downloads/output_parquet"
df.write.parquet(parquet_output_uri, mode="append")


# Register the DataFrame as a temporary SQL view

# Make `df` queryable from Spark SQL under the view name "credit_table".
view_name = "credit_table"
df.createOrReplaceTempView(view_name)

# Sanity check: print the Python type of `df` and its current schema.
df_type = type(df)
print(df_type)
df.printSchema()

# Query 1: the average normalized amount in fraudulent transactions (Class = 1).
fraud_avg_query = """
    SELECT AVG(NormalizedAmount) AS avg_normalized_amount
    FROM credit_table
    WHERE Class = 1
"""
avg_result = spark.sql(fraud_avg_query)
avg_result.show()

# Query 2: the maximum normalized amount in non-fraudulent transactions (class = 0).
non_fraud_max_query = """
    select MAX(NormalizedAmount) AS Max_normalized_amount
    FROM credit_table
    where class=0
"""
max_result = spark.sql(non_fraud_max_query)
max_result.show()

# Query 3: the average of AmountLog for each value of Class.
amount_log_by_class_query = """
    select class,AVG(AmountLog) AS avg_amount_log
    FROM credit_table
    group by Class
"""
avg_amount_log = spark.sql(amount_log_by_class_query)
avg_amount_log.show()


Number of rows :283726
Number of columns : 31
+----+------------------+-------------------+----------------+------------------+-------------------+-------------------+-------------------+------------------+------------------+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+------------------+-------------------+--------------------+-------------------+------------------+------------------+------------------+------------------+--------------------+-------------------+------+-----+
|Time|                V1|                 V2|              V3|                V4|                 V5|                 V6|                 V7|                V8|                V9|                V10|               V11|               V12|               V13|               V14|               V15|               V16|               V17|                V18|               V19|                V20|