<a href="https://colab.research.google.com/github/98672794/colab/blob/main/%E5%B9%A3%E6%AD%B7%E5%8F%B20506aiking.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# https://chat.openai.com/share/385cf2f2-6d58-49a6-a6c3-6cf843432263
# https://chat.openai.com/share/05762a39-5cf3-4e56-a89f-24c384d87da2

# 歷史加密貨幣
# https://github.com/David-Woroniuk/Historic_Crypto
!pip install Historic-Crypto

# 數據庫庫
!pip install db-sqlite3

# 以表格的形式打印数据
!pip install tabulate

# 上传文件到Google Drive
# https://chat.openai.com/share/05762a39-5cf3-4e56-a89f-24c384d87da2
!pip install --upgrade google-api-python-client google-auth-httplib2 google-auth-oauthlib

In [None]:
# G进行身份验证
from google.colab import auth
auth.authenticate_user()

In [None]:
# 33版 $$$$$$$$$$$$$$$$$$$$

# -*- coding: utf-8 -*-

# 取幣歷
import requests
import json
import time
import pandas as pd
from datetime import datetime, timedelta
import sqlite3
import sys
from random import randint

# Google Drive上传下载文件
import os
from google.colab import auth
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseDownload
from googleapiclient.http import MediaFileUpload





