-
Notifications
You must be signed in to change notification settings - Fork 1
[25.04.19 / TASK-157] Feature: 배치, soft delete 기능 이를 위한 테스트 코드 업데이트와 데이터 마이그레이션 추가 #26
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,3 +8,5 @@ | |
| ] | ||
|
|
||
| CORS_ALLOW_ALL_ORIGINS = True | ||
|
|
||
| # DATABASES["prod"] = env.db() # noqa: F405 | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,100 @@ | ||
| #!/usr/bin/env python | ||
| """ | ||
| 게시글 데이터 마이그레이션 스크립트 | ||
|
|
||
| 원격/운영 데이터베이스에서 로컬 데이터베이스로 Post 테이블 데이터를 이관합니다. | ||
| """ | ||
|
|
||
| from django.db import connections, transaction | ||
|
|
||
| from posts.models import Post | ||
| from users.models import User | ||
|
|
||
| print("게시글 마이그레이션을 시작합니다...") | ||
|
|
||
| try: | ||
| # 청크 크기 설정 | ||
| chunk_size = 500 | ||
| offset = 0 | ||
| total_migrated = 0 | ||
| success_count = 0 | ||
| update_count = 0 | ||
| skipped_count = 0 | ||
|
|
||
| while True: | ||
| # 원격 DB에서 게시글 데이터를 청크 단위로 가져오기 | ||
| with connections["prod"].cursor() as cursor: | ||
| cursor.execute( | ||
| f""" | ||
| SELECT p.id, p.created_at, p.updated_at, p.post_uuid, p.user_id, | ||
| p.title, p.is_active, p.slug, p.released_at, u.velog_uuid | ||
| FROM posts_post p | ||
| JOIN users_user u ON p.user_id = u.id | ||
| ORDER BY p.id | ||
| LIMIT {chunk_size} OFFSET {offset} | ||
| """ | ||
| ) | ||
| posts = cursor.fetchall() | ||
|
|
||
| if not posts: | ||
| break | ||
|
|
||
| print(f"게시글 데이터 {len(posts)}개 처리 중 (오프셋 {offset})...") | ||
|
|
||
| # 로컬 DB에 데이터 삽입 | ||
| with transaction.atomic(): | ||
| for post in posts: | ||
| # 게시글이 이미 로컬에 존재하는지 확인 | ||
| existing_post = Post.objects.filter(post_uuid=post[3]).first() | ||
|
|
||
| # velog_uuid를 이용해 로컬 사용자 찾기 | ||
| velog_uuid = post[9] # velog_uuid는 10번째 컬럼 | ||
|
|
||
| try: | ||
| user = User.objects.get(velog_uuid=velog_uuid) | ||
| except User.DoesNotExist: | ||
| print( | ||
| f"UUID {velog_uuid}의 사용자가 로컬에 존재하지 않습니다. 게시글 {post[3]} 건너뜁니다." | ||
| ) | ||
| skipped_count += 1 | ||
| continue | ||
|
|
||
| if existing_post: | ||
| print( | ||
| f"UUID {post[3]}의 게시글이 이미 존재합니다. 정보를 업데이트합니다..." | ||
| ) | ||
| # 기존 게시글 정보 업데이트 | ||
| existing_post.title = post[5] | ||
| existing_post.is_active = post[6] | ||
| existing_post.slug = post[7] | ||
| existing_post.released_at = post[8] | ||
| existing_post.save() | ||
| update_count += 1 | ||
| else: | ||
| print(f"UUID {post[3]}의 새 게시글을 생성합니다.") | ||
| # 새 게시글 생성 | ||
| Post.objects.create( | ||
| created_at=post[1], | ||
| updated_at=post[2], | ||
| post_uuid=post[3], | ||
| user=user, | ||
| title=post[5], | ||
| is_active=post[6], | ||
| slug=post[7], | ||
| released_at=post[8], | ||
| ) | ||
| success_count += 1 | ||
|
|
||
| total_migrated += len(posts) | ||
| offset += chunk_size | ||
| print( | ||
| f"현재까지 {total_migrated}개의 게시글을 마이그레이션했습니다..." | ||
| ) | ||
|
|
||
| print( | ||
| f"게시글 마이그레이션이 완료되었습니다. 새로 생성: {success_count}개, 업데이트: {update_count}개, 건너뜀: {skipped_count}개" | ||
| ) | ||
| print(f"총 처리된 레코드: {total_migrated}개") | ||
|
|
||
| except Exception as e: | ||
| print(f"게시글 마이그레이션 중 오류 발생: {e}") |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,95 @@ | ||
| #!/usr/bin/env python | ||
| """ | ||
| 게시글 통계 데이터 마이그레이션 스크립트 | ||
|
|
||
| 원격/운영 데이터베이스에서 로컬 데이터베이스로 PostDailyStatistics 테이블 데이터를 이관합니다. | ||
| """ | ||
|
|
||
| from django.db import connections, transaction | ||
|
|
||
| from posts.models import Post, PostDailyStatistics | ||
|
|
||
| print("게시글 통계 마이그레이션을 시작합니다...") | ||
|
|
||
| try: | ||
| # 청크 크기 설정 | ||
| chunk_size = 1000 | ||
| offset = 0 | ||
| total_migrated = 0 | ||
| new_count = 0 | ||
| update_count = 0 | ||
| skipped_count = 0 | ||
|
|
||
| while True: | ||
| # 원격 DB에서 통계 데이터를 청크 단위로 가져오기 (post_uuid와 함께) | ||
| with connections["prod"].cursor() as cursor: | ||
| cursor.execute( | ||
| f""" | ||
| SELECT pds.id, pds.created_at, pds.updated_at, pds.post_id, pds.date, | ||
| pds.daily_view_count, pds.daily_like_count, p.post_uuid | ||
| FROM posts_postdailystatistics pds | ||
| JOIN posts_post p ON pds.post_id = p.id | ||
| WHERE pds.date >= CURRENT_DATE - INTERVAL '3 days' | ||
| ORDER BY pds.id | ||
| LIMIT {chunk_size} OFFSET {offset} | ||
| """ | ||
| ) | ||
| stats_chunk = cursor.fetchall() | ||
|
|
||
| if not stats_chunk: | ||
| break | ||
|
|
||
| print(f"통계 데이터 {len(stats_chunk)}개 처리 중 (오프셋 {offset})...") | ||
|
|
||
| # 로컬 DB에 데이터 삽입 | ||
| with transaction.atomic(): | ||
| for stat in stats_chunk: | ||
| # post_uuid를 이용해 로컬 게시글 찾기 | ||
| post_uuid = stat[7] # post_uuid는 8번째 컬럼 | ||
|
|
||
| try: | ||
| post = Post.objects.get(post_uuid=post_uuid) | ||
| except Post.DoesNotExist: | ||
| print( | ||
| f"UUID {post_uuid}의 게시글이 로컬에 존재하지 않습니다. 통계 {stat[0]} 건너뜁니다." | ||
| ) | ||
| skipped_count += 1 | ||
| continue | ||
|
|
||
| # 통계가 이미 로컬에 존재하는지 확인 (post 객체와 date로 찾기) | ||
| existing_stat = PostDailyStatistics.objects.filter( | ||
| post=post, date=stat[4] | ||
| ).first() | ||
|
|
||
| if existing_stat: | ||
| # 기존 통계 정보 업데이트 | ||
| existing_stat.daily_view_count = stat[5] | ||
| existing_stat.daily_like_count = stat[6] | ||
| existing_stat.save() | ||
| update_count += 1 | ||
| else: | ||
| # 새 통계 생성 | ||
| PostDailyStatistics.objects.create( | ||
| id=stat[0], | ||
| created_at=stat[1], | ||
| updated_at=stat[2], | ||
| post=post, # 로컬 post 객체 사용 | ||
| date=stat[4], | ||
| daily_view_count=stat[5], | ||
| daily_like_count=stat[6], | ||
| ) | ||
| new_count += 1 | ||
|
|
||
| total_migrated += len(stats_chunk) | ||
| offset += chunk_size | ||
| print( | ||
| f"현재까지 {total_migrated}개의 통계 데이터를 마이그레이션했습니다..." | ||
| ) | ||
|
|
||
| print( | ||
| f"게시글 통계 마이그레이션이 완료되었습니다. 새로 생성: {new_count}개, 업데이트: {update_count}개, 건너뜀: {skipped_count}개" | ||
| ) | ||
| print(f"총 처리된 레코드: {total_migrated}개") | ||
|
|
||
| except Exception as e: | ||
| print(f"게시글 통계 마이그레이션 중 오류 발생: {e}") |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,86 @@ | ||
| #!/usr/bin/env python | ||
| """ | ||
| 사용자 데이터 마이그레이션 스크립트 | ||
|
|
||
| 원격/운영 데이터베이스에서 로컬 데이터베이스로 User 테이블 데이터를 이관합니다. | ||
| """ | ||
|
|
||
| from django.db import connections, transaction | ||
|
|
||
| from users.models import User | ||
|
|
||
| print("사용자 마이그레이션을 시작합니다...") | ||
|
|
||
| try: | ||
| # 청크 크기 설정 (메모리 문제 방지를 위해) | ||
| chunk_size = 500 | ||
| offset = 0 | ||
| total_migrated = 0 | ||
| success_count = 0 | ||
| update_count = 0 | ||
|
|
||
| while True: | ||
| # 원격 DB에서 사용자 데이터를 청크 단위로 가져오기 | ||
| with connections["prod"].cursor() as cursor: | ||
| cursor.execute( | ||
| f""" | ||
| SELECT id, created_at, updated_at, velog_uuid, access_token, | ||
| refresh_token, group_id, email, is_active | ||
| FROM users_user | ||
| ORDER BY id | ||
| LIMIT {chunk_size} OFFSET {offset} | ||
| """ | ||
| ) | ||
| users = cursor.fetchall() | ||
|
|
||
| if not users: | ||
| break | ||
|
|
||
| print(f"사용자 데이터 {len(users)}명 처리 중 (오프셋 {offset})...") | ||
|
|
||
| # 로컬 DB에 데이터 삽입 | ||
| with transaction.atomic(): | ||
| for user in users: | ||
| # 사용자가 이미 로컬에 존재하는지 확인 | ||
| existing_user = User.objects.filter(velog_uuid=user[3]).first() | ||
|
|
||
| if existing_user: | ||
| print( | ||
| f"UUID {user[3]}의 사용자가 이미 존재합니다. 정보를 업데이트합니다..." | ||
| ) | ||
| # 기존 사용자 정보 업데이트 | ||
| existing_user.access_token = user[4] | ||
| existing_user.refresh_token = user[5] | ||
| existing_user.group_id = user[6] | ||
| existing_user.email = user[7] | ||
| existing_user.is_active = user[8] | ||
| existing_user.save() | ||
| update_count += 1 | ||
| else: | ||
| print(f"UUID {user[3]}의 새 사용자를 생성합니다.") | ||
| # 새 사용자 생성 | ||
| User.objects.create( | ||
| created_at=user[1], | ||
| updated_at=user[2], | ||
| velog_uuid=user[3], | ||
| access_token=user[4], | ||
| refresh_token=user[5], | ||
| group_id=user[6], | ||
| email=user[7], | ||
| is_active=user[8], | ||
| ) | ||
| success_count += 1 | ||
|
|
||
| total_migrated += len(users) | ||
| offset += chunk_size | ||
| print( | ||
| f"현재까지 {total_migrated}명의 사용자를 마이그레이션했습니다..." | ||
| ) | ||
|
|
||
| print( | ||
| f"사용자 마이그레이션이 완료되었습니다. 새로 생성: {success_count}명, 업데이트: {update_count}명" | ||
| ) | ||
| print(f"총 처리된 사용자: {total_migrated}명") | ||
|
|
||
| except Exception as e: | ||
| print(f"사용자 마이그레이션 중 오류 발생: {e}") |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,7 +5,6 @@ | |
| import async_timeout | ||
| import environ | ||
| import sentry_sdk | ||
|
|
||
| from asgiref.sync import sync_to_async | ||
| from django.db import transaction | ||
|
|
||
|
|
@@ -141,6 +140,74 @@ def _execute_transaction() -> None: | |
|
|
||
| await _execute_transaction() | ||
|
|
||
| async def sync_post_active_status( | ||
| self, | ||
| user: User, | ||
| current_post_ids: set[str], | ||
| min_posts_threshold: int = 1, | ||
| ) -> None: | ||
| """현재 API에서 가져온 게시글 목록을 기준으로 활성/비활성 상태 동기화 | ||
|
|
||
| Args: | ||
| user: 대상 사용자 | ||
| current_post_ids: 현재 API에서 가져온 게시글 ID 집합 | ||
| min_posts_threshold: API 응답에 최소 이 개수 이상의 게시글이 있어야 상태변경을 실행 | ||
| """ | ||
|
|
||
| # API 응답이 너무 적으면 상태변경 하지 않음 (API 오류 가능성) | ||
| if len(current_post_ids) < min_posts_threshold: | ||
| logger.warning( | ||
| f"Skipping post status sync for user {user.velog_uuid} - Too few posts returned ({len(current_post_ids)})" | ||
| ) | ||
| return | ||
|
|
||
| @sync_to_async(thread_sensitive=True) # type: ignore | ||
| def _execute_sync() -> None: | ||
| # 1. 비활성화 로직: 현재 목록에 없는 활성화된 게시글 찾기 | ||
| posts_to_deactivate = Post.objects.filter( | ||
| user=user, is_active=True | ||
| ).exclude(post_uuid__in=current_post_ids) | ||
|
|
||
| deactivation_count = posts_to_deactivate.count() | ||
|
|
||
| # 너무 많은 게시글이 비활성화되는 경우 방어 로직 | ||
| active_posts_count = Post.objects.filter( | ||
| user=user, is_active=True | ||
| ).count() | ||
|
|
||
| if ( | ||
| active_posts_count > 0 | ||
| and deactivation_count / active_posts_count > 0.5 | ||
| ): | ||
| logger.warning( | ||
| f"Suspicious deactivation detected for user {user.velog_uuid}: " | ||
| f"Would deactivate {deactivation_count} out of {active_posts_count} posts. " | ||
| f"Skipping post status sync as a safety measure." | ||
| ) | ||
| return | ||
|
Comment on lines
+173
to
+187
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 비율로 방어 로직 추가하신게 섬세해서 너무 좋은 것 같아요.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 흐흐 아주 중요한 부분 짚어주셔서 감사해요. 알아봐주셔서 감격 ㅎ 근데 이제 서비스 관점에서 언급한 것과 같이 3개 쓰고 2개 비활성화 하는 사람이 정말.. 대시보드 통계가 궁금할까..? 쪽이었고, 일단 주관으로 "아니다" 라고 결론을 내렸네요. 근데 [ 만약 3개 쓰고 2개 비활성화 하는 유저가 나타난다면! 그때 바꾸자! ] 에 가까운 것 같아요 :)
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 결론 내리신 부분 이해 했습니다! 저도 같은 생각입니다ㅎㅎ 👍 |
||
|
|
||
| # 2. 재활성화 로직: 현재 목록에 있지만 비활성화된 게시글 찾기 | ||
| posts_to_reactivate = Post.objects.filter( | ||
| user=user, is_active=False, post_uuid__in=current_post_ids | ||
| ) | ||
|
|
||
| reactivation_count = posts_to_reactivate.count() | ||
|
|
||
| # 상태 업데이트 실행 | ||
| if deactivation_count > 0: | ||
| logger.info( | ||
| f"Deactivating {deactivation_count} posts for user {user.velog_uuid}" | ||
| ) | ||
| posts_to_deactivate.update(is_active=False) | ||
|
|
||
| if reactivation_count > 0: | ||
| logger.info( | ||
| f"Reactivating {reactivation_count} posts for user {user.velog_uuid}" | ||
| ) | ||
| posts_to_reactivate.update(is_active=True) | ||
|
|
||
| await _execute_sync() | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 신규 데이터에 없는데 과거 데이터에는 있을 경우만 고려하신게 아니라, 반대의 상황 즉 활성화까지 고려하신 부분이 놀라웠습니다!
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 사실 비즈니스로직 설계 때 보다 다루는 데이터의양, 그리고 사실 이게 핵심 비즈니스 로직에 가깝기때문에 저는 flow chart 와 시퀀스 다이어그램에 진심으로 힘을 쏟습니다! 저희 특성상 아직까진 API 에 엄청 복잡한 로직이 없을 뿐이지, 앞으로 만약 생긴다면 똑같이 갈거에요!! 제 기준이 있다기 보단 계속 엣지 케이스에 집착하다보니까 이렇게 까지 된 것 같아요. 이 배치가 잘 못 되었을 경우 사실 API 는 죄없이 다 터질꺼고, 그러면 client side 도 억울하게 다 터지니까요 ㅎㅎ |
||
|
|
||
| async def update_daily_statistics( | ||
| self, post: dict[str, str], stats: dict[str, str] | ||
| ) -> None: | ||
|
|
@@ -290,9 +357,19 @@ async def process_user( | |
| fetched_posts = await fetch_all_velog_posts( | ||
| session, username, origin_access_token, origin_refresh_token | ||
| ) | ||
| all_post_ids = {post["id"] for post in fetched_posts} | ||
| logger.info( | ||
| f"Fetched {len(all_post_ids)} posts for user {user.velog_uuid}" | ||
| ) | ||
|
|
||
| # 게시물이 새로 생겼으면 추가, 아니면 업데이트 | ||
| await self.bulk_upsert_posts(user, fetched_posts) | ||
|
|
||
| # 게시글 활성/비활성 상태 동기화 | ||
| await self.sync_post_active_status( | ||
| user, all_post_ids, min_posts_threshold=1 | ||
| ) | ||
|
|
||
| # 게시물을 적절한 크기의 청크로 나누어 처리 | ||
| chunk_size = 20 | ||
| for i in range(0, len(fetched_posts), chunk_size): | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
API 응답이 너무 적을 경우, is_active를 변경하지 않는다는 말씀이신가요?
최소 개수 이상의 기준을 세우신 이유가 어떤 상황을 고려하신건지 설명해주실 수 있으신가요?!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#26 (comment)
@ooheunda 님과 같은 질문 주셔서 영광임다! 위 코멘트로 대체 할게요!