In [40]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("DataCleaningBigData").getOrCreate()

df = spark.read.csv("/opt/spark_data/ecommerce_transactions_1000.csv", header=True, inferSchema=True)
df.show(5)


+--------------+-------+--------+--------------------+-------------------+
|transaction_id|user_id|  amount|               email|   transaction_time|
+--------------+-------+--------+--------------------+-------------------+
|         T0001|   U069|    NULL|jeffreyfisher@gma...|2025-04-20 08:00:02|
|         T0002|   U253|70921.08| porteramy@yahoo.com|2025-03-30 21:07:41|
|         T0003|   U222|42313.74|  jerome93@yahoo.com|2025-04-20 10:50:30|
|         T0004|   U187|    NULL|jimeneztamara@sny...|2025-04-05 11:48:29|
|         T0005|   U064|81176.73|   louis64@gmail.com|2025-04-14 08:50:35|
+--------------+-------+--------+--------------------+-------------------+
only showing top 5 rows



In [41]:
df.printSchema()

root
 |-- transaction_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- email: string (nullable = true)
 |-- transaction_time: timestamp (nullable = true)



In [42]:
from pyspark.sql.functions import col, when, count

df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+--------------+-------+------+-----+----------------+
|transaction_id|user_id|amount|email|transaction_time|
+--------------+-------+------+-----+----------------+
|             0|      0|   316|    0|              50|
+--------------+-------+------+-----+----------------+



In [43]:
print("Jumlah baris:", df.count())

Jumlah baris: 1000


In [44]:
df = df.dropna(subset=["transaction_time"])
df = df.fillna({"amount": 0})

In [45]:
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+--------------+-------+------+-----+----------------+
|transaction_id|user_id|amount|email|transaction_time|
+--------------+-------+------+-----+----------------+
|             0|      0|     0|    0|               0|
+--------------+-------+------+-----+----------------+



In [47]:
from pyspark.sql.functions import instr, substring_index

df = df.withColumn("email_domain", substring_index("email", "@", -1))

df = df.filter(instr(col("email"), "@") > 0)

In [48]:
from pyspark.sql.functions import col, when, count

df.select(
    count(
        when(~col("email").contains("@"), True)
    ).alias("invalid_email_count")
).show()



+-------------------+
|invalid_email_count|
+-------------------+
|                  0|
+-------------------+



In [49]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import to_date, col

df = df.withColumn("amount", col("amount").cast(DoubleType()))
df = df.withColumn("transaction_date", to_date(col("transaction_time")))


In [50]:
final = df.toPandas()

final.to_csv("work/cleaned_transactions_1000.csv", index=False)


In [22]:
print(df.columns)

['transaction_id', 'user_id', 'amount', 'email', 'transaction_time', 'email_domain', 'transaction_date']


In [23]:
df.filter(col("transaction_date").isNotNull()).count()


867

In [51]:
final = df.toPandas()
print(final.columns)
print(final.head())


Index(['transaction_id', 'user_id', 'amount', 'email', 'transaction_time',
       'email_domain', 'transaction_date'],
      dtype='object')
  transaction_id user_id    amount                          email  \
0          T0001    U069      0.00        jeffreyfisher@gmail.com   
1          T0002    U253  70921.08            porteramy@yahoo.com   
2          T0003    U222  42313.74             jerome93@yahoo.com   
3          T0004    U187      0.00  jimeneztamara@snyder-shaw.com   
4          T0005    U064  81176.73              louis64@gmail.com   

     transaction_time     email_domain transaction_date  
0 2025-04-20 08:00:02        gmail.com       2025-04-20  
1 2025-03-30 21:07:41        yahoo.com       2025-03-30  
2 2025-04-20 10:50:30        yahoo.com       2025-04-20  
3 2025-04-05 11:48:29  snyder-shaw.com       2025-04-05  
4 2025-04-14 08:50:35        gmail.com       2025-04-14  


In [26]:
final = df.toPandas()
final.to_csv("work/cleaned_transactions_1000.csv", index=False)

In [34]:
df.se

Jumlah nilai 'amount' yang numerik : 867


In [52]:
from pyspark.sql.functions import col

amount5 = df.orderBy(col("amount").desc()).limit(5)
print("Top 5 transaksi dengan amount terbesar")
amount5.show()

Top 5 transaksi dengan amount terbesar
+--------------+-------+--------+--------------------+-------------------+------------+----------------+
|transaction_id|user_id|  amount|               email|   transaction_time|email_domain|transaction_date|
+--------------+-------+--------+--------------------+-------------------+------------+----------------+
|         T0437|   U233|99830.84|franklincraig@gma...|2025-03-31 01:07:47|   gmail.com|      2025-03-31|
|         T0175|   U224|99410.65|natalie63@hotmail...|2025-04-10 14:15:20| hotmail.com|      2025-04-10|
|         T0320|   U046|99399.22|bonniemack@yahoo.com|2025-04-05 21:15:08|   yahoo.com|      2025-04-05|
|         T0451|   U293|98343.68|  sean46@walters.com|2025-04-17 14:27:35| walters.com|      2025-04-17|
|         T0067|   U183|98103.36|danielramirez@hot...|2025-04-19 08:54:15| hotmail.com|      2025-04-19|
+--------------+-------+--------+--------------------+-------------------+------------+----------------+



In [53]:
total_transaksi = df.count()
print(f"Jumlah total transaksi: {total_transaksi}")


Jumlah total transaksi: 867


In [54]:
from pyspark.sql.functions import sum as spark_sum

total_pendapatan = df.select(spark_sum("amount")).collect()[0][0]
print(f"Total pendapatan dari seluruh transaksi: {total_pendapatan}")


Total pendapatan dari seluruh transaksi: 16922579.86999999


In [61]:
quantiles = df.approxQuantile("amount", [0.25, 0.75], 0.05)
Q1, Q3 = quantiles
IQR = Q3 - Q1

lower_bound = Q1 -  1.5 * IQR
upper_bound = Q3 +  1.5 * IQR

print(f"Q1 = {Q1}, Q3 = {Q3}, IQR = {IQR}")
print(f"Lower Bound = {lower_bound}, Upper Bound = {upper_bound}")
outliers = df.filter((df.amount < lower_bound) | (df.amount > upper_bound))
print("Jumlah Outliers : ", outliers.count())

Q1 = 0.0, Q3 = 30029.83, IQR = 30029.83
Lower Bound = -45044.745, Upper Bound = 75074.57500000001
Jumlah Outliers :  86


In [62]:
outliers_count = outliers.count()  

total_transaksi = df.count()

persentase_outlier = (outliers_count / total_transaksi) * 100
print(f"Persentase outlier terhadap seluruh transaksi: {persentase_outlier:.2f}%")

Persentase outlier terhadap seluruh transaksi: 9.92%
