Skip to content
21 changes: 21 additions & 0 deletions powersimdata/data_access/data_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import paramiko
from tqdm import tqdm

from powersimdata.data_access.profile_helper import ProfileHelper
from powersimdata.utility import server_setup
from powersimdata.utility.helpers import CommandBuilder

Expand Down Expand Up @@ -115,6 +116,15 @@ def push(self, file_name, checksum):
"""
raise NotImplementedError

def get_profile_version(self, grid_model, kind):
    """Returns available raw profile versions from blob storage.

    :param str grid_model: grid model.
    :param str kind: *'demand'*, *'hydro'*, *'solar'* or *'wind'*.
    :return: (*list*) -- available profile versions.
    """
    # Blob storage is the source of truth, so querying it is the default
    # behavior; subclasses may extend this with additional sources.
    versions = ProfileHelper.get_profile_version_cloud(grid_model, kind)
    return versions
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We implicitly return the versions stored on the cloud. I see how it is useful but is that intuitive in the DataAccess class? Should we return instead an instance of ProfileHelper and call the functions in the child classes?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking since blob storage is the source of truth, it makes sense as the default. Another way would be to raise NotImplementedError here and have the child classes contain their own implementations (SSHDataAccess is just inheriting this, while LocalDataAccess appends the local versions if there are any). Would that be more intuitive?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right. It makes sense it is the default.


def close(self):
    """Perform any necessary cleanup for the object.

    The base implementation is a deliberate no-op.
    """
Expand Down Expand Up @@ -191,6 +201,17 @@ def wrap(s):
)
return wrap(None), wrap(proc.stdout), wrap(proc.stderr)

def get_profile_version(self, grid_model, kind):
    """Returns available raw profile versions from blob storage or local disk.

    :param str grid_model: grid model.
    :param str kind: *'demand'*, *'hydro'*, *'solar'* or *'wind'*.
    :return: (*list*) -- available profile versions.
    """
    remote = super().get_profile_version(grid_model, kind)
    local = ProfileHelper.get_profile_version_local(grid_model, kind)
    # Deduplicate versions that exist both remotely and locally.
    return list({*remote, *local})


class SSHDataAccess(DataAccess):
"""Interface to a remote data store, accessed via SSH."""
Expand Down
98 changes: 98 additions & 0 deletions powersimdata/data_access/profile_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import json
import os

import requests
from tqdm.auto import tqdm

from powersimdata.utility import server_setup


class ProfileHelper:
    """Helpers for locating and downloading raw profile data from blob
    storage, and for reading the locally cached version information.
    """

    # Root URL of the blob container that holds the raw profiles.
    BASE_URL = "https://bescienceswebsite.blob.core.windows.net/profiles"

    @staticmethod
    def get_file_components(scenario_info, field_name):
        """Get the file name and relative path for the given profile and
        scenario.

        :param dict scenario_info: a ScenarioInfo instance
        :param str field_name: the kind of profile
        :return: (*tuple*) -- file name and path
        """
        version = scenario_info["base_" + field_name]
        file_name = field_name + "_" + version + ".csv"
        grid_model = scenario_info["grid_model"]
        from_dir = os.path.join("raw", grid_model)
        return file_name, from_dir

    @staticmethod
    def download_file(file_name, from_dir):
        """Download the profile from blob storage at the given path.

        :param str file_name: profile csv
        :param str from_dir: the path relative to the blob container
        :return: (*str*) -- path to downloaded file
        :raises requests.exceptions.HTTPError: if the download request fails,
            e.g. the profile does not exist on the server.
        """
        print(f"--> Downloading {file_name} from blob storage.")
        # from_dir uses the local OS separator; blob URLs always use "/".
        url_path = "/".join(os.path.split(from_dir))
        url = f"{ProfileHelper.BASE_URL}/{url_path}/{file_name}"
        dest = os.path.join(server_setup.LOCAL_DIR, from_dir, file_name)
        os.makedirs(os.path.dirname(dest), exist_ok=True)
        # Use the response as a context manager so the connection is always
        # released, and fail fast on 4xx/5xx instead of silently writing the
        # error body to the destination file.
        with requests.get(url, stream=True) as resp:
            resp.raise_for_status()
            content_length = int(resp.headers.get("content-length", 0))
            with open(dest, "wb") as f:
                with tqdm(
                    unit="B",
                    unit_scale=True,
                    unit_divisor=1024,
                    miniters=1,
                    total=content_length,
                ) as pbar:
                    for chunk in resp.iter_content(chunk_size=4096):
                        if chunk:  # skip keep-alive chunks
                            f.write(chunk)
                            pbar.update(len(chunk))

        print("--> Done!")
        return dest

    @staticmethod
    def parse_version(grid_model, kind, version):
        """Parse available versions from the given spec.

        :param str grid_model: grid model.
        :param str kind: *'demand'*, *'hydro'*, *'solar'* or *'wind'*.
        :param dict version: version information per grid model
        :return: (*list*) -- available profile versions, empty if the grid
            model or kind is not present in the spec.
        """
        if grid_model in version and kind in version[grid_model]:
            return version[grid_model][kind]
        print("No %s profiles available." % kind)
        return []

    @staticmethod
    def get_profile_version_cloud(grid_model, kind):
        """Returns available raw profile versions from blob storage.

        :param str grid_model: grid model.
        :param str kind: *'demand'*, *'hydro'*, *'solar'* or *'wind'*.
        :return: (*list*) -- available profile versions.
        :raises requests.exceptions.HTTPError: if the version listing cannot
            be fetched.
        """
        resp = requests.get(f"{ProfileHelper.BASE_URL}/version.json")
        # Surface transport errors instead of attempting to JSON-decode an
        # HTTP error page.
        resp.raise_for_status()
        return ProfileHelper.parse_version(grid_model, kind, resp.json())

    @staticmethod
    def get_profile_version_local(grid_model, kind):
        """Returns available raw profile versions from the local version file.

        :param str grid_model: grid model.
        :param str kind: *'demand'*, *'hydro'*, *'solar'* or *'wind'*.
        :return: (*list*) -- available profile versions.
        """
        version_file = os.path.join(server_setup.LOCAL_DIR, "version.json")
        # A missing file simply means no profiles are registered locally.
        if not os.path.exists(version_file):
            return []
        with open(version_file) as f:
            version = json.load(f)
        return ProfileHelper.parse_version(grid_model, kind, version)
24 changes: 24 additions & 0 deletions powersimdata/data_access/tests/test_profile_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from powersimdata.data_access.profile_helper import ProfileHelper


def test_parse_version_default():
    # An empty version spec yields no available versions.
    assert ProfileHelper.parse_version("usa_tamu", "solar", {}) == []


def test_parse_version_missing_key():
    # The kind must be nested under the grid model, not at the top level.
    spec = {"solar": ["v123"]}
    assert ProfileHelper.parse_version("usa_tamu", "solar", spec) == []


def test_parse_version():
    versions = ["v123", "v456"]
    spec = {"usa_tamu": {"solar": versions}}
    assert ProfileHelper.parse_version("usa_tamu", "solar", spec) == versions
    # No hydro entry exists for this grid model.
    assert ProfileHelper.parse_version("usa_tamu", "hydro", spec) == []


def test_get_file_components():
    scenario_info = {"base_wind": "v8", "grid_model": "europe"}
    name, path = ProfileHelper.get_file_components(scenario_info, "wind")
    assert name == "wind_v8.csv"
    assert path == "raw/europe"
103 changes: 54 additions & 49 deletions powersimdata/input/input_data.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import os
import posixpath

import pandas as pd

from powersimdata.data_access.context import Context
from powersimdata.data_access.profile_helper import ProfileHelper
from powersimdata.utility import server_setup
from powersimdata.utility.helpers import MemoryCache, cache_key

Expand All @@ -12,6 +12,51 @@
profile_kind = {"demand", "hydro", "solar", "wind"}


# Map each input kind to its on-disk file extension; every profile kind is
# stored as csv.
_file_extension = {"ct": "pkl", "grid": "mat", **{k: "csv" for k in profile_kind}}


class InputHelper:
    """Locates and downloads non-profile scenario inputs (ct and grid)."""

    def __init__(self, data_access):
        """Constructor.

        :param data_access: data access object used for file transfer
        """
        self.data_access = data_access

    @staticmethod
    def get_file_components(scenario_info, field_name):
        """Get the file name and relative path for either ct or grid

        :param dict scenario_info: a ScenarioInfo instance
        :param str field_name: the input file type
        :return: (*tuple*) -- file name and path
        """
        extension = _file_extension[field_name]
        file_name = f"{scenario_info['id']}_{field_name}.{extension}"
        return file_name, server_setup.INPUT_DIR

    def download_file(self, file_name, from_dir):
        """Download the file if using server, otherwise no-op

        :param str file_name: either grid or ct file name
        :param str from_dir: the path relative to the root dir
        """
        self.data_access.copy_from(file_name, from_dir)


def _check_field(field_name):
    """Checks field name.

    :param str field_name: *'demand'*, *'hydro'*, *'solar'*, *'wind'*,
        *'ct'* or *'grid'*.
    :raises ValueError: if not *'demand'*, *'hydro'*, *'solar'*, *'wind'*
        *'ct'* or *'grid'*
    """
    if field_name not in _file_extension:
        valid = " | ".join(_file_extension.keys())
        raise ValueError("Only %s data can be loaded" % valid)


class InputData(object):
"""Load input data.

