Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port to async/await #52

Merged
merged 4 commits into from
Sep 11, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,138 changes: 819 additions & 319 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion appveyor.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ test: off

environment:
CLI_VERSION: '5.0.165'
RUST_TOOLCHAIN: 'stable'
RUST_TOOLCHAIN: 'nightly-2019-09-10'
DOCKER_TOKEN:
secure: QKr2YEuliXdFKe3jN7w97w==
DOCKER_USER:
Expand Down
3 changes: 2 additions & 1 deletion ci/build-deps.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ function Start-SeqEnvironment($protocol) {
--network sqelf-test `
-e SEQ_ADDRESS=http://sqelf-test-seq:5341 `
-e GELF_ADDRESS="${protocol}://0.0.0.0:12201" `
-e GELF_ENABLE_DIAGNOSTICS="True" `
-itd `
-p "12202:${portArg}" `
sqelf-ci:latest
Expand Down Expand Up @@ -224,7 +225,7 @@ function Invoke-TestApp($protocol) {
if ($LASTEXITCODE) { exit 1 }

# Give sqelf enough time to batch and send
Start-Sleep -Seconds 2
Start-Sleep -Seconds 5
}

function Check-ClefOutput {
Expand Down
33 changes: 28 additions & 5 deletions ci/linux-x64/build.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,34 @@ Push-Location "$PSScriptRoot/../../"
function Invoke-SmokeTest($protocol) {
Write-BeginStep $MYINVOCATION

Start-SeqEnvironment($protocol)
Invoke-TestApp($protocol)
Check-SqelfLogs
Check-SeqLogs
Check-ClefOutput
$finished = $false
$retries = 0

do {
try {
Start-SeqEnvironment($protocol)
Invoke-TestApp($protocol)
Check-SqelfLogs
Check-SeqLogs
Check-ClefOutput

$finished = $true
}
catch {
Stop-SeqEnvironment

if ($retries -gt 3) {
exit 1
}
else {
$retries = $retries + 1
Write-Host "Retrying (attempt $retries)"
}
}

}
while ($finished -eq $false)

Stop-SeqEnvironment
}

Expand Down
2 changes: 1 addition & 1 deletion ci/linux-x64/setup.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/bin/bash

RequiredRustToolchain="stable"
RequiredRustToolchain=$RUST_TOOLCHAIN

curl https://sh.rustup.rs -sSf | sh -s -- --default-host x86_64-unknown-linux-gnu --default-toolchain $RequiredRustToolchain -y

Expand Down
2 changes: 1 addition & 1 deletion ci/win-x64/setup.ps1
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
$ErrorActionPreference = "Stop"

$RequiredRustToolchain = "stable"
$RequiredRustToolchain = $env:RUST_TOOLCHAIN

Invoke-WebRequest -OutFile ./rustup-init.exe -Uri https://win.rustup.rs
$ErrorActionPreference = "Continue"
Expand Down
1 change: 1 addition & 0 deletions rust-toolchain
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
nightly-2019-09-10
2 changes: 2 additions & 0 deletions rustfmt.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
imports_indent = "Block"
imports_layout = "Vertical"
17 changes: 11 additions & 6 deletions sqelf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@ authors = ["Datalust"]
edition = "2018"
license = "Apache-2.0"

[dependencies.futures]
version = "0.1"
[dependencies.futures-preview]
version = "0.3.0-alpha.18"
features = ["nightly", "async-await"]

[dependencies.tokio]
version = "0.1"
[dependencies.pin-utils]
version = "0.1.0-alpha.4"

[dependencies.tokio-signal]
version = "0.2"
[dependencies.tokio]
version = "0.2.0-alpha.4"
features = ["signal"]

[dependencies.bytes]
version = "0.4"
Expand Down Expand Up @@ -44,3 +46,6 @@ version = "1"
[dependencies.chrono]
version = "0.4"
features = ["serde"]

[dependencies.lazy_static]
version = "1"
15 changes: 12 additions & 3 deletions sqelf/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
use std::{env, str::FromStr};
use std::{
env,
str::FromStr,
};

use crate::{diagnostics, process, receive, server, Error};
use crate::{
diagnostics,
process,
receive,
server,
Error,
};

#[derive(Debug, Default, Clone)]
pub struct Config {
Expand Down Expand Up @@ -35,7 +44,7 @@ impl Config {
}
}

pub(crate) fn is_seq_app() -> bool {
pub fn is_seq_app() -> bool {
env::var("SEQ_APP_ID").is_ok()
}

Expand Down
153 changes: 73 additions & 80 deletions sqelf/src/diagnostics.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,34 @@
use crate::error::{err_msg, Error};
use chrono::{DateTime, Utc};
use crate::error::{
err_msg,
Error,
};
use chrono::{
DateTime,
Utc,
};
use std::{
collections::HashMap,
fmt::Display,
ops::Drop,
str::FromStr,
sync::{
atomic::{AtomicUsize, Ordering},
atomic::{
AtomicUsize,
Ordering,
},
mpsc,
Mutex,
},
thread,
time::Duration,
};

pub(crate) static MIN_LEVEL: MinLevel = MinLevel(AtomicUsize::new(0));

lazy_static! {
static ref DIAGNOSTICS: Mutex<Option<Diagnostics>> = Mutex::new(None);
}

/**
Diagnostics configuration.
*/
Expand All @@ -37,33 +53,17 @@ impl Default for Config {
}
}

pub(crate) struct Diagnostics {
metrics: Option<(mpsc::Sender<()>, thread::JoinHandle<()>)>,
}

impl Diagnostics {
pub fn stop_metrics(&mut self) -> Result<(), Error> {
if let Some((tx, handle)) = self.metrics.take() {
tx.send(())?;

handle
.join()
.map_err(|_| err_msg("failed to join diagnostics handle"))?;
}

Ok(())
}
}
/**
Initialize process-wide diagnostics.
*/
pub fn init(config: Config) {
let mut diagnostics = DIAGNOSTICS.lock().expect("failed to lock diagnostics");

impl Drop for Diagnostics {
fn drop(&mut self) {
if let Some((tx, _)) = self.metrics.take() {
let _ = tx.send(());
}
if diagnostics.is_some() {
drop(diagnostics);
panic!("GELF diagnostics have already been initialized");
}
}

pub(crate) fn init(config: Config) -> Diagnostics {
MIN_LEVEL.set(config.min_level);

// Only set up metrics if the minimum level is Debug
Expand All @@ -75,8 +75,11 @@ pub(crate) fn init(config: Config) -> Diagnostics {
let metrics_timeout = Duration::from_millis(config.metrics_interval_ms);
let handle = thread::spawn(move || loop {
match rx.recv_timeout(metrics_timeout) {
Ok(()) | Err(mpsc::RecvTimeoutError::Disconnected) => return,
_ => {
Ok(()) | Err(mpsc::RecvTimeoutError::Disconnected) => {
emit_metrics();
return;
}
Err(mpsc::RecvTimeoutError::Timeout) => {
emit_metrics();
}
}
Expand All @@ -87,7 +90,46 @@ pub(crate) fn init(config: Config) -> Diagnostics {
None
};

Diagnostics { metrics }
*diagnostics = Some(Diagnostics { metrics });
}

/**
Stop process-wide diagnostics.
*/
pub fn stop() -> Result<(), Error> {
let mut diagnostics = DIAGNOSTICS.lock().expect("failed to lock diagnostics");

if let Some(mut diagnostics) = diagnostics.take() {
diagnostics.stop_metrics()?;
}

Ok(())
}

struct Diagnostics {
metrics: Option<(mpsc::Sender<()>, thread::JoinHandle<()>)>,
}

impl Diagnostics {
fn stop_metrics(&mut self) -> Result<(), Error> {
if let Some((tx, handle)) = self.metrics.take() {
tx.send(())?;

handle
.join()
.map_err(|_| err_msg("failed to join diagnostics handle"))?;
}

Ok(())
}
}

impl Drop for Diagnostics {
fn drop(&mut self) {
if let Some((tx, _)) = self.metrics.take() {
let _ = tx.send(());
}
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand Down Expand Up @@ -177,7 +219,7 @@ pub fn emit_err(error: &impl Display, message_template: &'static str) {
}
}

pub fn emit_metrics() {
fn emit_metrics() {
if MIN_LEVEL.includes(Level::Debug) {
#[derive(Serialize)]
struct EmitMetrics {
Expand Down Expand Up @@ -214,56 +256,7 @@ pub fn emit_metrics() {
}
}

/// For use with `map_err`
pub(crate) fn emit_err_abort<TInner>(message_template: &'static str) -> impl Fn(TInner) -> ()
where
TInner: Display,
{
emit_err_abort_with(message_template, || ())
}

/// For use with `map_err`
pub(crate) fn emit_err_abort_with<TInner, TError>(
message_template: &'static str,
err: impl Fn() -> TError,
) -> impl Fn(TInner) -> TError
where
TInner: Display,
{
move |e| {
emit_err(&e, message_template);

err()
}
}

/// For use with `or_else`
pub(crate) fn emit_err_continue<TInner, TOuter>(
message_template: &'static str,
) -> impl Fn(TInner) -> Result<(), TOuter>
where
TInner: Display,
{
emit_err_continue_with(message_template, || ())
}

/// For use with `or_else`
pub(crate) fn emit_err_continue_with<TInner, TOk, TOuter>(
message_template: &'static str,
ok: impl Fn() -> TOk,
) -> impl Fn(TInner) -> Result<TOk, TOuter>
where
TInner: Display,
{
move |err| {
emit_err(&err, message_template);

Ok(ok())
}
}

pub(crate) struct MinLevel(AtomicUsize);
pub(crate) static MIN_LEVEL: MinLevel = MinLevel(AtomicUsize::new(0));

impl MinLevel {
fn set(&self, min: Level) {
Expand Down
Loading