In [None]:
%%configure -f 
{
    "conf":
    {
        "spark.app.name":"Bhavesh_Relevancy_notebook",
        "spark.yarn.queue": "default",
        "spark.jars": "/apps/Jars/obelisk-retail-legos.jar,/apps/Jars/mysql-connector-java-5.1.40.jar",
        "spark.driver.extraJavaOptions": "-Dscala.color",
        "spark.driver.memory": "2g",
        "spark.executor.memory": "2g",
        "spark.executor.instances": "4"
    }
}

In [None]:
import ai.couture.obelisk.commons.Constants._
import ai.couture.obelisk.commons.Constants.STANDARD_COL_NAMES._
import ai.couture.obelisk.commons.Constants._
import ai.couture.obelisk.commons.io._
import ai.couture.obelisk.commons.utils.DateTimeUtil._
import org.apache.spark.sql._
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._

## Complete code

In [None]:
package ai.couture.obelisk.retail.legos.inventoryprediction

import ai.couture.obelisk.commons.utils.BaseBlocks
import ai.couture.obelisk.commons.Constants._
import ai.couture.obelisk.commons.Constants.STANDARD_COL_NAMES._
import ai.couture.obelisk.commons.Constants._
import ai.couture.obelisk.commons.io._
import ai.couture.obelisk.commons.utils.DateTimeUtil._
import org.apache.spark.sql._
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

/**
 * Experiment block that derives a per-product quantity prediction from percentile-binned
 * daily sales.
 *
 * All work happens in `doTransformations`; `load` and `save` are empty overrides. Every
 * intermediate DataFrame is written to parquet and read back through `saveNLoadDF`.
 *
 * Steps, in execution order:
 *  1. Daily sales per product for the history window 2022-03-13 .. 2023-03-13.
 *  2. A zero-filled (product, date) grid for the products found in the `.../data/test`
 *     dataset, tagged with `daysInHistory` / `monthsInHistory` and limited to 12 months.
 *  3. Per-product approximate sales percentiles `percentile_10` .. `percentile_100`.
 *  4. Every history day gets a bin 1-5 from the 20/40/60/80 limits; days are counted per
 *     product and bin and the total is divided by 12.
 *  5. The target window 2023-03-15 .. 2023-04-14 is binned with the same (history) limits
 *     and counted per product and bin.
 *  6. History and target counts are averaged, multiplied by the 10/30/50/70/90 percentile
 *     value of the matching bin, and summed per product into `predictedyQuantity`.
 */
object PercentileBinnedComputation extends BaseBlocks {

  /**
   * Spark UDF wrapping `getNumberOfDaysBetweenTwoDates` with the "yyyy-MM-dd" pattern.
   *
   * NOTE(review): the sign and inclusiveness of the returned day count are defined by that
   * helper, which is not visible here - confirm before changing the `- 1` applied below.
   * Kept as a `var` so the member's public interface is unchanged; nothing in this object
   * reassigns it.
   */
  var getDaysInBetween: UserDefinedFunction = udf((startDate: String, endDate: String) => {
    getNumberOfDaysBetweenTwoDates(startDate, endDate, "yyyy-MM-dd")
  })

  override def load(): Unit = {

  }

