In [5]:
# Session creation: getOrCreate() returns the already-running SparkSession when there
# is one (see the warning below), otherwise it starts a new one named "CreditCards".
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("CreditCards")
    .getOrCreate()
)


24/12/21 22:58:16 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [65]:
# Reading csv files, de-duplicate

# Location of the input CSVs; change it here to run the notebook on another machine.
CSV_GLOB = '/home/ranjith/Downloads/archive/*csv'

# Wildcard loading creates a single DataFrame from every matching file and reads them in parallel.
df_raw = (
    spark.read.format('csv')
    .option("header", "true")
    .option('InferSchema', True)
    .load(CSV_GLOB)
)

print('count before removing duplicates', df_raw.count())

# NOTE(review): the pre-dedup count is roughly twice the post-dedup count, which suggests the
# folder may hold overlapping copies of the same data — confirm the intended input set.
# The raw frame is kept under its own name so this cell stays idempotent. The de-duplicated
# frame is cached because every later cell runs one or more Spark actions on it; without the
# cache each action re-reads the CSVs and repeats the dropDuplicates shuffle.
df = df_raw.dropDuplicates().cache()

print('count after removing duplicates', df.count())
df.show(1)

                                                                                

count before removing duplicates 569614


                                                                                

count after removing duplicates 283726




+----+------------------+-----------------+----------------+-----------------+-----------------+----------------+-----------------+---------------+------------------+------------------+-----------------+-----------------+------------------+-----------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+-------------------+------------------+------------------+-------------------+------------------+------+-----+
|Time|                V1|               V2|              V3|               V4|               V5|              V6|               V7|             V8|                V9|               V10|              V11|              V12|               V13|              V14|              V15|               V16|               V17|               V18|               V19|               V20|               V21|              V22|               V23|                V24|               

                                                                                

In [20]:
# Data Exploration: overall size of the DataFrame (row count is a Spark action,
# column count comes straight from the schema).
rows, cols = df.count(), len(df.columns)

print(f"DF having {rows} rows and {cols} columns")



DF having 283726 rows and 31 columns


                                                                                

In [31]:
# checking for null / missing values
# The functions module is imported under the F alias so that pyspark's sum() does not
# shadow Python's built-in sum() for the rest of the kernel session.
import pyspark.sql.functions as F

# One aggregation pass: for each column, count the rows whose value is null
# (isNull() -> boolean, cast to 0/1, then summed). Each result column keeps its source name.
null_counts = df.select(
    [F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df.columns]
)

null_counts.show()



+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+------+-----+
|Time| V1| V2| V3| V4| V5| V6| V7| V8| V9|V10|V11|V12|V13|V14|V15|V16|V17|V18|V19|V20|V21|V22|V23|V24|V25|V26|V27|V28|Amount|Class|
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+------+-----+
|   0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|     0|    0|
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+------+-----+



                                                                                

In [66]:
# Statistics
# NOTE: importing min/max here shadows Python's built-ins for the rest of the session;
# later cells may rely on these names being defined, so the import is kept unchanged.
from pyspark.sql.functions import col, mean, stddev, min, max

# Summary statistics for every column
df.describe().show()

# Mean / min / max / standard deviation of the Amount column, computed in one select
amount_aggs = [
    mean("Amount").alias("Mean_Amount"),
    min("Amount").alias("Min_Amount"),
    max("Amount").alias("Max_Amount"),
    stddev("Amount").alias("StdDev_Amount"),
]
df.select(*amount_aggs).show()


                                                                                

+-------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+
|summary|              Time|                  V1|                  V2|                  V3|                  V4|                  V5|                  V6|                  V7|                  V8|                  V9|                 V10|                 V11|                 V12|                 V13|                 V14|                 V15|



+-----------------+----------+----------+------------------+
|      Mean_Amount|Min_Amount|Max_Amount|     StdDev_Amount|
+-----------------+----------+----------+------------------+
|88.47268731101789|       0.0|  25691.16|250.39943711577322|
+-----------------+----------+----------+------------------+



                                                                                

In [48]:
# median of the columns using percentile_approx (approximate, suitable for large data).

from pyspark.sql.functions import col, expr


def _quoted_identifier(column):
    """Backtick-quote a column name so it can be embedded safely in a Spark SQL expression."""
    # Embedded backticks are escaped by doubling them.
    return "`" + column.replace("`", "``") + "`"


def _median_expr(column):
    """Return a Column computing the approximate median (50th percentile) of `column`."""
    return expr(f"percentile_approx({_quoted_identifier(column)}, 0.5)")


