In [None]:
%pip install google-cloud-bigquery pandas db-dtypes pandas-gbq

In [3]:
from google.cloud import bigquery
import pandas_gbq
import pandas as pd
# import db_dtypes
# Create the BigQuery client object from a service-account key file.
# NOTE(review): the key path is hard-coded relative to the notebook's working
# directory; the notebook only runs where that file exists.

service_account_key_path = '../../../teemo-415918-c7066e71ebbb.json'
client = bigquery.Client.from_service_account_json(service_account_key_path)

In [4]:
# Fully-qualified identifiers of the BigQuery project and of the tables this
# notebook reads from / writes to. All tables live in the same dataset.
project_id = 'teemo-415918'
_dataset_prefix = f"{project_id}.match_dataset"

origin_table_id = f"{_dataset_prefix}.match_test"
target_table_id = f"{_dataset_prefix}.match_v1"
temp_table_id = f"{_dataset_prefix}.temp"

In [5]:
# Categorical columns, grouped by how their integer codes are assigned:
#   'other'          - every column gets its own category -> index mapping
#   any other group  - all columns of the group share one mapping
# Column names must match the table schema exactly: a stray leading space
# (the former ' flex') makes the back-quoted SELECT reference a column that
# does not exist.
cate_cols = {
    'other': [
        'summonerId', 'teamId', 'individualPosition', 'role', 'championId',
        'win', 'defense', 'flex', 'offense', 'matchId',
    ],
    'item': ['item0', 'item1', 'item2', 'item3', 'item4', 'item5', 'item6'],
    'summonerSpell': ['summoner1Id', 'summoner2Id'],
}

In [None]:
# Label-encode the categorical columns of the origin table and copy the
# integer codes into the target table, one column at a time.
#
# NOTE(review): the UPDATE below joins on an `id` column, so `id` is assumed to
# exist in both the origin and the target table - confirm against the schema.
# NOTE(review): the target column must be able to hold integer codes - confirm.


def _fetch_column(col):
    """Download `id` together with `col` from the origin table."""
    query = f"""
    SELECT id, `{col}`
    FROM `{origin_table_id}`
    """
    return client.query(query).to_dataframe()


def _push_codes_to_target(codes_df, col):
    """Stage `codes_df` (columns `id` and `col`) and copy `col` into the target table."""
    pandas_gbq.to_gbq(codes_df, temp_table_id, project_id=project_id, if_exists='replace')

    update_query = f"""
    UPDATE `{target_table_id}` AS target
    SET target.{col} = temp.{col}
    FROM `{temp_table_id}` AS temp
    WHERE target.id = temp.id
    """
    # client.query() only submits the job; result() waits for it and raises
    # if the DML statement failed.
    client.query(update_query).result()


for cate, cols in cate_cols.items():
    if cate == 'other':
        # Every column gets its own mapping, numbered from 1 in order of
        # first appearance.
        for col in cols:
            df = _fetch_column(col)
            category_mapping = {category: idx for idx, category in enumerate(df[col].unique(), start=1)}
            df[col] = df[col].map(category_mapping)
            _push_codes_to_target(df, col)

    else:
        # All columns of the group share one mapping built from the pooled
        # values of every column in the group.
        frames = {col: _fetch_column(col) for col in cols}
        pooled_values = pd.concat([frames[col][col] for col in cols], ignore_index=True)
        category_mapping = {category: idx for idx, category in enumerate(pooled_values.unique(), start=1)}

        for col, df in frames.items():
            df[col] = df[col].map(category_mapping)
            _push_codes_to_target(df, col)


In [None]:
# Shared state for the batch encoders defined below.
write_disposition = bigquery.WriteDisposition.WRITE_APPEND
category_to_index = {}  # filled with {category value: integer index} mappings
origin_table = client.get_table(origin_table_id)  # handle used to read the origin schema

# Count the total number of rows in the origin table.
query_count = f"SELECT COUNT(*) as total FROM `{origin_table_id}`"
count_df = client.query(query_count).to_dataframe()
total_rows = count_df['total'].iloc[0]

print('total_rows : ', total_rows)

In [None]:
# Categorical columns, grouped by how they are index-encoded: the 'other'
# group is encoded column by column, every other group shares one mapping
# across its columns.
# Column names must match the table schema exactly: with the former ' flex'
# (leading space) the query result column is named `flex`, so the later
# DataFrame lookup by ' flex' raises KeyError.
cate_cols = {
    'other': [
        'summonerId', 'teamId', 'individualPosition', 'role', 'championId',
        'win', 'defense', 'flex', 'offense', 'matchId',
    ],
    'item': ['item0', 'item1', 'item2', 'item3', 'item4', 'item5', 'item6'],
    'summonerSpell': ['summoner1Id', 'summoner2Id'],
}

