## 0. 필요 라이브러리 import

In [1]:
import pandas as pd
from google.oauth2 import service_account
from google.cloud import bigquery

## 1. Pandas 라이브러리를 활용하여 BigQuery 다루기

#### 1-1) GCP 권한 인증

In [2]:
# Service-account key file; the path is relative to this notebook's working directory.
JSON_KEY_PATH = "sa_key/sprintda05_DE_key.json"

# Credentials object passed to read_gbq / to_gbq in this section.
credentials = service_account.Credentials.from_service_account_file(
    filename=JSON_KEY_PATH
)

#### 1-2) 고정값 정의

In [3]:
# Fixed identifiers for the BigQuery resources used in section 1.
project_id = 'sprintda05-hyunsoo'
dataset = 'sprint_pokemon'
table = 'pokemon'
location = 'asia-northeast3'

# Fully-qualified "project.dataset.table" path inside a SELECT-all query.
query = "SELECT * FROM {}.{}.{}".format(project_id, dataset, table)

query

'SELECT * FROM sprintda05-hyunsoo.sprint_pokemon.pokemon'

#### 1-3) 데이터 읽기

In [5]:
## The service account's role was changed from '탐색자' to 'BigQuery 관리자' (BigQuery Admin)
## so that this read is permitted.
# NOTE: pd.read_gbq is deprecated since pandas 2.2 in favour of pandas_gbq.read_gbq.
read_kwargs = dict(
    project_id=project_id,
    location=location,
    credentials=credentials,
)
df = pd.read_gbq(query, **read_kwargs)

df.head()

Unnamed: 0,id,kor_name,eng_name,type1,type2,total,hp,attack,defense,special_attack,special_defense,speed,generation,is_legendary
0,1,이상해씨,Bulbasaur,Grass,Poison,318,45,49,49,65,65,45,1,False
1,2,이상해풀,Ivysaur,Grass,Poison,405,60,62,63,80,80,60,1,False
2,3,이상해꽃,Venusaur,Grass,Poison,525,80,82,83,100,100,80,1,False
3,4,파이리,Charmander,Fire,,309,39,52,43,60,50,65,1,False
4,5,리자드,Charmeleon,Fire,,405,58,64,58,80,65,80,1,False


#### 1-4) 데이터 쓰기

In [7]:
## point_his.parquet -> DataFrame (loaded into a BigQuery table in the next cell)
point_his = pd.read_parquet(path='dataset/point_his.parquet')

point_his.head()

Unnamed: 0,idx,proc_ym,proc_ymd,point
0,96465,202306,20230624,1000
1,96465,202306,20230624,500
2,87940,202304,20230405,2000
3,87940,202304,20230405,3500
4,87940,202304,20230405,4000


In [8]:
# Upload point_his to sprint_pokemon.point_his; if_exists='replace' overwrites an existing table.
# NOTE: DataFrame.to_gbq is deprecated since pandas 2.2 in favour of pandas_gbq.to_gbq.
upload_options = {
    "project_id": project_id,
    "location": location,
    "if_exists": "replace",
    "credentials": credentials,
}
point_his.to_gbq("sprint_pokemon.point_his", **upload_options)

## 2. google-cloud-bigquery 라이브러리를 활용하여 BigQuery 다루기

#### 2-1) GCP 권한 인증 및 Client 생성
- Client - BigQuery에 사용자가 내린 명령을 API로 전달하고 결과를 가져오는 역할

In [2]:
# Service-account key file; the path is relative to this notebook's working directory.
JSON_KEY_PATH = "sa_key/sprintda05_DE_key.json"

credentials = service_account.Credentials.from_service_account_file(
    filename=JSON_KEY_PATH
)

# Identifiers used when building the BigQuery client below.
project_id = 'sprintda05-hyunsoo'
dataset = 'sprint_pokemon'
location = 'asia-northeast3'

In [3]:
# The Client forwards commands to the BigQuery API; project/location are used as its defaults.
client_kwargs = {
    "project": project_id,
    "location": location,
    "credentials": credentials,
}
client = bigquery.Client(**client_kwargs)

#### 2-2) Dataset 관련 실습

