Skip to content

Commit

Permalink
feat(examples): Add example for service accounts + Cloud Pubsub
Browse files Browse the repository at this point in the history
Please note that I specified version 0.9.4 in the example crate
(anticipating the version bump)
  • Loading branch information
dermesser committed Sep 21, 2016
1 parent ed0a3c4 commit 4ba4353
Show file tree
Hide file tree
Showing 3 changed files with 205 additions and 0 deletions.
10 changes: 10 additions & 0 deletions examples/service_account/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[package]
name = "service_account"
version = "0.1.0"
authors = ["Lewin Bormann <lewin@lewin-bormann.info>"]

[dependencies]
base64 = "0.2"
yup-oauth2 = "0.6.4"
google-pubsub1 = { version = "0.1", path = "../../../google-apis-rs/gen/pubsub1" }
hyper = "0.9"
12 changes: 12 additions & 0 deletions examples/service_account/pubsub-auth.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"type": "service_account",
"project_id": "sanguine-rhythm-105020",
"private_key_id": "bbd866b207d9081b56d6cdce09782bcc5cc7b55b",
"private_key": "-----BEGIN PRIVATE KEY-----\nMIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCgSvDJJ+r88GO9\nJgAserwS+T+BL7BKHjR9EWi6AaTkIMbfbR9juDa0hAtgEdW68Qmdo1ABLtjITzXM\nE2hR4Q00Q3QV+kZztfpXOyBcARzI0oUG33EY9BOsXU4fAhIMEjGj9wP2YUlPIEYt\n6CGarRCJoTykITvFL5ERtt/7yHKdcmC1XqdnHz1loJokoLixrWIQ9vHYFnlAGXN8\n4HB63GIdbF1gbrQ+yCbJTSkSf+Jf+MSVZ3bE4QNmwxj9pBpqkQuvR2c/uNHGHDzD\nKGnENsbkqgUnSc4Tv+0Im6QP1NjZytmlmhGkEyj2Bws/mbWGL8qjTQUdJoF/JSpd\nzjVDaDRpAgMBAAECggEAQafwkLSFEC327JcQIyfyuWLWnDfIYdOVdFUZLHdX7wlt\n7D5qSRI/dhlP/33Oqc/pPGSUdKeXrBSl2H1qkX85RVsLxaTKDsNPU0OosYbTH377\ndiE31hzDoiplwVW7xq39H0gim1ymBn8Vv+ABQ7X3vCgLCr+CF/av2S2q+RrVt59n\n+YXT+ucANHc33T5pV/AdvKXcfDvmzbXKZQ7orY3u1hvgjJC7uy7FKEMGnGbiVWN5\nMvSaUygHB+McSqxofWFqUxSmFhS3DNtQDH7Jf7j/s6IHaBFE4m63ursCITLOBv6Y\nEKT+Fr5WcDsbxUAI8E3Ol+BkjEko9mblGWCx2IWeMQKBgQDOxjlVKaFZn84SXvah\nLEYVgvUtyg3HyKbF9fCs9YN04nEoT+WtWAn7whBk/nBARJAmVAsqW0zVR3rQ6fCW\nBESLIAlNwVTQtnrUM9F0SNzpzRWUQkMNvIpORDO0ENR1QTKdpDLzjeW3+QEvUhWy\nv363U6m2tVyi3dzH0ou6UXf3vwKBgQDGc+p8O3rO8y8R+MwEdrQIkhemqYPDMHO0\n8D8D6E/roLrUPrfeKW/ZtaQAKg6nSthwRA0/QRVI3RP7MfHRcCka7tjZHWVd2ygh\n8/FRbb9DFuBwCRF2HFp8sFCfM0DJlQqHnmcq8cgmzrLQhMSIlxjYDcTVNUQ7LVuA\nQeMfMQmd1wKBgA1O4L3EcigMivzPs9vnt5MG3LTXvk9PcLxo+daH7rAFKvdHPScM\n9YeIk/mMDrl0MDrnSdiuC6HmedccV2mwWKEDrNWeTdEpDXq+4woy7lT16B+krCAk\nNFqZNSKvupzZV2KiAM2AInrWwebDx7W7cXJnJhMmVRH+aYlK2cdiGvCNAoGAKKGO\n/sSGzlxH5NwVmUxca+zNXtgSHCKNbhxKKpij1FBiMBZvOtMtDVv9KfNycJoIWmP0\nemDu5K4u6x08r27RW6xPRZSZLnAydVEmtU9M+4Vxij6xDoebly/aMd4ig6M6Nq4d\n3VMHcbqcLckhG/4jngijpitzvtH4i/fxcm9t0p0CgYEAukb0W9DYuoG/OHTKlUIv\nKr0bxLv6rRk5g7OoKD5D+QiWlXQ2RXeBlXPPkR4+N/WuxxQsJ/zlOi2iKM0lB+jG\nwwPCo/WtXm5WNi4gSVZIeOtIZS+e/2QeStc/WPgVhJrZjQ118/ORaeDoVo77lkEd\nvBj5QAMZDCn/3qeOpGJ1NJo=\n-----END PRIVATE KEY-----\n",
"client_email": "pubsub-1@sanguine-rhythm-105020.iam.gserviceaccount.com",
"client_id": "117880612788657019000",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://accounts.google.com/o/oauth2/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/pubsub-1%40sanguine-rhythm-105020.iam.gserviceaccount.com"
}
183 changes: 183 additions & 0 deletions examples/service_account/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
//! Demonstrates the use of service accounts and the Google Cloud Pubsub API.
//!
//! Run this binary as .../service_account pub 'your message' in order to publish messages,
//! and as .../service_account sub in order to subscribe to those messages. This will look like the
//! following:
//!
//! ```
//! $ target/debug/service_account pub 'Hello oh wonderful world' &
//! $ target/debug/service_account sub
//! Published message #95491011619126
//! message <95491011619126> 'Hello oh wonderful world' at 2016-09-21T20:04:47.040Z
//! Published message #95491011620879
//! message <95491011620879> 'Hello oh wonderful world' at 2016-09-21T20:04:49.086Z
//! Published message #95491011622600
//! message <95491011622600> 'Hello oh wonderful world' at 2016-09-21T20:04:51.132Z
//! Published message #95491011624393
//! message <95491011624393> 'Hello oh wonderful world' at 2016-09-21T20:04:53.187Z
//! Published message #95491011626206
//! message <95491011626206> 'Hello oh wonderful world' at 2016-09-21T20:04:55.233Z
//!
//! Copyright (c) 2016 Google, Inc. (Lewin Bormann <lbo@spheniscida.de>)
//!
extern crate base64;
extern crate yup_oauth2 as oauth;
extern crate google_pubsub1 as pubsub;
extern crate hyper;