def calculate_median_spark_native(df, column):
    """Return the approximate median of a single column of a Spark DataFrame.

    Args:
        df: Spark DataFrame to aggregate.
        column: column name. It is quoted before being placed in the SQL
            expression, so names containing spaces or other special
            characters work.

    Returns:
        The median value taken from the single aggregated row.
    """
    # Nulls are filtered explicitly before aggregating.
    return (
        df.filter(col(column).isNotNull())
        .select(_median_expr(column).alias("median"))
        .collect()[0]["median"]
    )


# All medians in a single aggregation job instead of one Spark job per column.
# percentile_approx skips nulls, so this matches calling the function column by column.
median_row = df.select([_median_expr(c).alias(c) for c in df.columns]).first()
median_results = median_row.asDict()

# Output the median values for each column (in df.columns order)
for column, median in median_results.items():
    print(f"Median for {column}: {median}")


[Stage 567:>                                                        (0 + 4) / 4]

Median for Time: 84681.0
Median for V1: 0.0201435755087573
Median for V2: 0.0637941997421073
Median for V3: 0.179871147511258
Median for V4: -0.0224219695555284
Median for V5: -0.0536312510232358
Median for V6: -0.275358573562355
Median for V7: 0.0407401875880306
Median for V8: 0.0218575949616256
Median for V9: -0.0525956513955632
Median for V10: -0.0933317436055039
Median for V11: -0.0323385470334912
Median for V12: 0.138996143050385
Median for V13: -0.0129937551085427
Median for V14: 0.0500901541828535
Median for V15: 0.049184212432686
Median for V16: 0.0670268080034959
Median for V17: -0.0659678076478633
Median for V18: -0.0022405035283073
Median for V19: 0.00330335625186285
Median for V20: -0.0623879623178263
Median for V21: -0.0295078434834623
Median for V22: 0.00666190163775685
Median for V23: -0.0111704465132397
Median for V24: 0.0409487542693962
Median for V25: 0.0162138522088198
Median for V26: -0.0522052302313139
Median for V27: 0.00145992468138233
Median for V28: 0.011276890

                                                                                

In [67]:
# mode of columns
from pyspark.sql.functions import col

# (column name, most frequent value) pairs, in df.columns order
mod_col_data = []

# NOTE(review): for continuous-valued columns many values may occur only once, in which case
# the "mode" is largely decided by the tie-break below — consider whether it is meaningful.
for col_name in df.columns:
    # Most frequent value of the column. Ordering by count alone leaves the winner among
    # equally frequent values up to Spark, so ties are broken by the smallest value
    # (nulls last) to make re-runs give the same answer.
    top_row = (
        df.groupBy(col(col_name))
        .count()
        .orderBy(col('count').desc(), col(col_name).asc_nulls_last())
        .first()
    )
    # first() returns None for an empty DataFrame instead of raising IndexError.
    mode = top_row[0] if top_row is not None else None
    mod_col_data.append((col_name, mode))

print(mod_col_data)



[('Time', 3767.0), ('V1', 2.05579700630039), ('V2', 0.166975019545401), ('V3', 0.488305742562781), ('V4', 0.6353219207244), ('V5', -0.562776680773863), ('V6', -1.01107261632698), ('V7', 0.0149526614685896), ('V8', -0.160210863301812), ('V9', 0.608605870267216), ('V10', -0.0445745893804268), ('V11', -0.35674901847752), ('V12', 0.350563573253678), ('V13', -0.517759694198053), ('V14', 0.690971618395625), ('V15', 1.2752570390935), ('V16', 0.342469754110769), ('V17', -0.601956802828445), ('V18', -0.0526401462570187), ('V19', -0.330590448442944), ('V20', -0.180370118559693), ('V21', 0.269764951361357), ('V22', -0.816263763157847), ('V23', 0.140304302014326), ('V24', 0.726211883811499), ('V25', 0.366624307004913), ('V26', -0.398827514959463), ('V27', 0.0277351215052822), ('V28', 0.0184945729704665), ('Amount', 1.0), ('Class', 0)]


                                                                                

In [59]:
# Data cleaning: drop rows containing any null, then drop exact-duplicate rows.
# The null check above reported zero nulls and `df` was already de-duplicated when it was
# loaded, so both steps are expected to be no-ops; they are kept as a safeguard in case the
# input changes. (The previous unused `sum` import, which shadowed Python's built-in, is gone.)
df_cleaned = df.dropna().dropDuplicates()

df_cleaned.show(1)



+----+------------------+-----------------+----------------+-----------------+-----------------+----------------+-----------------+---------------+------------------+------------------+-----------------+-----------------+------------------+-----------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+-------------------+------------------+------------------+-------------------+------------------+------+-----+
|Time|                V1|               V2|              V3|               V4|               V5|              V6|               V7|             V8|                V9|               V10|              V11|              V12|               V13|              V14|              V15|               V16|               V17|               V18|               V19|               V20|               V21|              V22|               V23|                V24|               

                                                                                

