# Billups challenge

## Setup Spark

In [1]:
from pyspark.sql import SparkSession



def create_spark_session():
    spark = SparkSession.builder.master(
        "local[*]"
    ).appName(
        'Billups Challenge Task'
    ).config(
        "spark.driver.memory",
        "8g"
    ).getOrCreate()

    return spark

spark = create_spark_session()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/08/23 03:48:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Read  and clean the data

In [2]:
historical_transactions = spark.read.parquet("sample_data/historical_transactions.parquet")
historical_transactions.show()

                                                                                

+---------------+---------------+-------+------------+--------+--------------------+---------------+---------+---------------+-------------------+--------+------------+
|authorized_flag|    customer_id|city_id|installments|category|merchant_category_id|    merchant_id|month_lag|purchase_amount|      purchase_date|state_id|subsector_id|
+---------------+---------------+-------+------------+--------+--------------------+---------------+---------+---------------+-------------------+--------+------------+
|              N|C_ID_2730b57487|     63|           3|       C|                 456|M_ID_fe83cec110|       -9|    -0.05568825|2017-05-19 16:01:59|       9|          21|
|              Y|C_ID_95429da153|     38|           3|       C|                 884|M_ID_9f11c207c1|       -2|    -0.52166038|2017-12-07 22:33:34|       7|          27|
|              Y|C_ID_cdc7680c7f|    160|           1|       B|                 278|M_ID_ac78c7d82a|       -3|    -0.47319988|2017-11-17 00:00:00|      21|

In [3]:
merchants = spark.read.format('csv').options(header="true").load('sample_data/merchants.csv')
merchants.show()


+---------------+---------------+-------+------------+--------+--------------------+---------------+---------+---------------+-------------------+--------+------------+
|authorized_flag|    customer_id|city_id|installments|category|merchant_category_id|    merchant_id|month_lag|purchase_amount|      purchase_date|state_id|subsector_id|
+---------------+---------------+-------+------------+--------+--------------------+---------------+---------+---------------+-------------------+--------+------------+
|              N|C_ID_2730b57487|     63|           3|       C|                 456|M_ID_fe83cec110|       -9|    -0.05568825|2017-05-19 16:01:59|       9|          21|
|              Y|C_ID_95429da153|     38|           3|       C|                 884|M_ID_9f11c207c1|       -2|    -0.52166038|2017-12-07 22:33:34|       7|          27|
|              Y|C_ID_cdc7680c7f|    160|           1|       B|                 278|M_ID_ac78c7d82a|       -3|    -0.47319988|2017-11-17 00:00:00|      21|

##### looks simular to historical transactions...

In [4]:
merchants = spark.read.format('csv').options(header="true").schema(historical_transactions.schema).load('sample_data/merchants.csv')

print(historical_transactions.subtract(merchants).count() == merchants.subtract(historical_transactions).count() == 0)
# in our case we can check on driver, datasets are pretty small
# historical_transactions.collect() == merchants.collect()




True


                                                                                

##### so datasets are same and we will use historical_transactions only

In [5]:
historical_transactions = historical_transactions.cache()

In [6]:
historical_transactions.describe(['purchase_amount']).show()
# negative amount



+-------+-------------------+
|summary|    purchase_amount|
+-------+-------------------+
|  count|            7274367|
|   mean|-0.5843336382794599|
| stddev|0.43351548756163666|
|    min|         -0.7469078|
|    max|         9.99728856|
+-------+-------------------+



                                                                                

In [7]:
from pyspark.sql import functions as F

historical_transactions = historical_transactions.withColumn(
    # arguable suggestion, it looks simular to some logarithmic scale, but at least now it became positive :) 
    'positive_purchase', F.col("purchase_amount") + 1
)


## 1. The top 5 merchants by sales in each city, in each month

In [8]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

