# Import Library

In [1]:
# Pandas
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns

# PySpark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, countDistinct, to_date, to_timestamp, when, expr, date_trunc, count, avg
from pyspark.sql.functions import explode, split, trim
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.functions import count as Fcount
from pyspark.sql.window import Window 

spark = SparkSession.builder \
    .appName("BNPL_Part1_Data") \
    .config("spark.driver.memory", "6g") \
    .getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/09/12 15:02:45 WARN Utils: Your hostname, xuzhengs-MacBook-Pro-2.local, resolves to a loopback address: 127.0.0.1; using 192.168.112.60 instead (on interface en0)
25/09/12 15:02:45 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/12 15:02:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/09/12 15:02:45 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


# Data Check - Part 2 & 3 & 4

In [2]:
p2_path = "part2_data/transaction2021A"
p2_df = spark.read.parquet(p2_path)

p3_path = "part3_data/transaction2021B"
p3_df = spark.read.parquet(p3_path)

p4_path = "part4_data/transaction2022A"
p4_df = spark.read.parquet(p4_path)

new_tbl_consumer = (
    spark.read
         .option("header", True)   
         .option("inferSchema", True)
         .csv("part1_data/new_tbl_consumer.csv")
)
consumer_details = spark.read.parquet("part1_data/consumer_user_details.parquet")

### 赵航宇数据
preprocessed_data = spark.read.parquet("preprocessed_data.parquet")

crime_df = (
    spark.read
        .option("header", True)   
        .option("inferSchema", True)
        .csv("external_data/comprehensive_crime_rate.csv")
)

# income
income_df = (
    spark.read
        .option("header", True)   
        .option("inferSchema", True)
        .csv("external_data/income.csv")
)

# employed rate
employed_rate = (
    spark.read
        .option("header", True)   
        .option("inferSchema", True)
        .csv("external_data/employed_rate.csv")
)


# consumer_file
consumer_level = spark.read.parquet("part1_data/consumer_profile.parquet")

# merchant file
merchant_level = spark.read.parquet("part1_data/merchant_profile.parquet")

                                                                                

In [3]:
p2_df.show(10)
p2_df.count()

+-------+------------+------------------+--------------------+--------------+
|user_id|merchant_abn|      dollar_value|            order_id|order_datetime|
+-------+------------+------------------+--------------------+--------------+
|  18478| 62191208634|63.255848959735246|949a63c8-29f7-4ab...|    2021-08-20|
|      2| 15549624934| 130.3505283105634|6a84c3cf-612a-457...|    2021-08-20|
|  18479| 64403598239|120.15860593212783|b10dcc33-e53f-425...|    2021-08-20|
|      3| 60956456424| 136.6785200286976|0f09c5a5-784e-447...|    2021-08-20|
|  18479| 94493496784| 72.96316578355305|f6c78c1a-4600-4c5...|    2021-08-20|
|      3| 76819856970|  448.529684285612|5ace6a24-cdf0-4aa...|    2021-08-20|
|  18479| 67609108741|  86.4040605836911|d0e180f0-cb06-42a...|    2021-08-20|
|      3| 34096466752| 301.5793450525113|6fb1ff48-24bb-4f9...|    2021-08-20|
|  18482| 70501974849| 68.75486276223054|8505fb33-b69a-412...|    2021-08-20|
|      4| 49891706470| 48.89796461900801|ed11e477-b09f-4ae...|  

3643266

In [4]:
p3_df.show(10)
p3_df.count()

