Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion ci/operator_helm_build_release.sh
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ version: ${VERSION}" > "${helm_path}/Chart.yaml"
cp "${repo_path}/k8s/operator/crd/base/px.dev_viziers.yaml" "${helm_path}/crds/vizier_crd.yaml"

# Updates templates with Helm-specific template functions.
sed -i '1c{{ if (or (eq (.Values.deployOLM | toString) "true") (and (not (eq (.Values.deployOLM | toString) "false")) (eq (len (lookup "operators.coreos.com/v1" "OperatorGroup" "" "").items) 0))) }}' "${repo_path}/k8s/operator/helm/templates/00_olm.yaml"
#shellcheck disable=SC2016
sed -i '1c{{- $lookupLen := 0 -}}{{- $opLookup := (lookup "operators.coreos.com/v1" "OperatorGroup" "" "").items -}}{{if $opLookup }}{{ $lookupLen = len $opLookup }}{{ end }}\n{{ if (or (eq (.Values.deployOLM | toString) "true") (and (not (eq (.Values.deployOLM | toString) "false")) (eq $lookupLen 0))) }}' "${repo_path}/k8s/operator/helm/templates/00_olm.yaml"

# Fetch all of the current charts in GCS, because generating the index needs all pre-existing tar versions present.
mkdir -p "${tmp_dir}/${helm_gcs_bucket}"
Expand Down
29 changes: 29 additions & 0 deletions k8s/cloud/testing/db_sidecar.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: sc
spec:
template:
spec:
containers:
- name: cloudsql-proxy
image: gcr.io/cloudsql-docker/gce-proxy:1.14
command: ["/cloud_sql_proxy",
"-instances=pl-pixies:us-west1:pixie-cloud-testing-db-pg13=tcp:5432",
"-ip_address_types=PRIVATE",
"-credential_file=/secrets/cloudsql/db_service_account.json"]
# [START cloudsql_security_context]
securityContext:
runAsUser: 2 # non-root user
allowPrivilegeEscalation: false
# [END cloudsql_security_context]
volumeMounts:
- name: pl-db-secrets
mountPath: /secrets/cloudsql
readOnly: true
# [END proxy_container]
volumes:
- name: pl-db-secrets
secret:
secretName: pl-db-secrets
15 changes: 13 additions & 2 deletions src/common/metrics/metrics.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,23 @@
*
* SPDX-License-Identifier: Apache-2.0
*/
#include <memory>

#include <prometheus/registry.h>

#include "src/common/metrics/metrics.h"

namespace {
std::unique_ptr<prometheus::Registry> g_registry_instance;

void ResetMetricsRegistry() { g_registry_instance = std::make_unique<prometheus::Registry>(); }
} // namespace

prometheus::Registry& GetMetricsRegistry() {
static prometheus::Registry registry;
return registry;
if (g_registry_instance == nullptr) {
ResetMetricsRegistry();
}
return *g_registry_instance;
}

void TestOnlyResetMetricsRegistry() { ResetMetricsRegistry(); }
4 changes: 4 additions & 0 deletions src/common/metrics/metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
// Returns the global metrics registry;
prometheus::Registry& GetMetricsRegistry();

// Resets the Metrics registry, removing all of its contained metrics.
// This function should only be called by testing code.
void TestOnlyResetMetricsRegistry();

