Skip to content
22 changes: 13 additions & 9 deletions lib/src/registry/data_operation_registry.dart
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,12 @@ class DataOperationRegistry {
),
'country': (c, uid, f, s, p) async {
final usage = f?['usage'] as String?;
if (usage != null && usage.isNotEmpty) {
// For 'country' model with 'usage' filter, delegate to CountryService.
// Sorting and pagination are not supported for this specialized query.
final name = f?['name'] as String?;

// If either 'usage' or 'name' filter is present, delegate to CountryService.
// Sorting and pagination are handled by CountryService for these specialized queries.
if ((usage != null && usage.isNotEmpty) ||
(name != null && name.isNotEmpty)) {
final countryService = c.read<CountryService>();
final countries = await countryService.getCountries(f);
return PaginatedResponse<Country>(
Expand All @@ -142,13 +145,14 @@ class DataOperationRegistry {
hasMore: false, // No more items as it's a complete filtered set
);
} else {
// For standard requests, use the repository which supports pagination/sorting.
// For standard requests without specialized filters, use the repository
// which supports pagination/sorting.
return c.read<DataRepository<Country>>().readAll(
userId: uid,
filter: f,
sort: s,
pagination: p,
);
userId: uid,
filter: f,
sort: s,
pagination: p,
);
}
},
'language': (c, uid, f, s, p) => c
Expand Down
206 changes: 144 additions & 62 deletions lib/src/services/country_service.dart
Original file line number Diff line number Diff line change
Expand Up @@ -48,42 +48,60 @@ class CountryService {
static const Duration _cacheDuration = Duration(hours: 1);

// In-memory caches for frequently accessed lists with time-based invalidation.
_CacheEntry<List<Country>>? _cachedEventCountries;
_CacheEntry<List<Country>>? _cachedHeadquarterCountries;
final Map<String, _CacheEntry<List<Country>>> _cachedEventCountries = {};
final Map<String, _CacheEntry<List<Country>>> _cachedHeadquarterCountries =
{};

// Futures to hold in-flight aggregation requests to prevent cache stampedes.
Future<List<Country>>? _eventCountriesFuture;
Future<List<Country>>? _headquarterCountriesFuture;
final Map<String, Future<List<Country>>> _eventCountriesFutures = {};
final Map<String, Future<List<Country>>> _headquarterCountriesFutures = {};

/// Retrieves a list of countries based on the provided filter.
///
/// Supports filtering by 'usage' to get countries that are either
/// 'eventCountry' in headlines or 'headquarters' in sources.
/// If no specific usage filter is provided, it returns all active countries.
/// It also supports filtering by 'name' (full or partial match).
///
/// - [filter]: An optional map containing query parameters.
/// Expected keys:
/// - `'usage'`: String, can be 'eventCountry' or 'headquarters'.
/// - `'name'`: String, a full or partial country name for search.
///
/// Throws [BadRequestException] if an unsupported usage filter is provided.
/// Throws [OperationFailedException] for internal errors during data fetch.
Future<List<Country>> getCountries(Map<String, dynamic>? filter) async {
_log.info('Fetching countries with filter: $filter');

final usage = filter?['usage'] as String?;
final name = filter?['name'] as String?;

Map<String, dynamic>? nameFilter;
if (name != null && name.isNotEmpty) {
// Create a case-insensitive regex filter for the name.
nameFilter = {r'$regex': name, r'$options': 'i'};
}

if (usage == null || usage.isEmpty) {
_log.fine('No usage filter provided. Fetching all active countries.');
return _getAllCountries();
_log.fine(
'No usage filter provided. Fetching all active countries '
'with nameFilter: $nameFilter.',
);
return _getAllCountries(nameFilter: nameFilter);
}

switch (usage) {
case 'eventCountry':
_log.fine('Fetching countries used as event countries in headlines.');
return _getEventCountries();
_log.fine(
'Fetching countries used as event countries in headlines '
'with nameFilter: $nameFilter.',
);
return _getEventCountries(nameFilter: nameFilter);
case 'headquarters':
_log.fine('Fetching countries used as headquarters in sources.');
return _getHeadquarterCountries();
_log.fine(
'Fetching countries used as headquarters in sources '
'with nameFilter: $nameFilter.',
);
return _getHeadquarterCountries(nameFilter: nameFilter);
default:
_log.warning('Unsupported country usage filter: "$usage"');
throw BadRequestException(
Expand All @@ -94,15 +112,30 @@ class CountryService {
}

/// Fetches all active countries from the repository.
Future<List<Country>> _getAllCountries() async {
_log.finer('Retrieving all active countries from repository.');
///
/// - [nameFilter]: An optional map containing a regex filter for the country name.
Future<List<Country>> _getAllCountries({
Map<String, dynamic>? nameFilter,
}) async {
_log.finer(
'Retrieving all active countries from repository with nameFilter: $nameFilter.',
);
try {
final response = await _countryRepository.readAll(
filter: {'status': ContentStatus.active.name},
);
final combinedFilter = <String, dynamic>{
'status': ContentStatus.active.name,
};
if (nameFilter != null && nameFilter.isNotEmpty) {
combinedFilter.addAll({'name': nameFilter});
}

final response = await _countryRepository.readAll(filter: combinedFilter);
return response.items;
} catch (e, s) {
_log.severe('Failed to fetch all countries.', e, s);
_log.severe(
'Failed to fetch all countries with nameFilter: $nameFilter.',
e,
s,
);
throw OperationFailedException('Failed to retrieve all countries: $e');
}
}
Expand All @@ -112,56 +145,84 @@ class CountryService {
///
/// Uses MongoDB aggregation to efficiently get distinct country IDs
/// and then fetches the full Country objects. Results are cached.
Future<List<Country>> _getEventCountries() async {
if (_cachedEventCountries != null && _cachedEventCountries!.isValid()) {
_log.finer('Returning cached event countries.');
return _cachedEventCountries!.data;
///
/// - [nameFilter]: An optional map containing a regex filter for the country name.
Future<List<Country>> _getEventCountries({
Map<String, dynamic>? nameFilter,
}) async {
final cacheKey = 'eventCountry_${nameFilter ?? 'noFilter'}';
if (_cachedEventCountries.containsKey(cacheKey) &&
_cachedEventCountries[cacheKey]!.isValid()) {
_log.finer('Returning cached event countries for key: $cacheKey.');
return _cachedEventCountries[cacheKey]!.data;
}
// Atomically retrieve or create the future for the specific cache key.
var future = _eventCountriesFutures[cacheKey];
if (future == null) {
future = _fetchAndCacheEventCountries(
nameFilter: nameFilter,
).whenComplete(() => _eventCountriesFutures.remove(cacheKey));
_eventCountriesFutures[cacheKey] = future;
}
// Atomically assign the future if no fetch is in progress,
// and clear it when the future completes.
_eventCountriesFuture ??= _fetchAndCacheEventCountries()
.whenComplete(() => _eventCountriesFuture = null);
return _eventCountriesFuture!;
return future;
}

/// Fetches a distinct list of countries that are referenced as
/// `headquarters` in sources.
///
/// Uses MongoDB aggregation to efficiently get distinct country IDs
/// and then fetches the full Country objects. Results are cached.
Future<List<Country>> _getHeadquarterCountries() async {
if (_cachedHeadquarterCountries != null &&
_cachedHeadquarterCountries!.isValid()) {
_log.finer('Returning cached headquarter countries.');
return _cachedHeadquarterCountries!.data;
///
/// - [nameFilter]: An optional map containing a regex filter for the country name.
Future<List<Country>> _getHeadquarterCountries({
Map<String, dynamic>? nameFilter,
}) async {
final cacheKey = 'headquarters_${nameFilter ?? 'noFilter'}';
if (_cachedHeadquarterCountries.containsKey(cacheKey) &&
_cachedHeadquarterCountries[cacheKey]!.isValid()) {
_log.finer('Returning cached headquarter countries for key: $cacheKey.');
return _cachedHeadquarterCountries[cacheKey]!.data;
}
// Atomically assign the future if no fetch is in progress,
// and clear it when the future completes.
_headquarterCountriesFuture ??= _fetchAndCacheHeadquarterCountries()
.whenComplete(() => _headquarterCountriesFuture = null);
return _headquarterCountriesFuture!;
// Atomically retrieve or create the future for the specific cache key.
var future = _headquarterCountriesFutures[cacheKey];
if (future == null) {
future = _fetchAndCacheHeadquarterCountries(
nameFilter: nameFilter,
).whenComplete(() => _headquarterCountriesFutures.remove(cacheKey));
_headquarterCountriesFutures[cacheKey] = future;
}
return future;
}

/// Helper method to fetch and cache distinct event countries.
Future<List<Country>> _fetchAndCacheEventCountries() async {
_log.finer('Fetching distinct event countries via aggregation.');
///
/// - [nameFilter]: An optional map containing a regex filter for the country name.
Future<List<Country>> _fetchAndCacheEventCountries({
Map<String, dynamic>? nameFilter,
}) async {
_log.finer(
'Fetching distinct event countries via aggregation with nameFilter: $nameFilter.',
);
try {
final distinctCountries = await _getDistinctCountriesFromAggregation(
repository: _headlineRepository,
fieldName: 'eventCountry',
nameFilter: nameFilter,
);
_cachedEventCountries = _CacheEntry(
final cacheKey = 'eventCountry_${nameFilter ?? 'noFilter'}';
_cachedEventCountries[cacheKey] = _CacheEntry(
distinctCountries,
DateTime.now().add(_cacheDuration),
);
_log.info(
'Successfully fetched and cached ${distinctCountries.length} '
'event countries.',
'event countries for key: $cacheKey.',
);
return distinctCountries;
} catch (e, s) {
_log.severe(
'Failed to fetch distinct event countries via aggregation.',
'Failed to fetch distinct event countries via aggregation '
'with nameFilter: $nameFilter.',
e,
s,
);
Expand All @@ -170,25 +231,34 @@ class CountryService {
}

/// Helper method to fetch and cache distinct headquarter countries.
Future<List<Country>> _fetchAndCacheHeadquarterCountries() async {
_log.finer('Fetching distinct headquarter countries via aggregation.');
///
/// - [nameFilter]: An optional map containing a regex filter for the country name.
Future<List<Country>> _fetchAndCacheHeadquarterCountries({
Map<String, dynamic>? nameFilter,
}) async {
_log.finer(
'Fetching distinct headquarter countries via aggregation with nameFilter: $nameFilter.',
);
try {
final distinctCountries = await _getDistinctCountriesFromAggregation(
repository: _sourceRepository,
fieldName: 'headquarters',
nameFilter: nameFilter,
);
_cachedHeadquarterCountries = _CacheEntry(
final cacheKey = 'headquarters_${nameFilter ?? 'noFilter'}';
_cachedHeadquarterCountries[cacheKey] = _CacheEntry(
distinctCountries,
DateTime.now().add(_cacheDuration),
);
_log.info(
'Successfully fetched and cached ${distinctCountries.length} '
'headquarter countries.',
'headquarter countries for key: $cacheKey.',
);
return distinctCountries;
} catch (e, s) {
_log.severe(
'Failed to fetch distinct headquarter countries via aggregation.',
'Failed to fetch distinct headquarter countries via aggregation '
'with nameFilter: $nameFilter.',
e,
s,
);
Expand All @@ -202,29 +272,40 @@ class CountryService {
/// - [repository]: The [DataRepository] to perform the aggregation on.
/// - [fieldName]: The name of the field within the documents that contains
/// the country object (e.g., 'eventCountry', 'headquarters').
/// - [nameFilter]: An optional map containing a regex filter for the country name.
///
/// Throws [OperationFailedException] for internal errors during data fetch.
Future<List<Country>> _getDistinctCountriesFromAggregation<T extends FeedItem>({
Future<List<Country>>
_getDistinctCountriesFromAggregation<T extends FeedItem>({
required DataRepository<T> repository,
required String fieldName,
Map<String, dynamic>? nameFilter,
}) async {
_log.finer('Fetching distinct countries for field "$fieldName" via aggregation.');
_log.finer(
'Fetching distinct countries for field "$fieldName" via aggregation '
'with nameFilter: $nameFilter.',
);
try {
final pipeline = [
{
r'$match': {
'status': ContentStatus.active.name,
'$fieldName.id': {r'$exists': true},
},
},
{
r'$group': {
final matchStage = <String, Object>{
'status': ContentStatus.active.name,
'$fieldName.id': <String, Object>{r'$exists': true},
};

// Add name filter if provided
if (nameFilter != null && nameFilter.isNotEmpty) {
matchStage['$fieldName.name'] = nameFilter;
}

final pipeline = <Map<String, Object>>[
<String, Object>{r'$match': matchStage},
<String, Object>{
r'$group': <String, Object>{
'_id': '\$$fieldName.id',
'country': {r'$first': '\$$fieldName'},
'country': <String, Object>{r'$first': '\$$fieldName'},
},
},
{
r'$replaceRoot': {'newRoot': r'$country'},
<String, Object>{
r'$replaceRoot': <String, Object>{'newRoot': r'$country'},
},
];

Expand All @@ -238,12 +319,13 @@ class CountryService {

_log.info(
'Successfully fetched ${distinctCountries.length} distinct countries '
'for field "$fieldName".',
'for field "$fieldName" with nameFilter: $nameFilter.',
);
return distinctCountries;
} catch (e, s) {
_log.severe(
'Failed to fetch distinct countries for field "$fieldName".',
'Failed to fetch distinct countries for field "$fieldName" '
'with nameFilter: $nameFilter.',
e,
s,
);
Expand Down
5 changes: 5 additions & 0 deletions lib/src/services/database_seeding_service.dart
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ class DatabaseSeedingService {
.collection('sources')
.createIndex(keys: {'name': 'text'}, name: 'sources_text_index');

// Index for searching countries by name (case-insensitive friendly)
await _db
.collection('countries')
.createIndex(keys: {'name': 1}, name: 'countries_name_index');

// Indexes for country aggregation queries
await _db
.collection('headlines')
Expand Down
Loading