# Group by merchants and aggregate other columns

In [1]:
from pyspark.sql import SparkSession, functions as F

# Build (or reuse) the Spark session that every later cell runs its jobs on.
# Options are listed in one table so they are easy to scan and tune.
spark_options = {
    "spark.sql.repl.eagerEval.enabled": True,
    "spark.sql.parquet.cacheMetadata": "true",
    "spark.sql.session.timeZone": "Etc/UTC",   # session time zone pinned to UTC
    "spark.executor.memory": "3g",
    "spark.driver.memory": "3g",
}

session_builder = SparkSession.builder.appName("MAST30034 Project 2")
for option_name, option_value in spark_options.items():
    # Register one option per call; the builder is returned each time.
    session_builder = session_builder.config(option_name, option_value)

spark = session_builder.getOrCreate()

24/10/01 23:11:26 WARN Utils: Your hostname, MacBook.local resolves to a loopback address: 127.0.0.1; using 192.168.0.6 instead (on interface en0)
24/10/01 23:11:26 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/01 23:11:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/10/01 23:11:29 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [2]:
# Read the merged transactions (tags already cleaned) into a Spark DataFrame
merged_transactions_path = '../data/merged/merged_transactions_with_tags.parquet'
trans_final_sdf = spark.read.parquet(merged_transactions_path)

                                                                                

In [3]:
# Aggregate the transactions into one row per distinct combination of
# merchant_abn, order_datetime and the merchant attribute columns below.
group_keys = [
    "merchant_abn", "order_datetime", "merchant_name",
    "description", 'revenue_band', 'take_rate', 'industry_segment',
]


def count_rows_where(condition):
    """Return an aggregate column counting the rows for which `condition` holds."""
    return F.sum(F.when(condition, 1).otherwise(0))


gender = F.col("gender")
postcode_income = F.col("postcode_mean_income")

# Order matters: it fixes the column order of the resulting DataFrame.
merchant_aggregates = [
    F.sum("dollar_value").alias("revenue"),
    # Customer gender breakdown
    count_rows_where(gender == "Male").alias("male_count"),
    count_rows_where(gender == "Female").alias("female_count"),
    count_rows_where(gender == "Undisclosed").alias("undisclosed_count"),
    # Volume and reach
    F.count("order_id").alias("num_trans"),
    F.count_distinct("postcode").alias("num_postcodes"),
    F.count_distinct("user_id").alias("num_unique_customers"),
    # Income total plus the number of rows with no income value, so that a
    # mean over the non-null rows can be derived afterwards
    F.sum("postcode_mean_income").alias("sum_customer_mean_income"),
    count_rows_where(postcode_income.isNull()).alias("num_null_POA"),
    # Fraud indicators
    F.avg("consumer_fraud_probability").alias("average_consumer_fraud_probability"),
    F.avg("merchant_fraud_probability").alias("average_merchant_fraud_probability"),
    count_rows_where(F.col("is_fraud") == "true").alias("fraud_count"),
]

grouped_sdf = (
    trans_final_sdf
    .groupBy(*group_keys)
    .agg(*merchant_aggregates)
    .orderBy("merchant_abn", "order_datetime")
)

In [4]:
# Derive per-row averages from the aggregated totals.
revenue_per_transaction = F.col("revenue") / F.col("num_trans")

# num_null_POA counts the rows whose postcode_mean_income is null; those rows
# add nothing to the income sum, so they are excluded from the divisor too.
transactions_with_income = F.col("num_trans") - F.col("num_null_POA")
income_per_transaction = F.col("sum_customer_mean_income") / transactions_with_income

grouped_sdf = (
    grouped_sdf
    .withColumn('mean_trans_val', revenue_per_transaction)
    .withColumn('expected_customer_mean_income', income_per_transaction)
)

grouped_sdf.show(5, truncate=False)

24/10/01 23:11:39 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
24/10/01 23:11:41 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
24/10/01 23:11:55 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/10/01 23:11:55 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/10/01 23:11:55 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/10/01 23:11:55 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/10/01 23:12:11 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedK

