# Data Transformation

This notebook involves the One-Hot Encoding on categorical data.

---

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import* 
from pyspark.sql.functions import split
from pyspark.sql.functions import year, month, dayofmonth
from pyspark.sql.functions import col, when, split, udf
from pyspark.sql.types import ArrayType, IntegerType
from pyspark.ml.linalg import DenseVector, SparseVector

In [2]:
spark = (
    SparkSession.builder.appName("Transformation")
    .config("spark.sql.repl.eagerEval.enabled", True) 
    .config("spark.driver.memory", "4G")
    .config("spark.executor.memory", "4G")
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .config("spark.sql.debug.maxToStringFields", 200)
    .getOrCreate()
)

24/09/11 23:30:27 WARN Utils: Your hostname, Cocos-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 172.16.33.67 instead (on interface en0)
24/09/11 23:30:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/11 23:30:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/09/11 23:30:28 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/09/11 23:30:28 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
24/09/11 23:30:28 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.


# Read datasets

In [3]:
# read datasets
consumer_full = spark.read.parquet('../data/curated/consumer_full')
merchant_full = spark.read.parquet('../data/curated/merchant_full')

                                                                                

In [4]:
consumer_full.show(5)

                                                                                

+--------+-------+--------------+-----------+---------------+-----+------+------------+------------------+--------------------+-----------------+--------------------+------------------+------------------+-----------------------------+---------------------------+------------------+-------------------------+----------------------------+-------------------------+----------------------+-----------------+
|postcode|user_id|order_datetime|consumer_id|           name|state|gender|merchant_abn|      dollar_value|            order_id|fraud_probability|            SA2_name|average_population|Median_age_persons|Median_mortgage_repay_monthly|Median_tot_prsnl_inc_weekly|Median_rent_weekly|Median_tot_fam_inc_weekly|Average_num_psns_per_bedroom|Median_tot_hhd_inc_weekly|Average_household_size|unemployment_rate|
+--------+-------+--------------+-----------+---------------+-----+------+------------+------------------+--------------------+-----------------+--------------------+------------------+-------

In [5]:
consumer_full.printSchema()

root
 |-- postcode: integer (nullable = true)
 |-- user_id: long (nullable = true)
 |-- order_datetime: date (nullable = true)
 |-- consumer_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- state: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- merchant_abn: long (nullable = true)
 |-- dollar_value: double (nullable = true)
 |-- order_id: string (nullable = true)
 |-- fraud_probability: double (nullable = true)
 |-- SA2_name: string (nullable = true)
 |-- average_population: double (nullable = true)
 |-- Median_age_persons: long (nullable = true)
 |-- Median_mortgage_repay_monthly: long (nullable = true)
 |-- Median_tot_prsnl_inc_weekly: long (nullable = true)
 |-- Median_rent_weekly: long (nullable = true)
 |-- Median_tot_fam_inc_weekly: long (nullable = true)
 |-- Average_num_psns_per_bedroom: double (nullable = true)
 |-- Median_tot_hhd_inc_weekly: long (nullable = true)
 |-- Average_household_size: double (nullable = true)
 |-- unemployment_ra

In [6]:
merchant_full.show(5)

+------------+--------------+-------+------------------+--------------------+--------------------+--------------------+-------------+---------+-----------------+-------------------+
|merchant_abn|order_datetime|user_id|      dollar_value|            order_id|                name|            category|revenue_level|take_rate|fraud_probability|transaction_revenue|
+------------+--------------+-------+------------------+--------------------+--------------------+--------------------+-------------+---------+-----------------+-------------------+
| 63290521567|    2022-08-25|  11138|41.239626303220014|09f6132a-a6a8-47a...|vehicula pellente...|artist supply and...|            a|     6.48|             NULL|  38.56729713438449|
| 63465140133|    2022-08-25|  22285| 8.815286645396842|f4fc1a3b-a5b1-4d7...|  vitae odio limited|digital goods: bo...|            b|     3.68|             NULL|  8.490884069944117|
| 21359184622|    2022-08-25|  11139| 66.46301590693417|b1a52524-b6e6-42a...|         sit 

In [7]:
merchant_full.printSchema()

root
 |-- merchant_abn: long (nullable = true)
 |-- order_datetime: date (nullable = true)
 |-- user_id: long (nullable = true)
 |-- dollar_value: double (nullable = true)
 |-- order_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- revenue_level: string (nullable = true)
 |-- take_rate: float (nullable = true)
 |-- fraud_probability: double (nullable = true)
 |-- transaction_revenue: double (nullable = true)



# Consumer data transformation

# Merchant data transformation

Select helpful columons:

In [8]:
merchant_full = merchant_full.drop('user_id', 'order_id', 'name')
merchant_full.show(5)

