# ETL Pipeline for Tuya Energy Consumption

### Index

- Install requirements
- Import libraries and setup key variables
- Get Tuya devices information and data
- Load data into Data Warehouse (Postgres)

## Install requirements

In [None]:
pip install -r requirements.txt

## Import libraries and setup key variables
Remember to add your own credentials to the `.env` file so they can be loaded here.

In [1]:
import datetime, os
from sqlalchemy import create_engine, text
from dotenv import load_dotenv
import pandas as pd
import tinytuya

from urllib.parse import quote_plus

# Load .env file credentials into the process environment
load_dotenv()

# Database connection
host = os.getenv('POSTGRES_HOST')
port = os.getenv('POSTGRES_PORT')
database = os.getenv('POSTGRES_DATABASE')
username = os.getenv('POSTGRES_USERNAME')
password = os.getenv('POSTGRES_PASSWORD')

# Fail fast with a readable message instead of building a URL full of "None"
_missing_db_settings = [
    setting_name
    for setting_name, setting_value in {
        'POSTGRES_HOST': host,
        'POSTGRES_PORT': port,
        'POSTGRES_DATABASE': database,
        'POSTGRES_USERNAME': username,
        'POSTGRES_PASSWORD': password,
    }.items()
    if setting_value is None
]
if _missing_db_settings:
    raise RuntimeError('Missing database settings in .env: ' + ', '.join(_missing_db_settings))

# Percent-encode the credentials so characters such as '@', ':' or '/'
# in the username/password cannot corrupt the connection URL
connection = f'postgresql://{quote_plus(username)}:{quote_plus(password)}@{host}:{port}/{database}'

# Tuya connection
tuya_api_endpoint = os.getenv('TUYA_API_ENDPOINT')
tuya_access_id = os.getenv('TUYA_ACCESS_ID')
tuya_access_key = os.getenv('TUYA_ACCESS_KEY')

# Connect to Tuya API
tuya_cloud = tinytuya.Cloud(apiRegion=tuya_api_endpoint, apiKey=tuya_access_id, apiSecret=tuya_access_key)

## Get Tuya devices information and data

In [2]:
# Collect one power reading per online device; each entry becomes a table row
all_data_devices = []

# Get information about all devices in Tuya
devices = tuya_cloud.getdevices()
# NOTE(review): tinytuya appears to report failures by returning an error payload
# (a dict) rather than raising; iterating it would fail with a confusing error,
# so stop early with a clear message instead
if not isinstance(devices, list):
    raise RuntimeError(f'Could not retrieve the Tuya device list: {devices}')

# Get a list with only name and id of each device
list_devices = [(device['name'], device['id']) for device in devices]

# Extract power and timestamp for each connected device of the list
for name, device_id in list_devices:
    if not tuya_cloud.getconnectstatus(device_id):
        continue  # Offline devices are ignored

    status_device = tuya_cloud.getstatus(device_id)  # Get information based on id
    if 'result' not in status_device:
        # The status request failed for this device; skip it rather than abort the whole run
        print(f'Skipping device {name} ({device_id}): could not read its status: {status_device}')
        continue

    # Assumes the 5th status entry holds the power reading stored as value*10
    # (hence the division) — TODO confirm for every device model in use
    power = status_device['result'][4]['value'] / 10
    # 't' is divided by 1000, i.e. treated as epoch milliseconds; fromtimestamp()
    # without a tz converts it to naive local time of the machine running this code
    timestamp = datetime.datetime.fromtimestamp(status_device['t'] / 1000)

    all_data_devices.append({'device_id': device_id, 'name': name, 'power': power, 'timestamp': timestamp})

# Explicit columns keep the schema stable even when no device reported data
df_energy_devices = pd.DataFrame(all_data_devices, columns=['device_id', 'name', 'power', 'timestamp'])

## Load data into Data Warehouse (Postgres)

In [3]:
# Set Table and schema names in Postgres
TABLE = 'energy_consumption'
SCHEMA = 'tuya'

# Connect to database and append all new readings into the table
engine = create_engine(connection)
try:
    if df_energy_devices.empty:
        # Nothing was collected (e.g. every device offline); avoid a pointless write
        print('No new energy readings to load')
    else:
        with engine.connect() as conn:
            try:
                # The transaction context manager commits on success and
                # rolls back automatically if the block raises
                with conn.begin():
                    df_energy_devices.to_sql(name=TABLE, schema=SCHEMA, con=conn, if_exists='append', index=False)
            except Exception as e:
                # At this point the transaction has already been rolled back
                print('!!! [ERROR IN DATABASE QUERIES] !!!')
                print('Transaction has been rolled back')
                print(f'Error occurred during transaction:\n{e}')
                raise
finally:
    # Release pooled connections; re-running the cell creates a fresh engine anyway
    engine.dispose()