In [1]:
import os
import time
from dotenv import load_dotenv
from tuya_connector import TuyaOpenAPI
import mysql.connector

## 환경 변수 로드

In [2]:
load_dotenv()

True

## Tuya API 설정

In [3]:
API_ENDPOINT = os.getenv('API_ENDPOINT')
ACCESS_ID = os.getenv('ACCESS_ID')
ACCESS_SECRET = os.getenv('ACCESS_SECRET')
DEVICE_ID = os.getenv('DEVICE_ID_ONLINE_8IN1_TESTER')

## Tuya API 연결 설정

In [4]:
openapi = TuyaOpenAPI(API_ENDPOINT, ACCESS_ID, ACCESS_SECRET)
openapi.connect()

{'result': {'access_token': '6244ff99094138b860b96745f96db54c',
  'expire_time': 2661,
  'refresh_token': 'fcda7e296c5fac7cdd9f57594e380cce',
  'uid': 'bay1732234083352wqie'},
 'success': True,
 't': 1733294353587,
 'tid': '79dde419b20a11efa2cae6bb15587f28'}

## MySQL 연결

In [5]:
def connect_mysql():
    return mysql.connector.connect(
        host=os.getenv('MYSQL_HOST'),
        port=int(os.getenv('MYSQL_PORT', 3306)),
        user=os.getenv('MYSQL_USER'),
        password=os.getenv('MYSQL_PASSWORD'),
        database=os.getenv('MYSQL_DATABASE')
    )

In [6]:
mysql_connection = connect_mysql()
cursor = mysql_connection.cursor()

## MySQL 연결 상태를 확인하고 끊어진 경우 재연결

In [7]:
def check_mysql_connection():
    """MySQL 연결 상태를 확인하고 끊어진 경우 재연결"""
    global mysql_connection, cursor
    try:
        mysql_connection.ping(reconnect=True, attempts=3, delay=2)
    except mysql.connector.Error:
        print("MySQL connection lost. Reconnecting...")
        mysql_connection = connect_mysql()
        cursor = mysql_connection.cursor()

## Tuya API에서 데이터를 가져오는 함수

In [8]:
def fetch_tuya_data():
    """Tuya API에서 데이터를 가져오는 함수"""
    try:
        data = openapi.get(f"/v2.0/cloud/thing/{DEVICE_ID}/shadow/properties")
        return {prop['code']: prop['value'] for prop in data['result']['properties'] if 'current' in prop['code']}
    except Exception as e:
        print(f"Error fetching data from Tuya API: {e}")
        return None

In [9]:
def save_to_mysql(result, timestamp):
    """MySQL에 데이터를 저장하는 함수"""
    check_mysql_connection()  # 연결 상태 확인 및 복구
    try:
        query = """
            INSERT INTO tuya_8in1 (t, temp_current, ph_current, tds_current, ec_current, salinity_current, pro_current, orp_current, cf_current, rh_current)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """
        cursor.execute(query, (timestamp, *[result.get(key, 0) for key in ["temp_current", "ph_current", "tds_current", "ec_current", "salinity_current", "pro_current", "orp_current", "cf_current", "rh_current"]]))
        mysql_connection.commit()
    except mysql.connector.Error as err:
        print(f"MySQL Error: {err}")

In [10]:
# 주기적 데이터 수집 및 저장 루프
last_time = time.time()

try:
    while True:
        if time.time() - last_time >= 10:
            result = fetch_tuya_data()
            if result:
                timestamp = result.pop('t', time.time() * 1000)
                save_to_mysql(result, timestamp)
            last_time = time.time()

except KeyboardInterrupt:
    print("프로그램이 중지되었습니다.")

finally:
    # 자원 해제
    cursor.close()
    mysql_connection.close()
    print("Connections closed.")

프로그램이 중지되었습니다.
Connections closed.