In [27]:
## Create a dataset
dataset_name = 'python_dataset'

dataset_obj = bigquery.Dataset("{}.{}".format(project_id, dataset_name))

# exists_ok=True ignores "already exists" errors, so re-running the cell is harmless.
client.create_dataset(dataset_obj, exists_ok=True)

Dataset(DatasetReference('sprintda05-hyunsoo', 'python_dataset'))

In [19]:
## List the datasets in the project
# A distinct loop name keeps the `dataset` config variable ('sprint_pokemon')
# from being overwritten by the last DatasetListItem.
for dataset_item in client.list_datasets():
    print(dataset_item.dataset_id)

python_dataset
sprint_pokemon


In [20]:
## Delete the dataset
# not_found_ok=True: no error if it is already gone. delete_contents is left at its
# default (False), so this fails if the dataset still contains tables.
target_dataset = f"{project_id}.{dataset_name}"
client.delete_dataset(target_dataset, not_found_ok=True)

In [None]:
## Show every attribute / method name available on the client object
client_attrs = dir(client)
client_attrs

['SCOPE',
 '_SET_PROJECT',
 '__annotations__',
 '__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__enter__',
 '__eq__',
 '__exit__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__getstate__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_call_api',
 '_client_cert_source',
 '_connection',
 '_credentials',
 '_dataset_from_arg',
 '_default_load_job_config',
 '_default_query_job_config',
 '_determine_default',
 '_do_multipart_upload',
 '_do_resumable_upload',
 '_ensure_bqstorage_client',
 '_get_query_results',
 '_http',
 '_http_internal',
 '_initiate_resumable_upload',
 '_list_rows_from_query_results',
 '_location',
 '_schema_from_json_file_object',
 '_schema_to_json_file_object',
 'cancel_job',
 'close',
 'copy_table',
 'create_dataset',
 'create_job',
 'create_routi

#### 2-3) Table 관련 실습

In [23]:
## List the tables in the sprint_pokemon dataset
# A distinct loop name keeps the `table` config variable ('pokemon') from being
# overwritten by the last TableListItem.
for table_item in client.list_tables(dataset='sprint_pokemon'):
    print(table_item.table_id)

item_his
member
point_his
pokemon


In [26]:
## Query a table (BigQuery table -> DataFrame)
# client.query submits a QueryJob; to_dataframe() waits for it and downloads the rows.
query_job = client.query(
    "SELECT * FROM sprint_pokemon.point_his",
    project=project_id,
    location=location,
)

query_job.to_dataframe().head()

Unnamed: 0,idx,proc_ym,proc_ymd,point
0,87376,202304,20230401,1000
1,87599,202304,20230401,1000
2,87682,202304,20230401,1000
3,87555,202304,20230401,1000
4,87569,202304,20230401,1000


In [None]:
## Copy a table into another dataset
## sprint_pokemon.member -> python_dataset.member
def copy_tbl_func(sources="sprint_pokemon.member", destination="python_dataset.member"):
    """Copy a BigQuery table and wait for the copy job to finish.

    Defining this function does not run the copy; call ``copy_tbl_func()``
    explicitly. Uses the module-level ``client``.

    Args:
        sources: Source table ID ("dataset.table", resolved against the
            client's default project).
        destination: Destination table ID ("dataset.table").

    Returns:
        The finished CopyJob.

    Raises:
        Whatever ``CopyJob.result()`` raises if the job fails (e.g. the
        destination table already exists).
    """
    copy_job = client.copy_table(sources=sources, destination=destination)
    # copy_table only submits the job; .result() blocks until it completes and
    # surfaces any failure instead of silently dropping it.
    copy_job.result()
    return copy_job

CopyJob<project=sprintda05-hyunsoo, location=asia-northeast3, id=4fdc8119-0b63-49e5-a6fc-b94fbc8ab62c>

In [30]:
## DataFrame -> BigQuery table (similar to to_gbq)

regdate = pd.read_parquet('dataset/regdate.parquet')

# load_table_from_dataframe only submits the job. .result() waits for it to
# finish (raising on failure), so the delete in the next cell sees the table.
# .result() returns the same LoadJob, so the displayed cell output is unchanged.
client.load_table_from_dataframe(
    dataframe=regdate,
    destination="sprint_pokemon.regdate"
).result()

LoadJob<project=sprintda05-hyunsoo, location=asia-northeast3, id=8e5e1dff-80e7-4433-a934-d4a52a389912>

In [31]:
## Delete the table; not_found_ok=True suppresses the error if it does not exist.
client.delete_table("sprint_pokemon.regdate", not_found_ok=True)

#### 2-4) 실습 미션 - 1

In [None]:
"""
dataset/ 디렉토리 아래의 5개 parquet 파일을 대상으로 작업.

- 데이터셋 ID : education
- 테이블명 : 파일명과 동일
- 요구사항
    1. bigquery client를 활용하여 작업해주세요
    2. 'education' 이라는 이름의 새로운 데이터셋을 생성해주세요.
    3. 해당 데이터셋 내부에 5개의 테이블을 저장해주세요.
    4. 'python_dataset' 데이터셋으로 member 제외 나머지 테이블 복사.
    
- 권장 사항!
  - 3번, 4번을 for문 써서 어떻게 구현할 수 있을지 고민!
"""

In [15]:
## Create the `education` dataset
dataset_name = 'education'

dataset_obj = bigquery.Dataset("{}.{}".format(project_id, dataset_name))

# exists_ok=True ignores "already exists" errors, so re-running the cell is harmless.
client.create_dataset(dataset_obj, exists_ok=True)

Dataset(DatasetReference('sprintda05-hyunsoo', 'education'))

In [20]:
import os

# Load every parquet file under dataset/ into the `education` dataset, naming
# each table after its file (e.g. member.parquet -> education.member).
data_dir = 'dataset'

# Replace table contents on re-run. Load jobs default to WRITE_APPEND, which
# would duplicate every row each time this cell is executed.
load_config = bigquery.LoadJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE
)

