In [1]:
import pymysql
from sqlalchemy import create_engine
import pandas as pd
from pandas.io import sql
import datetime
import pandas as pd
import pandasql as ps
from concurrent.futures import ThreadPoolExecutor, as_completed


In [2]:
# DB connection Info

user = 'user'
pw = 'pw'
ip = 'ip'
port = 'port'
db = 'db_name'    # SQL client -> 필요한 DB 이름 확인

In [3]:
# DB Connect
pymysql.install_as_MySQLdb()
cnx = create_engine("mysql://{user}:{pw}@{ip}:{port}/{db}".format(user=user, pw=pw, ip=ip, port=port, db=db))

In [None]:
def fetch_and_save(offset, limit, retries=3):
    for attempt in range(retries):
        try:
            # SQL 쿼리 실행
            query = f"""
            SELECT uid, ref_uid, ref_count
            FROM reference
            LIMIT {limit} OFFSET {offset};
            """
            df_chunk = pd.read_sql_query(query, cnx)
            
            if not df_chunk.empty:
                # 데이터 파일에 저장
                mode = 'w' if offset == 0 else 'a'
                header = True if offset == 0 else False
                df_chunk.to_csv(f'./mid/thread_reference/reference_chunk_{offset}.txt', sep='\t', mode=mode, index=False, header=header)
                return len(df_chunk)
            else:
                # 데이터가 비어있으면 반복을 종료하도록 0을 반환
                return 0
        except Exception as e:
            print(f"Error fetching data for offset {offset}: {e}")
            if attempt == retries - 1:
                raise
            else:
                print(f"Retrying... Attempt {attempt + 2} of {retries}")

def main(limit=1000000):
    start_time = datetime.datetime.now()
    print(f"처리 시작 시간: {start_time}")

    total_records = 0
    offset = 0

    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = []
        while True:
            print(f"청크 처리 시작: Offset {offset}")
            future = executor.submit(fetch_and_save, offset, limit)
            futures.append(future)
            offset += limit  # 다음 작업을 위해 offset 업데이트

            # 각 future의 완료를 기다리고 결과 처리
            for future in as_completed(futures):
                result = future.result()
                if result == 0:
                    # 더 이상 처리할 데이터가 없으면 루프 종료
                    print("더 이상 처리할 데이터가 없습니다. 모든 데이터 처리 완료.")
                    futures.remove(future)  # 처리 완료된 future 제거
                    break
                else:
                    # 처리된 레코드 수 업데이트 및 진행 상황 출력
                    total_records += result
                    print(f"청크 처리 완료: Offset {offset - limit}, 처리된 레코드 수: {result}")
                    futures.remove(future)  # 처리 완료된 future 제거

            if result == 0:
                break

    end_time = datetime.datetime.now()
    print(f"처리 완료 시간: {end_time}, 총 처리 시간: {end_time - start_time}, 총 처리된 레코드 수: {total_records}")
    
main()

처리 시작 시간: 2024-02-19 14:47:35.395070
청크 처리 시작: Offset 0
청크 처리 완료: Offset 0, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 1000000
청크 처리 완료: Offset 1000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 2000000
청크 처리 완료: Offset 2000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 3000000
청크 처리 완료: Offset 3000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 4000000
청크 처리 완료: Offset 4000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 5000000
청크 처리 완료: Offset 5000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 6000000
청크 처리 완료: Offset 6000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 7000000
청크 처리 완료: Offset 7000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 8000000
청크 처리 완료: Offset 8000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 9000000
청크 처리 완료: Offset 9000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 10000000
청크 처리 완료: Offset 10000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 11000000
청크 처리 완료: Offset 11000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 12000000
청크 처리 완료: Offset 12000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 13000000
청크 처리 완료: Offset 13000000, 처리된 레코

청크 처리 완료: Offset 113000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 114000000
청크 처리 완료: Offset 114000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 115000000
청크 처리 완료: Offset 115000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 116000000
청크 처리 완료: Offset 116000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 117000000
청크 처리 완료: Offset 117000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 118000000
청크 처리 완료: Offset 118000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 119000000
청크 처리 완료: Offset 119000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 120000000
청크 처리 완료: Offset 120000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 121000000
청크 처리 완료: Offset 121000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 122000000
청크 처리 완료: Offset 122000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 123000000
청크 처리 완료: Offset 123000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 124000000
청크 처리 완료: Offset 124000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 125000000
청크 처리 완료: Offset 125000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 126000000
청크 처리 완료: Offset 126000000, 처리된 레코드 수:

청크 처리 완료: Offset 224000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 225000000
청크 처리 완료: Offset 225000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 226000000
청크 처리 완료: Offset 226000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 227000000
청크 처리 완료: Offset 227000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 228000000
청크 처리 완료: Offset 228000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 229000000
청크 처리 완료: Offset 229000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 230000000
청크 처리 완료: Offset 230000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 231000000
청크 처리 완료: Offset 231000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 232000000
청크 처리 완료: Offset 232000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 233000000
청크 처리 완료: Offset 233000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 234000000
청크 처리 완료: Offset 234000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 235000000
청크 처리 완료: Offset 235000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 236000000
청크 처리 완료: Offset 236000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 237000000
청크 처리 완료: Offset 237000000, 처리된 레코드 수:

청크 처리 완료: Offset 335000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 336000000
청크 처리 완료: Offset 336000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 337000000
청크 처리 완료: Offset 337000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 338000000
청크 처리 완료: Offset 338000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 339000000
청크 처리 완료: Offset 339000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 340000000
청크 처리 완료: Offset 340000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 341000000
청크 처리 완료: Offset 341000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 342000000
청크 처리 완료: Offset 342000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 343000000
청크 처리 완료: Offset 343000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 344000000
청크 처리 완료: Offset 344000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 345000000
청크 처리 완료: Offset 345000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 346000000
청크 처리 완료: Offset 346000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 347000000
청크 처리 완료: Offset 347000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 348000000
청크 처리 완료: Offset 348000000, 처리된 레코드 수:

청크 처리 완료: Offset 446000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 447000000
청크 처리 완료: Offset 447000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 448000000
청크 처리 완료: Offset 448000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 449000000
청크 처리 완료: Offset 449000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 450000000
청크 처리 완료: Offset 450000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 451000000
청크 처리 완료: Offset 451000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 452000000
청크 처리 완료: Offset 452000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 453000000
청크 처리 완료: Offset 453000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 454000000
청크 처리 완료: Offset 454000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 455000000
청크 처리 완료: Offset 455000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 456000000
청크 처리 완료: Offset 456000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 457000000
청크 처리 완료: Offset 457000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 458000000
청크 처리 완료: Offset 458000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 459000000
청크 처리 완료: Offset 459000000, 처리된 레코드 수:

청크 처리 완료: Offset 557000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 558000000
청크 처리 완료: Offset 558000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 559000000
청크 처리 완료: Offset 559000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 560000000
청크 처리 완료: Offset 560000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 561000000
청크 처리 완료: Offset 561000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 562000000
청크 처리 완료: Offset 562000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 563000000
청크 처리 완료: Offset 563000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 564000000
청크 처리 완료: Offset 564000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 565000000
청크 처리 완료: Offset 565000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 566000000
청크 처리 완료: Offset 566000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 567000000
청크 처리 완료: Offset 567000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 568000000
청크 처리 완료: Offset 568000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 569000000
청크 처리 완료: Offset 569000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 570000000
청크 처리 완료: Offset 570000000, 처리된 레코드 수:

청크 처리 완료: Offset 668000000, 처리된 레코드 수: 1000000
청크 처리 시작: Offset 669000000