historical_transactions.withColumn(
    'Month', 
    F.date_format(F.col("purchase_date"), "MMM yyyy")
).groupby(
    'Month', "city_id", "merchant_id"
).agg(
    F.sum('positive_purchase').alias("saleAmount"), # to integer?
    F.count(F.lit(1)).alias("numberOfSales")
).withColumn(
    "merchant_rank",
    F.row_number().over(
        Window.partitionBy(
            "Month", "city_id"
        ).orderBy(
            F.col("saleAmount").desc()
        )
    )
).where(
    F.col("merchant_rank") <= 5
).select(
    "Month", 
    F.col("merchant_id").alias("merchantId"),
    F.col("city_id").alias("cityId"),
    "saleAmount",
    "numberOfSales"
).show()



+--------+---------------+------+------------------+-------------+
|   Month|     merchantId|cityId|        saleAmount|numberOfSales|
+--------+---------------+------+------------------+-------------+
|Apr 2017|M_ID_349f038eea|     2|        9.77860203|           14|
|Apr 2017|M_ID_8791980f16|     2| 6.170373920000001|           11|
|Apr 2017|M_ID_34f2fcb458|     2|5.3071863299999995|            4|
|Apr 2017|M_ID_a04bff97c8|     2|        4.11490574|            1|
|Apr 2017|M_ID_2720229b07|     2|        3.71118006|           11|
|Apr 2017|M_ID_ae83af31af|     8|20.340697100000003|           44|
|Apr 2017|M_ID_ae0de9385a|     8|15.230343069999996|           40|
|Apr 2017|M_ID_61f59b165b|     8|15.224228940000001|           20|
|Apr 2017|M_ID_3d1a48e71e|     8|       13.33592666|           49|
|Apr 2017|M_ID_12843094d6|     8|       11.45171563|           39|
|Apr 2017|M_ID_48257bb851|    17| 174.0177576099998|          574|
|Apr 2017|M_ID_26d4fadb60|    17|129.79474337999983|          

                                                                                

## 2. The average sale amount of each merchant in each state

In [9]:
historical_transactions.groupBy(
    "merchant_id",
    "state_id"
).agg(
    F.mean("positive_purchase").alias("avgAmount")
).select(
    F.col("merchant_id").alias("merchantId"),
    "avgAmount",
    F.col("state_id").alias("stateId")
).show()



+---------------+-------------------+-------+
|     merchantId|          avgAmount|stateId|
+---------------+-------------------+-------+
|M_ID_ddfd5234bb| 0.3670768831168832|      4|
|M_ID_894b777cc9| 0.3238014593999998|      9|
|M_ID_dfbb556114| 0.3835793391176471|     15|
|M_ID_5f4863e3d5| 0.2895869300840335|      1|
|M_ID_df6d3065f3| 0.3698593816552892|     16|
|M_ID_d7eeaa006a|  0.280402174047619|     19|
|M_ID_40df701416| 0.7433209137815124|     16|
|M_ID_826fb643fc| 0.3574469740579711|      7|
|M_ID_00110f730e|       0.5004784768|     15|
|M_ID_e9240f6448| 0.3384881701741294|     19|
|M_ID_fee99f4a54|        0.309441625|      2|
|M_ID_722c5dd184|0.31660881424242415|     15|
|M_ID_c0744e8f37|0.32202561924528306|     20|
|M_ID_b1a369bf52|        0.298678885|      7|
|M_ID_48ac0f3e32| 0.6396162504705881|     16|
|M_ID_170ad00a9c| 0.3704056037223587|     22|
|M_ID_412a00d1f8|0.26550663953618003|     21|
|M_ID_fc0f7b1282|0.44591240365853657|     13|
|M_ID_87afdfa535|0.466452894333333

                                                                                

## 3. Top 3 hours with the highest amount of sales, by product category

In [10]:
historical_transactions.withColumn(
    "hour",
    F.hour("purchase_date")
).withColumn(
    "hour",
    F.concat(F.format_string("%02d", "hour"), F.lit("00"))
).groupBy(
    "hour",
    "category"
).agg(
    F.sum("positive_purchase").alias("sales")
).withColumn(
    "rank",
    F.row_number().over(
        Window.partitionBy(
            "category"
        ).orderBy(
            F.col("sales").desc()
        )
    )
).where(
    F.col("rank") <= 3
).select(
    F.col("category").alias("categoryId"),
    "hour"
).show()