use std::env;
use std::time;
use std::thread;

use pubsub::{Topic, Subscription};

// The prefixes are important!
const SUBSCRIPTION_NAME: &'static str = "projects/sanguine-rhythm-105020/subscriptions/rust_authd_sub_1";
const TOPIC_NAME: &'static str = "projects/sanguine-rhythm-105020/topics/topic-01";

type PubsubMethods<'a> = pubsub::ProjectMethods<'a,
hyper::Client,
oauth::ServiceAccountAccess<hyper::Client>>;

// Verifies that the topic TOPIC_NAME exists, or creates it.
fn check_or_create_topic(methods: &PubsubMethods) -> Topic {
let result = methods.topics_get(TOPIC_NAME).doit();

if result.is_err() {
println!("Assuming topic doesn't exist; creating topic");
let topic = pubsub::Topic { name: Some(TOPIC_NAME.to_string()) };
let result = methods.topics_create(topic, TOPIC_NAME).doit().unwrap();
result.1
} else {
result.unwrap().1
}
}

fn check_or_create_subscription(methods: &PubsubMethods) -> Subscription {
// check if subscription exists
let result = methods.subscriptions_get(SUBSCRIPTION_NAME).doit();

if result.is_err() {
println!("Assuming subscription doesn't exist; creating subscription");
let sub = pubsub::Subscription {
topic: Some(TOPIC_NAME.to_string()),
ack_deadline_seconds: Some(30),
push_config: None,
name: Some(SUBSCRIPTION_NAME.to_string()),
};
let (_resp, sub) = methods.subscriptions_create(sub, SUBSCRIPTION_NAME).doit().unwrap();

sub
} else {
result.unwrap().1
}
}