# Only .parquet entries: stray files such as .DS_Store or a checkpoint folder
# would make read_parquet fail. Sorted for a deterministic load order.
parquet_files = sorted(
    name for name in os.listdir(data_dir) if name.endswith('.parquet')
)

for file_name in parquet_files:
    file_df = pd.read_parquet(os.path.join(data_dir, file_name))

    # Strip only the trailing extension (str.replace would hit any occurrence).
    table_name = os.path.splitext(file_name)[0]
    print(table_name)

    # Wait for each load so failures surface here and the tables exist before
    # the next cell lists and copies them.
    client.load_table_from_dataframe(
        dataframe=file_df,
        destination=f"education.{table_name}",
        job_config=load_config,
    ).result()

member
item_his
point_his
regdate
study_his


In [22]:
# Copy every table in `education` except `member` into `python_dataset`.
# python_dataset is deleted earlier in section 2-2, so ensure it exists first
# (the bare ID resolves against the client's default project and location).
client.create_dataset("python_dataset", exists_ok=True)

# Overwrite destination tables so the cell can be re-run. BigQuery's default
# write disposition for copy jobs is WRITE_EMPTY, which fails when the
# destination table already exists.
copy_config = bigquery.CopyJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE
)

for table_item in client.list_tables(dataset='education'):
    table_name = table_item.table_id

    # Requirement 4: skip `member`, copy everything else.
    if table_name == 'member':
        continue

    print(table_name)
    # Wait for each copy so failures are reported instead of silently dropped.
    client.copy_table(
        sources=f"education.{table_name}",
        destination=f"python_dataset.{table_name}",
        job_config=copy_config,
    ).result()

item_his
point_his
regdate
study_his


#### 2-5) BigQuery 테이블 → GCS의 파일로 저장

In [None]:
# 필요 권한: GCS 버킷에 저장하려면 서비스 계정에 '저장소 관리자'(Storage Admin) 역할 부여

In [4]:
## Save as a CSV file
# education.point_his -> GCS bucket

bucket_name = "sprintda05-hyunsoo-bucket"

