> Step1_現在時刻データを取得してDB(SQLite)に保存

In [None]:
import sqlite3
from datetime import datetime

# データベースに接続し、カーソルを作成
conn = sqlite3.connect('sample.db')
c = conn.cursor()

# テーブルを作成（存在しない場合）
c.execute('''CREATE TABLE IF NOT EXISTS data_log (timestamp TEXT)''')
conn.commit()

# 現在時刻を取得してデータベースに保存
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f"データを取得しています... {current_time}")
c.execute("INSERT INTO data_log (timestamp) VALUES (?)", (current_time,))
conn.commit()

# データベース接続を閉じる
conn.close()

>Step2_コード工夫で定期実行

In [1]:
import sqlite3
from datetime import datetime
import time

# データベースに接続し、カーソルを作成
conn = sqlite3.connect('sample.db')
c = conn.cursor()

# テーブルを作成（存在しない場合）
c.execute('''CREATE TABLE IF NOT EXISTS data_log (timestamp TEXT)''')
conn.commit()

# 10秒間、1秒ごとに現在時刻を取得してデータベースに保存
start_time = time.time()
while time.time() - start_time < 10:
    current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(f"データを取得しています... {current_time}")
    c.execute("INSERT INTO data_log (timestamp) VALUES (?)", (current_time,))
    conn.commit()
    time.sleep(1)

# データベース接続を閉じる
conn.close()

データを取得しています... 2024-08-04 14:23:44
データを取得しています... 2024-08-04 14:23:46
データを取得しています... 2024-08-04 14:23:47
データを取得しています... 2024-08-04 14:23:48
データを取得しています... 2024-08-04 14:23:49
データを取得しています... 2024-08-04 14:23:50
データを取得しています... 2024-08-04 14:23:51
データを取得しています... 2024-08-04 14:23:52
データを取得しています... 2024-08-04 14:23:53
データを取得しています... 2024-08-04 14:23:54


>Step3_関数化＆scheduleライブラリを使ってコードの可読性と再利用性を高める

In [17]:
import sqlite3
from datetime import datetime
import schedule
import time

# データベースに接続し、カーソルを作成
conn = sqlite3.connect('sample.db')
c = conn.cursor()

# テーブルを作成（存在しない場合）
c.execute('''CREATE TABLE IF NOT EXISTS data_log (timestamp TEXT)''')
conn.commit()

# 現在時刻を取得してデータベースに保存する関数
def fetch_data():
    # 現在時刻を取得
    current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(f"データを取得しています... {current_time}")
    # データベースに保存
    c.execute("INSERT INTO data_log (timestamp) VALUES (?)", (current_time,))
    conn.commit()

# 1秒ごとにfetch_data関数を実行するようにスケジュール
schedule.every(1).seconds.do(fetch_data)

# 10秒間スケジュールを実行
start_time = time.time()
while time.time() - start_time < 10:
    schedule.run_pending()
    time.sleep(1)

# データベース接続を閉じる
conn.close()

データを取得しています... 2024-08-04 14:11:41
データを取得しています... 2024-08-04 14:11:41
データを取得しています... 2024-08-04 14:11:41
データを取得しています... 2024-08-04 14:11:42
データを取得しています... 2024-08-04 14:11:43
データを取得しています... 2024-08-04 14:11:44
データを取得しています... 2024-08-04 14:11:44
データを取得しています... 2024-08-04 14:11:45
データを取得しています... 2024-08-04 14:11:45
データを取得しています... 2024-08-04 14:11:45
データを取得しています... 2024-08-04 14:11:45
データを取得しています... 2024-08-04 14:11:45
データを取得しています... 2024-08-04 14:11:45
データを取得しています... 2024-08-04 14:11:45
データを取得しています... 2024-08-04 14:11:46
データを取得しています... 2024-08-04 14:11:46
データを取得しています... 2024-08-04 14:11:46
データを取得しています... 2024-08-04 14:11:46
データを取得しています... 2024-08-04 14:11:47
データを取得しています... 2024-08-04 14:11:48
データを取得しています... 2024-08-04 14:11:49
データを取得しています... 2024-08-04 14:11:49
データを取得しています... 2024-08-04 14:11:50
データを取得しています... 2024-08-04 14:11:50
データを取得しています... 2024-08-04 14:11:50
データを取得しています... 2024-08-04 14:11:50
データを取得しています... 2024-08-04 14:11:50
データを取得しています... 2024-08-04 14:11:50
データを取得しています... 2024-

>Step4_コード整理(関数化)を進める

In [6]:
import sqlite3
from datetime import datetime
import time
import schedule
import threading

# データベースに接続し、テーブルを作成
def setup_database():
    conn = sqlite3.connect('sample.db')
    c = conn.cursor()
    c.execute('''CREATE TABLE IF NOT EXISTS data_log (timestamp TEXT)''')
    conn.commit()
    conn.close()

# データを取得してデータベースに保存
def fetch_data():
    global is_running
    if is_running:
        print("前回の実行がまだ完了していません。スキップします。")
        return
    is_running = True
    conn = sqlite3.connect('sample.db')
    c = conn.cursor()
    current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(f"データを取得しています... {current_time}")
    c.execute("INSERT INTO data_log (timestamp) VALUES (?)", (current_time,))
    conn.commit()
    conn.close()
    is_running = False

# スケジュールを設定
def run_schedule():
    while True:
        schedule.run_pending()
        time.sleep(1)

# メイン関数
def main():
    global is_running
    is_running = False
    setup_database()
    schedule.every(1).seconds.do(fetch_data)

    # スケジュールを別スレッドで実行
    thread = threading.Thread(target=run_schedule)
    thread.start()

    # 10秒後にスケジュールを停止
    time.sleep(10)
    schedule.clear()

if __name__ == "__main__":
    main()
print("スケジュールを停止しました。")

データを取得しています... 2024-08-04 14:34:58
データを取得しています... 2024-08-04 14:34:59
データを取得しています... 2024-08-04 14:35:00
データを取得しています... 2024-08-04 14:35:01
データを取得しています... 2024-08-04 14:35:02
データを取得しています... 2024-08-04 14:35:03
前回の実行がまだ完了していません。スキップします。
スケジュールを停止しました。
