In [1]:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base

SQLALCHEMY_DATABASE_URL = 'sqlite:///./api_data.db'

engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={'check_same_thread': False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

## Models

In [2]:
#models
from sqlalchemy import Column, Integer, String, Text, Float, Boolean, DateTime
import datetime

Base = declarative_base()

class ElectricityPrice(Base):
    __tablename__ = 'electricity_prices'
    id = Column(Integer, primary_key=True, index=True)
    city = Column(String, index=True)
    state = Column(String, index=True)
    date_query = Column(String, index=True)
    region = Column(String, index=True)
    NOK_per_kWh = Column(Float)
    EUR_per_kWh = Column(Float)
    EXR = Column(Float)
    time_start = Column(String)
    time_end = Column(String)
    timestamp = Column(DateTime, default=datetime.datetime.now(datetime.UTC))

class WeatherData(Base):
    __tablename__ = 'weather_data'

    id = Column(Integer, primary_key=True, index=True)
    city = Column(String, index=True)
    state = Column(String, index=True)
    date_query = Column(String)
    temperature = Column(Float)
    time_start = Column(Text)
    time_end = Column(Text)
    timestamp = Column(DateTime, default=datetime.datetime.now(datetime.UTC))

In [12]:
# schemas.py
from pydantic import BaseModel, Field
from typing import List

class PriceData(BaseModel):
    NOK_per_kWh: float
    EUR_per_kWh: float
    EXR: float
    time_start: str
    time_end: str

class ElectricityPriceRequest(BaseModel):
    city: str = Field(description="City")
    state: str = Field(description="State or region")
    date_query: str = Field(description="Date query")

# Pydantic models
class WeatherDataSchema(BaseModel):
    temperature: float
    time_start: str
    time_end: str

class WeatherRequest(BaseModel):
    city: str = Field(description="City")
    state: str = Field(description="State or region")
    date_query: str = Field(description="Date query (e.g., 'today', 'yesterday', 'last 10 days', 'last 30 days', 'last three months', or 'YYYY-MM-DD/YYYY-MM-DD')")

In [4]:
# Function to get a database session
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

## Electricricty Price API

In [11]:
from sqlalchemy.orm import Session
from typing import Optional
from geopy.geocoders import Nominatim
from geopy.distance import geodesic
from datetime import datetime, timedelta
import requests

# Create the database tables
Base.metadata.create_all(bind=engine)

# Regions dictionary
REGIONS = {
    "NO1": ("Oslo", (59.9139, 10.7522)),
    "NO2": ("Kristiansand", (58.1467, 7.9956)),
    "NO3": ("Trondheim", (63.4305, 10.3951)),
    "NO4": ("Tromsø", (69.6492, 18.9553)),
    "NO5": ("Bergen", (60.3928, 5.3221)),
}

# Functions
def get_coordinates(city_name: str, state: str) -> Optional[tuple]:
    geolocator = Nominatim(user_agent="electricity_price_api")
    location = geolocator.geocode(f"{city_name}, {state}")
    if location:
        return location.latitude, location.longitude
    else:
        return None

def find_nearest_region(lat: float, lon: float) -> Optional[str]:
    min_distance = float("inf")
    nearest_region = None
    for region, (city, coordinates) in REGIONS.items():
        distance = geodesic((lat, lon), coordinates).kilometers
        if distance < min_distance:
            min_distance = distance
            nearest_region = region
    return nearest_region

def fetch_electricity_prices(year: str, month: str, day: str, region: str) -> list:
    url = f"https://www.hvakosterstrommen.no/api/v1/prices/{year}/{month}-{day}_{region}.json"
    response = requests.get(url)
    if response.status_code == 200:
        return response.json()
    else:
        return f"Error fetching data: {response.status_code}"

def store_prices(db: Session, request: ElectricityPriceRequest, region: str, prices: list):
    for price in prices:
        db_price = ElectricityPrice(
            city=request.city,
            state=request.state,
            date_query=request.date_query,
            region=region,
            NOK_per_kWh=price["NOK_per_kWh"],
            EUR_per_kWh=price["EUR_per_kWh"],
            EXR=price["EXR"],
            time_start=price["time_start"],
            time_end=price["time_end"]
        )
        db.add(db_price)
    db.commit()

def get_electricity_prices(request: ElectricityPriceRequest):
    coordinates = get_coordinates(request.city, request.state)
    if not coordinates:
        return {"error": "City not found"}
    
    lat, lon = coordinates
    end_date = datetime.now().strftime("%Y-%m-%d")
    
    if request.date_query.lower() == "today":
        start_date = end_date
    elif request.date_query.lower() == "yesterday":
        start_date = (datetime.now() - timedelta(1)).strftime("%Y-%m-%d")
        end_date = start_date
    elif "last" in request.date_query.lower():
        if "10 days" in request.date_query.lower():
            start_date = (datetime.now() - timedelta(10)).strftime("%Y-%m-%d")
        elif "30 days" in request.date_query.lower():
            start_date = (datetime.now() - timedelta(30)).strftime("%Y-%m-%d")
        elif "three months" in request.date_query.lower():
            start_date = (datetime.now() - timedelta(90)).strftime("%Y-%m-%d")
        else:
            return {"error": "Invalid date query"}
    else:
        try:
            start_date, end_date = request.date_query.split('/')
            datetime.strptime(start_date, "%Y-%m-%d")
            datetime.strptime(end_date, "%Y-%m-%d")
        except ValueError:
            return {"error": "Invalid date format. Use YYYY-MM-DD or predefined ranges."}


    nearest_region = find_nearest_region(lat, lon)
    if nearest_region:
        prices = fetch_electricity_prices(start_date[:4], start_date[5:7], start_date[8:10], nearest_region)
        if isinstance(prices, list):  # Ensure prices is a list of dictionaries
            db = next(get_db())
            store_prices(db, request, nearest_region, prices)
            return prices
        else:
            return prices  # Return the error message
    else:
        return "Could not determine the nearest region."

# Example usage
request = ElectricityPriceRequest(city="Lillehammer", state="Oslo", date_query="2024-01-01/2024-01-01")
result = get_electricity_prices(request)
print(result)

[{'NOK_per_kWh': 0.60463, 'EUR_per_kWh': 0.05379, 'EXR': 11.2405, 'time_start': '2024-01-01T00:00:00+01:00', 'time_end': '2024-01-01T01:00:00+01:00'}, {'NOK_per_kWh': 0.55056, 'EUR_per_kWh': 0.04898, 'EXR': 11.2405, 'time_start': '2024-01-01T01:00:00+01:00', 'time_end': '2024-01-01T02:00:00+01:00'}, {'NOK_per_kWh': 0.30855, 'EUR_per_kWh': 0.02745, 'EXR': 11.2405, 'time_start': '2024-01-01T02:00:00+01:00', 'time_end': '2024-01-01T03:00:00+01:00'}, {'NOK_per_kWh': 0.27517, 'EUR_per_kWh': 0.02448, 'EXR': 11.2405, 'time_start': '2024-01-01T03:00:00+01:00', 'time_end': '2024-01-01T04:00:00+01:00'}, {'NOK_per_kWh': 0.26988, 'EUR_per_kWh': 0.02401, 'EXR': 11.2405, 'time_start': '2024-01-01T04:00:00+01:00', 'time_end': '2024-01-01T05:00:00+01:00'}, {'NOK_per_kWh': 0.23864, 'EUR_per_kWh': 0.02123, 'EXR': 11.2405, 'time_start': '2024-01-01T05:00:00+01:00', 'time_end': '2024-01-01T06:00:00+01:00'}, {'NOK_per_kWh': 0.25426, 'EUR_per_kWh': 0.02262, 'EXR': 11.2405, 'time_start': '2024-01-01T06:00:00

## Weather data API

In [9]:
import openmeteo_requests
import requests_cache
import pandas as pd
from retry_requests import retry
import numpy as np
import json
from geopy.geocoders import Nominatim
from typing import Optional, List
from pydantic import BaseModel, Field
from datetime import datetime, timedelta

# Define Pydantic models
class WeatherDataSchema(BaseModel):
    temperature: float
    time_start: str
    time_end: str

class WeatherRequest(BaseModel):
    city: str = Field(description="City")
    state: str = Field(description="State or region")
    date_query: str = Field(description="Date query (e.g., 'today', 'yesterday', 'last 10 days', 'last 30 days', 'last three months', or 'YYYY-MM-DD/YYYY-MM-DD')")

def get_coordinates(city_name: str, state: str) -> Optional[tuple]:
    geolocator = Nominatim(user_agent="weather_api")
    location = geolocator.geocode(f"{city_name}, {state}")
    if location:
        return location.latitude, location.longitude
    else:
        return None

def parse_date_query(date_query: str) -> (str, str):
    today = datetime.utcnow().date()
    if date_query == 'today':
        start_date = end_date = today
    elif date_query == 'yesterday':
        start_date = end_date = today - timedelta(days=1)
    elif 'last' in date_query:
        days = int(date_query.split()[1])
        start_date = today - timedelta(days=days)
        end_date = today
    elif '/' in date_query:
        start_date, end_date = date_query.split('/')
        start_date = datetime.strptime(start_date, '%Y-%m-%d').date()
        end_date = datetime.strptime(end_date, '%Y-%m-%d').date()
    else:
        raise ValueError("Invalid date_query format.")
    return start_date.isoformat(), end_date.isoformat()

# Function to get weather data and format it as a list of WeatherDataSchema
def get_weather_data(request: WeatherRequest) -> List[WeatherDataSchema]:
    coordinates = get_coordinates(request.city, request.state)
    if not coordinates:
        return {"error": "Could not find coordinates for the specified city and state."}

    latitude, longitude = coordinates
    start_date, end_date = parse_date_query(request.date_query)

    # Setup the Open-Meteo API client with cache and retry on error
    cache_session = requests_cache.CachedSession('.cache', expire_after=-1)
    retry_session = retry(cache_session, retries=5, backoff_factor=0.2)
    openmeteo = openmeteo_requests.Client(session=retry_session)

    # Make sure all required weather variables are listed here
    url = "https://archive-api.open-meteo.com/v1/archive"
    params = {
        "latitude": latitude,
        "longitude": longitude,
        "start_date": start_date,
        "end_date": end_date,
        "hourly": "temperature_2m"
    }
    responses = openmeteo.weather_api(url, params=params)

    # Process first location. Add a for-loop for multiple locations or weather models
    response = responses[0]

    # Process hourly data. The order of variables needs to be the same as requested.
    hourly = response.Hourly()
    hourly_temperature_2m = hourly.Variables(0).ValuesAsNumpy()

    hourly_dates = pd.date_range(
        start=pd.to_datetime(hourly.Time(), unit="s", utc=True),
        end=pd.to_datetime(hourly.TimeEnd(), unit="s", utc=True),
        freq=pd.Timedelta(seconds=hourly.Interval()),
        inclusive="left"
    ).strftime('%Y-%m-%dT%H:%M:%S%z')

    # Create a list of WeatherDataSchema for each observation
    observations = []
    for i in range(len(hourly_dates) - 1):
        observations.append(WeatherDataSchema(
            temperature=float(hourly_temperature_2m[i]),
            time_start=hourly_dates[i],
            time_end=hourly_dates[i + 1]
        ))

    return observations

# Example usage
request = WeatherRequest(city="Lillehammer", state="", date_query="2024-07-23/2024-08-06")
weather_data = get_weather_data(request)
print(json.dumps([data.dict() for data in weather_data], indent=4))

[
    {
        "temperature": 15.607000350952148,
        "time_start": "2024-07-23T00:00:00+0000",
        "time_end": "2024-07-23T01:00:00+0000"
    },
    {
        "temperature": 12.057000160217285,
        "time_start": "2024-07-23T01:00:00+0000",
        "time_end": "2024-07-23T02:00:00+0000"
    },
    {
        "temperature": 11.256999969482422,
        "time_start": "2024-07-23T02:00:00+0000",
        "time_end": "2024-07-23T03:00:00+0000"
    },
    {
        "temperature": 11.557000160217285,
        "time_start": "2024-07-23T03:00:00+0000",
        "time_end": "2024-07-23T04:00:00+0000"
    },
    {
        "temperature": 13.057000160217285,
        "time_start": "2024-07-23T04:00:00+0000",
        "time_end": "2024-07-23T05:00:00+0000"
    },
    {
        "temperature": 13.756999969482422,
        "time_start": "2024-07-23T05:00:00+0000",
        "time_end": "2024-07-23T06:00:00+0000"
    },
    {
        "temperature": 15.057000160217285,
        "time_start": "2024-07-

In [None]:
def store_weather_data(db: Session, city: str, state: str, date_query: str, weather_data: List[WeatherDataSchema]):
    for weather in weather_data:
        db_weather = WeatherData(
            city=city,
            state=state,
            date_query=date_query,
            temperature=weather.temperature,
            time_start=weather.time_start,
            time_end=weather.time_end
        )
        db.add(db_weather)
    db.commit()

def get_weather_data(request: WeatherRequest) -> WeatherResponse:
    weather_data = fetch_weather_data(request.city, request.state, request.date_query)
    if isinstance(weather_data, list):
        db = next(get_db())
        store_weather_data(db, request.city, request.state, request.date_query, weather_data)
        return WeatherResponse(city=request.city, state=request.state, date_query=request.date_query, weather_data=weather_data)
    else:
        return weather_data  # Return the error message