  /**
   * Runs the whole pipeline described on the object. Produces no return value; its outputs
   * are the parquet files written through `saveNLoadDF`.
   */
  override def doTransformations(): Unit = {

    // ---- History window: daily sales per product -------------------------------------
    val historySales = purchaseEvents("2022-03-13", "2023-03-13")
      .groupBy(PRODUCTID, DATE)
      .agg(sum(QUANTITY) as "sales")
      .transform(saveNLoadDF("salesDayWise"))

    // Only products present in this dataset are carried through the rest of the pipeline.
    val trimmedProducts = ParquetToDF.getDF("/data/Archive/bhavesh/inventoryPrediction/TransformedDataset/date_when_prediction_is_made=2023-04-16/combined/data/test")
      .select(PRODUCTID).distinct()

    val operationDF = withZeroSalesDays(trimmedProducts, historySales)
      .withColumn("sales", col("sales").cast(DoubleType))
      .transform(saveNLoadDF("trimmed/salesCompleteTrain"))
      .withColumn("daysInHistory", getDaysInBetween(col(DATE), lit("2023-03-15")) - lit(1))
      // 30-day buckets: daysInHistory 0-29 -> month 1, 30-59 -> month 2, ...
      .withColumn("monthsInHistory", floor(col("daysInHistory") / 30) + 1)
      .filter(col("monthsInHistory") <= 12)
      .transform(saveNLoadDF("trimmed/operationDF"))

    // ---- Per-product percentile limits ------------------------------------------------
    // decile / 10.0 yields 0.1, 0.2, ... 1.0; the aliases are percentile_10 .. percentile_100.
    val percentileColumns = (1 to 10).map { decile =>
      callUDF("percentile_approx", col("sales"), lit(decile / 10.0)).alias(s"percentile_${decile * 10}")
    }
    val percentileLimits = operationDF.groupBy(PRODUCTID)
      .agg(percentileColumns.head, percentileColumns.tail: _*)
      .transform(saveNLoadDF("trimmed/productPercentiles"))

    // ---- History: days per (product, bin), divided by the 12 months kept above ---------
    val binnedHistory = operationDF.join(percentileLimits, Seq(PRODUCTID))
      .withColumn("bin", salesBin)
      .select(PRODUCTID, "monthsInHistory", "bin", "daysInHistory")
      .transform(saveNLoadDF("trimmed/operationDFBinned"))
    val featureStats = binnedHistory.groupBy(PRODUCTID, "monthsInHistory", "bin")
      .agg(count("daysInHistory") as "count")
      .groupBy(PRODUCTID, "bin")
      // Dividing the total by 12 (instead of avg) counts months without the bin as zero.
      .agg(sum("count").cast(DoubleType) / lit(12.0) as "count")
      .transform(saveNLoadDF("trimmed/featureStats"))

    // ---- Target window: same binning, using the history limits --------------------------
    val targetSales = purchaseEvents("2023-03-15", "2023-04-14")
      .groupBy(PRODUCTID, DATE)
      .agg(sum(QUANTITY).cast(DoubleType) as "sales")
      .transform(saveNLoadDF("salesDayWiseTarget"))

    val targetStats = withZeroSalesDays(trimmedProducts, targetSales)
      .transform(saveNLoadDF("trimmed/operationDFTarget"))
      .join(percentileLimits, Seq(PRODUCTID))
      .withColumn("bin", salesBin)
      .select(PRODUCTID, "bin", DATE)
      .repartition(210)
      .transform(saveNLoadDF("trimmed/operationDFTargetBinned"))
      .groupBy(PRODUCTID, "bin")
      .agg(count(DATE) as "count")
      .transform(saveNLoadDF("trimmed/targetStats"))

    // ---- Representative sales value per bin ---------------------------------------------
    val binColumns = Array("percentile_10", "percentile_30", "percentile_50", "percentile_70", "percentile_90")

    // Use the stack function to stack the columns into key-value pairs
    // NOTE(review): this selects the literal "productid" while the rest of the block uses
    // PRODUCTID - presumably the same column; confirm before unifying.
    val stackArgs = binColumns.map(c => s"'$c', $c").mkString(", ")
    val stackedDF = percentileLimits.select(col("productid"),
      expr(s"stack(${binColumns.length}, $stackArgs) as (bin, value)"))

    // Characters 12-13 of "percentile_NN" are NN; (NN + 10) / 20 maps 10, 30, 50, 70, 90
    // onto bins 1, 2, 3, 4, 5.
    val percentileLimitsBinned = stackedDF
      .withColumn("bin", substring(col("bin"), 12, 2).cast(IntegerType))
      .withColumn("bin", (col("bin") + lit(10)) / lit(20))
      .transform(saveNLoadDF("trimmed/percentileLimitsBinned"))

    // ---- Blend history and target counts, then weight by the bin value -----------------
    // toDF renames positionally: both inputs are (PRODUCTID, bin, count).
    val blendedCounts = featureStats.toDF(PRODUCTID, "bin", "count_x")
      .join(targetStats.toDF(PRODUCTID, "bin", "count_y"), Seq(PRODUCTID, "bin"), "outer")
      .na.fill(0.0)
      .withColumn("count", (col("count_x") + col("count_y")) / lit(2.0))
      .drop("count_x", "count_y")
      .transform(saveNLoadDF("trimmed/test"))

    blendedCounts.join(percentileLimitsBinned, Seq(PRODUCTID, "bin"))
      .withColumn("predictedyQuantity", col("count") * col("value"))
      .groupBy(PRODUCTID)
      .agg(sum("predictedyQuantity") as "predictedyQuantity")
      .transform(saveNLoadDF("trimmed/predictions"))

    // Later split predictions into menShirts/womenKurtas according to productAttrs
  }

