Skip to content

Commit

Permalink
feat: Add support for JsonOutputParser (#392)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidmigloz committed Apr 30, 2024
1 parent 3dadfd1 commit c6508f0
Show file tree
Hide file tree
Showing 4 changed files with 215 additions and 3 deletions.
130 changes: 130 additions & 0 deletions packages/langchain_core/lib/src/output_parsers/json.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import 'package:collection/collection.dart';
import 'package:rxdart/rxdart.dart';

import '../runnables/runnable.dart';
import 'base.dart';
import 'string.dart';
import 'types.dart';
import 'utils.dart';

/// {@template json_output_parser}
/// Output parser that returns the output of the previous [Runnable] as a
/// JSON [Map].
///
/// - [ParserInput] - The type of the input to the parser.
///
/// Example:
/// ```dart
/// final model = ChatOpenAI(
/// apiKey: openAiApiKey,
/// defaultOptions: ChatOpenAIOptions(
/// responseFormat: ChatOpenAIResponseFormat(
/// type: ChatOpenAIResponseFormatType.jsonObject,
/// ),
/// ),
/// );
/// final parser = JsonOutputParser<ChatResult>();
/// final chain = model.pipe(parser);
/// final stream = chain.stream(
/// PromptValue.string(
/// 'Output a list of the countries france, spain and japan and their '
/// 'populations in JSON format. Use a dict with an outer key of '
/// '"countries" which contains a list of countries. '
/// 'Each country should have the key "name" and "population"',
/// ),
/// );
/// await stream.forEach((final chunk) => print('$chunk|'));
/// // {}|
/// // {countries: []}|
/// // {countries: [{name: France}]}|
/// // {countries: [{name: France, population: 67076000}, {}]}|
/// // {countries: [{name: France, population: 67076000}, {name: Spain}]}|
/// // {countries: [{name: France, population: 67076000}, {name: Spain, population: 46723749}]}|
/// // {countries: [{name: France, population: 67076000}, {name: Spain, population: 46723749}, {name: Japan}]}|
/// // {countries: [{name: France, population: 67076000}, {name: Spain, population: 46723749}, {name: Japan, population: 126476461}]}|
/// ```
/// {@endtemplate}
class JsonOutputParser<ParserInput extends Object?> extends BaseOutputParser<
ParserInput, OutputParserOptions, Map<String, dynamic>> {
/// {@macro json_output_parser}
JsonOutputParser({
this.reduceOutputStream = false,
}) : _stringOutputParser = StringOutputParser<ParserInput>(),
super(defaultOptions: const OutputParserOptions());

/// When invoking this parser with [Runnable.stream], every item from the
/// input stream will be parsed and emitted by default.
///
/// If [reduceOutputStream] is set to `true`, the parser will reduce the
/// output stream into a single String and emit it as a single item.
///
/// Visual example:
/// - reduceOutputStream = false
/// 'A', 'B', 'C' -> 'A', 'B', 'C'
/// - reduceOutputStream = true
/// 'A', 'B', 'C' -> 'ABC'
final bool reduceOutputStream;

final StringOutputParser<ParserInput> _stringOutputParser;

String _lastInputStr = '';
Map<String, dynamic> _lastOutputMap = {};

@override
Future<Map<String, dynamic>> invoke(
final ParserInput input, {
final OutputParserOptions? options,
}) async {
final inputStr = await _stringOutputParser.invoke(input, options: options);
return _parse(inputStr);
}

@override
Stream<Map<String, dynamic>> stream(
final ParserInput input, {
final OutputParserOptions? options,
}) async* {
yield await _parseStream(input, options: options);
}

@override
Stream<Map<String, dynamic>> streamFromInputStream(
final Stream<ParserInput> inputStream, {
final OutputParserOptions? options,
}) async* {
if (reduceOutputStream) {
await inputStream.forEach(
(final input) => _parseStream(input, options: options),
);
yield _lastOutputMap;
_clear();
} else {
yield* super
.streamFromInputStream(inputStream, options: options)
.distinct(const DeepCollectionEquality().equals)
.doOnCancel(_clear);
}
}

Map<String, dynamic> _parse(
final String input, {
Map<String, dynamic> fallback = const {},
}) {
final result = parsePartialJson(input);
return result ?? fallback;
}

Future<Map<String, dynamic>> _parseStream(
final ParserInput input, {
final OutputParserOptions? options,
}) async {
final inputStr = await _stringOutputParser.invoke(input, options: options);
_lastInputStr = '$_lastInputStr$inputStr';
return _lastOutputMap = _parse(_lastInputStr, fallback: _lastOutputMap);
}

void _clear() {
_lastInputStr = '';
_lastOutputMap = {};
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
export 'base.dart';
export 'exceptions.dart';
export 'functions.dart';
export 'json.dart';
export 'string.dart';
export 'types.dart';
4 changes: 1 addition & 3 deletions packages/langchain_core/lib/src/output_parsers/string.dart
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@ class StringOutputParser<ParserInput extends Object?>
/// input stream will be parsed and emitted by default.
///
/// If [reduceOutputStream] is set to `true`, the parser will reduce the
/// output stream into a single String and emit it as a single item. This is
/// useful when the next [Runnable] in a chain expects a single String as
/// input.
/// output stream into a single String and emit it as a single item.
///
/// Visual example:
/// - reduceOutputStream = false
Expand Down
83 changes: 83 additions & 0 deletions packages/langchain_core/test/output_parsers/json_test.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import 'package:langchain_core/output_parsers.dart';
import 'package:test/test.dart';

void main() {
group('JsonOutputParser tests', () {
test('Valid JSON input should be parsed correctly', () async {
final parser = JsonOutputParser<String>();
final result = await parser.invoke('{"name": "John", "age": 30}');
expect(result, equals({'name': 'John', 'age': 30}));
});

test('Whitespace in JSON input should be handled correctly', () async {
final parser = JsonOutputParser<String>();
final result = await parser.invoke(' {"name": "John Doe"} ');
expect(result, equals({'name': 'John Doe'}));
});

test('Clearing the parser should reset the last result', () async {
final parser = JsonOutputParser<String>();
final result1 = await parser.invoke('{"name": "John", "age": 30}');
expect(result1, equals({'name': 'John', 'age': 30}));
final result2 = await parser.invoke('{"name": "Ana", "age": 40}');
expect(result2, equals({'name': 'Ana', 'age': 40}));
});

test('Valid JSON stream should be parsed correctly', () async {
final parser = JsonOutputParser<String>();
final inputStream = Stream.fromIterable(
['{"name": "John"', ', "age": 30}'],
);
final result = await parser.streamFromInputStream(inputStream).toList();
expect(
result,
equals(
[
{'name': 'John'},
{'name': 'John', 'age': 30},
],
),
);
});

test('Calling stream twice should parse the latest input', () async {
final parser = JsonOutputParser<String>();
final inputStream1 = Stream.fromIterable(['{"name": "John", "age": 30}']);
final result1 = await parser.streamFromInputStream(inputStream1).toList();
expect(
result1,
equals(
[
{'name': 'John', 'age': 30},
],
),
);
final inputStream2 = Stream.fromIterable(['{"name": "Ana", "age": 40}']);
final result2 = await parser.streamFromInputStream(inputStream2).toList();
expect(
result2,
equals(
[
{'name': 'Ana', 'age': 40},
],
),
);
});

test('Test reduceOutputStream', () async {
final parser = JsonOutputParser<String>(reduceOutputStream: true);
final inputStream = Stream.fromIterable(
['{"name": "John"', ', "age": 30}'],
);
final result = await parser.streamFromInputStream(inputStream).toList();
expect(
result,
equals(
[
{'name': 'John', 'age': 30},
],
),
);
});
});
}

0 comments on commit c6508f0

Please sign in to comment.