In [1]:
!pip install google-cloud-bigquery pandas google-cloud-storage



In [2]:
!pip install openpyxl



In [496]:
import io
from google.cloud import storage
from google.cloud import bigquery
from google.cloud.bigquery import SchemaField
import numpy as np
import pandas as pd
import math

## 定数定義

In [562]:
BUCKET_NAME = "project-asl-chibabank" 
DATASET_DIR = "T02_デーブルデータ"
DESC_COL_DIR = "Column_description" 

PROJECT_ID = "qwiklabs-asl-02-9dacbbe2194b"
DATASET_ID = "ASL_dataset4"

In [563]:
def remove_char_from_array(arr, char_to_remove, char_to_replace):
    return [s.replace(char_to_remove, char_to_replace) for s in arr]

In [504]:
FILE_NAME_LIST = [
    "CIF別取引属性_個人_サンプル.xlsx", 
    "CRM_交渉履歴_サンプル.xlsx", 
    "VISAデビット契約情報_サンプル.xlsx", 
    "チャネル別利用状況_サンプル.xlsx", 
    "保険元帳_サンプル.xlsx",
    "入払情報_日次_サンプル.xlsx",
    "名寄せ取引属性_日次_サンプル.xlsx",
    "完済融資バッチキー_サンプル.xlsx",
    "定期預金元帳_サンプル.xlsx",
    "為替取引明細_サンプル.xlsx",
    "融資バッチキー_サンプル.xlsx",
    "融資ローン元帳_サンプル.xlsx"
]

In [505]:
TABLE_ID_LIST = remove_char_from_array(FILE_NAME_LIST, "_サンプル.xlsx", "")

In [526]:
DESC_COL_FILE_NAME_LIST = [
    "desc_col_CIF別取引属性_個人_サンプル.xlsx",
    "desc_col_CRM_交渉履歴_サンプル.xlsx", 
    "desc_col_VISAデビット契約情報_サンプル.xlsx", 
    "desc_col_チャネル別利用状況_サンプル.xlsx", 
    "desc_col_保険元帳_サンプル.xlsx",
    "desc_col_入払情報_日次_サンプル.xlsx",
    "desc_col_名寄せ取引属性_日次_サンプル.xlsx",
    "desc_col_完済融資バッチキー_サンプル.xlsx",
    "desc_col_定期預金元帳_サンプル.xlsx",
    "desc_col_為替取引明細_サンプル.xlsx",
    "desc_col_融資バッチキー_サンプル.xlsx",
    "desc_col_融資ローン元帳_サンプル.xlsx"
]

## テーブルデータ取得

In [507]:
def rename_columns(array):
    columns_1 = remove_char_from_array(array, "−", "_")
    columns_2 = remove_char_from_array(columns_1, "・", "_")
    columns_3 = remove_char_from_array(columns_2, "／", "_")
    columns_4 = remove_char_from_array(columns_3, "（", "_")
    columns_5 = remove_char_from_array(columns_4, "）", "")
    return columns_5

In [528]:
def get_table_data(file_name_list):
    table_data_list = []

    for file_name in file_name_list:
        blob = bucket.blob(f"{DATASET_DIR}/{file_name}")
        data = blob.download_as_string()

        try:
            df = pd.read_excel(io.BytesIO(data))
            columns = rename_columns(df.columns.values)
            df.columns = columns
            table_data_list.append(df)
        except Exception as e:
            print(f"Error reading Excel file: {e}")
            exit(1)
            
    return table_data_list

In [529]:
table_data_list = get_table_data(file_name_list=FILE_NAME_LIST)

In [509]:
# table_data_list = []

# for file_name in FILE_NAME_LIST:
#     blob = bucket.blob(f"{DATASET_DIR}/{file_name}")
#     data = blob.download_as_string()

#     try:
#         df = pd.read_excel(io.BytesIO(data))
#         columns = rename_columns(df.columns.values)
#         df.columns = columns
#         table_data_list.append(df)
#     except Exception as e:
#         print(f"Error reading Excel file: {e}")
#         exit(1)

In [530]:
# ダミーデータ特有の処理
if table_data_list[5]["銀行コード"][0] != "134": 
    table_data_list[5]["銀行コード"][0] = "134"
    
table_data_list[5]["銀行コード"] = table_data_list[5]["銀行コード"].astype(int)

You are setting values through chained assignment. Currently this works in certain cases, but when using Copy-on-Write (which will become the default behaviour in pandas 3.0) this will never work to update the original DataFrame or Series, because the intermediate object on which we are setting values will behave as a copy.
A typical example is when you are setting values in a column of a DataFrame, like:

df["col"][row_indexer] = value

