## (변성윤 마스터님) 구글 빅쿼리 명령어(쿼리) 참고자료: https://zzsza.github.io/bigquery/guide.html

### 의존성 파일 설치

In [None]:
# !pip install google-cloud-bigquery
# !pip install gspread oauth2client
# !pip install db-dtypes
# !pip install pandas-gbq

In [None]:
from google.oauth2 import service_account
from google.cloud import bigquery

import pandas as pd
import numpy as np
from tqdm import tqdm
from datetime import datetime

from crwlnt.notiAPI.product import NotiServerRequest

import time
import pickle
import pandas_gbq

## 구글 클라우드 연동 및 BigQuery에서 데이터 불러오기

In [None]:
SERVICE_ACCOUNT_FILE = "./config/level3-416207-893f91c9529e.json"  # 키 json 파일

# Credentials 객체 생성
credentials = service_account.Credentials.from_service_account_file(SERVICE_ACCOUNT_FILE)

# 빅쿼리 클라이언트 객체 생성
project_id = "level3-416207"
client = bigquery.Client(credentials=credentials, project=project_id)

# 쿼리 실행
# 빅쿼리 디렉토리는 <프로젝트ID>.<데이터셋ID>.<테이블ID> 순으로 저장되어있음 ex) level3-416207.l3_30.l3_30
QUERY = (
    '''
    SELECT hashed_ip, local_time, request_url_endpoint, uri_first
    FROM `level3-416207.log_129.revised_log_129`
    WHERE local_time >= TIMESTAMP '2024-01-13 00:00:00 UTC'
    ''')


# API request
df = client.query(QUERY).to_dataframe()
df

## /products/

In [None]:
df_products = df[df['uri_first'] == 'products']
df_products['products'] = df_products['request_url_endpoint'].map(lambda x: x[10:])
df_products = df_products[df_products['products'].str.startswith('?path=')==False]
df_products

In [None]:
df_new_products = df_products.loc[:,['hashed_ip', 'local_time', 'products']]
df_new_products

In [None]:
df_new_products.info()

## /spec/

In [None]:
df_spec = df[df['uri_first']=='spec']
df_spec['products'] = df_spec['request_url_endpoint'].map(lambda x: x[6:])
df_spec

In [None]:
df_new_spec = df_spec.loc[:, ['hashed_ip', 'local_time', 'products']]
df_new_spec

In [None]:
df_new_spec.info()

## /redirect/

In [None]:
df_redirect = df[df['uri_first']=='redirect']
df_redirect['products'] = df_redirect['request_url_endpoint'].map(lambda x: x[10:])
df_redirect = df_redirect[df_redirect['products'].str.startswith('?p=')]
df_redirect['products'] = df_redirect['products'].map(lambda x: x.split('&')[0].split('=')[1] + '-' + x.split('&')[1].split('=')[1] + '-' +x.split('&')[2].split('=')[1])
df_redirect

In [None]:
df_new_redirect = df_redirect.loc[:,['hashed_ip', 'local_time', 'products']]
df_new_redirect

In [None]:
df_new_redirect.info()

## /

In [None]:
df_re = df[df['uri_first'].str.startswith('?p=', na=False)]

df_re['products'] = None
for i in list(df_re.index):
    try:
        df_re.loc[i,'products'] = df_re.loc[i, 'uri_first'].split('&')[0].split('=')[1] + '-' + df_re.loc[i, 'uri_first'].split('&')[1].split('=')[1] + '-' + df_re.loc[i, 'uri_first'].split('&')[2].split('=')[1]
    except:
        pass

df_re

In [None]:
df_new_re = df_re[df_re['products'].isna()==False].loc[:,['hashed_ip', 'local_time', 'products']]
df_new_re

In [None]:
df_new_re.info()

## Concat

In [None]:
product = pd.concat([df_new_products, df_new_re, df_new_redirect, df_new_spec], axis=0, ignore_index=True)
product

In [None]:
product['products'] = product['products'].astype(str)

for i in tqdm(list(product.index)):
    if '/' in product.loc[i,'products']:
        product.loc[i, 'products'] = product.loc[i, 'products'].split('/')[0]

product = product[product['products']!='']
product

## product_id/piv 형태만 남기고 검색되는 product의 interaction만 남기기

In [None]:
product_id = product[product['products'].apply(lambda x: len(str(x)) == 64)]
product_piv = product[product['products'].str.contains('-')]

product_id_list = list(set(product_id['products'].values))
product_piv_list = list(set(product_piv['products'].values))