In [None]:
def create_table(table_id, col_name, field_type):
    """(Re)create a single-column BigQuery table and return its schema.

    An existing table with the same id is deleted first, so the table is
    always empty after this call.

    Args:
        table_id: Fully-qualified id of the table to create.
        col_name: Name of the table's only column.
        field_type: BigQuery type of that column (e.g. "INTEGER").

    Returns:
        The schema as a one-element list of ``bigquery.SchemaField``. Callers
        assign this return value and pass it to ``LoadJobConfig``; previously
        the function returned ``None``.
    """
    schema = [bigquery.SchemaField(col_name, field_type)]
    table = bigquery.Table(table_id, schema=schema)
    client.delete_table(table, not_found_ok=True)
    client.create_table(table)
    return schema


def get_unique(col_name):
    """Return the distinct values of ``col_name`` in the origin table.

    Args:
        col_name: Column of ``origin_table_id`` to collect values from.

    Returns:
        A DataFrame holding the result of the ``SELECT DISTINCT`` query.
    """
    # Collect the unique category values.
    distinct_sql = f"""
    SELECT DISTINCT {col_name}
    FROM `{origin_table_id}`
    """
    return client.query(distinct_sql).to_dataframe()


def other_cate(cate_group, batch_size):
    """Index-encode every column of ``cate_group`` independently.

    For each column this
      1. builds a ``{category: index}`` mapping (indices start at 1) and
         stores it in the module-level ``category_to_index`` under the
         column name,
      2. writes the distinct categories to ``<pp_table_id>.<col>_index_table``,
      3. reads the column from the origin table in batches, maps it to its
         indices and appends it to ``<pp_table_id>.<col>_indexed_table``.

    NOTE(review): ``pp_table_id`` is not defined in the visible cells; it must
    be set before this function is called.
    NOTE(review): the index table stores only the category values, not their
    indices, so the mapping itself lives only in ``category_to_index``.

    Args:
        cate_group: Key of ``cate_cols`` naming the columns to encode.
        batch_size: Maximum number of rows per load job / query page.

    Raises:
        KeyError: If a column is missing from the origin table's schema.
    """
    num_batches = (total_rows + batch_size - 1) // batch_size
    origin_field_types = {field.name: field.field_type for field in origin_table.schema}

    for col_name in cate_cols[cate_group]:
        # Fail loudly instead of reusing a stale (or unbound) field type.
        if col_name not in origin_field_types:
            raise KeyError(f"column {col_name!r} not found in the schema of {origin_table_id}")
        field_type = origin_field_types[col_name]

        df_unique_categories = get_unique(col_name)
        category_to_index[col_name] = {category: idx for idx, category in enumerate(df_unique_categories[col_name], start=1)}

        # Store the index table (the distinct category values).
        print('start: ', col_name, "index_table")
        index_table_id = pp_table_id + f'.{col_name}_index_table'
        create_table(index_table_id, col_name, field_type)
        # Build the schema here rather than relying on create_table's return value.
        index_schema = [bigquery.SchemaField(col_name, field_type)]
        index_job_config = bigquery.LoadJobConfig(schema=index_schema, write_disposition=write_disposition)

        # Batch over the unique values themselves; sizing the loop by the
        # origin row count would submit many empty load jobs.
        for start_row in range(0, len(df_unique_categories), batch_size):
            cut_df = df_unique_categories.iloc[start_row : start_row + batch_size]
            job = client.load_table_from_dataframe(cut_df, index_table_id, job_config=index_job_config)
            job.result()

        # Store the index-encoded data.
        print('start: ', col_name, "indexed_table")
        indexed_table_id = pp_table_id + f'.{col_name}_indexed_table'
        create_table(indexed_table_id, col_name, "INTEGER")
        indexed_schema = [bigquery.SchemaField(col_name, "INTEGER")]
        indexed_job_config = bigquery.LoadJobConfig(schema=indexed_schema, write_disposition=write_disposition)

        for batch_num in range(num_batches):
            start_row = batch_num * batch_size
            # Read from the ORIGIN table (the indexed table was just created
            # empty). ORDER BY keeps LIMIT/OFFSET paging stable; only this one
            # column is selected, so tied rows are identical and the pages
            # together cover every row exactly once.
            query_batch = f"""
            SELECT {col_name}
            FROM `{origin_table_id}`
            ORDER BY {col_name}
            LIMIT {batch_size} OFFSET {start_row}
            """
            df_batch = client.query(query_batch).to_dataframe()

            # Apply the index mapping.
            df_batch[col_name] = df_batch[col_name].map(category_to_index[col_name])

            job = client.load_table_from_dataframe(df_batch, indexed_table_id, job_config=indexed_job_config)
            job.result()


