From b2cc0db6036623c73f914b9151309ca6c4bf904c Mon Sep 17 00:00:00 2001 From: Istvan Soos Date: Thu, 17 Oct 2024 11:21:43 +0200 Subject: [PATCH] Refactor/move UserMerger utility into admin/tool --- app/lib/admin/tools/user_merger.dart | 251 ++++++++++++++++- app/lib/shared/user_merger.dart | 256 ------------------ .../{shared => admin}/user_merger_test.dart | 4 +- 3 files changed, 251 insertions(+), 260 deletions(-) delete mode 100644 app/lib/shared/user_merger.dart rename app/test/{shared => admin}/user_merger_test.dart (98%) diff --git a/app/lib/admin/tools/user_merger.dart b/app/lib/admin/tools/user_merger.dart index 59534b5fbc..25afd32aae 100644 --- a/app/lib/admin/tools/user_merger.dart +++ b/app/lib/admin/tools/user_merger.dart @@ -3,10 +3,17 @@ // BSD-style license that can be found in the LICENSE file. import 'package:args/args.dart'; - +import 'package:logging/logging.dart'; +import 'package:pool/pool.dart'; +import 'package:pub_dev/account/backend.dart'; +import 'package:pub_dev/account/models.dart'; +import 'package:pub_dev/audit/models.dart'; +import 'package:pub_dev/package/models.dart'; +import 'package:pub_dev/publisher/models.dart'; import 'package:pub_dev/shared/datastore.dart'; import 'package:pub_dev/shared/exceptions.dart'; -import 'package:pub_dev/shared/user_merger.dart'; + +final _logger = Logger('user_merger'); final _argParser = ArgParser() ..addOption('concurrency', @@ -49,3 +56,243 @@ Future executeUserMergerTool(List args) async { return 'Fixed $count `User` entities.'; } } + +/// Utility class to merge user data. +/// Specifically for the case where a two [User] entities exists with the same [User.oauthUserId]. +class UserMerger { + final DatastoreDB _db; + final int? _concurrency; + final bool _omitEmailCheck; + + UserMerger({ + required DatastoreDB db, + int? concurrency = 1, + bool? omitEmailCheck, + }) : _db = db, + _concurrency = concurrency, + _omitEmailCheck = omitEmailCheck ?? false; + + /// Fixes all OAuthUserID issues. 
+  Future<int> fixAll() async {
+    // Ids are fixed strictly one after the other; an error thrown by
+    // [fixOAuthUserID] is not caught here and aborts the remaining ids.
+    final ids = await scanOauthUserIdsWithProblems();
+    for (final id in ids) {
+      await fixOAuthUserID(id);
+    }
+    // Number of OAuth user ids processed (not the number of merged Users).
+    return ids.length;
+  }
+
+  /// Returns the OAuth userIds that have more than one User.
+  ///
+  /// Scans every [User] entity and counts how often each non-null
+  /// [User.oauthUserId] occurs; the ids that occur more than once are
+  /// returned in first-seen order.
+  Future<List<String>> scanOauthUserIdsWithProblems() async {
+    _logger.info('Scanning Users...');
+    final query = _db.query<User>();
+    final counts = <String, int>{};
+    await for (final user in query.run()) {
+      // Copy to a local so the null check promotes the value.
+      final oauthUserId = user.oauthUserId;
+      if (oauthUserId == null) continue;
+      counts[oauthUserId] = (counts[oauthUserId] ?? 0) + 1;
+    }
+    final result = [
+      for (final entry in counts.entries)
+        if (entry.value > 1) entry.key,
+    ];
+    _logger.info('$result OAuthUserID with more than one User.');
+    return result;
+  }
+
+  /// Runs user merging on the [oauthUserId] for each non-primary [User].
+  ///
+  /// The primary user is the one referenced by the [OAuthUserID] entity of
+  /// [oauthUserId]; every other [User] carrying the same id is merged into it
+  /// through [mergeUser].
+  ///
+  /// Throws a [StateError] when the primary user is not among the matching
+  /// users, or (unless the e-mail check was disabled in the constructor) when
+  /// any matching user has an e-mail different from the first one.
+  Future<void> fixOAuthUserID(String oauthUserId) async {
+    _logger.info('Fixing OAuthUserID=$oauthUserId');
+
+    final query = _db.query<User>()..filter('oauthUserId =', oauthUserId);
+    final users = await query.run().toList();
+    _logger.info('Users: ${users.map((u) => u.userId).join(', ')}');
+
+    // NOTE(review): presumably `lookupValue` throws when the mapping entity
+    // does not exist - confirm against the datastore helper.
+    final mapping = await _db.lookupValue<OAuthUserID>(
+        _db.emptyKey.append(OAuthUserID, id: oauthUserId));
+    _logger.info('Primary User: ${mapping.userId}');
+    if (!users.any((u) => u.userId == mapping.userId)) {
+      throw StateError('Primary User is missing!');
+    }
+
+    // WARNING
+    //
+    // We only update user ids, we do not change e-mails.
+    // The tool will NOT merge Users with non-matching e-mail addresses.
+    if (!_omitEmailCheck) {
+      // Every user is compared against the first query result.
+      for (int i = 1; i < users.length; i++) {
+        if (users[0].email != users[i].email) {
+          throw StateError(
+              'User e-mail does not match: ${users[0].email} != ${users[i].email}');
+        }
+      }
+    }
+
+    for (final user in users) {
+      if (user.userId == mapping.userId) continue;
+      await mergeUser(user.userId, mapping.userId);
+    }
+  }
+
+  /// Migrates data for User merge.
+ Future mergeUser(String fromUserId, String toUserId) async { + _logger.info('Merging User: $fromUserId -> $toUserId'); + final fromUserKey = _db.emptyKey.append(User, id: fromUserId); + final toUserKey = _db.emptyKey.append(User, id: toUserId); + final fromUser = await _db.lookupOrNull(fromUserKey); + InvalidInputException.checkNotNull(fromUser, 'fromUser'); + final toUser = await _db.lookupOrNull(toUserKey); + InvalidInputException.checkNotNull(toUser, 'toUser'); + final fromUserMapping = fromUser!.oauthUserId == null + ? null + : await _db.lookupOrNull( + _db.emptyKey.append(OAuthUserID, id: fromUser.oauthUserId)); + final toUserMapping = toUser!.oauthUserId == null + ? null + : await _db.lookupOrNull( + _db.emptyKey.append(OAuthUserID, id: toUser.oauthUserId)); + + // Package + await _processConcurrently( + _db.query()..filter('uploaders =', fromUserId), + (Package m) async { + await withRetryTransaction(_db, (tx) async { + final p = await tx.lookupValue(m.key); + if (p.containsUploader(fromUserId)) { + p.removeUploader(fromUserId); + p.addUploader(toUserId); + tx.insert(p); + } + }); + }, + ); + + // PackageVersion + await _processConcurrently( + _db.query()..filter('uploader =', fromUserId), + (PackageVersion m) async { + await withRetryTransaction(_db, (tx) async { + final pv = await tx.lookupValue(m.key); + if (pv.uploader == fromUserId) { + pv.uploader = toUserId; + tx.insert(pv); + } + }); + }, + ); + + // Like + await _processConcurrently( + _db.query(ancestorKey: fromUserKey), + (Like like) async { + await withRetryTransaction(_db, (tx) async { + tx.queueMutations( + inserts: [like.changeParentUser(toUserKey)], + deletes: [like.key], + ); + }); + }, + ); + + // UserSession + await _processConcurrently( + _db.query()..filter('userId =', fromUserId), + (UserSession m) async { + await withRetryTransaction(_db, (tx) async { + final session = await tx.lookupValue(m.key); + if (session.userId == fromUserId) { + session.userId = toUserId; + 
tx.insert(session); + } + }); + }, + ); + + // Consent's fromUserId attribute + await _processConcurrently( + _db.query()..filter('fromAgent =', fromUserId), + (Consent m) async { + if (m.parentKey?.id != null) { + throw StateError('Old Consent entity: ${m.consentId}.'); + } + await withRetryTransaction(_db, (tx) async { + final consent = await tx.lookupValue(m.key); + if (consent.fromAgent == fromUserId) { + consent.fromAgent = toUserId; + tx.insert(consent); + } + }); + }, + ); + + // PublisherMember + await _processConcurrently( + _db.query()..filter('userId =', fromUserId), + (PublisherMember m) async { + await withRetryTransaction(_db, (tx) async { + tx.queueMutations( + inserts: [m.changeParentUserId(toUserId)], + deletes: [m.key], + ); + }); + }, + ); + + // AuditLogRecord: agent + await _processConcurrently( + _db.query()..filter('agent =', fromUserId), + (AuditLogRecord alr) async { + await withRetryTransaction(_db, (tx) async { + final r = await _db.lookupValue(alr.key); + r.agent = toUserId; + r.data = r.data?.map((key, value) => MapEntry( + key, value == fromUserId ? toUserId : value)); + tx.insert(r); + }); + }); + + // AuditLogRecord: users + await _processConcurrently( + _db.query()..filter('users =', fromUserId), + (AuditLogRecord alr) async { + await withRetryTransaction(_db, (tx) async { + final r = await _db.lookupValue(alr.key); + r.users!.remove(fromUserId); + r.users!.add(toUserId); + r.data = r.data?.map( + (key, value) => MapEntry( + key, value == fromUserId ? 
toUserId : value), + ); + tx.insert(r); + }); + }); + + await withRetryTransaction(_db, (tx) async { + final u = await _db.lookupValue(toUserKey); + if (toUser.created!.isAfter(fromUser.created!)) { + u.created = fromUser.created; + } + if (toUserMapping == null) { + u.oauthUserId = null; + } + if (fromUserMapping?.userId == toUserId) { + u.oauthUserId = fromUserMapping!.oauthUserId; + } + tx.insert(u); + tx.delete(fromUserKey); + if (fromUserMapping?.userId == fromUserId) { + tx.delete(fromUserMapping!.key); + } + }); + + await purgeAccountCache(userId: fromUserId); + await purgeAccountCache(userId: toUserId); + } + + Future _processConcurrently( + Query query, Future Function(T) fn) async { + final pool = Pool(_concurrency!); + final futures = []; + await for (final m in query.run()) { + final f = pool.withResource(() => fn(m)); + futures.add(f); + } + await Future.wait(futures); + await pool.close(); + } +} diff --git a/app/lib/shared/user_merger.dart b/app/lib/shared/user_merger.dart deleted file mode 100644 index 4be1e5d137..0000000000 --- a/app/lib/shared/user_merger.dart +++ /dev/null @@ -1,256 +0,0 @@ -// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file -// for details. All rights reserved. Use of this source code is governed by a -// BSD-style license that can be found in the LICENSE file. - -import 'package:logging/logging.dart'; -import 'package:pool/pool.dart'; -import 'package:pub_dev/account/backend.dart'; -import 'package:pub_dev/shared/exceptions.dart'; - -import '../account/models.dart'; -import '../audit/models.dart'; -import '../package/models.dart'; -import '../publisher/models.dart'; -import 'datastore.dart'; - -final _logger = Logger('user_merger'); - -/// Utility class to merge user data. -/// Specifically for the case where a two [User] entities exists with the same [User.oauthUserId]. -class UserMerger { - final DatastoreDB _db; - final int? 
_concurrency; - final bool _omitEmailCheck; - - UserMerger({ - required DatastoreDB db, - int? concurrency = 1, - bool? omitEmailCheck, - }) : _db = db, - _concurrency = concurrency, - _omitEmailCheck = omitEmailCheck ?? false; - - /// Fixes all OAuthUserID issues. - Future fixAll() async { - final ids = await scanOauthUserIdsWithProblems(); - for (final id in ids) { - await fixOAuthUserID(id); - } - return ids.length; - } - - /// Returns the OAuth userIds that have more than one User. - Future> scanOauthUserIdsWithProblems() async { - _logger.info('Scanning Users...'); - final query = _db.query(); - final counts = {}; - await for (final user in query.run()) { - if (user.oauthUserId == null) continue; - counts[user.oauthUserId!] = (counts[user.oauthUserId!] ?? 0) + 1; - } - final result = counts.keys.where((k) => counts[k]! > 1).toList(); - _logger.info('$result OAuthUserID with more than one User.'); - return result; - } - - /// Runs user merging on the [oauthUserId] for each non-primary [User]. - Future fixOAuthUserID(String oauthUserId) async { - _logger.info('Fixing OAuthUserID=$oauthUserId'); - - final query = _db.query()..filter('oauthUserId =', oauthUserId); - final users = await query.run().toList(); - _logger.info('Users: ${users.map((u) => u.userId).join(', ')}'); - - final mapping = await _db.lookupValue( - _db.emptyKey.append(OAuthUserID, id: oauthUserId)); - _logger.info('Primary User: ${mapping.userId}'); - if (!users.any((u) => u.userId == mapping.userId)) { - throw StateError('Primary User is missing!'); - } - - // WARNING - // - // We only update user ids, we do not change e-mails. - // The tool will NOT merge Users with non-matching e-mail addresses. 
- if (!_omitEmailCheck) { - for (int i = 1; i < users.length; i++) { - if (users[0].email != users[i].email) { - throw StateError( - 'User e-mail does not match: ${users[0].email} != ${users[i].email}'); - } - } - } - - for (final user in users) { - if (user.userId == mapping.userId) continue; - await mergeUser(user.userId, mapping.userId); - } - } - - /// Migrates data for User merge. - Future mergeUser(String fromUserId, String toUserId) async { - _logger.info('Merging User: $fromUserId -> $toUserId'); - final fromUserKey = _db.emptyKey.append(User, id: fromUserId); - final toUserKey = _db.emptyKey.append(User, id: toUserId); - final fromUser = await _db.lookupOrNull(fromUserKey); - InvalidInputException.checkNotNull(fromUser, 'fromUser'); - final toUser = await _db.lookupOrNull(toUserKey); - InvalidInputException.checkNotNull(toUser, 'toUser'); - final fromUserMapping = fromUser!.oauthUserId == null - ? null - : await _db.lookupOrNull( - _db.emptyKey.append(OAuthUserID, id: fromUser.oauthUserId)); - final toUserMapping = toUser!.oauthUserId == null - ? 
null - : await _db.lookupOrNull( - _db.emptyKey.append(OAuthUserID, id: toUser.oauthUserId)); - - // Package - await _processConcurrently( - _db.query()..filter('uploaders =', fromUserId), - (Package m) async { - await withRetryTransaction(_db, (tx) async { - final p = await tx.lookupValue(m.key); - if (p.containsUploader(fromUserId)) { - p.removeUploader(fromUserId); - p.addUploader(toUserId); - tx.insert(p); - } - }); - }, - ); - - // PackageVersion - await _processConcurrently( - _db.query()..filter('uploader =', fromUserId), - (PackageVersion m) async { - await withRetryTransaction(_db, (tx) async { - final pv = await tx.lookupValue(m.key); - if (pv.uploader == fromUserId) { - pv.uploader = toUserId; - tx.insert(pv); - } - }); - }, - ); - - // Like - await _processConcurrently( - _db.query(ancestorKey: fromUserKey), - (Like like) async { - await withRetryTransaction(_db, (tx) async { - tx.queueMutations( - inserts: [like.changeParentUser(toUserKey)], - deletes: [like.key], - ); - }); - }, - ); - - // UserSession - await _processConcurrently( - _db.query()..filter('userId =', fromUserId), - (UserSession m) async { - await withRetryTransaction(_db, (tx) async { - final session = await tx.lookupValue(m.key); - if (session.userId == fromUserId) { - session.userId = toUserId; - tx.insert(session); - } - }); - }, - ); - - // Consent's fromUserId attribute - await _processConcurrently( - _db.query()..filter('fromAgent =', fromUserId), - (Consent m) async { - if (m.parentKey?.id != null) { - throw StateError('Old Consent entity: ${m.consentId}.'); - } - await withRetryTransaction(_db, (tx) async { - final consent = await tx.lookupValue(m.key); - if (consent.fromAgent == fromUserId) { - consent.fromAgent = toUserId; - tx.insert(consent); - } - }); - }, - ); - - // PublisherMember - await _processConcurrently( - _db.query()..filter('userId =', fromUserId), - (PublisherMember m) async { - await withRetryTransaction(_db, (tx) async { - tx.queueMutations( - inserts: 
[m.changeParentUserId(toUserId)], - deletes: [m.key], - ); - }); - }, - ); - - // AuditLogRecord: agent - await _processConcurrently( - _db.query()..filter('agent =', fromUserId), - (AuditLogRecord alr) async { - await withRetryTransaction(_db, (tx) async { - final r = await _db.lookupValue(alr.key); - r.agent = toUserId; - r.data = r.data?.map((key, value) => MapEntry( - key, value == fromUserId ? toUserId : value)); - tx.insert(r); - }); - }); - - // AuditLogRecord: users - await _processConcurrently( - _db.query()..filter('users =', fromUserId), - (AuditLogRecord alr) async { - await withRetryTransaction(_db, (tx) async { - final r = await _db.lookupValue(alr.key); - r.users!.remove(fromUserId); - r.users!.add(toUserId); - r.data = r.data?.map( - (key, value) => MapEntry( - key, value == fromUserId ? toUserId : value), - ); - tx.insert(r); - }); - }); - - await withRetryTransaction(_db, (tx) async { - final u = await _db.lookupValue(toUserKey); - if (toUser.created!.isAfter(fromUser.created!)) { - u.created = fromUser.created; - } - if (toUserMapping == null) { - u.oauthUserId = null; - } - if (fromUserMapping?.userId == toUserId) { - u.oauthUserId = fromUserMapping!.oauthUserId; - } - tx.insert(u); - tx.delete(fromUserKey); - if (fromUserMapping?.userId == fromUserId) { - tx.delete(fromUserMapping!.key); - } - }); - - await purgeAccountCache(userId: fromUserId); - await purgeAccountCache(userId: toUserId); - } - - Future _processConcurrently( - Query query, Future Function(T) fn) async { - final pool = Pool(_concurrency!); - final futures = []; - await for (final m in query.run()) { - final f = pool.withResource(() => fn(m)); - futures.add(f); - } - await Future.wait(futures); - await pool.close(); - } -} diff --git a/app/test/shared/user_merger_test.dart b/app/test/admin/user_merger_test.dart similarity index 98% rename from app/test/shared/user_merger_test.dart rename to app/test/admin/user_merger_test.dart index 3b93e6644f..e47a7fb2a9 100644 --- 
a/app/test/shared/user_merger_test.dart +++ b/app/test/admin/user_merger_test.dart @@ -7,15 +7,15 @@ import 'package:gcloud/db.dart'; import 'package:pub_dev/account/backend.dart'; import 'package:pub_dev/account/like_backend.dart'; import 'package:pub_dev/account/models.dart'; +import 'package:pub_dev/admin/tools/user_merger.dart'; import 'package:pub_dev/audit/backend.dart'; import 'package:pub_dev/fake/backend/fake_auth_provider.dart'; import 'package:pub_dev/package/backend.dart'; import 'package:pub_dev/publisher/backend.dart'; -import 'package:pub_dev/shared/user_merger.dart'; import 'package:pub_dev/tool/test_profile/models.dart'; import 'package:test/test.dart'; -import 'test_services.dart'; +import '../shared/test_services.dart'; void main() { Future _corruptAndFix() async {