  /**
   * Writes `df` as parquet under the experiment's temp directory and returns the DataFrame
   * read back from that location.
   *
   * @param key path relative to the temp directory (may contain sub-directories)
   * @param df  DataFrame to persist
   * @return the DataFrame loaded from the file that was just written
   */
  def saveNLoadDF(key: String)(df: DataFrame): DataFrame = {
    val path = s"/data/Archive/bhavesh/inventoryPrediction/temp/exp_files/$key"
    DFToParquet.putDF(path, df)
    ParquetToDF.getDF(path)
  }

  override def save(): Unit = {

  }

  /**
   * Interaction rows counted as a sale: "Checkout" events with purchase "New", a non-null
   * user and product, positive quantity and positive booked revenue, dated within
   * [fromDate, toDate] (`between` is inclusive on both ends).
   */
  private def purchaseEvents(fromDate: String, toDate: String): DataFrame =
    ParquetToDF.getDF("/data/ecomm/ajio/processed/interactionsDB")
      .filter(col(DATE).between(fromDate, toDate) && col("event") === "Checkout" && col("purchase") === "New"
        && col(USERID).isNotNull && col(QUANTITY) > 0 && col("booked_rev") > 0 && col(PRODUCTID).isNotNull
      )

  /**
   * Expands `dailySales` to one row per (product, date) for every product in `products` and
   * every date that occurs in `dailySales`; numeric nulls left by the join (days on which a
   * product has no sales row) become 0.
   */
  private def withZeroSalesDays(products: DataFrame, dailySales: DataFrame): DataFrame = {
    val observedDates = dailySales.select(DATE).distinct()
    products.crossJoin(broadcast(observedDates))
      .join(dailySales, Seq(PRODUCTID, DATE), "left")
      .na.fill(0)
  }

  /** Bin 1-5 of the row's `sales` against its percentile_20/40/60/80 limits (upper bound inclusive). */
  private def salesBin: Column =
    when(col("sales") <= col("percentile_20"), lit(1))
      .when(col("sales") <= col("percentile_40"), lit(2))
      .when(col("sales") <= col("percentile_60"), lit(3))
      .when(col("sales") <= col("percentile_80"), lit(4))
      .otherwise(lit(5))
}

## Step-by-step (cell-wise) code — this version has known errors and discrepancies relative to the complete code above; compare the two to spot them.

In [None]:
// Spark UDF around getNumberOfDaysBetweenTwoDates; both arguments are "yyyy-MM-dd" strings.
// NOTE(review): sign/inclusiveness of the day count comes from that helper - confirm there.
var getDaysInBetween: UserDefinedFunction = udf { (from: String, to: String) =>
    getNumberOfDaysBetweenTwoDates(from, to, "yyyy-MM-dd")
}

// Writes `df` as parquet to <temp dir>/<key> and returns the DataFrame read back from there.
// Presumably used as a checkpoint between notebook steps - confirm with the author.
def saveNLoadDF(key: String)(df: DataFrame): DataFrame = {
    val target = s"/data/Archive/bhavesh/inventoryPrediction/temp/exp_files/$key"
    DFToParquet.putDF(target, df)
    ParquetToDF.getDF(target)
}

In [None]:
// Daily sales per product for the history window (2022-03-13 .. 2023-03-13, inclusive).
var interactionsDB = {
    // A row counts as a sale when all of these hold.
    val isNewCheckout = Seq(
        col(DATE).between("2022-03-13", "2023-03-13"),
        col("event") === "Checkout",
        col("purchase") === "New",
        col(USERID).isNotNull,
        col(QUANTITY) > 0,
        col("booked_rev") > 0,
        col(PRODUCTID).isNotNull
    ).reduce(_ && _)

    ParquetToDF.getDF("/data/ecomm/ajio/processed/interactionsDB")
        .filter(isNewCheckout)
        .groupBy(PRODUCTID, DATE)
        .agg(sum(QUANTITY) as "sales")
        .transform(saveNLoadDF("salesDayWise"))
}

In [None]:
// Every (product, date) pair seen in the history sales; the date list is broadcast.
var distinctDates = interactionsDB.select(DATE).distinct()
var distinctProducts = interactionsDB.select(PRODUCTID).distinct()
var distinctCombinations = distinctProducts.crossJoin(broadcast(distinctDates))

