Merge latest version of pegasus (#1376)
* merge latest version of pegasus
* fix shell format
* rm cargo.lock
* Trigger the CI.

Co-authored-by: Longbin Lai <longbin.lailb@alibaba-inc.com>
Co-authored-by: shirly121 <zxlmillie@163.com>
Co-authored-by: Tao He <linzhu.ht@alibaba-inc.com>
4 people committed Mar 22, 2022
1 parent 0bf6686 commit e851014
Showing 109 changed files with 4,607 additions and 31,286 deletions.
3 changes: 1 addition & 2 deletions in research/engine/pegasus/Cargo.toml
@@ -7,7 +7,6 @@ members = [
"graph",
"pegasus",
"server",
"server-v0"
]

[profile.dev]
@@ -42,4 +41,4 @@ lto = true
panic = 'unwind'
incremental = false
codegen-units = 16
rpath = false
rpath = false
42 changes: 42 additions & 0 deletions in research/engine/pegasus/benchmark/Cargo.toml
@@ -0,0 +1,42 @@
[workspace]

[package]
name = "pegasus-benchmark"
version = "0.1.0"
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
pegasus_graph = { path = "../graph" }
pegasus = { path = "../pegasus" }
pegasus_common = { path = "../common" }
tonic = { version = "0.5", features = ["default", "compression"] }
prost = "0.8"
tokio = { version = "1.0", features = ["macros", "sync", "rt-multi-thread"] }
structopt = "0.3"
futures = "0.3.19"
log = "0.4"

[build-dependencies]
tonic-build = { version = "0.5", features = ["default", "compression"] }

[features]
default = []
gcip = []

[lib]
name = "pegasus_benchmark"
path = "src/lib.rs"

[[bin]]
name = "ssk"

[[bin]]
name = "spmsk"

[[bin]]
name = "amsk"

[[bin]]
name = "service"
@@ -2,7 +2,7 @@ FROM rust:1.55.0
WORKDIR /usr/src/pegasus/
COPY . .
RUN rm -f Cargo.lock && \
cd ./khop && rm -f Cargo.lock && \
cd ./benchmark && rm -f Cargo.lock && \
cd .. && cp ./crates-io.config /usr/local/cargo/config && \
cd ./khop && cargo install --path . && rm -rf target
CMD ["pegasus-khop"]
cd ./benchmark && cargo install --path . && rm -rf target
CMD ["service"]
26 changes: 26 additions & 0 deletions in research/engine/pegasus/benchmark/build.rs
@@ -0,0 +1,26 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
    println!("cargo:rerun-if-changed=proto/clickhouse_grpc.proto");
    codegen_inplace()
}

#[cfg(feature = "gcip")]
fn codegen_inplace() -> Result<(), Box<dyn std::error::Error>> {
    let dir = "src/graph/storage/clickhouse/pb_gen";
    if std::path::Path::new(&dir).exists() {
        std::fs::remove_dir_all(&dir).unwrap();
    }
    std::fs::create_dir(&dir).unwrap();
    tonic_build::configure()
        .build_server(false)
        .out_dir("src/graph/storage/clickhouse/pb_gen")
        .compile(&["proto/clickhouse_grpc.proto"], &["proto"])?;
    Ok(())
}

#[cfg(not(feature = "gcip"))]
fn codegen_inplace() -> Result<(), Box<dyn std::error::Error>> {
    tonic_build::configure()
        .build_server(true)
        .compile(&["proto/clickhouse_grpc.proto"], &["proto"])?;
    Ok(())
}
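
For context on the two `codegen_inplace` variants above: with the `gcip` feature enabled, tonic-build writes the generated ClickHouse client stubs directly into the source tree under `src/graph/storage/clickhouse/pb_gen`; without it, client and server stubs go to Cargo's `OUT_DIR` and are pulled in at compile time. A minimal sketch of the conventional `OUT_DIR` include is below; the module name `pb` is an assumption for illustration and not necessarily what this crate uses:

```rust
// Illustrative only: the conventional way to pull tonic-build output in from
// OUT_DIR. The argument is the proto `package` name, "clickhouse.grpc".
pub mod pb {
    tonic::include_proto!("clickhouse.grpc");
}

// With the `gcip` feature the generated file already lives in the source tree,
// so it would instead be included by path, for example:
// #[path = "graph/storage/clickhouse/pb_gen/clickhouse.grpc.rs"]
// pub mod pb;
```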
199 changes: 199 additions & 0 deletions in research/engine/pegasus/benchmark/proto/clickhouse_grpc.proto
@@ -0,0 +1,199 @@
// copy from https://github.com/ClickHouse/ClickHouse/blob/master/src/Server/grpc_protos/clickhouse_grpc.proto

/* This file describes gRPC protocol supported in ClickHouse.
*
* To use this protocol a client should send one or more messages of the QueryInfo type
* and then receive one or more messages of the Result type.
* According to that the service provides four methods for that:
* ExecuteQuery(QueryInfo) returns (Result)
* ExecuteQueryWithStreamInput(stream QueryInfo) returns (Result)
* ExecuteQueryWithStreamOutput(QueryInfo) returns (stream Result)
* ExecuteQueryWithStreamIO(stream QueryInfo) returns (stream Result)
* It's up to the client to choose which method to use.
* For example, ExecuteQueryWithStreamInput() allows the client to add data multiple times
* while executing a query, which is suitable for inserting many rows.
*/

syntax = "proto3";

package clickhouse.grpc;

message NameAndType {
  string name = 1;
  string type = 2;
}

// Describes an external table - a table which will exists only while a query is executing.
message ExternalTable {
  // Name of the table. If omitted, "_data" is used.
  string name = 1;

  // Columns of the table. Types are required, names can be omitted. If the names are omitted, "_1", "_2", ... is used.
  repeated NameAndType columns = 2;

  // Data to insert to the external table.
  // If a method with streaming input (i.e. ExecuteQueryWithStreamInput() or ExecuteQueryWithStreamIO()) is used,
  // then data for insertion to the same external table can be split between multiple QueryInfos.
  bytes data = 3;

  // Format of the data to insert to the external table.
  string format = 4;

  // Compression type used to compress `data`.
  // Supported values: none, gzip(gz), deflate, brotli(br), lzma(xz), zstd(zst), lz4, bz2.
  string compression_type = 6;

  // Settings for executing that insertion, applied after QueryInfo.settings.
  map<string, string> settings = 5;
}

enum CompressionAlgorithm {
  NO_COMPRESSION = 0;
  DEFLATE = 1;
  GZIP = 2;
  STREAM_GZIP = 3;
}

enum CompressionLevel {
  COMPRESSION_NONE = 0;
  COMPRESSION_LOW = 1;
  COMPRESSION_MEDIUM = 2;
  COMPRESSION_HIGH = 3;
}

message Compression {
  CompressionAlgorithm algorithm = 1;
  CompressionLevel level = 2;
}

// Information about a query which a client sends to a ClickHouse server.
// The first QueryInfo can set any of the following fields. Extra QueryInfos only add extra data.
// In extra QueryInfos only `input_data`, `external_tables`, `next_query_info` and `cancel` fields can be set.
message QueryInfo {
  string query = 1;
  string query_id = 2;
  map<string, string> settings = 3;

  // Default database.
  string database = 4;

  // Input data, used both as data for INSERT query and as data for the input() function.
  bytes input_data = 5;

  // Delimiter for input_data, inserted between input_data from adjacent QueryInfos.
  bytes input_data_delimiter = 6;

  // Default output format. If not specified, 'TabSeparated' is used.
  string output_format = 7;

  repeated ExternalTable external_tables = 8;

  string user_name = 9;
  string password = 10;
  string quota = 11;

  // Works exactly like sessions in the HTTP protocol.
  string session_id = 12;
  bool session_check = 13;
  uint32 session_timeout = 14;

  // Set `cancel` to true to stop executing the query.
  bool cancel = 15;

  // If true there will be at least one more QueryInfo in the input stream.
  // `next_query_info` is allowed to be set only if a method with streaming input (i.e. ExecuteQueryWithStreamInput() or ExecuteQueryWithStreamIO()) is used.
  bool next_query_info = 16;

  /// Controls how a ClickHouse server will compress query execution results before sending back to the client.
  /// If not set the compression settings from the configuration file will be used.
  Compression result_compression = 17;

  // Compression type for `input_data`, `output_data`, `totals` and `extremes`.
  // Supported compression types: none, gzip(gz), deflate, brotli(br), lzma(xz), zstd(zst), lz4, bz2.
  // When used for `input_data` the client is responsible to compress data before putting it into `input_data`.
  // When used for `output_data` or `totals` or `extremes` the client receives compressed data and should decompress it by itself.
  // In the latter case consider to specify also `compression_level`.
  string compression_type = 18;

  // Compression level.
  // WARNING: If it's not specified the compression level is set to zero by default which might be not the best choice for some compression types (see below).
  // The compression level should be in the following range (the higher the number, the better the compression):
  // none: compression level isn't used
  // gzip: 0..9; 0 means no compression, 6 is recommended by default (compression level -1 also means 6)
  // brotli: 0..11
  // lzma: 0..9; 6 is recommended by default
  // zstd: 1..22; 3 is recommended by default (compression level 0 also means 3)
  // lz4: 0..16; values < 0 mean fast acceleration
  // bz2: 1..9
  int32 compression_level = 19;
}

enum LogsLevel {
  LOG_NONE = 0;
  LOG_FATAL = 1;
  LOG_CRITICAL = 2;
  LOG_ERROR = 3;
  LOG_WARNING = 4;
  LOG_NOTICE = 5;
  LOG_INFORMATION = 6;
  LOG_DEBUG = 7;
  LOG_TRACE = 8;
}

message LogEntry {
  uint32 time = 1;
  uint32 time_microseconds = 2;
  uint64 thread_id = 3;
  string query_id = 4;
  LogsLevel level = 5;
  string source = 6;
  string text = 7;
}

message Progress {
  uint64 read_rows = 1;
  uint64 read_bytes = 2;
  uint64 total_rows_to_read = 3;
  uint64 written_rows = 4;
  uint64 written_bytes = 5;
}

message Stats {
  uint64 rows = 1;
  uint64 blocks = 2;
  uint64 allocated_bytes = 3;
  bool applied_limit = 4;
  uint64 rows_before_limit = 5;
}

message Exception {
  int32 code = 1;
  string name = 2;
  string display_text = 3;
  string stack_trace = 4;
}

// Result of execution of a query which is sent back by the ClickHouse server to the client.
message Result {
  // Output of the query, represented in the `output_format` or in a format specified in `query`.
  bytes output = 1;
  bytes totals = 2;
  bytes extremes = 3;

  repeated LogEntry logs = 4;
  Progress progress = 5;
  Stats stats = 6;

  // Set by the ClickHouse server if there was an exception thrown while executing.
  Exception exception = 7;

  // Set by the ClickHouse server if executing was cancelled by the `cancel` field in QueryInfo.
  bool cancelled = 8;
}

service ClickHouse {
  rpc ExecuteQuery(QueryInfo) returns (Result) {}
  rpc ExecuteQueryWithStreamInput(stream QueryInfo) returns (Result) {}
  rpc ExecuteQueryWithStreamOutput(QueryInfo) returns (stream Result) {}
  rpc ExecuteQueryWithStreamIO(stream QueryInfo) returns (stream Result) {}
}
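
The service block above exposes four call shapes; the simplest, `ExecuteQuery`, takes one `QueryInfo` and returns one `Result`. A minimal, hedged sketch of invoking it through the tonic-generated client follows; the `pb` module path, server address, and port are assumptions for illustration and are not part of this commit:

```rust
// Illustrative client call against the ClickHouse gRPC service defined above.
// Assumes the generated code is exposed as `pb` (see the build.rs note above).
use crate::pb::click_house_client::ClickHouseClient;
use crate::pb::QueryInfo;

pub async fn run_query() -> Result<(), Box<dyn std::error::Error>> {
    // Address and port are placeholders for a local ClickHouse gRPC endpoint.
    let mut client = ClickHouseClient::connect("http://127.0.0.1:9100").await?;
    let request = QueryInfo {
        query: "SELECT 1".to_string(),
        output_format: "TabSeparated".to_string(),
        ..Default::default()
    };
    let result = client.execute_query(request).await?.into_inner();
    println!("{}", String::from_utf8_lossy(&result.output));
    Ok(())
}
```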
9 changes: 9 additions & 0 deletions in research/engine/pegasus/benchmark/rustfmt.toml
@@ -0,0 +1,9 @@

# Stable
edition = "2018"
unstable_features = true
chain_width = 48
max_width = 108
use_small_heuristics = "Max"
fn_args_layout = "Compressed"
group_imports = "StdExternalCrate"