Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(shared-cache): At startup retry token retrieval #627

Merged
merged 1 commit into from Jan 11, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
63 changes: 39 additions & 24 deletions crates/symbolicator/src/services/shared_cache.rs
Expand Up @@ -10,10 +10,11 @@ use std::fmt;
use std::io::SeekFrom;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};

use anyhow::{anyhow, Context, Error, Result};
use futures::{Future, TryStreamExt};
use gcp_auth::Token;
use reqwest::{Body, Client, StatusCode};
use tempfile::NamedTempFile;
use tokio::fs::{self, File};
Expand Down Expand Up @@ -73,16 +74,27 @@ impl GcsState {
let auth_manager = match config.service_account_path {
Some(ref path) => gcp_auth::from_credentials_file(&path).await?,
None => {
let future = async move {
gcp_auth::init()
// For fresh k8s pods the GKE metadata server may not accept connections
// yet, so we need to retry this for a bit.
const MAX_DELAY: Duration = Duration::from_secs(11);
const RETRY_INTERVAL: Duration = Duration::from_millis(500);
let start = Instant::now();
loop {
let future = async move {
gcp_auth::init()
.await
.context("Failed to initialise authentication token")
};
match tokio::time::timeout(Duration::from_secs(1), future)
.await
.context("Failed to initialise authentication token")
};
tokio::time::timeout(Duration::from_millis(300), future)
.await
.unwrap_or_else(|_| {
Err(Error::msg("Timeout initialising GCS authentication token"))
})?
.unwrap_or_else(|_elapsed| {
Err(Error::msg("Timeout initialising GCS authentication token"))
}) {
Ok(auth_manager) => break auth_manager,
Err(err) if start.elapsed() > MAX_DELAY => return Err(err),
_ => tokio::time::sleep(RETRY_INTERVAL).await,
}
}
}
};
Ok(Self {
Expand All @@ -92,6 +104,21 @@ impl GcsState {
})
}

/// Returns a GCP authentication token, with timeout and error handling.
///
/// Refreshing tokens involves talking to services over networks, this might fail.
async fn get_token(&self) -> Result<Token> {
let future = async {
self.auth_manager
.get_token(&["https://www.googleapis.com/auth/devstorage.read_write"])
.await
.context("Failed to get authentication token")
};
tokio::time::timeout(Duration::from_millis(300), future)
.await
.unwrap_or_else(|_| Err(Error::msg("Timeout refreshing GCS authentication token")))
}

/// Fetches item from shared cache if available and copies them to the writer.
///
/// # Returns
Expand All @@ -101,15 +128,7 @@ impl GcsState {
where
W: tokio::io::AsyncWrite + Unpin,
{
let future = async {
self.auth_manager
.get_token(&["https://www.googleapis.com/auth/devstorage.read_write"])
.await
.context("Failed to get authentication token")
};
let token = tokio::time::timeout(Duration::from_millis(300), future)
.await
.unwrap_or_else(|_| Err(Error::msg("Timeout refreshing GCS authentication token")))?;
let token = self.get_token().await?;

let mut url = Url::parse("https://www.googleapis.com/download/storage/v1/b?alt=media")
.map_err(|_| GcsError::InvalidUrl)?;
Expand Down Expand Up @@ -165,11 +184,7 @@ impl GcsState {
async fn store(&self, key: SharedCacheKey, mut src: File) -> Result<SharedCacheStoreResult> {
let total_bytes = src.seek(SeekFrom::End(0)).await?;
src.rewind().await?;
let token = self
.auth_manager
.get_token(&["https://www.googleapis.com/auth/devstorage.read_write"])
.await
.context("gcp_auth failed to get token")?;
let token = self.get_token().await?;
let mut url =
Url::parse("https://storage.googleapis.com/upload/storage/v1/b?uploadType=media")
.map_err(|_| GcsError::InvalidUrl)?;
Expand Down