In [0]:
%fs
ls /FileStore/transaction_large-1.csv


path,name,size,modificationTime
dbfs:/FileStore/transaction_large-1.csv,transaction_large-1.csv,54228538,1695534819000


In [0]:
%sql
create database transaction_large_table_cleaning

In [0]:
transaction_large_cleaning_df = spark.read.format('csv').option("header","True").option("inferschema","true").load("dbfs:/FileStore/transaction_large-1.csv")

In [0]:
transaction_large_cleaning_df.count()

Out[4]: 1048575

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import FloatType

# Create a SparkSession
spark = SparkSession.builder.appName("HandleOutliers").getOrCreate()

# Load your DataFrame
transaction_large_cleaning_df = spark.read.format("csv").option("header","True").option("inferschema","true").load("dbfs:/FileStore/transaction_large-1.csv")

# Convert 'amount' column to FloatType
transaction_large_cleaning_df = transaction_large_cleaning_df.withColumn("amount", transaction_large_cleaning_df["amount"].cast(FloatType()))

# Calculate Q1, Q3, and IQR
bounds = transaction_large_cleaning_df.stat.approxQuantile("amount", [0.25, 0.75], 0)
Q1 = bounds[0]
Q3 = bounds[1]
IQR = Q3 - Q1

# Define lower and upper bounds for outliers
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR

# Filter the data frame to remove outliers
transaction_large_df_no_outliers = transaction_large_cleaning_df.filter((col("amount") >= lower_bound) & (col("amount") <= upper_bound))


In [0]:
transaction_large_df_no_outliers.count()

Out[6]: 1048571

In [0]:
transaction_large_df_no_outliers.show()

+----+----------------+------+----------------+-----------+
|  id|        datetime|amount|       card_type|merchant_id|
+----+----------------+------+----------------+-----------+
|1630|09-07-1989 08:04|   1.0|      MasterCard|   MID59578|
|1630|            null| 16.29|      MasterCard|   MID59578|
|1630|09-07-1989 08:04| 16.29|      MasterCard|   MID59578|
|1630|            null| 16.29|      MasterCard|   MID59578|
|1630|09-07-1989 08:04| 16.29|      MasterCard|   MID59578|
|1638|16-04-1979 00:41|117.58|      MasterCard|   MID31981|
|1639|30-09-2000 20:41|873.13|American Express|   MID18885|
|1641|29-11-1975 01:12|851.32|American Express|   MID16625|
|1642|12-11-2012 03:14|458.94|      MasterCard|   MID64824|
|1643|24-02-1999 03:03|443.87|American Express|   MID10496|
|1644|20-01-1995 05:17|907.26|            Visa|   MID47464|
|1645|22-08-1987 19:59| 648.3|            Visa|   MID60019|
|1646|25-07-1985 20:53|440.18|      MasterCard|   MID53261|
|1647|18-11-2005 12:09|641.65|American E

In [0]:
transaction_large_df_no_outliers.printSchema()

root
 |-- id: integer (nullable = true)
 |-- datetime: string (nullable = true)
 |-- amount: float (nullable = true)
 |-- card_type: string (nullable = true)
 |-- merchant_id: string (nullable = true)



In [0]:
from pyspark.sql.functions import split
from pyspark.sql.functions import split, to_date

In [0]:
# Split 'datetime' column into 'date' and 'time' columns
split_col = split(transaction_large_df_no_outliers['datetime'], ' ')
df = transaction_large_df_no_outliers.withColumn('date', to_date(split_col.getItem(0), 'MM-dd-yyyy'))
df2 = df.withColumn('time', split_col.getItem(1))

# Show the DataFrame
df2.show()

+----+----------------+------+----------------+-----------+----------+-----+
|  id|        datetime|amount|       card_type|merchant_id|      date| time|
+----+----------------+------+----------------+-----------+----------+-----+
|1630|09-07-1989 08:04|   1.0|      MasterCard|   MID59578|1989-09-07|08:04|
|1630|            null| 16.29|      MasterCard|   MID59578|      null| null|
|1630|09-07-1989 08:04| 16.29|      MasterCard|   MID59578|1989-09-07|08:04|
|1630|            null| 16.29|      MasterCard|   MID59578|      null| null|
|1630|09-07-1989 08:04| 16.29|      MasterCard|   MID59578|1989-09-07|08:04|
|1638|16-04-1979 00:41|117.58|      MasterCard|   MID31981|      null|00:41|
|1639|30-09-2000 20:41|873.13|American Express|   MID18885|      null|20:41|
|1641|29-11-1975 01:12|851.32|American Express|   MID16625|      null|01:12|
|1642|12-11-2012 03:14|458.94|      MasterCard|   MID64824|2012-12-11|03:14|
|1643|24-02-1999 03:03|443.87|American Express|   MID10496|      null|03:03|

