## 0. 필요 라이브러리 import

In [1]:
import pandas as pd
from google.oauth2 import service_account
from google.cloud import bigquery

## 1. Pandas 라이브러리를 활용하여 BigQuery 다루기

#### 1-1) GCP 권한 인증

In [2]:
JSON_KEY_PATH = "/home/sprint_test/DataEngineeringRepo/03_gcp_python/sa_key/sprintda05_DE_key.json"

credentials = service_account.Credentials.from_service_account_file(JSON_KEY_PATH)

### 1-2) 고정값 정의

In [6]:
project_id = 'sprintda05-hojae'
dataset = 'sprint_pokemon'
table = 'pokemon'
location = "asia-northeast3"
query = f"SELECT * FROM {project_id}.{dataset}.{table}"

query

'SELECT * FROM sprintda05-hojae.sprint_pokemon.pokemon'

#### 1-3) 데이터 읽기

In [7]:
## 서비스 계정 역할을 '탐색자' -> 'BigQuery 관리자'로 변경

pd.read_gbq(
    query=query,
    project_id=project_id,
    location=location,
    credentials=credentials
)

Unnamed: 0,id,kor_name,eng_name,type1,type2,total,hp,attack,defense,special_attack,special_defense,speed,generation,is_legendary
0,1,이상해씨,Bulbasaur,Grass,Poison,318,45,49,49,65,65,45,1,False
1,2,이상해풀,Ivysaur,Grass,Poison,405,60,62,63,80,80,60,1,False
2,3,이상해꽃,Venusaur,Grass,Poison,525,80,82,83,100,100,80,1,False
3,4,파이리,Charmander,Fire,,309,39,52,43,60,50,65,1,False
4,5,리자드,Charmeleon,Fire,,405,58,64,58,80,65,80,1,False
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
246,247,데기라스,Pupitar,Rock,Ground,410,70,84,70,65,70,51,2,False
247,248,마기라스,Tyranitar,Rock,Dark,600,100,134,110,95,100,61,2,False
248,249,루기아,Lugia,Psychic,Flying,680,106,90,130,90,154,110,2,True
249,250,칠색조,Ho-oh,Fire,Flying,680,106,130,90,110,154,90,2,True


#### 1-4) 데이터 쓰기

In [9]:
## point_his.parquet 파일 -> DataFrame 변경 -> Bigquery 테이블로 Load

# 파일 읽기
point_his = pd.read_parquet('dataset/point_his.parquet')

# BigQuery에 데이터 업로드
table_id = 'sprint_pokemon.point_his'  # 데이터셋.테이블명 형식으로 지정

point_his.to_gbq(
    destination_table=table_id,
    project_id=project_id,
    if_exists='replace',  # 'replace', 'append', 'fail' 중 선택
    location=location,
    credentials=credentials
)

## 2. google-cloud-bigquery 라이브러리를 활용하여 BigQuery 다루기

#### 2-1) GCP 권한 인증 및 Client 생성
- Client - BigQuery에 사용자가 내린 명령을 API로 전달하고 결과를 가져오는 역할

In [11]:
JSON_KEY_PATH = "/home/sprint_test/DataEngineeringRepo/03_gcp_python/sa_key/sprintda05_DE_key.json"

credentials = service_account.Credentials.from_service_account_file(JSON_KEY_PATH)

project_id = 'sprintda05-hojae'
dataset = 'sprint_pokemon'
table = 'pokemon'
location = "asia-northeast3"

In [12]:
client = bigquery.Client(
    project=project_id,
    location=location,
    credentials=credentials
)

#### 2-2) Dataset 관련 실습

In [29]:
## dataset 생성
dataset_name = 'python_dataset'

dataset_obj = bigquery.Dataset(f"{project_id}.{dataset_name}")

client.create_dataset(
    dataset = dataset_obj,
    exists_ok = True # 동일한 이름의 데이터 셋이 있을 때 코드를 성공하게 할지 실패하게 할지
)

Dataset(DatasetReference('sprintda05-hojae', 'python_dataset'))

In [18]:
## dataset 목록 조회
for dataset in client.list_datasets():
    print(dataset.dataset_id)

python_dataset
sprint_pokemon


In [20]:
## 데이터셋 삭제
client.delete_dataset(
    dataset=f"{project_id}.{dataset_name}",
    not_found_ok=True # 해당 데이터 셋이 없어도 에러안나게 해줌
)

In [21]:
dir(client)