Use `df.loc[row_indexer, "col"] = values` instead, to perform the assignment in a single step and ensure this keeps updating the original `df`.

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy

  table_data_list[5]["銀行コード"][0] = "134"
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  table_data_list[5]["銀行コー

## テーブルデータをBigQueryにアップロード

In [531]:
numpy_to_bigquery_type = {
    'bool': 'BOOL',
    'int8': 'INT64',  
    'int16': 'INT64',
    'int32': 'INT64',
    'int64': 'INT64',
    'uint8': 'INT64', 
    'uint16': 'INT64',
    'uint32': 'INT64',
    'uint64': 'INT64', 
    'float16': 'FLOAT64',
    'float32': 'FLOAT64',
    'float64': 'FLOAT64',
    'complex64': 'BIGNUMERIC', 
    'complex128': 'BIGNUMERIC',
    'object': 'STRING', 
    'datetime64[ns]': 'TIMESTAMP',
    'timedelta64[ns]': 'INT64',
    'str': 'STRING',
    'bytes': 'BYTES'
}

In [513]:
client = bigquery.Client(project=PROJECT_ID)

storage_client = storage.Client()
bucket = storage_client.bucket(BUCKET_NAME)

In [557]:
# カラムdescriptionをファイルからダウンロード
def get_column_description(file_name,DESC_COL_DIR = DESC_COL_DIR):
    blob = bucket.blob(f"{DESC_COL_DIR}/{file_name}")
    data = blob.download_as_string()

    try:
        df_desc_col = pd.read_excel(io.BytesIO(data))
    except Exception as e:
        print(f"Error reading Excel file: {e}")
        exit(1)
        
    return df_desc_col


# カラムdescriptionの辞書を作成
def create_column_desc_dict(df, column_key="項目名称", column_value="コ ー ド"):
    column_desc_dict = {}
    column = rename_columns(df[column_key])
    column_desc = df[column_value]
    
    for i in range(len(df[column_key])):
        column_desc_dict[column[i]] = column_desc[i]
    
    for key, value in column_desc_dict.items():
        if type(value) == float:
            if math.isnan(value):
                column_desc_dict[key] = str(' ')
                
    return column_desc_dict


# カラムデータタイプのリスト作成
def get_bq_dtype_list(data_list):
    column_name_list = data_list.columns.tolist()
    dtypes = data_list.dtypes.tolist()

    dtype_list = []
    for dtype in dtypes:
     dtype_list.append(dtype.name)

    bq_dtype_list = []
    for numpy_type in dtype_list:
        bq_dtype = numpy_to_bigquery_type.get(numpy_type)
        bq_dtype_list.append(bq_dtype)
        
    return bq_dtype_list


# テーブルスキーマ、テーブルの作成＋データアップロード
def create_table(table_id, column_desc_dict, bq_dtype_list):
    schema=[]
    
    for j,key in enumerate(column_desc_dict):
        schema.append(SchemaField(key, field_type=bq_dtype_list[j] ,description=column_desc_dict[key]))
    try:
        table_ref = client.get_table(table_id)
        print(f"Table {table_id} already exists.")
    except Exception as e:
        try:
            table = bigquery.Table(table_id, schema=schema)
            table = client.create_table(table)  
            print(f"Created table {table.project}.{table.dataset_id}.{table.table_id}")
        except Exception as e:
            print(f"Error creating table: {e}")
            
    table_path = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID_LIST[i]}"
    job_config = bigquery.LoadJobConfig(
        autodetect=False,  
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,  
        source_format=bigquery.SourceFormat.CSV,
        schema=schema    
    )
    try: 
        job = client.load_table_from_dataframe(
            table_data_list[i], table_path, job_config=job_config
        ) 
        job.result() 
        print(f"データのアップロードが完了しました。テーブル： {TABLE_ID_LIST[i]}")
    except Exception as e:
        print(f"Error creating table: {e}")

In [561]:
for i in range(len(DESC_COL_FILE_NAME_LIST)):
    # カラムの説明データの作成
    df_desc_col = get_column_description(DESC_COL_FILE_NAME_LIST[i])
    column_desc_dict = create_column_desc_dict(df_desc_col)

    # カラムデータタイプのリスト作成
    bq_dtype_list = get_bq_dtype_list(table_data_list[i])

    # テーブル作成
    table_id = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID_LIST[i]}"
    create_table(table_id, column_desc_dict, bq_dtype_list)

