diff --git a/twiq-light/src/main.rs b/twiq-light/src/main.rs index a06d2b66..e0a02de1 100644 --- a/twiq-light/src/main.rs +++ b/twiq-light/src/main.rs @@ -98,11 +98,12 @@ enum TweetSubcommand { }, } -fn tweet_queue_store(config: ConfigOptions) -> anyhow::Result { - Ok(TweetQueueStore::new( +async fn tweet_queue_store(config: ConfigOptions) -> anyhow::Result { + TweetQueueStore::new( config.project_id.context("project_id")?, config.google_application_credentials, - )) + ) + .await } #[tokio::main] @@ -118,7 +119,7 @@ async fn main() -> anyhow::Result<()> { config, } => { authorize::run( - tweet_queue_store(config)?, + tweet_queue_store(config).await?, client_id, client_secret, redirect_uri, @@ -129,16 +130,18 @@ async fn main() -> anyhow::Result<()> { client_id, client_secret, config, - } => dequeue::run(tweet_queue_store(config)?, client_id, client_secret).await, + } => dequeue::run(tweet_queue_store(config).await?, client_id, client_secret).await, QueueSubcommand::Enqueue { tweet, config } => { - enqueue::run(tweet_queue_store(config)?, tweet).await + enqueue::run(tweet_queue_store(config).await?, tweet).await + } + QueueSubcommand::List { config } => { + list_queue::run(tweet_queue_store(config).await?).await } - QueueSubcommand::List { config } => list_queue::run(tweet_queue_store(config)?).await, QueueSubcommand::Remove { index, config } => { - remove::run(tweet_queue_store(config)?, index).await + remove::run(tweet_queue_store(config).await?, index).await } QueueSubcommand::Reorder { src, dst, config } => { - reorder::run(tweet_queue_store(config)?, src, dst).await + reorder::run(tweet_queue_store(config).await?, src, dst).await } }, Resource::Tweet(command) => { diff --git a/twiq-light/src/storage/firestore.rs b/twiq-light/src/storage/firestore.rs index f1af5f20..fae4ecf3 100644 --- a/twiq-light/src/storage/firestore.rs +++ b/twiq-light/src/storage/firestore.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; +use anyhow::bail; use async_trait::async_trait; use google_cloud_auth::{Credential, CredentialConfig}; use tonic::{ @@ -189,6 +190,13 @@ impl Storage for FirestoreStorage { } async fn set_item(&self, key: Self::Key, value: Self::Value) -> anyhow::Result<()> { + // + // Maximum size of a field value: 1 MiB - 89 bytes (1,048,487 bytes) + let byte_length = value.len(); + if byte_length > 1_000_000 { + bail!("Maximum field size exceeded"); + } + let name = format!( "projects/{}/databases/{}/documents/{}/{}", self.project_id, self.database_id, self.collection_id, key diff --git a/twiq-light/src/store.rs b/twiq-light/src/store.rs index 86ab5335..643249b6 100644 --- a/twiq-light/src/store.rs +++ b/twiq-light/src/store.rs @@ -1,262 +1,67 @@ -use std::{ - collections::{HashMap, VecDeque}, - path::PathBuf, -}; - -use anyhow::bail; -use google_cloud_auth::{Credential, CredentialConfig}; -use tonic::{ - codegen::InterceptedService, - metadata::AsciiMetadataValue, - transport::{Channel, ClientTlsConfig, Endpoint}, - Code, Request, Status, -}; +use std::collections::VecDeque; use crate::{ domain::ScheduledTweet, - google::firestore::v1::{ - firestore_client::FirestoreClient, precondition::ConditionType, value::ValueType, - write::Operation, CommitRequest, Document, GetDocumentRequest, Precondition, Value, Write, - }, + storage::{firestore::FirestoreStorage, Storage}, token::Token, }; -#[derive(Debug)] pub struct TweetQueueStore { - project_id: String, - google_application_credentials: Option, + storage: FirestoreStorage, } -type MyInterceptor = Box) -> Result, Status>>; -type Client = FirestoreClient>; - impl TweetQueueStore { const DATABASE_ID: &str = "(default)"; const COLLECTION_ID: &str = "twiq-light"; - const DOCUMENT_ID: &str = "queue"; - const FIELD_NAME: &str = "data"; + const QUEUE_DOCUMENT_ID: &str = "queue"; const TOKEN_DOCUMENT_ID: &str = "token"; - pub fn new(project_id: String, google_application_credentials: Option) -> Self { - Self { + pub async fn new( + project_id: String, + google_application_credentials: Option, + ) -> anyhow::Result { + let storage = FirestoreStorage::new( + google_application_credentials, project_id, - google_application_credentials: google_application_credentials.map(PathBuf::from), - } + Self::DATABASE_ID.to_owned(), + Self::COLLECTION_ID.to_owned(), + ) + .await?; + Ok(Self { storage }) } pub async fn read_all(&self) -> anyhow::Result> { - let mut client = self.get_client().await?; - let document_path = self.get_document_path()?; - let document = Self::get_document(&mut client, &document_path).await?; - Ok(match document { - Some(doc) => serde_json::from_str(Self::data_from_document(&doc))?, + let data = self + .storage + .get_item(Self::QUEUE_DOCUMENT_ID.to_owned()) + .await?; + Ok(match data { + Some(d) => serde_json::from_str(&d)?, None => VecDeque::default(), }) } pub async fn read_token(&self) -> anyhow::Result> { - let mut client = self.get_client().await?; - let document_path = self.get_token_document_path()?; - let document = Self::get_document(&mut client, &document_path).await?; - Ok(match document { - Some(doc) => Some(serde_json::from_str(Self::data_from_document(&doc))?), + let data = self + .storage + .get_item(Self::TOKEN_DOCUMENT_ID.to_owned()) + .await?; + Ok(match data { + Some(d) => Some(serde_json::from_str(&d)?), None => None, }) } pub async fn write_token(&self, token: &Token) -> anyhow::Result<()> { - let s = token.to_string(); - - // - // Maximum size of a field value: 1 MiB - 89 bytes (1,048,487 bytes) - let byte_length = s.len(); - if byte_length > 1_000_000 { - bail!("Maximum field size exceeded"); - } - - let mut client = self.get_client().await?; - let database_path = self.get_database_path()?; - let document_path = self.get_token_document_path()?; - let document = Self::get_document(&mut client, &document_path).await?; - let condition_type = match document { - Some(doc) => { - let update_time = doc.update_time.expect("output contains update_time"); - ConditionType::UpdateTime(update_time) - } - None => ConditionType::Exists(false), - }; - let document = Self::document_from_data(document_path, s); - let writes = vec![Write { - update_mask: None, - update_transforms: vec![], - current_document: Some(Precondition { - condition_type: Some(condition_type), - }), - operation: Some(Operation::Update(document)), - }]; - client - .commit(CommitRequest { - database: database_path, - writes, - transaction: vec![], - }) - .await?; - Ok(()) + self.storage + .set_item(Self::TOKEN_DOCUMENT_ID.to_owned(), token.to_string()) + .await } pub async fn write_all(&self, data: &VecDeque) -> anyhow::Result<()> { let s = serde_json::to_string(&data)?; - // - // Maximum size of a field value: 1 MiB - 89 bytes (1,048,487 bytes) - let byte_length = s.len(); - if byte_length > 1_000_000 { - bail!("Maximum field size exceeded"); - } - - let mut client = self.get_client().await?; - let database_path = self.get_database_path()?; - let document_path = self.get_document_path()?; - let document = Self::get_document(&mut client, &document_path).await?; - let condition_type = match document { - Some(doc) => { - let update_time = doc.update_time.expect("output contains update_time"); - ConditionType::UpdateTime(update_time) - } - None => ConditionType::Exists(false), - }; - let document = Self::document_from_data(document_path, s); - let writes = vec![Write { - update_mask: None, - update_transforms: vec![], - current_document: Some(Precondition { - condition_type: Some(condition_type), - }), - operation: Some(Operation::Update(document)), - }]; - client - .commit(CommitRequest { - database: database_path, - writes, - transaction: vec![], - }) - .await?; - Ok(()) - } - - fn data_from_document(document: &Document) -> &str { - let value = document - .fields - .get(Self::FIELD_NAME) - .expect("field not found"); - match value.value_type.as_ref() { - Some(value_type) => match value_type { - ValueType::StringValue(s) => s.as_str(), - _ => unreachable!("value_type is not string"), - }, - None => unreachable!(), - } - } - - fn document_from_data(document_path: String, s: String) -> Document { - Document { - name: document_path, - fields: { - let mut fields = HashMap::new(); - fields.insert( - Self::FIELD_NAME.to_owned(), - Value { - value_type: Some(ValueType::StringValue(s)), - }, - ); - fields - }, - create_time: None, - update_time: None, - } - } - - async fn get_client(&self) -> anyhow::Result { - let config = CredentialConfig::builder() - .scopes(vec!["https://www.googleapis.com/auth/cloud-platform".into()]) - .build()?; - let credential = match self.google_application_credentials.as_ref() { - Some(file_path) => Credential::find(file_path, config).await?, - None => { - // GOOGLE_APPLICATION_CREDENTIALS environment variable - Credential::find_default(config).await? - } - }; - let channel = Endpoint::from_static("https://firestore.googleapis.com") - .tls_config(ClientTlsConfig::new().domain_name("firestore.googleapis.com"))? - .connect() - .await?; - let access_token = credential.access_token().await?; - let mut metadata_value = - AsciiMetadataValue::try_from(format!("Bearer {}", access_token.value))?; - metadata_value.set_sensitive(true); - let client: FirestoreClient> = - FirestoreClient::with_interceptor( - channel, - Box::new(move |mut request: Request<()>| { - request - .metadata_mut() - .insert("authorization", metadata_value.clone()); - Ok(request) - }), - ); - Ok(client) - } - - fn get_database_path(&self) -> anyhow::Result { - let database_path = format!( - "projects/{}/databases/{}", - self.project_id, - Self::DATABASE_ID - ); - Ok(database_path) - } - - async fn get_document( - client: &mut Client, - document_path: &str, - ) -> anyhow::Result> { - let document = client - .get_document(GetDocumentRequest { - name: document_path.to_owned(), - mask: None, - consistency_selector: None, - }) + self.storage + .set_item(Self::QUEUE_DOCUMENT_ID.to_owned(), s) .await - .map(|response| Some(response.into_inner())) - .or_else(|status| { - if matches!(status.code(), Code::NotFound) { - Ok(None) - } else { - Err(status) - } - })?; - Ok(document) - } - - fn get_document_path(&self) -> anyhow::Result { - let database_path = self.get_database_path()?; - let document_path = format!( - "{}/documents/{}/{}", - database_path, - Self::COLLECTION_ID, - Self::DOCUMENT_ID - ); - Ok(document_path) - } - - fn get_token_document_path(&self) -> anyhow::Result { - let database_path = self.get_database_path()?; - let document_path = format!( - "{}/documents/{}/{}", - database_path, - Self::COLLECTION_ID, - Self::TOKEN_DOCUMENT_ID - ); - Ok(document_path) } } diff --git a/twiq-light/src/tweet_store.rs b/twiq-light/src/tweet_store.rs index 233742e8..ce23189c 100644 --- a/twiq-light/src/tweet_store.rs +++ b/twiq-light/src/tweet_store.rs @@ -30,12 +30,11 @@ impl TweetStore { } pub async fn write_all(&self, data: &BTreeMap) -> anyhow::Result<()> { - Ok(self - .storage + self.storage .set_item( PathBuf::from("twiq-light.json"), serde_json::to_string(data)?, ) - .await?) + .await } }