### CLEANED_JOINED_TRANSACTION/CUSTOMERS/MERCHANT

In [24]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType, IntegerType, TimestampType


StatementMeta(cfraudsparkpool, 69, 2, Finished, Available, Finished)

##### User Defined Schema For Customer

In [25]:

customer_schema = StructType([
    StructField("trans_num", StringType(), True),
    StructField("cc_num", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("dob", DateType(), True),
    StructField("job", StringType(), True),
    StructField("street", StringType(), True),
    StructField("city", StringType(), True),
    StructField("state", StringType(), True),
    StructField("zip", IntegerType(), True),
    StructField("lat", DoubleType(), True),
    StructField("long", DoubleType(), True),
    StructField("city_pop", IntegerType(), True),
    StructField("firstName", StringType(), True),
    StructField("LastName", StringType(), True),
    StructField("age", IntegerType(), True)
])
cus_df = spark.read.csv(
    "abfss://clean-data-creditcard@ccfrauddatalake.dfs.core.windows.net/cleansed_files_csv/customers/",
    header=True,
    schema=customer_schema
)


StatementMeta(cfraudsparkpool, 69, 3, Finished, Available, Finished)

##### User Defined Schema For Merchant

In [26]:

merchant_schema = StructType([
    StructField("trans_num", StringType(), True),
    StructField("category", StringType(), True),
    StructField("merch_lat", DoubleType(), True),
    StructField("merch_long", DoubleType(), True),
    StructField("merch_zipcode", IntegerType(), True),
    StructField("merchant_clean", StringType(), True)
])
merch_df = spark.read.csv(
    "abfss://clean-data-creditcard@ccfrauddatalake.dfs.core.windows.net/cleansed_files_csv/merchant/",
    header=True,
    schema=merchant_schema
)


StatementMeta(cfraudsparkpool, 69, 4, Finished, Available, Finished)

##### User Defined Schema for Transaction

In [27]:


schema = StructType([
    StructField("trans_num", StringType(), True),
    StructField("trans_date_trans_time", TimestampType(), True),  # enforce timestamp
    StructField("cc_num", StringType(), True),
    StructField("amt", DoubleType(), True),
    StructField("unix_time", LongType(), True),
    StructField("is_fraud", IntegerType(), True),
    StructField("amt_corrected", DoubleType(), True)
])

tran_df = spark.read.csv(
    "abfss://clean-data-creditcard@ccfrauddatalake.dfs.core.windows.net/cleansed_files_csv/Transaction/",
    header=True,
    schema=schema
)

StatementMeta(cfraudsparkpool, 69, 5, Finished, Available, Finished)

##### Joining merchant with customer 

In [28]:
cust_merch_df = cus_df.join(merch_df , on="trans_num" , how= "inner")

StatementMeta(cfraudsparkpool, 69, 6, Finished, Available, Finished)

In [29]:
cust_merch_df.show(2)

StatementMeta(cfraudsparkpool, 69, 7, Finished, Available, Finished)

+--------------------+----------------+------+----------+--------------------+-----------------+---------+-----+-----+-------+---------+--------+---------+--------+---+-------------+---------+----------+-------------+--------------+
|           trans_num|          cc_num|gender|       dob|                 job|           street|     city|state|  zip|    lat|     long|city_pop|firstName|LastName|age|     category|merch_lat|merch_long|merch_zipcode|merchant_clean|
+--------------------+----------------+------+----------+--------------------+-----------------+---------+-----+-----+-------+---------+--------+---------+--------+---+-------------+---------+----------+-------------+--------------+
|000088fe170f044d2...|6011679934075347|     F|1974-04-16|    Public librarian|  5517 Stacy Land|     Jelm|   WY|82063|41.0539|-106.0763|     100| Jennifer|Gonzalez| 51|  food_dining|    40.07|    -105.1|        80516|  fraud_Deckow|
|0000dfd04a508bc2b...|4908846471916297|     F|1992-07-24|Radiographe

In [30]:
cust_merch_df.count()

StatementMeta(cfraudsparkpool, 69, 8, Finished, Available, Finished)

1100555

##### Removing the duplicate  column credit card number mentioned as cc_num

In [31]:
tran_df = tran_df.drop("cc_num")

StatementMeta(cfraudsparkpool, 69, 9, Finished, Available, Finished)

##### Joined previously obtained dataframe with transactions on trans_num

In [32]:
cus_mer_tran_df = cust_merch_df.join(tran_df , on="trans_num" , how='inner')

StatementMeta(cfraudsparkpool, 69, 10, Finished, Available, Finished)

##### Count of records after joining 

In [33]:
print("Number of Row inside the Data:",cus_mer_tran_df.count())

StatementMeta(cfraudsparkpool, 69, 11, Finished, Available, Finished)

Number of Row inside the Data: 1100572


##### Schema showdown

In [34]:
cus_mer_tran_df.schema


StatementMeta(cfraudsparkpool, 69, 12, Finished, Available, Finished)

StructType([StructField('trans_num', StringType(), True), StructField('cc_num', StringType(), True), StructField('gender', StringType(), True), StructField('dob', DateType(), True), StructField('job', StringType(), True), StructField('street', StringType(), True), StructField('city', StringType(), True), StructField('state', StringType(), True), StructField('zip', IntegerType(), True), StructField('lat', DoubleType(), True), StructField('long', DoubleType(), True), StructField('city_pop', IntegerType(), True), StructField('firstName', StringType(), True), StructField('LastName', StringType(), True), StructField('age', IntegerType(), True), StructField('category', StringType(), True), StructField('merch_lat', DoubleType(), True), StructField('merch_long', DoubleType(), True), StructField('merch_zipcode', IntegerType(), True), StructField('merchant_clean', StringType(), True), StructField('trans_date_trans_time', TimestampType(), True), StructField('amt', DoubleType(), True), StructField

##### Count of total number of columns

In [35]:
# cus_mer_tran_df = cus_mer_tran_df.drop("cc_num")
print("This is Final of Column:",len(cus_mer_tran_df.schema))


StatementMeta(cfraudsparkpool, 69, 13, Finished, Available, Finished)

This is Final of Column: 25


##### Masking the credit card number

In [36]:
cus_mer_tran_df.schema

StatementMeta(cfraudsparkpool, 69, 14, Finished, Available, Finished)

StructType([StructField('trans_num', StringType(), True), StructField('cc_num', StringType(), True), StructField('gender', StringType(), True), StructField('dob', DateType(), True), StructField('job', StringType(), True), StructField('street', StringType(), True), StructField('city', StringType(), True), StructField('state', StringType(), True), StructField('zip', IntegerType(), True), StructField('lat', DoubleType(), True), StructField('long', DoubleType(), True), StructField('city_pop', IntegerType(), True), StructField('firstName', StringType(), True), StructField('LastName', StringType(), True), StructField('age', IntegerType(), True), StructField('category', StringType(), True), StructField('merch_lat', DoubleType(), True), StructField('merch_long', DoubleType(), True), StructField('merch_zipcode', IntegerType(), True), StructField('merchant_clean', StringType(), True), StructField('trans_date_trans_time', TimestampType(), True), StructField('amt', DoubleType(), True), StructField

In [37]:
from pyspark.sql.functions import concat, lit, col

cus_mer_tran_df = cus_mer_tran_df.withColumn(
    "cc_num_masked",
    concat(lit("************"), col("cc_num").substr(-4, 4))
)


StatementMeta(cfraudsparkpool, 69, 15, Finished, Available, Finished)

###### Dropping the previos column named cc_num 

In [38]:
cus_mer_tran_df = cus_mer_tran_df.drop("cc_num")
cus_mer_tran_df = cus_mer_tran_df.drop("amt_corrected")
cus_mer_tran_df.schema

StatementMeta(cfraudsparkpool, 69, 16, Finished, Available, Finished)

StructType([StructField('trans_num', StringType(), True), StructField('gender', StringType(), True), StructField('dob', DateType(), True), StructField('job', StringType(), True), StructField('street', StringType(), True), StructField('city', StringType(), True), StructField('state', StringType(), True), StructField('zip', IntegerType(), True), StructField('lat', DoubleType(), True), StructField('long', DoubleType(), True), StructField('city_pop', IntegerType(), True), StructField('firstName', StringType(), True), StructField('LastName', StringType(), True), StructField('age', IntegerType(), True), StructField('category', StringType(), True), StructField('merch_lat', DoubleType(), True), StructField('merch_long', DoubleType(), True), StructField('merch_zipcode', IntegerType(), True), StructField('merchant_clean', StringType(), True), StructField('trans_date_trans_time', TimestampType(), True), StructField('amt', DoubleType(), True), StructField('unix_time', LongType(), True), StructFiel

##### Dropping null values from dataframe 

In [39]:
cus_mer_tran_df = cus_mer_tran_df.dropna()

StatementMeta(cfraudsparkpool, 69, 17, Finished, Available, Finished)

In [40]:
print("No Of  column",len(cus_mer_tran_df.columns))
print("CreditCardCleanData Schema -----------:",cus_mer_tran_df.schema)

print("No of row in " , cus_mer_tran_df.count())


StatementMeta(cfraudsparkpool, 69, 18, Finished, Available, Finished)

No Of  column 24
CreditCardCleanData Schema -----------: StructType([StructField('trans_num', StringType(), True), StructField('gender', StringType(), True), StructField('dob', DateType(), True), StructField('job', StringType(), True), StructField('street', StringType(), True), StructField('city', StringType(), True), StructField('state', StringType(), True), StructField('zip', IntegerType(), True), StructField('lat', DoubleType(), True), StructField('long', DoubleType(), True), StructField('city_pop', IntegerType(), True), StructField('firstName', StringType(), True), StructField('LastName', StringType(), True), StructField('age', IntegerType(), True), StructField('category', StringType(), True), StructField('merch_lat', DoubleType(), True), StructField('merch_long', DoubleType(), True), StructField('merch_zipcode', IntegerType(), True), StructField('merchant_clean', StringType(), True), StructField('trans_date_trans_time', TimestampType(), True), StructField('amt', DoubleType(), True

In [20]:
# gold-layer-data

# Define the correct base path
base_path = "abfss://gold-layer-data@ccfrauddatalake.dfs.core.windows.net/credit-card-cus-mer-tran-data/"

#cus_mer_tran_df.write.mode("overwrite").option("header", "true").csv(base_path + "CreditCardData")

StatementMeta(cfraudsparkpool, 58, 19, Finished, Available, Finished)

##### Writing the cleansed data into gold layer 

In [21]:
cus_mer_tran_df.coalesce(1) \
    .write.mode("overwrite") \
    .option("header", "true") \
    .csv(base_path + "CreditCardDataSingleFile")


StatementMeta(cfraudsparkpool, 58, 20, Submitted, Running, Running)