Created table qwiklabs-asl-02-9dacbbe2194b.ASL_dataset4.CIF別取引属性_個人
データのアップロードが完了しました。テーブル： CIF別取引属性_個人
Created table qwiklabs-asl-02-9dacbbe2194b.ASL_dataset4.CRM_交渉履歴
データのアップロードが完了しました。テーブル： CRM_交渉履歴
Created table qwiklabs-asl-02-9dacbbe2194b.ASL_dataset4.VISAデビット契約情報
データのアップロードが完了しました。テーブル： VISAデビット契約情報
Created table qwiklabs-asl-02-9dacbbe2194b.ASL_dataset4.チャネル別利用状況
データのアップロードが完了しました。テーブル： チャネル別利用状況
Created table qwiklabs-asl-02-9dacbbe2194b.ASL_dataset4.保険元帳
データのアップロードが完了しました。テーブル： 保険元帳
Created table qwiklabs-asl-02-9dacbbe2194b.ASL_dataset4.入払情報_日次
データのアップロードが完了しました。テーブル： 入払情報_日次
Created table qwiklabs-asl-02-9dacbbe2194b.ASL_dataset4.名寄せ取引属性_日次
データのアップロードが完了しました。テーブル： 名寄せ取引属性_日次
Created table qwiklabs-asl-02-9dacbbe2194b.ASL_dataset4.完済融資バッチキー
データのアップロードが完了しました。テーブル： 完済融資バッチキー
Created table qwiklabs-asl-02-9dacbbe2194b.ASL_dataset4.定期預金元帳
データのアップロードが完了しました。テーブル： 定期預金元帳
Created table qwiklabs-asl-02-9dacbbe2194b.ASL_dataset4.為替取引明細
データのアップロードが完了しました。テーブル： 為替取引明細
Created table qw

In [174]:
# for i in range(len(TABLE_ID_LIST)):
#     table_id = TABLE_ID_LIST[i]
#     desc_col_file_name = f"{DESC_COL_FOLDER_NAME}.{DESC_COL_FILE_NAME_LIST[i]}"
    
#     df_desc_col = get_column_description(desc_col_file_name_list)
#     column_desc_dict = create_column_desc_dict(df_desc_col)
    
#     for key, value in column_desc_dict.items():
#         schema = ["name"=key, "description"=value]
#         try:
#             table_ref = client.get_table(f"{PROJECT_ID}.{DATASET_ID}.{table_id}")
#             print(f"Table {table_id} already exists.")
#         except Exception as e:
#             try:
#                 table = bigquery.Table(f"{PROJECT_ID}.{DATASET_ID}.{table_id}", schema=schema)
#                 table = client.create_table(table)  
#                 print(f"Created table {table.project}.{table.dataset_id}.{table.table_id}")
#             except Exception as e:
#                 print(f"Error creating table: {e}")


Table CIF別取引属性_個人 already exists.
Table CRM_交渉履歴 already exists.
Table VISAデビット契約情報 already exists.
Table チャネル別利用状況 already exists.
Table 保険元帳 already exists.
Created table qwiklabs-asl-02-9dacbbe2194b.project.入払情報_日次
Table 名寄せ取引属性_日次 already exists.
Table 完済融資バッチキー already exists.
Table 定期預金元帳 already exists.
Table 為替取引明細 already exists.
Table 融資バッチキー already exists.
Table 融資ローン元帳 already exists.


In [545]:
# for i in range(len(TABLE_ID_LIST)):
#     table_path = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID_LIST[i]}"
#     job_config = bigquery.LoadJobConfig(
#     autodetect=False,  
#     write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,  
#     source_format=bigquery.SourceFormat.CSV, 
#     )
#     try: 
#         job = client.load_table_from_dataframe(
#             table_data_list[i], table_path, job_config=job_config
#         ) 
#         job.result() 
#         print(f"データのアップロードが完了しました。テーブル： {TABLE_ID_LIST[i]}")
#     except Exception as e:
#         print(f"Error creating table: {e}")

データのアップロードが完了しました。テーブル： CIF別取引属性_個人
データのアップロードが完了しました。テーブル： CRM_交渉履歴
データのアップロードが完了しました。テーブル： VISAデビット契約情報
データのアップロードが完了しました。テーブル： チャネル別利用状況
データのアップロードが完了しました。テーブル： 保険元帳
データのアップロードが完了しました。テーブル： 入払情報_日次
データのアップロードが完了しました。テーブル： 名寄せ取引属性_日次
データのアップロードが完了しました。テーブル： 完済融資バッチキー
データのアップロードが完了しました。テーブル： 定期預金元帳
データのアップロードが完了しました。テーブル： 為替取引明細
データのアップロードが完了しました。テーブル： 融資バッチキー
データのアップロードが完了しました。テーブル： 融資ローン元帳


## カラムdescriptionの追加

In [358]:
# client = bigquery.Client()
# table_ref = client.get_table(f"{DATASET_ID}.{TABLE_ID_LIST[0]}")

# new_schema = []
# for field in table_ref.schema:
#     field.description = 'test'
#     field.description = column_desc_dict.get(field.name)
#     new_schema.append(field)

# table_ref.schema = new_schema

# # テーブルを更新
# table = client.update_table(table_ref, ["schema"])  # 更新する属性を指定

# print(f"Table {table_ref.reference} updated successfully.")

AttributeError: can't set attribute 'description'