In [None]:
import os
import time
import requests
import pandas as pd
import yfinance as yf
from google.cloud import bigquery
from google.cloud import storage
import datetime as dt
from tqdm import tqdm
from google.oauth2 import service_account

class GCSBigQueryFacade:
    """Small facade over the Google Cloud Storage and BigQuery clients.

    Credentials are built from ``GCP_*`` environment variables. Both clients
    are created with the project taken from the ``GCP_PROJECT_ID``
    environment variable; the ``project_id`` argument is only used to build
    the fully-qualified table name in :meth:`get_max_date_from_bq`.
    """

    def __init__(self, project_id, dataset_name, table_name, bucket_name):
        """Store the target names and create authenticated clients.

        Args:
            project_id: Project used in the fully-qualified table name.
            dataset_name: BigQuery dataset that holds ``table_name``.
            table_name: BigQuery table that is queried and loaded.
            bucket_name: GCS bucket used for uploads and downloads.

        Raises:
            ValueError: If the ``GCP_PRIVATE_KEY`` environment variable is
                missing or empty.
        """
        self.project_id = project_id
        self.dataset_name = dataset_name
        self.table_name = table_name
        self.bucket_name = bucket_name

        # Fail early with a readable message instead of an AttributeError
        # raised by calling .replace() on None.
        private_key = os.environ.get("GCP_PRIVATE_KEY")
        if not private_key:
            raise ValueError(
                "Environment variable GCP_PRIVATE_KEY is not set; "
                "cannot build service-account credentials."
            )

        service_account_info = {
            "type": "service_account",
            "project_id": os.environ.get("GCP_PROJECT_ID"),
            "private_key_id": os.environ.get("GCP_PRIVATE_KEY_ID"),
            # The key is stored with literal "\n" sequences; turn them back
            # into real newlines.
            "private_key": private_key.replace('\\n', '\n'),
            "client_email": os.environ.get("GCP_CLIENT_EMAIL"),
            "client_id": os.environ.get("GCP_CLIENT_ID"),
            "auth_uri": os.environ.get("GCP_AUTH_URI"),
            "token_uri": os.environ.get("GCP_TOKEN_URI"),
            "auth_provider_x509_cert_url": os.environ.get("GCP_AUTH_PROVIDER_X509_CERT_URL"),
            "client_x509_cert_url": os.environ.get("GCP_CLIENT_X509_CERT_URL"),
        }
        credentials = service_account.Credentials.from_service_account_info(service_account_info)

        client_project = service_account_info["project_id"]
        self.bq_client = bigquery.Client(credentials=credentials, project=client_project)
        self.storage_client = storage.Client(credentials=credentials, project=client_project)

    def get_max_date_from_bq(self):
        """Return ``MAX(Date)`` of the configured table.

        Returns:
            The ``max_date`` value of the first result row, or ``None`` when
            the query yields no rows.
        """
        query = f"""
            SELECT MAX(Date) as max_date
            FROM `{self.project_id}.{self.dataset_name}.{self.table_name}`
        """
        rows = self.bq_client.query(query).result()
        first_row = next(iter(rows), None)
        if first_row is None:
            return None
        return first_row['max_date']

    def upload_to_gcs(self, local_file_path, destination_blob_name):
        """Upload a local file to the configured bucket.

        Args:
            local_file_path: Path of the file to upload.
            destination_blob_name: Object name to create inside the bucket.
        """
        bucket = self.storage_client.bucket(self.bucket_name)
        blob = bucket.blob(destination_blob_name)
        blob.upload_from_filename(local_file_path)
        tqdm.write(f"File {local_file_path} uploaded to {destination_blob_name}.")

    def load_data_to_bigquery(self, source_uri):
        """Append a Parquet file from GCS to the configured table.

        Blocks until the load job has finished.

        Args:
            source_uri: ``gs://`` URI of the Parquet file to load.
        """
        # Same reference Client.dataset() would build (it defaults to the
        # client's project), without going through the deprecated helper.
        dataset_ref = bigquery.DatasetReference(self.bq_client.project, self.dataset_name)
        table_ref = dataset_ref.table(self.table_name)

        job_config = bigquery.LoadJobConfig(
            source_format=bigquery.SourceFormat.PARQUET,
            write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
        )

        load_job = self.bq_client.load_table_from_uri(
            source_uri, table_ref, job_config=job_config
        )

        # Wait for the load job to complete.
        load_job.result()

        tqdm.write(f"Data loaded into BigQuery table {self.dataset_name}.{self.table_name} from {source_uri}.")

    def read_parquet_from_gcs(self, file_name):
        """Download a Parquet object from the bucket and read it.

        The object is saved to ``./<file_name>`` before being parsed.

        Args:
            file_name: Object name inside the configured bucket.

        Returns:
            The file contents as a ``pandas.DataFrame``.
        """
        bucket = self.storage_client.bucket(self.bucket_name)
        blob = bucket.blob(file_name)

        local_file_path = f"./{file_name}"
        # Object names may contain "folders"; make sure the local parent
        # directory exists before downloading into it.
        os.makedirs(os.path.dirname(local_file_path), exist_ok=True)
        blob.download_to_filename(local_file_path)

        return pd.read_parquet(local_file_path, engine='pyarrow')

    