var operationDF = {
    // Left join onto the full grid; numeric nulls (days without a sales row) become 0.
    val denseDailySales = distinctCombinations
        .join(interactionsDB, Seq(PRODUCTID, DATE), "left")
        .na.fill(0)
        .withColumn("sales", col("sales").cast(DoubleType))
        .transform(saveNLoadDF("salesCompleteTrain"))

    val daysBack = getDaysInBetween(col(DATE), lit("2023-03-15")) - lit(1)

    denseDailySales
        .withColumn("daysInHistory", daysBack)
        // 30-day buckets: 0-29 -> month 1, 30-59 -> month 2, ...; keep the first 12.
        .withColumn("monthsInHistory", floor(col("daysInHistory")/30) + 1)
        .filter(col("monthsInHistory") <= 12)
        .repartition(210)
        .transform(saveNLoadDF("operationDF"))
}

In [None]:
// Per-product approximate sales percentiles, one column per decile:
// percentile_10 (0.1) through percentile_100 (1.0).
var percentileLimits = {
    val decileColumns = (1 to 10).map { decile =>
        callUDF("percentile_approx", col("sales"), lit(decile / 10.0)).alias(s"percentile_${decile * 10}")
    }
    operationDF.groupBy(PRODUCTID)
        .agg(decileColumns.head, decileColumns.tail: _*)
        .transform(saveNLoadDF("productPercentiles"))
}

In [None]:
// Representative sales value per bin: the 10/30/50/70/90 percentiles become the values of
// bins 1..5.
val binColumns = Array("percentile_10", "percentile_30", "percentile_50", "percentile_70", "percentile_90")

// Use the stack function to stack the columns into key-value pairs.
// The two output names are declared inside the SQL expression ("as (bin, value)"):
// Column.as has no (String, String) overload, so the previous `.as("bin", "value")` did not compile.
val stackedDF = {
    val stackArgs = binColumns.map(c => s"'$c', $c").mkString(", ")
    percentileLimits.select($"productid", expr(s"stack(${binColumns.length}, $stackArgs) as (bin, value)"))
}

// Characters 12-13 of "percentile_NN" are NN; (NN + 10) / 20 maps 10, 30, 50, 70, 90 onto
// bins 1, 2, 3, 4, 5. withColumn needs the target column name, which was missing before.
// NOTE(review): this writes "percentileLimitsBinned", while a later cell reads
// "trimmed/percentileLimitsBinned" - path left as it was; confirm which one is intended.
var percentileLimitsBinned = (
    stackedDF
    .withColumn("bin", substring(col("bin"), 12, 2).cast(IntegerType))
    .withColumn("bin", (col("bin") + lit(10))/lit(20))
    .transform(saveNLoadDF("percentileLimitsBinned"))
)

In [None]:
    // Assign every history day to a bin 1-5 using the product's 20/40/60/80 percentile limits
    // (upper bound inclusive), keeping only the columns needed for counting.
    var X = (
        operationDF.join(percentileLimits, Seq(PRODUCTID))
        .withColumn(
            "bin",
            when(col("sales") <= col("percentile_20"), lit(1))
                .when(col("sales") <= col("percentile_40"), lit(2))
                .when(col("sales") <= col("percentile_60"), lit(3))
                .when(col("sales") <= col("percentile_80"), lit(4))
                .otherwise(lit(5))
        )
        .select(PRODUCTID, "monthsInHistory", "bin", "daysInHistory")
        .transform(saveNLoadDF("operationDFBinned"))
    )

    // Days per (product, month, bin), then the monthly mean per (product, bin).
    // The total is divided by the 12 months kept in operationDF, as in the complete code:
    // avg("count") would divide only by the months in which the bin occurred and so
    // over-state bins that are missing from some months.
    X = (
        X.groupBy(PRODUCTID, "monthsInHistory", "bin")
        .agg(count("daysInHistory") as "count")
        .groupBy(PRODUCTID, "bin")
        .agg(sum("count").cast(DoubleType) / lit(12.0) as "count")
        .transform(saveNLoadDF("featureStats"))
    )

In [None]:
// Daily sales per product for the target window (2023-03-15 .. 2023-04-14, inclusive),
// using the same purchase filter as the history window.
var y = (
    ParquetToDF.getDF("/data/ecomm/ajio/processed/interactionsDB")
    .filter(col(DATE).between("2023-03-15", "2023-04-14") && col("event") === "Checkout" && col("purchase") === "New"
                          && col(USERID).isNotNull && col(QUANTITY) > 0 && col("booked_rev") > 0 && col(PRODUCTID).isNotNull
                         )
    .groupBy(PRODUCTID, DATE)
    .agg(sum(QUANTITY).cast(DoubleType) as "sales")
    .transform(saveNLoadDF("salesDayWiseTarget"))
)