+------------+--------------+------------------+--------------------+-------------+---------+-----------------+-------------------+
|merchant_abn|order_datetime|      dollar_value|            category|revenue_level|take_rate|fraud_probability|transaction_revenue|
+------------+--------------+------------------+--------------------+-------------+---------+-----------------+-------------------+
| 63290521567|    2022-08-25|41.239626303220014|artist supply and...|            a|     6.48|             NULL|  38.56729713438449|
| 63465140133|    2022-08-25| 8.815286645396842|digital goods: bo...|            b|     3.68|             NULL|  8.490884069944117|
| 21359184622|    2022-08-25| 66.46301590693417|motor vehicle sup...|            b|      3.6|             NULL|  64.07034834842969|
| 75034515922|    2022-08-25|19.654995598411727|digital goods: bo...|            a|     6.22|             NULL| 18.432454632261372|
| 96334476428|    2022-08-25| 6.383881161736656|bicycle shops - s...|       

### Apply One-Hot Encoding on `revenue_level` column

In [9]:
revenue_levels = merchant_full.select("revenue_level").distinct().rdd.flatMap(lambda x: x).collect()

for level in revenue_levels:
    merchant_full = merchant_full.withColumn(f"revenue_level_{level}", when(col("revenue_level") == level, 1).otherwise(0))

merchant_full_expanded = merchant_full.drop("revenue_level")
merchant_full_expanded.show(5)

                                                                                

+------------+--------------+------------------+--------------------+---------+-----------------+-------------------+---------------+---------------+---------------+---------------+---------------+
|merchant_abn|order_datetime|      dollar_value|            category|take_rate|fraud_probability|transaction_revenue|revenue_level_e|revenue_level_d|revenue_level_c|revenue_level_b|revenue_level_a|
+------------+--------------+------------------+--------------------+---------+-----------------+-------------------+---------------+---------------+---------------+---------------+---------------+
| 63290521567|    2022-08-25|41.239626303220014|artist supply and...|     6.48|             NULL|  38.56729713438449|              0|              0|              0|              0|              1|
| 63465140133|    2022-08-25| 8.815286645396842|digital goods: bo...|     3.68|             NULL|  8.490884069944117|              0|              0|              0|              1|              0|
| 21359184

### Apply One-Hot Encoding on `category` column

In [10]:
# collect distinct words and create a dictionary with indices
distinct_words = merchant_full_expanded.selectExpr("explode(split(category, ' ')) as word")\
                                       .distinct()\
                                       .rdd.flatMap(lambda x: x)\
                                       .collect()
word_index = {word: idx for idx, word in enumerate(distinct_words)}

# create a dictionary mapping indices to words
index_word = {idx: word for word, idx in word_index.items()}

# convert the `word_index` dictionary to a broadcast variable
word_index_broadcast = spark.sparkContext.broadcast(word_index)

                                                                                

Define a funciton to convert a list of words into a binary vector based on a predefined word index dictionary:

In [11]:
def words_to_binary_vector(words):
    # retrieve the broadcasted `word_index` dictionary which maps words 
    # to their respective indices
    word_index = word_index_broadcast.value
    
    # initialize a binary vector with zeros, with length equal 
    # to the size of the `word_index` dictionary
    vector = [0] * len(word_index)
    
    # iterate over each word in the input list
    for word in words:
        # if the word is in the `word_index` dictionary, set the corresponding 
        # index in the vector to 1
        if word in word_index:
            vector[word_index[word]] = 1
    
    return vector

words_to_binary_vector_udf = udf(words_to_binary_vector, ArrayType(IntegerType()))

In [12]:
# split `category` into words
merchant_full_expanded = merchant_full_expanded.withColumn("category_words", 
                                                           split(col("category"), " "))

# apply the UDF to get the binary vector
merchant_full_expanded = merchant_full_expanded.withColumn("category_binary", 
                                                           words_to_binary_vector_udf(col("category_words")))

# drop the original columns
merchant_full_expanded = merchant_full_expanded.drop("category", "category_words")

# show the result
merchant_full_expanded.show(5, truncate=False)

+------------+--------------+------------------+---------+-----------------+-------------------+---------------+---------------+---------------+---------------+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|merchant_abn|order_datetime|dollar_value      |take_rate|fraud_probability|transaction_revenue|revenue_level_e|revenue_level_d|revenue_level_c|revenue_level_b|revenue_level_a|category_binary                                                                                                                                                                                                                                                                                                         |
+------------+------

In [13]:
sample_vector = merchant_full_expanded.select("category_binary").first()[0]
count_sub_category = len(sample_vector)
print(f'Number of vectors in category_binary:', count_sub_category)

Number of vectors in category_binary: 104


In [14]:
# define a UDF to extract values from the vector (both Dense and Sparse vectors)
def extract_vector_values(vector):
    if vector is None:
        return [0] * count_sub_category
    if isinstance(vector, DenseVector):
        return vector.toArray().tolist()
    elif isinstance(vector, SparseVector):
        return vector.toArray().tolist()
    else:
        return [0] * count_sub_category