In [60]:
# Data Transformation: NormalizedAmount column added (z-score of Amount).
# The aggregate functions are imported here through the F alias, so this cell does not depend
# on names imported by another cell and does not shadow Python's min/max.
import pyspark.sql.functions as F
from pyspark.sql.functions import col

# Amount column statistics on the cleaned data
df_temp = df_cleaned.select(
    F.mean("Amount").alias("Mean_Amount"),
    F.min("Amount").alias("Min_Amount"),
    F.max("Amount").alias("Max_Amount"),
    F.stddev("Amount").alias("StdDev_Amount"),
)

# Fetch the single statistics row once: one Spark action instead of one per value.
amount_stats_row = df_temp.first()
Mean_Amount = amount_stats_row["Mean_Amount"]
StdDev_Amount = amount_stats_row["StdDev_Amount"]

# A missing or zero standard deviation cannot be used as a divisor; fail loudly instead of
# producing a meaningless NormalizedAmount column.
if not StdDev_Amount:
    raise ValueError(f"Cannot normalize Amount: standard deviation is {StdDev_Amount!r}")

# adding new column "NormalizedAmount" = (Amount - mean) / stddev
df_col_added = df_cleaned.withColumn(
    "NormalizedAmount", (col('Amount') - Mean_Amount) / StdDev_Amount
)

# showing sample data
df_col_added.select('Amount', 'NormalizedAmount').show(1)

[Stage 651:>                                                        (0 + 4) / 4]

+------+--------------------+
|Amount|    NormalizedAmount|
+------+--------------------+
|  6.14|-0.32880540092009486|
+------+--------------------+
only showing top 1 row



                                                                                

In [61]:
# Mathematical Transformation: add a log-scaled Amount column.
from pyspark.sql.functions import log1p, col

# log1p(x) computes log(1 + x) directly, avoiding the precision loss of adding 1 first.
# The +1 shift keeps zero amounts finite (Amount's minimum is 0.0 in the statistics above).
df_col2_added = df_col_added.withColumn('AmountLog', log1p(col('Amount')))

# showing sample data
df_col2_added.select('Amount', 'AmountLog').show(1)




+------+-----------------+
|Amount|        AmountLog|
+------+-----------------+
|  6.14|1.965712776351493|
+------+-----------------+
only showing top 1 row



                                                                                

In [62]:
# File Conversion

# Single source of truth for the output location, so the write and the read-back cannot diverge.
RESULT_PARQUET_PATH = '/home/ranjith/creditCardResultDf'

# Writing as parquet to compress the data and improve query performance.
# 'overwrite' replaces any previous run's output so the cell can be re-run.
df_col2_added.write.mode('overwrite').parquet(RESULT_PARQUET_PATH)

# Reading the file back into a Spark DataFrame to confirm the round trip.
parquet_df = spark.read.parquet(RESULT_PARQUET_PATH)

parquet_df.show(1)

                                                                                

+----+------------------+----------------+----------------+-----------------+-----------------+------------------+---------------+------------------+-----------------+----------------+----------------+-----------------+----------------+-------------------+----------------+-----------------+------------------+-----------------+----------------+-----------------+-------------------+-----------------+------------------+-------------------+------------------+------------------+------------------+------------------+------+-----+--------------------+-----------------+
|Time|                V1|              V2|              V3|               V4|               V5|                V6|             V7|                V8|               V9|             V10|             V11|              V12|             V13|                V14|             V15|              V16|               V17|              V18|             V19|              V20|                V21|              V22|               V23|           

In [64]:
# SQL Querying:

# creating temporary view to perform sql operations.
parquet_df.createOrReplaceTempView("parquet_df_view")

# Class is an integer column, so it is compared with numeric literals rather than strings;
# the view is referenced with the same name it was registered under.

# What is the average normalized amount in fraudulent transactions (Class = 1)?
spark.sql('''
SELECT AVG(NormalizedAmount) AS avg_normalized_amount
FROM parquet_df_view
WHERE Class = 1
''').show()

# What is the maximum normalized amount in non-fraudulent transactions (Class = 0)?
spark.sql('''
SELECT MAX(NormalizedAmount) AS max_normalized_amount
FROM parquet_df_view
WHERE Class = 0
''').show()


+---------------------+
|avg_normalized_amount|
+---------------------+
|  0.14137081760983125|
+---------------------+

+---------------------+
|max_normalized_amount|
+---------------------+
|   102.24738365067279|
+---------------------+

