From f64e02b4a199c94fb6aa9a95b73c3cc00ed4c43e Mon Sep 17 00:00:00 2001 From: Nuung Date: Sat, 19 Apr 2025 16:05:58 +0900 Subject: [PATCH 1/4] =?UTF-8?q?feature:=20=EB=B0=B0=EC=B9=98=20=EA=B3=BC?= =?UTF-8?q?=EC=A0=95=20=EC=A4=91=20=EC=82=AD=EC=A0=9C=20=ED=8C=90=EB=8B=A8?= =?UTF-8?q?=EB=90=98=EB=8A=94=20=EA=B2=8C=EC=8B=9C=EA=B8=80=EC=9D=84=20?= =?UTF-8?q?=EB=B9=84=ED=99=9C=EC=84=B1=20=EC=B2=98=EB=A6=AC,=20=EC=98=A4?= =?UTF-8?q?=ED=8C=90=EB=8B=A8=EB=90=9C=20=EB=B9=84=ED=99=9C=EC=84=B1?= =?UTF-8?q?=EC=9D=80=20=EB=8B=A4=EC=8B=9C=20=EC=82=B4=EB=A6=B4=20=EC=88=98?= =?UTF-8?q?=20=EC=9E=88=EB=8A=94=20=EB=A1=9C=EC=A7=81=EB=8F=84=20=EC=B6=94?= =?UTF-8?q?=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- scraping/main.py | 70 ++++++++++++++++++--- scraping/tests/test_main.py | 120 ++++++++++++++++++++++++++++++++++++ 2 files changed, 183 insertions(+), 7 deletions(-) diff --git a/scraping/main.py b/scraping/main.py index 25c3d88..435c1c2 100644 --- a/scraping/main.py +++ b/scraping/main.py @@ -5,7 +5,6 @@ import async_timeout import environ import sentry_sdk - from asgiref.sync import sync_to_async from django.db import transaction @@ -104,23 +103,51 @@ async def _upsert_batch( def _execute_transaction() -> None: with transaction.atomic(): post_uuids = [post["id"] for post in batch_posts] - existing_posts = { + + # 현재 유저의 모든 게시글 가져오기 (active와 inactive 모두) + all_user_posts = { str(post.post_uuid): post - for post in Post.objects.filter(post_uuid__in=post_uuids) + for post in Post.objects.filter(user=user) + } + + # 현재 활성화된 게시글만 가져오기 + existing_active_posts = { + uuid: post + for uuid, post in all_user_posts.items() + if post.is_active and uuid in post_uuids + } + + # 비활성화된 게시글 가져오기 (재활성화 여부 확인용) + inactive_posts = { + uuid: post + for uuid, post in all_user_posts.items() + if not post.is_active and uuid in post_uuids } posts_to_create = [] posts_to_update = [] + posts_to_reactivate = [] + # 현재 데이터의 각 게시글에 대해 처리 for post_data in batch_posts: post_uuid = post_data["id"] - if post_uuid in existing_posts: - post = existing_posts[post_uuid] + if post_uuid in existing_active_posts: + # 이미 활성화된 게시글 업데이트 + post = existing_active_posts[post_uuid] post.title = post_data["title"] post.slug = post_data["url_slug"] post.released_at = post_data["released_at"] posts_to_update.append(post) + elif post_uuid in inactive_posts: + # 비활성화된 게시글 재활성화 + post = inactive_posts[post_uuid] + post.title = post_data["title"] + post.slug = post_data["url_slug"] + post.released_at = post_data["released_at"] + post.is_active = True + posts_to_reactivate.append(post) else: + # 새 게시글 생성 posts_to_create.append( Post( post_uuid=post_uuid, @@ -128,16 +155,45 @@ def _execute_transaction() -> None: user=user, slug=post_data["url_slug"], released_at=post_data["released_at"], + is_active=True, ) ) + # 과거 활성화 게시글 중 현재 데이터에 없는 게시글은 비활성화 + active_posts_to_deactivate = [] + current_post_uuids = set( + post_data["id"] for post_data in batch_posts + ) + + for post_uuid, post in all_user_posts.items(): + if post.is_active and post_uuid not in current_post_uuids: + post.is_active = False + active_posts_to_deactivate.append(post) + + # 일괄 업데이트 및 생성 수행 if posts_to_update: Post.objects.bulk_update( - posts_to_update, ["title", "slug", "released_at"] + posts_to_update, + ["title", "slug", "released_at"], + batch_size=200, + ) + + if posts_to_reactivate: + Post.objects.bulk_update( + posts_to_reactivate, + ["title", "slug", "released_at", "is_active"], + batch_size=200, + ) + + if active_posts_to_deactivate: + Post.objects.bulk_update( + active_posts_to_deactivate, + ["is_active"], + batch_size=200, ) if posts_to_create: - Post.objects.bulk_create(posts_to_create) + Post.objects.bulk_create(posts_to_create, batch_size=200) await _execute_transaction() diff --git a/scraping/tests/test_main.py b/scraping/tests/test_main.py index 8a56ab3..557f8c1 100644 --- a/scraping/tests/test_main.py +++ b/scraping/tests/test_main.py @@ -266,6 +266,126 @@ async def test_upsert_batch_creates_and_updates(self, scraper): new_post_user_id = await sync_to_async(lambda: new_post.user_id)() assert new_post_user_id == test_user.id + @pytest.mark.asyncio + @pytest.mark.django_db + async def test_upsert_batch_deactivates_old_posts(self, scraper): + """ + _upsert_batch 메서드가 더 이상 API에서 반환되지 않는 게시물을 비활성화하는지 검증합니다. + """ + test_user = await sync_to_async(User.objects.create)( + velog_uuid=uuid.uuid4(), + access_token="test-access-token", + refresh_token="test-refresh-token", + group_id=1, + email="test@example.com", + is_active=True, + ) + + # 기존 활성화 상태의 게시물 3개 생성 + post_uuid1 = str(uuid.uuid4()) + post_uuid2 = str(uuid.uuid4()) + post_uuid3 = str(uuid.uuid4()) + + await sync_to_async(Post.objects.create)( + post_uuid=post_uuid1, + title="Post 1", + user=test_user, + slug="post-1", + released_at=get_local_now(), + is_active=True, + ) + + await sync_to_async(Post.objects.create)( + post_uuid=post_uuid2, + title="Post 2", + user=test_user, + slug="post-2", + released_at=get_local_now(), + is_active=True, + ) + + await sync_to_async(Post.objects.create)( + post_uuid=post_uuid3, + title="Post 3", + user=test_user, + slug="post-3", + released_at=get_local_now(), + is_active=True, + ) + + # 새로운 배치 데이터에는 post_uuid1만 포함 (나머지는 비활성화되어야 함) + batch_posts = [ + { + "id": post_uuid1, + "title": "Updated Post 1", + "url_slug": "updated-post-1", + "released_at": get_local_now(), + } + ] + + # _upsert_batch 호출 + await scraper._upsert_batch(test_user, batch_posts) + + # 게시물 상태 확인 + # post_uuid1은 여전히 활성 상태여야 함 + post1 = await sync_to_async(Post.objects.get)(post_uuid=post_uuid1) + assert post1.is_active is True + assert post1.title == "Updated Post 1" + + # post_uuid2와 post_uuid3는 비활성 상태로 변경되어야 함 + post2 = await sync_to_async(Post.objects.get)(post_uuid=post_uuid2) + assert post2.is_active is False + + post3 = await sync_to_async(Post.objects.get)(post_uuid=post_uuid3) + assert post3.is_active is False + + @pytest.mark.asyncio + @pytest.mark.django_db + async def test_upsert_batch_reactivates_inactive_posts(self, scraper): + """ + _upsert_batch 메서드가 비활성화된 게시물이 다시 API 결과에 포함될 경우 재활성화하는지 검증합니다. + """ + test_user = await sync_to_async(User.objects.create)( + velog_uuid=uuid.uuid4(), + access_token="test-access-token", + refresh_token="test-refresh-token", + group_id=1, + email="test@example.com", + is_active=True, + ) + + # 비활성화된 게시물 생성 + inactive_post_uuid = str(uuid.uuid4()) + await sync_to_async(Post.objects.create)( + post_uuid=inactive_post_uuid, + title="Inactive Post", + user=test_user, + slug="inactive-post", + released_at=get_local_now(), + is_active=False, + ) + + # 배치 데이터에 비활성화된 게시물 포함 + batch_posts = [ + { + "id": inactive_post_uuid, + "title": "Reactivated Post", + "url_slug": "reactivated-post", + "released_at": get_local_now(), + } + ] + + # _upsert_batch 호출 + await scraper._upsert_batch(test_user, batch_posts) + + # 게시물 상태 확인 - 재활성화되어야 함 + post = await sync_to_async(Post.objects.get)( + post_uuid=inactive_post_uuid + ) + assert post.is_active is True + assert post.title == "Reactivated Post" + assert post.slug == "reactivated-post" + @pytest.mark.asyncio async def test_update_daily_statistics_success(self, scraper): """데일리 통계 업데이트 또는 생성 성공 테스트""" From 00b3a7e6db387e1eeb5a3390c5b292bfae930f9e Mon Sep 17 00:00:00 2001 From: Nuung Date: Sat, 19 Apr 2025 17:03:59 +0900 Subject: [PATCH 2/4] =?UTF-8?q?feature:=20=EC=9A=B4=EC=98=81=EA=B3=84=20?= =?UTF-8?q?=EC=97=90=EC=84=9C=20=EB=A1=9C=EC=BB=AC=EB=A1=9C,=20=EB=8D=B0?= =?UTF-8?q?=EC=9D=B4=ED=84=B0=20=EB=A7=88=EC=9D=B4=EA=B7=B8=EB=A0=88?= =?UTF-8?q?=EC=9D=B4=EC=85=98=20=ED=95=98=EB=8A=94=20=EC=BD=94=EB=93=9C=20?= =?UTF-8?q?=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backoffice/settings/local.py | 2 + modules/data_migrate/__init__.py | 0 modules/data_migrate/posts_migration.py | 101 +++++++++++++++++++ modules/data_migrate/statistics_migration.py | 94 +++++++++++++++++ modules/data_migrate/users_migration.py | 87 ++++++++++++++++ 5 files changed, 284 insertions(+) create mode 100644 modules/data_migrate/__init__.py create mode 100644 modules/data_migrate/posts_migration.py create mode 100644 modules/data_migrate/statistics_migration.py create mode 100644 modules/data_migrate/users_migration.py diff --git a/backoffice/settings/local.py b/backoffice/settings/local.py index 68ade3e..567f42a 100644 --- a/backoffice/settings/local.py +++ b/backoffice/settings/local.py @@ -8,3 +8,5 @@ ] CORS_ALLOW_ALL_ORIGINS = True + +# DATABASES["prod"] = env.db() # noqa: F405 diff --git a/modules/data_migrate/__init__.py b/modules/data_migrate/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/modules/data_migrate/posts_migration.py b/modules/data_migrate/posts_migration.py new file mode 100644 index 0000000..71ecc3a --- /dev/null +++ b/modules/data_migrate/posts_migration.py @@ -0,0 +1,101 @@ +#!/usr/bin/env python +""" +게시글 데이터 마이그레이션 스크립트 + +원격/운영 데이터베이스에서 로컬 데이터베이스로 Post 테이블 데이터를 이관합니다. +""" + +from django.db import connections, transaction + +from posts.models import Post +from users.models import User + +print("게시글 마이그레이션을 시작합니다...") + +try: + # 청크 크기 설정 + chunk_size = 500 + offset = 0 + total_migrated = 0 + success_count = 0 + update_count = 0 + skipped_count = 0 + + while True: + # 원격 DB에서 게시글 데이터를 청크 단위로 가져오기 + with connections["prod"].cursor() as cursor: + cursor.execute( + f""" + SELECT p.id, p.created_at, p.updated_at, p.post_uuid, p.user_id, + p.title, p.is_active, p.slug, p.released_at, u.velog_uuid + FROM posts_post p + JOIN users_user u ON p.user_id = u.id + ORDER BY p.id + LIMIT {chunk_size} OFFSET {offset} + """ + ) + posts = cursor.fetchall() + + if not posts: + break + + print(f"게시글 데이터 {len(posts)}개 처리 중 (오프셋 {offset})...") + + # 로컬 DB에 데이터 삽입 + with transaction.atomic(): + for post in posts: + # 게시글이 이미 로컬에 존재하는지 확인 + existing_post = Post.objects.filter(post_uuid=post[3]).first() + + # velog_uuid를 이용해 로컬 사용자 찾기 + velog_uuid = post[9] # velog_uuid는 10번째 컬럼 + + try: + user = User.objects.get(velog_uuid=velog_uuid) + except User.DoesNotExist: + print( + f"UUID {velog_uuid}의 사용자가 로컬에 존재하지 않습니다. 게시글 {post[3]} 건너뜁니다." + ) + skipped_count += 1 + continue + + if existing_post: + print( + f"UUID {post[3]}의 게시글이 이미 존재합니다. 정보를 업데이트합니다..." + ) + # 기존 게시글 정보 업데이트 + existing_post.title = post[5] + existing_post.is_active = post[6] + existing_post.slug = post[7] + existing_post.released_at = post[8] + existing_post.save() + update_count += 1 + else: + print(f"UUID {post[3]}의 새 게시글을 생성합니다.") + # 새 게시글 생성 + Post.objects.create( + id=post[0], + created_at=post[1], + updated_at=post[2], + post_uuid=post[3], + user=user, + title=post[5], + is_active=post[6], + slug=post[7], + released_at=post[8], + ) + success_count += 1 + + total_migrated += len(posts) + offset += chunk_size + print( + f"현재까지 {total_migrated}개의 게시글을 마이그레이션했습니다..." + ) + + print( + f"게시글 마이그레이션이 완료되었습니다. 새로 생성: {success_count}개, 업데이트: {update_count}개, 건너뜀: {skipped_count}개" + ) + print(f"총 처리된 레코드: {total_migrated}개") + +except Exception as e: + print(f"게시글 마이그레이션 중 오류 발생: {e}") diff --git a/modules/data_migrate/statistics_migration.py b/modules/data_migrate/statistics_migration.py new file mode 100644 index 0000000..acb23ac --- /dev/null +++ b/modules/data_migrate/statistics_migration.py @@ -0,0 +1,94 @@ +#!/usr/bin/env python +""" +게시글 통계 데이터 마이그레이션 스크립트 + +원격/운영 데이터베이스에서 로컬 데이터베이스로 PostDailyStatistics 테이블 데이터를 이관합니다. +""" + +from django.db import connections, transaction + +from posts.models import Post, PostDailyStatistics + +print("게시글 통계 마이그레이션을 시작합니다...") + +try: + # 청크 크기 설정 + chunk_size = 1000 + offset = 0 + total_migrated = 0 + new_count = 0 + update_count = 0 + skipped_count = 0 + + while True: + # 원격 DB에서 통계 데이터를 청크 단위로 가져오기 (post_uuid와 함께) + with connections["prod"].cursor() as cursor: + cursor.execute( + f""" + SELECT pds.id, pds.created_at, pds.updated_at, pds.post_id, pds.date, + pds.daily_view_count, pds.daily_like_count, p.post_uuid + FROM posts_postdailystatistics pds + JOIN posts_post p ON pds.post_id = p.id + ORDER BY pds.id + LIMIT {chunk_size} OFFSET {offset} + """ + ) + stats_chunk = cursor.fetchall() + + if not stats_chunk: + break + + print(f"통계 데이터 {len(stats_chunk)}개 처리 중 (오프셋 {offset})...") + + # 로컬 DB에 데이터 삽입 + with transaction.atomic(): + for stat in stats_chunk: + # post_uuid를 이용해 로컬 게시글 찾기 + post_uuid = stat[7] # post_uuid는 8번째 컬럼 + + try: + post = Post.objects.get(post_uuid=post_uuid) + except Post.DoesNotExist: + print( + f"UUID {post_uuid}의 게시글이 로컬에 존재하지 않습니다. 통계 {stat[0]} 건너뜁니다." + ) + skipped_count += 1 + continue + + # 통계가 이미 로컬에 존재하는지 확인 (post 객체와 date로 찾기) + existing_stat = PostDailyStatistics.objects.filter( + post=post, date=stat[4] + ).first() + + if existing_stat: + # 기존 통계 정보 업데이트 + existing_stat.daily_view_count = stat[5] + existing_stat.daily_like_count = stat[6] + existing_stat.save() + update_count += 1 + else: + # 새 통계 생성 + PostDailyStatistics.objects.create( + id=stat[0], + created_at=stat[1], + updated_at=stat[2], + post=post, # 로컬 post 객체 사용 + date=stat[4], + daily_view_count=stat[5], + daily_like_count=stat[6], + ) + new_count += 1 + + total_migrated += len(stats_chunk) + offset += chunk_size + print( + f"현재까지 {total_migrated}개의 통계 데이터를 마이그레이션했습니다..." + ) + + print( + f"게시글 통계 마이그레이션이 완료되었습니다. 새로 생성: {new_count}개, 업데이트: {update_count}개, 건너뜀: {skipped_count}개" + ) + print(f"총 처리된 레코드: {total_migrated}개") + +except Exception as e: + print(f"게시글 통계 마이그레이션 중 오류 발생: {e}") diff --git a/modules/data_migrate/users_migration.py b/modules/data_migrate/users_migration.py new file mode 100644 index 0000000..c641715 --- /dev/null +++ b/modules/data_migrate/users_migration.py @@ -0,0 +1,87 @@ +#!/usr/bin/env python +""" +사용자 데이터 마이그레이션 스크립트 + +원격/운영 데이터베이스에서 로컬 데이터베이스로 User 테이블 데이터를 이관합니다. +""" + +from django.db import connections, transaction + +from users.models import User + +print("사용자 마이그레이션을 시작합니다...") + +try: + # 청크 크기 설정 (메모리 문제 방지를 위해) + chunk_size = 500 + offset = 0 + total_migrated = 0 + success_count = 0 + update_count = 0 + + while True: + # 원격 DB에서 사용자 데이터를 청크 단위로 가져오기 + with connections["prod"].cursor() as cursor: + cursor.execute( + f""" + SELECT id, created_at, updated_at, velog_uuid, access_token, + refresh_token, group_id, email, is_active + FROM users_user + ORDER BY id + LIMIT {chunk_size} OFFSET {offset} + """ + ) + users = cursor.fetchall() + + if not users: + break + + print(f"사용자 데이터 {len(users)}명 처리 중 (오프셋 {offset})...") + + # 로컬 DB에 데이터 삽입 + with transaction.atomic(): + for user in users: + # 사용자가 이미 로컬에 존재하는지 확인 + existing_user = User.objects.filter(velog_uuid=user[3]).first() + + if existing_user: + print( + f"UUID {user[3]}의 사용자가 이미 존재합니다. 정보를 업데이트합니다..." + ) + # 기존 사용자 정보 업데이트 + existing_user.access_token = user[4] + existing_user.refresh_token = user[5] + existing_user.group_id = user[6] + existing_user.email = user[7] + existing_user.is_active = user[8] + existing_user.save() + update_count += 1 + else: + print(f"UUID {user[3]}의 새 사용자를 생성합니다.") + # 새 사용자 생성 + User.objects.create( + id=user[0], + created_at=user[1], + updated_at=user[2], + velog_uuid=user[3], + access_token=user[4], + refresh_token=user[5], + group_id=user[6], + email=user[7], + is_active=user[8], + ) + success_count += 1 + + total_migrated += len(users) + offset += chunk_size + print( + f"현재까지 {total_migrated}명의 사용자를 마이그레이션했습니다..." + ) + + print( + f"사용자 마이그레이션이 완료되었습니다. 새로 생성: {success_count}명, 업데이트: {update_count}명" + ) + print(f"총 처리된 사용자: {total_migrated}명") + +except Exception as e: + print(f"사용자 마이그레이션 중 오류 발생: {e}") From 4b2b311d5925d393ff901de1e8ce1e00b7e2129f Mon Sep 17 00:00:00 2001 From: Nuung Date: Sat, 19 Apr 2025 17:12:37 +0900 Subject: [PATCH 3/4] =?UTF-8?q?modify:=20=EB=A7=88=EC=9D=B4=EA=B7=B8?= =?UTF-8?q?=EB=A0=88=EC=9D=B4=EC=85=98,=20=ED=86=B5=EA=B3=84=EB=8A=94=207?= =?UTF-8?q?=EC=9D=BC=EB=A1=9C=20=EC=BB=B7=ED=8C=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- modules/data_migrate/statistics_migration.py | 1 + 1 file changed, 1 insertion(+) diff --git a/modules/data_migrate/statistics_migration.py b/modules/data_migrate/statistics_migration.py index acb23ac..b19ee1f 100644 --- a/modules/data_migrate/statistics_migration.py +++ b/modules/data_migrate/statistics_migration.py @@ -29,6 +29,7 @@ pds.daily_view_count, pds.daily_like_count, p.post_uuid FROM posts_postdailystatistics pds JOIN posts_post p ON pds.post_id = p.id + WHERE pds.date >= CURRENT_DATE - INTERVAL '7 days' ORDER BY pds.id LIMIT {chunk_size} OFFSET {offset} """ From 1303549868c1c53cfe18f8124e540f9715147ed9 Mon Sep 17 00:00:00 2001 From: Nuung Date: Sat, 19 Apr 2025 17:49:52 +0900 Subject: [PATCH 4/4] =?UTF-8?q?feature:=20=ED=85=8C=EC=8A=A4=ED=8A=B8=20?= =?UTF-8?q?=EA=B7=9C=EB=AA=A8=EA=B0=80=20=EC=BB=A4=EC=A7=90=EC=97=90=20?= =?UTF-8?q?=EB=94=B0=EB=9D=BC=20=EC=A7=91=EC=A4=91=20=ED=85=8C=EC=8A=A4?= =?UTF-8?q?=ED=8C=85=20=EC=84=B9=EC=85=98=EA=B3=BC=20=ED=8C=8C=EC=9D=BC=20?= =?UTF-8?q?=EB=B6=84=EB=A6=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- modules/data_migrate/posts_migration.py | 1 - modules/data_migrate/statistics_migration.py | 2 +- modules/data_migrate/users_migration.py | 1 - scraping/main.py | 147 +++-- scraping/tests/test_main.py | 616 ------------------- scraping/tests/test_main_posts.py | 276 +++++++++ scraping/tests/test_main_statistics.py | 86 +++ scraping/tests/test_main_tokens.py | 327 ++++++++++ 8 files changed, 774 insertions(+), 682 deletions(-) delete mode 100644 scraping/tests/test_main.py create mode 100644 scraping/tests/test_main_posts.py create mode 100644 scraping/tests/test_main_statistics.py create mode 100644 scraping/tests/test_main_tokens.py diff --git a/modules/data_migrate/posts_migration.py b/modules/data_migrate/posts_migration.py index 71ecc3a..10f7968 100644 --- a/modules/data_migrate/posts_migration.py +++ b/modules/data_migrate/posts_migration.py @@ -74,7 +74,6 @@ print(f"UUID {post[3]}의 새 게시글을 생성합니다.") # 새 게시글 생성 Post.objects.create( - id=post[0], created_at=post[1], updated_at=post[2], post_uuid=post[3], diff --git a/modules/data_migrate/statistics_migration.py b/modules/data_migrate/statistics_migration.py index b19ee1f..2f58f0a 100644 --- a/modules/data_migrate/statistics_migration.py +++ b/modules/data_migrate/statistics_migration.py @@ -29,7 +29,7 @@ pds.daily_view_count, pds.daily_like_count, p.post_uuid FROM posts_postdailystatistics pds JOIN posts_post p ON pds.post_id = p.id - WHERE pds.date >= CURRENT_DATE - INTERVAL '7 days' + WHERE pds.date >= CURRENT_DATE - INTERVAL '3 days' ORDER BY pds.id LIMIT {chunk_size} OFFSET {offset} """ diff --git a/modules/data_migrate/users_migration.py b/modules/data_migrate/users_migration.py index c641715..ff42e7a 100644 --- a/modules/data_migrate/users_migration.py +++ b/modules/data_migrate/users_migration.py @@ -60,7 +60,6 @@ print(f"UUID {user[3]}의 새 사용자를 생성합니다.") # 새 사용자 생성 User.objects.create( - id=user[0], created_at=user[1], updated_at=user[2], velog_uuid=user[3], diff --git a/scraping/main.py b/scraping/main.py index 435c1c2..46964f0 100644 --- a/scraping/main.py +++ b/scraping/main.py @@ -103,51 +103,23 @@ async def _upsert_batch( def _execute_transaction() -> None: with transaction.atomic(): post_uuids = [post["id"] for post in batch_posts] - - # 현재 유저의 모든 게시글 가져오기 (active와 inactive 모두) - all_user_posts = { + existing_posts = { str(post.post_uuid): post - for post in Post.objects.filter(user=user) - } - - # 현재 활성화된 게시글만 가져오기 - existing_active_posts = { - uuid: post - for uuid, post in all_user_posts.items() - if post.is_active and uuid in post_uuids - } - - # 비활성화된 게시글 가져오기 (재활성화 여부 확인용) - inactive_posts = { - uuid: post - for uuid, post in all_user_posts.items() - if not post.is_active and uuid in post_uuids + for post in Post.objects.filter(post_uuid__in=post_uuids) } posts_to_create = [] posts_to_update = [] - posts_to_reactivate = [] - # 현재 데이터의 각 게시글에 대해 처리 for post_data in batch_posts: post_uuid = post_data["id"] - if post_uuid in existing_active_posts: - # 이미 활성화된 게시글 업데이트 - post = existing_active_posts[post_uuid] + if post_uuid in existing_posts: + post = existing_posts[post_uuid] post.title = post_data["title"] post.slug = post_data["url_slug"] post.released_at = post_data["released_at"] posts_to_update.append(post) - elif post_uuid in inactive_posts: - # 비활성화된 게시글 재활성화 - post = inactive_posts[post_uuid] - post.title = post_data["title"] - post.slug = post_data["url_slug"] - post.released_at = post_data["released_at"] - post.is_active = True - posts_to_reactivate.append(post) else: - # 새 게시글 생성 posts_to_create.append( Post( post_uuid=post_uuid, @@ -155,48 +127,87 @@ def _execute_transaction() -> None: user=user, slug=post_data["url_slug"], released_at=post_data["released_at"], - is_active=True, ) ) - # 과거 활성화 게시글 중 현재 데이터에 없는 게시글은 비활성화 - active_posts_to_deactivate = [] - current_post_uuids = set( - post_data["id"] for post_data in batch_posts - ) - - for post_uuid, post in all_user_posts.items(): - if post.is_active and post_uuid not in current_post_uuids: - post.is_active = False - active_posts_to_deactivate.append(post) - - # 일괄 업데이트 및 생성 수행 if posts_to_update: Post.objects.bulk_update( - posts_to_update, - ["title", "slug", "released_at"], - batch_size=200, - ) - - if posts_to_reactivate: - Post.objects.bulk_update( - posts_to_reactivate, - ["title", "slug", "released_at", "is_active"], - batch_size=200, - ) - - if active_posts_to_deactivate: - Post.objects.bulk_update( - active_posts_to_deactivate, - ["is_active"], - batch_size=200, + posts_to_update, ["title", "slug", "released_at"] ) if posts_to_create: - Post.objects.bulk_create(posts_to_create, batch_size=200) + Post.objects.bulk_create(posts_to_create) await _execute_transaction() + async def sync_post_active_status( + self, + user: User, + current_post_ids: set[str], + min_posts_threshold: int = 1, + ) -> None: + """현재 API에서 가져온 게시글 목록을 기준으로 활성/비활성 상태 동기화 + + Args: + user: 대상 사용자 + current_post_ids: 현재 API에서 가져온 게시글 ID 집합 + min_posts_threshold: API 응답에 최소 이 개수 이상의 게시글이 있어야 상태변경을 실행 + """ + + # API 응답이 너무 적으면 상태변경 하지 않음 (API 오류 가능성) + if len(current_post_ids) < min_posts_threshold: + logger.warning( + f"Skipping post status sync for user {user.velog_uuid} - Too few posts returned ({len(current_post_ids)})" + ) + return + + @sync_to_async(thread_sensitive=True) # type: ignore + def _execute_sync() -> None: + # 1. 비활성화 로직: 현재 목록에 없는 활성화된 게시글 찾기 + posts_to_deactivate = Post.objects.filter( + user=user, is_active=True + ).exclude(post_uuid__in=current_post_ids) + + deactivation_count = posts_to_deactivate.count() + + # 너무 많은 게시글이 비활성화되는 경우 방어 로직 + active_posts_count = Post.objects.filter( + user=user, is_active=True + ).count() + + if ( + active_posts_count > 0 + and deactivation_count / active_posts_count > 0.5 + ): + logger.warning( + f"Suspicious deactivation detected for user {user.velog_uuid}: " + f"Would deactivate {deactivation_count} out of {active_posts_count} posts. " + f"Skipping post status sync as a safety measure." + ) + return + + # 2. 재활성화 로직: 현재 목록에 있지만 비활성화된 게시글 찾기 + posts_to_reactivate = Post.objects.filter( + user=user, is_active=False, post_uuid__in=current_post_ids + ) + + reactivation_count = posts_to_reactivate.count() + + # 상태 업데이트 실행 + if deactivation_count > 0: + logger.info( + f"Deactivating {deactivation_count} posts for user {user.velog_uuid}" + ) + posts_to_deactivate.update(is_active=False) + + if reactivation_count > 0: + logger.info( + f"Reactivating {reactivation_count} posts for user {user.velog_uuid}" + ) + posts_to_reactivate.update(is_active=True) + + await _execute_sync() + async def update_daily_statistics( self, post: dict[str, str], stats: dict[str, str] ) -> None: @@ -346,9 +357,19 @@ async def process_user( fetched_posts = await fetch_all_velog_posts( session, username, origin_access_token, origin_refresh_token ) + all_post_ids = {post["id"] for post in fetched_posts} + logger.info( + f"Fetched {len(all_post_ids)} posts for user {user.velog_uuid}" + ) + # 게시물이 새로 생겼으면 추가, 아니면 업데이트 await self.bulk_upsert_posts(user, fetched_posts) + # 게시글 활성/비활성 상태 동기화 + await self.sync_post_active_status( + user, all_post_ids, min_posts_threshold=1 + ) + # 게시물을 적절한 크기의 청크로 나누어 처리 chunk_size = 20 for i in range(0, len(fetched_posts), chunk_size): diff --git a/scraping/tests/test_main.py b/scraping/tests/test_main.py deleted file mode 100644 index 557f8c1..0000000 --- a/scraping/tests/test_main.py +++ /dev/null @@ -1,616 +0,0 @@ -import uuid -from unittest.mock import AsyncMock, MagicMock, patch - -import pytest -from asgiref.sync import sync_to_async -from django.db import transaction - -from posts.models import Post, PostDailyStatistics -from scraping.main import Scraper -from users.models import User -from utils.utils import get_local_now - - -class TestScraper: - @pytest.fixture - def scraper(self): - """Scraper 인스턴스 생성""" - return Scraper(group_range=range(1, 10), max_connections=10) - - @pytest.fixture - def user(self, db): - """테스트용 User 객체 생성""" - return User.objects.create( - velog_uuid=uuid.uuid4(), - access_token="encrypted-access-token", - refresh_token="encrypted-refresh-token", - group_id=1, - email="test@example.com", - is_active=True, - ) - - @patch("scraping.main.AESEncryption") - @pytest.mark.asyncio - async def test_update_old_tokens_success( - self, mock_aes, scraper, user - ) -> None: - """토큰 업데이트 성공 테스트""" - mock_encryption = mock_aes.return_value - mock_encryption.decrypt.side_effect = ( - lambda token: f"decrypted-{token}" - ) - mock_encryption.encrypt.side_effect = ( - lambda token: f"encrypted-{token}" - ) - - new_tokens = { - "access_token": "new-access-token", - "refresh_token": "new-refresh-token", - } - - with patch.object(user, "asave", new_callable=AsyncMock) as mock_asave: - result = await scraper.update_old_tokens( - user, mock_encryption, new_tokens - ) - - assert result is True - mock_asave.assert_called_once() - assert user.access_token == "encrypted-new-access-token" - assert user.refresh_token == "encrypted-new-refresh-token" - mock_encryption.decrypt.assert_any_call("encrypted-access-token") - mock_encryption.decrypt.assert_any_call("encrypted-refresh-token") - - @patch("scraping.main.AESEncryption") - @pytest.mark.asyncio - async def test_update_old_tokens_no_change( - self, mock_aes, scraper, user - ) -> None: - """토큰 업데이트 없음 테스트""" - mock_encryption = mock_aes.return_value - mock_encryption.decrypt.side_effect = lambda token: token - mock_encryption.encrypt.side_effect = ( - lambda token: f"encrypted-{token}" - ) - - new_tokens = { - "access_token": "encrypted-access-token", - "refresh_token": "encrypted-refresh-token", - } - - with patch.object(user, "asave", new_callable=AsyncMock) as mock_asave: - result = await scraper.update_old_tokens( - user, mock_encryption, new_tokens - ) - - assert result is False - mock_asave.assert_not_called() - - @patch("scraping.main.AESEncryption") - @pytest.mark.asyncio - async def test_update_old_tokens_expired_failure( - self, mock_aes, scraper, user - ): - """토큰이 만료되었을 때 업데이트 실패 테스트""" - mock_encryption = mock_aes.return_value - mock_encryption.decrypt.side_effect = ( - lambda token: f"decrypted-{token}" - ) - mock_encryption.encrypt.side_effect = ( - lambda token: f"encrypted-{token}" - ) - - new_tokens = { - "access_token": "decrypted-encrypted-access-token", - "refresh_token": "decrypted-encrypted-refresh-token", - } - - with patch.object(user, "asave", new_callable=AsyncMock) as mock_asave: - result = await scraper.update_old_tokens( - user, mock_encryption, new_tokens - ) - - assert result is False - mock_asave.assert_not_called() - - @patch("scraping.main.AESEncryption") - @pytest.mark.asyncio - async def test_update_old_tokens_with_mocked_decryption_failure( - self, mock_aes, scraper, user - ): - """복호화가 제대로 되지 않았을 경우 업데이트 실패 테스트""" - mock_encryption = mock_aes.return_value - mock_encryption.decrypt.side_effect = lambda token: None - mock_encryption.encrypt.side_effect = ( - lambda token: f"encrypted-{token}" - ) - - new_tokens = { - "access_token": "valid_token", - "refresh_token": "valid_token", - } - - with patch.object(user, "asave", new_callable=AsyncMock) as mock_asave: - result = await scraper.update_old_tokens( - user, mock_encryption, new_tokens - ) - - assert result is False - mock_asave.assert_not_called() - - @pytest.mark.asyncio - async def test_bulk_upsert_posts_success(self, scraper, user): - """Post 객체 배치 분할 삽입 또는 업데이트 성공 테스트""" - posts_data = [ - { - "id": str(uuid.uuid4()), - "title": f"Title {i}", - "url_slug": f"slug-{i}", - "released_at": get_local_now(), - } - for i in range(50) - ] - - # _upsert_batch 메서드만 모킹 - with patch.object( - scraper, "_upsert_batch", new_callable=AsyncMock - ) as mock_upsert: - with patch("asyncio.sleep", new_callable=AsyncMock): - result = await scraper.bulk_upsert_posts( - user, posts_data, batch_size=10 - ) - - assert result is True - # 50개 / 10개 배치 = 5번 호출 - assert mock_upsert.call_count == 5 - - @pytest.mark.asyncio - async def test_bulk_upsert_posts_failure(self, scraper, user): - """Post 객체 배치 분할 삽입 또는 업데이트 실패 테스트""" - posts_data = [ - { - "id": str(uuid.uuid4()), - "title": f"Title {i}", - "url_slug": f"slug-{i}", - "released_at": get_local_now(), - } - for i in range(10) - ] - - # 실제 예외를 발생시키는 비동기 함수 생성 - async def mock_async_error(*args, **kwargs): - raise Exception("DB 에러") - - # sync_to_async가 적절한 비동기 함수를 반환하도록 패치 - with patch("scraping.main.sync_to_async") as mock_sync_to_async: - mock_sync_to_async.return_value = mock_async_error - result = await scraper.bulk_upsert_posts( - user, posts_data, batch_size=5 - ) - - assert result is False - mock_sync_to_async.assert_called() - - @pytest.mark.asyncio - @pytest.mark.django_db - async def test_upsert_batch_creates_and_updates(self, scraper): - """ - _upsert_batch 메서드가 기존 게시물을 업데이트하고, 신규 게시물을 생성하는지 검증합니다. - """ - test_user = await sync_to_async(User.objects.create)( - velog_uuid=uuid.uuid4(), - access_token="test-access-token", - refresh_token="test-refresh-token", - group_id=1, - email="test@example.com", - is_active=True, - ) - - # 기존 게시물 생성 (sync_to_async로 감싸줌) - existing_post_uuid = str(uuid.uuid4()) - original_title = "Original Title" - original_slug = "original-slug" - original_time = get_local_now() - await sync_to_async(Post.objects.create)( - post_uuid=existing_post_uuid, - title=original_title, - user=test_user, - slug=original_slug, - released_at=original_time, - ) - - # 배치 데이터 준비: 기존 게시물 업데이트용과 신규 게시물 생성용 데이터 포함 - updated_title = "Updated Title" - updated_slug = "updated-slug" - updated_time = get_local_now() - - new_post_uuid = str(uuid.uuid4()) - new_title = "New Title" - new_slug = "new-slug" - new_time = get_local_now() - - batch_posts = [ - { - "id": existing_post_uuid, - "title": updated_title, - "url_slug": updated_slug, - "released_at": updated_time, - }, - { - "id": new_post_uuid, - "title": new_title, - "url_slug": new_slug, - "released_at": new_time, - }, - ] - - # _upsert_batch 호출 - await scraper._upsert_batch(test_user, batch_posts) - - # 기존 게시물 업데이트 확인 (sync_to_async 사용) - updated_post = await sync_to_async(Post.objects.get)( - post_uuid=existing_post_uuid - ) - assert updated_post.title == updated_title - assert updated_post.slug == updated_slug - assert updated_post.released_at == updated_time - - # 신규 게시물 생성 확인 (sync_to_async 사용) - new_post = await sync_to_async(Post.objects.get)( - post_uuid=new_post_uuid - ) - assert new_post.title == new_title - assert new_post.slug == new_slug - assert new_post.released_at == new_time - - # user 관계 필드를 직접 비교하지 말고 ID로 비교 - new_post_user_id = await sync_to_async(lambda: new_post.user_id)() - assert new_post_user_id == test_user.id - - @pytest.mark.asyncio - @pytest.mark.django_db - async def test_upsert_batch_deactivates_old_posts(self, scraper): - """ - _upsert_batch 메서드가 더 이상 API에서 반환되지 않는 게시물을 비활성화하는지 검증합니다. - """ - test_user = await sync_to_async(User.objects.create)( - velog_uuid=uuid.uuid4(), - access_token="test-access-token", - refresh_token="test-refresh-token", - group_id=1, - email="test@example.com", - is_active=True, - ) - - # 기존 활성화 상태의 게시물 3개 생성 - post_uuid1 = str(uuid.uuid4()) - post_uuid2 = str(uuid.uuid4()) - post_uuid3 = str(uuid.uuid4()) - - await sync_to_async(Post.objects.create)( - post_uuid=post_uuid1, - title="Post 1", - user=test_user, - slug="post-1", - released_at=get_local_now(), - is_active=True, - ) - - await sync_to_async(Post.objects.create)( - post_uuid=post_uuid2, - title="Post 2", - user=test_user, - slug="post-2", - released_at=get_local_now(), - is_active=True, - ) - - await sync_to_async(Post.objects.create)( - post_uuid=post_uuid3, - title="Post 3", - user=test_user, - slug="post-3", - released_at=get_local_now(), - is_active=True, - ) - - # 새로운 배치 데이터에는 post_uuid1만 포함 (나머지는 비활성화되어야 함) - batch_posts = [ - { - "id": post_uuid1, - "title": "Updated Post 1", - "url_slug": "updated-post-1", - "released_at": get_local_now(), - } - ] - - # _upsert_batch 호출 - await scraper._upsert_batch(test_user, batch_posts) - - # 게시물 상태 확인 - # post_uuid1은 여전히 활성 상태여야 함 - post1 = await sync_to_async(Post.objects.get)(post_uuid=post_uuid1) - assert post1.is_active is True - assert post1.title == "Updated Post 1" - - # post_uuid2와 post_uuid3는 비활성 상태로 변경되어야 함 - post2 = await sync_to_async(Post.objects.get)(post_uuid=post_uuid2) - assert post2.is_active is False - - post3 = await sync_to_async(Post.objects.get)(post_uuid=post_uuid3) - assert post3.is_active is False - - @pytest.mark.asyncio - @pytest.mark.django_db - async def test_upsert_batch_reactivates_inactive_posts(self, scraper): - """ - _upsert_batch 메서드가 비활성화된 게시물이 다시 API 결과에 포함될 경우 재활성화하는지 검증합니다. - """ - test_user = await sync_to_async(User.objects.create)( - velog_uuid=uuid.uuid4(), - access_token="test-access-token", - refresh_token="test-refresh-token", - group_id=1, - email="test@example.com", - is_active=True, - ) - - # 비활성화된 게시물 생성 - inactive_post_uuid = str(uuid.uuid4()) - await sync_to_async(Post.objects.create)( - post_uuid=inactive_post_uuid, - title="Inactive Post", - user=test_user, - slug="inactive-post", - released_at=get_local_now(), - is_active=False, - ) - - # 배치 데이터에 비활성화된 게시물 포함 - batch_posts = [ - { - "id": inactive_post_uuid, - "title": "Reactivated Post", - "url_slug": "reactivated-post", - "released_at": get_local_now(), - } - ] - - # _upsert_batch 호출 - await scraper._upsert_batch(test_user, batch_posts) - - # 게시물 상태 확인 - 재활성화되어야 함 - post = await sync_to_async(Post.objects.get)( - post_uuid=inactive_post_uuid - ) - assert post.is_active is True - assert post.title == "Reactivated Post" - assert post.slug == "reactivated-post" - - @pytest.mark.asyncio - async def test_update_daily_statistics_success(self, scraper): - """데일리 통계 업데이트 또는 생성 성공 테스트""" - post_data = {"id": "post-123"} - stats_data = {"data": {"getStats": {"total": 100}}, "likes": 5} - - with patch( - "scraping.main.sync_to_async", new_callable=MagicMock - ) as mock_sync_to_async: - mock_async_func = AsyncMock() - mock_sync_to_async.return_value = mock_async_func - - await scraper.update_daily_statistics(post_data, stats_data) - - mock_sync_to_async.assert_called() - mock_async_func.assert_called_once() - - for call_args in mock_sync_to_async.call_args_list: - args, kwargs = call_args - - assert callable(args[0]) - - if kwargs: - assert "post-123" in str(kwargs.get("post_data", "")) - assert 100 in str(kwargs.get("stats_data", "")) - - @patch("scraping.main.sync_to_async", new_callable=MagicMock) - @pytest.mark.asyncio - async def test_update_daily_statistics_exception( - self, mock_sync_to_async, scraper - ): - """데일리 통계 업데이트 실패 테스트""" - post_data = {"id": "post-123"} - stats_data = {"data": {"getStats": {"total": 100}}, "likes": 5} - - mock_async_func = AsyncMock(side_effect=Exception("Database error")) - mock_sync_to_async.return_value = mock_async_func - - try: - await scraper.update_daily_statistics(post_data, stats_data) - except Exception: - pass - - mock_sync_to_async.assert_called() - mock_async_func.assert_called_once() - - @patch("scraping.main.fetch_post_stats") - @pytest.mark.asyncio - async def test_fetch_post_stats_limited_success(self, mock_fetch, scraper): - """fetch_post_stats 성공 테스트""" - mock_fetch.side_effect = [ - None, - None, - {"data": {"getStats": {"total": 100}}}, - ] - - result = await scraper.fetch_post_stats_limited( - "post-123", "token-1", "token-2" - ) - - assert result is not None - mock_fetch.assert_called() - assert mock_fetch.call_count == 3 - - for call_args in mock_fetch.call_args_list: - args, kwargs = call_args - assert "post-123" in str(args) or "post-123" in str(kwargs) - assert ( - "token-1" in str(args) - or "token-1" in str(kwargs) - or "token-2" in str(args) - or "token-2" in str(kwargs) - ) - - @patch("scraping.main.fetch_post_stats") - @pytest.mark.asyncio - async def test_fetch_post_stats_limited_max_retries( - self, mock_fetch, scraper - ): - """최대 재시도 횟수 초과 테스트""" - mock_fetch.return_value = None - - result = await scraper.fetch_post_stats_limited( - "post-123", "token-1", "token-2" - ) - - assert result is None - assert mock_fetch.call_count >= 3 - - @patch("scraping.main.fetch_post_stats", new_callable=AsyncMock) - @pytest.mark.asyncio - async def test_fetch_post_stats_limited_failure(self, mock_fetch, scraper): - """fetch_post_stats 실패 테스트""" - mock_fetch.side_effect = [None, None, None] - - result = await scraper.fetch_post_stats_limited( - "post-123", "token-1", "token-2" - ) - - assert result is None - assert mock_fetch.call_count == 3 - - @patch("scraping.main.fetch_velog_user_chk") - @patch("scraping.main.fetch_all_velog_posts") - @patch("scraping.main.AESEncryption") - @pytest.mark.asyncio - async def test_process_user_success( - self, mock_aes, mock_fetch_posts, mock_fetch_user_chk, scraper, user - ): - """유저 데이터 전체 처리 성공 테스트""" - mock_encryption = mock_aes.return_value - mock_encryption.decrypt.side_effect = ( - lambda token: f"decrypted-{token}" - ) - mock_encryption.encrypt.side_effect = ( - lambda token: f"encrypted-{token}" - ) - - mock_fetch_user_chk.return_value = ( - {"access_token": "new-token"}, - {"data": {"currentUser": {"username": "testuser"}}}, - ) - mock_fetch_posts.return_value = [] - - with patch.object( - scraper, "update_old_tokens", new_callable=AsyncMock - ) as mock_update_tokens: - await scraper.process_user(user, MagicMock()) - - mock_update_tokens.assert_called_once() - - @patch("scraping.main.transaction.atomic") - @pytest.mark.django_db(transaction=True) - @pytest.mark.asyncio - async def test_process_user_failure_rollback( - self, mock_atomic, scraper, user - ): - """유저 데이터 처리 실패 시 롤백 확인 테스트""" - mock_session = AsyncMock() - mock_atomic.side_effect = transaction.atomic - - with patch( - "scraping.main.fetch_velog_user_chk", - side_effect=Exception("Failed to fetch user data"), - ): - with pytest.raises(Exception): - await scraper.process_user(user, mock_session) - - # 동기 쿼리를 비동기로 변환 - count = await sync_to_async(Post.objects.filter(user=user).count)() - assert count == 0 - - @pytest.mark.django_db(transaction=True) - @pytest.mark.asyncio - async def test_process_user_partial_failure_rollback(self, scraper, user): - """통계 업데이트 중 실패 시 롤백 확인 테스트""" - mock_session = AsyncMock() - post_uuid = uuid.uuid4() - - # 테스트용 게시물 직접 생성 - await sync_to_async(Post.objects.create)( - post_uuid=post_uuid, - user=user, - title="Test Post", - slug="test-slug", - released_at=get_local_now(), - ) - - # fetch_post_stats_limited 메서드에서 예외 발생시키기 - with patch.object( - scraper, - "fetch_post_stats_limited", - side_effect=Exception("Failed to fetch stats"), - ): - # bulk_upsert_posts 성공하도록 모킹 - with patch.object( - scraper, "bulk_upsert_posts", new_callable=AsyncMock - ) as mock_bulk_upsert: - mock_bulk_upsert.return_value = True - - # 다른 필요한 API 호출도 모킹 - with ( - patch( - "scraping.apis.fetch_velog_user_chk", - new_callable=AsyncMock, - ) as mock_user_chk, - patch( - "scraping.apis.fetch_all_velog_posts", - new_callable=AsyncMock, - ) as mock_posts, - ): - # 사용자 정보 모킹 - mock_user_chk.return_value = ( - {}, - {"data": {"currentUser": {"username": "testuser"}}}, - ) - - # 게시물 정보 모킹 - mock_posts.return_value = [ - { - "id": post_uuid, - "title": "Test Post", - "url_slug": "test-slug", - "released_at": get_local_now(), - } - ] - - # 예외를 처리하지만 게시물은 여전히 존재해야 함 - try: - await scraper.process_user(user, mock_session) - except Exception: - pass - - # 게시물이 존재하는지 확인 (sync_to_async로 래핑) - exists_func = sync_to_async( - lambda: Post.objects.filter(user=user).exists() - ) - exists = await exists_func() - assert exists - - # 통계 정보가 없는지 확인 - has_stats_func = sync_to_async( - lambda: PostDailyStatistics.objects.filter( - post__user=user - ).exists() - ) - has_stats = await has_stats_func() - assert not has_stats diff --git a/scraping/tests/test_main_posts.py b/scraping/tests/test_main_posts.py new file mode 100644 index 0000000..83ac813 --- /dev/null +++ b/scraping/tests/test_main_posts.py @@ -0,0 +1,276 @@ +import uuid +from unittest.mock import AsyncMock, patch + +import pytest +from asgiref.sync import sync_to_async + +from posts.models import Post +from scraping.main import Scraper +from users.models import User +from utils.utils import get_local_now + + +class TestScraperPosts: + @pytest.fixture + def scraper(self): + """Scraper 인스턴스 생성""" + return Scraper(group_range=range(1, 10), max_connections=10) + + @pytest.fixture + def user(self, db): + """테스트용 User 객체 생성""" + return User.objects.create( + velog_uuid=uuid.uuid4(), + access_token="encrypted-access-token", + refresh_token="encrypted-refresh-token", + group_id=1, + email="test@example.com", + is_active=True, + ) + + @pytest.mark.asyncio + async def test_bulk_upsert_posts_success(self, scraper, user): + """Post 객체 배치 분할 삽입 또는 업데이트 성공 테스트""" + posts_data = [ + { + "id": str(uuid.uuid4()), + "title": f"Title {i}", + "url_slug": f"slug-{i}", + "released_at": get_local_now(), + } + for i in range(50) + ] + + # _upsert_batch 메서드만 모킹 + with patch.object( + scraper, "_upsert_batch", new_callable=AsyncMock + ) as mock_upsert: + with patch("asyncio.sleep", new_callable=AsyncMock): + result = await scraper.bulk_upsert_posts( + user, posts_data, batch_size=10 + ) + + assert result is True + # 50개 / 10개 배치 = 5번 호출 + assert mock_upsert.call_count == 5 + + @pytest.mark.asyncio + async def test_bulk_upsert_posts_failure(self, scraper, user): + """Post 객체 배치 분할 삽입 또는 업데이트 실패 테스트""" + posts_data = [ + { + "id": str(uuid.uuid4()), + "title": f"Title {i}", + "url_slug": f"slug-{i}", + "released_at": get_local_now(), + } + for i in range(10) + ] + + # 실제 예외를 발생시키는 비동기 함수 생성 + async def mock_async_error(*args, **kwargs): + raise Exception("DB 에러") + + # sync_to_async가 적절한 비동기 함수를 반환하도록 패치 + with patch("scraping.main.sync_to_async") as mock_sync_to_async: + mock_sync_to_async.return_value = mock_async_error + result = await scraper.bulk_upsert_posts( + user, posts_data, batch_size=5 + ) + + assert result is False + mock_sync_to_async.assert_called() + + @pytest.mark.asyncio + @pytest.mark.django_db + async def test_upsert_batch_creates_and_updates(self, scraper): + """ + _upsert_batch 메서드가 기존 게시물을 업데이트하고, 신규 게시물을 생성하는지 검증합니다. + """ + test_user = await sync_to_async(User.objects.create)( + velog_uuid=uuid.uuid4(), + access_token="test-access-token", + refresh_token="test-refresh-token", + group_id=1, + email="test@example.com", + is_active=True, + ) + + # 기존 게시물 생성 (sync_to_async로 감싸줌) + existing_post_uuid = str(uuid.uuid4()) + original_title = "Original Title" + original_slug = "original-slug" + original_time = get_local_now() + await sync_to_async(Post.objects.create)( + post_uuid=existing_post_uuid, + title=original_title, + user=test_user, + slug=original_slug, + released_at=original_time, + ) + + # 배치 데이터 준비: 기존 게시물 업데이트용과 신규 게시물 생성용 데이터 포함 + updated_title = "Updated Title" + updated_slug = "updated-slug" + updated_time = get_local_now() + + new_post_uuid = str(uuid.uuid4()) + new_title = "New Title" + new_slug = "new-slug" + new_time = get_local_now() + + batch_posts = [ + { + "id": existing_post_uuid, + "title": updated_title, + "url_slug": updated_slug, + "released_at": updated_time, + }, + { + "id": new_post_uuid, + "title": new_title, + "url_slug": new_slug, + "released_at": new_time, + }, + ] + + # _upsert_batch 호출 + await scraper._upsert_batch(test_user, batch_posts) + + # 기존 게시물 업데이트 확인 (sync_to_async 사용) + updated_post = await sync_to_async(Post.objects.get)( + post_uuid=existing_post_uuid + ) + assert updated_post.title == updated_title + assert updated_post.slug == updated_slug + assert updated_post.released_at == updated_time + + # 신규 게시물 생성 확인 (sync_to_async 사용) + new_post = await sync_to_async(Post.objects.get)( + post_uuid=new_post_uuid + ) + assert new_post.title == new_title + assert new_post.slug == new_slug + assert new_post.released_at == new_time + + # user 관계 필드를 직접 비교하지 말고 ID로 비교 + new_post_user_id = await sync_to_async(lambda: new_post.user_id)() + assert new_post_user_id == test_user.id + + @pytest.mark.asyncio + @pytest.mark.django_db + async def test_sync_post_active_status(self, scraper): + """sync_post_active_status 메서드 테스트""" + # 테스트 사용자 생성 + test_user = await sync_to_async(User.objects.create)( + velog_uuid=uuid.uuid4(), + access_token="test-access-token", + refresh_token="test-refresh-token", + group_id=1, + email="test@example.com", + is_active=True, + ) + + # 다양한 게시물 생성 - 활성 및 비활성 + post_uuid1 = str(uuid.uuid4()) + post_uuid2 = str(uuid.uuid4()) + post_uuid3 = str(uuid.uuid4()) + post_uuid4 = str(uuid.uuid4()) + + # 활성 게시물 2개 + await sync_to_async(Post.objects.create)( + post_uuid=post_uuid1, + title="Active Post 1", + user=test_user, + slug="active-post-1", + released_at=get_local_now(), + is_active=True, + ) + + await sync_to_async(Post.objects.create)( + post_uuid=post_uuid2, + title="Active Post 2", + user=test_user, + slug="active-post-2", + released_at=get_local_now(), + is_active=True, + ) + + # 비활성 게시물 2개 + await sync_to_async(Post.objects.create)( + post_uuid=post_uuid3, + title="Inactive Post 1", + user=test_user, + slug="inactive-post-1", + released_at=get_local_now(), + is_active=False, + ) + + await sync_to_async(Post.objects.create)( + post_uuid=post_uuid4, + title="Inactive Post 2", + user=test_user, + slug="inactive-post-2", + released_at=get_local_now(), + is_active=False, + ) + + # 현재 API에서 가져온 게시물 ID 집합 (post1과 post3만 포함) + current_post_ids = {post_uuid1, post_uuid3} + + # sync_post_active_status 메서드 호출 + await scraper.sync_post_active_status(test_user, current_post_ids) + + # 결과 확인: post1 = 활성 유지, post2 = 비활성으로 변경, post3 = 활성화로 변경, post4 = 비활성 유지 + post1 = await sync_to_async(Post.objects.get)(post_uuid=post_uuid1) + assert post1.is_active is True, "post1 should remain active" + + post2 = await sync_to_async(Post.objects.get)(post_uuid=post_uuid2) + assert post2.is_active is False, "post2 should be deactivated" + + post3 = await sync_to_async(Post.objects.get)(post_uuid=post_uuid3) + assert post3.is_active is True, "post3 should be reactivated" + + post4 = await sync_to_async(Post.objects.get)(post_uuid=post_uuid4) + assert post4.is_active is False, "post4 should remain inactive" + + @pytest.mark.asyncio + @pytest.mark.django_db + async def test_sync_post_active_status_safety_threshold(self, scraper): + """sync_post_active_status의 안전 임계값 테스트""" + # 테스트 사용자 생성 + test_user = await sync_to_async(User.objects.create)( + velog_uuid=uuid.uuid4(), + access_token="test-access-token", + refresh_token="test-refresh-token", + group_id=1, + email="test@example.com", + is_active=True, + ) + + # 10개의 활성 게시물 생성 + post_uuids = [str(uuid.uuid4()) for _ in range(10)] + + for i, post_uuid in enumerate(post_uuids): + await sync_to_async(Post.objects.create)( + post_uuid=post_uuid, + title=f"Active Post {i}", + user=test_user, + slug=f"active-post-{i}", + released_at=get_local_now(), + is_active=True, + ) + + # 현재 API에서 가져온 게시물 ID 집합 (10개 중 2개만 포함 = 80% 비활성화 예정) + # 이는 scraper.py의 안전 임계값(50%)을 초과 + current_post_ids = {post_uuids[0], post_uuids[1]} + + # sync_post_active_status 메서드 호출 + await scraper.sync_post_active_status(test_user, current_post_ids) + + # 모든 게시물이 여전히 활성 상태여야 함 (안전 임계값으로 인해 작업이 수행되지 않음) + for post_uuid in post_uuids: + post = await sync_to_async(Post.objects.get)(post_uuid=post_uuid) + assert ( + post.is_active is True + ), f"Post {post_uuid} should remain active due to safety threshold" diff --git a/scraping/tests/test_main_statistics.py b/scraping/tests/test_main_statistics.py new file mode 100644 index 0000000..c9512ff --- /dev/null +++ b/scraping/tests/test_main_statistics.py @@ -0,0 +1,86 @@ +import uuid +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from asgiref.sync import sync_to_async + +from posts.models import Post, PostDailyStatistics +from scraping.main import Scraper +from users.models import User +from utils.utils import get_local_now + + +class TestScraperStatistics: + @pytest.fixture + def scraper(self): + """Scraper 인스턴스 생성""" + return Scraper(group_range=range(1, 10), max_connections=10) + + @pytest.fixture + def user(self, db): + """테스트용 User 객체 생성""" + return User.objects.create( + velog_uuid=uuid.uuid4(), + access_token="encrypted-access-token", + refresh_token="encrypted-refresh-token", + group_id=1, + email="test@example.com", + is_active=True, + ) + + @pytest.mark.asyncio + async def test_update_daily_statistics_success(self, scraper): + """데일리 통계 업데이트 또는 생성 성공 테스트""" + post_data = {"id": "post-123", "likes": 10} + stats_data = {"data": {"getStats": {"total": 100}}} + + with patch( + "scraping.main.sync_to_async", new_callable=MagicMock + ) as mock_sync_to_async: + mock_async_func = AsyncMock() + mock_sync_to_async.return_value = mock_async_func + + await scraper.update_daily_statistics(post_data, stats_data) + + mock_sync_to_async.assert_called() + mock_async_func.assert_called_once() + + @pytest.mark.asyncio + @pytest.mark.django_db + async def test_update_daily_statistics_integration(self, scraper): + """데일리 통계 업데이트 통합 테스트""" + # 테스트 사용자 및 게시물 생성 + test_user = await sync_to_async(User.objects.create)( + velog_uuid=uuid.uuid4(), + access_token="test-access-token", + refresh_token="test-refresh-token", + group_id=1, + email="test@example.com", + is_active=True, + ) + + post_uuid = str(uuid.uuid4()) + await sync_to_async(Post.objects.create)( + post_uuid=post_uuid, + title="Test Post", + user=test_user, + slug="test-post", + released_at=get_local_now(), + is_active=True, + ) + + # 통계 데이터 준비 + post_data = {"id": post_uuid, "likes": 25} + stats_data = {"data": {"getStats": {"total": 150}}} + + # update_daily_statistics 호출 + await scraper.update_daily_statistics(post_data, stats_data) + + # 결과 확인 + today = get_local_now().date() + stats = await sync_to_async(PostDailyStatistics.objects.get)( + post__post_uuid=post_uuid, date=today + ) + + assert stats.daily_view_count == 150 + assert stats.daily_like_count == 25 diff --git a/scraping/tests/test_main_tokens.py b/scraping/tests/test_main_tokens.py new file mode 100644 index 0000000..1bb720d --- /dev/null +++ b/scraping/tests/test_main_tokens.py @@ -0,0 +1,327 @@ +import uuid +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from asgiref.sync import sync_to_async + +from scraping.main import Scraper +from users.models import User +from utils.utils import get_local_now + + +class TestScraperTokenAndProcessing: + @pytest.fixture + def scraper(self): + """Scraper 인스턴스 생성""" + return Scraper(group_range=range(1, 10), max_connections=10) + + @pytest.fixture + def user(self, db): + """테스트용 User 객체 생성""" + return User.objects.create( + velog_uuid=uuid.uuid4(), + access_token="encrypted-access-token", + refresh_token="encrypted-refresh-token", + group_id=1, + email="test@example.com", + is_active=True, + ) + + @patch("scraping.main.AESEncryption") + @pytest.mark.asyncio + async def test_update_old_tokens_success( + self, mock_aes, scraper, user + ) -> None: + """토큰 업데이트 성공 테스트""" + mock_encryption = mock_aes.return_value + mock_encryption.decrypt.side_effect = ( + lambda token: f"decrypted-{token}" + ) + mock_encryption.encrypt.side_effect = ( + lambda token: f"encrypted-{token}" + ) + + new_tokens = { + "access_token": "new-access-token", + "refresh_token": "new-refresh-token", + } + + with patch.object(user, "asave", new_callable=AsyncMock) as mock_asave: + result = await scraper.update_old_tokens( + user, mock_encryption, new_tokens + ) + + assert result is True + mock_asave.assert_called_once() + assert user.access_token == "encrypted-new-access-token" + assert user.refresh_token == "encrypted-new-refresh-token" + mock_encryption.decrypt.assert_any_call("encrypted-access-token") + mock_encryption.decrypt.assert_any_call("encrypted-refresh-token") + + @patch("scraping.main.AESEncryption") + @pytest.mark.asyncio + async def test_update_old_tokens_no_change( + self, mock_aes, scraper, user + ) -> None: + """토큰 업데이트 없음 테스트""" + mock_encryption = mock_aes.return_value + mock_encryption.decrypt.side_effect = lambda token: token + mock_encryption.encrypt.side_effect = ( + lambda token: f"encrypted-{token}" + ) + + new_tokens = { + "access_token": "encrypted-access-token", + "refresh_token": "encrypted-refresh-token", + } + + with patch.object(user, "asave", new_callable=AsyncMock) as mock_asave: + result = await scraper.update_old_tokens( + user, mock_encryption, new_tokens + ) + + assert result is False + mock_asave.assert_not_called() + + @patch("scraping.main.AESEncryption") + @pytest.mark.asyncio + async def test_update_old_tokens_expired_failure( + self, mock_aes, scraper, user + ): + """토큰이 만료되었을 때 업데이트 실패 테스트""" + mock_encryption = mock_aes.return_value + mock_encryption.decrypt.side_effect = ( + lambda token: f"decrypted-{token}" + ) + mock_encryption.encrypt.side_effect = ( + lambda token: f"encrypted-{token}" + ) + + new_tokens = { + "access_token": "decrypted-encrypted-access-token", + "refresh_token": "decrypted-encrypted-refresh-token", + } + + with patch.object(user, "asave", new_callable=AsyncMock) as mock_asave: + result = await scraper.update_old_tokens( + user, mock_encryption, new_tokens + ) + + assert result is False + mock_asave.assert_not_called() + + @patch("scraping.main.AESEncryption") + @pytest.mark.asyncio + async def test_update_old_tokens_with_mocked_decryption_failure( + self, mock_aes, scraper, user + ): + """복호화가 제대로 되지 않았을 경우 업데이트 실패 테스트""" + mock_encryption = mock_aes.return_value + mock_encryption.decrypt.side_effect = lambda token: None + mock_encryption.encrypt.side_effect = ( + lambda token: f"encrypted-{token}" + ) + + new_tokens = { + "access_token": "valid_token", + "refresh_token": "valid_token", + } + + with patch.object(user, "asave", new_callable=AsyncMock) as mock_asave: + result = await scraper.update_old_tokens( + user, mock_encryption, new_tokens + ) + + assert result is False + mock_asave.assert_not_called() + + @patch("scraping.main.fetch_post_stats") + @pytest.mark.asyncio + async def test_fetch_post_stats_limited_success(self, mock_fetch, scraper): + """fetch_post_stats 성공 테스트""" + mock_fetch.return_value = {"data": {"getStats": {"total": 100}}} + + result = await scraper.fetch_post_stats_limited( + "post-123", "token-1", "token-2" + ) + + assert result is not None + assert result["data"]["getStats"]["total"] == 100 + mock_fetch.assert_called_once_with("post-123", "token-1", "token-2") + + @patch("scraping.main.fetch_post_stats") + @pytest.mark.asyncio + async def test_fetch_post_stats_limited_retry_success( + self, mock_fetch, scraper + ): + """fetch_post_stats 재시도 성공 테스트""" + mock_fetch.side_effect = [ + None, # 첫 번째 시도 실패 + {"data": {"getStats": {"total": 100}}}, # 두 번째 시도 성공 + ] + + result = await scraper.fetch_post_stats_limited( + "post-123", "token-1", "token-2" + ) + + assert result is not None + assert result["data"]["getStats"]["total"] == 100 + assert mock_fetch.call_count == 2 + + @patch("scraping.main.fetch_post_stats") + @pytest.mark.asyncio + async def test_fetch_post_stats_limited_max_retries( + self, mock_fetch, scraper + ): + """최대 재시도 횟수 초과 테스트""" + mock_fetch.return_value = None + + result = await scraper.fetch_post_stats_limited( + "post-123", "token-1", "token-2" + ) + + assert result is None + assert mock_fetch.call_count == 3 # 최대 3번 재시도 + + @patch("scraping.main.fetch_velog_user_chk") + @patch("scraping.main.fetch_all_velog_posts") + @patch("scraping.main.AESEncryption") + @pytest.mark.asyncio + async def test_process_user_success( + self, mock_aes, mock_fetch_posts, mock_fetch_user_chk, scraper, user + ): + """유저 데이터 전체 처리 성공 테스트""" + # AES 암호화 모킹 + mock_encryption = mock_aes.return_value + mock_encryption.decrypt.side_effect = ( + lambda token: f"decrypted-{token}" + ) + mock_encryption.encrypt.side_effect = ( + lambda token: f"encrypted-{token}" + ) + + # 사용자 데이터 및 게시물 모킹 + mock_fetch_user_chk.return_value = ( + { + "access_token": "new-token", + "refresh_token": "new-refresh-token", + }, + {"data": {"currentUser": {"username": "testuser"}}}, + ) + + post_uuid = str(uuid.uuid4()) + mock_fetch_posts.return_value = [ + { + "id": post_uuid, + "title": "Test Post", + "url_slug": "test-post", + "released_at": get_local_now(), + "likes": 15, + } + ] + + # 내부 메서드 모킹 + with ( + patch.object( + scraper, "update_old_tokens", new_callable=AsyncMock + ) as mock_update_tokens, + patch.object( + scraper, "bulk_upsert_posts", new_callable=AsyncMock + ) as mock_bulk_upsert, + patch.object( + scraper, "sync_post_active_status", new_callable=AsyncMock + ) as mock_sync_status, + patch.object( + scraper, "fetch_post_stats_limited", new_callable=AsyncMock + ) as mock_fetch_stats, + patch.object( + scraper, "update_daily_statistics", new_callable=AsyncMock + ) as mock_update_stats, + ): + # 통계 데이터 모킹 + mock_fetch_stats.return_value = { + "data": {"getStats": {"total": 150}} + } + + # 테스트 실행 + await scraper.process_user(user, AsyncMock()) + + # 메서드 호출 확인 + mock_update_tokens.assert_called_once() + mock_bulk_upsert.assert_called_once() + mock_sync_status.assert_called_once() + mock_fetch_stats.assert_called_once() + mock_update_stats.assert_called_once() + + @patch("scraping.main.logger") + @pytest.mark.asyncio + async def test_run_method(self, mock_logger, scraper): + """run 메서드 테스트""" + # 테스트용 사용자 객체 생성 + test_user = AsyncMock() + + # 비동기 이터레이터를 반환하는 모킹 함수 생성 + async def async_mock_filter(*args, **kwargs): + for user in [test_user]: + yield user + + # User.objects.filter 모킹 + with patch("users.models.User.objects.filter") as mock_filter: + # 비동기 이터레이터를 반환하도록 설정 + mock_filter.return_value = async_mock_filter() + + # aiohttp.ClientSession 모킹 + with patch("aiohttp.ClientSession") as mock_session: + mock_session_instance = MagicMock() + mock_session.return_value.__aenter__.return_value = ( + mock_session_instance + ) + + # process_user 메서드 모킹 + with patch.object( + scraper, "process_user", new_callable=AsyncMock + ) as mock_process: + # 실행 + await scraper.run() + + # 로그 및 메서드 호출 확인 + assert mock_logger.info.call_count >= 2 # 시작과 종료 로그 + mock_process.assert_called_once_with(test_user, mock_session_instance) + + @pytest.mark.asyncio + @pytest.mark.django_db + async def test_scraper_target_user_run(self): + """ScraperTargetUser 클래스의 run 메서드 테스트""" + from scraping.main import ScraperTargetUser + + # 테스트 사용자 생성 + test_user = await sync_to_async(User.objects.create)( + velog_uuid=uuid.uuid4(), + access_token="test-access-token", + refresh_token="test-refresh-token", + group_id=1, + email="test@example.com", + is_active=True, + ) + + # ScraperTargetUser 인스턴스 생성 + target_scraper = ScraperTargetUser( + user_pk_list=[test_user.id], max_connections=10 + ) + + # process_user 메서드 모킹 + with patch.object( + target_scraper, "process_user", new_callable=AsyncMock + ) as mock_process: + # aiohttp.ClientSession 모킹 + with patch("aiohttp.ClientSession") as mock_session: + mock_session_instance = MagicMock() + mock_session.return_value.__aenter__.return_value = ( + mock_session_instance + ) + + # 실행 + await target_scraper.run() + + # process_user 호출 확인 + mock_process.assert_called_once()