class HistoricalData(object):
    """
    此類提供了一個方法，用於在用戶指定的時間段內收集指定加密貨幣的歷史價格數據。該類利用了CoinBase Pro API來提取歷史數據，提供了一種高效的數據提取方法。
    請注意，歷史價格數據可能不完整，因為當沒有可用的數據時（Coinbase Pro API文檔）不會發布數據。
    :param: ticker: 單一的加密貨幣代碼。 (str)
    :param: granularity: 價格數據的頻率，以秒為單位，可以是：60、300、900、3600、21600、86400之一。 (int)
    :param: start_date: 格式為YYYY-MM-DD-HH-MM的日期字符串。 (str)
    :param: end_date: 格式為YYYY-MM-DD-HH-MM的日期字符串，默認為現在。 (str)
    :param: verbose: 提取過程中的打印，默認為True。 (bool)
    :returns: data: 包含請求的加密貨幣數據的Pandas DataFrame。 (pd.DataFrame)
    """

    def __init__(self,ticker,granularity,start_date,end_date=None,verbose=True):
        if verbose:
            print("檢查輸入參數是否符合格式要求。")
        if not all(isinstance(v, str) for v in [ticker, start_date]):
            raise TypeError("ticker和start_date參數必須是字符串或None類型。")
        if not isinstance(end_date, (str, type(None))):
            raise TypeError("end_date參數必須是字符串或None類型。")
        if not isinstance(verbose, bool):
            raise TypeError("verbose參數必須是布林值。")
        if isinstance(granularity, int) is False:
            raise TypeError("granularity必須是整數對象。")
        if granularity not in [60, 300, 900, 3600, 21600, 86400]:
            raise ValueError("granularity參數必須是60、300、900、3600、21600或86400秒之一。")

        if not end_date:
            end_date = datetime.today().strftime("%Y-%m-%d-%H-%M")
        else:
            end_date_datetime = datetime.strptime(end_date, '%Y-%m-%d-%H-%M')
            start_date_datetime = datetime.strptime(start_date, '%Y-%m-%d-%H-%M')
            if start_date_datetime >= end_date_datetime:
                raise ValueError("end_date參數不能早於start_date參數。")

        self.ticker = ticker
        self.granularity = granularity
        self.start_date = start_date
        self.start_date_string = None
        self.end_date = end_date
        self.end_date_string = None
        self.verbose = verbose

    def _ticker_checker(self):
        """此輔助函數檢查CoinBase Pro API上是否有這個代碼。"""
        if self.verbose:
            print("檢查用戶提供的代碼是否在CoinBase Pro API上可用。")

        tkr_response = requests.get("https://api.pro.coinbase.com/products")
        if tkr_response.status_code in [200, 201, 202, 203, 204]:
            if self.verbose:
                print('連接到CoinBase Pro API。')
            response_data = pd.json_normalize(json.loads(tkr_response.text))
            ticker_list = response_data["id"].tolist()

        elif tkr_response.status_code in [400, 401, 404]:
            if self.verbose:
                print("狀態碼: {}，到CoinBase Pro API的請求錯誤。".format(tkr_response.status_code))
            sys.exit()
        elif tkr_response.status_code in [403, 500, 501]:
            if self.verbose:
                print("狀態碼: {}，無法連接到CoinBase Pro API。".format(tkr_response.status_code))
            sys.exit()
        else:
            if self.verbose:
                print("狀態碼: {}，連接到CoinBase Pro API時出錯。".format(tkr_response.status_code))
            sys.exit()

        if self.ticker in ticker_list:
            if self.verbose:
                print("在CoinBase Pro API上找到代碼'{}'，繼續提取。".format(self.ticker))
        else:
            raise ValueError("代碼'{}'在CoinBase Pro API上不可用。請使用Cryptocurrencies類識別正確的代碼。".format(self.ticker))

    def _date_cleaner(self, date_time: (datetime, str)):
        """此輔助函數將輸入呈現為API所需格式的日期時間。"""
        if not isinstance(date_time, (datetime, str)):
            raise TypeError("date_time參數必須是datetime類型。")
        if isinstance(date_time, str):
            output_date = datetime.strptime(date_time, '%Y-%m-%d-%H-%M').isoformat()
        else:
            #output_date = date_time.strftime("%Y-%m-%d-%H-%M")
            output_date = date_time.strftime("%Y-%m-%d, %H:%M:%S")
            output_date = output_date[:10] + 'T' + output_date[12:]
        return output_date



    def retrieve_data(self):
        """此函數返回數據。"""
        if self.verbose:
            print("格式化日期。")

        self._ticker_checker()
        self.start_date_string = self._date_cleaner(self.start_date)
        self.end_date_string = self._date_cleaner(self.end_date)
        start = datetime.strptime(self.start_date, "%Y-%m-%d-%H-%M")
        end = datetime.strptime(self.end_date, "%Y-%m-%d-%H-%M")
        request_volume = abs((end - start).total_seconds()) / self.granularity

        if request_volume <= 300:
            response = requests.get(
                "https://api.pro.coinbase.com/products/{0}/candles?start={1}&end={2}&granularity={3}".format(
                    self.ticker,
                    self.start_date_string,
                    self.end_date_string,
                    self.granularity))
            if response.status_code in [200, 201, 202, 203, 204]:
                if self.verbose:
                    print('從Coinbase Pro API檢索數據。')
                data = pd.DataFrame(json.loads(response.text))
                if not data.empty:  # 檢查 DataFrame 是否為空
                    data.columns = ["time", "low", "high", "open", "close", "volume"]
                    data["time"] = pd.to_datetime(data["time"], unit='s')
                    data = data[data['time'].between(start, end)]
                    data.set_index("time", drop=True, inplace=True)
                    data.sort_index(ascending=True, inplace=True)
                    data.drop_duplicates(subset=None, keep='first', inplace=True)
                    if self.verbose:
                        print('返回數據。')
                    return data
                else:
                    if self.verbose:
                        print("API返回的數據為空。")
                    return pd.DataFrame()  # 返回一個空的DataFrame
            else:
                if self.verbose:
                    print("未能從CoinBase Pro API獲取數據。")
                return pd.DataFrame()  # 返回一個空的DataFrame
        else:
            # API限制:
            max_per_mssg = 300
            data = pd.DataFrame()
            for i in range(int(request_volume / max_per_mssg) + 1):
                provisional_start = start + timedelta(0, i * (self.granularity * max_per_mssg))
                provisional_start = self._date_cleaner(provisional_start)
                provisional_end = start + timedelta(0, (i + 1) * (self.granularity * max_per_mssg))
                provisional_end = self._date_cleaner(provisional_end)

                print("臨時開始時間: {}".format(provisional_start))
                print("臨時結束時間: {}".format(provisional_end))
                response = requests.get(
                    "https://api.pro.coinbase.com/products/{0}/candles?start={1}&end={2}&granularity={3}".format(
                        self.ticker,
                        provisional_start,
                        provisional_end,
                        self.granularity))

                if response.status_code in [200, 201, 202, 203, 204]:
                    if self.verbose:
                        print('提取第{}部分的數據（共{}部分）'.format(i + 1, (int(request_volume / max_per_mssg) + 1)))
                    dataset = pd.DataFrame(json.loads(response.text))
                    if not dataset.empty:
                        data = pd.concat([data, dataset], ignore_index=True)
                        time.sleep(randint(0, 2))
                    else:
                        print("""CoinBase Pro API沒有在'{}'開始時提供可用的數據。
                        嘗試較晚的日期:'{}'""".format(self.ticker, self.start_date, provisional_start))
                        time.sleep(randint(0, 2))
                elif response.status_code in [400, 401, 404]:
                    if self.verbose:
                        print("狀態碼: {}，到CoinBase Pro API的請求錯誤。".format(response.status_code))
                    sys.exit()
                elif response.status_code in [403, 500, 501]:
                    if self.verbose:
                        print("狀態碼: {}，無法連接到CoinBase Pro API。".format(response.status_code))
                    sys.exit()
                else:
                    if self.verbose:
                        print("狀態碼: {}，連接到CoinBase Pro API時出錯。".format(response.status_code))
                    sys.exit()
            if not data.empty:  # 檢查 DataFrame 是否為空
                data.columns = ["time", "low", "high", "open", "close", "volume"]
                data["time"] = pd.to_datetime(data["time"], unit='s')
                data = data[data['time'].between(start, end)]
                data.set_index("time", drop=True, inplace=True)
                data.sort_index(ascending=True, inplace=True)
                data.drop_duplicates(subset=None, keep='first', inplace=True)
            return data