['SCOPE',
 '_SET_PROJECT',
 '__annotations__',
 '__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__enter__',
 '__eq__',
 '__exit__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__getstate__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_call_api',
 '_client_cert_source',
 '_connection',
 '_credentials',
 '_dataset_from_arg',
 '_default_load_job_config',
 '_default_query_job_config',
 '_determine_default',
 '_do_multipart_upload',
 '_do_resumable_upload',
 '_ensure_bqstorage_client',
 '_get_query_results',
 '_http',
 '_http_internal',
 '_initiate_resumable_upload',
 '_list_rows_from_query_results',
 '_location',
 '_schema_from_json_file_object',
 '_schema_to_json_file_object',
 'cancel_job',
 'close',
 'copy_table',
 'create_dataset',
 'create_job',
 'create_routi

#### 2-3) Table 관련 실습

In [22]:
## 테이블 목록 조회
client.list_tables(
    dataset = 'sprint_pokemon'
    )

<google.api_core.page_iterator.HTTPIterator at 0x7ede886d4830>

In [24]:
# for문 이용해야하는 것들을 이터레이터라고 한다
for table in client.list_tables(dataset = 'sprint_pokemon'):
    print(table.table_id)

item_his
member
point_his
pokemon


In [28]:
## 테이블 조회(Bigquery 테이블 -> DataFrtame으로 불러오기)
table_obj = client.query(
    query="SELECT * FROM sprint_pokemon.point_his",
    project=project_id, # project랑 location은 client선언 할떄 명시를 해줬기 때문에 값 안줘도 될걸?
    location=location
)

# dir(table_obj) # 이걸로 뭐 쓸 수 있는지 보고 쓰면됨
table_obj.to_dataframe().head()


Unnamed: 0,idx,proc_ym,proc_ymd,point
0,87376,202304,20230401,1000
1,87599,202304,20230401,1000
2,87682,202304,20230401,1000
3,87555,202304,20230401,1000
4,87569,202304,20230401,1000


In [34]:
# 삭제 했던거 다시 생성하기
## dataset 생성
dataset_name = 'python_dataset'

dataset_obj = bigquery.Dataset(f"{project_id}.{dataset_name}")

client.create_dataset(
    dataset = dataset_obj,
    exists_ok = True # 동일한 이름의 데이터 셋이 있을 때 코드를 성공하게 할지 실패하게 할지
)

## 테이블을 다른 dataset으로 복제
## sprint_pokemon.member -> python_dataset.member
client.copy_table(
    sources="sprint_pokemon.member",
    destination="python_dataset.member"
)


CopyJob<project=sprintda05-hojae, location=asia-northeast3, id=56d586d3-ce50-40fd-9fa3-852aef768597>

In [37]:
## dataframe -> Bigquery Table (to_gbq랑 비슷)

regdate = pd.read_parquet('dataset/regdate.parquet')

client.load_table_from_dataframe(
    dataframe=regdate,
    destination="sprint_pokemon.regdate"
)


LoadJob<project=sprintda05-hojae, location=asia-northeast3, id=751e91f7-6361-4546-af38-9881ebe9c1c7>

In [38]:
## 테이블 삭제

client.delete_table(
    table="sprint_pokemon.regdate",
    not_found_ok=True
)

#### 2-4) 실습 미션 - 1

In [None]:
# dataset/ 디렉토리 아래의 5개 parquet 파일을 대상으로 작업.

# - 데이터셋 ID : education
# - 테이블명 : 파일명과 동일
# - 요구사항
#     1. bigquery client를 활용하여 작업해주세요
#     2. 'education' 이라는 이름의 새로운 데이터셋을 생성해주세요.
#     3. 해당 데이터셋 내부에 5개의 테이블을 저장해주세요.
#     4. 'python_dataset' 데이터셋으로 member 제외 나머지 테이블 복사.
    
#   권장 사항!
#   - 3번, 4번을 for문 써서 어떻게 구현할 수 있을지 고민!

##### 라이브러리를 import하고 GCP 권한 인증

In [2]:
from google.cloud import bigquery
from google.oauth2 import service_account
import pandas as pd
import os

# GCP 권한 인증
JSON_KEY_PATH = "/home/sprint_test/DataEngineeringRepo/03_gcp_python/sa_key/sprintda05_DE_key.json"
credentials = service_account.Credentials.from_service_account_file(JSON_KEY_PATH)

# 프로젝트 정보 설정
project_id = 'sprintda05-hojae'  # 본인의 프로젝트 ID로 변경
location = "asia-northeast3"

# BigQuery 클라이언트 생성
client = bigquery.Client(
    project=project_id,
    location=location,
    credentials=credentials
)

##### 1. 'education' 데이터셋 생성

In [40]:
# 'education' 데이터셋 생성
dataset_name = 'education'
dataset_obj = bigquery.Dataset(f"{project_id}.{dataset_name}")
client.create_dataset(
    dataset=dataset_obj,
    exists_ok=True  # 이미 존재해도 에러 발생하지 않음
)

Dataset(DatasetReference('sprintda05-hojae', 'education'))

##### 2. 5개의 parquet 파일을 테이블로 저장

In [None]:
# dataset의 parquet 파일 목록
parquet_files = [f for f in os.listdir('dataset') if f[-8:] == '.parquet']

for file in parquet_files:
    # 파일명 확장자 제거 -> 테이블명
    table_name = file.replace('.parquet', '')
    
    df = pd.read_parquet(f'dataset/{file}')
    
    # DataFrame -> BigQuery
    client.load_table_from_dataframe(
        dataframe=df,
        destination=f"{project_id}.{dataset_name}.{table_name}"
    )

##### 3. 'member' 테이블을 제외한 나머지 테이블을 'python_dataset'으로 복사

In [44]:
# education 데이터셋의 모든 테이블 목록 가져오기
tables = list(client.list_tables(dataset_name))
table_ids = [table.table_id for table in tables]

# 'member'를 제외한 테이블만 복사
for table_id in table_ids:
    if table_id != 'member':
        source = f"{project_id}.{dataset_name}.{table_id}"
        destination = f"{project_id}.python_dataset.{table_id}"
        
        client.copy_table(
            sources=source,
            destination=destination
        )

##### 확인하기

In [48]:
# education 데이터셋의 테이블 확인
print("'education' dataset:")
for table in client.list_tables(dataset_name):
    print(f"- {table.table_id}")

# python_dataset의 테이블 확인
print("\n'python_dataset' dataset:")
for table in client.list_tables("python_dataset"):
    print(f"- {table.table_id}")

'education' dataset:
- item_his
- member
- point_his
- regdate
- study_his

'python_dataset' dataset:
- item_his
- member
- point_his
- regdate
- study_his


#### 2-5) BigQuery 테이블 → GCS의 파일로 저장

In [7]:
## CSV 파일로 저장
# education.point_his -> GCS 버킷에 저장!

bucket_name = "sprintda05-hojae-bucket"

client.extract_table(
    source="education.point_his",
    destination_uris=f"gs://{bucket_name}/edu_dataset/point_his.csv" # 해당 디렉토리가 없어도 만들어서 저장해줌
)

ExtractJob<project=sprintda05-hojae, location=asia-northeast3, id=435b4351-c41e-4a29-8a2a-4409e671b2ce>

In [8]:
## 압축 파일로 저장

job_config = bigquery.job.ExtractJobConfig()
job_config.compression = bigquery.Compression.GZIP

client.extract_table(
   source="education.point_his",
   destination_uris=f"gs://{bucket_name}/edu_dataset/point_his.csv.gz",
   job_config=job_config
)

ExtractJob<project=sprintda05-hojae, location=asia-northeast3, id=df309b25-9a9c-4763-9d7a-2fa34184fc2b>

#### 2-6) GCS의 파일 → BigQuery 테이블로 Load

In [9]:
## CSV 파일

job_config = bigquery.LoadJobConfig(
   skip_leading_rows=1,
   autodetect=True,
   source_format=bigquery.SourceFormat.CSV
)

client.load_table_from_uri(
   source_uris="gs://sprintda05-hojae-bucket/edu_dataset/point_his.csv",
   destination="education.point_his_gcs",
   job_config=job_config
)

LoadJob<project=sprintda05-hojae, location=asia-northeast3, id=ec22a9c3-2286-452d-977e-fe60095893ed>

In [10]:
## parquet 파일
job_config = bigquery.LoadJobConfig(
   autodetect=True,
   source_format=bigquery.SourceFormat.PARQUET
)

client.load_table_from_uri(
   source_uris="gs://sprintda05-hojae-bucket/bigquery_data/item_his.parquet",
   destination="education.item_his_gcs",
   job_config=job_config
)

LoadJob<project=sprintda05-hojae, location=asia-northeast3, id=94da6250-c7b5-4a06-8c33-076bf1969478>