Skip to content

Commit

Permalink
fix: repair telemetry processing (#64)
Browse files Browse the repository at this point in the history
  • Loading branch information
morgante authored Mar 23, 2024
1 parent ff58de2 commit 73e866f
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 63 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion crates/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ tempfile = "3.1"
similar = "2.2.1"
dialoguer = "0.10.4"
console = "0.15.7"
futures = "0.3.28"
rayon = "1.8.0"
dashmap = "5.5.3"
clap-markdown = { git = "https://github.com/getgrit/clap-markdown", optional = true }
Expand Down
102 changes: 75 additions & 27 deletions crates/cli/src/analytics.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use anyhow::Context;
use anyhow::Result;
use clap::Args;
use lazy_static::lazy_static;
Expand All @@ -19,6 +20,26 @@ pub enum AnalyticsEventName {
Errored,
}

impl fmt::Display for AnalyticsEventName {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
AnalyticsEventName::Invoked => write!(f, "command-invoked"),
AnalyticsEventName::Completed => write!(f, "command-completed"),
AnalyticsEventName::Errored => write!(f, "command-errored"),
}
}
}

impl<'a> From<&'a AnalyticsEvent<'a>> for AnalyticsEventName {
fn from(event: &'a AnalyticsEvent) -> Self {
match event {
AnalyticsEvent::Invoked(_) => AnalyticsEventName::Invoked,
AnalyticsEvent::Completed(_) => AnalyticsEventName::Completed,
AnalyticsEvent::Errored(_) => AnalyticsEventName::Errored,
}
}
}