+------------+--------------+-------------+---------------------------------------------------------------------------------+------------+---------+----------------+-----------------+----------+------------+-----------------+---------+-------------+--------------------+------------------------+------------+----------------------------------+----------------------------------+-----------+------------------+-----------------------------+
|merchant_abn|order_datetime|merchant_name|description                                                                      |revenue_band|take_rate|industry_segment|revenue          |male_count|female_count|undisclosed_count|num_trans|num_postcodes|num_unique_customers|sum_customer_mean_income|num_null_POA|average_consumer_fraud_probability|average_merchant_fraud_probability|fraud_count|mean_trans_val    |expected_customer_mean_income|
+------------+--------------+-------------+-----------------------------------------------------------------------------

In [5]:
from pyspark.sql import Window

# Fill NULL sum_customer_mean_income and expected_customer_mean_income with 0
grouped_sdf = grouped_sdf.fillna(0, subset = ['sum_customer_mean_income', 'expected_customer_mean_income'])

# Unordered window covering every row of a merchant. With no ordering the
# frame spans the whole partition, so aggregates over it are per-merchant
# constants.
merchant_window = Window.partitionBy("merchant_abn")

# Ordered window for running (cumulative) calculations within each merchant_abn
window_spec = merchant_window.orderBy("order_datetime")

# Get first order date per merchant.
# Computed over the unordered window on purpose: an ordered window defaults to
# a growing frame (unbounded preceding -> current row), which would make this a
# running minimum that is only coincidentally equal to the merchant's minimum.
grouped_sdf = grouped_sdf.withColumn("first_order_datetime", F.min("order_datetime").over(merchant_window))
# Days since first record
grouped_sdf = grouped_sdf.withColumn("days_since_first_record", F.datediff(F.col("order_datetime"), F.col("first_order_datetime")))

# Create column for days_since_first_record^2
grouped_sdf = grouped_sdf.withColumn('days_since_first_record_2', F.pow(F.col('days_since_first_record'), 2))

# Create new column for cumulative_revenue: running total of revenue up to and
# including the current row, in order_datetime order, per merchant
grouped_sdf = grouped_sdf.withColumn("cumulative_revenue", F.sum(F.col("revenue")).over(window_spec.rowsBetween(Window.unboundedPreceding, Window.currentRow)))

# Drop column num_null_POA (only needed to derive expected_customer_mean_income)
grouped_sdf = grouped_sdf.drop('num_null_POA')

grouped_sdf.show(20, truncate = False)


24/10/01 23:19:23 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/10/01 23:19:23 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/10/01 23:19:24 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/10/01 23:19:24 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/10/01 23:19:47 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/10/01 23:19:47 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/10/01 23:19:47 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/10/01 23:19:48 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/10/01 23:20:33 WARN RowBasedKeyValueBatch: Calling spill() on

+------------+--------------+-------------+---------------------------------------------------------------------------------+------------+---------+----------------+------------------+----------+------------+-----------------+---------+-------------+--------------------+------------------------+----------------------------------+----------------------------------+-----------+------------------+-----------------------------+--------------------+-----------------------+-------------------------+------------------+
|merchant_abn|order_datetime|merchant_name|description                                                                      |revenue_band|take_rate|industry_segment|revenue           |male_count|female_count|undisclosed_count|num_trans|num_postcodes|num_unique_customers|sum_customer_mean_income|average_consumer_fraud_probability|average_merchant_fraud_probability|fraud_count|mean_trans_val    |expected_customer_mean_income|first_order_datetime|days_since_first_record|days_since

In [7]:
# Persist the aggregated table as parquet to continue with segments analysis.
# For a manual check, a single-file CSV can be written instead:
# grouped_sdf.coalesce(1).write.csv('../data/curated/revenue.csv', header=True)
revenue_output_path = "../data/curated/revenue.parquet"
revenue_writer = grouped_sdf.write.mode("overwrite")  # replace any previous export
revenue_writer.parquet(revenue_output_path)

24/10/01 23:28:38 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/10/01 23:28:40 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/10/01 23:28:40 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/10/01 23:28:40 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/10/01 23:29:42 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/10/01 23:29:42 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/10/01 23:29:42 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/10/01 23:29:43 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/10/01 23:30:00 WARN RowBasedKeyValueBatch: Calling spill() on