In [0]:
from pyspark.sql.functions import lit

df = df2.na.fill({"date": "1989-09-07", "time": "08:04"})

In [0]:
sorted_df = df.orderBy("date")

In [0]:
sorted_df.show()

+-------+----------------+------+----------------+-----------+----------+-----+
|     id|        datetime|amount|       card_type|merchant_id|      date| time|
+-------+----------------+------+----------------+-----------+----------+-----+
|1025117|01-01-1970 21:51| 721.7|American Express|   MID93640|1970-01-01|21:51|
| 332503|01-01-1970 12:00|731.96|American Express|   MID53189|1970-01-01|12:00|
| 718206|01-01-1970 21:59| 983.6|American Express|   MID76491|1970-01-01|21:59|
| 317444|01-01-1970 04:23|663.46|            Visa|   MID82091|1970-01-01|04:23|
| 174624|01-01-1970 08:46|390.48|American Express|   MID39400|1970-01-01|08:46|
| 327127|01-01-1970 20:05|702.96|American Express|   MID91000|1970-01-01|20:05|
| 713248|01-01-1970 10:20|155.37|American Express|   MID61494|1970-01-01|10:20|
| 356068|01-01-1970 17:52|189.89|      MasterCard|   MID88173|1970-01-01|17:52|
| 437611|01-01-1970 06:08|782.75|            Visa|   MID96696|1970-01-01|06:08|
| 364062|01-01-1970 12:56|588.85|       

In [0]:
from math import isnan

In [0]:
# Check for non-numeric values (nulls in this case, after casting)
non_numeric = sorted_df.filter(col("amount").isNull()).count()
print(f"Number of non-numeric values: {non_numeric}")

# Check for negative numbers
negative_numbers = sorted_df.filter(col("amount") < 0).count()
print(f"Number of negative numbers: {negative_numbers}")

Number of non-numeric values: 0
Number of negative numbers: 0


In [0]:
sorted_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- datetime: string (nullable = true)
 |-- amount: float (nullable = true)
 |-- card_type: string (nullable = true)
 |-- merchant_id: string (nullable = true)
 |-- date: date (nullable = true)
 |-- time: string (nullable = false)



In [0]:
from pyspark.sql.functions import when, col

In [0]:
# Flag records with missing 'datetime' field
df3 = sorted_df.withColumn("missing_datetime", when(col("datetime").isNull(), 1).otherwise(0))

# Show the DataFrame
df3.show()

+-------+----------------+------+----------------+-----------+----------+-----+----------------+
|     id|        datetime|amount|       card_type|merchant_id|      date| time|missing_datetime|
+-------+----------------+------+----------------+-----------+----------+-----+----------------+
|1025117|01-01-1970 21:51| 721.7|American Express|   MID93640|1970-01-01|21:51|               0|
| 587238|01-01-1970 08:48|242.99|            Visa|   MID90788|1970-01-01|08:48|               0|
| 332503|01-01-1970 12:00|731.96|American Express|   MID53189|1970-01-01|12:00|               0|
| 634163|01-01-1970 11:21|778.43|American Express|   MID41036|1970-01-01|11:21|               0|
| 864684|01-01-1970 16:20|609.33|            Visa|   MID16224|1970-01-01|16:20|               0|
| 604612|01-01-1970 19:26|802.65|      MasterCard|   MID14094|1970-01-01|19:26|               0|
| 317444|01-01-1970 04:23|663.46|            Visa|   MID82091|1970-01-01|04:23|               0|
| 642032|01-01-1970 09:40|484.

In [0]:
df3.filter(col("missing_datetime")==1).show()

+----+--------+------+----------+-----------+----------+-----+----------------+
|  id|datetime|amount| card_type|merchant_id|      date| time|missing_datetime|
+----+--------+------+----------+-----------+----------+-----+----------------+
|1630|    null| 16.29|MasterCard|   MID59578|1989-09-07|08:04|               1|
|1630|    null| 16.29|MasterCard|   MID59578|1989-09-07|08:04|               1|
+----+--------+------+----------+-----------+----------+-----+----------------+



In [0]:
df3.count()

Out[37]: 1048571

In [0]:
from pyspark.sql import functions as F

#  Df is the DataFrame and "datetime" is our column of interest
df4 = df3.filter(F.col("datetime").isNotNull()).dropDuplicates()


In [0]:
df4.count()

Out[39]: 1048568

In [0]:
from pyspark.sql import functions as F

# Df is the DataFrame and "amount" is our column of interest
missing_amounts = df4.filter(F.col("amount").isNull()).count()

print(f"Number of missing values in 'amount': {missing_amounts}")


Number of missing values in 'amount': 0


In [0]:
df4.write.format("csv").option("header","true").save("/FileStore/data/cleaned/transaction_cleaned_data.csv")