# maybe filter out nulls?



+----------+----+
|categoryId|hour|
+----------+----+
|      null|0000|
|      null|1300|
|      null|1400|
|         A|1200|
|         A|1300|
|         A|1700|
|         B|1300|
|         B|1400|
|         B|1500|
|         C|1700|
|         C|1600|
|         C|1500|
+----------+----+



                                                                                

## 4. In what cities are the most popular merchants located? Is there a correlation between where location(city id) and the category the merchant is in?

In [11]:
# what is definition of merchant popularity? is it the amount of sale? or number of sales? 
# or number of distinct customers? let me use the last one 


# most of merchant are working in few cities, so we cant say that merchants are located somewhere 
# without additional info.
popular_merchants = historical_transactions.groupBy(
    "merchant_id"
).agg(
    F.collect_set("city_id").alias("cities"),
    F.collect_set("category").alias("categories"),
    # or use F.approx_count_distinct to avoid possible OOM
    F.size(F.collect_set("customer_id")).alias("merchant_unique_customers_count")
).withColumn(
    "popularity_rank",
    # dataframe has size of merchants count, dont expect to be too large to OOM
    F.row_number().over(
        Window.orderBy(
            F.col("merchant_unique_customers_count").desc()
        )
    )
# take 10 most popular merchants
).where(
    F.col("popularity_rank") <= 10
)




popular_merchants.show()

22/08/23 03:49:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/08/23 03:49:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/08/23 03:49:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.




