Skip to content

Commit

Permalink
Merge 53c1892 into 4dc134b
Browse files Browse the repository at this point in the history
  • Loading branch information
ansingh7115 committed Jan 22, 2019
2 parents 4dc134b + 53c1892 commit 89dde04
Show file tree
Hide file tree
Showing 10 changed files with 1,016 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package org.broadinstitute.workbench.ccm

import java.time.{Duration, Instant}

import cats.data.NonEmptyList
import cats.effect.Sync
import cats.effect._
import cats.implicits._
import cats.data._
import org.broadinstitute.workbench.ccm.pricing.{ PriceList}


object CostCalculator {

def getPriceOfCall(callMetaDataJson: MetadataResponse, priceList: PriceList): Either[Throwable, Double] = {
val ls: List[Either[NonEmptyList[String], Double]] = callMetaDataJson.calls.map { call =>
getPriceOfCall(call, priceList, Instant.now(), Instant.now()).leftMap(NonEmptyList.one)
}

ls.parSequence.leftMap(errors => new Exception(errors.toList.mkString(", "))).map(_.sum)
}

private def getPriceOfCall(call: Call, priceList: PriceList, startTime: Instant, endTime: Instant): Either[String, Double] = {
for {
_ <- if (call.status.asString == "Success") Right(()) else Left(s"Call {name} status was ${call.status.asString}.") // not evaluating workflows that are in flight or Failed or Aborted or whatever
machineType <- if (call.machineType.asString.contains("custom")) Right("custom") else {
Either.catchNonFatal(call.machineType.asString.split("/", 1)).leftMap(_ => "MachineType could not be parsed.")}
} yield {
// ToDo: calculate subworkflows
val isVMPreemptible = preemptible(call)
val wasPreempted = wasCallPreempted(call)
// only looking at actual and not requested disk info
val diskName = call.runtimeAttributes.disks.diskName
val diskSize = call.runtimeAttributes.disks.diskSize.asInt + call.runtimeAttributes.bootDiskSizeGb.asInt
val diskType = call.runtimeAttributes.disks.diskType
val callDurationInSeconds = getCallDuration(call, startTime, endTime)

// ToDo: add calculating prices for non-custom
// adjust the call duration to account for preemptibility
// if a VM preempted less than 10 minutes after it is created, user incurs no cost
val adjustedCallDurationInSeconds = if (isVMPreemptible && wasPreempted && callDurationInSeconds < (10 * 60)) 0 else callDurationInSeconds
val cpuCost = adjustedCallDurationInSeconds * (if (isVMPreemptible) priceList.CPUPreemptiblePrice else priceList.CPUOnDemandPrice)
val diskCostPerGbHour = if (call.runtimeAttributes.disks.diskType.asString.equals("SSD")) priceList.ssdCostPerGbPerHour else priceList.hddCostPerGbPerHour
val diskGbHours = call.runtimeAttributes.disks.diskSize.asInt * (adjustedCallDurationInSeconds)
val diskCost = diskGbHours * diskCostPerGbHour
val memCost = adjustedCallDurationInSeconds * (if (isVMPreemptible) priceList.RAMPreemptiblePrice else priceList.RAMOnDemandPrice)
cpuCost + diskCost + memCost
}
}


private def wasCallPreempted(call: Call): Boolean = {
// treat preempted and retryableFailure as the same
call.executionEvents.exists(event => (event.description.asString.equals("Preempted") || event.description.asString.equals("RetryableFailure")))
}

private def preemptible(call: Call): Boolean = {
call.attempt.asInt <= call.runtimeAttributes.preemptible.asInt
// ToDo: Add false result if the metadata does not contain an "attempt" or preemptible info
}

private def getCallDuration(call: Call, cromwellStartTime: Instant, cromwellEndTime: Instant): Long = {
// ToDo: add option to ignore preempted calls and just return 0
val papiV2 = call.backend.asString.equals("PAPIv2")

def getCromwellStart = {
call.executionEvents.find(event => event.description.asString.equals("start")) match {
case Some(nonPapiV2Event) => nonPapiV2Event.startTime
case None => cromwellStartTime
}
}

def getCromwellEnd = {
call.executionEvents.find(event => event.description.asString.equals("ok")) match {
case Some(nonPapiV2Event) => nonPapiV2Event.endTime
case None => cromwellEndTime
}
}

val startTime = if (papiV2) {
val startOption = call.executionEvents.find(event => event.description.asString.contains("Preparing Job"))
startOption match {
case Some(event) => event.startTime
case None => getCromwellStart
}
} else getCromwellStart

val endTime = if (papiV2) {
val endOption = call.executionEvents.find(event => event.description.asString.contains("Worker Released"))
endOption match {
case Some(event) => event.endTime
case None => getCromwellEnd
}
} else getCromwellEnd

val elapsed = Duration.between(startTime, endTime).getSeconds
if (elapsed >= 60) elapsed else 60
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,19 @@
package org.broadinstitute.workbench.ccm


import java.time.format.DateTimeFormatter
import java.time.temporal.{ChronoField, Temporal, TemporalAccessor}
import java.text.SimpleDateFormat
import java.time.Instant

import cats.implicits._
import io.circe.Decoder

import scala.concurrent.duration.{Duration, FiniteDuration}

object JsonCodec {

val formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")
implicit val cpuNumberDecoder: Decoder[CpuNumber] = Decoder.decodeString.emap(s => Either.catchNonFatal(s.toInt).leftMap(_.getMessage).map(CpuNumber))
implicit val bootDiskSizeGbDecoder: Decoder[BootDiskSizeGb] = Decoder.decodeString.emap(x => Either.catchNonFatal(x.toInt).leftMap(_.getMessage).map(BootDiskSizeGb))
implicit val preemptibleDecoder: Decoder[Preemptible] = Decoder.decodeString.emap{
Expand All @@ -18,17 +28,43 @@ object JsonCodec {
size <- Either.catchNonFatal(array(1).toInt).leftMap(_.getMessage)
} yield Disks(DiskName(array(0)), DiskSize(size), DiskType(array(2)))
}


implicit val runtimeAttributesDecoder: Decoder[RuntimeAttributes] = Decoder.forProduct4("cpu", "disks", "bootDiskSizeGb", "preemptible")(RuntimeAttributes)
implicit val callDecoder: Decoder[Call] = Decoder.instance{

implicit val executionEventDecoder: Decoder[ExecutionEvent] = Decoder.instance { cursor =>

for {
description <- cursor.downField("description").as[String]
startTime <- cursor.downField("startTime").as[Instant]
endTime <- cursor.downField("endTime").as[Instant]
} yield ExecutionEvent(ExecutionEventDescription(description), startTime, endTime)

}

implicit val callDecoder: Decoder[Call] = Decoder.instance {
cursor =>
for {
ra <- cursor.downField("runtimeAttributes").as[RuntimeAttributes]
executionEvents <- cursor.downField("executionEvents").as[List[ExecutionEvent]]
isPreemptible <- cursor.downField("preemptible").as[Boolean]
isCallCaching <- cursor.downField("callCaching").downField("hit").as[Boolean]
} yield Call(ra, isCallCaching, isPreemptible)
region <- cursor.downField("jes").downField("zone").as[String]
machineType <- cursor.downField("jes").downField("machineType").as[String]
status <- cursor.downField("backendStatus").as[String]
backend <- cursor.downField("backend").as[String]
attempt <- cursor.downField("attempt").as[Int]
} yield Call(ra, executionEvents, isCallCaching, isPreemptible, Region(region), Status(status), MachineType(machineType), BackEnd(backend), Attempt(attempt))
}

implicit val metadataResponseDecoder: Decoder[MetadataResponse] = Decoder.instance {
cursor =>
cursor.downField("calls").as[Map[String, List[Call]]].map(x => MetadataResponse(x.values.flatten.toList))
for {
calls <- cursor.downField("calls").as[Map[String, List[Call]]].map(x => (x.values.flatten.toList))
start <- cursor.downField("start").as[Instant]
end <- cursor.downField("end").as[Instant]
} yield MetadataResponse(calls, start, end)
}


}
36 changes: 33 additions & 3 deletions core/src/main/scala/org/broadinstitute/workbench/ccm/model.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package org.broadinstitute.workbench.ccm

import java.time.Instant


final case class Cpu(asString: String) extends AnyVal
final case class CpuNumber(asInt: Int) extends AnyVal
final case class BootDiskSizeGb(asInt: Int) extends AnyVal
Expand All @@ -9,9 +12,36 @@ final case class DiskName(asString: String) extends AnyVal
final case class DiskSize(asInt: Int) extends AnyVal
final case class DiskType(asString: String) extends AnyVal
final case class Preemptible(asInt: Int) extends AnyVal
final case class MetadataResponse(value: List[Call]) extends AnyVal
final case class Attempt(asInt: Int) extends AnyVal
final case class Region(asString: String) extends AnyVal
final case class Status(asString: String) extends AnyVal
final case class MachineType(asString: String) extends AnyVal
final case class ExecutionEventDescription(asString: String) extends AnyVal
final case class BackEnd(asString: String) extends AnyVal



final case class ExecutionEvent(description: ExecutionEventDescription,
startTime: Instant,
endTime: Instant)

final case class Call(runtimeAttributes: RuntimeAttributes,
executionEvents: List[ExecutionEvent],
isCallCaching: Boolean,
preemptible: Boolean,
region: Region,
status: Status,
machineType: MachineType,
backend: BackEnd,
attempt: Attempt)

final case class MetadataResponse(calls: List[Call], startTime: Instant, endTime: Instant)


final case class RuntimeAttributes(cpuNumber: CpuNumber,
disks: Disks,
bootDiskSizeGb: BootDiskSizeGb,
preemptible: Preemptible)

final case class Call(runtimeAttributes: RuntimeAttributes, isCallCaching: Boolean, preemptible: Boolean)
final case class RuntimeAttributes(cpuNumber: CpuNumber, disks: Disks, bootDiskSizeGb: BootDiskSizeGb, preemptible: Preemptible)
final case class Disks(diskName: DiskName, diskSize: DiskSize, diskType: DiskType)
final case class Compute(cpu: Cpu, ram: Ram)
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,38 @@ package pricing
import cats.effect.Sync
import io.circe.Json
import org.broadinstitute.workbench.ccm.pricing.JsonCodec._
import org.http4s.Uri
import org.http4s.{EntityDecoder, Uri}
import org.http4s.circe.CirceEntityDecoder._
import org.http4s.client.Client


final case class PriceList(region: Region,
machineType: MachineType,
ssdCostPerGbPerHour: Double,
hddCostPerGbPerHour: Double,
CPUOnDemandPrice: Double,
RAMOnDemandPrice: Double,
extendedOnDemandRAMPrice: Double,
CPUPreemptiblePrice: Double,
RAMPreemptiblePrice: Double,
extendedRAMPreemptiblePrice: Double)

case class GcpPriceList(asJson: Json) extends AnyVal

class GcpPricing[F[_]: Sync](httpClient: Client[F], uri: Uri) {
def getPriceList(): F[GcpPriceList] = {
httpClient.expect[GcpPriceList](uri)

def getGcpPriceList(region: Region, machineType: MachineType): F[PriceList] = {
// F can not be flatmapped
// for {
// json <- httpClient.expect[Json](uri)
// result <- JsonCodec.PriceListDecoder(region, machineType).decodeJson(json)
// } yield result

implicit val priceListDecoder = JsonCodec.PriceListDecoder(region, machineType)
httpClient.expect[PriceList](uri)
}
}

final case class GcpPriceList(asJson: Json) extends AnyVal

//CUSTOM_MACHINE_CPU = "CP-DB-PG-CUSTOM-VM-CORE"
//CUSTOM_MACHINE_RAM = "CP-DB-PG-CUSTOM-VM-RAM"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,120 @@
package org.broadinstitute.workbench.ccm
package pricing

import io.circe.Decoder
import io.circe.{Decoder, DecodingFailure}

//TODO: this needs to be updated to use https://cloud.google.com/billing/v1/how-tos/catalog-api
object JsonCodec {
implicit val cpuDecoder: Decoder[CpuCost] = Decoder.forProduct1("us")(CpuCost.apply)
implicit val ramDecoder: Decoder[RamCost] = Decoder.forProduct1("us")(RamCost.apply)
implicit val computeCostDecoder: Decoder[ComputeCost] = Decoder.forProduct2("CP-DB-PG-CUSTOM-VM-CORE", "CP-DB-PG-CUSTOM-VM-RAM")(ComputeCost.apply)
implicit val gcpPriceListDecoder: Decoder[GcpPriceList] = Decoder.forProduct1("gcp_price_list")(GcpPriceList.apply)


// implicit val categoryDecoder: Decoder[Category] = Decoder.forProduct4("serviceDisplayName", "resourceFamily", "resourceGroup", "usageType")(Category.apply)
implicit val categoryDecoder: Decoder[Category] = Decoder.instance { cursor =>
for {
serviceDisplayName <- cursor.downField("serviceDisplayName").as[String]
resourceFamily <- cursor.downField("resourceFamily").as[String]
resourceGroup <- cursor.downField("resourceGroup").as[String]
usageType <- cursor.downField("usageType").as[String]
} yield Category(ServiceDisplayName(serviceDisplayName), ResourceFamily(resourceFamily), ResourceGroup(resourceGroup), UsageType(usageType))
}

implicit val tieredRateDecoder: Decoder[TieredRate] = Decoder.instance { cursor =>
for {
startUsageAmount <- cursor.downField("startUsageAmount").as[Int]
currencyCode <- cursor.downField("unitPrice").downField("currencyCode").as[String]
units <- cursor.downField("unitPrice").downField("units").as[Int]
nanos <- cursor.downField("unitPrice").downField("nanos").as[Int]
} yield TieredRate(StartUsageAmount(startUsageAmount), CurrencyCode(currencyCode), Units(units), Nanos(nanos))
}

implicit val pricingInfoDecoder: Decoder[PricingInfo] = Decoder.instance { cursor =>
for {
usageUnit <- cursor.downField("pricingExpression").downField("usageUnit").as[String]
tieredRates <- cursor.downField("pricingExpression").downField("tieredRates").as[List[TieredRate]]
} yield PricingInfo(UsageUnit(usageUnit), tieredRates)
}

implicit val googlePriceItemDecoder: Decoder[GooglePriceItem] = Decoder.instance {
// put filtering in here!
cursor =>
for {
name <- cursor.downField("name").as[String]
skuId <- cursor.downField("skuId").as[String]
description <- cursor.downField("description").as[String]
category <- cursor.downField("category").as[Category]
regions <- cursor.downField("serviceRegions").as[List[String]]
pricingInfo <- cursor.downField("pricingInfo").as[List[PricingInfo]]
} yield GooglePriceItem(SkuName(name), SkuId(skuId), SkuDescription(description), category, regions.map(Region(_)), pricingInfo)
}

def PriceListDecoder(region: Region, machineType: MachineType): Decoder[PriceList] = Decoder.instance {
cursor =>
def getPrice(googlePriceItems: List[GooglePriceItem], resourceFamily: ResourceFamily, resourceGroup: ResourceGroup, usageType: UsageType, descriptionShouldInclude: Option[String], descriptionShouldNotInclude: Option[String]): Either[DecodingFailure, Double] = {
val sku = googlePriceItems.filter { priceItem =>
(priceItem.regions.contains(region)
&& priceItem.category.resourceFamily.equals(resourceFamily)
&& priceItem.category.resourceGroup.equals(resourceGroup)
&& priceItem.category.usageType.equals(usageType)
&& (descriptionShouldInclude match {
case Some(desc) => priceItem.description.asString.contains(desc)
case None => true})
&& (descriptionShouldNotInclude match {
case Some(desc) => !priceItem.description.asString.contains(desc)
case None => true}))
}
sku.length match {
case 0 => Left(DecodingFailure(s"No SKUs matched with region $region, resourceFamily $resourceFamily, resourceGroup $resourceGroup, $usageType usageType, and description including $descriptionShouldInclude and notIncluding $descriptionShouldNotInclude in the following price list: $googlePriceItems", List()))
case 1 => Right(getPriceFromSku(sku.head))
case tooMany => Left(DecodingFailure(s"$tooMany SKUs matched with region $region, resourceFamily $resourceFamily, resourceGroup $resourceGroup, $usageType usageType, and description including $descriptionShouldInclude and notIncluding $descriptionShouldNotInclude in the following price list: $googlePriceItems", List()))
}
}

def getPriceFromSku(priceItem: GooglePriceItem): Double = {
// ToDo: Currently just takes first, make it take either most recent or make it dependent on when the call ran
priceItem.pricingInfo.head.tieredRates.filter(rate => rate.startUsageAmount.asInt == 0).head.nanos.asInt.toDouble / 1000000000
}

def priceList(googlePriceList: GooglePriceList): Either[DecodingFailure, PriceList] = {
println(s"GOOGLE PRICE LIST: $googlePriceList")
val filteredByRegion = googlePriceList.priceItems.filter(priceItem => priceItem.regions.contains(region))
println(s"FILTERED BY REGION: $filteredByRegion" )
for {
ssdCostPerGbPerMonth <- getPrice(filteredByRegion, ResourceFamily("Storage"), ResourceGroup("SSD"), UsageType("OnDemand"), None, Some("Regional"))
hddCostPerGbPerMonth <- getPrice(filteredByRegion, ResourceFamily("Storage"), ResourceGroup("PDStandard"), UsageType("OnDemand"), None, Some("Regional"))
cpuOnDemandCostGibibytesPerHour <- getPrice(filteredByRegion, ResourceFamily("Compute"), ResourceGroup("CPU"), UsageType("OnDemand"), None, None)
ramOnDemandCostGibibytesPerHour <- getPrice(filteredByRegion, ResourceFamily("Compute"), ResourceGroup("RAM"), UsageType("OnDemand"), None, Some("Custom Extended"))
extendedRamOnDemandCostGibibytesPerHour <- getPrice(filteredByRegion, ResourceFamily("Compute"), ResourceGroup("RAM"), UsageType("OnDemand"), Some("Custom Extended"), None)
cpuPreemptibleCostGibibytesPerHour <- getPrice(filteredByRegion, ResourceFamily("Compute"), ResourceGroup("CPU"), UsageType("Preemptible"), None, Some("Custom Extended"))
ramPreemptibleCostGibibytesPerHour <- getPrice(filteredByRegion, ResourceFamily("Compute"), ResourceGroup("RAM"), UsageType("Preemptible"), None, Some("Custom Extended"))
extendedRamPreemptibleCostGibibytesPerHour <- getPrice(filteredByRegion, ResourceFamily("Compute"), ResourceGroup("RAM"), UsageType("Preemptible"), Some("Custom Extended"), None)
} yield {
val ssdCostPerGbPerHour = ssdCostPerGbPerMonth / (24 * 365 / 12)
val hddCostPerGbPerHour = hddCostPerGbPerMonth / (24 * 365 / 12)
PriceList(
region,
machineType,
ssdCostPerGbPerHour,
hddCostPerGbPerHour,
cpuOnDemandCostGibibytesPerHour,
ramOnDemandCostGibibytesPerHour,
extendedRamOnDemandCostGibibytesPerHour,
cpuPreemptibleCostGibibytesPerHour,
ramPreemptibleCostGibibytesPerHour,
extendedRamPreemptibleCostGibibytesPerHour
)
}
}

for {
googlePriceList <- googlePriceListDecoder.apply(cursor)
result <- priceList(googlePriceList)
} yield {
result
}
}

implicit val googlePriceListDecoder: Decoder[GooglePriceList] = Decoder.forProduct1("skus")(GooglePriceList.apply)
}
Loading

0 comments on commit 89dde04

Please sign in to comment.