# register the UDF
extract_vector_values_udf = udf(extract_vector_values, ArrayType(IntegerType()))

# add a new column with the extracted values
merchant_full_expanded = merchant_full_expanded.withColumn("category_binary_array", 
                                                           extract_vector_values_udf(col("category_binary")))

# split the array into separate columns with specific category names
for idx, word in index_word.items():
    merchant_full_expanded = merchant_full_expanded.withColumn(f"category_{word}", 
                                                               col("category_binary_array")[idx])

# drop the temporary columns
merchant_full_expanded = merchant_full_expanded.drop("category_binary", 
                                                     "category_binary_array")

# show the result
merchant_full_expanded.show(5, truncate=False)

+------------+--------------+------------------+---------+-----------------+-------------------+---------------+---------------+---------------+---------------+---------------+----------------+---------------+------------+--------------+-------------------+----------------+---------------+---------------+---------------+-----------------+---------------+----------------+-------------------+-------------------+------------+----------------+-----------------+-------------+-------------+-------------+----------------+---------------+-------------+--------------+-----------------+---------------+--------------------+--------------------+--------------+--------------+-----------------+-------------------+--------------+-------------------+--------------+--------------+---------------------+--------------+------------------+---------------+-------------------+-----------------+--------------+---------------+-------------------+-------------+--------------+----------+-------------------+-----

### Apply One-Hot Encoding on `order_datetime` column

In [15]:
# extract year, month, date, and day of the week as new features
merchant_full_expanded = merchant_full_expanded.withColumn("year", year("order_datetime"))
merchant_full_expanded = merchant_full_expanded.withColumn("month", month("order_datetime"))
merchant_full_expanded = merchant_full_expanded.withColumn("day", dayofmonth("order_datetime"))

# drop original column
merchant_full_expanded = merchant_full_expanded.drop("order_datetime")
merchant_full_expanded.show(5)

+------------+------------------+---------+-----------------+-------------------+---------------+---------------+---------------+---------------+---------------+----------------+---------------+------------+--------------+-------------------+----------------+---------------+---------------+---------------+-----------------+---------------+----------------+-------------------+-------------------+------------+----------------+-----------------+-------------+-------------+-------------+----------------+---------------+-------------+--------------+-----------------+---------------+--------------------+--------------------+--------------+--------------+-----------------+-------------------+--------------+-------------------+--------------+--------------+---------------------+--------------+------------------+---------------+-------------------+-----------------+--------------+---------------+-------------------+-------------+--------------+----------+-------------------+--------------+-----

In [16]:
# One-Hot encode the `year` column
years = merchant_full_expanded.select("year").distinct().rdd.flatMap(lambda x: x).collect()
for year in years:
    merchant_full_expanded = merchant_full_expanded.withColumn(f"year_{year}", 
                                                               when(col("year") == year, 1)\
                                                               .otherwise(0))

# One-Hot encode the `month` column
months = merchant_full_expanded.select("month").distinct().rdd.flatMap(lambda x: x).collect()
for month in months:
    merchant_full_expanded = merchant_full_expanded.withColumn(f"month_{month}", 
                                                               when(col("month") == month, 1)\
                                                               .otherwise(0))

# One-Hot encode the `day` column
days = merchant_full_expanded.select("day").distinct().rdd.flatMap(lambda x: x).collect()
for day in days:
    merchant_full_expanded = merchant_full_expanded.withColumn(f"day_{day}", 
                                                               when(col("day") == day, 1)\
                                                               .otherwise(0))

# drop original columns
merchant_full_expanded = merchant_full_expanded.drop("year", "month", "day")
merchant_full_expanded.show(5)

+------------+------------------+---------+-----------------+-------------------+---------------+---------------+---------------+---------------+---------------+----------------+---------------+------------+--------------+-------------------+----------------+---------------+---------------+---------------+-----------------+---------------+----------------+-------------------+-------------------+------------+----------------+-----------------+-------------+-------------+-------------+----------------+---------------+-------------+--------------+-----------------+---------------+--------------------+--------------------+--------------+--------------+-----------------+-------------------+--------------+-------------------+--------------+--------------+---------------------+--------------+------------------+---------------+-------------------+-----------------+--------------+---------------+-------------------+-------------+--------------+----------+-------------------+--------------+-----

### Check the shape of transformed dataset

In [17]:
num_rows = merchant_full_expanded.count()
print(f"Number of rows: {num_rows}")

num_columns = len(merchant_full_expanded.columns)
print(f"Number of columns: {num_columns}")

Number of rows: 13614675
Number of columns: 159


In [18]:
# save as a parquet file
merchant_full_expanded.write.parquet('../data/curated/merchant_full_expanded', mode='overwrite')

                                                                                