print(len(product_id_list))
print(len(product_piv_list))
print(len(product_id_list) + len(product_piv_list))

In [None]:
# 원하는 개수로 product_id/piv 리스트 자르기
def chunk_array(array, chunk_size):
    return [array[i:i+chunk_size] for i in range(0, len(array), chunk_size)]


# 원하는 개수만큼 request 보내기
def send_array_in_chunks(array, chunk_size, type):
    chunks = chunk_array(array, chunk_size)
    product_df = pd.DataFrame(columns=['id', 'piv', 'title'])
    for chunk in chunks:
        if type == 'ids':
            res = NotiServerRequest.bulk_product_info(ids=chunk)
        else:
            res = NotiServerRequest.bulk_product_info(pivs=chunk)

        result = res.json()['data']['products']
        for j in range(len(result)):
            try:
                product_df.loc[len(product_df)] = [result[j]['id'], result[j]['piv'], result[j]['title']]
            except:
                pass

        print(len(product_df))
        time.sleep(11)
    return product_df

In [None]:
# n개씩 묶어서 보내기, 남은 원소들 보내기
chunk_size = 5000

id_list_df = send_array_in_chunks(product_id_list, chunk_size, 'ids')
piv_list_df = send_array_in_chunks(product_piv_list, chunk_size, 'pivs')

In [None]:
# product_id로 검색은 가능하지만 piv만 없고 title이 있는 경우는 남기기
res = NotiServerRequest.bulk_product_info(ids=list(set(product_id_list) - set(id_list_df['id'].values)))
result = res.json()['data']['products']
for i in range(len(result)):
    try:
        id_list_df.loc[len(id_list_df)] = [result[i]['id'], None, result[i]['title']]
    except:
        pass

In [None]:
# 검색해도 안 나오는 product_id, piv 지우기
# product_~~_list = 로그에 등장한 product_id/piv 중복 없이 전부 다 있는 list
# ~~_list_df = api로 검색했을 때 검색이 된 상품을 모아놓은 df
# set(product_id_list) - set(id_list_df['id'].values) => 차집합 이용해서 전체로그에는 있지만 검색이 안 된 product_id 찾기

id_error_dict = {id_error:None for id_error in list(set(product_id_list) - set(id_list_df['id'].values))}
piv_error_dict = {piv_error:None for piv_error in list(set(product_piv_list) - set(piv_list_df['piv'].values))}

product_id['products'] = product_id['products'].map(lambda x: id_error_dict[x] if x in list(id_error_dict.keys()) else x)
product_piv['products'] = product_piv['products'].map(lambda x: piv_error_dict[x] if x in list(piv_error_dict.keys()) else x)

product_id = product_id[product_id['products'].notnull()]
product_piv = product_piv[product_piv['products'].notnull()]

## interaction 파일 저장하기

In [None]:
product = pd.concat([product_piv, product_id], axis=0, ignore_index=True)
product = product.sort_values('local_time')
print(product.shape)

In [None]:
product.to_csv('./crwlnt/data_csv/interaction/interaction_240113_final.csv', index=False)

## 파생) product_info_df.pickle 파일 저장 및 불러오기
- id, piv, title 포함

In [None]:
product_info_df = pd.concat([id_list_df, piv_list_df], axis=0, ignore_index=True)
product_info_df

In [None]:
# with open('product_info_df.pickle','wb') as fw:
#     pickle.dump(product_info_df, fw)

In [None]:
# with open('./product_info_df.pickle', 'rb') as fr:
#     product_info_df = pickle.load(fr)

## BigQuery에 업로드

In [None]:
import pandas_gbq

# 업로드할 데이터 경로 설정/ 없는 경로로 설정해주면 새로 생성해줌
upload_project_id = "level3-416207" 
upload_dataset_id = 'l3_30'
upload_table_id = 'upload_test'

# 업로드
#pandas_gbq.to_gbq(df, destination_table=f'{upload_dataset_id}.{upload_table_id}', project_id=upload_project_id, if_exists='replace', credentials=credentials)

# '''
# if_exists 매개변수
# 'fail': 기존 테이블이 이미 존재하는 경우에는 업로드를 실패시킵니다. 기본값은 'fail'입니다.
# 'replace': 기존 테이블이 이미 존재하는 경우에는 해당 테이블을 덮어씁니다.
# 'append': 기존 테이블이 이미 존재하는 경우에는 데이터를 테이블에 추가합니다.
# ''' 