Skip to content

Commit

Permalink
Upgrade ENTSOE exchange parsing to ExchangeList (#6293)
Browse files Browse the repository at this point in the history
* Add snapshot test for ENTSO-E exchange function

* add test case for exchange forecasts

* upgrade exchanges to ExchangeList

* combine the fetch functions

* fix

* format

* sort by datetime to match EventLists

* better handling of sorted zone keys

* fix snapshot tests merge error

* Modify raw exchange function to handle aggregate exchanges in ENTSOE (#6294)

* Modify raw exchange function to handle aggregate exchanges in ENTSOE

* format

* AI function refactoring

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* Revert "AI function refactoring"

This reverts commit 8d4f4bd.

* use internal function to avoid code duplication

* add snapshot tests for aggregated exchanges

* change logic to avoid else statement

* cleanup docstrings

---------

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* adjust test that uses path to use base_path_to_mock

---------

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
  • Loading branch information
VIKTORVAV99 and github-actions[bot] authored Jan 18, 2024
1 parent f342eac commit f80b6b0
Show file tree
Hide file tree
Showing 13 changed files with 5,097 additions and 591 deletions.
4 changes: 2 additions & 2 deletions config/exchanges/FR-COR_IT-SAR.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ lonlat:
- 9.123283
- 41.293785
parsers:
exchange: FR-COR_IT-SAR.fetch_exchange
exchangeForecast: FR-COR_IT-SAR.fetch_exchange_forecast
exchange: ENTSOE.fetch_exchange
exchangeForecast: ENTSOE.fetch_exchange_forecast
rotation: 180
298 changes: 124 additions & 174 deletions parsers/ENTSOE.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

from electricitymap.contrib.config import ZoneKey
from electricitymap.contrib.lib.models.event_lists import (
ExchangeList,
PriceList,
ProductionBreakdownList,
TotalConsumptionList,
Expand Down Expand Up @@ -201,14 +202,6 @@
ENTSOE_DOMAIN_MAPPINGS["IT-SACODC"],
ENTSOE_DOMAIN_MAPPINGS["IT-CNO"],
],
"FR-COR-AC->IT-SAR": [
ENTSOE_DOMAIN_MAPPINGS["IT-SACOAC"],
ENTSOE_DOMAIN_MAPPINGS["IT-SAR"],
],
"FR-COR-DC->IT-SAR": [
ENTSOE_DOMAIN_MAPPINGS["IT-SACODC"],
ENTSOE_DOMAIN_MAPPINGS["IT-SAR"],
],
"GE->RU-1": [ENTSOE_DOMAIN_MAPPINGS["GE"], ENTSOE_DOMAIN_MAPPINGS["RU"]],
"GR->IT-SO": [ENTSOE_DOMAIN_MAPPINGS["GR"], ENTSOE_DOMAIN_MAPPINGS["IT-SO"]],
"HU->UA": [ENTSOE_DOMAIN_MAPPINGS["HU"], ENTSOE_DOMAIN_MAPPINGS["UA-IPS"]],
Expand All @@ -224,6 +217,14 @@
"RU-1->UA": [ENTSOE_DOMAIN_MAPPINGS["RU"], ENTSOE_DOMAIN_MAPPINGS["UA-IPS"]],
"SK->UA": [ENTSOE_DOMAIN_MAPPINGS["SK"], ENTSOE_DOMAIN_MAPPINGS["UA-IPS"]],
}

# Logical exchanges that ENTSO-E exposes as several separate domain pairs.
# Each value is a list of [domain1, domain2] pairs; every pair is fetched
# individually and the resulting events are merged into a single exchange
# for the aggregated zone key.
EXCHANGE_AGGREGATES: dict[str, list[list]] = {
    "FR-COR->IT-SAR": [
        [ENTSOE_DOMAIN_MAPPINGS["IT-SACOAC"], ENTSOE_DOMAIN_MAPPINGS["IT-SAR"]],
        [ENTSOE_DOMAIN_MAPPINGS["IT-SACODC"], ENTSOE_DOMAIN_MAPPINGS["IT-SAR"]],
    ],
}

# Some zone_keys are part of bidding zone domains for price data
ENTSOE_PRICE_DOMAIN_MAPPINGS: dict[str, str] = {
**ENTSOE_DOMAIN_MAPPINGS, # Note: This has to be first so the domains are overwritten.
Expand Down Expand Up @@ -866,13 +867,12 @@ def parse_production_per_units(xml_text: str) -> Any | None:
def parse_exchange(
xml_text: str,
is_import: bool,
quantities: list[float] | None = None,
datetimes: list[datetime] | None = None,
) -> tuple[list[float], list[datetime]] | None:
if not xml_text:
return None
quantities = quantities or []
datetimes = datetimes or []
sorted_zone_keys: ZoneKey,
logger: Logger,
is_forecast: bool = False,
) -> ExchangeList:
exchange_list = ExchangeList(logger)

soup = BeautifulSoup(xml_text, "html.parser")
# Get all points
for timeseries in soup.find_all("timeseries"):
Expand All @@ -890,19 +890,22 @@ def parse_exchange(

for entry in timeseries.find_all("point"):
quantity = float(entry.find_all("quantity")[0].contents[0])
if not is_import:
if is_import:
quantity *= -1
position = int(entry.find_all("position")[0].contents[0])
datetime = datetime_from_position(datetime_start, position, resolution)
# Find out whether or not we should update the net production
try:
i = datetimes.index(datetime)
quantities[i] += quantity
except ValueError: # Not in list
quantities.append(quantity)
datetimes.append(datetime)
exchange_list.append(
zoneKey=sorted_zone_keys,
datetime=datetime,
source=SOURCE,
netFlow=quantity,
sourceType=EventSourceType.forecasted
if is_forecast
else EventSourceType.measured,
)

return quantities, datetimes
return exchange_list


def parse_prices(
Expand Down Expand Up @@ -1055,179 +1058,126 @@ def fetch_production_per_units(
return data


@refetch_frequency(timedelta(days=2))
def fetch_exchange(
zone_key1: str,
zone_key2: str,
def get_raw_exchange(
zone_key1: ZoneKey,
zone_key2: ZoneKey,
session: Session | None = None,
target_datetime: datetime | None = None,
logger: Logger = getLogger(__name__),
) -> list:
forecast: bool = False,
) -> ExchangeList:
"""
Gets exchange status between two specified zones.
Removes any datapoints that are in the future.
"""
if not session:
session = Session()
sorted_zone_keys = sorted([zone_key1, zone_key2])
key = "->".join(sorted_zone_keys)
if key in ENTSOE_EXCHANGE_DOMAIN_OVERRIDE:
domain1, domain2 = ENTSOE_EXCHANGE_DOMAIN_OVERRIDE[key]
else:
domain1 = ENTSOE_DOMAIN_MAPPINGS[zone_key1]
domain2 = ENTSOE_DOMAIN_MAPPINGS[zone_key2]
# Create a hashmap with key (datetime)
exchange_hashmap = {}
# Grab exchange
# Import
try:
raw_exchange = query_exchange(domain1, domain2, session, target_datetime)
except Exception as e:
raise ParserException(
parser="ENTSOE.py",
message=f"Failed to fetch exchange for {zone_key1} -> {zone_key2}",
zone_key=key,
) from e
if raw_exchange is not None:
parsed = parse_exchange(
raw_exchange,
is_import=True,
)
if parsed:
# Export
try:
raw_exchange = query_exchange(
domain2, domain1, session, target_datetime
)
except Exception as e:
raise ParserException(
parser="ENTSOE.py",
message=f"Failed to fetch exchange for {zone_key1} -> {zone_key2}",
zone_key=key,
) from e
if raw_exchange is not None:
parsed = parse_exchange(
xml_text=raw_exchange,
is_import=False,
quantities=parsed[0],
datetimes=parsed[1],
)
if parsed:
quantities, datetimes = parsed
for i in range(len(quantities)):
exchange_hashmap[datetimes[i]] = quantities[i]

# Remove all dates in the future
exchange_dates = sorted(set(exchange_hashmap.keys()), reverse=True)
exchange_dates = list(filter(lambda x: x <= arrow.now(), exchange_dates))
if not len(exchange_dates):
raise ParserException(parser="ENTSOE.py", message="No exchange data found")
data = []
for exchange_date in exchange_dates:
net_flow = float(exchange_hashmap[exchange_date])
data.append(
{
"sortedZoneKeys": key,
"datetime": exchange_date,
"netFlow": net_flow
if zone_key1[0] == sorted_zone_keys
else -1 * net_flow,
"source": "entsoe.eu",
}
)
sorted_zone_keys = ZoneKey("->".join(sorted([zone_key1, zone_key2])))

return data
else:
raise ParserException(
parser="entsoe.eu",
message=f"No exchange data found for {zone_key1} -> {zone_key2}",
)
# This will be filled with a list of raw exchanges to merge
raw_exchange_lists: list[ExchangeList] = []

query_function = query_exchange_forecast if forecast else query_exchange

@refetch_frequency(timedelta(days=2))
def fetch_exchange_forecast(
zone_key1: str,
zone_key2: str,
session: Session | None = None,
target_datetime: datetime | None = None,
logger: Logger = getLogger(__name__),
) -> list:
"""Gets exchange forecast between two specified zones."""
if not session:
session = Session()
sorted_zone_keys = sorted([zone_key1, zone_key2])
key = "->".join(sorted_zone_keys)
if key in ENTSOE_EXCHANGE_DOMAIN_OVERRIDE:
domain1, domain2 = ENTSOE_EXCHANGE_DOMAIN_OVERRIDE[key]
# This will be filled with a list of domain pairs to fetch
exchanges_to_fetch: list[list[str]] = []

if sorted_zone_keys in EXCHANGE_AGGREGATES:
for domain_pair in EXCHANGE_AGGREGATES[sorted_zone_keys]:
exchanges_to_fetch.append(domain_pair)
elif sorted_zone_keys in ENTSOE_EXCHANGE_DOMAIN_OVERRIDE:
exchanges_to_fetch.append(ENTSOE_EXCHANGE_DOMAIN_OVERRIDE[sorted_zone_keys])
else:
domain1 = ENTSOE_DOMAIN_MAPPINGS[zone_key1]
domain2 = ENTSOE_DOMAIN_MAPPINGS[zone_key2]
# Create a hashmap with key (datetime)
exchange_hashmap = {}
# Grab exchange
# Import
parsed = None
try:
raw_exchange_forecast = query_exchange_forecast(
domain1, domain2, session, target_datetime=target_datetime
)
except Exception as e:
raise ParserException(
parser="ENTSOE.py",
message=f"Failed to fetch exchange forecast for {zone_key1} -> {zone_key2}",
zone_key=key,
) from e
if raw_exchange_forecast is not None:
parsed = parse_exchange(
raw_exchange_forecast,
is_import=True,
exchanges_to_fetch.append(
[ENTSOE_DOMAIN_MAPPINGS[zone_key1], ENTSOE_DOMAIN_MAPPINGS[zone_key2]]
)
if parsed is not None:
# Export

def _fetch_and_parse_exchange(
domain_pair: list[str],
is_import: bool,
) -> ExchangeList:
"""
Internal function to fetch and parse exchange data
only used to avoid code duplication in the parent function.
"""
domain1, domain2 = domain_pair if is_import else domain_pair[::-1]
try:
raw_exchange_forecast = query_exchange_forecast(
domain2, domain1, session, target_datetime=target_datetime
)
raw_exchange = query_function(domain1, domain2, session, target_datetime)
except Exception as e:
raise ParserException(
parser="ENTSOE.py",
message=f"Failed to fetch exchange forecast for {zone_key1} -> {zone_key2}",
zone_key=key,
message=f"Failed to query {'import' if is_import else 'export'} for {domain1} -> {domain2}",
zone_key=sorted_zone_keys,
) from e
if raw_exchange_forecast is not None:
parsed = parse_exchange(
xml_text=raw_exchange_forecast,
is_import=False,
quantities=parsed[0],
datetimes=parsed[1],
if raw_exchange is None:
raise ParserException(
parser="ENTSOE.py",
message=f"No exchange data found for {domain1} -> {domain2}",
zone_key=sorted_zone_keys,
)
if parsed is not None:
quantities, datetimes = parsed
for i in range(len(quantities)):
exchange_hashmap[datetimes[i]] = quantities[i]

# Remove all dates in the future
sorted_zone_keys = sorted([zone_key1, zone_key2])
exchange_dates = list(sorted(set(exchange_hashmap.keys()), reverse=True))
if not len(exchange_dates):
raise ParserException(
parser="ENTSOE.py",
message=f"No exchange forecast data found for {zone_key1} -> {zone_key2}",
return parse_exchange(
raw_exchange,
is_import=is_import,
sorted_zone_keys=sorted_zone_keys,
logger=logger,
is_forecast=forecast,
)
data = []
for exchange_date in exchange_dates:
netFlow = exchange_hashmap[exchange_date]
data.append(
{
"sortedZoneKeys": key,
"datetime": exchange_date,
"netFlow": netFlow
if zone_key1[0] == sorted_zone_keys
else -1 * netFlow,
"source": "entsoe.eu",
}

# Grab all exchanges
for domain_pair in exchanges_to_fetch:
# First we try to get the import data
raw_exchange_lists.append(
_fetch_and_parse_exchange(domain_pair, is_import=True)
)
return data
# Then we try to get the export data
raw_exchange_lists.append(
_fetch_and_parse_exchange(domain_pair, is_import=False)
)

return ExchangeList(logger).merge_exchanges(raw_exchange_lists, logger)


@refetch_frequency(timedelta(days=2))
def fetch_exchange(
    zone_key1: ZoneKey,
    zone_key2: ZoneKey,
    session: Session | None = None,
    target_datetime: datetime | None = None,
    logger: Logger = getLogger(__name__),
) -> list:
    """Return the measured exchange between two specified zones.

    Thin wrapper around ``get_raw_exchange`` that serialises the merged
    ``ExchangeList`` into a plain list of event dictionaries.
    """
    return get_raw_exchange(
        zone_key1,
        zone_key2,
        session=session,
        target_datetime=target_datetime,
        logger=logger,
    ).to_list()


@refetch_frequency(timedelta(days=2))
def fetch_exchange_forecast(
    zone_key1: ZoneKey,
    zone_key2: ZoneKey,
    session: Session | None = None,
    target_datetime: datetime | None = None,
    logger: Logger = getLogger(__name__),
) -> list:
    """Return the forecasted exchange between two specified zones.

    Thin wrapper around ``get_raw_exchange`` (with ``forecast=True``) that
    serialises the merged ``ExchangeList`` into a plain list of event
    dictionaries.
    """
    return get_raw_exchange(
        zone_key1,
        zone_key2,
        session=session,
        target_datetime=target_datetime,
        logger=logger,
        forecast=True,
    ).to_list()


@refetch_frequency(timedelta(days=2))
Expand Down
Loading

0 comments on commit f80b6b0

Please sign in to comment.