Expand All @@ -22,25 +67,8 @@ def __init__(self, data_loc=None):
"""Constructor."""
os.makedirs(server_setup.LOCAL_DIR, exist_ok=True)

self.file_extension = {
**{"ct": "pkl", "grid": "mat"},
**{k: "csv" for k in profile_kind},
}

self.data_access = Context.get_data_access(data_loc)

def _check_field(self, field_name):
"""Checks field name.

:param str field_name: *'demand'*, *'hydro'*, *'solar'*, *'wind'*,
*'ct'* or *'grid'*.
:raises ValueError: if not *'demand'*, *'hydro'*, *'solar'*, *'wind'*
*'ct'* or *'grid'*
"""
possible = list(self.file_extension.keys())
if field_name not in possible:
raise ValueError("Only %s data can be loaded" % " | ".join(possible))

def get_data(self, scenario_info, field_name):
"""Returns data either from server or local directory.

Expand All @@ -52,20 +80,15 @@ def get_data(self, scenario_info, field_name):
dictionary, or the path to a matfile enclosing the grid data.
:raises FileNotFoundError: if file not found on local machine.
"""
self._check_field(field_name)

_check_field(field_name)
print("--> Loading %s" % field_name)
ext = self.file_extension[field_name]

if field_name in profile_kind:
version = scenario_info["base_" + field_name]
file_name = field_name + "_" + version + "." + ext
from_dir = posixpath.join(
server_setup.BASE_PROFILE_DIR, scenario_info["grid_model"]
)
helper = ProfileHelper
else:
file_name = scenario_info["id"] + "_" + field_name + "." + ext
from_dir = server_setup.INPUT_DIR
helper = InputHelper(self.data_access)

