Skip to content

Commit

Permalink
Use zones to capture print calls within a call and log them correctly (
Browse files Browse the repository at this point in the history
…#43)

Towards #41

Refactored run to be more simple and allow for unit testing
  • Loading branch information
kevmoo committed Dec 4, 2020
1 parent 17acad5 commit ed329da
Show file tree
Hide file tree
Showing 6 changed files with 273 additions and 53 deletions.
31 changes: 30 additions & 1 deletion functions_framework/lib/serve.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import 'package:io/io.dart';
import 'package:shelf/shelf.dart';

import 'src/bad_configuration.dart';
import 'src/cloud_metadata.dart';
import 'src/function_config.dart';
import 'src/logging.dart';
import 'src/run.dart';

/// If there is an invalid configuration, [BadConfigurationException] will be
Expand Down Expand Up @@ -48,5 +50,32 @@ Future<void> _serve(List<String> args, Map<String, Handler> handlers) async {
);
}

await run(config.port, handler);
final projectId = await CloudMetadata.projectId();
final loggingMiddleware =
projectId == null ? logRequests() : cloudLoggingMiddleware(projectId);

final completer = Completer<bool>.sync();

// sigIntSub is copied below to avoid a race condition - ignoring this lint
// ignore: cancel_subscriptions
StreamSubscription sigIntSub, sigTermSub;

Future<void> signalHandler(ProcessSignal signal) async {
print('Received signal $signal - closing');

final subCopy = sigIntSub;
if (subCopy != null) {
sigIntSub = null;
await subCopy.cancel();
sigIntSub = null;
await sigTermSub.cancel();
sigTermSub = null;
completer.complete(true);
}
}

sigIntSub = ProcessSignal.sigint.watch().listen(signalHandler);
sigTermSub = ProcessSignal.sigterm.watch().listen(signalHandler);

await run(config.port, handler, completer.future, loggingMiddleware);
}
51 changes: 31 additions & 20 deletions functions_framework/lib/src/logging.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,44 @@
// Please see the AUTHORS file or details. Use of this source code is
// governed by a BSD-style license that can be found in the LICENSE file.

import 'dart:async';
import 'dart:convert';

import 'package:shelf/shelf.dart';
import 'package:stack_trace/stack_trace.dart';

import 'constants.dart';