// A convenience wrapper to return a counter with the specified name and help message.
inline auto& BuildCounter(const std::string& name, const std::string& help_message) {
return prometheus::BuildCounter()
Expand Down
34 changes: 34 additions & 0 deletions src/common/system/proc_parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,40 @@ Status ProcParser::ParseProcPIDStatus(int32_t pid, ProcessStatus* out) const {
return ParseFromKeyValueFile(fpath, field_name_to_offset_map, reinterpret_cast<uint8_t*>(out));
}

StatusOr<size_t> ProcParser::ParseProcPIDPss(const int32_t pid) const {
// We will parse a line that looks like this:
// Pss: 807 kB
// And return the value 807*1024 (or an error status).
constexpr uint32_t kPssKeyIdx = 0;
constexpr uint32_t kPssValIdx = 1;
constexpr uint32_t kUnitsIdx = 2;

const std::string fpath = absl::Substitute("$0/$1/smaps_rollup", proc_base_path_, pid);

std::ifstream ifs;
ifs.open(fpath);
if (!ifs) {
return error::Internal("Failed to open file $0", fpath);
}

std::string line;
while (std::getline(ifs, line)) {
if (absl::StartsWith(line, "Pss:")) {
const std::vector<std::string_view> toks = absl::StrSplit(line, ' ', absl::SkipWhitespace());
DCHECK_EQ(toks.size(), 3);
DCHECK_EQ(toks[kPssKeyIdx], "Pss:");
DCHECK_EQ(toks[kUnitsIdx], "kB");
size_t pss_kb;
if (absl::SimpleAtoi(toks[kPssValIdx], &pss_kb)) {
return 1024 * pss_kb;
} else {
return error::Internal(R"(SimpleAtoi error for "$0", pid=$1.)", toks[kPssValIdx], pid);
}
}
}
return error::Internal("Could not find pss for pid $0.", pid);
}

Status ProcParser::ParseProcPIDSMaps(int32_t pid, std::vector<ProcessSMaps>* out) const {
CHECK(out != nullptr);
std::string fpath = absl::Substitute("$0/$1/smaps", proc_base_path_, pid);
Expand Down
6 changes: 6 additions & 0 deletions src/common/system/proc_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,12 @@ class ProcParser {
*/
Status ParseProcPIDSMaps(int32_t pid, std::vector<ProcessSMaps>* out) const;

/**
* Parses /proc/<pid>/smaps_rollup for "PSS" the proportional set size memory usage.
* @return status of parsing or the value of PSS.
*/
StatusOr<size_t> ParseProcPIDPss(const int32_t pid) const;

/**
* Reads and returns the /proc/<pid>/fd/<fd> file descriptor link.
*
Expand Down
5 changes: 5 additions & 0 deletions src/common/system/proc_parser_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ TEST_F(ProcParserTest, ParsePidStat) {
EXPECT_EQ(2577 * bytes_per_page_, stats.rss_bytes);
}

TEST_F(ProcParserTest, ParsePSS) {
const size_t pss_bytes = parser_->ParseProcPIDPss(123).ConsumeValueOrDie();
EXPECT_EQ(pss_bytes, 5936128);
}

TEST_F(ProcParserTest, ParseStat) {
ProcParser::SystemStats stats;
PL_CHECK_OK(parser_->ParseProcStat(&stats));
Expand Down
21 changes: 21 additions & 0 deletions src/common/system/testdata/proc/123/smaps_rollup
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
be400000000-7ffc0fd9e000 ---p 00000000 00:00 0 [rollup]
Rss: 57908 kB
Pss: 5797 kB
Pss_Anon: 5098 kB
Pss_File: 686 kB
Pss_Shmem: 11 kB
Shared_Clean: 42852 kB
Shared_Dirty: 10128 kB
Private_Clean: 0 kB
Private_Dirty: 4928 kB
Referenced: 48172 kB
Anonymous: 14932 kB
LazyFree: 0 kB
AnonHugePages: 0 kB
ShmemPmdMapped: 0 kB
FilePmdMapped: 0 kB
Shared_Hugetlb: 0 kB
Private_Hugetlb: 0 kB
Swap: 80 kB
SwapPss: 0 kB
Locked: 0 kB
10 changes: 2 additions & 8 deletions src/shared/metadata/cgroup_metadata_reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,11 @@
*
* SPDX-License-Identifier: Apache-2.0
*/

#include <gmock/gmock.h>
#include <gtest/gtest.h>

#include <string>
#include <vector>
#include "src/shared/metadata/cgroup_metadata_reader.h"

#include <absl/container/flat_hash_set.h>

#include "src/common/testing/testing.h"
#include "src/shared/metadata/cgroup_metadata_reader.h"

namespace px {
namespace md {
Expand All @@ -39,7 +34,6 @@ class CGroupMetadataReaderTest : public ::testing::Test {
}

std::unique_ptr<CGroupMetadataReader> md_reader_;
std::filesystem::path sysfs_path_;
};

TEST_F(CGroupMetadataReaderTest, read_pid_list) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ void ProcessStatsConnector::TransferProcessStatsTable(ConnectorContext* ctx,
continue;
}

const size_t pss_bytes = proc_parser_->ParseProcPIDPss(pid).ConsumeValueOr(0);

DataTable::RecordBuilder<&kProcessStatsTable> r(data_table, timestamp);
// TODO(oazizi): Enable version below, once rest of the agent supports tabletization.
// DataTable::RecordBuilder<&kProcessStatsTable> r(data_table, upid.value(), timestamp);
Expand All @@ -87,6 +89,7 @@ void ProcessStatsConnector::TransferProcessStatsTable(ConnectorContext* ctx,
r.Append<r.ColIndex("num_threads")>(stats.num_threads);
r.Append<r.ColIndex("vsize_bytes")>(stats.vsize_bytes);
r.Append<r.ColIndex("rss_bytes")>(stats.rss_bytes);
r.Append<r.ColIndex("pss_bytes")>(pss_bytes);
r.Append<r.ColIndex("rchar_bytes")>(stats.rchar_bytes);
r.Append<r.ColIndex("wchar_bytes")>(stats.wchar_bytes);
r.Append<r.ColIndex("read_bytes")>(stats.read_bytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ static constexpr DataElement kProcessStatsElements[] = {
types::DataType::INT64, types::SemanticType::ST_NONE, types::PatternType::METRIC_GAUGE},
{"vsize_bytes", "Virtual memory size in bytes of the process",
types::DataType::INT64, types::SemanticType::ST_BYTES, types::PatternType::METRIC_GAUGE},
{"rss_bytes", "Resident memory size in bytes of the process",
{"rss_bytes", "Logical resident memory size in bytes of the process",
types::DataType::INT64, types::SemanticType::ST_BYTES, types::PatternType::METRIC_GAUGE},
{"pss_bytes", "Proportional set size, memory consumed by the process, i.e. usage after accounting for page sharing",
types::DataType::INT64, types::SemanticType::ST_BYTES, types::PatternType::METRIC_GAUGE},
{"rchar_bytes", "IO reads in bytes of the process",
types::DataType::INT64, types::SemanticType::ST_BYTES, types::PatternType::METRIC_COUNTER},
Expand Down
1 change: 1 addition & 0 deletions src/stirling/source_connectors/socket_tracer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pl_cc_library(
deps = [
"//src/common/exec:cc_library",
"//src/common/grpcutils:cc_library",
"//src/common/metrics:cc_library",
"//src/stirling/bpf_tools:cc_library",
"//src/stirling/core:cc_library",
"//src/stirling/obj_tools:cc_library",
Expand Down
2 changes: 2 additions & 0 deletions src/stirling/source_connectors/socket_tracer/conn_tracker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,8 @@ bool ConnTracker::SetProtocol(traffic_protocol_t protocol, std::string_view reas
CONN_TRACE(2) << absl::Substitute("Protocol changed: $0->$1, reason=[$2]",
magic_enum::enum_name(old_protocol),
magic_enum::enum_name(protocol), reason);
send_data_.set_protocol(protocol);
recv_data_.set_protocol(protocol);
return true;
}

Expand Down
14 changes: 14 additions & 0 deletions src/stirling/source_connectors/socket_tracer/data_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <utility>

#include "src/stirling/source_connectors/socket_tracer/data_stream.h"
#include "src/stirling/source_connectors/socket_tracer/metrics.h"
#include "src/stirling/source_connectors/socket_tracer/protocols/types.h"

DEFINE_uint32(datastream_buffer_spike_size,
Expand Down Expand Up @@ -106,6 +107,8 @@ void DataStream::ProcessBytesToFrames(message_type_t type, TStateType* state) {
parse_result.state = ParseState::kNeedsMoreData;
parse_result.end_position = 0;

size_t frame_bytes = 0;

while (keep_processing && !data_buffer_.empty()) {
size_t contiguous_bytes = data_buffer_.Head().size();

Expand Down Expand Up @@ -134,6 +137,8 @@ void DataStream::ProcessBytesToFrames(message_type_t type, TStateType* state) {
stat_valid_frames_ += parse_result.frame_positions.size();
stat_invalid_frames_ += parse_result.invalid_frames;
stat_raw_data_gaps_ += keep_processing;

frame_bytes += parse_result.frame_bytes;
}

// Check to see if we are blocked on parsing.
Expand All @@ -158,6 +163,15 @@ void DataStream::ProcessBytesToFrames(message_type_t type, TStateType* state) {
UpdateLastProgressTime();
}

// Keep track of "lost" data in prometheus. "lost" data includes any gaps in the data stream as
// well as data that wasn't able to be successfully parsed.
ssize_t num_bytes_advanced = data_buffer_.position() - last_processed_pos_;
if (num_bytes_advanced > 0 && static_cast<size_t>(num_bytes_advanced) > frame_bytes) {
size_t bytes_lost = num_bytes_advanced - frame_bytes;
SocketTracerMetrics::GetProtocolMetrics(protocol_).data_loss_bytes.Increment(bytes_lost);
}
last_processed_pos_ = data_buffer_.position();

last_parse_state_ = parse_result.state;

// has_new_events_ should be false for the next transfer cycle.
Expand Down
8 changes: 8 additions & 0 deletions src/stirling/source_connectors/socket_tracer/data_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,8 @@ class DataStream : NotCopyMoveable {
}
}

void set_protocol(traffic_protocol_t protocol) { protocol_ = protocol; }

/**
* Cleanup frames that are parsed from the BPF events, when the condition is right.
*/
Expand Down Expand Up @@ -308,6 +310,12 @@ class DataStream : NotCopyMoveable {
// A copy of the parse state from the last call to ProcessToRecords().
ParseState last_parse_state_ = ParseState::kInvalid;

// Keep track of the byte position after the last processed position, in order to measure data
// loss.
size_t last_processed_pos_ = 0;
// Keep track of the protocol for this DataStream so that data loss can be reported per protocol.
traffic_protocol_t protocol_ = traffic_protocol_t::kProtocolUnknown;

template <typename TFrameType>
friend std::string DebugString(const DataStream& d, std::string_view prefix);
};
Expand Down
Loading