3. Filtering high-value transactions from a CSV file for targeted promotions.
Tasks involved: Filtering, Cleaning
Filtering: Selecting only records that meet specific criteria — in this case, high-value transactions (e.g., transactions above $1000) or mean value.
Cleaning: Ensuring the transaction data is accurate and usable — removing rows with missing amounts, fixing number formats, etc.

🟢 Goal: Identify and use only the most valuable customers or transactions for special promotions, such as VIP discounts or exclusive offers.

In [90]:
#Getting the SparkSession

from pyspark.sql import SparkSession

In [91]:
#Buidling the spark app
spark = SparkSession.builder\
.appName("MostValuble_customers")\
.getOrCreate()

In [92]:
#Importing the first transactions file
df_transactions = spark.read.option("header",True).csv('transaction_data 1.csv')

In [93]:
#Checking the df_transactions data to check if there are any errors
df_transactions.printSchema() 

root
 |-- transaction_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- amount: string (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- store_location: string (nullable = true)



From the above transactions dataset these are things we need to change 
1. we can see that amount column in the string we should conver into integer to perform any fileters
2. transaction_date is in the date format we need to change it to date format
3. we need to delete any rows with zero amounts because we can't consider them in our analysis and we can save lot much of computation time


In [94]:
df_customers = spark.read.option("header",True).csv('us_customer_data 2.csv')

In [95]:
df_customers.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- address: string (nullable = true)
 |-- registration_date: string (nullable = true)
 |-- loyalty_status: string (nullable = true)



In [96]:
#Cleaning the transactions tables
df_transactions.distinct().count() 

1000

In [97]:
df_transactions.count()

1000

In [98]:
from pyspark.sql.functions import*

In [99]:
null_count = df_transactions.filter(col("amount").isNull()).count()

In [100]:
null_count

50

In [101]:
df_transactions.filter(col("amount").isNull()).show() 

+--------------+-----------+------+-------------------+----------------+--------------+--------------+
|transaction_id|customer_id|amount|   transaction_date|product_category|payment_method|store_location|
+--------------+-----------+------+-------------------+----------------+--------------+--------------+
|             4|         19|  NULL|2025-04-30 15:26:23|          Sports|    Debit Card|   Los Angeles|
|            14|        579|  NULL|2025-04-16 20:54:47|            Home|          Cash|        Online|
|            24|        184|  NULL|2025-02-27 13:58:05|          Beauty|    Debit Card|   Los Angeles|
|            26|        744|  NULL|2025-04-23 02:38:39|          Beauty|        PayPal|      New York|
|            34|        838|  NULL|2025-01-30 22:27:45|        Clothing|          Cash|   Los Angeles|
|           104|        928|  NULL|2025-04-20 02:32:07|          Beauty|          Cash|      New York|
|           110|        632|  NULL|2025-02-17 13:45:29|          Sports| 

In [102]:
#Considering not null amount rows
df_transations_clean = df_transactions.filter(col("amount").isNotNull())

In [103]:
df_transations_clean.count()

950

In [104]:
from pyspark.sql.types import*

In [105]:
#As we already what we did for the first csv file which is customers so doing the same process 
df_customers = df_customers.withColumn("clean_phone", regexp_replace("phone", r"x\d+", "")) \
       .withColumn("clean_phone", regexp_replace("clean_phone", r"\D", "")) \
       .withColumn("valid_phone", when(length("clean_phone") >= 10, col("clean_phone")).otherwise(lit(None)))

In [106]:
df_customers= df_customers.withColumn("registration_date",col("registration_date").cast(DateType()))

In [107]:
df_customers_clean = df_customers.select("customer_id","name","email","valid_phone","address","registration_date","loyalty_status")

In [108]:
df_customers_clean.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- valid_phone: string (nullable = true)
 |-- address: string (nullable = true)
 |-- registration_date: date (nullable = true)
 |-- loyalty_status: string (nullable = true)



In [109]:
df_transations_clean.printSchema()

root
 |-- transaction_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- amount: string (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- store_location: string (nullable = true)



In [110]:
df_transations_clean = df_transations_clean.withColumn("amount", expr("try_cast(amount as double)"))

In [111]:
df_merge = df_customers_clean.join(df_transations_clean,on="customer_id",how="inner")

In [112]:
df_merge.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- valid_phone: string (nullable = true)
 |-- address: string (nullable = true)
 |-- registration_date: date (nullable = true)
 |-- loyalty_status: string (nullable = true)
 |-- transaction_id: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- store_location: string (nullable = true)



In [116]:
df_merge.count()

950

In [135]:
df_Filtered_Customer_1000 = df_merge.filter((col("amount").cast(IntegerType()))>1000)

In [141]:
df_Filtered_Customer_1000.show()

+-----------+-----------------+--------------------+-------------+--------------------+-----------------+--------------+--------------+-------+-------------------+----------------+--------------+--------------+
|customer_id|             name|               email|  valid_phone|             address|registration_date|loyalty_status|transaction_id| amount|   transaction_date|product_category|payment_method|store_location|
+-----------+-----------------+--------------------+-------------+--------------------+-----------------+--------------+--------------+-------+-------------------+----------------+--------------+--------------+
|          1|    Michelle Kidd|  vayala@example.net|         NULL|USNS Santiago, FP...|       2025-01-25|          Gold|             8|2652.57|2025-04-19 14:46:00|            Home|   Credit Card|      New York|
|          4|   Kimberly Price|jessicaknight@exa...|0019476334224|1631 Alexis Meado...|       2024-12-08|          Gold|           427|1787.09|2025-04-28 04