#[derive(Debug, Serialize, Clone)]
#[serde(untagged)]
pub enum AnalyticsEvent<'a> {
Expand Down Expand Up @@ -107,7 +128,7 @@ lazy_static! {

#[derive(Serialize, Debug)]
#[serde(rename_all = "camelCase")]
struct SegmentPayload {
struct SegmentPayload<'a> {
user_id: Option<String>,
///
/// Anonymous ID is used, as we don't
Expand All @@ -117,33 +138,24 @@ struct SegmentPayload {
/// https://segment.com/docs/connections/spec/identify/#anonymous-id
///
anonymous_id: Uuid,
event: AnalyticsEventName,
event: &'a AnalyticsEventName,
properties: AnalyticsProperties,
}

async fn track_event_segment(
analytics_event_name: AnalyticsEventName,
analytics_properties: AnalyticsProperties,
pub async fn track_event_line(
line: &str,
command: String,
args: Vec<String>,
installation_id: Uuid,
user_id: Option<String>,
) -> Result<()> {
let payload = SegmentPayload {
user_id,
anonymous_id: installation_id,
event: analytics_event_name,
properties: analytics_properties,
};
let (name, json) = line
.split_once('\t')
.ok_or(anyhow::anyhow!("Invalid line, no tab found"))?;
let event = serde_json::from_str::<AnalyticsEventName>(name).context("Invalid event name")?;
let data = serde_json::from_str::<serde_json::Value>(json).context("Invalid event data")?;

//
// https://segment.com/docs/connections/sources/catalog/libraries/server/http-api/#track
//
reqwest::Client::new()
.post("https://api.segment.io/v1/track")
.basic_auth::<&String, &str>(&SEGMENT_WRITE_KEY, None)
.json(&payload)
.timeout(Duration::from_secs(5))
.send()
.await?;
track_event(event, data, command, args, installation_id, user_id).await;

Ok(())
}
Expand Down Expand Up @@ -180,13 +192,39 @@ pub async fn track_event(
data: Some(analytics_event_data),
};

let _ = tokio::task::spawn(track_event_segment(
analytics_event_name,
properties,
installation_id,
let payload = SegmentPayload {
user_id,
))
.await;
anonymous_id: installation_id,
event: &analytics_event_name,
properties,
};

//
// https://segment.com/docs/connections/sources/catalog/libraries/server/http-api/#track
//
match reqwest::Client::new()
.post("https://api.segment.io/v1/track")
.basic_auth::<&String, &str>(&SEGMENT_WRITE_KEY, None)
.json(&payload)
.timeout(Duration::from_secs(5))
.send()
.await
{
Ok(response) => {
if !response.status().is_success() {
eprintln!(
"Failed to send event {}: {}",
analytics_event_name,
response.status()
);
}
}
Err(e) => {
eprintln!("Failed to send event {}: {:#}", analytics_event_name, e);
}
}

println!("Successfully sent event {}", analytics_event_name);
}

pub fn is_telemetry_disabled() -> bool {
Expand All @@ -195,3 +233,13 @@ pub fn is_telemetry_disabled() -> bool {
.parse::<bool>()
.unwrap_or(false)
}

/// By default, telemetry is sent in the background so the main process can exit quickly.
/// If this environment variable is set to true, telemetry will be sent in the foreground.
/// This is useful for debugging telemetry issues.
pub fn is_telemetry_foregrounded() -> bool {
env::var("GRIT_TELEMETRY_FOREGROUND")
.unwrap_or_else(|_| "false".to_owned())
.parse::<bool>()
.unwrap_or(false)
}
64 changes: 43 additions & 21 deletions crates/cli/src/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,14 @@ pub(crate) mod workflows_list;
pub(crate) mod docgen;

use crate::{
analytics::{is_telemetry_disabled, AnalyticsEvent, CompletedEvent, ErroredEvent},
analytics::{
is_telemetry_disabled, is_telemetry_foregrounded, AnalyticsEvent, AnalyticsEventName,
CompletedEvent, ErroredEvent,
},
flags::{GlobalFormatFlags, OutputFormat},
updater::Updater,
};
use anyhow::{Error, Result};
use anyhow::{Result};
use apply::ApplyArgs;
use auth::{Auth, AuthCommands};
use check::CheckArg;
Expand All @@ -49,17 +52,17 @@ use indicatif_log_bridge::LogWrapper;
use init::InitArgs;
use install::InstallArgs;
use list::ListArgs;
use log::{debug, LevelFilter};
use log::{LevelFilter};
use lsp::LspArgs;
use marzano_messenger::emit::ApplyDetails;
use parse::ParseArgs;
use patterns::{PatternCommands, Patterns};
use plumbing::PlumbingArgs;
use serde::Serialize;
use std::fmt;
use std::io::Write;
use std::process::{ChildStdin, Command, Stdio};
use std::time::Instant;
use std::{fmt, process::Child};
use tracing::instrument;
use version::VersionArgs;

Expand Down Expand Up @@ -186,7 +189,7 @@ fn maybe_spawn_analytics_worker(
command: &Commands,
args: &[String],
updater: &Updater,
) -> Result<Option<ChildStdin>> {
) -> Result<Option<Child>> {
if is_telemetry_disabled() {
return Ok(None);
}
Expand Down Expand Up @@ -217,33 +220,37 @@ fn maybe_spawn_analytics_worker(
.arg(command.to_string())
.arg("--args")
.arg(args.join(" "))
.stdout(Stdio::null())
.stderr(Stdio::null())
.stdin(Stdio::piped());

let stdin = cmd.spawn()?.stdin;

match stdin {
Some(stdin) => Ok(Some(stdin)),
None => Err(Error::msg(
"Failed to open stdin of analytics worker process",
)),
if !is_telemetry_foregrounded() {
cmd.stdout(Stdio::null());
cmd.stderr(Stdio::null());
}

let child = cmd.spawn()?;

Ok(Some(child))
}

fn write_analytics_event(
analytics_worker: Option<&mut ChildStdin>,
analytics_event: &AnalyticsEvent,
) {
let serialized_name = serde_json::to_string(&analytics_event);
let serialized_name = serde_json::to_string(&AnalyticsEventName::from(analytics_event));
let serialized_event = serde_json::to_string(&analytics_event);
match (analytics_worker, serialized_name, serialized_event) {
(Some(analytics_worker), Ok(serialized_name), Ok(serialized_event)) => {
let data = format!("{}\t{}\n", serialized_name, serialized_event);
let _ = analytics_worker.write_all(data.as_bytes());
let res = analytics_worker.write_all(data.as_bytes());
if let Err(e) = res {
println!("Failed to write to analytics worker: {:?}", e);
}
}
(None, _, _) => {
// No analytics worker to send event to, do nothing
}
(worker, name_err, event_err) => {
debug!(
println!(
"Failed to serialize analytics event: {:?} {:?} {:?}",
worker, name_err, event_err
);
Expand All @@ -261,14 +268,15 @@ pub async fn run_command() -> Result<()> {
let mut updater = Updater::from_current_bin().await?;
updater.dump().await?;

let mut analytics_worker =
let mut analytics_child =
match maybe_spawn_analytics_worker(&app.command, &analytics_args, &updater) {
Err(_e) => {
println!("Failed to start the analytics worker process");
// We failed to start the analytics worker process
None
}
Ok(None) => None,
Ok(Some(analytics_worker)) => Some(analytics_worker),
Ok(Some(child)) => Some(child),
};

let log_level = app.format_flags.log_level.unwrap_or(match &app.command {
Expand Down Expand Up @@ -303,7 +311,7 @@ pub async fn run_command() -> Result<()> {
let start = Instant::now();

write_analytics_event(
analytics_worker.as_mut(),
analytics_child.as_mut().map(|c| c.stdin.as_mut().unwrap()),
&AnalyticsEvent::from_cmd(&app.command),
);

Expand Down Expand Up @@ -359,7 +367,21 @@ pub async fn run_command() -> Result<()> {
Err(_) => AnalyticsEvent::Errored(ErroredEvent::from_elapsed(elapsed)),
};

write_analytics_event(analytics_worker.as_mut(), &final_analytics_event);
write_analytics_event(
analytics_child.as_mut().map(|c| c.stdin.as_mut().unwrap()),
&final_analytics_event,
);

// If we are in the foreground, wait for the analytics worker to finish
if is_telemetry_foregrounded() {
if let Some(mut child) = analytics_child {
println!("Waiting for analytics worker to finish");
let res = child.wait();
if let Err(e) = res {
println!("Failed to wait for analytics worker: {:?}", e);
}
}
}

res
}
26 changes: 13 additions & 13 deletions crates/cli/src/commands/plumbing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ use std::io::{stdin, Read};
use std::path::Path;
use std::path::PathBuf;

use crate::analytics::AnalyticsEventName;
use crate::analytics::{track_event_line};
use crate::flags::GlobalFormatFlags;
use crate::lister::list_applyables;
use crate::resolver::{get_grit_files_from, resolve_from, Source};
use crate::utils::is_pattern_name;
use futures::future::join_all;

use super::super::analytics::{track_event, AnalyticsArgs};

use super::super::analytics::{AnalyticsArgs};
use super::apply_pattern::{run_apply_pattern, ApplyPatternArgs};
use super::check::{run_check, CheckArg};
use super::init::{init_config_from_cwd, init_global_grit_modules};
Expand Down Expand Up @@ -173,20 +173,20 @@ pub(crate) async fn run_plumbing(
}
PlumbingArgs::Analytics { args, shared_args } => {
let buffer = read_input(&shared_args)?;
let events_to_send = buffer.lines().filter_map(|line| {
// Split line in name and JSON
let (name, json) = line.split_once('\t')?;

Some(track_event(
serde_json::from_str::<AnalyticsEventName>(name).ok()?,
serde_json::from_str::<serde_json::Value>(json).ok()?,
for line in buffer.lines() {
let result = track_event_line(
line,
args.command.clone(),
args.args.clone(),
args.installation_id,
args.user_id.clone(),
))
});
join_all(events_to_send).await;
)
.await;
if let Err(e) = result {
eprintln!("Error when processing {}: {:#}", line, e);
}
}

Ok(())
}
PlumbingArgs::Check { args, shared_args } => {
Expand Down
28 changes: 28 additions & 0 deletions crates/cli_bin/tests/analytics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use crate::common::{get_fixture, get_test_cmd};
use anyhow::Result;

mod common;

#[test]
fn confirm_telemetry_flush() -> Result<()> {
let (_temp_dir, temp_fixtures_root) = get_fixture("grit_modules", true)?;

let mut cmd = get_test_cmd()?;
cmd.env("GRIT_TELEMETRY_DISABLED", "false");
cmd.env("GRIT_TELEMETRY_FOREGROUND", "true");
cmd.arg("doctor").current_dir(temp_fixtures_root);

let output = cmd.output()?;
println!("output: {:?}", String::from_utf8(output.stdout.clone())?);

assert!(
output.status.success(),
"Command didn't finish successfully"
);

// Confirm output flushed
let output_str = String::from_utf8(output.stdout.clone())?;
assert!(output_str.contains("Successfully sent event command-completed"));

Ok(())
}

0 comments on commit 73e866f

Please sign in to comment.