class Aki野():

  # 上传文件到Google Drive
  def _上传文到GoogleDrive(G文夾上用,件名):
        print('件名=',件名)

        # 创建Google Drive API客户端
        drive_service = build('drive', 'v3')

        # 查询文件夹中的文件，包括在垃圾箱中的文件
        response = drive_service.files().list(q=f"'{G文夾上用}' in parents and name='{件名}' and trashed=false",
                                              fields="files(id, name)").execute()
        files = response.get('files', [])


        # 打印文件列表
        print("文件夹中的文件：")
        for file in files:
            print(f"文件名: {file['name']}, 文件ID: {file['id']}")


        # 文件名和路径
        file_path = '/content/' + 件名

         # 文件元数据
        file_metadata = {'name': 件名, 'addParents': [G文夾上用]}

        # 创建媒体上传对象
        media = MediaFileUpload(file_path, resumable=True)

        # 如果存在同名文件，则更新该文件；否则，上传新文件。
        if files:
            # 更新文件
            file_id = files[0]['id']
            uploaded_file = drive_service.files().update(
                fileId=file_id,
                body=file_metadata,
                media_body=media,
                addParents=G文夾上用
            ).execute()
            況 = '更新'
        else:
            # 如果文件不存在，则上传新文件
            file_metadata['name'] = 件名
            uploaded_file = drive_service.files().create(
                body=file_metadata,
                media_body=media,
                fields='id'
            ).execute()
            況 = '上传'
        return f'$$$$ {件名} {況} 至 Google Drive 成功 , File ID:', uploaded_file.get('id'), '$$$$'






  # GoogleDrive 只用file_id下载文件
  def _GoogleDrive下载文件(G檔id下用, Col用檔名):

    # 创建Google Drive API客户端
    drive_service = build('drive', 'v3')

    # 要保存到的本地文件路径
    file_path = '/content/' + Col用檔名

    # 下载文件
    request = drive_service.files().get_media(fileId=G檔id下用)
    fh = open(file_path, 'wb')
    downloader = MediaIoBaseDownload(fh, request)
    done = False
    while done is False:
        status, done = downloader.next_chunk()
        print("Download %d%%." % int(status.progress() * 100))

    return file_path,'已保存!!'






  # 找幣料
  def _找歷(始, 幣名, 線期, G文夾上用, G檔id下用, 檔尾字):
      start_date = 始
      幣名2 = 幣名.replace('-', '_')
      下到Col用名 = 幣名2 + 檔尾字 + '.db'

      # 下载 GoogleDrive 數據庫到本機
      print(Aki野._GoogleDrive下载文件(G檔id下用, 下到Col用名))

      # 连接数据库
      conn = sqlite3.connect(下到Col用名)
      # 查询数据库表中是否存在名为 幣名2+檔尾字 的表
      cursor = conn.cursor()
      cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='{}'".format(幣名2+檔尾字))
      table_exists = cursor.fetchone()
      cursor.close()

      # 如果数据库表不存在，则将 start_date 设置为输入的起始日期
      if not table_exists:
          start_date = 始
      else:
          # 查询数据库表中最后一行的日期
          cursor = conn.cursor()
          cursor.execute("SELECT time FROM {} ORDER BY time DESC LIMIT 1".format(幣名2+檔尾字))
          last_date = cursor.fetchone()
          cursor.close()
          # 如果数据库表中存在日期数据，则将 start_date 设置为数据库表中最后一行的日期
          if last_date:
              start_date = last_date[0]

      # 初始化 end_date
      end_date = (pd.to_datetime(start_date) + pd.DateOffset(days=1)).strftime('%Y-%m-%d-%H-%M')

      while start_date <= end_date:
          # 更新下一个循环的结束日期为下一个日期
          end_date = (pd.to_datetime(start_date) + pd.DateOffset(days=1)).strftime('%Y-%m-%d-%H-%M')

          # 格式化 start_date
          start_date = pd.to_datetime(start_date.split()[0]+'-00-00').strftime('%Y-%m-%d-%H-%M')


          # 创建 HistoricalData 实例
          historical_data = HistoricalData(幣名, 線期, start_date, end_date)

          # 获取数据
          data = historical_data.retrieve_data()


          # 检查数据是否为空
          if data.empty:
              # 创建一个只包含日期列的DataFrame
              data = pd.DataFrame({"time": [start_date], "low": "0", "high": "0", "open": "0", "close": "0", "volume": "0"})
              # 将数据保存到数据库
              data.to_sql(幣名2+檔尾字, conn, if_exists='append', index=True)
          else:
              # 将数据保存到数据库
              data.to_sql(幣名2+檔尾字, conn, if_exists='append', index=True)

          # 將數據保存到數據庫
          if not data.empty:
              data.to_sql(幣名2+檔尾字, conn, if_exists='append', index=True)

          # Up到 GoogleDrive
          print(Aki野._上传文到GoogleDrive(G文夾上用,下到Col用名))

          # 更新下一个循环的起始日期为下个月的第一天
          start_date = pd.to_datetime(start_date) + pd.DateOffset(days=1)
          start_date = start_date.strftime('%Y-%m-%d-%H-%M')

          # 等待一段时间，以避免过度请求 API
          time.sleep(5)

      # 關閉數據庫連接
      conn.close()