file_name, from_dir = helper.get_file_components(scenario_info, field_name)

filepath = os.path.join(server_setup.LOCAL_DIR, from_dir, file_name)
key = cache_key(filepath)
Expand All @@ -79,37 +102,19 @@ def get_data(self, scenario_info, field_name):
"%s not found in %s on local machine"
% (file_name, server_setup.LOCAL_DIR)
)
self.data_access.copy_from(file_name, from_dir)
helper.download_file(file_name, from_dir)
data = _read_data(filepath)
_cache.put(key, data)
return data

def get_profile_version(self, grid_model, kind):
"""Returns available raw profile either from server or local directory.
"""Returns available raw profile from blob storage or local disk

:param str grid_model: grid model.
:param str kind: *'demand'*, *'hydro'*, *'solar'* or *'wind'*.
:return: (*list*) -- available profile version.
:raises ValueError: if kind not one of *'demand'*, *'hydro'*, *'solar'* or
*'wind'*.
"""
if kind not in profile_kind:
raise ValueError("kind must be one of %s" % " | ".join(profile_kind))

query = posixpath.join(
server_setup.DATA_ROOT_DIR,
server_setup.BASE_PROFILE_DIR,
grid_model,
kind + "_*",
)
stdin, stdout, stderr = self.data_access.execute_command("ls " + query)
if len(stderr.readlines()) != 0:
print("No %s profiles available." % kind)
version = []
else:
filename = [os.path.basename(line.rstrip()) for line in stdout.readlines()]
version = [f[f.rfind("_") + 1 : -4] for f in filename]
return version
return self.data_access.get_profile_version(grid_model, kind)


def _read_data(filepath):
Expand Down
20 changes: 20 additions & 0 deletions powersimdata/input/tests/test_input_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import pytest

from powersimdata.input.input_data import InputHelper, _check_field


def test_get_file_components():
    scenario_info = {"id": "123"}
    ct_name, _ = InputHelper.get_file_components(scenario_info, "ct")
    grid_name, from_dir = InputHelper.get_file_components(scenario_info, "grid")
    assert ct_name == "123_ct.pkl"
    assert grid_name == "123_grid.mat"
    assert from_dir == "data/input"


def test_check_field():
    """Valid field names pass silently; invalid ones raise ValueError."""
    _check_field("demand")
    _check_field("hydro")
    # Each invalid name needs its own context manager: any statement after
    # the first raise inside a single ``pytest.raises`` block is never
    # executed, so "coal" was previously not actually being tested.
    with pytest.raises(ValueError):
        _check_field("foo")
    with pytest.raises(ValueError):
        _check_field("coal")
2 changes: 1 addition & 1 deletion powersimdata/input/transform_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def _get_demand_profile(self):
:return: (*pandas.DataFrame*) -- data frame of demand.
"""
zone_id = sorted(self.grid.bus.zone_id.unique())
demand = self._input_data.get_data(self.scenario_info, "demand")[zone_id]
demand = self._input_data.get_data(self.scenario_info, "demand").loc[:, zone_id]
if bool(self.ct) and "demand" in list(self.ct.keys()):
for key, value in self.ct["demand"]["zone_id"].items():
print(
Expand Down
17 changes: 0 additions & 17 deletions powersimdata/scenario/move.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ def move_scenario(self, target="disk"):
backup = BackUpDisk(self._data_access, self._scenario_info)

backup.move_input_data()
backup.copy_base_profile()
backup.move_output_data()
backup.move_temporary_folder()

Expand Down Expand Up @@ -76,22 +75,6 @@ def move_input_data(self):
self._data_access.copy(source, target, update=True)
self._data_access.remove(source, recursive=True, force=True)

def copy_base_profile(self):
    """Copies base profile"""
    print("--> Copying base profiles to backup disk")
    grid_model = self._scenario_info["grid_model"]
    for kind in ("demand", "hydro", "solar", "wind"):
        file_name = kind + "_" + self._scenario_info["base_" + kind] + ".csv"
        src = posixpath.join(
            self.server_config.base_profile_dir(), grid_model, file_name
        )
        dest = posixpath.join(self.backup_config.base_profile_dir(), grid_model)
        _, stdout, stderr = self._data_access.copy(src, dest, update=True)
        print(stdout.readlines())
        print(stderr.readlines())

def move_output_data(self):
"""Moves output data"""
print("--> Moving scenario output data to backup disk")
Expand Down
Loading