<span style="color:blue">Thanks for using Drogon for your interactive Spark application. We update Drogon/SparkMagic as often as possible to make it easier, faster and more reliable for you. Have a question or feedback? Ping us on [uChat](https://uchat.uberinternal.com/uber/channels/spark).</span>

What's New
- Now you can use `%%configure` and `%%spark` magics to configure and start a Spark session (deprecating hard-to-use `%load_ext sparkmagic.magics` and `manage_spark` magics). Check out [this example](https://workbench.uberinternal.com/explore/knowledge/localfile/cwang/sparkmagic_python2_example.ipynb) for more details.
- Improved `%%configure` magic. You can now use it to set all Spark and Drogon configurations from within the notebook itself. Check out our [latest documentation & examples](https://docs.google.com/document/d/1mkYtDHquh4FjqTeA0Fxii8lyV-P6qzmoABhmmRwm_00/edit#heading=h.xn14pmoorsn0) for more details.
- Bug fixes and performance updates.


In [None]:
%%configure -f
{
  "kind": "spark", 
  "proxyUser": "dhruven.vora", 
  "sparkEnv": "SPARK_24", 
  "driverMemory": "12g", 
  "queue": "maps_route_analytics", 
  "numExecutors": 100, 
  "executorCores": 1, 
  "driverCores": 4,
  "conf": {
    "spark.driver.maxResultSize": "10g",
    "spark.executor.memoryOverhead": 3072, 
    "spark.locality.wait": "0",
    "spark.default.parallelism":10000
  },
  "executorMemory": "24g",
  "drogonHeaders": {
    "X-DROGON-CLUSTER": "phx2/Secure"
  }
}

In [None]:
%%spark

In [None]:
// Definition of classes needed to calculate Trip dispatch type
/** Wrapper around a task's type string.
  *
  * Mirrors the `marketplaceInfo.taskType` field that `SupplyStateChangeLoader.makeDataset`
  * reads from each task row.
  *
  * @param taskType the task type as a string
  */
case class MarketplaceInfo (
    taskType: String
)

/** A task, represented only by its marketplace info.
  *
  * @param marketplaceInfo holder of the task's `taskType`
  */
case class Task (
    marketplaceInfo: MarketplaceInfo
)

/** A waypoint of a supply state, flattened to what the status translation needs.
  *
  * @param status waypoint status string; `SupplyStateConverter.collectStats` upper-cases it
  *               and combines it with each task type to form a counter key
  * @param tasks  one task-type string per task on the waypoint
  */
case class Waypoint (
    status: String,
    tasks: List[String] // derived from waypoint.task.marketPlaceInfo
)

/** Snapshot of a supply's state on one side of a state change.
  *
  * @param status       state status string; `SupplyStateConverter.translateFromSupplyState`
  *                     distinguishes "ONLINE", "OFFLINE" and "OBSERVING"
  * @param waypoints    waypoints attached to this state
  * @param assignedJobs assigned jobs (presumably job UUIDs — TODO confirm)
  * @param acceptedJobs accepted jobs (presumably job UUIDs — TODO confirm)
  * @param offeredJobs  offered jobs (presumably job UUIDs — TODO confirm)
  */
case class SupplyState (
    status: String,
    waypoints: List[Waypoint],
    assignedJobs: List[String],
    acceptedJobs: List[String],
    offeredJobs: List[String]
)

/** One supply state transition, as selected by `SupplyStateChangeLoader.load`.
  *
  * @param jobUuid            `msg.jobuuid` of the source row
  * @param supplyUuid         `msg.supplyuuid` of the source row
  * @param fromState          state before the transition (`msg.fromstate`)
  * @param toState            state after the transition (`msg.tostate`)
  * @param fromStateTimestamp `msg.currentstatetimestamp - msg.timeinpreviousstate`
  * @param toStateTimestamp   `msg.currentstatetimestamp`
  * @param reason             `msg.reason`; `SupplyStateConverter.translateToSupplyState`
  *                           treats it as possibly null
  * @param latitude           `msg.supplylocation.latitude`
  * @param longitude          `msg.supplylocation.longitude`
  * @param vehicleType        `form_factor` of the joined `dwh.dim_vehicle` row
  */
case class SupplyStateChange (
    jobUuid: String,
    supplyUuid: String,
    fromState: SupplyState,
    toState: SupplyState,
    fromStateTimestamp: Long,
    toStateTimestamp: Long,
    reason: String,
    latitude: Double,
    longitude: Double,
    vehicleType: String
)

/** Simplified view of one state change, produced by `SupplyStateConverter.convert`.
  *
  * @param supplyUuid      supply the change belongs to
  * @param toTimestampMs   copied from `SupplyStateChange.toStateTimestamp`
  * @param fromTimestampMs copied from `SupplyStateChange.fromStateTimestamp`
  * @param toLatitude      latitude recorded on this state change
  * @param toLongitude     longitude recorded on this state change
  * @param fromLatitude    latitude of the previous state change of the same supply, if any
  * @param fromLongitude   longitude of the previous state change of the same supply, if any
  * @param fromStatus      status label for the state being left
  * @param toStatus        status label for the state being entered
  * @param jobUuid         job the change belongs to
  * @param vehicleType     copied from `SupplyStateChange.vehicleType`
  */
case class SupplyStatus (
    supplyUuid: String,
    toTimestampMs: Long,
    fromTimestampMs: Long,
    toLatitude: Double,
    toLongitude: Double,
    fromLatitude: Option[Double],
    fromLongitude: Option[Double],
    fromStatus: String,
    toStatus: String,
    jobUuid: String,
    vehicleType: String
)

/** Wrapper around a list of [[SupplyStatus]] records.
  *
  * NOTE(review): not referenced by any other cell shown here — confirm it is still needed.
  *
  * @param supplyStatuses the wrapped status records
  */
case class SupplyStatusList (
    supplyStatuses: List[SupplyStatus]
)

In [None]:
/**
this class loads supply state change from kafka topic rawdata.kafka_hp_supply_supply_state_changes_nodedup
*/
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import spark.implicits._
import scala.collection.mutable.ListBuffer
import scala.collection.mutable.Map

/** Loads supply state changes and converts the raw rows into typed [[SupplyStateChange]] records. */
object SupplyStateChangeLoader {

  /** Run the query that loads supply state changes for a date / timestamp window.
    *
    * Reads `rawdata.kafka_hp_supply_supply_state_changes_nodedup` joined to `dwh.dim_vehicle`
    * and keeps only rows whose supply uuid, job uuid, location and timestamps are non-null.
    *
    * The arguments are interpolated directly into the SQL text, so they must come from a
    * trusted source (e.g. literals typed into the notebook).
    *
    * @param start_date     lower bound (inclusive) of the `datestr` filter
    * @param end_date       upper bound (inclusive) of the `datestr` filter
    * @param start_epoch_ms lower bound (inclusive) on `msg.currentstatetimestamp`
    * @param end_epoch_ms   upper bound (inclusive) on `msg.currentstatetimestamp`
    * @return a DataFrame with columns jobUuid, supplyUuid, fromStateTimestamp, fromState, toState,
    *         reason, latitude, longitude, toStateTimestamp and vehicleType
    */
  def load(start_date: String, end_date: String, start_epoch_ms: Long, end_epoch_ms: Long): DataFrame = {

    val query =
      s"""select
         | msg.jobuuid as jobUuid,
         | msg.supplyuuid as supplyUuid,
         | msg.currentstatetimestamp - msg.timeinpreviousstate as fromStateTimestamp,
         | msg.fromstate as fromState,
         | msg.tostate as toState,
         | msg.reason as reason,
         | msg.supplylocation.latitude as latitude,
         | msg.supplylocation.longitude as longitude,
         | msg.currentstatetimestamp as toStateTimestamp,
         | vehicle.form_factor as vehicleType
         | from
         | rawdata.kafka_hp_supply_supply_state_changes_nodedup supply_state
         | join dwh.dim_vehicle vehicle on supply_state.msg.activeVehicleUuid = vehicle.vehicle_uuid
         | where
         | datestr between '$start_date' and '$end_date'
         | and msg.currentstatetimestamp >= $start_epoch_ms
         | and msg.currentstatetimestamp <= $end_epoch_ms
         | and msg.supplyuuid is not NULL
         | and msg.jobuuid is not NULL
         | and msg.supplylocation.latitude is not null
         | and msg.supplylocation.longitude is not null
         | and msg.currentstatetimestamp is not null
         | and msg.timeinpreviousstate is not null""".stripMargin
        .replaceAll("\n", " ")

    spark.sql(query)
  }

  /** Convert the raw rows returned by [[load]] into typed [[SupplyStateChange]] records.
    *
    * @param rawDataset rows with the columns produced by [[load]]
    * @return one [[SupplyStateChange]] per input row
    */
  def makeDataset(rawDataset: DataFrame): Dataset[SupplyStateChange] =
    rawDataset.map { r =>
      SupplyStateChange(
        jobUuid = r.getAs[String]("jobUuid"),
        supplyUuid = r.getAs[String]("supplyUuid"),
        fromState = toSupplyState(r.getAs[Row]("fromState")),
        toState = toSupplyState(r.getAs[Row]("toState")),
        fromStateTimestamp = r.getAs[Long]("fromStateTimestamp"),
        toStateTimestamp = r.getAs[Long]("toStateTimestamp"),
        reason = r.getAs[String]("reason"),
        latitude = r.getAs[Double]("latitude"),
        longitude = r.getAs[Double]("longitude"),
        vehicleType = r.getAs[String]("vehicleType")
      )
    }

  /** Build a [[SupplyState]] from a `fromState` / `toState` struct row. */
  private def toSupplyState(state: Row): SupplyState =
    SupplyState(
      status = state.getAs[String]("status"),
      // The waypoints must be collected here: building them inside a `foreach` and dropping
      // the result would leave this list empty for every record.
      waypoints = listOf[Row](state, "waypoints").map(toWaypoint),
      assignedJobs = listOf[String](state, "assignedJobs"),
      acceptedJobs = listOf[String](state, "acceptedJobs"),
      offeredJobs = listOf[String](state, "offeredJobs")
    )

  /** Build a [[Waypoint]] from a waypoint struct row, keeping only each task's task type. */
  private def toWaypoint(wp: Row): Waypoint =
    Waypoint(
      status = wp.getAs[String]("status"),
      tasks = listOf[Row](wp, "tasks")
        .map(task => task.getAs[Row]("marketplaceInfo").getAs[String]("taskType"))
    )

  /** Read an array field as a List; a null array is treated as empty instead of throwing. */
  private def listOf[T](row: Row, field: String): List[T] =
    Option(row.getAs[Seq[T]](field)).map(_.toList).getOrElse(Nil)
}

In [None]:
// load supply state change data
// NOTE(review): the epoch bounds below are 2023-01-28T08:00:00Z .. 2023-01-29T07:59:59Z, while
// both datestr bounds are '2023-01-28' — confirm the datestr range covers the whole epoch window.
val supplyStateChangeRaw = SupplyStateChangeLoader.load("2023-01-28", "2023-01-28", 1674892800000L, 1674979199000L)
// Convert to the typed dataset and mark it for caching; later cells reuse it several times.
val supplyStateChange = SupplyStateChangeLoader.makeDataset(supplyStateChangeRaw).cache()
supplyStateChange.printSchema

In [None]:
// Action: evaluates supplyStateChange (which was marked with cache() when defined) and returns its row count.
supplyStateChange.count()

In [None]:
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import spark.implicits._
import scala.collection.mutable.ListBuffer
import scala.collection.mutable.Map

// This class converts the supply state change data into a simplified supply state data
// which can be used to calculate trip dispatch type

/** Converts [[SupplyStateChange]] records into simplified [[SupplyStatus]] records whose
  * from/to statuses are single labels ("Open", "Dispatched", "Accepted", ...).
  */
object SupplyStateConverter {

  /** Convert every supply's state changes into status records.
    *
    * @param supplyStateChangeDataset state changes of any number of supplies
    * @return one [[SupplyStatus]] per input state change
    */
  def toSupplyStatus(supplyStateChangeDataset: Dataset[SupplyStateChange]): Dataset[SupplyStatus] =
    supplyStateChangeDataset
      .groupByKey(change => change.supplyUuid)
      // Explicit parameter types keep the Scala-function overload of flatMapGroups unambiguous.
      .flatMapGroups((_: String, changes: Iterator[SupplyStateChange]) => convertList(changes))

  /** Convert the state changes of one supply, ordered by `toStateTimestamp`.
    *
    * Each change is paired with the change immediately before it (none for the first one) so
    * that [[convert]] can fill in the "from" location.
    *
    * @param supplyStateChangesIt all state changes of a single supply, in any order
    * @return the corresponding status records in ascending `toStateTimestamp` order
    */
  def convertList(supplyStateChangesIt: Iterator[SupplyStateChange]): Seq[SupplyStatus] = {
    val ordered = supplyStateChangesIt.toVector.sortBy(_.toStateTimestamp)
    // predecessors(i) is the change before ordered(i); zip drops the surplus last element.
    val predecessors = Option.empty[SupplyStateChange] +: ordered.map(change => Option(change))
    ordered.zip(predecessors).map { case (current, previous) => convert(current, previous) }
  }

  /** Convert one state change into a status record.
    *
    * @param supplyStateChange     the change to convert
    * @param prevSupplyStateChange the same supply's preceding change, if any; its location
    *                              becomes the "from" location
    * @return the status record, with `fromStatus` / `toStatus` translated to status labels
    */
  def convert(
      supplyStateChange: SupplyStateChange, prevSupplyStateChange: Option[SupplyStateChange]): SupplyStatus =
    SupplyStatus(
      jobUuid = supplyStateChange.jobUuid,
      supplyUuid = supplyStateChange.supplyUuid,
      toTimestampMs = supplyStateChange.toStateTimestamp,
      fromTimestampMs = supplyStateChange.fromStateTimestamp,
      toLatitude = supplyStateChange.latitude,
      toLongitude = supplyStateChange.longitude,
      fromLatitude = prevSupplyStateChange.map(_.latitude),
      fromLongitude = prevSupplyStateChange.map(_.longitude),
      // Emit the translated labels rather than fixed placeholder strings, so that consumers
      // can filter on values such as "DrivingClient".
      fromStatus = translateFromSupplyState(supplyStateChange.fromState),
      toStatus = translateToSupplyState(supplyStateChange.toState, supplyStateChange.reason),
      vehicleType = supplyStateChange.vehicleType
    )

  /** Translate a supply state into a single status label.
    *
    * - status "ONLINE" with assigned jobs: "DrivingClient" when pending + arrived dropoffs
    *   outnumber pending + arrived pickups, else "Arrived" when any pickup has arrived,
    *   else "Accepted"
    * - status "ONLINE" without assigned jobs: "AcceptPending" when there are accepted jobs,
    *   else "Dispatched" when there are offered jobs, else "Open"
    * - status "OFFLINE" or "OBSERVING": "OffDuty"
    * - anything else (including a null status): "Unknown"
    *
    * @param supplyState the state to translate
    * @return the status label
    */
  def translateFromSupplyState(supplyState: SupplyState): String =
    supplyState.status match {
      case "ONLINE" =>
        if (supplyState.assignedJobs.nonEmpty) {
          val stats = collectStats(supplyState.waypoints)
          def count(key: String): Int = stats(key).intValue

          val dropoffs = count("DROPOFF_PENDING") + count("DROPOFF_ARRIVED")
          val pickups = count("PICKUP_ARRIVED") + count("PICKUP_PENDING")
          if (dropoffs > pickups) "DrivingClient"
          else if (count("PICKUP_ARRIVED") > 0) "Arrived"
          else "Accepted"
        }
        else if (supplyState.acceptedJobs.nonEmpty) "AcceptPending"
        else if (supplyState.offeredJobs.nonEmpty) "Dispatched"
        else "Open"
      case "OFFLINE" | "OBSERVING" => "OffDuty"
      case _ => "Unknown"
    }

  /** Translate the state being entered: a non-null `reason` is used verbatim as the status,
    * otherwise the state is translated with [[translateFromSupplyState]].
    *
    * @param supplyState the state being entered
    * @param reason      reason attached to the state change; may be null
    * @return `reason` when it is non-null, else the translated status label
    */
  def translateToSupplyState(supplyState: SupplyState, reason: String): String =
    Option(reason).getOrElse(translateFromSupplyState(supplyState))

  /** Count tasks per (task type, waypoint status) combination.
    *
    * Keys have the form `TASKTYPE_STATUS`, both parts upper-cased (e.g. "PICKUP_ARRIVED").
    * Tasks with a null type and waypoints with a null status are skipped.
    *
    * @param waypoints waypoints whose tasks are counted
    * @return a mutable map from key to count that yields 0 for keys that were never seen
    */
  def collectStats(waypoints: List[Waypoint]): Map[String, Integer] = {
    val counts =
      scala.collection.mutable.Map[String, Integer]().withDefaultValue(Integer.valueOf(0))

    for {
      wp <- waypoints
      taskType <- wp.tasks
      if taskType != null && wp.status != null
    } {
      // Locale.ROOT keeps the keys identical to the ASCII constants looked up by callers,
      // whatever the JVM's default locale is.
      val key = taskType.toUpperCase(java.util.Locale.ROOT) + "_" +
        wp.status.toUpperCase(java.util.Locale.ROOT)
      counts(key) = Integer.valueOf(counts(key).intValue + 1)
    }

    counts
  }
}

In [None]:
// Convert the state changes into simplified per-change status records and mark the result for caching.
val supplyStatus = SupplyStateConverter.toSupplyStatus(supplyStateChange).cache()
supplyStatus.printSchema()

In [None]:
// Action: evaluates supplyStatus (marked with cache() when defined) and returns its row count.
supplyStatus.count()

In [None]:
// Print a sample of the status records for a visual sanity check.
supplyStatus.show()

In [None]:
// update the status and map the object
// Translate the from-state of a 10-row sample into its simplified status label and display it.
// (SupplyStateChange has no `fromStatus` member, and a `val` cannot appear inside a
// parenthesised expression, so the translation helper is applied to `fromState` directly.)
supplyStateChange.limit(10).
    map(r => SupplyStateConverter.translateFromSupplyState(r.fromState)).
    show(false)

In [None]:
// Print every distinct from-state. NOTE(review): collect() brings all distinct states to the driver.
supplyStateChange.map(_.fromState).distinct().collect().foreach(println)

In [None]:
// Print every distinct to-state. NOTE(review): collect() brings all distinct states to the driver.
supplyStateChange.map(_.toState).distinct().collect().foreach(println)

In [None]:
// find previous job dataset
import org.apache.spark.sql.functions.{col, lead, lit}

// Status records that leave the "DrivingClient" status. This works on supplyStatus (string
// status labels); supplyStateChange.fromState is a struct and cannot be compared to a string.
// The filter only matches when fromStatus carries the label produced by
// SupplyStateConverter.translateFromSupplyState.
val previousStateChange = supplyStatus.
  where(col("fromStatus") === lit("DrivingClient")).cache()
previousStateChange.count()

In [None]:
// find supply lead time
import org.apache.spark.sql.expressions.Window

// Per supply, order the status records by toTimestampMs and attach the next record's timestamp.
val leadTimeSpec = Window.partitionBy(col("supplyUuid")).orderBy("toTimestampMs")

val supplyLeadTime = supplyStatus.
  withColumn("leadTimestampMs", lead(col("toTimestampMs"), 1).over(leadTimeSpec)).cache()

In [None]:
// identify the trip dispatch type
// `join` (not `joinWith`) keeps the aliased columns addressable as PRE.* / SUP.* in the select;
// `joinWith` would produce a Dataset of pairs whose columns are `_1` and `_2`.
val fwd_trips = previousStateChange.alias("PRE").
                join(supplyLeadTime.alias("SUP"),
                     col("PRE.toTimestampMs") === col("SUP.leadTimestampMs") &&
                     col("PRE.supplyUuid") === col("SUP.supplyUuid") &&
                     col("PRE.jobUuid") =!= col("SUP.jobUuid")).
                select(col("SUP.jobUuid").as("jobUuid"),
                       col("SUP.supplyUuid").as("supplyUuid"),
                       col("SUP.leadTimestampMs").as("pickupStartTimestampMs"),
                       col("SUP.toLatitude").as("pickupLatitude"),
                       col("SUP.toLongitude").as("pickupLongitude"))

fwd_trips.count()
                