In [142]:
#Importing Necessary Libraries and Creating the Spark Application
import os

from pyspark.sql import SparkSession,functions as func

# Reuse the active Spark session if one exists, otherwise start a new one
# registered under the application name "Data Transformation".
sparkApp = (
    SparkSession.builder
    .appName("Data Transformation")
    .getOrCreate()
)


In [122]:
# Connection settings are read from the environment so that credentials do not
# have to be committed with the notebook. The fallbacks reproduce the original
# local development setup (root user, empty password, local MySQL instance).
jdbcUrl = os.environ.get("BIGDATAPROJ_DB_URL", "jdbc:mysql://localhost:3306/bigdataproj")
jdbcUser = os.environ.get("BIGDATAPROJ_DB_USER", "root")
jdbcPassword = os.environ.get("BIGDATAPROJ_DB_PASSWORD", "")

# Load the `transaction` table from MySQL through the JDBC data source.
df = (
    sparkApp.read
    .format("jdbc")
    .option("url", jdbcUrl)
    .option("dbtable", "transaction")
    .option("user", jdbcUser)
    .option("password", jdbcPassword)
    .load()
)

In [123]:
# Print the column names, data types and nullability of the loaded DataFrame
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- accountNumber: string (nullable = true)
 |-- customerId: string (nullable = true)
 |-- creditLimit: decimal(10,2) (nullable = true)
 |-- availableMoney: decimal(10,2) (nullable = true)
 |-- currentBalance: decimal(10,2) (nullable = true)
 |-- transactionDateTime: timestamp (nullable = true)
 |-- transactionAmount: decimal(10,2) (nullable = true)
 |-- merchantName: string (nullable = true)
 |-- acqCountry: string (nullable = true)
 |-- merchantCountryCode: string (nullable = true)
 |-- posEntryMode: string (nullable = true)
 |-- posConditionCode: string (nullable = true)
 |-- merchantCategoryCode: string (nullable = true)
 |-- cardPresent: boolean (nullable = true)
 |-- currentExpDate: string (nullable = true)
 |-- accountOpenDate: date (nullable = true)
 |-- dateOfLastAddressChange: date (nullable = true)
 |-- cardCVV: string (nullable = true)
 |-- enteredCVV: string (nullable = true)
 |-- cardLast4Digits: string (nullable = true)
 |-- trans

In [124]:
# Preview the first 10 rows of the loaded DataFrame
df.show(10)

+---+-------------+----------+-----------+--------------+--------------+-------------------+-----------------+--------------------+----------+-------------------+------------+----------------+--------------------+-----------+--------------+---------------+-----------------------+-------+----------+---------------+---------------+----------+------------+-------------+-----------+-------------+----------------+------------------------+-------+
| id|accountNumber|customerId|creditLimit|availableMoney|currentBalance|transactionDateTime|transactionAmount|        merchantName|acqCountry|merchantCountryCode|posEntryMode|posConditionCode|merchantCategoryCode|cardPresent|currentExpDate|accountOpenDate|dateOfLastAddressChange|cardCVV|enteredCVV|cardLast4Digits|transactionType|echoBuffer|merchantCity|merchantState|merchantZip|posOnPremises|recurringAuthInd|expirationDateKeyInMatch|isFraud|
+---+-------------+----------+-----------+--------------+--------------+-------------------+----------------

In [125]:
# These columns hold nothing but NULL values, so they carry no information
# for the EDA and are removed up front.
COLUMNS_TO_DROP = [
    'echoBuffer',
    'merchantState',
    'merchantZip',
    'recurringAuthInd',
    'posOnPremises',
    'merchantCity',
]

df = df.drop(*COLUMNS_TO_DROP)

In [126]:
# Schema of the dataset after the all-NULL columns were dropped
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- accountNumber: string (nullable = true)
 |-- customerId: string (nullable = true)
 |-- creditLimit: decimal(10,2) (nullable = true)
 |-- availableMoney: decimal(10,2) (nullable = true)
 |-- currentBalance: decimal(10,2) (nullable = true)
 |-- transactionDateTime: timestamp (nullable = true)
 |-- transactionAmount: decimal(10,2) (nullable = true)
 |-- merchantName: string (nullable = true)
 |-- acqCountry: string (nullable = true)
 |-- merchantCountryCode: string (nullable = true)
 |-- posEntryMode: string (nullable = true)
 |-- posConditionCode: string (nullable = true)
 |-- merchantCategoryCode: string (nullable = true)
 |-- cardPresent: boolean (nullable = true)
 |-- currentExpDate: string (nullable = true)
 |-- accountOpenDate: date (nullable = true)
 |-- dateOfLastAddressChange: date (nullable = true)
 |-- cardCVV: string (nullable = true)
 |-- enteredCVV: string (nullable = true)
 |-- cardLast4Digits: string (nullable = true)
 |-- trans

