Skip to content

Commit

Permalink
Add logging. (#41)
Browse files Browse the repository at this point in the history
  • Loading branch information
jmjoy committed Aug 12, 2022
1 parent c6ec104 commit b8a4a18
Show file tree
Hide file tree
Showing 25 changed files with 1,046 additions and 430 deletions.
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ tokio-stream = { version = "0.1.8", features = ["net"] }
name = "trace_context"
required-features = ["mock"]

[[test]]
name = "logging"
required-features = ["mock"]

[[example]]
name = "simple_trace_report"
path = "examples/simple_trace_report.rs"
55 changes: 42 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ and core concepts to keep best compatibility and performance.

All concepts are from the official SkyWalking definitions.

## Span
## Tracing

### Span

Span is an important and common concept in distributed tracing system. Learn Span from Google Dapper Paper. For better
performance, we extend the span into 3 kinds.
Expand All @@ -34,19 +36,29 @@ Tag and Log are similar attributes of the span.
- Log is heavier than tag, with one timestamp and multiple key:value pairs. Log represents an event, typically an error
happens.

## TracingContext
### TracingContext

TracingContext is the context of the tracing process. Span should only be created through context, and be archived into
the context after the span finished.

## Logging

### LogRecord

LogRecord is the simple builder for the LogData, which is the Log format of Skywalking.

# Example

```rust, no_run
use skywalking::{reporter::grpc::GrpcReporter, trace::tracer::Tracer};
use skywalking::{
logging::{logger::Logger, record::{LogRecord, RecordType}},
reporter::grpc::GrpcReporter,
trace::tracer::Tracer,
};
use std::error::Error;
use tokio::signal;
async fn handle_request(tracer: Tracer) {
async fn handle_request(tracer: Tracer, logger: Logger) {
let mut ctx = tracer.create_trace_context();
{
Expand All @@ -59,10 +71,20 @@ async fn handle_request(tracer: Tracer) {
{
// Generates an Exit Span when executing an RPC.
let _span2 = ctx.create_exit_span("op2", "remote_peer");
let span2 = ctx.create_exit_span("op2", "remote_peer");
// Something...
// Do logging.
logger.log(
LogRecord::new()
.add_tag("level", "INFO")
.with_tracing_context(&ctx)
.with_span(&span2)
.record_type(RecordType::Text)
.content("Something...")
);
// Auto close span2 when dropped.
}
Expand All @@ -74,17 +96,24 @@ async fn handle_request(tracer: Tracer) {
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
// Connect to skywalking oap server.
let reporter = GrpcReporter::connect("http://0.0.0.0:11800").await?;
let tracer = Tracer::new("service", "instance", reporter);
tokio::spawn(handle_request(tracer.clone()));
// Start to report.
tracer
.reporting(async move {
let _ = signal::ctrl_c().await.unwrap();
// Spawn the reporting in background, with listening the graceful shutdown signal.
let handle = reporter
.reporting()
.await
.with_graceful_shutdown(async move {
signal::ctrl_c().await.expect("failed to listen for event");
})
.await?;
.spawn();
let tracer = Tracer::new("service", "instance", reporter.clone());
let logger = Logger::new("service", "instance", reporter);
handle_request(tracer, logger).await;
handle.await?;
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
version: "3.9"
services:
collector:
image: ghcr.io/apache/skywalking-agent-test-tool/mock-collector:5acb890f225ca37ee60675ce3e330545e23e3cbc
image: ghcr.io/apache/skywalking-agent-test-tool/mock-collector:f4f5ef22b1df623464772816bb6b42ba611444ff
ports:
- "19876:19876"
- "12800:12800"
Expand Down
49 changes: 47 additions & 2 deletions e2e/data/expected_context.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.
#
segmentItems:
- segmentSize: gt 1
- segmentSize: ge 2
segments:
- segmentId: not null
spans:
Expand Down Expand Up @@ -76,7 +76,7 @@ segmentItems:
spanType: Entry
startTime: gt 0
serviceName: producer
- segmentSize: 1
- segmentSize: ge 1
segments:
- segmentId: not null
spans:
Expand All @@ -101,3 +101,48 @@ segmentItems:
spanType: Entry
startTime: gt 0
serviceName: consumer

logItems:
- logSize: ge 2
logs:
- body:
content:
json: '{"message": "handle ping"}'
type: ''
endpoint: /ping
layer: ''
tags:
data:
- key: level
value: DEBUG
timestamp: gt 0
- body:
content:
text: do http request
type: ''
endpoint: /ping
layer: ''
tags:
data:
- key: level
value: INFO
timestamp: gt 0
traceContext:
spanId: 1
traceId: not null
traceSegmentId: not null
serviceName: producer
- logSize: ge 1
logs:
- body:
content:
json: '{"message": "handle pong"}'
type: ''
endpoint: /pong
layer: ''
tags:
data:
- key: level
value: DEBUG
timestamp: gt 0
serviceName: consumer
20 changes: 0 additions & 20 deletions e2e/rust-toolchain.toml

This file was deleted.

48 changes: 38 additions & 10 deletions e2e/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ use hyper::{
Body, Client, Method, Request, Response, Server, StatusCode,
};
use skywalking::{
logging::{
logger::{self, Logger},
record::{LogRecord, RecordType},
},
reporter::grpc::GrpcReporter,
trace::{
propagation::{
Expand All @@ -30,7 +34,7 @@ use skywalking::{
tracer::{self, Tracer},
},
};
use std::{convert::Infallible, error::Error, future, net::SocketAddr};
use std::{convert::Infallible, error::Error, net::SocketAddr};
use structopt::StructOpt;

static NOT_FOUND_MSG: &str = "not found";
Expand All @@ -40,10 +44,18 @@ async fn handle_ping(
_req: Request<Body>,
client: Client<HttpConnector>,
) -> Result<Response<Body>, Infallible> {
logger::log(
LogRecord::new()
.add_tag("level", "DEBUG")
.endpoint("/ping")
.record_type(RecordType::Json)
.content(r#"{"message": "handle ping"}"#),
);

let mut context = tracer::create_trace_context();
let _span = context.create_entry_span("/ping");
{
let _span2 = context.create_exit_span("/pong", "consumer:8082");
let span2 = context.create_exit_span("/pong", "consumer:8082");
let header = encode_propagation(&context, "/pong", "consumer:8082");
let req = Request::builder()
.method(Method::GET)
Expand All @@ -52,6 +64,15 @@ async fn handle_ping(
.body(Body::from(""))
.unwrap();

logger::log(
LogRecord::new()
.add_tag("level", "INFO")
.endpoint("/ping")
.with_tracing_context(&context)
.with_span(&span2)
.record_type(RecordType::Text)
.content("do http request"),
);
client.request(req).await.unwrap();
}
{
Expand Down Expand Up @@ -106,6 +127,14 @@ async fn run_producer_service(host: [u8; 4]) {
}

async fn handle_pong(_req: Request<Body>) -> Result<Response<Body>, Infallible> {
logger::log(
LogRecord::new()
.add_tag("level", "DEBUG")
.endpoint("/pong")
.record_type(RecordType::Json)
.content(r#"{"message": "handle pong"}"#),
);

let ctx = decode_propagation(
_req.headers()[SKYWALKING_HTTP_CONTEXT_HEADER_KEY]
.to_str()
Expand Down Expand Up @@ -154,20 +183,19 @@ struct Opt {
async fn main() -> Result<(), Box<dyn Error>> {
let opt = Opt::from_args();
let reporter = GrpcReporter::connect("http://collector:19876").await?;
let handle = reporter.reporting().await.spawn();

let handle = if opt.mode == "consumer" {
tracer::set_global_tracer(Tracer::new("consumer", "node_0", reporter));
let handle = tracer::reporting(future::pending());
if opt.mode == "consumer" {
tracer::set_global_tracer(Tracer::new("consumer", "node_0", reporter.clone()));
logger::set_global_logger(Logger::new("consumer", "node_0", reporter));
run_consumer_service([0, 0, 0, 0]).await;
handle
} else if opt.mode == "producer" {
tracer::set_global_tracer(Tracer::new("producer", "node_0", reporter));
let handle = tracer::reporting(future::pending());
tracer::set_global_tracer(Tracer::new("producer", "node_0", reporter.clone()));
logger::set_global_logger(Logger::new("producer", "node_0", reporter));
run_producer_service([0, 0, 0, 0]).await;
handle
} else {
unreachable!()
};
}

handle.await?;

Expand Down
44 changes: 44 additions & 0 deletions examples/simple_log_report.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//

use skywalking::{
logging::{logger::Logger, record::LogRecord},
reporter::grpc::GrpcReporter,
};
use std::{error::Error, future};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
// Connect to skywalking oap server.
let reporter = GrpcReporter::connect("http://0.0.0.0:11800").await?;

// Do logging.
let logger = Logger::new("service", "instance", reporter.clone());
logger.log(LogRecord::new().content("something to log"));

// Start reporting and quit immediately when have completed the existing
// collection.
reporter
.reporting()
.await
.with_graceful_shutdown(future::ready(()))
.start()
.await?;

Ok(())
}
24 changes: 16 additions & 8 deletions examples/simple_trace_report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,25 @@ async fn handle_request(tracer: Tracer) {

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
// Connect to skywalking oap server.
let reporter = GrpcReporter::connect("http://0.0.0.0:11800").await?;
let tracer = Tracer::new("service", "instance", reporter);

tokio::spawn(handle_request(tracer.clone()));

// Start to report.
tracer
.reporting(async move {
let _ = signal::ctrl_c().await.unwrap();
// Spawn the reporting in background, with listening the graceful shutdown
// signal.
let handle = reporter
.reporting()
.await
.with_graceful_shutdown(async move {
signal::ctrl_c().await.expect("failed to listen for event");
})
.await?;
.spawn();

// Do tracing.
let tracer = Tracer::new("service", "instance", reporter);
handle_request(tracer).await;

// Wait the reporting to quit.
handle.await?;

Ok(())
}
1 change: 1 addition & 0 deletions src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@
//

pub mod random_generator;
pub(crate) mod system_time;
File renamed without changes.

0 comments on commit b8a4a18

Please sign in to comment.