In [None]:
#| default_exp core

# Core

Useful functions and utilities for LibreGrid.

In [2]:
#| export
import httpx
import asyncio
import os
import requests
import random
import sqlite3
import aiosqlite
import json
from dotenv import load_dotenv
from datetime import datetime, timedelta

API keys are stored in a .env file which is not commited.

It looks like this: 
```
API_SOLAR_KEY='<>'
MPAN_import='<>'
MPAN_export='<>'
username='<>'
phil_key='<>'
serial_number='<>'
```

In [3]:
# Fetching environment variables
load_dotenv()
solar_key = os.environ['API_SOLAR_KEY']

# 1) Solar API tools

Below is code to use the Solar API but this API usage is not free. Instead use the solar response that's saved in the json in the repo.

In [11]:
# Load the JSON file
with open('solar_api_response.json', 'r') as file:
    data = json.load(file)

In [24]:
#| export
def get_solar_building_insights(solar_key, latitude, longitude):
    """
    Call Google Solar building insights API
    """
    url = 'https://solar.googleapis.com/v1/buildingInsights:findClosest'
    params = {
    'location.latitude': latitude,
    'location.longitude': longitude,
    'requiredQuality': 'LOW',
    'key':solar_key
    }
    response = requests.get(url, params=params)
    return response.json()

In [26]:
#| export
def return_energy_solar_api(building_insights):
    """
    Get the solar potential for a building.
    The solar api gives the potential for many different array setups, so we take one somewhere around the third of number of panels.
    """
    third = round(building_insights['solarPotential']['maxArrayPanelsCount']/3)
    
    energy_dc = (item for item in building_insights['solarPotential']['solarPanelConfigs'] if (item["panelsCount"] == third or item["panelsCount"] == third+1 or item["panelsCount"] == third-1))
    
    return list(energy_dc)[0]['yearlyEnergyDcKwh']

# 2) Kraken smart meter reading tools

In [4]:
#| export
def fetch_meter_usage(MPAN_import,serial_number, period_from, period_to):
    # period_from, period_to format: "YYYY-MM-dd hh:mm:ss"
    api_key = os.environ['phil_key']
    url = f"https://api.octopus.energy/v1/electricity-meter-points/{MPAN_import}/meters/{serial_number}/consumption/"
    params = {
        "period_from":f'{period_from}',
        "period_to":f'{period_to}',
        "page_size":"20"
    }
    response = requests.get(url,auth=(api_key,''), params = params)
    return response.json()
    

In [12]:
#| export
def fetch_latest_meter_data(api_key, mpan, serial_num):
    """
    Function to get last hour of meter consumption data
    """ 
    end_time = datetime.now()
    start_time = end_time - timedelta(hours=10)
    url = f"https://api.octopus.energy/v1/electricity-meter-points/{mpan}/meters/{serial_num}/consumption/"
    params = {
        #"period_from":start_time.isoformat(),
        #"period_to":end_time.isoformat(),
        #'group_by':'hour',
        "page_size":"2"
    }
    
    response = requests.get(url, auth=(api_key, ''), params=params)
    if response.status_code == 200:
        data = response.json()
        results = data['results'] if data['results'] else None
        if results:
            for result in results:
                result['mpan_import'] = mpan
                result['serial_number'] = serial_num
        return results
    else:
        print(f"Request failed with status code: {response.status_code}")
        return None
    

In [6]:
#| export
def round_now_to_last_half_hour():
    """
    Helper function to round current time to last half hour
    """
    dt = datetime.now()
    rounded = dt.replace(second=0, microsecond=0)
    if rounded.minute >= 30:
        rounded = rounded.replace(minute=30)
    else:
        rounded = rounded.replace(minute=0)
    return rounded

In [96]:
#| export
def dummy_meter(page_size=2):
    """
    Meter data simulator that just generates random consumption values at every full half hour, at a given interval
    """
    responses = []
    for item in range(0,page_size): 
        start_time = round_now_to_last_half_hour() - timedelta(minutes=30*(item+1))
        end_time = round_now_to_last_half_hour() - timedelta(minutes=30*(item))
        entry = {
            'mpan_import': 'dummy',
            'serial_number': f'dummy:{int(random.uniform(1,100))}',
            'consumption': round(random.uniform(0, 0.3), 3),
            'interval_start': start_time.isoformat(),
            'interval_end': end_time.isoformat()
        }
        responses.append(entry)
    return responses
        
    

# 3) Store meter readings in database tools

In [16]:
#| export
async def insert_meter_data(consumption_list):
    """
    Function to insert data into the libergrid database
    Doing async in case there is a lot of data
    """
    async with aiosqlite.connect('libregrid.db') as db:
        await db.executemany('''
        INSERT OR REPLACE INTO meter_data (mpan_import, serial_number, interval_start, interval_end, consumption)
        VALUES (:mpan_import, :serial_number, :interval_start, :interval_end, :consumption)
        ''', consumption_list)
        await db.commit()

In [28]:
#| export
async def get_meter_data():
    """
    Get data from database. Again async to be able to do other stuff while it's reading.
    """
    async with aiosqlite.connect('libregrid.db') as db:
        async with db.execute('SELECT * FROM meter_data') as cursor:
            rows = await cursor.fetchall()
            columns = [description[0] for description in cursor.description]
            return [dict(zip(columns, row)) for row in rows]

## Example insert data into database workflow
I setup a local SQLite database to store the energy data from smart meters.

In [14]:
# Set up a database
connection = sqlite3.connect('libregrid.db')

In [15]:
# Set up my table for meter data
cursor = connection.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS meter_data (
mpan_import TEXT,
serial_number TEXT,
interval_start TEXT,
interval_end TEXT,
consumption REAL,
PRIMARY KEY (mpan_import, serial_number, interval_start)
)
''')
connection.commit()

In [20]:
# I fetch and insert the latest data

api_key = os.environ['phil_key']
mpan = os.environ['MPAN_import']
serial_num = os.environ['serial_number']
latest_data = fetch_latest_meter_data(api_key, mpan, serial_num)
latest_data

await insert_meter_data(latest_data)

In [22]:
# I get the latest data from the database
await get_meter_data()

[{'mpan_import': '1413985791007',
  'serial_number': '21J0016813',
  'interval_start': '2024-08-20T00:30:00+01:00',
  'interval_end': '2024-08-20T01:00:00+01:00',
  'consumption': 0.144},
 {'mpan_import': '1413985791007',
  'serial_number': '21J0016813',
  'interval_start': '2024-08-20T00:00:00+01:00',
  'interval_end': '2024-08-20T00:30:00+01:00',
  'consumption': 0.143},
 {'mpan_import': '1413985791007',
  'serial_number': '21J0016813',
  'interval_start': '2024-10-02T00:30:00+01:00',
  'interval_end': '2024-10-02T01:00:00+01:00',
  'consumption': 0.171},
 {'mpan_import': '1413985791007',
  'serial_number': '21J0016813',
  'interval_start': '2024-10-02T00:00:00+01:00',
  'interval_end': '2024-10-02T00:30:00+01:00',
  'consumption': 0.404}]

In [23]:
# I close the connection to the database 
connection.close()