Skip to content

Commit

Permalink
more tests and fixing the surprises that came of them
Browse files Browse the repository at this point in the history
  • Loading branch information
Hussein Elgridly committed Jul 12, 2019
1 parent 7e6366b commit 135bfc0
Show file tree
Hide file tree
Showing 8 changed files with 142 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ object GoogleUtilities {
case _ => false
}

def whenIOException(throwable: Throwable): Boolean = throwable match {
def whenNonHttpIOException(throwable: Throwable): Boolean = throwable match {
case _: GoogleJsonResponseException => false
case _: GoogleHttpResponseException => false
case _: IOException => true
case _ => false
}
Expand Down Expand Up @@ -78,12 +80,12 @@ trait GoogleUtilities extends LazyLogging with InstrumentedRetry with GoogleInst
retryExponentially(when500orGoogleError)(() => Future(blocking(op())))
}

private def combine(predicates: Seq[Throwable => Boolean]): (Throwable => Boolean) = { throwable =>
protected def combine(predicates: Seq[Throwable => Boolean]): (Throwable => Boolean) = { throwable =>
predicates.map( _(throwable) ).foldLeft(false)(_ || _)
}

//Retry if any of the predicates return true.
protected def retry[T](predicates: (Throwable => Boolean)*)(op: () => T): Future[T] = {
protected def retry[T](predicates: (Throwable => Boolean)*)(op: () => T)(implicit histo: Histogram): Future[T] = {
retryExponentially(combine(predicates))(() => Future(blocking(op())))
}

Expand All @@ -93,7 +95,7 @@ trait GoogleUtilities extends LazyLogging with InstrumentedRetry with GoogleInst
}

//Retry if any of the predicates return true.
protected def retryWithRecover[T](predicates: (Throwable => Boolean)*)(op: () => T)(recover: PartialFunction[Throwable, T]) : Future[T] = {
protected def retryWithRecover[T](predicates: (Throwable => Boolean)*)(op: () => T)(recover: PartialFunction[Throwable, T])(implicit histo: Histogram) : Future[T] = {
retryExponentially(combine(predicates))(() => Future(blocking(op())).recover(recover))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class HttpGoogleBigQueryDAO(appName: String,
private def submitQuery(projectId: String, job: Job): Future[JobReference] = {
val queryRequest = bigquery.jobs.insert(projectId, job)

retry(when5xx, whenRateLimited, when404, whenInvalidValueOnBucketCreation, whenIOException) { () =>
retry(when5xx, whenRateLimited, when404, whenInvalidValueOnBucketCreation, whenNonHttpIOException) { () =>
executeGoogleRequest(queryRequest)
} map { job =>
job.getJobReference
Expand Down Expand Up @@ -61,7 +61,7 @@ class HttpGoogleBigQueryDAO(appName: String,
override def getQueryStatus(jobRef: JobReference): Future[Job] = {
val statusRequest = bigquery.jobs.get(jobRef.getProjectId, jobRef.getJobId)

retry(when5xx, whenRateLimited, when404, whenInvalidValueOnBucketCreation, whenIOException) { () =>
retry(when5xx, whenRateLimited, when404, whenInvalidValueOnBucketCreation, whenNonHttpIOException) { () =>
executeGoogleRequest(statusRequest)
}
}
Expand All @@ -71,7 +71,7 @@ class HttpGoogleBigQueryDAO(appName: String,
Future.failed(new WorkbenchException(s"job ${job.getJobReference.getJobId} not done"))

val resultRequest = bigquery.jobs.getQueryResults(job.getJobReference.getProjectId, job.getJobReference.getJobId)
retry(when5xx, whenRateLimited, when404, whenInvalidValueOnBucketCreation, whenIOException) { () =>
retry(when5xx, whenRateLimited, when404, whenInvalidValueOnBucketCreation, whenNonHttpIOException) { () =>
executeGoogleRequest(resultRequest)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class HttpGoogleDirectoryDAO(appName: String,

def updateGroupSettings(groupEmail: WorkbenchEmail, settings: GroupSettings) = {
val updater = settingsClient.groups().update(groupEmail.value, settings)
retry(when5xx, whenRateLimited, when404, whenInvalidValueOnBucketCreation, whenIOException) (() => { executeGoogleRequest(updater) })
retry(when5xx, whenRateLimited, when404, whenInvalidValueOnBucketCreation, whenNonHttpIOException) (() => { executeGoogleRequest(updater) })
}
}

Expand Down Expand Up @@ -87,7 +87,7 @@ class HttpGoogleDirectoryDAO(appName: String,
val inserter = groups.insert(group)

for {
_ <- retry(when5xx, whenRateLimited, when404, whenInvalidValueOnBucketCreation, whenIOException) (() => { executeGoogleRequest(inserter) })
_ <- retry(when5xx, whenRateLimited, when404, whenInvalidValueOnBucketCreation, whenNonHttpIOException) (() => { executeGoogleRequest(inserter) })
_ <- groupSettings match {
case None => Future.successful(())
case Some(settings) => new GroupSettingsDAO().updateGroupSettings(groupEmail, settings)
Expand All @@ -99,7 +99,7 @@ class HttpGoogleDirectoryDAO(appName: String,
val groups = directory.groups
val deleter = groups.delete(groupEmail.value)

retryWithRecover(when5xx, whenRateLimited, when404, whenInvalidValueOnBucketCreation, whenIOException)(() => {
retryWithRecover(when5xx, whenRateLimited, when404, whenInvalidValueOnBucketCreation, whenNonHttpIOException)(() => {
executeGoogleRequest(deleter)
()
}) {
Expand All @@ -111,7 +111,7 @@ class HttpGoogleDirectoryDAO(appName: String,
val member = new Member().setEmail(memberEmail.value).setRole(groupMemberRole)
val inserter = directory.members.insert(groupEmail.value, member)

retryWithRecover(when5xx, whenRateLimited, when404, whenInvalidValueOnBucketCreation, whenIOException)(() => {
retryWithRecover(when5xx, whenRateLimited, when404, whenInvalidValueOnBucketCreation, whenNonHttpIOException)(() => {
executeGoogleRequest(inserter)
()
}) {
Expand All @@ -124,7 +124,7 @@ class HttpGoogleDirectoryDAO(appName: String,
override def removeMemberFromGroup(groupEmail: WorkbenchEmail, memberEmail: WorkbenchEmail): Future[Unit] = {
val deleter = directory.members.delete(groupEmail.value, memberEmail.value)

retryWithRecover(when5xx, whenRateLimited, when404, whenInvalidValueOnBucketCreation, whenIOException)(() => {
retryWithRecover(when5xx, whenRateLimited, when404, whenInvalidValueOnBucketCreation, whenNonHttpIOException)(() => {
executeGoogleRequest(deleter)
()
}) {
Expand All @@ -136,15 +136,15 @@ class HttpGoogleDirectoryDAO(appName: String,
override def getGoogleGroup(groupEmail: WorkbenchEmail): Future[Option[Group]] = {
val getter = directory.groups().get(groupEmail.value)

retryWithRecover(when5xx, whenRateLimited, when404, whenInvalidValueOnBucketCreation, whenIOException)(() => { Option(executeGoogleRequest(getter)) }){
retryWithRecover(when5xx, whenRateLimited, when404, whenInvalidValueOnBucketCreation, whenNonHttpIOException)(() => { Option(executeGoogleRequest(getter)) }){
case e: HttpResponseException if e.getStatusCode == StatusCodes.NotFound.intValue => None
}
}

override def isGroupMember(groupEmail: WorkbenchEmail, memberEmail: WorkbenchEmail): Future[Boolean] = {
val getter = directory.members.get(groupEmail.value, memberEmail.value)

retryWithRecover(when5xx, whenRateLimited, when404, whenInvalidValueOnBucketCreation, whenIOException)(() => {
retryWithRecover(when5xx, whenRateLimited, when404, whenInvalidValueOnBucketCreation, whenNonHttpIOException)(() => {
executeGoogleRequest(getter)
true
}) {
Expand Down Expand Up @@ -181,14 +181,14 @@ class HttpGoogleDirectoryDAO(appName: String,
implicit val service = GoogleInstrumentedService.Groups
accumulated match {
// when accumulated has a Nil list then this must be the first request
case Some(Nil) => retryWithRecover(when5xx, whenRateLimited, when404, whenInvalidValueOnBucketCreation, whenIOException)(() => {
case Some(Nil) => retryWithRecover(when5xx, whenRateLimited, when404, whenInvalidValueOnBucketCreation, whenNonHttpIOException)(() => {
Option(executeGoogleRequest(fetcher))
}) {
case e: HttpResponseException if e.getStatusCode == StatusCodes.NotFound.intValue => None
}.flatMap(firstPage => listGroupMembersRecursive(fetcher, firstPage.map(List(_))))

// the head is the Members object of the prior request which contains next page token
case Some(head :: _) if head.getNextPageToken != null => retry(when5xx, whenRateLimited, when404, whenInvalidValueOnBucketCreation, whenIOException)(() => {
case Some(head :: _) if head.getNextPageToken != null => retry(when5xx, whenRateLimited, when404, whenInvalidValueOnBucketCreation, whenNonHttpIOException)(() => {
executeGoogleRequest(fetcher.setPageToken(head.getNextPageToken))
}).flatMap(nextPage => listGroupMembersRecursive(fetcher, accumulated.map(pages => nextPage :: pages)))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class HttpGoogleIamDAO(appName: String,
val getter = iam.projects().serviceAccounts().get(name)

//Return a Future[Option[ServiceAccount]]. The future fails if we get a Google error we don't understand. The Option is None if we get a 404, i.e. the SA doesn't exist.
val findOption = OptionT(retryWithRecover(when5xx, whenRateLimited, when404, whenInvalidValueOnBucketCreation, whenIOException) { () =>
val findOption = OptionT(retryWithRecover(when5xx, whenRateLimited, when404, whenInvalidValueOnBucketCreation, whenNonHttpIOException) { () =>
Option(executeGoogleRequest(getter))
} {
case t: GoogleJsonResponseException if t.getStatusCode == 404 =>
Expand All @@ -101,7 +101,7 @@ class HttpGoogleIamDAO(appName: String,
val request = new CreateServiceAccountRequest().setAccountId(serviceAccountName.value)
.setServiceAccount(new ServiceAccount().setDisplayName(displayName.value))
val inserter = iam.projects().serviceAccounts().create(s"projects/${serviceAccountProject.value}", request)
retryWithRecover(when5xx, whenRateLimited, when404, whenInvalidValueOnBucketCreation, whenIOException) { () =>
retryWithRecover(when5xx, whenRateLimited, when404, whenInvalidValueOnBucketCreation, whenNonHttpIOException) { () =>
executeGoogleRequest(inserter)
} {
case t: GoogleJsonResponseException if t.getStatusCode == StatusCodes.NotFound.intValue => throw new WorkbenchException(s"The project [${serviceAccountProject.value}] was not found")
Expand All @@ -114,7 +114,7 @@ class HttpGoogleIamDAO(appName: String,
val serviceAccountEmail = toServiceAccountEmail(serviceAccountProject, serviceAccountName)
val name = s"projects/${serviceAccountProject.value}/serviceAccounts/${serviceAccountEmail.value}"
val deleter = iam.projects().serviceAccounts().delete(name)
retryWithRecover(when5xx, whenRateLimited, when404, whenInvalidValueOnBucketCreation, whenIOException) { () =>
retryWithRecover(when5xx, whenRateLimited, when404, whenInvalidValueOnBucketCreation, whenNonHttpIOException) { () =>
executeGoogleRequest(deleter)
()
} {
Expand All @@ -126,7 +126,7 @@ class HttpGoogleIamDAO(appName: String,
override def testIamPermission(project: GoogleProject, iamPermissions: Set[IamPermission]): Future[Set[IamPermission]] = {
val testRequest = new TestIamPermissionsRequest().setPermissions(iamPermissions.map(p => p.value).toList.asJava)
val request = cloudResourceManager.projects().testIamPermissions(project.value, testRequest)
retry(when5xx, whenRateLimited, when404, whenInvalidValueOnBucketCreation, whenIOException) { () =>
retry(when5xx, whenRateLimited, when404, whenInvalidValueOnBucketCreation, whenNonHttpIOException) { () =>
executeGoogleRequest(request)
} map { response =>
Option(response.getPermissions).getOrElse(Collections.emptyList()).asScala.toSet.map(IamPermission)
Expand All @@ -139,7 +139,7 @@ class HttpGoogleIamDAO(appName: String,
val updatedPolicy = updatePolicy(policy, userEmail, rolesToAdd, Set.empty)
val policyRequest = new ProjectSetIamPolicyRequest().setPolicy(updatedPolicy).setUpdateMask("bindings,etag")
val request = cloudResourceManager.projects().setIamPolicy(iamProject.value, policyRequest)
retry(when5xx, whenRateLimited, when404, whenInvalidValueOnBucketCreation, whenIOException) { () =>
retry(when5xx, whenRateLimited, when404, whenInvalidValueOnBucketCreation, whenNonHttpIOException) { () =>
executeGoogleRequest(request)
}.void
}
Expand All @@ -151,7 +151,7 @@ class HttpGoogleIamDAO(appName: String,
val updatedPolicy = updatePolicy(policy, userEmail, Set.empty, rolesToRemove)
val policyRequest = new ProjectSetIamPolicyRequest().setPolicy(updatedPolicy).setUpdateMask("bindings,etag")
val request = cloudResourceManager.projects().setIamPolicy(iamProject.value, policyRequest)
retry(when5xx, whenRateLimited, when404, whenInvalidValueOnBucketCreation, whenIOException) { () =>
retry(when5xx, whenRateLimited, when404, whenInvalidValueOnBucketCreation, whenNonHttpIOException) { () =>
executeGoogleRequest(request)
}.void
}
Expand All @@ -167,7 +167,7 @@ class HttpGoogleIamDAO(appName: String,
val updatedPolicy = updatePolicy(policy, userEmail, Set("roles/iam.serviceAccountUser"), Set.empty)
val policyRequest = new ServiceAccountSetIamPolicyRequest().setPolicy(updatedPolicy)
val request = iam.projects().serviceAccounts().setIamPolicy(s"projects/${serviceAccountProject.value}/serviceAccounts/${serviceAccountEmail.value}", policyRequest)
retry(when5xx, whenRateLimited, when404, whenInvalidValueOnBucketCreation, whenIOException) { () =>
retry(when5xx, whenRateLimited, when404, whenInvalidValueOnBucketCreation, whenNonHttpIOException) { () =>
executeGoogleRequest(request)
}.void
}
Expand All @@ -178,14 +178,14 @@ class HttpGoogleIamDAO(appName: String,
.setPrivateKeyType("TYPE_GOOGLE_CREDENTIALS_FILE")
.setKeyAlgorithm("KEY_ALG_RSA_2048")
val creater = iam.projects().serviceAccounts().keys().create(s"projects/${serviceAccountProject.value}/serviceAccounts/${serviceAccountEmail.value}", request)
retry(when5xx, whenRateLimited, when404, whenInvalidValueOnBucketCreation, whenIOException) { () =>
retry(when5xx, whenRateLimited, when404, whenInvalidValueOnBucketCreation, whenNonHttpIOException) { () =>
executeGoogleRequest(creater)
} map googleKeyToWorkbenchKey
}

override def removeServiceAccountKey(serviceAccountProject: GoogleProject, serviceAccountEmail: WorkbenchEmail, keyId: ServiceAccountKeyId): Future[Unit] = {
val request = iam.projects().serviceAccounts().keys().delete(s"projects/${serviceAccountProject.value}/serviceAccounts/${serviceAccountEmail.value}/keys/${keyId.value}")
retryWithRecover(when5xx, whenRateLimited, when404, whenInvalidValueOnBucketCreation, whenIOException){ () =>
retryWithRecover(when5xx, whenRateLimited, when404, whenInvalidValueOnBucketCreation, whenNonHttpIOException){ () =>
executeGoogleRequest(request)
()
} {
Expand All @@ -196,7 +196,7 @@ class HttpGoogleIamDAO(appName: String,
override def listServiceAccountKeys(serviceAccountProject: GoogleProject, serviceAccountEmail: WorkbenchEmail): Future[Seq[ServiceAccountKey]] = {
val request = iam.projects().serviceAccounts().keys().list(s"projects/${serviceAccountProject.value}/serviceAccounts/${serviceAccountEmail.value}")

retry(when5xx, whenRateLimited, when404, whenInvalidValueOnBucketCreation, whenIOException) { () =>
retry(when5xx, whenRateLimited, when404, whenInvalidValueOnBucketCreation, whenNonHttpIOException) { () =>
executeGoogleRequest(request)
} map { response =>
Option(response.getKeys).getOrElse(Collections.emptyList()).asScala map googleKeyToWorkbenchKey
Expand All @@ -206,7 +206,7 @@ class HttpGoogleIamDAO(appName: String,
override def listUserManagedServiceAccountKeys(serviceAccountProject: GoogleProject, serviceAccountEmail: WorkbenchEmail): Future[Seq[ServiceAccountKey]] = {
val request = iam.projects().serviceAccounts().keys().list(s"projects/${serviceAccountProject.value}/serviceAccounts/${serviceAccountEmail.value}").setKeyTypes(List("USER_MANAGED").asJava)

retry(when5xx, whenRateLimited, when404, whenInvalidValueOnBucketCreation, whenIOException) { () =>
retry(when5xx, whenRateLimited, when404, whenInvalidValueOnBucketCreation, whenNonHttpIOException) { () =>
executeGoogleRequest(request)
} map { response =>
Option(response.getKeys).getOrElse(Collections.emptyList()).asScala map googleKeyToWorkbenchKey
Expand All @@ -229,14 +229,14 @@ class HttpGoogleIamDAO(appName: String,

private def getProjectPolicy(googleProject: GoogleProject): Future[Policy] = {
val request = cloudResourceManager.projects().getIamPolicy(googleProject.value, null)
retry(when5xx, whenRateLimited, when404, whenInvalidValueOnBucketCreation, whenIOException) { () =>
retry(when5xx, whenRateLimited, when404, whenInvalidValueOnBucketCreation, whenNonHttpIOException) { () =>
executeGoogleRequest(request)
}
}

private def getServiceAccountPolicy(serviceAccountProject: GoogleProject, serviceAccountEmail: WorkbenchEmail): Future[Policy] = {
val request = iam.projects().serviceAccounts().getIamPolicy(s"projects/${serviceAccountProject.value}/serviceAccounts/${serviceAccountEmail.value}")
retry(when5xx, whenRateLimited, when404, whenInvalidValueOnBucketCreation, whenIOException) { () =>
retry(when5xx, whenRateLimited, when404, whenInvalidValueOnBucketCreation, whenNonHttpIOException) { () =>
executeGoogleRequest(request)
}
}
Expand Down
Loading

0 comments on commit 135bfc0

Please sign in to comment.