22/08/23 03:49:48 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/08/23 03:49:48 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/08/23 03:49:48 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/08/23 03:49:48 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+---------------+--------------------+----------+-------------------------------+---------------+
|    merchant_id|              cities|categories|merchant_unique_customers_count|popularity_rank|
+---------------+--------------------+----------+-------------------------------+---------------+
|M_ID_00a6ca8a8a|             [1, 69]| [C, B, 

                                                                                

In [12]:
# cities with number of 10 most popular merchants working there

from collections import Counter
from itertools import chain

cities_lists = [row.cities for row in popular_merchants.select("cities").collect()]
Counter(chain(*cities_lists)).most_common()

22/08/23 03:49:48 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/08/23 03:49:48 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/08/23 03:49:48 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.




22/08/23 03:49:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/08/23 03:49:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/08/23 03:49:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/08/23 03:49:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


                                                                                

[(1, 8),
 (69, 8),
 (291, 5),
 (213, 4),
 (188, 4),
 (115, 4),
 (233, 4),
 (333, 4),
 (283, 4),
 (279, 4),
 (250, 4),
 (19, 4),
 (244, 4),
 (272, 4),
 (339, 4),
 (231, 4),
 (106, 3),
 (48, 3),
 (163, 3),
 (159, 3),
 (138, 3),
 (109, 3),
 (274, 3),
 (158, 3),
 (295, 3),
 (183, 3),
 (88, 3),
 (3, 3),
 (149, 3),
 (314, 3),
 (143, 3),
 (329, 3),
 (96, 3),
 (344, 3),
 (340, 3),
 (276, 3),
 (322, 3),
 (160, 3),
 (116, 3),
 (98, 2),
 (23, 2),
 (292, 2),
 (271, 2),
 (242, 2),
 (209, 2),
 (303, 2),
 (174, 2),
 (170, 2),
 (318, 2),
 (302, 2),
 (173, 2),
 (140, 2),
 (111, 2),
 (107, 2),
 (57, 2),
 (7, 2),
 (155, 2),
 (299, 2),
 (139, 2),
 (266, 2),
 (87, 2),
 (83, 2),
 (331, 2),
 (21, 2),
 (327, 2),
 (298, 2),
 (117, 2),
 (265, 2),
 (186, 2),
 (53, 2),
 (24, 2),
 (330, 2),
 (78, 2),
 (49, 2),
 (20, 2),
 (297, 2),
 (197, 2),
 (168, 2),
 (60, 2),
 (294, 2),
 (63, 2),
 (199, 2),
 (191, 2),
 (62, 2),
 (189, 2),
 (131, 2),
 (306, 1),
 (277, 1),
 (256, 1),
 (206, 1),
 (156, 1),
 (148, 1),
 (77, 1),
 (3

In [13]:
# this may be computed in different ways per (city, category) pair:
# a) merchants count
# b) sales count
# c) sales amount
# etc.

historical_transactions.select(
    "merchant_id",
    "city_id",
    "category"
).distinct(
).groupBy(
    "city_id",
    "category"
).agg(
    F.count(F.lit(1)).alias("merchants_cnt")
).orderBy(
    F.col("merchants_cnt").desc()
).show()



+-------+--------+-------------+
|city_id|category|merchants_cnt|
+-------+--------+-------------+
|     69|       A|        16590|
|     69|       B|        15468|
|     19|       A|         9672|
|    331|       A|         8207|
|     19|       B|         7934|
|    333|       A|         6772|
|    331|       B|         6747|
|      1|       B|         6701|
|      1|       A|         6691|
|    158|       A|         6663|
|    158|       B|         6131|
|    333|       B|         5447|
|     69|       C|         5325|
|     17|       A|         5144|
|     57|       A|         4518|
|     88|       A|         4443|
|     17|       B|         4302|
|    143|       A|         4059|
|    117|       A|         4058|
|     88|       B|         3817|
+-------+--------+-------------+
only showing top 20 rows



                                                                                

## 5. If a new merchant were to want to set up shop in a city:
a. Where would you advise them go to and why?

b. Which categories of products would you recommend they sell

c. What time of year would you advise them to start

d. When would you recommend they open and close

e. Would you advice them to accept payment in installments, assume a credit
default rate of 22.9% per month (**did not understand what is wanted here**)


In [14]:

# try to guess open/close hours
historical_transactions.withColumn(
    "open_hour",
    F.hour(
        F.first("purchase_date").over(
            Window.partitionBy(
                "merchant_id", "city_id"
            ).orderBy("purchase_date")
        )
    )
).withColumn(
    "close_hour",
    F.hour(
        F.last("purchase_date").over(
            Window.partitionBy(
                "merchant_id", "city_id"
            ).orderBy("purchase_date")
        )
    )
).groupBy(
    "city_id",
    "category",
    F.date_format(F.col("purchase_date"), "MMM").alias("Month"),
    "open_hour",
    "close_hour",
).agg(
    # metric, may be different depending on the exact goal
    (F.sum('positive_purchase') / F.size(F.collect_set("merchant_id"))).alias("purchase_per_merchant")
).orderBy(F.col("purchase_per_merchant").desc()).show()

[Stage 60:>                                                         (0 + 8) / 9]

+-------+--------+-----+---------+----------+---------------------+
|city_id|category|Month|open_hour|close_hour|purchase_per_merchant|
+-------+--------+-----+---------+----------+---------------------+
|     69|       C|  Dec|        0|        11|   205.64238200999995|
|     69|       C|  Dec|        0|        16|   186.81802251000008|
|     69|       C|  Dec|        0|        15|   172.02714916000005|
|     69|       A|  Nov|        0|        12|   164.89174890000027|
|     69|       C|  Jan|        0|        11|   164.63029231000004|
|     69|       C|  Nov|        0|        19|   160.14326141999996|
|     69|       C|  Dec|        0|        18|   152.88978985000006|
|     69|       C|  Jan|        0|        13|   151.53567985999996|
|     69|       C|  Dec|        0|        13|         143.38040711|
|     69|       C|  Oct|        0|        18|   142.84326542999997|
|     69|       C|  Sep|        0|        13|   132.69275525000003|
|     69|       C|  Jan|        0|        12|   

                                                                                