In [None]:
# Bootstrap cell: make the local Spark installation importable, then obtain
# the SparkSession that every later cell uses through the name `spark`.
# Statement order matters here: findspark.init() has to run before the
# pyspark imports below.
import findspark
findspark.init()

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
# getOrCreate() returns the already-active session if there is one, otherwise
# it builds a new session with the builder's default configuration.
spark = SparkSession.builder.getOrCreate()

# Bare last expression: shows the session object as the cell output.
spark

In [None]:
# PREPS AND LOADING THE DATAFRAMES
from pyspark.sql.functions import col, month, year, hour, date_format, sum, avg, row_number, coalesce, dense_rank, format_number, count, rank
from pyspark.sql.window import Window

# Load the two inputs into DataFrames.
# NOTE: swap both file paths for your local copies before running this cell.
df_history = spark.read.parquet('historical-subset.parquet')
df_merchants = spark.read.csv('merchants-subset.csv', header=True)

# Enrich the transactions:
#   * month_and_year_of_purchase - purchase_date rendered as e.g. "Dec-2017"
#   * purchase_hour              - hour of day taken from purchase_date
# and replace null merchant_id / category values with explicit placeholders
# so those rows stay visible in the group-bys further down.
purchase_ts = col("purchase_date")
df_prepared = (
    df_history
    .withColumn("month_and_year_of_purchase", date_format(purchase_ts, 'MMM-yyyy'))
    .withColumn("purchase_hour", hour(purchase_ts))
    .fillna("Unknown merchant", subset=["merchant_id"])
    .fillna("Unknown category", subset=["category"])
)

# Prefix every merchants column with "df2_" so the join with the transactions
# does not run into ambiguous column names.
prefixed_columns = [col(name).alias(f"df2_{name}") for name in df_merchants.columns]
df_merchants = df_merchants.select(prefixed_columns)

df_prepared.show(3)
df_merchants.show(3)

# Example of one merchant_id that appears under several merchant names.
print("while doing question 4 I figured that some merchants with different names share the same merchant_id. I assume this is ok?")
df_merchants.filter(col("df2_merchant_id") == "M_ID_00a6ca8a8a").show(3)

In [None]:
# Build the joined DataFrame used by all tasks: every transaction from
# df_prepared plus a human-readable merchant_name, with purchase_amount cast
# to a readable fixed-point type.

# The merchants file can hold several rows (with different names) for the
# same merchant_id. Joining against it directly would duplicate each matching
# transaction once per merchant row and inflate every sum/avg/count computed
# later. Collapse it to exactly one row per merchant_id first; taking the
# alphabetically smallest name keeps the choice deterministic across runs.
df_merchant_names = (
    df_merchants
    .groupBy("df2_merchant_id")
    .agg({"df2_merchant_name": "min"})
    .toDF("df2_merchant_id", "df2_merchant_name")
)

df_joined = (
    df_prepared
    # Left join: transactions without a matching merchant row are kept.
    .join(
        df_merchant_names,
        df_prepared["merchant_id"] == df_merchant_names["df2_merchant_id"],
        "left",
    )
    .select(df_prepared["*"], df_merchant_names["df2_merchant_name"])
    # Fall back to the merchant_id when no merchant name is available.
    .withColumn("merchant_name", coalesce("df2_merchant_name", "merchant_id"))
    .withColumn("purchase_amount", col("purchase_amount").cast("decimal(12,2)"))
    .drop("df2_merchant_name")
)

# Confirm the cast took effect.
print(df_joined.schema["purchase_amount"].dataType)

# To check whether the coalesce ever fell back to a merchant_id, filter for
# merchant_name values that look like ids (the author's check found none):
#   df_joined.where(col("merchant_name").like("M_ID%")).show()

df_joined.show(3)


In [None]:
# TASK 1 : Top 5 merchants by PURCHASE_AMOUNT by MONTH, CITY

# Total purchase amount per (month, city, merchant name).
# `sum` here is pyspark.sql.functions.sum (imported above), not the builtin.
df_grouped = (
    df_joined
    .groupBy("month_and_year_of_purchase", "city_id", "merchant_name")
    .agg(sum("purchase_amount").alias("Purchase Sum"))
)

# Rank merchants inside each (month, city) partition by their total, highest
# first. merchant_name is a secondary sort key: without it, row_number() picks
# an arbitrary winner among merchants with equal totals, so the top 5 could
# change from run to run.
time_city_window = (
    Window
    .partitionBy("month_and_year_of_purchase", "city_id")
    .orderBy(col("Purchase Sum").desc(), col("merchant_name").asc())
)

# Keep the five best rows per partition; the helper rank column is dropped.
df_by_month = (
    df_grouped
    .withColumn("row", row_number().over(time_city_window))
    .filter(col("row") <= 5)
    .drop("row")
)

# Renaming columns in final output
output_col_list = ["Month", "City", "Merchant", "Purchase Total"]
df_by_month = df_by_month.toDF(*output_col_list)

df_by_month.show()


In [None]:
# TASK 2 : Finding the average sale amount per merchant per state

# NOTE(review): state_id = -1 shows up in the data; it is kept as its own
# group here (presumably "unknown state" - TODO confirm).