fn ack_message(methods: &PubsubMethods, id: String) {
let request = pubsub::AcknowledgeRequest { ack_ids: Some(vec![id]) };
let result = methods.subscriptions_acknowledge(request, SUBSCRIPTION_NAME).doit();

match result {
Err(e) => {
// There's a JSON decode error happening, even on successful returns. :/
println!("Ack error: {:?}", e);
}
Ok(_) => (),
}
}

// Wait for new messages. Print and ack any new messages.
fn subscribe_wait(methods: &PubsubMethods) {
check_or_create_subscription(&methods);

let request = pubsub::PullRequest {
return_immediately: Some(false),
max_messages: Some(1),
};


loop {
let result = methods.subscriptions_pull(request.clone(), SUBSCRIPTION_NAME).doit();

match result {
Err(e) => {
println!("Pull error: {}", e);
}
Ok((_response, pullresponse)) => {
for msg in pullresponse.received_messages.unwrap_or(Vec::new()) {
let ack_id = msg.ack_id.unwrap_or(String::new());
let message = msg.message.unwrap_or(Default::default());
println!("message <{}> '{}' at {}",
message.message_id.unwrap_or(String::new()),
String::from_utf8(base64::decode(&message.data
.unwrap_or(String::new()))
.unwrap())
.unwrap(),
message.publish_time.unwrap_or(String::new()));

if ack_id != "" {
ack_message(methods, ack_id);
}
}
}
}
}
}

// Publish some message every 2 seconds.
fn publish_stuff(methods: &PubsubMethods, message: &str) {
check_or_create_topic(&methods);

let message = pubsub::PubsubMessage {
attributes: None,
// Base64 encoded!
data: Some(base64::encode(message.as_bytes())),
publish_time: None,
message_id: None,
};
let request = pubsub::PublishRequest { messages: Some(vec![message]) };


loop {
let result = methods.topics_publish(request.clone(), TOPIC_NAME).doit();

match result {
Err(e) => {
println!("Publish error: {}", e);
}
Ok((_response, pubresponse)) => {
for msg in pubresponse.message_ids.unwrap_or(Vec::new()) {
println!("Published message #{}", msg);
}
}
}

thread::sleep(time::Duration::new(2, 0));
}
}

// If called as '.../service_account pub', act as publisher; if called as '.../service_account
// sub', act as subscriber.
fn main() {
let client_secret = oauth::service_account_key_from_file(&"pubsub-auth.json".to_string())
.unwrap();
let mut access = oauth::ServiceAccountAccess::new(client_secret, hyper::Client::new());

use oauth::GetToken;
println!("{:?}",
access.token(&vec!["https://www.googleapis.com/auth/pubsub"]).unwrap());

let hub = pubsub::Pubsub::new(hyper::Client::new(), access);
let methods = hub.projects();

let mode = env::args().nth(1).unwrap_or(String::new());

if mode == "pub" {
let message = env::args().nth(2).unwrap_or("Hello World!".to_string());
publish_stuff(&methods, &message);
} else if mode == "sub" {
subscribe_wait(&methods);
} else {
println!("Please use either of 'pub' or 'sub' as first argument to this binary!");
}
}

0 comments on commit 4ba4353

Please sign in to comment.