# Export to a single GCS object (no wildcard in the URI; BigQuery limits
# single-file exports to 1 GB). extract_table only submits the job. .result()
# blocks until the file is written (raising on failure), so the load in 2-6 can
# rely on it. It returns the same ExtractJob.
client.extract_table(
    source="education.point_his",
    destination_uris=f"gs://{bucket_name}/edu_dataset/point_his.csv",
).result()

ExtractJob<project=sprintda05-hyunsoo, location=asia-northeast3, id=5c4a9a9e-c9d2-40aa-98a7-baafc6acc5bc>

In [None]:
## Save as a gzip-compressed file

# Public alias of bigquery.job.ExtractJobConfig; same config, gzip output.
job_config = bigquery.ExtractJobConfig(compression=bigquery.Compression.GZIP)

# bucket_name comes from the previous cell. .result() waits for the export and
# raises on failure; it returns the same ExtractJob.
client.extract_table(
    source="education.point_his",
    destination_uris=f"gs://{bucket_name}/edu_dataset/point_his.csv.gz",
    job_config=job_config,
).result()

ExtractJob<project=sprintda05-hyunsoo, location=asia-northeast3, id=4f6553af-e2e3-4b96-8cf5-000b17a9ce22>

#### 2-6) GCS의 파일 → BigQuery 테이블로 Load

In [6]:
## CSV file

# The CSV exported in 2-5 has a header row, so skip it; the schema is autodetected.
# WRITE_TRUNCATE replaces the table on re-run instead of appending duplicate
# rows (load jobs default to WRITE_APPEND).
job_config = bigquery.LoadJobConfig(
    skip_leading_rows=1,
    autodetect=True,
    source_format=bigquery.SourceFormat.CSV,
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)

# .result() waits for the load and raises on failure; it returns the same LoadJob.
client.load_table_from_uri(
    source_uris="gs://sprintda05-hyunsoo-bucket/edu_dataset/point_his.csv",
    destination="education.point_his_gcs",
    job_config=job_config,
).result()

LoadJob<project=sprintda05-hyunsoo, location=asia-northeast3, id=d807fbc1-f5c8-4be4-acf5-93a110e9cc08>

In [8]:
## Parquet file

# Parquet files carry their own schema. WRITE_TRUNCATE replaces the table on
# re-run instead of appending duplicate rows (load jobs default to WRITE_APPEND).
job_config = bigquery.LoadJobConfig(
    autodetect=True,
    source_format=bigquery.SourceFormat.PARQUET,
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)

# .result() waits for the load and raises on failure; it returns the same LoadJob.
client.load_table_from_uri(
    source_uris="gs://sprintda05-hyunsoo-bucket/bigquery_data/item_his.parquet",
    destination="education.item_his_gcs",
    job_config=job_config,
).result()


LoadJob<project=sprintda05-hyunsoo, location=asia-northeast3, id=2cbb21de-6a99-485d-998a-ee38082c0643>

In [15]:
import os
import pandas as pd


# Study history; proc_ymd presumably holds yyyymmdd values (matching the format below).
df = pd.read_parquet("dataset/study_his.parquet")

# Parse yyyymmdd text into datetime64 so rows can be matched against a date range.
ymd_text = df['proc_ymd'].astype(str)
df['proc_ymd'] = pd.to_datetime(ymd_text, format='%Y%m%d')

In [17]:
# Write one parquet file per day of April 2023 in a Hive-style key=value layout:
#   gcp_part_parquet/yyyy=YYYY/mm=MM/dd=DD/data.parquet
date_range = pd.date_range(start='2023-04-01', end='2023-04-30')

base_path = os.getcwd()

# Keep only rows whose date is in the range; days with no rows get no file.
# groupby yields the days in ascending order, one sub-frame per day.
in_range = df[df['proc_ymd'].isin(date_range)]

for day, day_df in in_range.groupby('proc_ymd'):
    partition_dir = os.path.join(
        base_path,
        'gcp_part_parquet',
        f"yyyy={day.year}",
        f"mm={day.strftime('%m')}",
        f"dd={day.strftime('%d')}",
    )
    os.makedirs(partition_dir, exist_ok=True)

    day_df.to_parquet(
        os.path.join(partition_dir, "data.parquet"),
        index=False,
        engine='pyarrow',
        compression='gzip',
    )