# 1.0 Get and view data

In [2]:
import os

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, when
from pyspark.sql.types import StringType
from uszipcode import SearchEngine

In [3]:
data_path = os.path.join('..', 'data', 'cc_sample_transaction.json')
spark = SparkSession.builder.appName('CC Data Analysis').getOrCreate()
cc_data = spark.read.json(data_path)
cc_data.limit(5).show()  # Looking at the first 5 rows

+----------+------+-------------+-----------+----------------+--------+----------------+----------------------+------------------+-----------+-------------+--------------------+--------------------+---------------------+--------------------+
|Unnamed: 0|   amt|     category|     cc_bic|          cc_num|is_fraud|  merch_eff_time|merch_last_update_time|         merch_lat| merch_long|merch_zipcode|            merchant|     personal_detail|trans_date_trans_time|           trans_num|
+----------+------+-------------+-----------+----------------+--------+----------------+----------------------+------------------+-----------+-------------+--------------------+--------------------+---------------------+--------------------+
|         0|  4.97|     misc_net|CITIUS33CHI|2703186189652095|       0|1325376018798532|         1325376018666|         36.011293| -82.048315|        28705|fraud_Rippin, Kub...|{"person_name":"J...|  2019-01-01 00:00:18|0b242abb623afc578...|
|         1|107.23|  grocery_pos

## 1.1 Summary of data

In [4]:
cc_data.printSchema()

root
 |-- Unnamed: 0: string (nullable = true)
 |-- amt: string (nullable = true)
 |-- category: string (nullable = true)
 |-- cc_bic: string (nullable = true)
 |-- cc_num: string (nullable = true)
 |-- is_fraud: string (nullable = true)
 |-- merch_eff_time: string (nullable = true)
 |-- merch_last_update_time: string (nullable = true)
 |-- merch_lat: string (nullable = true)
 |-- merch_long: string (nullable = true)
 |-- merch_zipcode: string (nullable = true)
 |-- merchant: string (nullable = true)
 |-- personal_detail: string (nullable = true)
 |-- trans_date_trans_time: string (nullable = true)
 |-- trans_num: string (nullable = true)



In [5]:
cc_data.describe().show()

+-------+-----------------+-----------------+-------------+-------+--------------------+--------------------+--------------------+----------------------+-----------------+------------------+------------------+-------------------+--------------------+---------------------+--------------------+
|summary|       Unnamed: 0|              amt|     category| cc_bic|              cc_num|            is_fraud|      merch_eff_time|merch_last_update_time|        merch_lat|        merch_long|     merch_zipcode|           merchant|     personal_detail|trans_date_trans_time|           trans_num|
+-------+-----------------+-----------------+-------------+-------+--------------------+--------------------+--------------------+----------------------+-----------------+------------------+------------------+-------------------+--------------------+---------------------+--------------------+
|  count|          1296675|          1296675|      1296675|1296675|             1296675|             1296675|         

1. Initially, all columns are strings.
2. There are 1,296,675 transactions.
3. Data spans January 1, 2019 to June 21, 2020.
4. The data contains null values.
5. Most transactions relate to travel, least transactions relate to entertainment.
6. There are also fraud cases in the data.

# 2.0 Data cleaning

## 2.1 Casting columns to the proper type

In [6]:
cc_data = cc_data.withColumnRenamed('Unnamed: 0', 'index')
col_dtypes = {
    'index': 'integer',
    'amt': 'float',
    'merch_eff_time': 'long',
    'merch_last_update_time': 'long',
    'merch_lat': 'float',
    'merch_long': 'float',
    'trans_date_trans_time': 'timestamp'
}

# Change multiple column types
for column in col_dtypes.keys():
    cc_data = cc_data.withColumn(column, col(column).cast(col_dtypes[column]))

cc_data.printSchema()

root
 |-- index: integer (nullable = true)
 |-- amt: float (nullable = true)
 |-- category: string (nullable = true)
 |-- cc_bic: string (nullable = true)
 |-- cc_num: string (nullable = true)
 |-- is_fraud: string (nullable = true)
 |-- merch_eff_time: long (nullable = true)
 |-- merch_last_update_time: long (nullable = true)
 |-- merch_lat: float (nullable = true)
 |-- merch_long: float (nullable = true)
 |-- merch_zipcode: string (nullable = true)
 |-- merchant: string (nullable = true)
 |-- personal_detail: string (nullable = true)
 |-- trans_date_trans_time: timestamp (nullable = true)
 |-- trans_num: string (nullable = true)



In [7]:
cc_data.describe().show()

+-------+-----------------+------------------+-------------+-------+--------------------+--------------------+--------------------+----------------------+-----------------+------------------+------------------+-------------------+--------------------+--------------------+
|summary|            index|               amt|     category| cc_bic|              cc_num|            is_fraud|      merch_eff_time|merch_last_update_time|        merch_lat|        merch_long|     merch_zipcode|           merchant|     personal_detail|           trans_num|
+-------+-----------------+------------------+-------------+-------+--------------------+--------------------+--------------------+----------------------+-----------------+------------------+------------------+-------------------+--------------------+--------------------+
|  count|          1296675|           1296675|      1296675|1296675|             1296675|             1296675|             1296675|               1296675|          1296675|         

`merch_eff_time` column contains 11 to 16 digit values.

`merch_last_udpate_time` column contains 11 to 13 digit values.

## 2.2 Addressing missing values

In [8]:
null_counts = cc_data.select([
    sum(
        (col(column).isNull() |  # For non-string columns
         (col(column) == 'Null') |
         (col(column) == 'NA') |
         (col(column) == '')).cast('int')
        ).alias(column) for column in cc_data.columns
])
null_counts.show()

+-----+----+--------+------+------+--------+--------------+----------------------+---------+----------+-------------+--------+---------------+---------------------+---------+
|index| amt|category|cc_bic|cc_num|is_fraud|merch_eff_time|merch_last_update_time|merch_lat|merch_long|merch_zipcode|merchant|personal_detail|trans_date_trans_time|trans_num|
+-----+----+--------+------+------+--------+--------------+----------------------+---------+----------+-------------+--------+---------------+---------------------+---------+
| NULL|NULL|       0|432980|     0|       0|          NULL|                  NULL|     NULL|      NULL|       195973|       0|              0|                 NULL|        0|
+-----+----+--------+------+------+--------+--------------+----------------------+---------+----------+-------------+--------+---------------+---------------------+---------+



The `cc_bic` column contains null values. However, I do not believe there is a way to find this information from the data.

The `merch_zipcode` column also contains null values and they can be filled in using the `merch_lat` and `merch_long` columns.

### 2.2.1 `cc_bic` column

In [9]:
# Replace 'null' values in cc_bic with None
cc_data_processed = cc_data.withColumn(
    'cc_bic',
    when((col('cc_bic') == 'Null') |
         ((col('cc_bic') == 'NA')) |
         ((col('cc_bic') == '')), None).otherwise(col('cc_bic'))
)

### 2.2.2 `merch_zipcode` column

In [10]:
# Verifying the results of uszipcode library over a sample data
merch_location_test = cc_data_processed.filter(col('merch_zipcode').isNotNull()).limit(5)
merch_location_test = merch_location_test.select(*['merch_lat', 'merch_long', 'merch_zipcode'])
merch_location_test_pandas = merch_location_test.toPandas()
search = SearchEngine()
def get_zipcode(lat, long):
    result = search.by_coordinates(lat=lat, lng=long)
    if result:
        return result[0].zipcode
    else:
        return ''
merch_location_test_pandas['calculated_zipcode'] = merch_location_test_pandas.apply(
    lambda x: get_zipcode(x.merch_lat, x.merch_long), axis=1)
merch_location_test_pandas


Unnamed: 0,merch_lat,merch_long,merch_zipcode,calculated_zipcode
0,36.011292,-82.048317,28705,28705
1,43.150703,-112.15448,83236,83236
2,38.674999,-78.632462,22844,22844
3,40.653381,-76.152664,17972,17972
4,38.94809,-78.540298,22644,22644


The zipcodes match exactly over the sample dataset so we can run it for the missing values.

In [11]:
# missing_merch_zipcode = cc_data_processed.filter(col('merch_zipcode').isNull())
# missing_merch_zipcode = missing_merch_zipcode.select(*['merch_lat', 'merch_long', 'merch_zipcode'])
# missing_merch_zipcode = missing_merch_zipcode.toPandas()
# missing_merch_zipcode['calculated_zipcode'] = missing_merch_zipcode.apply(
#     lambda x: get_zipcode(x.merch_lat, x.merch_long), axis=1)
# # Saving results to save time in the future
# missing_merch_zipcode[['merch_lat', 'merch_long', 'calculated_zipcode']].to_parquet(os.path.join('..', 'data', 'calculated_zipcodes.parquet'), index=False)
# missing_merch_zipcode.head()

In [12]:
missing_merch_zipcode = spark.read.parquet(os.path.join('..', 'data', 'calculated_zipcodes.parquet'))
missing_merch_zipcode = missing_merch_zipcode.withColumnRenamed('merch_lat', 'merch_lat_todrop')
missing_merch_zipcode = missing_merch_zipcode.withColumnRenamed('merch_long', 'merch_long_todrop')
cc_data_processed = cc_data_processed.join(missing_merch_zipcode,
                              (cc_data_processed['merch_lat'] == missing_merch_zipcode['merch_lat_todrop']) &
                              (cc_data_processed['merch_long'] == missing_merch_zipcode['merch_long_todrop']),
                              'left')
cc_data_processed = cc_data_processed.withColumn(
    'merch_zipcode',
    when(
        col('merch_zipcode').isNull(), col('calculated_zipcode')
        ).otherwise(col('merch_zipcode'))
)

for column in missing_merch_zipcode.columns:
    cc_data_processed = cc_data_processed.drop(column)

In [16]:
null_counts = cc_data_processed.select([
    sum(
        (col('merch_zipcode')=='').cast('int')
        ).alias('merch_zipcode')])
null_counts.show()

+-------------+
|merch_zipcode|
+-------------+
|        59882|
+-------------+



We've managed to reduced the missing data in `merch_zipcode` from 195,973 (15%) to 59,882 (5%) rows.

## 2.3 Formatting Time Columns

I assume all time columns are recorded in UTC time.

In [11]:
from datetime import datetime, timezone

# Given epoch time in microseconds
epoch_time = 1325376018798532
epoch_time = 1325376018666

# Convert to seconds (divide by 1,000,000)
epoch_time_seconds = epoch_time / 1_000_000

# Convert epoch seconds to a readable datetime
readable_time = datetime.fromtimestamp(epoch_time_seconds, tz=timezone.utc)

print("Readable Time (UTC):", readable_time)


Readable Time (UTC): 2012-01-01 00:00:18.798532+00:00


In [12]:
1000000 == 1_000_000

True

In [15]:
from datetime import datetime, timezone

# Given epoch time in microseconds (example value)
epoch_time = 1325376018666

# Convert to seconds (divide by 1,000,000)
epoch_time_seconds = epoch_time / 1_000

# Convert epoch seconds to a readable datetime in UTC
readable_time = datetime.fromtimestamp(epoch_time_seconds, tz=timezone.utc)

print("Readable Time (UTC):", readable_time)

Readable Time (UTC): 2012-01-01 00:00:18.666000+00:00


In [13]:
cc_data.select('merchant').distinct().show()

+--------------------+
|            merchant|
+--------------------+
|  fraud_Rau and Sons|
|    fraud_Herman Inc|
|     fraud_Thiel PLC|
|fraud_O'Hara-Wild...|
|fraud_Bradtke, To...|
|fraud_Altenwerth,...|
|fraud_Robel, Cumm...|
| fraud_Greenholt Ltd|
|fraud_Ledner, Har...|
|  fraud_Waelchi-Wolf|
|   fraud_Effertz LLC|
|fraud_Smitham-Sch...|
| fraud_Kihn-Schuster|
|   fraud_Hills-Boyer|
|fraud_Douglas, Du...|
|      fraud_Jast Ltd|
|fraud_Stroman, Hu...|
|fraud_Kerluke-Abs...|
|fraud_Gottlieb-Ha...|
|fraud_Rippin-VonR...|
+--------------------+
only showing top 20 rows



In [59]:
from pyspark.sql.functions import count

# Group by 'merchant' and 'merch_eff_time', then count the rows in each group
grouped_df = cc_data.groupBy("merchant", "merch_last_update_time").agg(count("*").alias("row_count"))

# Show the grouped DataFrame
grouped_df.show()

+--------------------+----------------------+---------+
|            merchant|merch_last_update_time|row_count|
+--------------------+----------------------+---------+
|fraud_Reichert, H...|         1325379419461|        1|
|fraud_Streich, Ha...|           13253955921|        1|
|fraud_Adams, Kova...|         1325407009293|        1|
|    fraud_Barton Inc|         1325410422100|        1|
|fraud_Schumm, Bau...|         1325410956149|        1|
|fraud_Heidenreich...|         1325414448242|        1|
|   fraud_Lockman Ltd|         1325415238378|        1|
|fraud_Goyette, Ho...|         1325422035766|        1|
|fraud_Swaniawski,...|         1325423065476|        1|
|  fraud_Schmeler Inc|         1325435865977|        1|
|fraud_Kutch-Wilde...|         1325437432727|        1|
|fraud_Wuckert, Wi...|          132545036954|        1|
|fraud_Bernier and...|         1325452448770|        1|
|fraud_Lubowitz-Wa...|         1325454239320|        1|
|fraud_Stiedemann Ltd|         1325456901399|   

In [23]:
cc_data.groupby('is_fraud').count().show()

+--------+-------+
|is_fraud|  count|
+--------+-------+
|       0|1289169|
|       1|   7506|
+--------+-------+



Around 0.5% of the transactions are fraud. This should be considered if training a ML model.