if __name__ == "__main__":

    # 找coin data
    始 = '2021-04-01-00-00'
    幣名 = 'BTC-USD'
    線期 = 60

    # GitHub仓库中的.db文件的URL
    #owner = "98672794"
    #repo = "-202405"

    # Google Drive下载文件
    G文夾上用 = "1bAuhOS0sVjVo8Zzc0epCcWd2Gxb4yeBZ"  #上
    G檔id下用 = '1CUSJ8cOw47rmO1XxAV3cWxFAAgpE9qRt'  #下

    檔尾字 = '歷'

    Aki野._找歷(始, 幣名, 線期, G文夾上用, G檔id下用, 檔尾字)

!nohup bash ping.sh &

# 33版 $$$$$$$$$$$$$$$$$$$$

# 輔

In [None]:
# 查看本機數據庫
import sqlite3
from tabulate import tabulate

def print_table_in_pages(rows, headers):
    page_size = 100  # 每页显示的行数
    num_pages = (len(rows) + page_size - 1) // page_size  # 计算总页数

    for page_num in range(num_pages):
        start_index = page_num * page_size
        end_index = min((page_num + 1) * page_size, len(rows))

        page_rows = rows[start_index:end_index]
        page_table = tabulate(page_rows, headers=headers, tablefmt="pretty")
        print(page_table)


