Skip to content
Permalink
Browse files
Support legacy API of HealthManger (#3818)
Co-authored-by: Nicholas Nezis <nicholas.nezis@gmail.com>
  • Loading branch information
thinker0 and nicknezis committed Apr 14, 2022
1 parent fd6a5fa commit 7db7c24733bd7e66ecfe704ea65f864d1fff4adc
Showing 7 changed files with 27 additions and 13 deletions.
@@ -45,6 +45,7 @@
import org.apache.heron.spi.statemgr.SchedulerStateManagerAdaptor;
import org.apache.heron.spi.utils.NetworkUtils;

import static org.apache.heron.common.basics.TypeUtils.getDouble;
import static org.apache.heron.healthmgr.HealthPolicyConfig.CONF_TOPOLOGY_NAME;

public class MetricsCacheMetricsProvider implements MetricsProvider {
@@ -117,7 +118,7 @@ Collection<Measurement> parse(
instanceId,
metricName,
Instant.ofEpochSecond(mi.getStart()),
Double.parseDouble(value));
getDouble(value));
metricsData.add(measurement);
}
// case 2
@@ -127,7 +128,7 @@ Collection<Measurement> parse(
instanceId,
metricName,
startTime,
Double.parseDouble(im.getValue()));
getDouble(im.getValue()));
metricsData.add(measurement);
}
}
@@ -43,6 +43,7 @@

import net.minidev.json.JSONArray;

import static org.apache.heron.common.basics.TypeUtils.getDouble;
import static org.apache.heron.healthmgr.HealthPolicyConfig.CONF_METRICS_SOURCE_URL;
import static org.apache.heron.healthmgr.HealthPolicyConfig.CONF_TOPOLOGY_NAME;

@@ -105,14 +106,14 @@ private Collection<Measurement> parse(String response, String component, String
}

for (String instanceName : metricsMap.keySet()) {
Map<String, String> tmpValues = (Map<String, String>) metricsMap.get(instanceName);
Map<String, Object> tmpValues = (Map<String, Object>) metricsMap.get(instanceName);
for (String timeStamp : tmpValues.keySet()) {
Measurement measurement = new Measurement(
component,
instanceName,
metric,
Instant.ofEpochSecond(Long.parseLong(timeStamp)),
Double.parseDouble(tmpValues.get(timeStamp)));
getDouble(tmpValues.get(timeStamp)));
metricsData.add(measurement);
}
}
@@ -73,6 +73,7 @@ async def get_metrics_timeline(

# Form and send the http request.
url = f"http://{tmanager.host}:{tmanager.stats_port}/stats"
Log.debug(f"Making HTTP call to fetch metrics: {url}")
async with httpx.AsyncClient() as client:
result = await client.post(url, data=request_parameters.SerializeToString())

@@ -119,22 +119,28 @@ async def get_metrics( # pylint: disable=too-many-arguments
)


@router.get("/metricstimeline", response_model=metricstimeline.MetricsTimeline,
deprecated=True)
@router.get("/metrics/timeline", response_model=metricstimeline.MetricsTimeline)
async def get_metrics_timeline( # pylint: disable=too-many-arguments
cluster: str,
environ: str,
component: str,
start_time: int,
end_time: int,
start_time: int = Query(..., alias="starttime"),
end_time: int = Query(..., alias="endtime"),
role: Optional[str] = None,
topology_name: str = Query(..., alias="topology"),
metric_names: Optional[List[str]] = Query(None, alias="metricname"),
instances: Optional[List[str]] = Query(None, alias="instance"),
):
"""Return metrics over the given interval."""
"""
'/metricstimeline' 0.20.5 below.
'/metrics/timeline' 0.20.5 above.
Return metrics over the given interval.
"""
if start_time > end_time:
raise BadRequest("start_time > end_time")
topology = state.tracker.get_toplogy(cluster, role, environ, topology_name)
topology = state.tracker.get_topology(cluster, role, environ, topology_name)
return await metricstimeline.get_metrics_timeline(
topology.tmanager, component, metric_names, instances, start_time, end_time
)
@@ -157,7 +163,8 @@ class MetricsQueryResponse(BaseModel): # pylint: disable=too-few-public-methods
..., description="list of timeline point objects",
)


@router.get("/metricsquery", response_model=MetricsQueryResponse,
deprecated=True)
@router.get("/metrics/query", response_model=MetricsQueryResponse)
async def get_metrics_query( # pylint: disable=too-many-arguments
cluster: str,
@@ -168,7 +175,11 @@ async def get_metrics_query( # pylint: disable=too-many-arguments
end_time: int = Query(..., alias="endtime"),
topology_name: str = Query(..., alias="topology"),
) -> MetricsQueryResponse:
"""Run a metrics query against a particular toplogy."""
"""
'/metricsquery' 0.20.5 below.
'/metrics/query' 0.20.5 above.
Run a metrics query against a particular topology.
"""
topology = state.tracker.get_topology(cluster, role, environ, topology_name)
metrics = await TManagerQuery(state.tracker).execute_query(
topology.tmanager, query, start_time, end_time
@@ -139,7 +139,7 @@ if __name__ == "__main__":
# piece together the topology
word_spout = builder.add_spout("word_spout", WordSpout, par=2)
count_bolt = builder.add_bolt("count_bolt", CountBolt, par=2, inputs={word_spout: Grouping.fields("word")})
# submit the toplogy
# submit the topology
builder.build_and_submit()
```

@@ -140,7 +140,7 @@ if __name__ == "__main__":
# piece together the topology
word_spout = builder.add_spout("word_spout", WordSpout, par=2)
count_bolt = builder.add_bolt("count_bolt", CountBolt, par=2, inputs={word_spout: Grouping.fields("word")})
# submit the toplogy
# submit the topology
builder.build_and_submit()
```

@@ -140,7 +140,7 @@ if __name__ == "__main__":
# piece together the topology
word_spout = builder.add_spout("word_spout", WordSpout, par=2)
count_bolt = builder.add_bolt("count_bolt", CountBolt, par=2, inputs={word_spout: Grouping.fields("word")})
# submit the toplogy
# submit the topology
builder.build_and_submit()
```

0 comments on commit 7db7c24

Please sign in to comment.