# Import and Clean Data

In [1]:
import org.apache.spark.sql.types._

// All three inputs live under the same S3 prefix as a single headered CSV part file and
// carry the category hierarchy in columns named T1 / T2 / T4.
val dataRoot = "s3a://databricks-dump/datalakedr/mlTest/final"

/** Loads `<dataRoot>/<dataset>/part-00000` as a DataFrame.
  *
  * The file is read through the spark-csv data source with a header row and schema
  * inference enabled, then the category columns are given descriptive names:
  * T1 -> root_category, T2 -> sub_category, T4 -> primary_category.
  *
  * @param dataset name of the dataset directory under `dataRoot`
  * @return the loaded DataFrame with the renamed category columns
  */
def loadDataset(dataset: String): org.apache.spark.sql.DataFrame =
    sqlContext.read
        .format("com.databricks.spark.csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load(s"$dataRoot/$dataset/part-00000")
        .withColumnRenamed("T1", "root_category")
        .withColumnRenamed("T2", "sub_category")
        .withColumnRenamed("T4", "primary_category")

val transactions = loadDataset("transactions")

val profitMetrics = loadDataset("profitMetrics")

val returnedCancelledMetrics = loadDataset("returnedCancelledMetrics")





[cancel_num: int, return_num: int, merchant_id: string, root_category: string, sub_category: string, primary_category: string]

In [5]:
// Print the inferred column names and types of the transactions data.
transactions.printSchema()

root
 |-- item_created_at: timestamp (nullable = true)
 |-- item_ship_by_date: timestamp (nullable = true)
 |-- qty_ordered: integer (nullable = false)
 |-- item_mrp: double (nullable = false)
 |-- item_price: double (nullable = false)
 |-- item_selling_price: double (nullable = false)
 |-- item_discount: double (nullable = false)
 |-- fulfillment_shipped_at: string (nullable = true)
 |-- fulfillment_created_at: timestamp (nullable = true)
 |-- merchant_id: string (nullable = true)
 |-- root_category: string (nullable = true)
 |-- sub_category: string (nullable = true)
 |-- primary_category: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- order_item_id: string (nullable = true)
 |-- order_id: string (nullable = true)





In [3]:
// Print the inferred column names and types of the profit metrics data.
profitMetrics.printSchema()

root
 |-- commission_percent: double (nullable = false)
 |-- cashback_percent: string (nullable = true)
 |-- discount_percent: double (nullable = false)
 |-- merchant_id: string (nullable = true)
 |-- root_category: string (nullable = true)
 |-- sub_category: string (nullable = true)
 |-- primary_category: string (nullable = true)





In [31]:
// Print the inferred column names and types of the returned/cancelled metrics data.
returnedCancelledMetrics.printSchema()

root
 |-- cancel_num: integer (nullable = true)
 |-- return_num: integer (nullable = true)
 |-- merchant_id: string (nullable = true)
 |-- root_category: string (nullable = true)
 |-- sub_category: string (nullable = true)
 |-- primary_category: string (nullable = true)





# Compute efficiency Metrics

Two metrics are calculated to measure the efficiency of merchants in terms of pricing and shipping. 

Pricing efficiency is based on the assumption that a merchant who can offer an item at a better price than its MRP has better channels for sourcing that item, and should therefore receive a higher merchant score.

Shipping efficiency compares the expected number of days between the time the order is submitted by the user and the time the item should be shipped with the actual number of days it took. The faster the item is shipped, the higher the merchant score.

All scores are weighted by the order quantity and are normalized by the total orders at the primary category level.

In [None]:
import sqlContext.implicits._
import org.apache.spark.sql.functions._
// Per (merchant, category) efficiency totals, weighted by ordered quantity.
val efficiency = {
    // Days the merchant was given to ship vs. days actually taken, both counted
    // from the moment the order item was created.
    val expectedDays = datediff($"item_ship_by_date", $"item_created_at")
    val actualDays = datediff($"fulfillment_shipped_at", $"item_created_at")

    // Fraction of the expected lead time that was saved (negative when shipping ran late).
    val shippingRatio = (expectedDays - actualDays) / expectedDays
    // Price advantage relative to the item's MRP.
    val priceRatio = ($"item_mrp" - $"item_price" + $"item_discount") / $"item_mrp"

    val weighted = transactions.select(
        $"root_category",
        $"sub_category",
        $"primary_category",
        $"merchant_id",
        $"qty_ordered",
        (shippingRatio * $"qty_ordered").as("shipping_efficiency"),
        (priceRatio * $"qty_ordered").as("price_efficiency"))

    val totals = weighted
        .groupBy($"merchant_id", $"root_category", $"sub_category", $"primary_category")
        .agg(
            sum($"qty_ordered").as("total_orders"),
            sum($"shipping_efficiency").as("shipping_efficiency"),
            sum($"price_efficiency").as("price_efficiency"))

    // Null totals in numeric columns are reported as 0.
    totals.na.fill(0)
}

In [8]:
// Show the groups with the lowest price efficiency first, ties broken by shipping efficiency.
efficiency
    .sort(asc("price_efficiency"), asc("shipping_efficiency"))
    .show()

+--------------------+--------------------+--------------------+--------------------+------------+-------------------+-------------------+
|         merchant_id|       root_category|        sub_category|    primary_category|total_orders|shipping_efficiency|   price_efficiency|
+--------------------+--------------------+--------------------+--------------------+------------+-------------------+-------------------+
|ed86eaf4f8b892e9f...|bf8dd8c68d02e161c...|c7502c55f8db54062...|fef6f971605336724...|         145|  68.63333333333333|  -0.91324200913242|
|ed86eaf4f8b892e9f...|bf8dd8c68d02e161c...|c7502c55f8db54062...|b6f8dc086b2d60c58...|         298| 138.60000000000002|-0.7722007722007722|
|988969200cb769535...|faefec47428cf9a2f...|d372a8a4e6dca1c27...|4258736785df42c82...|           2| 1.0833333333333333|-0.5666666666666667|
|1146d96286ade57a9...|20e7f31e77cd39cde...|d2eb3bf91af83314f...|9a2e8514e94644ccf...|        7901| -574.0714285714289|                0.0|
|a5e5bc0af001017b9...|52c69



# Compute Profit Metrics

To calculate Paytm's net profit, the cashback percent is deducted from the commission percent. The discount percent offered by the merchant is treated as a reduction of the net profit.

In [3]:
// Net profit per (merchant, category): (commission - cashback), reduced by the merchant discount.
//
// cashback_percent is read as a string column (see the printed profitMetrics schema), so it is
// cast to double explicitly. A missing or non-numeric cashback becomes null after the cast;
// it is treated as "no cashback" (0.0) so that it no longer nulls out the whole profit
// expression, which the trailing na.fill(0) would then report as zero profit.
// NOTE(review): assumes an absent cashback value means no cashback was offered -- confirm.
val profit = {
    val cashback = coalesce($"cashback_percent".cast("double"), lit(0.0))

    profitMetrics
        .withColumn("actual_profit_percent", ($"commission_percent" - cashback) * (lit(1) - $"discount_percent"))
        .select("merchant_id", "root_category", "sub_category", "primary_category", "actual_profit_percent")
        // Any remaining null profit (e.g. from a null commission or discount) is reported as 0.
        .na.fill(0)
}





[merchant_id: string, root_category: string, sub_category: string, primary_category: string, actual_profit_percent: double]

In [10]:
// Show the least profitable merchant/category combinations first.
profit
    .sort(asc("actual_profit_percent"))
    .show()

+--------------------+--------------------+--------------------+--------------------+---------------------+
|         merchant_id|       root_category|        sub_category|    primary_category|actual_profit_percent|
+--------------------+--------------------+--------------------+--------------------+---------------------+
|f713432c65b6b19ad...|c4b8bb990423f770d...|cc298d5bc587e1b65...|daed210307f1dbc6f...|            -0.928788|
|1551cdfa10908c237...|dc36f18a9a0a77667...|f752582986d70327a...|b7852f3d3a775b518...|  -0.9118610169491526|
|1551cdfa10908c237...|dc36f18a9a0a77667...|324545ee1d35608f4...|c9e37c28ce5852218...|  -0.9118606606606606|
|da9cc4cc759d7b03b...|f702defbc67edb455...|98a361c41dd6204d4...|4fd3f5fed2d59efc8...|             -0.91098|
|192e6a98f4986c6c3...|f702defbc67edb455...|98a361c41dd6204d4...|92cf1d861842f9b7b...|             -0.91098|
|9b44f920ec0d026a2...|f702defbc67edb455...|98a361c41dd6204d4...|9403726f748a52982...|  -0.9109799599198397|
|4fd04fbeeb9aabf2f...|f702de



# Aggregate metrics per merchant

Finally, we aggregate the above calculations by merchant, weighted by the total orders.

In [4]:
// Per-merchant averages of every metric, weighted by the number of orders.
val scoreMetrics = {
    // The three inputs are all keyed by the category hierarchy plus the merchant.
    val joinKeys = Seq("root_category", "sub_category", "primary_category", "merchant_id")

    // Inner joins: only merchant/category groups present in all three inputs are kept.
    val perCategory = efficiency
        .join(profit, joinKeys)
        .join(returnedCancelledMetrics, joinKeys)
        // Weight the profit percentage by order volume so it can be summed across categories.
        .withColumn("actual_avg_profit", $"total_orders" * $"actual_profit_percent")

    val perMerchant = perCategory
        .groupBy("merchant_id")
        .agg(
            sum("total_orders").as("total"),
            sum("actual_avg_profit").as("total_actual_profit_percent"),
            sum("cancel_num").as("total_cancel_num"),
            sum("return_num").as("total_return_num"),
            sum("shipping_efficiency").as("total_shipping_efficiency"),
            sum("price_efficiency").as("total_price_efficiency"))

    // Divide every total by the merchant's order count to get the averages and rates.
    perMerchant.select(
        $"merchant_id",
        ($"total_shipping_efficiency" / $"total").as("avg_shipping_efficiency"),
        ($"total_price_efficiency" / $"total").as("avg_price_efficiency"),
        ($"total_return_num" / $"total").as("return_rate"),
        ($"total_cancel_num" / $"total").as("cancel_rate"),
        ($"total_actual_profit_percent" / $"total").as("avg_profit_percent"))
}





[merchant_id: string, avg_shipping_efficiency: double, avg_price_efficiency: double, return_rate: double, cancel_rate: double, avg_profit_percent: double]

In [122]:
// Preview the first 10 merchants without truncating the long merchant ids.
scoreMetrics.show(numRows = 10, truncate = false)

+--------------------------------+-----------------------+--------------------+-------------------+--------------------+--------------------+
|merchant_id                     |avg_shipping_efficiency|avg_price_efficiency|return_rate        |cancel_rate         |avg_profit_percent  |
+--------------------------------+-----------------------+--------------------+-------------------+--------------------+--------------------+
|47d23d35ab19f03c6bdc7d1b4d03cd13|0.23958333333333331    |0.3750236262233663  |0.0                |0.125               |-0.19421969884161433|
|4556ed716bc312dff084dd743caeca87|0.6208459185698139     |0.008600117697962964|0.07308089945722408|0.1134660118893771  |-0.23397497388553923|
|1687c811b0829bc670b2b2f4fa536beb|0.6739130434782608     |0.5926703708561616  |0.10144927536231885|0.043478260869565216|-0.04005331744247771|
|70511774a5ebef510ae7404853a12177|-0.25                  |0.47393364928909953 |1.0                |0.0                 |0.026934702474986836|
|07e11



# Compute Final Score 

The universal score is then computed based on the 5 metrics we calculated. The interpretation of the score is as follows:
1. A positive score means the merchant usually exceeds expectation
2. A 0 score means the merchant meets expectation
3. A negative score means the merchant needs improvement on the 5 metrics we measured


In [6]:
// Final merchant score: the sum of three component scores, each an averaged metric
// discounted by the merchant's return and/or cancel rate:
//   profit_score   = avg_profit_percent      * (1 - return_rate - cancel_rate)
//   shipping_score = avg_shipping_efficiency * (1 - return_rate)
//   price_score    = avg_price_efficiency    * (1 - cancel_rate)
//
// `score` is bound to the DataFrame itself. Previously the chain ended in `.show(...)`,
// which returns Unit, so the scored result could not be reused after this cell.
val score = scoreMetrics.select(
                $"merchant_id",
                ($"avg_profit_percent" * (lit(1) - $"return_rate" - $"cancel_rate")).as("profit_score"),
                ($"avg_shipping_efficiency" * (lit(1) - $"return_rate")).as("shipping_score"),
                ($"avg_price_efficiency" * (lit(1) - $"cancel_rate")).as("price_score")
            )
            .select($"merchant_id", ($"profit_score" + $"shipping_score" + $"price_score").as("merchant_score"))

// Preview the first 10 scores without truncating the merchant ids.
score.show(10, false)

+--------------------------------+-------------------+
|merchant_id                     |merchant_score     |
+--------------------------------+-------------------+
|4556ed716bc312dff084dd743caeca87|0.3927705718825911 |
|fec9ebe81dc34da06ac1f4eff8c8f130|0.3310599204814085 |
|9b44f920ec0d026a26a48bcac4dcba2a|0.49365170767703126|
|226b5bf02bf8b97501335e2792e5abc7|0.9818455702650739 |
|8c47f7c40acb430047f501a2d5345776|0.2541409472775181 |
|aadda33446a0633e97539c0c7f0b177b|0.4213308373066574 |
|fe44357bd46c876f41203d9ee440c0a7|0.8366666666666667 |
|f3af38a9500cfc72614a7cb788e5a56b|0.7222505555555555 |
|dbd667d1742d1abf65e1c74f96f940c3|0.36991095814117664|
|32a2bacb528ea401c06b17376cad237e|0.5439791896832656 |
+--------------------------------+-------------------+
only showing top 10 rows