+-------+------------+------------------+--------------------+--------------+
|user_id|merchant_abn|      dollar_value|            order_id|order_datetime|
+-------+------------+------------------+--------------------+--------------+
|  14935| 79417999332|136.06570809815838|23acbb7b-cf98-458...|    2021-11-26|
|      1| 46451548968| 72.61581642788431|76bab304-fa2d-400...|    2021-11-26|
|  14936| 89518629617|3.0783487174439297|a2ae446a-2959-41c...|    2021-11-26|
|      1| 49167531725| 51.58228625503599|7080c274-17f7-4cc...|    2021-11-26|
|  14936| 31101120643|25.228114942417797|8e301c0f-06ab-45c...|    2021-11-26|
|      2| 67978471888| 691.5028234458998|0380e9ad-b0e8-420...|    2021-11-26|
|  14936| 60956456424|102.13952056640888|5ac3da9c-5147-452...|    2021-11-26|
|      2| 47644196714| 644.5220654863093|4e368e44-86f8-4de...|    2021-11-26|
|  14938| 39649557865|209.12780951421405|4d78cd01-4bab-494...|    2021-11-26|
|      3| 88402174457| 141.0387993699113|c50c957d-ecfc-430...|  

4508106

In [5]:
p4_df.show(10)
p4_df.count() 

+-------+------------+------------------+--------------------+--------------+
|user_id|merchant_abn|      dollar_value|            order_id|order_datetime|
+-------+------------+------------------+--------------------+--------------+
|  11139| 96152467973|16.213590228273233|785b0080-9e4b-471...|    2022-08-20|
|      1| 98973094975| 86.97955945703498|2560f7b0-ee5d-4b3...|    2022-08-20|
|  11139| 56762458844|31.513502323509194|0311717b-8b5b-410...|    2022-08-20|
|      1| 89502033586|124.18468694868491|f8891626-f098-45b...|    2022-08-20|
|  11139| 96161808980|61.620445567668966|d90a421f-f1da-4bf...|    2022-08-20|
|      2| 72472909171| 32.26524985312485|523e0403-b677-450...|    2022-08-20|
|  11139| 91923722701|11.331586767322225|f45a842b-0366-41d...|    2022-08-20|
|      3| 46380096952|119.80011239189334|58d0f423-037c-43f...|    2022-08-20|
|  11140| 79283124876|198.13027742225435|60b12d41-41d6-4c1...|    2022-08-20|
|      4| 67202032418|206.20865323560022|64a05a23-a078-481...|  

6044133

In [6]:
new_tbl_consumer.show(5, truncate=False)

+-----------------+-----------------------------+-----+--------+------+-----------+
|name             |address                      |state|postcode|gender|consumer_id|
+-----------------+-----------------------------+-----+--------+------+-----------+
|Yolanda Williams |413 Haney Gardens Apt. 742   |WA   |6935    |Female|1195503    |
|Mary Smith       |3764 Amber Oval              |NSW  |2782    |Female|179208     |
|Jill Jones MD    |40693 Henry Greens           |NT   |862     |Female|1194530    |
|Lindsay Jimenez  |00653 Davenport Crossroad    |NSW  |2780    |Female|154128     |
|Rebecca Blanchard|9271 Michael Manors Suite 651|WA   |6355    |Female|712975     |
+-----------------+-----------------------------+-----+--------+------+-----------+
only showing top 5 rows


In [7]:
preprocessed_data.show(10)

+-------+------------+------------------+--------------------+
|user_id|merchant_abn|      dollar_value|            order_id|
+-------+------------+------------------+--------------------+
|      1| 28000487688|133.22689421562643|0c37b3f7-c7f1-48c...|
|  18485| 62191208634| 79.13140006851712|9e18b913-0465-4fd...|
|      1| 83690644458|30.441348317517228|40a2ff69-ea34-465...|
|  18488| 39649557865| 962.8133405407585|f4c1a5ae-5b76-40d...|
|      2| 80779820715| 48.12397733548124|cd09bdd6-f56d-489...|
|  18489| 43186523025| 98.14878546968934|9008a98e-1b02-4de...|
|      3| 29566626791| 46.33087226118639|26b7574e-81c2-455...|
|  18490| 93558142492|232.83335268750145|2bda0665-796f-4f2...|
|      3| 32361057556| 87.34942171371054|633a7656-2fcc-4b8...|
|  18491| 64974914166|130.12601873970038|4bc15338-83eb-43d...|
+-------+------------+------------------+--------------------+
only showing top 10 rows