In [127]:
# Preview the first 5 rows of the reduced DataFrame
df.show(5)

+---+-------------+----------+-----------+--------------+--------------+-------------------+-----------------+-------------------+----------+-------------------+------------+----------------+--------------------+-----------+--------------+---------------+-----------------------+-------+----------+---------------+---------------+------------------------+-------+
| id|accountNumber|customerId|creditLimit|availableMoney|currentBalance|transactionDateTime|transactionAmount|       merchantName|acqCountry|merchantCountryCode|posEntryMode|posConditionCode|merchantCategoryCode|cardPresent|currentExpDate|accountOpenDate|dateOfLastAddressChange|cardCVV|enteredCVV|cardLast4Digits|transactionType|expirationDateKeyInMatch|isFraud|
+---+-------------+----------+-----------+--------------+--------------+-------------------+-----------------+-------------------+----------+-------------------+------------+----------------+--------------------+-----------+--------------+---------------+-----------------

In [128]:
# customerId is equivalent to accountNumber, so only accountNumber is kept.
df = df.drop("customerId")

# Preview the first 5 rows to confirm the column is gone.
df.show(5)

+---+-------------+-----------+--------------+--------------+-------------------+-----------------+-------------------+----------+-------------------+------------+----------------+--------------------+-----------+--------------+---------------+-----------------------+-------+----------+---------------+---------------+------------------------+-------+
| id|accountNumber|creditLimit|availableMoney|currentBalance|transactionDateTime|transactionAmount|       merchantName|acqCountry|merchantCountryCode|posEntryMode|posConditionCode|merchantCategoryCode|cardPresent|currentExpDate|accountOpenDate|dateOfLastAddressChange|cardCVV|enteredCVV|cardLast4Digits|transactionType|expirationDateKeyInMatch|isFraud|
+---+-------------+-----------+--------------+--------------+-------------------+-----------------+-------------------+----------+-------------------+------------+----------------+--------------------+-----------+--------------+---------------+-----------------------+-------+----------+-------

In [129]:
# Count the NULLs of every column in ONE aggregation instead of launching a
# separate filter + count job per column. `df` is not cached, so each of those
# per-column jobs would have to scan the source again.
# when(...) yields 1 for NULL cells and NULL otherwise; count() only counts the
# non-NULL results, i.e. exactly the NULL cells of that column.
nullCountsRow = df.select(
    [
        func.count(func.when(func.col(name).isNull(), func.lit(1))).alias(name)
        for name in df.columns
    ]
).first()

# Report the counts in column order, using the same message as before.
for col in df.columns:
    numOfNullValues = nullCountsRow[col]
    print(f'Number of null values in column {col} is {numOfNullValues} \n')

Number of null values in column id is 0 

Number of null values in column accountNumber is 0 

Number of null values in column creditLimit is 0 

Number of null values in column availableMoney is 0 

Number of null values in column currentBalance is 0 

Number of null values in column transactionDateTime is 0 

Number of null values in column transactionAmount is 0 

Number of null values in column merchantName is 0 

Number of null values in column acqCountry is 0 

Number of null values in column merchantCountryCode is 0 

Number of null values in column posEntryMode is 0 

Number of null values in column posConditionCode is 0 

Number of null values in column merchantCategoryCode is 0 

Number of null values in column cardPresent is 0 

Number of null values in column currentExpDate is 0 

Number of null values in column accountOpenDate is 0 

Number of null values in column dateOfLastAddressChange is 0 

Number of null values in column cardCVV is 0 

Number of null values in column

In [130]:
# Creating new features to facilitate analysis: split the timestamp into a
# date part and a time-of-day part (both stored as strings).
#
# The parts are extracted with date_format rather than character offsets:
# Column.substr(startPos, length) is 1-based and its second argument is a
# LENGTH, so substr(11, 19) on 'yyyy-MM-dd HH:mm:ss' starts on the separator
# space and yields ' HH:mm:ss' (with a leading blank) instead of 'HH:mm:ss'.
df = df.withColumn(
    'transactionDate',
    func.date_format(func.col('transactionDateTime'), 'yyyy-MM-dd'),
)