def input(幣名0,檔尾字,按咩排序):
  幣名 = 幣名0.replace('-', '_')
  數據庫名 = 幣名+檔尾字
  #按咩排序 = 'Timestamp'

  # 連接到數據庫
  conn = sqlite3.connect(數據庫名+'.db')#('BTC_USD_歷.db')

  # 創建一個游標對象
  cursor = conn.cursor()

  # 執行 SQL 查詢語句，選擇所有的數據，按日期排序
  cursor.execute("SELECT * FROM ["+數據庫名+"] ORDER BY "+按咩排序)



  # 檢索查詢結果
  rows = cursor.fetchall()

  # 關閉游標和連接
  cursor.close()
  conn.close()

  # 定义表头
  headers = ["Time", "Low", "High", "Open", "Close", "Volume"]

  # 分页打印表格内容
  print_table_in_pages(rows, headers)

input('BTC-USD','歷','Time')#Time

In [None]:
# https://chat.openai.com/share/4ed79a0d-c4a9-4c3c-b0b5-8a047ab816f0
# cvs轉.db

import pandas as pd
import sqlite3

# 定義每個批次的大小
batch_size = 1000000

# 讀取CSV文件並處理
reader = pd.read_csv('bitstampUSD_1-min_data_2012-01-01_to_2021-03-31.csv', chunksize=batch_size)

# 創建SQLite數據庫連接
conn = sqlite3.connect('output.db')

for i, chunk in enumerate(reader):
    print(f"正在處理第 {i+1} 批次...")

    # 將Timestamp列轉換為時間格式
    chunk['Timestamp'] = pd.to_datetime(chunk['Timestamp'], unit='s')

    # 調整列的順序
    chunk = chunk[["Timestamp", "Low", "High", "Open", "Close", "Volume_(BTC)"]]

    # 刪除包含NaN值的行
    chunk.dropna(inplace=True)

    # 寫入數據到數據庫
    chunk.to_sql('output', conn, if_exists='append', index=False)

print("轉換完成")

# 關閉數據庫連接
conn.close()


# 備

In [None]:
# 33版 備備備備備備備備備備備備備備備備備備備備備備

# -*- coding: utf-8 -*-

# 取幣歷
import requests
import json
import time
import pandas as pd
from datetime import datetime, timedelta
import sqlite3
import sys
from random import randint

# Google Drive上传下载文件
import os
from google.colab import auth
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseDownload
from googleapiclient.http import MediaFileUpload





