diff --git a/backoffice/settings/local.py b/backoffice/settings/local.py index 68ade3e..567f42a 100644 --- a/backoffice/settings/local.py +++ b/backoffice/settings/local.py @@ -8,3 +8,5 @@ ] CORS_ALLOW_ALL_ORIGINS = True + +# DATABASES["prod"] = env.db() # noqa: F405 diff --git a/modules/data_migrate/__init__.py b/modules/data_migrate/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/modules/data_migrate/posts_migration.py b/modules/data_migrate/posts_migration.py new file mode 100644 index 0000000..10f7968 --- /dev/null +++ b/modules/data_migrate/posts_migration.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python +""" +게시글 데이터 마이그레이션 스크립트 + +원격/운영 데이터베이스에서 로컬 데이터베이스로 Post 테이블 데이터를 이관합니다. +""" + +from django.db import connections, transaction + +from posts.models import Post +from users.models import User + +print("게시글 마이그레이션을 시작합니다...") + +try: + # 청크 크기 설정 + chunk_size = 500 + offset = 0 + total_migrated = 0 + success_count = 0 + update_count = 0 + skipped_count = 0 + + while True: + # 원격 DB에서 게시글 데이터를 청크 단위로 가져오기 + with connections["prod"].cursor() as cursor: + cursor.execute( + f""" + SELECT p.id, p.created_at, p.updated_at, p.post_uuid, p.user_id, + p.title, p.is_active, p.slug, p.released_at, u.velog_uuid + FROM posts_post p + JOIN users_user u ON p.user_id = u.id + ORDER BY p.id + LIMIT {chunk_size} OFFSET {offset} + """ + ) + posts = cursor.fetchall() + + if not posts: + break + + print(f"게시글 데이터 {len(posts)}개 처리 중 (오프셋 {offset})...") + + # 로컬 DB에 데이터 삽입 + with transaction.atomic(): + for post in posts: + # 게시글이 이미 로컬에 존재하는지 확인 + existing_post = Post.objects.filter(post_uuid=post[3]).first() + + # velog_uuid를 이용해 로컬 사용자 찾기 + velog_uuid = post[9] # velog_uuid는 10번째 컬럼 + + try: + user = User.objects.get(velog_uuid=velog_uuid) + except User.DoesNotExist: + print( + f"UUID {velog_uuid}의 사용자가 로컬에 존재하지 않습니다. 게시글 {post[3]} 건너뜁니다." + ) + skipped_count += 1 + continue + + if existing_post: + print( + f"UUID {post[3]}의 게시글이 이미 존재합니다. 정보를 업데이트합니다..." + ) + # 기존 게시글 정보 업데이트 + existing_post.title = post[5] + existing_post.is_active = post[6] + existing_post.slug = post[7] + existing_post.released_at = post[8] + existing_post.save() + update_count += 1 + else: + print(f"UUID {post[3]}의 새 게시글을 생성합니다.") + # 새 게시글 생성 + Post.objects.create( + created_at=post[1], + updated_at=post[2], + post_uuid=post[3], + user=user, + title=post[5], + is_active=post[6], + slug=post[7], + released_at=post[8], + ) + success_count += 1 + + total_migrated += len(posts) + offset += chunk_size + print( + f"현재까지 {total_migrated}개의 게시글을 마이그레이션했습니다..." + ) + + print( + f"게시글 마이그레이션이 완료되었습니다. 새로 생성: {success_count}개, 업데이트: {update_count}개, 건너뜀: {skipped_count}개" + ) + print(f"총 처리된 레코드: {total_migrated}개") + +except Exception as e: + print(f"게시글 마이그레이션 중 오류 발생: {e}") diff --git a/modules/data_migrate/statistics_migration.py b/modules/data_migrate/statistics_migration.py new file mode 100644 index 0000000..2f58f0a --- /dev/null +++ b/modules/data_migrate/statistics_migration.py @@ -0,0 +1,95 @@ +#!/usr/bin/env python +""" +게시글 통계 데이터 마이그레이션 스크립트 + +원격/운영 데이터베이스에서 로컬 데이터베이스로 PostDailyStatistics 테이블 데이터를 이관합니다. 
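A note on the commented-out `# DATABASES["prod"] = env.db()` line above: the data_migrate scripts below open cursors via `connections["prod"]`, which only resolves once a second database alias is registered in settings. A minimal sketch of how that alias could be wired up with django-environ, assuming a hypothetical `PROD_DATABASE_URL` environment variable pointing at the remote/production database (the variable name and the simplified DATABASES layout are illustrative, not part of this diff):

import environ

env = environ.Env()

DATABASES = {
    # normal local database, read from DATABASE_URL
    "default": env.db(),
    # remote source read by modules/data_migrate/*.py through connections["prod"]
    "prod": env.db("PROD_DATABASE_URL"),
}

Keeping the line commented out by default, as the diff does, avoids pointing local runs at production by accident; it only needs to be enabled while the migration scripts run.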
+""" + +from django.db import connections, transaction + +from posts.models import Post, PostDailyStatistics + +print("게시글 통계 마이그레이션을 시작합니다...") + +try: + # 청크 크기 설정 + chunk_size = 1000 + offset = 0 + total_migrated = 0 + new_count = 0 + update_count = 0 + skipped_count = 0 + + while True: + # 원격 DB에서 통계 데이터를 청크 단위로 가져오기 (post_uuid와 함께) + with connections["prod"].cursor() as cursor: + cursor.execute( + f""" + SELECT pds.id, pds.created_at, pds.updated_at, pds.post_id, pds.date, + pds.daily_view_count, pds.daily_like_count, p.post_uuid + FROM posts_postdailystatistics pds + JOIN posts_post p ON pds.post_id = p.id + WHERE pds.date >= CURRENT_DATE - INTERVAL '3 days' + ORDER BY pds.id + LIMIT {chunk_size} OFFSET {offset} + """ + ) + stats_chunk = cursor.fetchall() + + if not stats_chunk: + break + + print(f"통계 데이터 {len(stats_chunk)}개 처리 중 (오프셋 {offset})...") + + # 로컬 DB에 데이터 삽입 + with transaction.atomic(): + for stat in stats_chunk: + # post_uuid를 이용해 로컬 게시글 찾기 + post_uuid = stat[7] # post_uuid는 8번째 컬럼 + + try: + post = Post.objects.get(post_uuid=post_uuid) + except Post.DoesNotExist: + print( + f"UUID {post_uuid}의 게시글이 로컬에 존재하지 않습니다. 통계 {stat[0]} 건너뜁니다." + ) + skipped_count += 1 + continue + + # 통계가 이미 로컬에 존재하는지 확인 (post 객체와 date로 찾기) + existing_stat = PostDailyStatistics.objects.filter( + post=post, date=stat[4] + ).first() + + if existing_stat: + # 기존 통계 정보 업데이트 + existing_stat.daily_view_count = stat[5] + existing_stat.daily_like_count = stat[6] + existing_stat.save() + update_count += 1 + else: + # 새 통계 생성 + PostDailyStatistics.objects.create( + id=stat[0], + created_at=stat[1], + updated_at=stat[2], + post=post, # 로컬 post 객체 사용 + date=stat[4], + daily_view_count=stat[5], + daily_like_count=stat[6], + ) + new_count += 1 + + total_migrated += len(stats_chunk) + offset += chunk_size + print( + f"현재까지 {total_migrated}개의 통계 데이터를 마이그레이션했습니다..." + ) + + print( + f"게시글 통계 마이그레이션이 완료되었습니다. 새로 생성: {new_count}개, 업데이트: {update_count}개, 건너뜀: {skipped_count}개" + ) + print(f"총 처리된 레코드: {total_migrated}개") + +except Exception as e: + print(f"게시글 통계 마이그레이션 중 오류 발생: {e}") diff --git a/modules/data_migrate/users_migration.py b/modules/data_migrate/users_migration.py new file mode 100644 index 0000000..ff42e7a --- /dev/null +++ b/modules/data_migrate/users_migration.py @@ -0,0 +1,86 @@ +#!/usr/bin/env python +""" +사용자 데이터 마이그레이션 스크립트 + +원격/운영 데이터베이스에서 로컬 데이터베이스로 User 테이블 데이터를 이관합니다. +""" + +from django.db import connections, transaction + +from users.models import User + +print("사용자 마이그레이션을 시작합니다...") + +try: + # 청크 크기 설정 (메모리 문제 방지를 위해) + chunk_size = 500 + offset = 0 + total_migrated = 0 + success_count = 0 + update_count = 0 + + while True: + # 원격 DB에서 사용자 데이터를 청크 단위로 가져오기 + with connections["prod"].cursor() as cursor: + cursor.execute( + f""" + SELECT id, created_at, updated_at, velog_uuid, access_token, + refresh_token, group_id, email, is_active + FROM users_user + ORDER BY id + LIMIT {chunk_size} OFFSET {offset} + """ + ) + users = cursor.fetchall() + + if not users: + break + + print(f"사용자 데이터 {len(users)}명 처리 중 (오프셋 {offset})...") + + # 로컬 DB에 데이터 삽입 + with transaction.atomic(): + for user in users: + # 사용자가 이미 로컬에 존재하는지 확인 + existing_user = User.objects.filter(velog_uuid=user[3]).first() + + if existing_user: + print( + f"UUID {user[3]}의 사용자가 이미 존재합니다. 정보를 업데이트합니다..." 
+ ) + # 기존 사용자 정보 업데이트 + existing_user.access_token = user[4] + existing_user.refresh_token = user[5] + existing_user.group_id = user[6] + existing_user.email = user[7] + existing_user.is_active = user[8] + existing_user.save() + update_count += 1 + else: + print(f"UUID {user[3]}의 새 사용자를 생성합니다.") + # 새 사용자 생성 + User.objects.create( + created_at=user[1], + updated_at=user[2], + velog_uuid=user[3], + access_token=user[4], + refresh_token=user[5], + group_id=user[6], + email=user[7], + is_active=user[8], + ) + success_count += 1 + + total_migrated += len(users) + offset += chunk_size + print( + f"현재까지 {total_migrated}명의 사용자를 마이그레이션했습니다..." + ) + + print( + f"사용자 마이그레이션이 완료되었습니다. 새로 생성: {success_count}명, 업데이트: {update_count}명" + ) + print(f"총 처리된 사용자: {total_migrated}명") + +except Exception as e: + print(f"사용자 마이그레이션 중 오류 발생: {e}") diff --git a/scraping/main.py b/scraping/main.py index 25c3d88..46964f0 100644 --- a/scraping/main.py +++ b/scraping/main.py @@ -5,7 +5,6 @@ import async_timeout import environ import sentry_sdk - from asgiref.sync import sync_to_async from django.db import transaction @@ -141,6 +140,74 @@ def _execute_transaction() -> None: await _execute_transaction() + async def sync_post_active_status( + self, + user: User, + current_post_ids: set[str], + min_posts_threshold: int = 1, + ) -> None: + """현재 API에서 가져온 게시글 목록을 기준으로 활성/비활성 상태 동기화 + + Args: + user: 대상 사용자 + current_post_ids: 현재 API에서 가져온 게시글 ID 집합 + min_posts_threshold: API 응답에 최소 이 개수 이상의 게시글이 있어야 상태변경을 실행 + """ + + # API 응답이 너무 적으면 상태변경 하지 않음 (API 오류 가능성) + if len(current_post_ids) < min_posts_threshold: + logger.warning( + f"Skipping post status sync for user {user.velog_uuid} - Too few posts returned ({len(current_post_ids)})" + ) + return + + @sync_to_async(thread_sensitive=True) # type: ignore + def _execute_sync() -> None: + # 1. 비활성화 로직: 현재 목록에 없는 활성화된 게시글 찾기 + posts_to_deactivate = Post.objects.filter( + user=user, is_active=True + ).exclude(post_uuid__in=current_post_ids) + + deactivation_count = posts_to_deactivate.count() + + # 너무 많은 게시글이 비활성화되는 경우 방어 로직 + active_posts_count = Post.objects.filter( + user=user, is_active=True + ).count() + + if ( + active_posts_count > 0 + and deactivation_count / active_posts_count > 0.5 + ): + logger.warning( + f"Suspicious deactivation detected for user {user.velog_uuid}: " + f"Would deactivate {deactivation_count} out of {active_posts_count} posts. " + f"Skipping post status sync as a safety measure." + ) + return + + # 2. 
재활성화 로직: 현재 목록에 있지만 비활성화된 게시글 찾기 + posts_to_reactivate = Post.objects.filter( + user=user, is_active=False, post_uuid__in=current_post_ids + ) + + reactivation_count = posts_to_reactivate.count() + + # 상태 업데이트 실행 + if deactivation_count > 0: + logger.info( + f"Deactivating {deactivation_count} posts for user {user.velog_uuid}" + ) + posts_to_deactivate.update(is_active=False) + + if reactivation_count > 0: + logger.info( + f"Reactivating {reactivation_count} posts for user {user.velog_uuid}" + ) + posts_to_reactivate.update(is_active=True) + + await _execute_sync() + async def update_daily_statistics( self, post: dict[str, str], stats: dict[str, str] ) -> None: @@ -290,9 +357,19 @@ async def process_user( fetched_posts = await fetch_all_velog_posts( session, username, origin_access_token, origin_refresh_token ) + all_post_ids = {post["id"] for post in fetched_posts} + logger.info( + f"Fetched {len(all_post_ids)} posts for user {user.velog_uuid}" + ) + # 게시물이 새로 생겼으면 추가, 아니면 업데이트 await self.bulk_upsert_posts(user, fetched_posts) + # 게시글 활성/비활성 상태 동기화 + await self.sync_post_active_status( + user, all_post_ids, min_posts_threshold=1 + ) + # 게시물을 적절한 크기의 청크로 나누어 처리 chunk_size = 20 for i in range(0, len(fetched_posts), chunk_size): diff --git a/scraping/tests/test_main.py b/scraping/tests/test_main.py deleted file mode 100644 index 8a56ab3..0000000 --- a/scraping/tests/test_main.py +++ /dev/null @@ -1,496 +0,0 @@ -import uuid -from unittest.mock import AsyncMock, MagicMock, patch - -import pytest -from asgiref.sync import sync_to_async -from django.db import transaction - -from posts.models import Post, PostDailyStatistics -from scraping.main import Scraper -from users.models import User -from utils.utils import get_local_now - - -class TestScraper: - @pytest.fixture - def scraper(self): - """Scraper 인스턴스 생성""" - return Scraper(group_range=range(1, 10), max_connections=10) - - @pytest.fixture - def user(self, db): - """테스트용 User 객체 생성""" - return User.objects.create( - velog_uuid=uuid.uuid4(), - access_token="encrypted-access-token", - refresh_token="encrypted-refresh-token", - group_id=1, - email="test@example.com", - is_active=True, - ) - - @patch("scraping.main.AESEncryption") - @pytest.mark.asyncio - async def test_update_old_tokens_success( - self, mock_aes, scraper, user - ) -> None: - """토큰 업데이트 성공 테스트""" - mock_encryption = mock_aes.return_value - mock_encryption.decrypt.side_effect = ( - lambda token: f"decrypted-{token}" - ) - mock_encryption.encrypt.side_effect = ( - lambda token: f"encrypted-{token}" - ) - - new_tokens = { - "access_token": "new-access-token", - "refresh_token": "new-refresh-token", - } - - with patch.object(user, "asave", new_callable=AsyncMock) as mock_asave: - result = await scraper.update_old_tokens( - user, mock_encryption, new_tokens - ) - - assert result is True - mock_asave.assert_called_once() - assert user.access_token == "encrypted-new-access-token" - assert user.refresh_token == "encrypted-new-refresh-token" - mock_encryption.decrypt.assert_any_call("encrypted-access-token") - mock_encryption.decrypt.assert_any_call("encrypted-refresh-token") - - @patch("scraping.main.AESEncryption") - @pytest.mark.asyncio - async def test_update_old_tokens_no_change( - self, mock_aes, scraper, user - ) -> None: - """토큰 업데이트 없음 테스트""" - mock_encryption = mock_aes.return_value - mock_encryption.decrypt.side_effect = lambda token: token - mock_encryption.encrypt.side_effect = ( - lambda token: f"encrypted-{token}" - ) - - new_tokens = { - "access_token": 
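Taken together, the deactivation and reactivation querysets above implement a simple set reconciliation: after the sync, a post should be active exactly when its UUID appears in the API listing. A sketch of that decision as a pure function over UUID sets (illustrative only; the real code operates on QuerySets and applies the threshold checks first):

def plan_status_changes(
    active_local: set[str],
    inactive_local: set[str],
    current_remote: set[str],
) -> tuple[set[str], set[str]]:
    """Return (uuids_to_deactivate, uuids_to_reactivate)."""
    to_deactivate = active_local - current_remote     # active locally, missing from the API
    to_reactivate = inactive_local & current_remote   # inactive locally, present in the API
    return to_deactivate, to_reactivate


to_off, to_on = plan_status_changes({"a", "b"}, {"c", "d"}, {"a", "c"})
assert to_off == {"b"} and to_on == {"c"}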
"encrypted-access-token", - "refresh_token": "encrypted-refresh-token", - } - - with patch.object(user, "asave", new_callable=AsyncMock) as mock_asave: - result = await scraper.update_old_tokens( - user, mock_encryption, new_tokens - ) - - assert result is False - mock_asave.assert_not_called() - - @patch("scraping.main.AESEncryption") - @pytest.mark.asyncio - async def test_update_old_tokens_expired_failure( - self, mock_aes, scraper, user - ): - """토큰이 만료되었을 때 업데이트 실패 테스트""" - mock_encryption = mock_aes.return_value - mock_encryption.decrypt.side_effect = ( - lambda token: f"decrypted-{token}" - ) - mock_encryption.encrypt.side_effect = ( - lambda token: f"encrypted-{token}" - ) - - new_tokens = { - "access_token": "decrypted-encrypted-access-token", - "refresh_token": "decrypted-encrypted-refresh-token", - } - - with patch.object(user, "asave", new_callable=AsyncMock) as mock_asave: - result = await scraper.update_old_tokens( - user, mock_encryption, new_tokens - ) - - assert result is False - mock_asave.assert_not_called() - - @patch("scraping.main.AESEncryption") - @pytest.mark.asyncio - async def test_update_old_tokens_with_mocked_decryption_failure( - self, mock_aes, scraper, user - ): - """복호화가 제대로 되지 않았을 경우 업데이트 실패 테스트""" - mock_encryption = mock_aes.return_value - mock_encryption.decrypt.side_effect = lambda token: None - mock_encryption.encrypt.side_effect = ( - lambda token: f"encrypted-{token}" - ) - - new_tokens = { - "access_token": "valid_token", - "refresh_token": "valid_token", - } - - with patch.object(user, "asave", new_callable=AsyncMock) as mock_asave: - result = await scraper.update_old_tokens( - user, mock_encryption, new_tokens - ) - - assert result is False - mock_asave.assert_not_called() - - @pytest.mark.asyncio - async def test_bulk_upsert_posts_success(self, scraper, user): - """Post 객체 배치 분할 삽입 또는 업데이트 성공 테스트""" - posts_data = [ - { - "id": str(uuid.uuid4()), - "title": f"Title {i}", - "url_slug": f"slug-{i}", - "released_at": get_local_now(), - } - for i in range(50) - ] - - # _upsert_batch 메서드만 모킹 - with patch.object( - scraper, "_upsert_batch", new_callable=AsyncMock - ) as mock_upsert: - with patch("asyncio.sleep", new_callable=AsyncMock): - result = await scraper.bulk_upsert_posts( - user, posts_data, batch_size=10 - ) - - assert result is True - # 50개 / 10개 배치 = 5번 호출 - assert mock_upsert.call_count == 5 - - @pytest.mark.asyncio - async def test_bulk_upsert_posts_failure(self, scraper, user): - """Post 객체 배치 분할 삽입 또는 업데이트 실패 테스트""" - posts_data = [ - { - "id": str(uuid.uuid4()), - "title": f"Title {i}", - "url_slug": f"slug-{i}", - "released_at": get_local_now(), - } - for i in range(10) - ] - - # 실제 예외를 발생시키는 비동기 함수 생성 - async def mock_async_error(*args, **kwargs): - raise Exception("DB 에러") - - # sync_to_async가 적절한 비동기 함수를 반환하도록 패치 - with patch("scraping.main.sync_to_async") as mock_sync_to_async: - mock_sync_to_async.return_value = mock_async_error - result = await scraper.bulk_upsert_posts( - user, posts_data, batch_size=5 - ) - - assert result is False - mock_sync_to_async.assert_called() - - @pytest.mark.asyncio - @pytest.mark.django_db - async def test_upsert_batch_creates_and_updates(self, scraper): - """ - _upsert_batch 메서드가 기존 게시물을 업데이트하고, 신규 게시물을 생성하는지 검증합니다. 
- """ - test_user = await sync_to_async(User.objects.create)( - velog_uuid=uuid.uuid4(), - access_token="test-access-token", - refresh_token="test-refresh-token", - group_id=1, - email="test@example.com", - is_active=True, - ) - - # 기존 게시물 생성 (sync_to_async로 감싸줌) - existing_post_uuid = str(uuid.uuid4()) - original_title = "Original Title" - original_slug = "original-slug" - original_time = get_local_now() - await sync_to_async(Post.objects.create)( - post_uuid=existing_post_uuid, - title=original_title, - user=test_user, - slug=original_slug, - released_at=original_time, - ) - - # 배치 데이터 준비: 기존 게시물 업데이트용과 신규 게시물 생성용 데이터 포함 - updated_title = "Updated Title" - updated_slug = "updated-slug" - updated_time = get_local_now() - - new_post_uuid = str(uuid.uuid4()) - new_title = "New Title" - new_slug = "new-slug" - new_time = get_local_now() - - batch_posts = [ - { - "id": existing_post_uuid, - "title": updated_title, - "url_slug": updated_slug, - "released_at": updated_time, - }, - { - "id": new_post_uuid, - "title": new_title, - "url_slug": new_slug, - "released_at": new_time, - }, - ] - - # _upsert_batch 호출 - await scraper._upsert_batch(test_user, batch_posts) - - # 기존 게시물 업데이트 확인 (sync_to_async 사용) - updated_post = await sync_to_async(Post.objects.get)( - post_uuid=existing_post_uuid - ) - assert updated_post.title == updated_title - assert updated_post.slug == updated_slug - assert updated_post.released_at == updated_time - - # 신규 게시물 생성 확인 (sync_to_async 사용) - new_post = await sync_to_async(Post.objects.get)( - post_uuid=new_post_uuid - ) - assert new_post.title == new_title - assert new_post.slug == new_slug - assert new_post.released_at == new_time - - # user 관계 필드를 직접 비교하지 말고 ID로 비교 - new_post_user_id = await sync_to_async(lambda: new_post.user_id)() - assert new_post_user_id == test_user.id - - @pytest.mark.asyncio - async def test_update_daily_statistics_success(self, scraper): - """데일리 통계 업데이트 또는 생성 성공 테스트""" - post_data = {"id": "post-123"} - stats_data = {"data": {"getStats": {"total": 100}}, "likes": 5} - - with patch( - "scraping.main.sync_to_async", new_callable=MagicMock - ) as mock_sync_to_async: - mock_async_func = AsyncMock() - mock_sync_to_async.return_value = mock_async_func - - await scraper.update_daily_statistics(post_data, stats_data) - - mock_sync_to_async.assert_called() - mock_async_func.assert_called_once() - - for call_args in mock_sync_to_async.call_args_list: - args, kwargs = call_args - - assert callable(args[0]) - - if kwargs: - assert "post-123" in str(kwargs.get("post_data", "")) - assert 100 in str(kwargs.get("stats_data", "")) - - @patch("scraping.main.sync_to_async", new_callable=MagicMock) - @pytest.mark.asyncio - async def test_update_daily_statistics_exception( - self, mock_sync_to_async, scraper - ): - """데일리 통계 업데이트 실패 테스트""" - post_data = {"id": "post-123"} - stats_data = {"data": {"getStats": {"total": 100}}, "likes": 5} - - mock_async_func = AsyncMock(side_effect=Exception("Database error")) - mock_sync_to_async.return_value = mock_async_func - - try: - await scraper.update_daily_statistics(post_data, stats_data) - except Exception: - pass - - mock_sync_to_async.assert_called() - mock_async_func.assert_called_once() - - @patch("scraping.main.fetch_post_stats") - @pytest.mark.asyncio - async def test_fetch_post_stats_limited_success(self, mock_fetch, scraper): - """fetch_post_stats 성공 테스트""" - mock_fetch.side_effect = [ - None, - None, - {"data": {"getStats": {"total": 100}}}, - ] - - result = await scraper.fetch_post_stats_limited( - "post-123", 
"token-1", "token-2" - ) - - assert result is not None - mock_fetch.assert_called() - assert mock_fetch.call_count == 3 - - for call_args in mock_fetch.call_args_list: - args, kwargs = call_args - assert "post-123" in str(args) or "post-123" in str(kwargs) - assert ( - "token-1" in str(args) - or "token-1" in str(kwargs) - or "token-2" in str(args) - or "token-2" in str(kwargs) - ) - - @patch("scraping.main.fetch_post_stats") - @pytest.mark.asyncio - async def test_fetch_post_stats_limited_max_retries( - self, mock_fetch, scraper - ): - """최대 재시도 횟수 초과 테스트""" - mock_fetch.return_value = None - - result = await scraper.fetch_post_stats_limited( - "post-123", "token-1", "token-2" - ) - - assert result is None - assert mock_fetch.call_count >= 3 - - @patch("scraping.main.fetch_post_stats", new_callable=AsyncMock) - @pytest.mark.asyncio - async def test_fetch_post_stats_limited_failure(self, mock_fetch, scraper): - """fetch_post_stats 실패 테스트""" - mock_fetch.side_effect = [None, None, None] - - result = await scraper.fetch_post_stats_limited( - "post-123", "token-1", "token-2" - ) - - assert result is None - assert mock_fetch.call_count == 3 - - @patch("scraping.main.fetch_velog_user_chk") - @patch("scraping.main.fetch_all_velog_posts") - @patch("scraping.main.AESEncryption") - @pytest.mark.asyncio - async def test_process_user_success( - self, mock_aes, mock_fetch_posts, mock_fetch_user_chk, scraper, user - ): - """유저 데이터 전체 처리 성공 테스트""" - mock_encryption = mock_aes.return_value - mock_encryption.decrypt.side_effect = ( - lambda token: f"decrypted-{token}" - ) - mock_encryption.encrypt.side_effect = ( - lambda token: f"encrypted-{token}" - ) - - mock_fetch_user_chk.return_value = ( - {"access_token": "new-token"}, - {"data": {"currentUser": {"username": "testuser"}}}, - ) - mock_fetch_posts.return_value = [] - - with patch.object( - scraper, "update_old_tokens", new_callable=AsyncMock - ) as mock_update_tokens: - await scraper.process_user(user, MagicMock()) - - mock_update_tokens.assert_called_once() - - @patch("scraping.main.transaction.atomic") - @pytest.mark.django_db(transaction=True) - @pytest.mark.asyncio - async def test_process_user_failure_rollback( - self, mock_atomic, scraper, user - ): - """유저 데이터 처리 실패 시 롤백 확인 테스트""" - mock_session = AsyncMock() - mock_atomic.side_effect = transaction.atomic - - with patch( - "scraping.main.fetch_velog_user_chk", - side_effect=Exception("Failed to fetch user data"), - ): - with pytest.raises(Exception): - await scraper.process_user(user, mock_session) - - # 동기 쿼리를 비동기로 변환 - count = await sync_to_async(Post.objects.filter(user=user).count)() - assert count == 0 - - @pytest.mark.django_db(transaction=True) - @pytest.mark.asyncio - async def test_process_user_partial_failure_rollback(self, scraper, user): - """통계 업데이트 중 실패 시 롤백 확인 테스트""" - mock_session = AsyncMock() - post_uuid = uuid.uuid4() - - # 테스트용 게시물 직접 생성 - await sync_to_async(Post.objects.create)( - post_uuid=post_uuid, - user=user, - title="Test Post", - slug="test-slug", - released_at=get_local_now(), - ) - - # fetch_post_stats_limited 메서드에서 예외 발생시키기 - with patch.object( - scraper, - "fetch_post_stats_limited", - side_effect=Exception("Failed to fetch stats"), - ): - # bulk_upsert_posts 성공하도록 모킹 - with patch.object( - scraper, "bulk_upsert_posts", new_callable=AsyncMock - ) as mock_bulk_upsert: - mock_bulk_upsert.return_value = True - - # 다른 필요한 API 호출도 모킹 - with ( - patch( - "scraping.apis.fetch_velog_user_chk", - new_callable=AsyncMock, - ) as mock_user_chk, - patch( - 
"scraping.apis.fetch_all_velog_posts", - new_callable=AsyncMock, - ) as mock_posts, - ): - # 사용자 정보 모킹 - mock_user_chk.return_value = ( - {}, - {"data": {"currentUser": {"username": "testuser"}}}, - ) - - # 게시물 정보 모킹 - mock_posts.return_value = [ - { - "id": post_uuid, - "title": "Test Post", - "url_slug": "test-slug", - "released_at": get_local_now(), - } - ] - - # 예외를 처리하지만 게시물은 여전히 존재해야 함 - try: - await scraper.process_user(user, mock_session) - except Exception: - pass - - # 게시물이 존재하는지 확인 (sync_to_async로 래핑) - exists_func = sync_to_async( - lambda: Post.objects.filter(user=user).exists() - ) - exists = await exists_func() - assert exists - - # 통계 정보가 없는지 확인 - has_stats_func = sync_to_async( - lambda: PostDailyStatistics.objects.filter( - post__user=user - ).exists() - ) - has_stats = await has_stats_func() - assert not has_stats diff --git a/scraping/tests/test_main_posts.py b/scraping/tests/test_main_posts.py new file mode 100644 index 0000000..83ac813 --- /dev/null +++ b/scraping/tests/test_main_posts.py @@ -0,0 +1,276 @@ +import uuid +from unittest.mock import AsyncMock, patch + +import pytest +from asgiref.sync import sync_to_async + +from posts.models import Post +from scraping.main import Scraper +from users.models import User +from utils.utils import get_local_now + + +class TestScraperPosts: + @pytest.fixture + def scraper(self): + """Scraper 인스턴스 생성""" + return Scraper(group_range=range(1, 10), max_connections=10) + + @pytest.fixture + def user(self, db): + """테스트용 User 객체 생성""" + return User.objects.create( + velog_uuid=uuid.uuid4(), + access_token="encrypted-access-token", + refresh_token="encrypted-refresh-token", + group_id=1, + email="test@example.com", + is_active=True, + ) + + @pytest.mark.asyncio + async def test_bulk_upsert_posts_success(self, scraper, user): + """Post 객체 배치 분할 삽입 또는 업데이트 성공 테스트""" + posts_data = [ + { + "id": str(uuid.uuid4()), + "title": f"Title {i}", + "url_slug": f"slug-{i}", + "released_at": get_local_now(), + } + for i in range(50) + ] + + # _upsert_batch 메서드만 모킹 + with patch.object( + scraper, "_upsert_batch", new_callable=AsyncMock + ) as mock_upsert: + with patch("asyncio.sleep", new_callable=AsyncMock): + result = await scraper.bulk_upsert_posts( + user, posts_data, batch_size=10 + ) + + assert result is True + # 50개 / 10개 배치 = 5번 호출 + assert mock_upsert.call_count == 5 + + @pytest.mark.asyncio + async def test_bulk_upsert_posts_failure(self, scraper, user): + """Post 객체 배치 분할 삽입 또는 업데이트 실패 테스트""" + posts_data = [ + { + "id": str(uuid.uuid4()), + "title": f"Title {i}", + "url_slug": f"slug-{i}", + "released_at": get_local_now(), + } + for i in range(10) + ] + + # 실제 예외를 발생시키는 비동기 함수 생성 + async def mock_async_error(*args, **kwargs): + raise Exception("DB 에러") + + # sync_to_async가 적절한 비동기 함수를 반환하도록 패치 + with patch("scraping.main.sync_to_async") as mock_sync_to_async: + mock_sync_to_async.return_value = mock_async_error + result = await scraper.bulk_upsert_posts( + user, posts_data, batch_size=5 + ) + + assert result is False + mock_sync_to_async.assert_called() + + @pytest.mark.asyncio + @pytest.mark.django_db + async def test_upsert_batch_creates_and_updates(self, scraper): + """ + _upsert_batch 메서드가 기존 게시물을 업데이트하고, 신규 게시물을 생성하는지 검증합니다. 
+ """ + test_user = await sync_to_async(User.objects.create)( + velog_uuid=uuid.uuid4(), + access_token="test-access-token", + refresh_token="test-refresh-token", + group_id=1, + email="test@example.com", + is_active=True, + ) + + # 기존 게시물 생성 (sync_to_async로 감싸줌) + existing_post_uuid = str(uuid.uuid4()) + original_title = "Original Title" + original_slug = "original-slug" + original_time = get_local_now() + await sync_to_async(Post.objects.create)( + post_uuid=existing_post_uuid, + title=original_title, + user=test_user, + slug=original_slug, + released_at=original_time, + ) + + # 배치 데이터 준비: 기존 게시물 업데이트용과 신규 게시물 생성용 데이터 포함 + updated_title = "Updated Title" + updated_slug = "updated-slug" + updated_time = get_local_now() + + new_post_uuid = str(uuid.uuid4()) + new_title = "New Title" + new_slug = "new-slug" + new_time = get_local_now() + + batch_posts = [ + { + "id": existing_post_uuid, + "title": updated_title, + "url_slug": updated_slug, + "released_at": updated_time, + }, + { + "id": new_post_uuid, + "title": new_title, + "url_slug": new_slug, + "released_at": new_time, + }, + ] + + # _upsert_batch 호출 + await scraper._upsert_batch(test_user, batch_posts) + + # 기존 게시물 업데이트 확인 (sync_to_async 사용) + updated_post = await sync_to_async(Post.objects.get)( + post_uuid=existing_post_uuid + ) + assert updated_post.title == updated_title + assert updated_post.slug == updated_slug + assert updated_post.released_at == updated_time + + # 신규 게시물 생성 확인 (sync_to_async 사용) + new_post = await sync_to_async(Post.objects.get)( + post_uuid=new_post_uuid + ) + assert new_post.title == new_title + assert new_post.slug == new_slug + assert new_post.released_at == new_time + + # user 관계 필드를 직접 비교하지 말고 ID로 비교 + new_post_user_id = await sync_to_async(lambda: new_post.user_id)() + assert new_post_user_id == test_user.id + + @pytest.mark.asyncio + @pytest.mark.django_db + async def test_sync_post_active_status(self, scraper): + """sync_post_active_status 메서드 테스트""" + # 테스트 사용자 생성 + test_user = await sync_to_async(User.objects.create)( + velog_uuid=uuid.uuid4(), + access_token="test-access-token", + refresh_token="test-refresh-token", + group_id=1, + email="test@example.com", + is_active=True, + ) + + # 다양한 게시물 생성 - 활성 및 비활성 + post_uuid1 = str(uuid.uuid4()) + post_uuid2 = str(uuid.uuid4()) + post_uuid3 = str(uuid.uuid4()) + post_uuid4 = str(uuid.uuid4()) + + # 활성 게시물 2개 + await sync_to_async(Post.objects.create)( + post_uuid=post_uuid1, + title="Active Post 1", + user=test_user, + slug="active-post-1", + released_at=get_local_now(), + is_active=True, + ) + + await sync_to_async(Post.objects.create)( + post_uuid=post_uuid2, + title="Active Post 2", + user=test_user, + slug="active-post-2", + released_at=get_local_now(), + is_active=True, + ) + + # 비활성 게시물 2개 + await sync_to_async(Post.objects.create)( + post_uuid=post_uuid3, + title="Inactive Post 1", + user=test_user, + slug="inactive-post-1", + released_at=get_local_now(), + is_active=False, + ) + + await sync_to_async(Post.objects.create)( + post_uuid=post_uuid4, + title="Inactive Post 2", + user=test_user, + slug="inactive-post-2", + released_at=get_local_now(), + is_active=False, + ) + + # 현재 API에서 가져온 게시물 ID 집합 (post1과 post3만 포함) + current_post_ids = {post_uuid1, post_uuid3} + + # sync_post_active_status 메서드 호출 + await scraper.sync_post_active_status(test_user, current_post_ids) + + # 결과 확인: post1 = 활성 유지, post2 = 비활성으로 변경, post3 = 활성화로 변경, post4 = 비활성 유지 + post1 = await sync_to_async(Post.objects.get)(post_uuid=post_uuid1) + assert post1.is_active is True, 
"post1 should remain active" + + post2 = await sync_to_async(Post.objects.get)(post_uuid=post_uuid2) + assert post2.is_active is False, "post2 should be deactivated" + + post3 = await sync_to_async(Post.objects.get)(post_uuid=post_uuid3) + assert post3.is_active is True, "post3 should be reactivated" + + post4 = await sync_to_async(Post.objects.get)(post_uuid=post_uuid4) + assert post4.is_active is False, "post4 should remain inactive" + + @pytest.mark.asyncio + @pytest.mark.django_db + async def test_sync_post_active_status_safety_threshold(self, scraper): + """sync_post_active_status의 안전 임계값 테스트""" + # 테스트 사용자 생성 + test_user = await sync_to_async(User.objects.create)( + velog_uuid=uuid.uuid4(), + access_token="test-access-token", + refresh_token="test-refresh-token", + group_id=1, + email="test@example.com", + is_active=True, + ) + + # 10개의 활성 게시물 생성 + post_uuids = [str(uuid.uuid4()) for _ in range(10)] + + for i, post_uuid in enumerate(post_uuids): + await sync_to_async(Post.objects.create)( + post_uuid=post_uuid, + title=f"Active Post {i}", + user=test_user, + slug=f"active-post-{i}", + released_at=get_local_now(), + is_active=True, + ) + + # 현재 API에서 가져온 게시물 ID 집합 (10개 중 2개만 포함 = 80% 비활성화 예정) + # 이는 scraper.py의 안전 임계값(50%)을 초과 + current_post_ids = {post_uuids[0], post_uuids[1]} + + # sync_post_active_status 메서드 호출 + await scraper.sync_post_active_status(test_user, current_post_ids) + + # 모든 게시물이 여전히 활성 상태여야 함 (안전 임계값으로 인해 작업이 수행되지 않음) + for post_uuid in post_uuids: + post = await sync_to_async(Post.objects.get)(post_uuid=post_uuid) + assert ( + post.is_active is True + ), f"Post {post_uuid} should remain active due to safety threshold" diff --git a/scraping/tests/test_main_statistics.py b/scraping/tests/test_main_statistics.py new file mode 100644 index 0000000..c9512ff --- /dev/null +++ b/scraping/tests/test_main_statistics.py @@ -0,0 +1,86 @@ +import uuid +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from asgiref.sync import sync_to_async + +from posts.models import Post, PostDailyStatistics +from scraping.main import Scraper +from users.models import User +from utils.utils import get_local_now + + +class TestScraperStatistics: + @pytest.fixture + def scraper(self): + """Scraper 인스턴스 생성""" + return Scraper(group_range=range(1, 10), max_connections=10) + + @pytest.fixture + def user(self, db): + """테스트용 User 객체 생성""" + return User.objects.create( + velog_uuid=uuid.uuid4(), + access_token="encrypted-access-token", + refresh_token="encrypted-refresh-token", + group_id=1, + email="test@example.com", + is_active=True, + ) + + @pytest.mark.asyncio + async def test_update_daily_statistics_success(self, scraper): + """데일리 통계 업데이트 또는 생성 성공 테스트""" + post_data = {"id": "post-123", "likes": 10} + stats_data = {"data": {"getStats": {"total": 100}}} + + with patch( + "scraping.main.sync_to_async", new_callable=MagicMock + ) as mock_sync_to_async: + mock_async_func = AsyncMock() + mock_sync_to_async.return_value = mock_async_func + + await scraper.update_daily_statistics(post_data, stats_data) + + mock_sync_to_async.assert_called() + mock_async_func.assert_called_once() + + @pytest.mark.asyncio + @pytest.mark.django_db + async def test_update_daily_statistics_integration(self, scraper): + """데일리 통계 업데이트 통합 테스트""" + # 테스트 사용자 및 게시물 생성 + test_user = await sync_to_async(User.objects.create)( + velog_uuid=uuid.uuid4(), + access_token="test-access-token", + refresh_token="test-refresh-token", + group_id=1, + email="test@example.com", + is_active=True, + ) + + post_uuid = 
str(uuid.uuid4()) + await sync_to_async(Post.objects.create)( + post_uuid=post_uuid, + title="Test Post", + user=test_user, + slug="test-post", + released_at=get_local_now(), + is_active=True, + ) + + # 통계 데이터 준비 + post_data = {"id": post_uuid, "likes": 25} + stats_data = {"data": {"getStats": {"total": 150}}} + + # update_daily_statistics 호출 + await scraper.update_daily_statistics(post_data, stats_data) + + # 결과 확인 + today = get_local_now().date() + stats = await sync_to_async(PostDailyStatistics.objects.get)( + post__post_uuid=post_uuid, date=today + ) + + assert stats.daily_view_count == 150 + assert stats.daily_like_count == 25 diff --git a/scraping/tests/test_main_tokens.py b/scraping/tests/test_main_tokens.py new file mode 100644 index 0000000..1bb720d --- /dev/null +++ b/scraping/tests/test_main_tokens.py @@ -0,0 +1,327 @@ +import uuid +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from asgiref.sync import sync_to_async + +from scraping.main import Scraper +from users.models import User +from utils.utils import get_local_now + + +class TestScraperTokenAndProcessing: + @pytest.fixture + def scraper(self): + """Scraper 인스턴스 생성""" + return Scraper(group_range=range(1, 10), max_connections=10) + + @pytest.fixture + def user(self, db): + """테스트용 User 객체 생성""" + return User.objects.create( + velog_uuid=uuid.uuid4(), + access_token="encrypted-access-token", + refresh_token="encrypted-refresh-token", + group_id=1, + email="test@example.com", + is_active=True, + ) + + @patch("scraping.main.AESEncryption") + @pytest.mark.asyncio + async def test_update_old_tokens_success( + self, mock_aes, scraper, user + ) -> None: + """토큰 업데이트 성공 테스트""" + mock_encryption = mock_aes.return_value + mock_encryption.decrypt.side_effect = ( + lambda token: f"decrypted-{token}" + ) + mock_encryption.encrypt.side_effect = ( + lambda token: f"encrypted-{token}" + ) + + new_tokens = { + "access_token": "new-access-token", + "refresh_token": "new-refresh-token", + } + + with patch.object(user, "asave", new_callable=AsyncMock) as mock_asave: + result = await scraper.update_old_tokens( + user, mock_encryption, new_tokens + ) + + assert result is True + mock_asave.assert_called_once() + assert user.access_token == "encrypted-new-access-token" + assert user.refresh_token == "encrypted-new-refresh-token" + mock_encryption.decrypt.assert_any_call("encrypted-access-token") + mock_encryption.decrypt.assert_any_call("encrypted-refresh-token") + + @patch("scraping.main.AESEncryption") + @pytest.mark.asyncio + async def test_update_old_tokens_no_change( + self, mock_aes, scraper, user + ) -> None: + """토큰 업데이트 없음 테스트""" + mock_encryption = mock_aes.return_value + mock_encryption.decrypt.side_effect = lambda token: token + mock_encryption.encrypt.side_effect = ( + lambda token: f"encrypted-{token}" + ) + + new_tokens = { + "access_token": "encrypted-access-token", + "refresh_token": "encrypted-refresh-token", + } + + with patch.object(user, "asave", new_callable=AsyncMock) as mock_asave: + result = await scraper.update_old_tokens( + user, mock_encryption, new_tokens + ) + + assert result is False + mock_asave.assert_not_called() + + @patch("scraping.main.AESEncryption") + @pytest.mark.asyncio + async def test_update_old_tokens_expired_failure( + self, mock_aes, scraper, user + ): + """토큰이 만료되었을 때 업데이트 실패 테스트""" + mock_encryption = mock_aes.return_value + mock_encryption.decrypt.side_effect = ( + lambda token: f"decrypted-{token}" + ) + mock_encryption.encrypt.side_effect = ( + lambda token: 
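Taken together, the token tests in this file pin down a contract for update_old_tokens: no write when decryption fails or when the incoming tokens match what is already stored; re-encrypt and asave otherwise. A sketch of that contract (the real implementation is not part of this diff; `aes` stands for the AESEncryption helper the tests mock):

async def update_old_tokens_sketch(user, aes, new_tokens) -> bool:
    old_access = aes.decrypt(user.access_token)
    old_refresh = aes.decrypt(user.refresh_token)

    # Decryption failure -> refuse to overwrite anything.
    if not old_access or not old_refresh:
        return False

    # Velog returned the same tokens we already hold -> nothing to do.
    if (new_tokens["access_token"] == old_access
            and new_tokens["refresh_token"] == old_refresh):
        return False

    user.access_token = aes.encrypt(new_tokens["access_token"])
    user.refresh_token = aes.encrypt(new_tokens["refresh_token"])
    await user.asave()
    return True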
f"encrypted-{token}" + ) + + new_tokens = { + "access_token": "decrypted-encrypted-access-token", + "refresh_token": "decrypted-encrypted-refresh-token", + } + + with patch.object(user, "asave", new_callable=AsyncMock) as mock_asave: + result = await scraper.update_old_tokens( + user, mock_encryption, new_tokens + ) + + assert result is False + mock_asave.assert_not_called() + + @patch("scraping.main.AESEncryption") + @pytest.mark.asyncio + async def test_update_old_tokens_with_mocked_decryption_failure( + self, mock_aes, scraper, user + ): + """복호화가 제대로 되지 않았을 경우 업데이트 실패 테스트""" + mock_encryption = mock_aes.return_value + mock_encryption.decrypt.side_effect = lambda token: None + mock_encryption.encrypt.side_effect = ( + lambda token: f"encrypted-{token}" + ) + + new_tokens = { + "access_token": "valid_token", + "refresh_token": "valid_token", + } + + with patch.object(user, "asave", new_callable=AsyncMock) as mock_asave: + result = await scraper.update_old_tokens( + user, mock_encryption, new_tokens + ) + + assert result is False + mock_asave.assert_not_called() + + @patch("scraping.main.fetch_post_stats") + @pytest.mark.asyncio + async def test_fetch_post_stats_limited_success(self, mock_fetch, scraper): + """fetch_post_stats 성공 테스트""" + mock_fetch.return_value = {"data": {"getStats": {"total": 100}}} + + result = await scraper.fetch_post_stats_limited( + "post-123", "token-1", "token-2" + ) + + assert result is not None + assert result["data"]["getStats"]["total"] == 100 + mock_fetch.assert_called_once_with("post-123", "token-1", "token-2") + + @patch("scraping.main.fetch_post_stats") + @pytest.mark.asyncio + async def test_fetch_post_stats_limited_retry_success( + self, mock_fetch, scraper + ): + """fetch_post_stats 재시도 성공 테스트""" + mock_fetch.side_effect = [ + None, # 첫 번째 시도 실패 + {"data": {"getStats": {"total": 100}}}, # 두 번째 시도 성공 + ] + + result = await scraper.fetch_post_stats_limited( + "post-123", "token-1", "token-2" + ) + + assert result is not None + assert result["data"]["getStats"]["total"] == 100 + assert mock_fetch.call_count == 2 + + @patch("scraping.main.fetch_post_stats") + @pytest.mark.asyncio + async def test_fetch_post_stats_limited_max_retries( + self, mock_fetch, scraper + ): + """최대 재시도 횟수 초과 테스트""" + mock_fetch.return_value = None + + result = await scraper.fetch_post_stats_limited( + "post-123", "token-1", "token-2" + ) + + assert result is None + assert mock_fetch.call_count == 3 # 최대 3번 재시도 + + @patch("scraping.main.fetch_velog_user_chk") + @patch("scraping.main.fetch_all_velog_posts") + @patch("scraping.main.AESEncryption") + @pytest.mark.asyncio + async def test_process_user_success( + self, mock_aes, mock_fetch_posts, mock_fetch_user_chk, scraper, user + ): + """유저 데이터 전체 처리 성공 테스트""" + # AES 암호화 모킹 + mock_encryption = mock_aes.return_value + mock_encryption.decrypt.side_effect = ( + lambda token: f"decrypted-{token}" + ) + mock_encryption.encrypt.side_effect = ( + lambda token: f"encrypted-{token}" + ) + + # 사용자 데이터 및 게시물 모킹 + mock_fetch_user_chk.return_value = ( + { + "access_token": "new-token", + "refresh_token": "new-refresh-token", + }, + {"data": {"currentUser": {"username": "testuser"}}}, + ) + + post_uuid = str(uuid.uuid4()) + mock_fetch_posts.return_value = [ + { + "id": post_uuid, + "title": "Test Post", + "url_slug": "test-post", + "released_at": get_local_now(), + "likes": 15, + } + ] + + # 내부 메서드 모킹 + with ( + patch.object( + scraper, "update_old_tokens", new_callable=AsyncMock + ) as mock_update_tokens, + patch.object( + scraper, 
"bulk_upsert_posts", new_callable=AsyncMock + ) as mock_bulk_upsert, + patch.object( + scraper, "sync_post_active_status", new_callable=AsyncMock + ) as mock_sync_status, + patch.object( + scraper, "fetch_post_stats_limited", new_callable=AsyncMock + ) as mock_fetch_stats, + patch.object( + scraper, "update_daily_statistics", new_callable=AsyncMock + ) as mock_update_stats, + ): + # 통계 데이터 모킹 + mock_fetch_stats.return_value = { + "data": {"getStats": {"total": 150}} + } + + # 테스트 실행 + await scraper.process_user(user, AsyncMock()) + + # 메서드 호출 확인 + mock_update_tokens.assert_called_once() + mock_bulk_upsert.assert_called_once() + mock_sync_status.assert_called_once() + mock_fetch_stats.assert_called_once() + mock_update_stats.assert_called_once() + + @patch("scraping.main.logger") + @pytest.mark.asyncio + async def test_run_method(self, mock_logger, scraper): + """run 메서드 테스트""" + # 테스트용 사용자 객체 생성 + test_user = AsyncMock() + + # 비동기 이터레이터를 반환하는 모킹 함수 생성 + async def async_mock_filter(*args, **kwargs): + for user in [test_user]: + yield user + + # User.objects.filter 모킹 + with patch("users.models.User.objects.filter") as mock_filter: + # 비동기 이터레이터를 반환하도록 설정 + mock_filter.return_value = async_mock_filter() + + # aiohttp.ClientSession 모킹 + with patch("aiohttp.ClientSession") as mock_session: + mock_session_instance = MagicMock() + mock_session.return_value.__aenter__.return_value = ( + mock_session_instance + ) + + # process_user 메서드 모킹 + with patch.object( + scraper, "process_user", new_callable=AsyncMock + ) as mock_process: + # 실행 + await scraper.run() + + # 로그 및 메서드 호출 확인 + assert mock_logger.info.call_count >= 2 # 시작과 종료 로그 + mock_process.assert_called_once_with(test_user, mock_session_instance) + + @pytest.mark.asyncio + @pytest.mark.django_db + async def test_scraper_target_user_run(self): + """ScraperTargetUser 클래스의 run 메서드 테스트""" + from scraping.main import ScraperTargetUser + + # 테스트 사용자 생성 + test_user = await sync_to_async(User.objects.create)( + velog_uuid=uuid.uuid4(), + access_token="test-access-token", + refresh_token="test-refresh-token", + group_id=1, + email="test@example.com", + is_active=True, + ) + + # ScraperTargetUser 인스턴스 생성 + target_scraper = ScraperTargetUser( + user_pk_list=[test_user.id], max_connections=10 + ) + + # process_user 메서드 모킹 + with patch.object( + target_scraper, "process_user", new_callable=AsyncMock + ) as mock_process: + # aiohttp.ClientSession 모킹 + with patch("aiohttp.ClientSession") as mock_session: + mock_session_instance = MagicMock() + mock_session.return_value.__aenter__.return_value = ( + mock_session_instance + ) + + # 실행 + await target_scraper.run() + + # process_user 호출 확인 + mock_process.assert_called_once()