df = df.withColumn(
    'transactionTime',
    func.date_format(func.col('transactionDateTime'), 'HH:mm:ss'),
)

# The original timestamp is redundant once both parts exist.
df = df.drop('transactionDateTime')

In [151]:
# Desired column order: the derived transactionDate / transactionTime columns
# are placed right after currentBalance, ahead of transactionAmount.
ORDERED_COLS = [
    "id",
    "accountNumber",
    "creditLimit",
    "availableMoney",
    "currentBalance",
    "transactionDate",
    "transactionTime",
    "transactionAmount",
    "merchantName",
    "acqCountry",
    "merchantCountryCode",
    "posEntryMode",
    "posConditionCode",
    "merchantCategoryCode",
    "cardPresent",
    "currentExpDate",
    "accountOpenDate",
    "dateOfLastAddressChange",
    "cardCVV",
    "enteredCVV",
    "cardLast4Digits",
    "transactionType",
    "expirationDateKeyInMatch",
    "isFraud",
]

# Project the columns in that order and peek at a single row.
orderedDF = df.select(ORDERED_COLS)
orderedDF.show(1)

# Separate handle used by the transformation steps that follow.
transformationDF = orderedDF

+---+-------------+-----------+--------------+--------------+---------------+---------------+-----------------+------------+----------+-------------------+------------+----------------+--------------------+-----------+--------------+---------------+-----------------------+-------+----------+---------------+---------------+------------------------+-------+
| id|accountNumber|creditLimit|availableMoney|currentBalance|transactionDate|transactionTime|transactionAmount|merchantName|acqCountry|merchantCountryCode|posEntryMode|posConditionCode|merchantCategoryCode|cardPresent|currentExpDate|accountOpenDate|dateOfLastAddressChange|cardCVV|enteredCVV|cardLast4Digits|transactionType|expirationDateKeyInMatch|isFraud|
+---+-------------+-----------+--------------+--------------+---------------+---------------+-----------------+------------+----------+-------------------+------------+----------------+--------------------+-----------+--------------+---------------+-----------------------+-------+---

In [163]:
# Handling missing values in the 'acqCountry' column by filling them with the mode.
transformationDF.groupBy('acqCountry').count().show()

# Missing countries are stored as empty strings rather than NULL (the NULL
# check reports 0 NULLs for this column while the group-by shows a blank
# bucket). fillna() only replaces NULL, so it would leave those rows untouched.
# Treat both NULL and blank / whitespace-only values as missing.
acqCountryIsMissing = (
    func.col('acqCountry').isNull() | (func.trim(func.col('acqCountry')) == '')
)

# Acquire the mode over the populated rows only, so the blank value itself can
# never be picked as the fill value.
acqCountryMode = (
    transformationDF
    .filter(~acqCountryIsMissing)
    .select(func.mode('acqCountry'))
    .first()[0]
)

if acqCountryMode is None:
    # No populated value exists to impute from; leave the data unchanged.
    finalDF = transformationDF
else:
    # Replace every missing value with the mode; withColumn keeps the column
    # in its current position.
    finalDF = transformationDF.withColumn(
        'acqCountry',
        func.when(acqCountryIsMissing, func.lit(acqCountryMode))
            .otherwise(func.col('acqCountry')),
    )

+----------+-----+
|acqCountry|count|
+----------+-----+
|        CA|  156|
|        ME|  208|
|        US|49258|
|          |  264|
|        PR|  113|
+----------+-----+



In [156]:
# Write the results back to MySQL.
# Connection settings are read from the environment so that credentials do not
# have to be committed with the notebook. The fallbacks reproduce the original
# local development setup (root user, empty password, local MySQL instance).
jdbcUrl = os.environ.get("BIGDATAPROJ_DB_URL", "jdbc:mysql://localhost:3306/bigdataproj")
jdbcUser = os.environ.get("BIGDATAPROJ_DB_USER", "root")
jdbcPassword = os.environ.get("BIGDATAPROJ_DB_PASSWORD", "")

# mode("overwrite") replaces any existing contents of the `fraudtransactions` table.
(
    finalDF.write
    .format("jdbc")
    .option("url", jdbcUrl)
    .option("dbtable", "fraudtransactions")
    .option("user", jdbcUser)
    .option("password", jdbcPassword)
    .mode("overwrite")
    .save()
)
print("Data inserted into database successfully")

Data inserted into database successfully


In [None]:
# Stop the Spark session now that the results have been written
sparkApp.stop()