def suppress_yfinance_warnings():
    """Set the ``yfinance`` logger's level to ERROR."""
    import logging

    logging.getLogger("yfinance").setLevel(logging.ERROR)

def stocklist():
    """Download the JPX listed-issues spreadsheet and upload a code/name list to GCS.

    The spreadsheet is saved as ``data_j.xls`` in the working directory,
    filtered to the Prime, Growth and Standard domestic-stock segments,
    reduced to ``code``/``name`` columns, written to
    ``./output/stocklist.parquet`` and uploaded to the bucket as
    ``stocklist.parquet``. The list is not loaded into BigQuery.

    Raises:
        requests.HTTPError: If the JPX download returns an error status.
    """
    PROJECT_ID = os.environ.get("GCP_PROJECT_ID")
    DATASET_NAME = 'stock_dataset'
    BUCKET_NAME = 'stock-data-bucket_hopop'
    TABLE_NAME = 'stock_data'
    gcs_bq = GCSBigQueryFacade(PROJECT_ID, DATASET_NAME, TABLE_NAME, BUCKET_NAME)

    url = "https://www.jpx.co.jp/markets/statistics-equities/misc/tvdivq0000001vg2-att/data_j.xls"
    # Bound the wait and reject error responses so an HTML error page is
    # never written to disk and parsed as a spreadsheet.
    response = requests.get(url, timeout=60)
    response.raise_for_status()
    with open('data_j.xls', 'wb') as output:
        output.write(response.content)

    target_segments = ["プライム（内国株式）", "グロース（内国株式）", "スタンダード（内国株式）"]
    listed = pd.read_excel("./data_j.xls")
    # .copy() so the column assignment below acts on an independent frame
    # rather than on a slice of the original.
    listed = listed[listed["市場・商品区分"].isin(target_segments)].copy()
    listed['コード'] = listed['コード'].astype(str)
    stock_names_df = listed[['コード', '銘柄名']].rename(columns={'コード': 'code', '銘柄名': 'name'})

    # Save the whole list as a single Parquet file.
    local_file_name = "stocklist.parquet"
    local_file_path = f"./output/{local_file_name}"
    os.makedirs(os.path.dirname(local_file_path), exist_ok=True)
    stock_names_df.to_parquet(local_file_path, engine='pyarrow', index=False)

    # Upload to GCS.
    gcs_bq.upload_to_gcs(local_file_path, local_file_name)