/// If [projectid] is provided, assume we're running on Google Cloud and return
/// [Middleware] that logs errors using Google Cloud structured logs.
///
/// Otherwise, returns [Middleware] that prints requests and errors to `stdout`.
Middleware loggingMiddleware({String projectid}) {
if (projectid == null) {
return logRequests();
}

/// Return [Middleware] that logs errors using Google Cloud structured logs.
Middleware cloudLoggingMiddleware(String projectid) {
Handler hostedLoggingMiddleware(Handler innerHandler) => (request) async {
// Add log correlation to nest all log messages beneath request log in
// Log Viewer.
final traceHeader = request.headers[cloudTraceContextHeader];

String traceValue() =>
'projects/$projectid/traces/${traceHeader.split('/')[0]}';

try {
// including the extra `await` and assignment here to make sure async
// errors are caught within this try block
final response = await innerHandler(request);
// TODO: GoogleCloudPlatform/functions-framework-dart#41 look into
// using zone error handling, too. Need to debug this.
final response = await Zone.current.fork(
specification: ZoneSpecification(
print: (self, parent, zone, line) {
final logContent = {
'message': line,
'severity': 'INFO',
// 'logging.googleapis.com/labels': { }
if (traceHeader != null)
'logging.googleapis.com/trace': traceValue(),
};

// Serialize to a JSON string and output to parent zone.
parent.print(self, jsonEncode(logContent));
},
),
).runUnary(innerHandler, request);

return response;
} catch (error, stackTrace) {
if (error is HijackException) rethrow;
Expand All @@ -33,24 +50,18 @@ Middleware loggingMiddleware({String projectid}) {
final chain = stackTrace == null
? Chain.current()
: Chain.forTrace(stackTrace)
.foldFrames(
(frame) => frame.isCore || frame.package == 'shelf')
.foldFrames((frame) => frame.isCore)
.terse;

final stackFrame = _frameFromChain(chain);

// Add log correlation to nest all log messages beneath request log in
// Log Viewer.
final traceHeader = request.headers[cloudTraceContextHeader];

// https://cloud.google.com/logging/docs/agent/configuration#special-fields
final logContent = {
'message': '$error\n$chain',
'severity': 'ERROR',
// 'logging.googleapis.com/labels': { }
if (traceHeader != null)
'logging.googleapis.com/trace':
'projects/$projectid/traces/${traceHeader.split('/')[0]}',
'logging.googleapis.com/trace': traceValue(),
if (stackFrame != null)
'logging.googleapis.com/sourceLocation':
_sourceLocation(stackFrame),
Expand Down
41 changes: 10 additions & 31 deletions functions_framework/lib/src/run.dart
Original file line number Diff line number Diff line change
Expand Up @@ -8,47 +8,26 @@ import 'dart:io';
import 'package:shelf/shelf.dart';
import 'package:shelf/shelf_io.dart' as shelf_io;

import 'cloud_metadata.dart';
import 'logging.dart';

Future<void> run(int port, Handler handler) async {
final projectId = await CloudMetadata.projectId();

Future<void> run(
int port,
Handler handler,
Future<bool> shutdownSignal,
Middleware loggingMiddleware,
) async {
final pipeline = const Pipeline()
.addMiddleware(loggingMiddleware(projectid: projectId))
.addMiddleware(loggingMiddleware)
.addMiddleware(_forbiddenAssetMiddleware)
.addHandler(handler);

var server = await shelf_io.serve(
final server = await shelf_io.serve(
pipeline,
InternetAddress.anyIPv4,
port,
);
print('Listening on :${server.port}');

final completer = Completer<void>.sync();

StreamSubscription sigIntSub, sigTermSub;

Future<void> signalHandler(ProcessSignal signal) async {
print('Received signal $signal - closing');

final serverCopy = server;
if (serverCopy != null) {
server = null;
await serverCopy.close(force: true);
await sigIntSub.cancel();
sigIntSub = null;
await sigTermSub.cancel();
sigTermSub = null;
completer.complete();
}
}

sigIntSub = ProcessSignal.sigint.watch().listen(signalHandler);
sigTermSub = ProcessSignal.sigterm.watch().listen(signalHandler);

await completer.future;
final force = await shutdownSignal;
await server.close(force: force);
}

const _forbiddenAssets = {'robots.txt', 'favicon.ico'};
Expand Down
7 changes: 7 additions & 0 deletions test/hello/lib/app.dart
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ Future<Response> handleGet(Request request) async {
throw Exception('An error was forced by requesting "$urlPath"');
}

if (urlPath.startsWith('print')) {
for (var segment in request.url.pathSegments) {
print(segment);
}
return Response.ok('Printing: $urlPath');
}

return Response.ok('Hello, World!');
}

Expand Down
168 changes: 168 additions & 0 deletions test/hello/test/cloud_behavior_test.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

@Timeout(Duration(seconds: 3))
import 'dart:async';
import 'dart:convert';

import 'package:functions_framework/src/constants.dart';
import 'package:functions_framework/src/logging.dart';
import 'package:functions_framework/src/run.dart';
import 'package:hello_world_function_test/app.dart';
import 'package:http/http.dart';
import 'package:test/test.dart';

const _projectId = 'test_project_id';

void main() {
final lines = <String>[];

void expectLines(Object matcher) {
try {
expect(lines, matcher);
} finally {
lines.clear();
}
}

Completer<bool> completionSignal;
Future<void> runFuture;
int port;
Map<String, String> _headers;
var count = 0;
String traceStart;

setUp(() async {
lines.clear();
completionSignal = Completer<bool>.sync();

runFuture = runZoned(
() => run(
0,
handleGet,
completionSignal.future,
cloudLoggingMiddleware(_projectId),
),
zoneSpecification: ZoneSpecification(
print: (_, __, ___, line) => lines.add(line),
),
);

// wait for the server to start!
await Future.value();

final listeningLine = lines.single;
lines.clear();
final split = listeningLine.split(':');
expect(split.first, 'Listening on ');
port = int.parse(split.last);

count++;
traceStart = 'trace_start$count';

_headers = {cloudTraceContextHeader: '$traceStart/trace_end'};
});

tearDown(() async {
completionSignal.complete(false);

await runFuture;
});

Future<void> _get(
String path, {
int expectedStatusCode = 200,
}) async {
final response = await get(
'http://localhost:$port/$path',
headers: _headers,
);

expect(response.statusCode, expectedStatusCode);
}

test('root', () async {
await _get('');
expectLines(isEmpty);
});

test('info', () async {
await _get('info');
expectLines(isEmpty);
});

test('print', () async {
await _get('print/something');
expect(lines, hasLength(2));

void matchEntry(String entry, String message) {
final map = jsonDecode(entry) as Map<String, dynamic>;

expect(
map,
{
'message': message,
'severity': 'INFO',
'logging.googleapis.com/trace':
'projects/test_project_id/traces/trace_start3'
},
);
}

matchEntry(lines[0], 'print');
matchEntry(lines[1], 'something');

lines.clear();
});

test('error', () async {
await _get('error', expectedStatusCode: 500);

expect(lines, hasLength(1));
final entry = lines.single;
final map = jsonDecode(entry) as Map<String, dynamic>;

expect(map, hasLength(4));
expect(map, containsPair('severity', 'ERROR'));
expect(
map,
containsPair('message', startsWith('Exception: An error was forced')),
);
expect(
map,
containsPair(
'logging.googleapis.com/trace',
'projects/$_projectId/traces/$traceStart',
),
);

final sourceLocation =
map['logging.googleapis.com/sourceLocation'] as Map<String, dynamic>;

expect(
sourceLocation,
containsPair('file', 'package:hello_world_function_test/app.dart'),
);
expect(
sourceLocation,
containsPair(
'line',
isA<String>(), // spec says this should be a String
),
);
expect(sourceLocation, containsPair('function', 'handleGet'));

lines.clear();
});
}

0 comments on commit ed329da

Please sign in to comment.