# Average purchase amount per (merchant name, state), largest average first.
# The sort runs on the full-precision average, before rounding.
avg_by_merchant_state = (
    df_joined
    .groupBy("merchant_name", "state_id")
    .agg(avg("purchase_amount").alias("Purchase Avg"))
    .orderBy(col("Purchase Avg").desc())
)

# Final presentation: trim the average to two decimals for readability and
# give the columns their output names.
output_col_list2 = ["Merchant", "State", "Average Amount"]
presentation_cols = [
    col("merchant_name"),
    col("state_id"),
    col("Purchase Avg").cast("decimal(12,2)"),
]
df_grouped2 = avg_by_merchant_state.select(
    [source.alias(label) for source, label in zip(presentation_cols, output_col_list2)]
)

df_grouped2.show()

In [None]:
# TASK 3 : Finding the top 3 performing hours by purchase_amount by product category

# Rank hours inside each category by total purchase amount, highest first.
# purchase_hour is a secondary sort key so that equal totals are ranked the
# same way on every run instead of arbitrarily.
category_hour_window = (
    Window
    .partitionBy("category")
    .orderBy(col("Purchase Sum").desc(), col("purchase_hour").asc())
)

# Total purchase amount per (category, hour of day).
# `sum` here is pyspark.sql.functions.sum (imported above), not the builtin.
df_grouped3 = (
    df_joined
    .groupBy("category", "purchase_hour")
    .agg(sum("purchase_amount").alias("Purchase Sum"))
)

# Keep the three best hours per category. Sort by rank *before* dropping the
# helper columns: the rank itself is not part of the output, so row order is
# the only way the reader can tell which hour came first.
df_by_hour = (
    df_grouped3
    .withColumn("row", row_number().over(category_hour_window))
    .filter(col("row") <= 3)
    .orderBy("category", "row")
    .drop("row", "Purchase Sum")
)

df_by_hour.show()

# Observations from the author's run:
# Hours 12 and 13 are good for both A and B + the Unknown category
# The C category is more inclined to hours 15 to 17, with more sales later in the day

In [None]:
# TASK 4 : Finding the most popular cities by merchants with most transactions

# One merchant_id can carry more than one merchant name, so this task groups
# by the id rather than by the name.
# Within each merchant, rank its (city, category) combinations by number of
# transactions, busiest first.
merchant_popularity_window = (
    Window
    .partitionBy("merchant_id")
    .orderBy(col("count").desc())
)

# Transactions per (merchant, city, category).
transactions_per_group = df_joined.groupBy("merchant_id", "city_id", "category").count()

# Attach the per-merchant rank, list the busiest groups first, and hide the id.
# NOTE: the rank column name ends with a trailing space; it is kept unchanged.
df_grouped4 = (
    transactions_per_group
    .withColumn("transactions count rank per merchant ", rank().over(merchant_popularity_window))
    .orderBy("count", ascending=False)
    .drop("merchant_id")
)

df_grouped4.show()

# Observations from the author's run:
#   Category B is the most popular in city 1; C makes up a minor fraction of sales.
#   Cities 1 and 69 are the most popular.

Question 5 


q: Which cities would you advise them to focus on and why?

      a: Per the Task 4 results, cities 1 and 69 host the most popular merchants (by number of transactions). These merchants mostly sell categories A and B.

q: Which categories would you recommend they sell

      a: I assume that the customer is not tied to a specific category, so I'd suggest selling either A or B as those are the categories with the most sales overall

       df_joined.groupBy("month_and_year_of_purchase", "category").agg(sum("purchase_amount").alias("Purchase Sum")).orderBy(col("Purchase Sum").desc()).show(100)

       Note: This also assumes that competition is out of the equation; otherwise it would not be wise to enter either of those categories in any of these cities, since they are already the most crowded.

q: Are there particular periods (months) that have interesting sales behaviors?

       a: df_joined.groupBy("month_and_year_of_purchase").agg(sum("purchase_amount").alias("Purchase Sum")).orderBy(col("Purchase Sum").desc()).show()
       This query shows that revenue climbs gradually through 2017, peaks in December 2017, and stays elevated in January and February 2018.

       month_and_year_of_purchase|  Purchase Sum|

             Dec-2017|             17985530483.85|
             Jan-2018|             15072733775.32|
             Nov-2017|             14895102411.56|
             Oct-2017|             13216024402.82|
             Feb-2018|             12830527978.20|
             Sep-2017|             12593080677.42|
             Aug-2017|             11551547781.13|
             Jul-2017|             10514508507.65|
             Jun-2017|              8729460160.04|
             May-2017|              7984831489.83|
             Mar-2017|              7488129164.64|
             Apr-2017|              7449791594.04|
             Feb-2017|              6197997283.63|
             Jan-2017|              5863203497.65|


q: What hours would you recommend they open and close for the day?

      a: Hours 12 and 13 are good for both A and B + the Unknown category  (see code return in TASK 3)
         The C category is more inclined to hours 15 to 17 with more sales later in the day

q: Would you recommend accepting payments in installments? Assume a credit default rate of 22.9% per month. 
For this question, consider the “installments” header in the historical transactions and the impact it may have, if any, on merchant sales (merchant sales in terms of purchase_amounts). We are making a simplistic assumption that 25% of sales is gross profit to merchants, there are equal installments and everyone who defaulted did so after making half payment.

      a: Couldn't finish the question due to lack of time, sorry. I will just submit what I could do since I'm past due anyway :(