def _download_daily_prices(stock_code, start_date, end_date):
    """Fetch daily prices for one Tokyo-listed stock in the BigQuery column layout.

    Args:
        stock_code: Stock code without the ``.T`` suffix.
        start_date: First date to request.
        end_date: End date passed to ``yf.download``.

    Returns:
        A DataFrame with columns Date, Stock_Code, Open, High, Low, Close,
        Adj_Close, Volume (Date formatted as ``YYYY-MM-DD`` strings), or
        ``None`` when yfinance returns no rows.
    """
    ticker = f"{stock_code}.T"
    # progress=False: yfinance's own progress bar overwrites the tqdm bar.
    # auto_adjust=False: keep the 'Adj Close' column regardless of the
    # installed yfinance version's default.
    df = yf.download(ticker, start=start_date, end=end_date, progress=False, auto_adjust=False)
    if df.empty:
        tqdm.write(f"No data found for {ticker}. Skipping...")
        return None

    # NOTE(review): newer yfinance releases appear to return (field, ticker)
    # MultiIndex columns even for a single ticker; keep only the field level.
    if isinstance(df.columns, pd.MultiIndex):
        df.columns = df.columns.get_level_values(0)

    df = df.reset_index().rename(columns={'Adj Close': 'Adj_Close'})
    df['Stock_Code'] = stock_code
    # .copy() so the Date assignment below does not act on a slice.
    df = df[['Date', 'Stock_Code', 'Open', 'High', 'Low', 'Close', 'Adj_Close', 'Volume']].copy()
    df['Date'] = pd.to_datetime(df['Date']).dt.strftime('%Y-%m-%d')
    return df


def main():
    """Append price data newer than the table's latest date to BigQuery.

    Reads ``MAX(Date)`` from the table, downloads prices from the following
    day up to today for every code in ``stocklist.parquet`` (read from GCS),
    writes them to ``./output/combined_stock_data.parquet``, uploads that
    file to GCS and loads it into BigQuery.

    Raises:
        RuntimeError: If the table has no ``Date`` values, so no start date
            can be derived.
    """
    suppress_yfinance_warnings()

    PROJECT_ID = os.environ.get("GCP_PROJECT_ID")
    DATASET_NAME = 'stock_dataset'
    BUCKET_NAME = 'stock-data-bucket_hopop'
    TABLE_NAME = 'stock_data'
    gcs_bq = GCSBigQueryFacade(PROJECT_ID, DATASET_NAME, TABLE_NAME, BUCKET_NAME)

    max_date = gcs_bq.get_max_date_from_bq()
    # MAX(Date) is NULL for an empty table; fail with a clear message
    # instead of an AttributeError on None.
    if pd.isna(max_date):
        raise RuntimeError(
            f"No Date values found in {DATASET_NAME}.{TABLE_NAME}; "
            "cannot determine the start date for the incremental load."
        )
    max_date = pd.to_datetime(max_date).date()
    START_DATE = max_date + dt.timedelta(days=1)
    END_DATE = dt.date.today()

    if START_DATE >= END_DATE:
        tqdm.write("最新データが既に存在します。新しいデータはありません。")
        return

    # Read the stock list Parquet file from GCS.
    stock_names_df = gcs_bq.read_parquet_from_gcs("stocklist.parquet")
    print(f'{START_DATE} ~ {END_DATE}')

    # Collect per-ticker frames and concatenate once at the end; growing a
    # DataFrame with concat inside the loop re-copies all rows every time.
    frames = []
    codes = stock_names_df['code']
    # Show progress with tqdm, keeping the bar pinned at position 0.
    with tqdm(codes, total=len(codes), ncols=100, leave=True, position=0) as progress_bar:
        for raw_code in progress_bar:
            df = _download_daily_prices(str(raw_code).strip(), START_DATE, END_DATE)
            if df is not None:
                frames.append(df)
            # Pause between requests.
            time.sleep(1)

    if not frames:
        tqdm.write("combined_dfが空です。処理をスキップします。")
        return

    combined_df = pd.concat(frames, ignore_index=True)

    # Save all tickers together as a single Parquet file.
    combined_file_name = "combined_stock_data.parquet"
    local_file_path = f"./output/{combined_file_name}"
    os.makedirs(os.path.dirname(local_file_path), exist_ok=True)
    combined_df.to_parquet(local_file_path, engine='pyarrow', index=False)

    # Upload to GCS.
    gcs_bq.upload_to_gcs(local_file_path, combined_file_name)

    # Load into BigQuery.
    source_uri = f"gs://{BUCKET_NAME}/{combined_file_name}"
    gcs_bq.load_data_to_bigquery(source_uri)

# Script entry point. stocklist() runs first because it uploads
# stocklist.parquet to the bucket, which main() then reads back.
if __name__ == "__main__":
    stocklist()
    main()