class HistoricalData(object):
    """
    此類提供了一個方法，用於在用戶指定的時間段內收集指定加密貨幣的歷史價格數據。該類利用了CoinBase Pro API來提取歷史數據，提供了一種高效的數據提取方法。
    請注意，歷史價格數據可能不完整，因為當沒有可用的數據時（Coinbase Pro API文檔）不會發布數據。
    :param: ticker: 單一的加密貨幣代碼。 (str)
    :param: granularity: 價格數據的頻率，以秒為單位，可以是：60、300、900、3600、21600、86400之一。 (int)
    :param: start_date: 格式為YYYY-MM-DD-HH-MM的日期字符串。 (str)
    :param: end_date: 格式為YYYY-MM-DD-HH-MM的日期字符串，默認為現在。 (str)
    :param: verbose: 提取過程中的打印，默認為True。 (bool)
    :returns: data: 包含請求的加密貨幣數據的Pandas DataFrame。 (pd.DataFrame)
    """

    def __init__(self,ticker,granularity,start_date,end_date=None,verbose=True):
        if verbose:
            print("檢查輸入參數是否符合格式要求。")
        if not all(isinstance(v, str) for v in [ticker, start_date]):
            raise TypeError("ticker和start_date參數必須是字符串或None類型。")
        if not isinstance(end_date, (str, type(None))):
            raise TypeError("end_date參數必須是字符串或None類型。")
        if not isinstance(verbose, bool):
            raise TypeError("verbose參數必須是布林值。")
        if isinstance(granularity, int) is False:
            raise TypeError("granularity必須是整數對象。")
        if granularity not in [60, 300, 900, 3600, 21600, 86400]:
            raise ValueError("granularity參數必須是60、300、900、3600、21600或86400秒之一。")

        if not end_date:
            end_date = datetime.today().strftime("%Y-%m-%d-%H-%M")
        else:
            end_date_datetime = datetime.strptime(end_date, '%Y-%m-%d-%H-%M')
            start_date_datetime = datetime.strptime(start_date, '%Y-%m-%d-%H-%M')
            if start_date_datetime >= end_date_datetime:
                raise ValueError("end_date參數不能早於start_date參數。")

        self.ticker = ticker
        self.granularity = granularity
        self.start_date = start_date
        self.start_date_string = None
        self.end_date = end_date
        self.end_date_string = None
        self.verbose = verbose

    def _ticker_checker(self):
        """此輔助函數檢查CoinBase Pro API上是否有這個代碼。"""
        if self.verbose:
            print("檢查用戶提供的代碼是否在CoinBase Pro API上可用。")

        tkr_response = requests.get("https://api.pro.coinbase.com/products")
        if tkr_response.status_code in [200, 201, 202, 203, 204]:
            if self.verbose:
                print('連接到CoinBase Pro API。')
            response_data = pd.json_normalize(json.loads(tkr_response.text))
            ticker_list = response_data["id"].tolist()

        elif tkr_response.status_code in [400, 401, 404]:
            if self.verbose:
                print("狀態碼: {}，到CoinBase Pro API的請求錯誤。".format(tkr_response.status_code))
            sys.exit()
        elif tkr_response.status_code in [403, 500, 501]:
            if self.verbose:
                print("狀態碼: {}，無法連接到CoinBase Pro API。".format(tkr_response.status_code))
            sys.exit()
        else:
            if self.verbose:
                print("狀態碼: {}，連接到CoinBase Pro API時出錯。".format(tkr_response.status_code))
            sys.exit()

        if self.ticker in ticker_list:
            if self.verbose:
                print("在CoinBase Pro API上找到代碼'{}'，繼續提取。".format(self.ticker))
        else:
            raise ValueError("代碼'{}'在CoinBase Pro API上不可用。請使用Cryptocurrencies類識別正確的代碼。".format(self.ticker))

    def _date_cleaner(self, date_time: (datetime, str)):
        """此輔助函數將輸入呈現為API所需格式的日期時間。"""
        if not isinstance(date_time, (datetime, str)):
            raise TypeError("date_time參數必須是datetime類型。")
        if isinstance(date_time, str):
            output_date = datetime.strptime(date_time, '%Y-%m-%d-%H-%M').isoformat()
        else:
            #output_date = date_time.strftime("%Y-%m-%d-%H-%M")
            output_date = date_time.strftime("%Y-%m-%d, %H:%M:%S")
            output_date = output_date[:10] + 'T' + output_date[12:]
        return output_date



    def retrieve_data(self):
        """此函數返回數據。"""
        if self.verbose:
            print("格式化日期。")

        self._ticker_checker()
        self.start_date_string = self._date_cleaner(self.start_date)
        self.end_date_string = self._date_cleaner(self.end_date)
        start = datetime.strptime(self.start_date, "%Y-%m-%d-%H-%M")
        end = datetime.strptime(self.end_date, "%Y-%m-%d-%H-%M")
        request_volume = abs((end - start).total_seconds()) / self.granularity

        if request_volume <= 300:
            response = requests.get(
                "https://api.pro.coinbase.com/products/{0}/candles?start={1}&end={2}&granularity={3}".format(
                    self.ticker,
                    self.start_date_string,
                    self.end_date_string,
                    self.granularity))
            if response.status_code in [200, 201, 202, 203, 204]:
                if self.verbose:
                    print('從Coinbase Pro API檢索數據。')
                data = pd.DataFrame(json.loads(response.text))
                if not data.empty:  # 檢查 DataFrame 是否為空
                    data.columns = ["time", "low", "high", "open", "close", "volume"]
                    data["time"] = pd.to_datetime(data["time"], unit='s')
                    data = data[data['time'].between(start, end)]
                    data.set_index("time", drop=True, inplace=True)
                    data.sort_index(ascending=True, inplace=True)
                    data.drop_duplicates(subset=None, keep='first', inplace=True)
                    if self.verbose:
                        print('返回數據。')
                    return data
                else:
                    if self.verbose:
                        print("API返回的數據為空。")
                    return pd.DataFrame()  # 返回一個空的DataFrame
            else:
                if self.verbose:
                    print("未能從CoinBase Pro API獲取數據。")
                return pd.DataFrame()  # 返回一個空的DataFrame
        else:
            # API限制:
            max_per_mssg = 300
            data = pd.DataFrame()
            for i in range(int(request_volume / max_per_mssg) + 1):
                provisional_start = start + timedelta(0, i * (self.granularity * max_per_mssg))
                provisional_start = self._date_cleaner(provisional_start)
                provisional_end = start + timedelta(0, (i + 1) * (self.granularity * max_per_mssg))
                provisional_end = self._date_cleaner(provisional_end)

                print("臨時開始時間: {}".format(provisional_start))
                print("臨時結束時間: {}".format(provisional_end))
                response = requests.get(
                    "https://api.pro.coinbase.com/products/{0}/candles?start={1}&end={2}&granularity={3}".format(
                        self.ticker,
                        provisional_start,
                        provisional_end,
                        self.granularity))

                if response.status_code in [200, 201, 202, 203, 204]:
                    if self.verbose:
                        print('提取第{}部分的數據（共{}部分）'.format(i + 1, (int(request_volume / max_per_mssg) + 1)))
                    dataset = pd.DataFrame(json.loads(response.text))
                    if not dataset.empty:
                        data = pd.concat([data, dataset], ignore_index=True)
                        time.sleep(randint(0, 2))
                    else:
                        print("""CoinBase Pro API沒有在'{}'開始時提供可用的數據。
                        嘗試較晚的日期:'{}'""".format(self.ticker, self.start_date, provisional_start))
                        time.sleep(randint(0, 2))
                elif response.status_code in [400, 401, 404]:
                    if self.verbose:
                        print("狀態碼: {}，到CoinBase Pro API的請求錯誤。".format(response.status_code))
                    sys.exit()
                elif response.status_code in [403, 500, 501]:
                    if self.verbose:
                        print("狀態碼: {}，無法連接到CoinBase Pro API。".format(response.status_code))
                    sys.exit()
                else:
                    if self.verbose:
                        print("狀態碼: {}，連接到CoinBase Pro API時出錯。".format(response.status_code))
                    sys.exit()
            if not data.empty:  # 檢查 DataFrame 是否為空
                data.columns = ["time", "low", "high", "open", "close", "volume"]
                data["time"] = pd.to_datetime(data["time"], unit='s')
                data = data[data['time'].between(start, end)]
                data.set_index("time", drop=True, inplace=True)
                data.sort_index(ascending=True, inplace=True)
                data.drop_duplicates(subset=None, keep='first', inplace=True)
            return data