// Full (product, date) grid for the target window; days without a sales row are filled with 0.
var distinctDatesTarget = y.select(DATE).distinct()
var distinctProductsTarget = y.select(PRODUCTID).distinct()
var distinctCombinationsTarget = distinctProductsTarget.crossJoin(broadcast(distinctDatesTarget))
y = distinctCombinationsTarget.join(y, Seq(PRODUCTID, DATE), "left").na.fill(0).transform(saveNLoadDF("operationDFTarget"))

// Bin each target day with the history percentile limits and count days per (product, bin).
// Fixed here: the `percetileLimits` typo, the misplaced parentheses that passed lit(...) into
// col(...), and the `when("sales")` that was missing col(...).
y = (
    y.join(percentileLimits, Seq(PRODUCTID))
    .withColumn("bin",
                when(col("sales") <= col("percentile_20"), lit(1))
                    .when(col("sales") <= col("percentile_40"), lit(2))
                    .when(col("sales") <= col("percentile_60"), lit(3))
                    .when(col("sales") <= col("percentile_80"), lit(4))
                    .otherwise(lit(5))
            )
    .select(PRODUCTID, "bin", DATE)
    .repartition(210)
    .transform(saveNLoadDF("operationDFTargetBinned"))
    .groupBy(PRODUCTID, "bin")
    .agg(count(DATE) as "count")
    .transform(saveNLoadDF("targetStats"))
)

In [None]:
// Reload the per-(product, bin) day counts saved under trimmed/ and make both `count`
// columns DoubleType.
var x = (
    ParquetToDF.getDF("/data/Archive/bhavesh/inventoryPrediction/temp/exp_files/trimmed/featureStats")
    .withColumn("count", col("count").cast(DoubleType))
)
var y = (
    ParquetToDF.getDF("/data/Archive/bhavesh/inventoryPrediction/temp/exp_files/trimmed/targetStats")
    .withColumn("count", col("count").cast(DoubleType))
)
Seq(x, y).foreach(_.printSchema())

In [None]:
// Average the history (count_x) and target (count_y) day counts per (product, bin).
// toDF renames positionally; the outer join keeps bins present on only one side and the
// missing count is filled with 0 before averaging.
var test = {
    val featureCounts = x.toDF(PRODUCTID, "bin", "count_x")
    val targetCounts = y.toDF(PRODUCTID, "bin", "count_y")
    featureCounts
        .join(targetCounts, Seq(PRODUCTID, "bin"), "outer")
        .na.fill(0.0)
        .withColumn("count", (col("count_x") + col("count_y"))/lit(2.0))
        .drop("count_x", "count_y")
        .transform(saveNLoadDF("trimmed/test"))
}
test.printSchema()

In [None]:
// Reload the per-(product, bin) representative values saved under trimmed/.
var percentileLimitsBinned =
    ParquetToDF.getDF("/data/Archive/bhavesh/inventoryPrediction/temp/exp_files/trimmed/percentileLimitsBinned")
percentileLimitsBinned.printSchema()

In [None]:
// Weight each bin's averaged day count by the bin's value and sum per product.
test = (
    test.join(percentileLimitsBinned, Seq(PRODUCTID, "bin"))
    .withColumn("predictedyQuantity", col("count") * col("value"))
    .groupBy(PRODUCTID)
    .agg(sum("predictedyQuantity") as "predictedyQuantity")
    .transform(saveNLoadDF("trimmed/predictions"))
)
test.printSchema()

In [None]:
// Product -> similar-group lookup used to split the predictions by segment.
var productAttrs = (
    ParquetToDF.getDF("/data/Archive/inventory/productAttributesLegosFNL")
    .select(PRODUCTID, SIMILAR_GROUP_LEVEL)
)
productAttrs.printSchema()

In [None]:
// Attach the similar-group id, then write one predictions file per segment (group column dropped).
test = test.join(productAttrs, Seq(PRODUCTID))
Seq("menShirts" -> "830216013", "womenKurtas" -> "830303011").foreach { case (segment, groupId) =>
    DFToParquet.putDF(
        s"/data/Archive/bhavesh/inventoryPrediction/experiments/exp_my_exp/predictions/ModelForEachBrickProductLevel/$segment/predictions",
        test.filter(col(SIMILAR_GROUP_LEVEL) === groupId).drop(SIMILAR_GROUP_LEVEL)
    )
}