File ./output/stocklist.parquet uploaded to stocklist.parquet.
2024-10-26 ~ 2024-11-24


[*********************100%***********************]  1 of 1 completed       | 0/3832 [00:00<?, ?it/s]
[*********************100%***********************]  1 of 1 completed/3832 [00:01<1:30:42,  1.42s/it]
[*********************100%***********************]  1 of 1 completed/3832 [00:02<1:21:08,  1.27s/it]
[*********************100%***********************]  1 of 1 completed/3832 [00:03<1:18:57,  1.24s/it]
[*********************100%***********************]  1 of 1 completed/3832 [00:04<1:17:20,  1.21s/it]
[*********************100%***********************]  1 of 1 completed/3832 [00:06<1:16:28,  1.20s/it]
[*********************100%***********************]  1 of 1 completed/3832 [00:07<1:15:53,  1.19s/it]
[*********************100%***********************]  1 of 1 completed/3832 [00:08<1:15:55,  1.19s/it]
[*********************100%***********************]  1 of 1 completed/3832 [00:09<1:18:06,  1.23s/it]
[*********************100%***********************]  1 of 1 completed/3832 [00:10<1:16:37,  

No data found for 1939.T. Skipping...


[*********************100%***********************]  1 of 1 completed/3832 [03:22<2:00:10,  1.97s/it]
[*********************100%***********************]  1 of 1 completed/3832 [03:23<1:46:39,  1.75s/it]
[*********************100%***********************]  1 of 1 completed/3832 [03:24<1:36:56,  1.59s/it]
[*********************100%***********************]  1 of 1 completed/3832 [03:26<1:30:48,  1.49s/it]
[*********************100%***********************]  1 of 1 completed/3832 [03:27<1:28:04,  1.44s/it]
[*********************100%***********************]  1 of 1 completed/3832 [03:28<1:25:57,  1.41s/it]
[*********************100%***********************]  1 of 1 completed/3832 [03:30<1:24:26,  1.38s/it]
[*********************100%***********************]  1 of 1 completed/3832 [03:31<1:21:23,  1.33s/it]
[*********************100%***********************]  1 of 1 completed/3832 [03:32<1:18:49,  1.29s/it]
[*********************100%***********************]  1 of 1 completed/3832 [03:33<1:17:50,  

In [None]:
# from google.cloud import bigquery
# from google.cloud import storage

# class Gcs_client:
#     def __init__(self):
#         self.client = storage.Client()

#     def create_bucket(self, bucket_name):
#         try:
#             bucket = self.client.get_bucket(bucket_name)
#             print(f"Bucket {bucket_name} already exists.")
#         except storage.exceptions.NotFound:
#             bucket = self.client.create_bucket(bucket_name)
#             print(f"Bucket {bucket_name} created.")

#     def list_all_objects(self, bucket_name):
#         bucket = self.client.bucket(bucket_name)
#         blobs = bucket.list_blobs()
#         return [blob.name for blob in blobs]

#     def upload_gcs(self, bucket_name, local_file_path, destination_blob_name):
#         bucket = self.client.bucket(bucket_name)
#         blob = bucket.blob(destination_blob_name)
#         blob.upload_from_filename(local_file_path)
#         print(f"Uploaded {local_file_path} to gs://{bucket_name}/{destination_blob_name}")

#     def load_data_to_bigquery(self, source_uri):
#         dataset_ref = self.bq_client.dataset(self.dataset_name)
#         table_ref = dataset_ref.table(self.table_name)

#         job_config = bigquery.LoadJobConfig(
#             source_format=bigquery.SourceFormat.PARQUET,
#             write_disposition=bigquery.WriteDisposition.WRITE_APPEND
#         )
        
#         load_job = self.bq_client.load_table_from_uri(
#             source_uri, table_ref, job_config=job_config
#         )

#         # ロードジョブが完了するまで待つ
#         load_job.result()

#         tqdm.write(f"Data loaded into BigQuery table {self.dataset_name}.{self.table_name} from {source_uri}.")

# class Bigquery_client:
#     def __init__(self):
#         self.client = bigquery.Client()