In [8]:
income_df.show(5)


+--------+---------------+------------------+----------------+
|POSTCODE|2021 Population|Median ($) 2021-22|Mean ($) 2021-22|
+--------+---------------+------------------+----------------+
|     800|        28507.0|           69920.0|         83232.0|
|     810|        57630.0|           66937.0|         76136.0|
|     812|        57630.0|           66937.0|         76136.0|
|     820|        43068.5|           68428.5|         79684.0|
|     822|        26735.0|           56840.4|         67317.8|
+--------+---------------+------------------+----------------+
only showing top 5 rows


In [9]:
employed_rate.show(5)

+-------------------+------+------+------+------+------+------+------+------+
|              Month|   NSW|   VIC|   QLD|    SA|    WA|   TAS|    NT|   ACT|
+-------------------+------+------+------+------+------+------+------+------+
|2021-01-01 00:00:00|60.988|62.132|61.688| 57.76|63.254|57.538|69.138|67.904|
|2021-02-01 00:00:00|61.466|62.325|61.868|57.832|62.932|58.095|69.698|68.784|
|2021-03-01 00:00:00|61.896|62.467|62.284|57.852|64.531|58.045|69.062|68.194|
|2021-04-01 00:00:00|61.469|62.511|62.173|58.808|64.168|57.768|69.624|68.718|
|2021-05-01 00:00:00| 62.28|62.927|62.583|59.194|64.375|57.936|70.001|68.845|
+-------------------+------+------+------+------+------+------+------+------+
only showing top 5 rows


In [10]:
crime_df.show(10)

+-----+----+-------+--------+----------+--------------+-----------+
|State|Year|Assault|Homicide|Kidnapping|Sexual assault|Crime_Index|
+-----+----+-------+--------+----------+--------------+-----------+
|  ACT|2021|  511.4|     2.7|       0.0|          75.4|      271.3|
|  ACT|2022|  538.7|     2.0|       0.7|          71.2|      284.1|
|  NSW|2021|  798.9|     1.0|       2.6|         141.8|      428.3|
|  NSW|2022|  852.0|     1.0|       2.6|         152.2|      456.9|
|   NT|2021| 3648.6|     5.2|       0.0|         143.1|     1854.0|
|   NT|2022| 4159.1|     6.0|       2.0|         144.4|     2109.8|
|  QLD|2021|    0.0|     1.6|       0.5|         132.1|       26.8|
|  QLD|2022|  983.3|     2.1|       0.8|         139.5|      520.0|
|   SA|2021| 1008.2|     1.9|       2.3|          98.9|      524.5|
|   SA|2022| 1083.8|     1.6|       2.3|          99.7|      562.4|
+-----+----+-------+--------+----------+--------------+-----------+
only showing top 10 rows


# Data Preprocessing

In [11]:
# Step 1: 合并三个 DataFrame
transactions_df = (
    p2_df.unionByName(p3_df)
         .unionByName(p4_df)
)

# Step 2: 检查结果
transactions_df.printSchema()
print(f"Total rows: {transactions_df.count()}")  
transactions_df.show(10, truncate=False)

