Skip to content

Commit

Permalink
twiq-light: Fix to use FirestoreStorage
Browse files Browse the repository at this point in the history
  • Loading branch information
bouzuya committed Dec 28, 2022
1 parent 4e112b9 commit 36f398c
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 239 deletions.
21 changes: 12 additions & 9 deletions twiq-light/src/main.rs
Expand Up @@ -98,11 +98,12 @@ enum TweetSubcommand {
},
}

fn tweet_queue_store(config: ConfigOptions) -> anyhow::Result<TweetQueueStore> {
Ok(TweetQueueStore::new(
async fn tweet_queue_store(config: ConfigOptions) -> anyhow::Result<TweetQueueStore> {
TweetQueueStore::new(
config.project_id.context("project_id")?,
config.google_application_credentials,
))
)
.await
}

#[tokio::main]
Expand All @@ -118,7 +119,7 @@ async fn main() -> anyhow::Result<()> {
config,
} => {
authorize::run(
tweet_queue_store(config)?,
tweet_queue_store(config).await?,
client_id,
client_secret,
redirect_uri,
Expand All @@ -129,16 +130,18 @@ async fn main() -> anyhow::Result<()> {
client_id,
client_secret,
config,
} => dequeue::run(tweet_queue_store(config)?, client_id, client_secret).await,
} => dequeue::run(tweet_queue_store(config).await?, client_id, client_secret).await,
QueueSubcommand::Enqueue { tweet, config } => {
enqueue::run(tweet_queue_store(config)?, tweet).await
enqueue::run(tweet_queue_store(config).await?, tweet).await
}
QueueSubcommand::List { config } => {
list_queue::run(tweet_queue_store(config).await?).await
}
QueueSubcommand::List { config } => list_queue::run(tweet_queue_store(config)?).await,
QueueSubcommand::Remove { index, config } => {
remove::run(tweet_queue_store(config)?, index).await
remove::run(tweet_queue_store(config).await?, index).await
}
QueueSubcommand::Reorder { src, dst, config } => {
reorder::run(tweet_queue_store(config)?, src, dst).await
reorder::run(tweet_queue_store(config).await?, src, dst).await
}
},
Resource::Tweet(command) => {
Expand Down
8 changes: 8 additions & 0 deletions twiq-light/src/storage/firestore.rs
@@ -1,5 +1,6 @@
use std::collections::HashMap;

use anyhow::bail;
use async_trait::async_trait;
use google_cloud_auth::{Credential, CredentialConfig};
use tonic::{
Expand Down Expand Up @@ -189,6 +190,13 @@ impl Storage for FirestoreStorage {
}

async fn set_item(&self, key: Self::Key, value: Self::Value) -> anyhow::Result<()> {
// <https://cloud.google.com/firestore/quotas>
// Maximum size of a field value: 1 MiB - 89 bytes (1,048,487 bytes)
let byte_length = value.len();
if byte_length > 1_000_000 {
bail!("Maximum field size exceeded");
}

let name = format!(
"projects/{}/databases/{}/documents/{}/{}",
self.project_id, self.database_id, self.collection_id, key
Expand Down
259 changes: 32 additions & 227 deletions twiq-light/src/store.rs
@@ -1,262 +1,67 @@
use std::{
collections::{HashMap, VecDeque},
path::PathBuf,
};

use anyhow::bail;
use google_cloud_auth::{Credential, CredentialConfig};
use tonic::{
codegen::InterceptedService,
metadata::AsciiMetadataValue,
transport::{Channel, ClientTlsConfig, Endpoint},
Code, Request, Status,
};
use std::collections::VecDeque;

use crate::{
domain::ScheduledTweet,
google::firestore::v1::{
firestore_client::FirestoreClient, precondition::ConditionType, value::ValueType,
write::Operation, CommitRequest, Document, GetDocumentRequest, Precondition, Value, Write,
},
storage::{firestore::FirestoreStorage, Storage},
token::Token,
};

#[derive(Debug)]
pub struct TweetQueueStore {
project_id: String,
google_application_credentials: Option<PathBuf>,
storage: FirestoreStorage,
}

type MyInterceptor = Box<dyn Fn(Request<()>) -> Result<Request<()>, Status>>;
type Client = FirestoreClient<InterceptedService<Channel, MyInterceptor>>;

impl TweetQueueStore {
const DATABASE_ID: &str = "(default)";
const COLLECTION_ID: &str = "twiq-light";
const DOCUMENT_ID: &str = "queue";
const FIELD_NAME: &str = "data";
const QUEUE_DOCUMENT_ID: &str = "queue";
const TOKEN_DOCUMENT_ID: &str = "token";

pub fn new(project_id: String, google_application_credentials: Option<String>) -> Self {
Self {
pub async fn new(
project_id: String,
google_application_credentials: Option<String>,
) -> anyhow::Result<Self> {
let storage = FirestoreStorage::new(
google_application_credentials,
project_id,
google_application_credentials: google_application_credentials.map(PathBuf::from),
}
Self::DATABASE_ID.to_owned(),
Self::COLLECTION_ID.to_owned(),
)
.await?;
Ok(Self { storage })
}

pub async fn read_all(&self) -> anyhow::Result<VecDeque<ScheduledTweet>> {
let mut client = self.get_client().await?;
let document_path = self.get_document_path()?;
let document = Self::get_document(&mut client, &document_path).await?;
Ok(match document {
Some(doc) => serde_json::from_str(Self::data_from_document(&doc))?,
let data = self
.storage
.get_item(Self::QUEUE_DOCUMENT_ID.to_owned())
.await?;
Ok(match data {
Some(d) => serde_json::from_str(&d)?,
None => VecDeque::default(),
})
}

pub async fn read_token(&self) -> anyhow::Result<Option<Token>> {
let mut client = self.get_client().await?;
let document_path = self.get_token_document_path()?;
let document = Self::get_document(&mut client, &document_path).await?;
Ok(match document {
Some(doc) => Some(serde_json::from_str(Self::data_from_document(&doc))?),
let data = self
.storage
.get_item(Self::TOKEN_DOCUMENT_ID.to_owned())
.await?;
Ok(match data {
Some(d) => Some(serde_json::from_str(&d)?),
None => None,
})
}

pub async fn write_token(&self, token: &Token) -> anyhow::Result<()> {
let s = token.to_string();

// <https://cloud.google.com/firestore/quotas>
// Maximum size of a field value: 1 MiB - 89 bytes (1,048,487 bytes)
let byte_length = s.len();
if byte_length > 1_000_000 {
bail!("Maximum field size exceeded");
}

let mut client = self.get_client().await?;
let database_path = self.get_database_path()?;
let document_path = self.get_token_document_path()?;
let document = Self::get_document(&mut client, &document_path).await?;
let condition_type = match document {
Some(doc) => {
let update_time = doc.update_time.expect("output contains update_time");
ConditionType::UpdateTime(update_time)
}
None => ConditionType::Exists(false),
};
let document = Self::document_from_data(document_path, s);
let writes = vec![Write {
update_mask: None,
update_transforms: vec![],
current_document: Some(Precondition {
condition_type: Some(condition_type),
}),
operation: Some(Operation::Update(document)),
}];
client
.commit(CommitRequest {
database: database_path,
writes,
transaction: vec![],
})
.await?;
Ok(())
self.storage
.set_item(Self::TOKEN_DOCUMENT_ID.to_owned(), token.to_string())
.await
}

pub async fn write_all(&self, data: &VecDeque<ScheduledTweet>) -> anyhow::Result<()> {
let s = serde_json::to_string(&data)?;
// <https://cloud.google.com/firestore/quotas>
// Maximum size of a field value: 1 MiB - 89 bytes (1,048,487 bytes)
let byte_length = s.len();
if byte_length > 1_000_000 {
bail!("Maximum field size exceeded");
}

let mut client = self.get_client().await?;
let database_path = self.get_database_path()?;
let document_path = self.get_document_path()?;
let document = Self::get_document(&mut client, &document_path).await?;
let condition_type = match document {
Some(doc) => {
let update_time = doc.update_time.expect("output contains update_time");
ConditionType::UpdateTime(update_time)
}
None => ConditionType::Exists(false),
};
let document = Self::document_from_data(document_path, s);
let writes = vec![Write {
update_mask: None,
update_transforms: vec![],
current_document: Some(Precondition {
condition_type: Some(condition_type),
}),
operation: Some(Operation::Update(document)),
}];
client
.commit(CommitRequest {
database: database_path,
writes,
transaction: vec![],
})
.await?;
Ok(())
}

fn data_from_document(document: &Document) -> &str {
let value = document
.fields
.get(Self::FIELD_NAME)
.expect("field not found");
match value.value_type.as_ref() {
Some(value_type) => match value_type {
ValueType::StringValue(s) => s.as_str(),
_ => unreachable!("value_type is not string"),
},
None => unreachable!(),
}
}

fn document_from_data(document_path: String, s: String) -> Document {
Document {
name: document_path,
fields: {
let mut fields = HashMap::new();
fields.insert(
Self::FIELD_NAME.to_owned(),
Value {
value_type: Some(ValueType::StringValue(s)),
},
);
fields
},
create_time: None,
update_time: None,
}
}

async fn get_client(&self) -> anyhow::Result<Client> {
let config = CredentialConfig::builder()
.scopes(vec!["https://www.googleapis.com/auth/cloud-platform".into()])
.build()?;
let credential = match self.google_application_credentials.as_ref() {
Some(file_path) => Credential::find(file_path, config).await?,
None => {
// GOOGLE_APPLICATION_CREDENTIALS environment variable
Credential::find_default(config).await?
}
};
let channel = Endpoint::from_static("https://firestore.googleapis.com")
.tls_config(ClientTlsConfig::new().domain_name("firestore.googleapis.com"))?
.connect()
.await?;
let access_token = credential.access_token().await?;
let mut metadata_value =
AsciiMetadataValue::try_from(format!("Bearer {}", access_token.value))?;
metadata_value.set_sensitive(true);
let client: FirestoreClient<InterceptedService<Channel, MyInterceptor>> =
FirestoreClient::with_interceptor(
channel,
Box::new(move |mut request: Request<()>| {
request
.metadata_mut()
.insert("authorization", metadata_value.clone());
Ok(request)
}),
);
Ok(client)
}

fn get_database_path(&self) -> anyhow::Result<String> {
let database_path = format!(
"projects/{}/databases/{}",
self.project_id,
Self::DATABASE_ID
);
Ok(database_path)
}

async fn get_document(
client: &mut Client,
document_path: &str,
) -> anyhow::Result<Option<Document>> {
let document = client
.get_document(GetDocumentRequest {
name: document_path.to_owned(),
mask: None,
consistency_selector: None,
})
self.storage
.set_item(Self::QUEUE_DOCUMENT_ID.to_owned(), s)
.await
.map(|response| Some(response.into_inner()))
.or_else(|status| {
if matches!(status.code(), Code::NotFound) {
Ok(None)
} else {
Err(status)
}
})?;
Ok(document)
}

fn get_document_path(&self) -> anyhow::Result<String> {
let database_path = self.get_database_path()?;
let document_path = format!(
"{}/documents/{}/{}",
database_path,
Self::COLLECTION_ID,
Self::DOCUMENT_ID
);
Ok(document_path)
}

fn get_token_document_path(&self) -> anyhow::Result<String> {
let database_path = self.get_database_path()?;
let document_path = format!(
"{}/documents/{}/{}",
database_path,
Self::COLLECTION_ID,
Self::TOKEN_DOCUMENT_ID
);
Ok(document_path)
}
}
5 changes: 2 additions & 3 deletions twiq-light/src/tweet_store.rs
Expand Up @@ -30,12 +30,11 @@ impl TweetStore {
}

pub async fn write_all(&self, data: &BTreeMap<String, MyTweet>) -> anyhow::Result<()> {
Ok(self
.storage
self.storage
.set_item(
PathBuf::from("twiq-light.json"),
serde_json::to_string(data)?,
)
.await?)
.await
}
}

0 comments on commit 36f398c

Please sign in to comment.