class Aki野():


  # 上传文件到Google Drive
  def _上传文到GoogleDrive(G文夾上用,件名):

    print('件名=',件名)


    # 创建Google Drive API客户端
    drive_service = build('drive', 'v3')

    # 文件名和路径
    file_path = '/content/' + 件名

    # 文件元数据
    file_metadata = {'name': 件名, 'addParents': [G文夾上用]}

    # 创建媒体上传对象
    media = MediaFileUpload(file_path, resumable=True)

    # 尝试找到现有文件并更新
    existing_file = None
    results = drive_service.files().list(q=f"name='{件名}' and '{G文夾上用}' in parents", fields="files(id)").execute()
    items = results.get('files', [])
    if items:
        existing_file = items[0]
        file_id = existing_file['id']
        # 更新文件
        uploaded_file = drive_service.files().update(
            fileId=file_id,
            body=file_metadata,
            media_body=media
        ).execute()
        return f'$$$$ {件名} 更新至 Google Drive 成功 , File ID:', uploaded_file.get('id') ,'$$$$'


    else:
        # 如果文件不存在，则上传新文件
        uploaded_file = drive_service.files().create(
            body=file_metadata,
            media_body=media,
            fields='id'
        ).execute()
        return f'$$$$ {件名} 上传至 Google Drive 成功 , File ID:', uploaded_file.get('id') ,'$$$$'







  # GoogleDrive 只用file_id下载文件
  def _GoogleDrive下载文件(G檔id下用, Col用檔名):

    # 创建Google Drive API客户端
    drive_service = build('drive', 'v3')

    # 要保存到的本地文件路径
    file_path = '/content/' + Col用檔名

    # 下载文件
    request = drive_service.files().get_media(fileId=G檔id下用)
    fh = open(file_path, 'wb')
    downloader = MediaIoBaseDownload(fh, request)
    done = False
    while done is False:
        status, done = downloader.next_chunk()
        print("Download %d%%." % int(status.progress() * 100))

    return file_path,'已保存!!'






  # 找幣料
  def _找歷(始, 幣名, 線期, G文夾上用, G檔id下用, 檔尾字):
      start_date = 始
      幣名2 = 幣名.replace('-', '_')
      下到Col用名 = 幣名2 + 檔尾字 + '.db'

      # 下载 GoogleDrive 數據庫到本機
      print(Aki野._GoogleDrive下载文件(G檔id下用, 下到Col用名))

      # 连接数据库
      conn = sqlite3.connect(下到Col用名)
      # 查询数据库表中是否存在名为 幣名2+檔尾字 的表
      cursor = conn.cursor()
      cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='{}'".format(幣名2+檔尾字))
      table_exists = cursor.fetchone()
      cursor.close()

      # 如果数据库表不存在，则将 start_date 设置为输入的起始日期
      if not table_exists:
          start_date = 始
      else:
          # 查询数据库表中最后一行的日期
          cursor = conn.cursor()
          cursor.execute("SELECT time FROM {} ORDER BY time DESC LIMIT 1".format(幣名2+檔尾字))
          last_date = cursor.fetchone()
          cursor.close()
          # 如果数据库表中存在日期数据，则将 start_date 设置为数据库表中最后一行的日期
          if last_date:
              start_date = last_date[0]

      # 初始化 end_date
      end_date = (pd.to_datetime(start_date) + pd.DateOffset(days=1)).strftime('%Y-%m-%d-%H-%M')

      while start_date <= end_date:
          # 更新下一个循环的结束日期为下一个日期
          end_date = (pd.to_datetime(start_date) + pd.DateOffset(days=1)).strftime('%Y-%m-%d-%H-%M')

          # 格式化 start_date
          start_date = pd.to_datetime(start_date.split()[0]+'-00-00').strftime('%Y-%m-%d-%H-%M')


          # 创建 HistoricalData 实例
          historical_data = HistoricalData(幣名, 線期, start_date, end_date)

          # 获取数据
          data = historical_data.retrieve_data()


          # 检查数据是否为空
          if data.empty:
              # 创建一个只包含日期列的DataFrame
              data = pd.DataFrame({"time": [start_date], "low": "0", "high": "0", "open": "0", "close": "0", "volume": "0"})
              # 将数据保存到数据库
              data.to_sql(幣名2+檔尾字, conn, if_exists='append', index=True)
          else:
              # 将数据保存到数据库
              data.to_sql(幣名2+檔尾字, conn, if_exists='append', index=True)

          # 將數據保存到數據庫
          if not data.empty:
              data.to_sql(幣名2+檔尾字, conn, if_exists='append', index=True)

          # Up到 GoogleDrive
          print(Aki野._上传文到GoogleDrive(G文夾上用,下到Col用名))

          # 更新下一个循环的起始日期为下个月的第一天
          start_date = pd.to_datetime(start_date) + pd.DateOffset(days=1)
          start_date = start_date.strftime('%Y-%m-%d-%H-%M')

          # 等待一段时间，以避免过度请求 API
          time.sleep(5)

      # 關閉數據庫連接
      conn.close()





if __name__ == "__main__":

    # 找coin data
    始 = '2021-04-01-00-00'
    幣名 = 'BTC-USD'
    線期 = 60

    # GitHub仓库中的.db文件的URL
    #owner = "98672794"
    #repo = "-202405"

    # Google Drive下载文件
    G文夾上用 = "1UdbBt6DKTcDsWnDdkyBxu7Z5EjSvyrXv"  #上
    G檔id下用 = '1-TlOvrtFWqiPfdFae6WxxYd-TJyz2_tD'  #下

    檔尾字 = '歷'

    Aki野._找歷(始, 幣名, 線期, G文夾上用, G檔id下用, 檔尾字)

!nohup bash ping.sh &

# 33版 備備備備備備備備備備備備備備備備備備備備備備