root
 |-- user_id: long (nullable = true)
 |-- merchant_abn: long (nullable = true)
 |-- dollar_value: double (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_datetime: date (nullable = true)

Total rows: 14195505
+-------+------------+------------------+------------------------------------+--------------+
|user_id|merchant_abn|dollar_value      |order_id                            |order_datetime|
+-------+------------+------------------+------------------------------------+--------------+
|18478  |62191208634 |63.255848959735246|949a63c8-29f7-4ab0-ada4-99ac50a88952|2021-08-20    |
|2      |15549624934 |130.3505283105634 |6a84c3cf-612a-4574-835b-144a47353eff|2021-08-20    |
|18479  |64403598239 |120.15860593212783|b10dcc33-e53f-4254-863c-de5266810cbc|2021-08-20    |
|3      |60956456424 |136.6785200286976 |0f09c5a5-784e-4477-b049-8ee4dd069b7b|2021-08-20    |
|18479  |94493496784 |72.96316578355305 |f6c78c1a-4600-4c5f-8e97-6e9eb534b586|2021-08-20    |
|3      |76819

                                                                                

# Final Data Merge (External Data)

In [12]:
# Step 1: 只保留 transactions_df 需要的列
trans_dates = transactions_df.select("order_id", "order_datetime")

# Step 2: join 到 preprocessed_data
preprocessed_data = (
    preprocessed_data
    .join(trans_dates, on="order_id", how="left")
)

# Step 3: 检查结果
preprocessed_data.show(10, truncate=False)

                                                                                

+------------------------------------+-------+------------+------------------+--------------+
|order_id                            |user_id|merchant_abn|dollar_value      |order_datetime|
+------------------------------------+-------+------------+------------------+--------------+
|0706747a-6bee-4a7a-bd74-291a5970b94f|6922   |31552582037 |2046.7432731429228|2022-01-15    |
|45eee78a-0588-44f4-b0dd-3fafce69695c|22754  |75089928159 |91.30050328265806 |2021-06-17    |
|5d08f546-86bf-4d50-98dc-dc68cd3b2e87|21009  |70620117107 |50.27871237819427 |2022-08-29    |
|6e95547e-92be-4cb5-bec5-faa86e6b0d27|21853  |95431176007 |197.19276364316767|2022-01-15    |
|9e18b913-0465-4fd4-92fd-66d15e65d93c|18485  |62191208634 |79.13140006851712 |2021-02-28    |
|a8db208b-2ecf-4b04-8e0b-2dd86df7762b|2881   |42500153308 |20.169467251919265|2022-04-28    |
|ef4090cb-9514-4d2c-bc01-1ee8d22a182e|14003  |98545158925 |21.81418329985703 |2022-04-28    |
|1b237258-3f9a-4058-8e5e-8651c628cd35|2883   |94493496784 |8

In [13]:
### Add the coloumn of postcode/state/gender
# Join user_id with consumer_id to enrich transaction data
df_with_consumer = (
    preprocessed_data
    .join(consumer_details, on="user_id", how="left")
)

# Join consumer_id with the cleaned consumer demographic table
df_enriched = (
    df_with_consumer
    .join(new_tbl_consumer, on="consumer_id", how="left")
)

# Select only the required fields for the final dataset
df_enriched_selected = df_enriched.select(
    "user_id",
    "consumer_id",
    "merchant_abn",
    "order_id",
    "dollar_value",
    "state",
    "postcode",
    "gender",
    "order_datetime"
)

# Preview first 10 rows of the enriched dataset 
df_enriched_selected.show(10, truncate=False)



+-------+-----------+------------+------------------------------------+------------------+-----+--------+------+--------------+
|user_id|consumer_id|merchant_abn|order_id                            |dollar_value      |state|postcode|gender|order_datetime|
+-------+-----------+------------+------------------------------------+------------------+-----+--------+------+--------------+
|2883   |75264      |94493496784 |1b237258-3f9a-4058-8e5e-8651c628cd35|86.12019907563348 |VIC  |3123    |Female|2022-04-28    |
|18519  |147657     |58741105428 |7f623ae1-5373-4a26-926d-8df335c2bec5|43.31490194582448 |NSW  |2218    |Male  |2021-11-05    |
|2881   |295586     |42500153308 |a8db208b-2ecf-4b04-8e0b-2dd86df7762b|20.169467251919265|TAS  |7017    |Male  |2022-04-28    |
|2881   |295586     |72177236569 |99e6f91e-c60b-4812-a6fa-4f065f54478c|273.0718934208491 |TAS  |7017    |Male  |2022-04-28    |
|18520  |462439     |45572698303 |a5c3b763-0b13-485f-98ce-09df3f97c2a9|86.13964260098399 |VIC  |3719    

                                                                                

In [14]:
### Insert the column with employed rate

# Step 1: reshape employed_rate from wide to long format
employed_long = (
    employed_rate
    .selectExpr("Month", "stack(8, 'NSW', NSW, 'VIC', VIC, 'QLD', QLD, 'SA', SA, 'WA', WA, 'TAS', TAS, 'NT', NT, 'ACT', ACT) as (state, employed_rate)")
)

# Step 2: 对交易数据生成 Year-Month
df_enriched_selected = df_enriched_selected.withColumn(
    "year_month",
    F.date_format("order_datetime", "yyyy-MM")
)

# Step 3: join
df_with_employed = (
    df_enriched_selected
    .join(employed_long.withColumnRenamed("Month", "year_month"), 
          on=["year_month", "state"], how="left")
)

# Step 4: 预览
df_with_employed.show(20, truncate=False)




+----------+-----+-------+-----------+------------+------------------------------------+------------------+--------+-----------+--------------+-------------+
|year_month|state|user_id|consumer_id|merchant_abn|order_id                            |dollar_value      |postcode|gender     |order_datetime|employed_rate|
+----------+-----+-------+-----------+------------+------------------------------------+------------------+--------+-----------+--------------+-------------+
|2022-04   |VIC  |2883   |75264      |94493496784 |1b237258-3f9a-4058-8e5e-8651c628cd35|86.12019907563348 |3123    |Female     |2022-04-28    |64.232       |
|2021-11   |NSW  |18519  |147657     |58741105428 |7f623ae1-5373-4a26-926d-8df335c2bec5|43.31490194582448 |2218    |Male       |2021-11-05    |61.502       |
|2021-08   |WA   |16800  |186459     |91923722701 |e084f4c9-8829-4ea6-9a98-8050a556a520|5.971913169283436 |6964    |Undisclosed|2021-08-05    |65.389       |
|2021-08   |WA   |16800  |186459     |76767266140 |a

                                                                                

In [15]:
### Insert the column with income
# 规范列名 + 类型（去掉空格/符号，转成小写）
income_df = (
    income_df
    .withColumnRenamed("POSTCODE", "postcode")
    .withColumnRenamed("2021 Population", "pop")
    .withColumnRenamed("Median ($) 2021-22", "median_income")
    .withColumnRenamed("Mean ($) 2021-22",   "mean_income")
    # 如果读进来是字符串，做一次安全的数值转换（去逗号）
    .withColumn("postcode", F.regexp_replace(F.col("postcode").cast("string"), r"\s+", "").cast("int"))
    .withColumn("pop", F.regexp_replace(F.col("pop").cast("string"), r"[,\s]", "").cast("double"))
    .withColumn("median_income", F.regexp_replace(F.col("median_income").cast("string"), r"[,\s]", "").cast("double"))
    .withColumn("mean_income",   F.regexp_replace(F.col("mean_income").cast("string"), r"[,\s]", "").cast("double"))
    .dropna(subset=["postcode"])       # 必要时：去掉没有邮编的行
)

# 若同一邮编在文件中出现多行，取一个稳定聚合（这里取最大人口那行）
w = F.row_number().over(Window.partitionBy("postcode").orderBy(F.col("pop").desc_nulls_last()))
income_df = income_df.withColumn("rn", w).filter("rn = 1").drop("rn")

df_with_income = (
    df_enriched_selected
    .join(income_df, on="postcode", how="left")
)

In [16]:
df_with_income.show(10)



+--------+-------+-----------+------------+--------------------+------------------+-----+------+--------------+----------+--------+-------------+-----------+
|postcode|user_id|consumer_id|merchant_abn|            order_id|      dollar_value|state|gender|order_datetime|year_month|     pop|median_income|mean_income|
+--------+-------+-----------+------------+--------------------+------------------+-----+------+--------------+----------+--------+-------------+-----------+
|    3123|   2883|      75264| 94493496784|1b237258-3f9a-405...| 86.12019907563348|  VIC|Female|    2022-04-28|   2022-04|169789.0|      65532.0|   114114.0|
|    2218|  18519|     147657| 58741105428|7f623ae1-5373-4a2...| 43.31490194582448|  NSW|  Male|    2021-11-05|   2021-11|139453.5|      52712.5|    68200.0|
|    7017|   2881|     295586| 42500153308|a8db208b-2ecf-4b0...|20.169467251919265|  TAS|  Male|    2022-04-28|   2022-04| 27829.0|     52017.25|    58035.5|
|    7017|   2881|     295586| 72177236569|99e6f91e-

                                                                                

In [17]:
### Insert the column with crime rate
# Step 1: 给交易数据加 year 字段
df_with_income = df_with_income.withColumn(
    "year", F.year("order_datetime")
)

# Step 2: 统一 state 列名
crime_df = crime_df.withColumnRenamed("State", "state")

# Step 3: join
df_with_crime = (
    df_with_income
    .join(crime_df, on=["state", "year"], how="left")
)

# Step 4: 查看结果
df_with_crime.show(10, truncate=False)



+-----+----+--------+-------+-----------+------------+------------------------------------+------------------+------+--------------+----------+--------+-------------+-----------+-------+--------+----------+--------------+-----------+
|state|year|postcode|user_id|consumer_id|merchant_abn|order_id                            |dollar_value      |gender|order_datetime|year_month|pop     |median_income|mean_income|Assault|Homicide|Kidnapping|Sexual assault|Crime_Index|
+-----+----+--------+-------+-----------+------------+------------------------------------+------------------+------+--------------+----------+--------+-------------+-----------+-------+--------+----------+--------------+-----------+
|VIC  |2022|3123    |2883   |75264      |94493496784 |1b237258-3f9a-4058-8e5e-8651c628cd35|86.12019907563348 |Female|2022-04-28    |2022-04   |169789.0|65532.0      |114114.0   |0.0    |1.3     |2.7       |100.9         |20.7       |
|NSW  |2021|2218    |18519  |147657     |58741105428 |7f623ae1-5

                                                                                

### Join with consumer_profile

In [18]:
# --- Join with Consumer Level ---
df_with_consumer = (
    df_with_crime
    .join(consumer_level, on="consumer_id", how="left")
)


In [19]:
# 预览
df_with_consumer.show(10, truncate=False)



+-----------+-----+----+--------+-------+------------+------------------------------------+------------------+------+--------------+----------+--------+-------------+-----------+-------+--------+----------+--------------+-----------+---------+------------------+-------+
|consumer_id|state|year|postcode|user_id|merchant_abn|order_id                            |dollar_value      |gender|order_datetime|year_month|pop     |median_income|mean_income|Assault|Homicide|Kidnapping|Sexual assault|Crime_Index|c_user_id|c_fraud_prob      |c_state|
+-----------+-----+----+--------+-------+------------+------------------------------------+------------------+------+--------------+----------+--------+-------------+-----------+-------+--------+----------+--------------+-----------+---------+------------------+-------+
|75264      |VIC  |2022|3123    |2883   |94493496784 |1b237258-3f9a-4058-8e5e-8651c628cd35|86.12019907563348 |Female|2022-04-28    |2022-04   |169789.0|65532.0      |114114.0   |0.0    |1

                                                                                

### Join with merrcant_profile

In [20]:
# --- Join with Merchant Level ---
df_with_merchant = (
    df_with_consumer
    .join(merchant_level, on="merchant_abn", how="left")
)



In [21]:
from pyspark.sql import functions as F

# 合并两个 order_date 列
df_final = (
    df_with_merchant
    .withColumn("order_date", F.coalesce(F.col("order_date"), F.col("order_datetime")))
    .drop("order_datetime")   # 删除多余列，避免重复
)



In [22]:
# 预览
df_with_merchant.show(10, truncate=False)

print("原始交易数量:", df_with_consumer.count())
print("合并后交易数量:", df_with_merchant.count())

25/09/12 15:03:20 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+------------+-----------+-----+----+--------+-------+------------------------------------+------------------+------+--------------+----------+--------+-------------+-----------+-------+--------+----------+--------------+-----------+---------+------------------+-------+----------+-----------------+------------------------------+-----------------+
|merchant_abn|consumer_id|state|year|postcode|user_id|order_id                            |dollar_value      |gender|order_datetime|year_month|pop     |median_income|mean_income|Assault|Homicide|Kidnapping|Sexual assault|Crime_Index|c_user_id|c_fraud_prob      |c_state|order_date|m_fraud_prob     |m_name                        |m_category       |
+------------+-----------+-----+----+--------+-------+------------------------------------+------------------+------+--------------+----------+--------+-------------+-----------+-------+--------+----------+--------------+-----------+---------+------------------+-------+----------+-----------------+---

                                                                                

原始交易数量: 14195505


[Stage 208:>                                                        (0 + 8) / 9]

合并后交易数量: 14494630


                                                                                

In [23]:
# 删除重复列
df_cleaned = df_with_merchant.drop(
    "c_user_id",     # 和 user_id 重复
    "c_gender",      # 和 gender 重复
    "c_state",       # 和 state 重复
    "c_postcode",    # 和 postcode 重复
    "order_date"     # 和 order_datetime 重复
)

# 预览清理后的结果
df_cleaned.printSchema()
df_cleaned.show(10, truncate=False)

root
 |-- merchant_abn: long (nullable = true)
 |-- consumer_id: long (nullable = true)
 |-- state: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- postcode: integer (nullable = true)
 |-- user_id: long (nullable = true)
 |-- order_id: string (nullable = true)
 |-- dollar_value: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- order_datetime: date (nullable = true)
 |-- year_month: string (nullable = true)
 |-- pop: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- mean_income: double (nullable = true)
 |-- Assault: double (nullable = true)
 |-- Homicide: double (nullable = true)
 |-- Kidnapping: double (nullable = true)
 |-- Sexual assault: double (nullable = true)
 |-- Crime_Index: double (nullable = true)
 |-- c_fraud_prob: double (nullable = true)
 |-- m_fraud_prob: double (nullable = true)
 |-- m_name: string (nullable = true)
 |-- m_category: string (nullable = true)





+------------+-----------+-----+----+--------+-------+------------------------------------+------------------+------+--------------+----------+--------+-------------+-----------+-------+--------+----------+--------------+-----------+------------------+-----------------+------------------------------+-----------------+
|merchant_abn|consumer_id|state|year|postcode|user_id|order_id                            |dollar_value      |gender|order_datetime|year_month|pop     |median_income|mean_income|Assault|Homicide|Kidnapping|Sexual assault|Crime_Index|c_fraud_prob      |m_fraud_prob     |m_name                        |m_category       |
+------------+-----------+-----+----+--------+-------+------------------------------------+------------------+------+--------------+----------+--------+-------------+-----------+-------+--------+----------+--------------+-----------+------------------+-----------------+------------------------------+-----------------+
|94493496784 |75264      |VIC  |2022|312

                                                                                

In [24]:
output_path = "initial_transaction.parquet"

df_with_merchant.write.mode("overwrite").parquet(output_path)

print(f"Data saved as Parquet at {output_path}")



Data saved as Parquet at initial_transaction.parquet


                                                                                