def series_cate(cate_group, batch_size):
    """Index-encode the columns of ``cate_group`` with one shared mapping.

    The distinct values of every column in the group are pooled, numbered
    from 1, and the resulting ``{category: index}`` mapping is stored in the
    module-level ``category_to_index`` under ``cate_group``. Two tables are
    written:
      * ``<pp_table_id>.<cate_group>_index_table`` - one column named
        ``cate_group`` holding the pooled distinct values,
      * ``<pp_table_id>.<cate_group>_indexed_table`` - one INTEGER column per
        column of the group, holding the mapped indices of the origin rows.

    NOTE(review): ``pp_table_id`` is not defined in the visible cells; it must
    be set before this function is called.
    NOTE(review): the index table keeps the original "INTEGER" column type,
    which assumes the raw values of the grouped columns are integers - confirm.

    Args:
        cate_group: Key of ``cate_cols`` naming the columns to encode.
        batch_size: Maximum number of rows per load job / query page.
    """
    col_names = cate_cols[cate_group]
    num_batches = (total_rows + batch_size - 1) // batch_size

    # Pool the values into ONE column. Concatenating the per-column frames
    # directly would give one NaN-padded column per source column.
    pooled_values = pd.concat(
        [get_unique(col_name)[col_name] for col_name in col_names],
        ignore_index=True,
    ).drop_duplicates()
    df_unique_categories = pooled_values.rename(cate_group).to_frame().reset_index(drop=True)

    # Preview at most 10 values; sample(10) raises on smaller frames.
    print(df_unique_categories.sample(min(10, len(df_unique_categories))))

    # Enumerate the VALUES; iterating the DataFrame itself yields column labels.
    shared_mapping = {category: idx for idx, category in enumerate(df_unique_categories[cate_group], start=1)}
    category_to_index[cate_group] = shared_mapping

    # Store the index table (the pooled distinct values).
    print('start: ', cate_group, "index_table")
    index_table_id = pp_table_id + f'.{cate_group}_index_table'
    create_table(index_table_id, cate_group, "INTEGER")
    # Build the schema here rather than relying on create_table's return value.
    index_schema = [bigquery.SchemaField(cate_group, "INTEGER")]
    index_job_config = bigquery.LoadJobConfig(schema=index_schema, write_disposition=write_disposition)

    for start_row in range(0, len(df_unique_categories), batch_size):
        cut_df = df_unique_categories.iloc[start_row : start_row + batch_size]
        job = client.load_table_from_dataframe(cut_df, index_table_id, job_config=index_job_config)
        job.result()

    # Store the index-encoded data: one INTEGER column per grouped column.
    print('start: ', cate_group, "indexed_table")
    indexed_table_id = pp_table_id + f'.{cate_group}_indexed_table'
    indexed_schema = [bigquery.SchemaField(col_name, "INTEGER") for col_name in col_names]
    indexed_table = bigquery.Table(indexed_table_id, schema=indexed_schema)
    client.delete_table(indexed_table, not_found_ok=True)
    client.create_table(indexed_table)
    indexed_job_config = bigquery.LoadJobConfig(schema=indexed_schema, write_disposition=write_disposition)

    column_list = ", ".join(col_names)
    for batch_num in range(num_batches):
        start_row = batch_num * batch_size
        # Read from the ORIGIN table. Ordering by every selected column keeps
        # LIMIT/OFFSET paging stable (tied rows are identical), so the pages
        # together cover every row exactly once.
        query_batch = f"""
        SELECT {column_list}
        FROM `{origin_table_id}`
        ORDER BY {column_list}
        LIMIT {batch_size} OFFSET {start_row}
        """
        df_batch = client.query(query_batch).to_dataframe()

        # Apply the shared index mapping to every column of the group.
        for col_name in col_names:
            df_batch[col_name] = df_batch[col_name].map(shared_mapping)

        job = client.load_table_from_dataframe(df_batch, indexed_table_id, job_config=indexed_job_config)
        job.result()

In [None]:
# Settings for batch processing.
batch_size = 100000  # number of rows handled per batch
num_batches = -(-total_rows // batch_size)  # number of batches needed (ceiling division)

# The 'other' group is encoded column by column; every remaining group is
# encoded with a mapping shared by all of its columns.
for cate_group in cate_cols:
    encode_group = other_cate if cate_group == 'other' else series_cate
    encode_group(cate_group, batch_size)

In [None]:
cont_cols = []
# NOTE(review): this rebinds `cate_cols` from the dict of column groups to a
# flat list; re-running the batch-encoding cells afterwards requires
# re-running the cell that defines the dict first.
# 'flex' had a stray leading space (' flex'), which produced a table id that
# does not match the column name.
cate_cols = ['summonerId', 'teamId', 'individualPosition', 'role', 'championId', 'item0', 'item1', 'item2', 'item3', 'item4', 'item5', 'item6',
             'summoner1Id', 'summoner2Id', 'win', 'defense', 'flex', 'offense', 'matchId']

# Build the id of each column's index table.
# NOTE(review): `pp_table_id` is not defined in the visible cells; the table
# delete/create calls are disabled, so the loop currently only computes ids.
for col in cate_cols:
    table_id = pp_table_id + f'.{col}_index_table'
    # client.delete_table(table_id, not_found_ok=True)
    # client.create_table(table_id)



In [None]:
# Copy `match_test2` into a new backup table.
# CREATE TABLE (without OR REPLACE / IF NOT EXISTS) fails if the backup table
# already exists.
query_batch = """
        CREATE TABLE `teemo-415918.match_dataset.match_test2_backup` AS
        SELECT *
        FROM `teemo-415918.match_dataset.match_test2`
        """

# client.query() only submits the job; result() blocks until it has finished
# and raises if it failed, so a failed backup is not silently ignored.
backup_job = client.query(query_batch)
backup_job.result()