diff --git a/.github/workflows/schemavalidation.yaml b/.github/workflows/schemavalidation.yaml index f50c98e5..48d6ec86 100644 --- a/.github/workflows/schemavalidation.yaml +++ b/.github/workflows/schemavalidation.yaml @@ -31,3 +31,6 @@ jobs: run: python ./tests/validate_schema.py ./tests/resources/schema.yaml --config ./tests/resources/test.yaml - name: Validate test schema itself run: python ./tests/validate_schema.py ./tests/resources/schema.yaml + # Modules + - name: Validate industry config + run: python ./tests/validate_schema.py ./modules/industry/schema.yaml --config ./modules/industry/config.yaml diff --git a/.gitignore b/.gitignore index aaee99cf..42ba3c02 100644 --- a/.gitignore +++ b/.gitignore @@ -21,3 +21,5 @@ __pycache__ # Snakemake .snakemake/ dag.pdf +**/out/* +**/tmp/* diff --git a/CHANGELOG.md b/CHANGELOG.md index b85469a4..54e6afbc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,8 @@ ### Added (models) +* **ADD** industry module and steel industry energy demand processing. NOT CONNECTED TO THE MAIN WORKFLOW. Industry sectors pending: chemical, "other". (Fixes #308, #310, #347, #345 and #346) + * **ADD** Spatial resolution that aligns with the regions defined by the [e-Highway 2050 project](https://cordis.europa.eu/project/id/308908/reporting) (`ehighways`) (#370). * **ADD** fully-electrified heat demand (#284). @@ -24,7 +26,7 @@ * **ADD** configuration option to build model timeseries data over multiple years, using `first-year` and `final-year` temporal scopes. Available years are 2010-2016 at time of implementing functionality (#152). * **ADD** nuclear technology capacity allocation workflow which uses the configuration parameter `nuclear-capacity-scenario` to select whether today's capacities define limits in the model definition ("current") or whether ranges set bounds on future capacity (by linking to a configuration CSV file) (#78). * **ADD** a Snakemake rule that generates a .csv and .nc file that provide an summary of the potentials (= per-tech constraints) for each technology and location (#250). -* **ADD** ability to run on Apple silicon devices (#263). +* **ADD** ability to run on Apple silicon devices (#263).Fixes #308, #310, #347, #345 and #346. * Updated geo packages from gdal 3.2 -> 3.3. * **ADD** re-execution triggers based on config and env changes (#264). * **ADD** continuous integration test of all conda environments on both ARM macOS and Linux (#369). diff --git a/Snakefile b/Snakefile index 2a02c8a1..a69c4258 100644 --- a/Snakefile +++ b/Snakefile @@ -6,6 +6,18 @@ from snakemake.utils import validate, min_version, makedirs configfile: "config/default.yaml" validate(config, "config/schema.yaml") +# >>>>>> Include modules >>>>>> +# Industry +configfile: "modules/industry/config.yaml" +validate(config, "modules/industry/schema.yaml") + +module module_industry: + snakefile: "modules/industry/industry.smk" + config: config["industry"] +use rule * from module_industry as module_industry_* +# <<<<<< Include modules <<<<<< + + root_dir = config["root-directory"] + "/" if config["root-directory"] not in ["", "."] else "" __version__ = open(f"{root_dir}VERSION").readlines()[0].strip() test_dir = f"{root_dir}tests/" @@ -80,7 +92,6 @@ rule all: file=["example-model.yaml", "build-metadata.yaml", "summary-of-potentials.nc", "summary-of-potentials.csv"] ) - rule all_tests: message: "Generate euro-calliope pre-built models and run all tests." input: diff --git a/lib/eurocalliopelib/utils.py b/lib/eurocalliopelib/utils.py index 63f488b1..2f19a634 100644 --- a/lib/eurocalliopelib/utils.py +++ b/lib/eurocalliopelib/utils.py @@ -167,3 +167,8 @@ def pj_to_twh(array): def tj_to_twh(array): """Convert TJ to TWh""" return pj_to_twh(array) / 1000 + + +def tj_to_ktoe(array): + """Convert TJ to Ktoe""" + return array * 23.88e-3 diff --git a/modules/industry/README.md b/modules/industry/README.md new file mode 100644 index 00000000..d9c329e4 --- /dev/null +++ b/modules/industry/README.md @@ -0,0 +1,21 @@ +# About + +This module is dedicated to getting and processing energy demand for industry. + +Basic info of this module: + +- Main data sources: JRC IDEES and eurostat +- Spatial resolution: national (because of JRC IDEES) +- Temporal resolution: annual aggregated + +Some industries are specified as 'sub-module' in this module, such as iron and steel industry. This allows you to flexibly configure the decarbonisation level of each industry separately. All other industries are grouped into 'other industries'. + +For workflow users, i.e. non-developers, `config.yaml` is the main file to look into. +Here, one can specify parameters such as the years to be included in the data processing, assumptions such as the share of recycled steel, and the specific industry sector to be included (such as iron and steel industry). +The structure of the `config.yaml` file is: + +- inputs - the needed data sources from other modules in euro-calliope (for now, we assume this module depends on other modules and is not yet a stand-alone module) +- outputs - the output of the whole industry module, usually files, possibly passed on to other modules in euro-calliope. +- params - the parameters that affect the calculation process and result in this module. +By changing the value of the parameters, each user can tailor the workflow to their own needs. +- setup - anything that concerns the general data pipeline of the module. diff --git a/modules/industry/config.yaml b/modules/industry/config.yaml new file mode 100644 index 00000000..7a698bec --- /dev/null +++ b/modules/industry/config.yaml @@ -0,0 +1,13 @@ +industry: + inputs: + path-energy-balances: build/data/annual-energy-balances.csv + path-cat-names: config/energy-balances/energy-balance-category-names.csv + path-carrier-names: config/energy-balances/energy-balance-carrier-names.csv + path-jrc-industry-energy: build/data/jrc-idees/industry/processed-energy.nc + path-jrc-industry-production: build/data/jrc-idees/industry/processed-production.nc + outputs: + placeholder-out1: + placeholder-out2: + params: + steel: + recycled-steel-share: 0.5 # % of recycled scrap steel for H-DRI diff --git a/modules/industry/env_industry.yaml b/modules/industry/env_industry.yaml new file mode 100644 index 00000000..68c0bbbb --- /dev/null +++ b/modules/industry/env_industry.yaml @@ -0,0 +1,18 @@ +name: module-industry +channels: + - conda-forge + - bioconda +dependencies: + - python=3.11 + - ipdb=0.13.13 + - numpy=1.23 + - pandas=1.5.3 + - pycountry=18.12.8 + - snakemake-minimal=8.10.7 + - netcdf4=1.6.5 + - xarray=2022.9.0 + - bottleneck=1.3.8 + - pip=21.0.1 + - pip: + - styleframe==4.2 + - -e ./lib diff --git a/modules/industry/industry.smk b/modules/industry/industry.smk new file mode 100644 index 00000000..5ea09227 --- /dev/null +++ b/modules/industry/industry.smk @@ -0,0 +1,56 @@ +# Paths dependent on main Snakefile +MODULE_PATH = "modules/industry" +BUILD_PATH = f"{MODULE_PATH}/build" +DATA_PATH = f"{MODULE_PATH}/raw_data" + +# Paths relative to this snakefile (snakemake behaviour is inconsitent) +SCRIPT_PATH = "scripts" # scripts are called relative to this file +CONDA_PATH = "./env_industry.yaml" + +# Ensure rules are defined in order. +# Otherwise commands like "rules.rulename.output" won't work! +rule steel_industry: + message: "Calculate energy demand for the 'Iron and steel' sector in JRC-IDEES." + conda: CONDA_PATH + params: + config_steel = config["params"]["steel"] + input: + path_energy_balances = config["inputs"]["path-energy-balances"], + path_cat_names = config["inputs"]["path-cat-names"], + path_carrier_names = config["inputs"]["path-carrier-names"], + path_jrc_industry_energy = config["inputs"]["path-jrc-industry-energy"], + path_jrc_industry_production = config["inputs"]["path-jrc-industry-production"], + output: + path_output = f"{BUILD_PATH}/annual_demand_steel.nc" + script: f"{SCRIPT_PATH}/steel_industry.py" + +rule chemical_industry: + message: "." + conda: CONDA_PATH + params: + input: + output: + script: f"{SCRIPT_PATH}/chemicals.py" + +rule other_industry: + message: "." + conda: CONDA_PATH + params: + input: + output: f"{BUILD_PATH}/other_industry.csv" + script: f"{SCRIPT_PATH}/other_industry.py" + +# rule combine_and_scale: +# message: "." +# conda: CONDA_PATH +# params: +# input: +# output: +# script: + +# rule verify: +# message: "." +# params: +# input: +# output: +# script: diff --git a/modules/industry/schema.yaml b/modules/industry/schema.yaml new file mode 100644 index 00000000..25f58945 --- /dev/null +++ b/modules/industry/schema.yaml @@ -0,0 +1,55 @@ +$schema: https://json-schema.org/draft/2020-12/schema +type: object +additionalProperties: true +properties: + industry: + description: Module subsection to ensure no name conflicts are possible between modules. + type: object + additionalProperties: false + properties: + inputs: + type: object + additionalProperties: false + description: Inputs are paths of prerequired files. + properties: + path-energy-balances: + type: string + description: | + Annual energy balance file. + Columns [cat_code,carrier_code,unit,country,year,value]. + path-cat-names: + type: string + description: | + Category mapping file. + Columns [cat_code,top_cat,sub_cat_contribution,sub_cat_1,sub_cat_2,jrc_idees]. + path-carrier-names: + type: string + description: | + Carrier mapping file. + Columns [carrier_code,carrier_name,hh_carrier_name,com_carrier_name,ind_carrier_name,oth_carrier_name]. + path-jrc-industry-energy: + type: string + description: | + JRC processed industry energy demand .nc file. + path-jrc-industry-production: + type: string + description: | + JRC processed industrial production .nc file. + outputs: + type: object + description: Outputs are paths for the files produced by the module. + params: + type: object + additionalProperties: false + description: Parameters allow users to configure module behaviour. + properties: + steel: + type: object + additionalProperties: false + description: "Parameters specific to the 'Iron and steel' industry category." + properties: + recycled-steel-share: + type: number + description: "Share of recycled metal in the H-DRI steel process." + minimum: 0 + maximum: 1 diff --git a/modules/industry/scripts/chemical_industry.py b/modules/industry/scripts/chemical_industry.py new file mode 100644 index 00000000..e69de29b diff --git a/modules/industry/scripts/other_industry.py b/modules/industry/scripts/other_industry.py new file mode 100644 index 00000000..e69de29b diff --git a/modules/industry/scripts/steel_industry.py b/modules/industry/scripts/steel_industry.py new file mode 100644 index 00000000..9babe92f --- /dev/null +++ b/modules/industry/scripts/steel_industry.py @@ -0,0 +1,223 @@ +from typing import Optional + +import pandas as pd +import xarray as xr +from utils import filling +from utils import jrc_idees_parser as jrc + +CAT_NAME_STEEL = "Iron and steel" + +H2_LHV_KTOE = 0.0333 # 0.0333 TWh/kt LHV +HDRI_CONSUMPTION = 135e-6 # H-DRI: 135kWh_e/t + + +def _get_h2_to_steel(recycled_steel_share: float) -> float: + """Get t_h2/t_steel, usually for H-DRI.""" + # ASSUME: conversion factor of 0.05 t_h2/t_steel. + return (1 - recycled_steel_share) * 0.05 + + +def get_steel_demand_df( + config_steel: dict, + path_energy_balances: str, + path_cat_names: str, + path_carrier_names: str, + path_jrc_industry_energy: str, + path_jrc_industry_production: str, + path_output: Optional[str] = None, +) -> xr.DataArray: + """Execute the data processing pipeline for the "Iron and steel" sub-sector. + + Args: + config_steel (dict): steel sector configuration. + path_energy_balances (str): country energy balances (usually from eurostat). + path_cat_names (str): eurostat category mapping file. + path_carrier_names (str): eurostat carrier name mapping file. + path_jrc_industry_energy (str): jrc country-specific industrial energy demand file. + path_jrc_industry_production (str): jrc country-specific industrial production file. + path_output (str): location of steel demand output file. + + Returns: + xr.DataArray: steel demand per country. + """ + # Load data + energy_balances_df = pd.read_csv( + path_energy_balances, index_col=[0, 1, 2, 3, 4] + ).squeeze("columns") + cat_names_df = pd.read_csv(path_cat_names, header=0, index_col=0) + carrier_names_df = pd.read_csv(path_carrier_names, header=0, index_col=0) + jrc_energy = xr.open_dataset(path_jrc_industry_energy) + jrc_prod = xr.open_dataarray(path_jrc_industry_production) + + # Ensure dataframes only have data specific to this industry + cat_names_df = cat_names_df[cat_names_df["jrc_idees"] == CAT_NAME_STEEL] + jrc_energy = jrc_energy.sel(cat_name=CAT_NAME_STEEL) + jrc_prod = jrc_prod.sel(cat_name=CAT_NAME_STEEL) + + # Process data + new_steel_demand = transform_jrc_subsector_demand( + jrc_energy, jrc_prod, config_steel + ) + new_steel_demand = filling.fill_missing_countries_years( + energy_balances_df, cat_names_df, carrier_names_df, new_steel_demand + ) + + if path_output is not None: + new_steel_demand.to_netcdf(path_output) + + return new_steel_demand + + +def transform_jrc_subsector_demand( + jrc_energy: xr.Dataset, jrc_prod: xr.Dataset, config_steel: dict +) -> xr.Dataset: + """Processing of steel energy demand for different carriers. + + Calculates energy consumption in the iron and steel industry based on expected + change in processes to avoid fossil feedstocks. All process specific energy consumption + (energy/t_steel) is based on the Electric Arc process (EAF), except sintering, which + will be required for iron ore processed using H-DRI, but is not required by EAF. + + This function does the following: + 1. Finds all the specific consumption values by getting + a. process energy demand / produced steel => specific demand + b. process electrical demand / electrical consumption => electrical efficiency + c. specific demand / electricial efficiency => specific electricity consumption + 2. Gets total process specific electricity consumption by adding specific consumptions + for direct electric processes, EAF, H-DRI, smelting, sintering, refining, and finishing + 3. Gets specific hydrogen consumption for all countries that will process iron ore + 4. Gets specific space heat demand based on demand associated with EAF plants + 5. Gets total demand for electricity, hydrogen, and space heat by multiplying specific + demand by total steel production (by both EAF and BF-BOF routes). + + Args: + jrc_energy_df (xr.Dataset): jrc country-specific steel energy demand. + jrc_prod_df (xr.Dataset): jrc country-specific steel production. + config_steel (dict): configuration for the steel industry. + + Returns: + xr.Dataset: processed dataframe with the expected steel energy consumption. + """ + # Gather relevant industrial processes + sintering_intensity = jrc.get_subsection_final_intensity( + "Integrated steelworks", + "Steel: Sinter/Pellet making", + "Integrated steelworks", + "Electricity", + jrc_energy, + jrc_prod, + fill_empty=True, + ) + + eaf_smelting_intensity = jrc.get_subsection_final_intensity( + "Electric arc", + "Steel: Smelters", + "Electric arc", + "Electricity", + jrc_energy, + jrc_prod, + fill_empty=True, + ) + eaf_intensity = jrc.get_subsection_final_intensity( + "Electric arc", + "Steel: Electric arc", + "Electric arc", + "Electricity", + jrc_energy, + jrc_prod, + fill_empty=True, + ) + refining_intensity = jrc.get_subsection_final_intensity( + "Electric arc", + "Steel: Furnaces, Refining and Rolling", + "Electric arc", + "Electricity", + jrc_energy, + jrc_prod, + fill_empty=True, + ) + finishing_intensity = jrc.get_subsection_final_intensity( + "Electric arc", + "Steel: Products finishing", + "Electric arc", + "Electricity", + jrc_energy, + jrc_prod, + fill_empty=True, + ) + auxiliary_intensity = jrc.get_auxiliary_electric_final_intensity( + "Electric arc", "Electric arc", jrc_energy, jrc_prod, fill_empty=True + ) + + # Total electricity consumption: + # If the country produces steel from Iron ore (smelting): + # sintering/pelletizing * iron_ore_% + smelting * recycled_steel_% + H-DRI + EAF + refining/rolling + finishing + auxiliaries + # If the country only recycles steel: + # smelting + EAF + refining/rolling + finishing + auxiliaries + recycled_share = config_steel["recycled-steel-share"] + + # Limit sintering by the share non-recycled steel + updated_sintering = sintering_intensity * (1 - recycled_share) + + # Update smelting consumption to equal assumed recycling rate + # and add weighted H-DRI consumption to process the remaining iron ore + eaf_smelting_recycled = eaf_smelting_intensity * recycled_share + HDRI_CONSUMPTION + + # Countries with no sintering recycle 100% of steel + updated_eaf_smelting = eaf_smelting_intensity.where( + sintering_intensity == 0 + ).fillna(eaf_smelting_recycled) + + electric_intensity = ( + updated_sintering + + updated_eaf_smelting + + eaf_intensity + + refining_intensity + + finishing_intensity + + auxiliary_intensity + ) + # In case our model now says a country does produce steel, + # we give them the average of energy consumption of all other countries per year + mean_demand_per_year = electric_intensity.mean("country_code") + electric_intensity = electric_intensity.where( + electric_intensity > 0, other=mean_demand_per_year + ) + electric_intensity = electric_intensity.assign_coords(carrier_name="electricity") + + # Hydrogen consumption for H-DRI: + # only for country/year that handle iron ore and don't recycle all their steel + h_dri_h2_intensity = H2_LHV_KTOE * _get_h2_to_steel(recycled_share) + + h2_intensity = electric_intensity.where(sintering_intensity > 0).fillna(0) + h2_intensity = h2_intensity.where(h2_intensity == 0, h_dri_h2_intensity) + h2_intensity = h2_intensity.assign_coords(carrier_name="hydrogen") + + # Low heat + low_heat_intensity = jrc.get_subsection_useful_intensity( + "Electric arc", "Low enthalpy heat", "Electric arc", jrc_energy, jrc_prod + ) + low_heat_intensity = low_heat_intensity.assign_coords(carrier_name="space_heat") + + # Combine and transform to energy demand + total_intensity = xr.concat( + [electric_intensity, h2_intensity, low_heat_intensity], dim="carrier_name" + ) + steel_energy_demand = total_intensity * jrc_prod.sum("produced_material") + + # Prettify + steel_energy_demand = steel_energy_demand.assign_attrs(units="twh") + steel_energy_demand.name = "demand" + + return steel_energy_demand + + +if __name__ == "__main__": + get_steel_demand_df( + config_steel=snakemake.params.config_steel, + path_energy_balances=snakemake.input.path_energy_balances, + path_cat_names=snakemake.input.path_cat_names, + path_carrier_names=snakemake.input.path_carrier_names, + path_jrc_industry_energy=snakemake.input.path_jrc_industry_energy, + path_jrc_industry_production=snakemake.input.path_jrc_industry_production, + path_output=snakemake.output.path_output, + ) diff --git a/modules/industry/scripts/utils/filling.py b/modules/industry/scripts/utils/filling.py new file mode 100644 index 00000000..4081b4d9 --- /dev/null +++ b/modules/industry/scripts/utils/filling.py @@ -0,0 +1,99 @@ +import eurocalliopelib.utils as ec_utils +import numpy as np +import pandas as pd +import xarray as xr +from utils import jrc_idees_parser as jrc + + +def fill_missing_countries_years( + eurostat_balances: pd.DataFrame, + cat_names: pd.DataFrame, + carrier_names: pd.DataFrame, + jrc_subsector_demand: xr.DataArray, +) -> xr.DataArray: + """ + For countries without relevant data in JRC_IDEES we use their + Eurostat energy balance data to estimate future energy consumption, relative to the + energy balance data of the 28 countries for which we do have JRC IDEES data. + JRC data is available until 2015, so for future years data for all countries is filled + based on Eurostat energy balances. + Any other missing years are filled in based on average consumption of a country. + """ + # Ensure countries follow our standard alpha3 convention. + jrc_countries = jrc_subsector_demand["country_code"].values + + # Build eurostat annual industry balances + eurostat_industry_balances = ( + eurostat_balances.unstack(["year", "country"]) + .groupby( + [ + cat_names.jrc_idees.dropna().to_dict(), + carrier_names.ind_carrier_name.dropna().to_dict(), + ], + level=["cat_code", "carrier_code"], + ) + .sum(min_count=1) + .groupby(["cat_code"]) + .sum(min_count=1) + .stack("country") + .rename_axis(index=["cat_name", "country_code"]) + .apply(ec_utils.tj_to_twh) + ) + # If JRC-IDEES data exists, there will be data in 'energy_consumption' for that country + jrc_balances = eurostat_industry_balances[ + eurostat_industry_balances.index.get_level_values("country_code").isin( + jrc_countries + ) + ] + # Otherwise, there will only be data for that country in annual energy balances + nonjrc_balances = eurostat_industry_balances[ + ~eurostat_industry_balances.index.get_level_values("country_code").isin( + jrc_countries + ) + ] + # Obtain share of total energy demand in missing countries per year + nonjrc_subsector_share = nonjrc_balances.div(jrc_balances.groupby("cat_name").sum()) + nonjrc_subsector_share = nonjrc_subsector_share.stack().to_xarray() + + # Fill missing countries in relation to their total energy share. + # E.g., if CHE consumes a total share of 1% -> assume it consumes 1% of total electricity + total_annual_demand = jrc_subsector_demand.sum("country_code") + nonjrc_subsector_demand = nonjrc_subsector_share * total_annual_demand + all_subsector_demand = jrc_subsector_demand.combine_first(nonjrc_subsector_demand) + + # Sometimes JRC-IDEES has consumption, but Eurostat doesn't, leading to inf values + # E.g., RO: non ferrous metals + # Correct and fill with mean values. + eurostat_industry_balance_xr = eurostat_industry_balances.stack().to_xarray() + country_mean_demand = (all_subsector_demand / eurostat_industry_balance_xr).mean( + "year" + ) + country_mean_demand = country_mean_demand.where(lambda x: ~np.isinf(x) & (x > 0)) + country_mean_demand = country_mean_demand.fillna( + country_mean_demand.mean("country_code") + ) + + # Fill data where JRC says there is no consumption of any form in a country's industry subsector + # But where the energy balances show consumption (e.g. UK, Wood and wood products) + _to_fill = all_subsector_demand + _filler = eurostat_industry_balance_xr * country_mean_demand + + extra_years = [y for y in _filler.year if y > _to_fill.year.max()] + _to_fill = xr.concat([_to_fill, _filler.sel(year=extra_years)], dim="year") + _to_fill = _to_fill.where(_to_fill.sum("carrier_name") > 0).fillna(_filler) + + # Fill remaining missing year data + # Interpolate middle years -> backfill older years -> forwardfill newer years + _to_fill = _to_fill.interpolate_na( + dim="year", use_coordinate="year", method="linear" + ) + _to_fill = _to_fill.bfill(dim="year") + all_filled = _to_fill.ffill(dim="year") + + all_filled = jrc.ensure_standard_coordinates(all_filled) + all_filled = all_filled.assign_attrs(units="twh") + + assert ~all_filled.isnull().any(), "Filling failed, found null values." + assert ~np.isinf(all_filled).any(), "Filling failed, found inf values." + + return all_filled diff --git a/modules/industry/scripts/utils/jrc_idees_parser.py b/modules/industry/scripts/utils/jrc_idees_parser.py new file mode 100644 index 00000000..23bca7ba --- /dev/null +++ b/modules/industry/scripts/utils/jrc_idees_parser.py @@ -0,0 +1,114 @@ +from typing import Union + +import numpy as np +import xarray as xr + +STANDARD_COORDS = ["cat_name", "year", "country_code", "carrier_name"] + + +def ensure_standard_coordinates(ds: Union[xr.Dataset, xr.DataArray]): + """Remove all coordinates that do not match a predefined standard.""" + removed_coords = set(ds.coords).difference(STANDARD_COORDS) + removed_dims = removed_coords.intersection(ds.dims) + if removed_dims: + raise ValueError(f"Cannot ensure standard coordinates for {removed_dims}.") + + ds = ds.drop(removed_coords) + return ds + + +def get_auxiliary_electric_final_intensity( + section: str, + material: str, + jrc_energy: xr.Dataset, + jrc_prod: xr.DataArray, + fill_empty: bool = False, +) -> xr.DataArray: + """Wrapper for auxiliary electrical processes.""" + auxiliaries = ["Lighting", "Air compressors", "Motor drives", "Fans and pumps"] + auxiliary_intensity = sum( + get_subsection_final_intensity( + section, aux, material, "Electricity", jrc_energy, jrc_prod, fill_empty + ) + for aux in auxiliaries + ) + + return auxiliary_intensity + + +def get_subsection_final_intensity( + section: str, + subsection: str, + material: str, + carrier_name: str, + jrc_energy: xr.Dataset, + jrc_prod: xr.DataArray, + fill_empty: bool = False, +) -> xr.DataArray: + """Get final energy intensity of a given JRC section/subsection/material.""" + # Extract relevant section and subsection data. + final_demand = jrc_energy["final"].sel(section=section, subsection=subsection) + useful_demand = jrc_energy["useful"].sel(section=section, subsection=subsection) + production = jrc_prod.sel(produced_material=material) + + total_eff = useful_demand / final_demand + carrier_eff = total_eff.where(total_eff > 0).sel(carrier_name=carrier_name) + if fill_empty: + # First by country avg. (all years), then by year avg. (all countries). + carrier_eff = carrier_eff.fillna(carrier_eff.mean(dim="year")) + carrier_eff = carrier_eff.fillna(carrier_eff.mean(dim="country_code")) + + # Get the useful energy intensity of all production (e.g., twh/kt_steel) + useful_intensity = useful_demand.sum(dim="carrier_name") / production + # Then reconstruct final intensity. + final_intensity = useful_intensity / carrier_eff + + # Prettify + final_intensity = ensure_standard_coordinates(final_intensity) + final_intensity = final_intensity.assign_attrs(units="twh/kt") + final_intensity.name = "final_intensity" + + assert final_intensity.sum() >= useful_intensity.sum(), "Creating energy!" + + return final_intensity.fillna(0) + + +def get_subsection_useful_intensity( + section: str, + subsection: str, + material: str, + jrc_energy: xr.Dataset, + jrc_prod: xr.DataArray, +) -> xr.DataArray: + """Get useful energy intensity of a given section/subsection/material.""" + useful_demand = jrc_energy["useful"].sel(section=section, subsection=subsection) + production = jrc_prod.sel(produced_material=material) + useful_intensity = (useful_demand / production).sum("carrier_name") + + # Prettify + useful_intensity = ensure_standard_coordinates(useful_intensity) + useful_intensity = useful_intensity.assign_attrs(units="twh/kt") + useful_intensity.name = "useful_intensity" + + assert ~np.isinf( + useful_intensity + ).any(), f"Zero division ocurred for {section}.{subsection} and {material}!" + + return useful_intensity.fillna(0) + + +# TODO: fix me! +# def get_carrier_demand( +# carrier: str, all_demand_df: pd.DataFrame, jrc_energy: xr.Dataset +# ) -> pd.DataFrame: +# """ +# Get demand for a specific carrier, assuming all end use demand that could consume +# that carrier are completely met by that carrier. +# """ +# energy = jrc_energy.xs(carrier, level="carrier_name") +# energy_efficiency = energy.xs("demand").div(energy.xs("consumption")) +# # Fill NaNs (where there is demand, but no consumption in that country) +# # with the average efficiency a. from the country, b. from all countries +# energy_efficiency = energy_efficiency.fillna(energy_efficiency.mean()) + +# return all_demand_df.reindex(energy_efficiency.index).div(energy_efficiency) diff --git a/scripts/jrc-idees/industry.py b/scripts/jrc-idees/industry.py index 22b2a93d..9564b5ab 100644 --- a/scripts/jrc-idees/industry.py +++ b/scripts/jrc-idees/industry.py @@ -1,7 +1,7 @@ import logging from itertools import product from multiprocessing import Pool -from typing import Callable, Literal, Optional, Union +from typing import Callable, Literal, Union import numpy as np import pandas as pd @@ -54,25 +54,18 @@ def process_jrc_industry_data( if dataset == "energy": processed_data = process_sheets(paths_to_data, threads, get_jrc_idees_energy) unit = "twh" - variable_col = "energy" elif dataset == "production": processed_data = process_sheets(paths_to_data, 1, get_jrc_idees_production) unit = "kt" - variable_col = None - processed_xr_data = df_to_xr(processed_data, variable_col, unit) + processed_xr_data = df_to_xr(processed_data, unit) processed_xr_data.to_netcdf(out_path) -def df_to_xr( - df: pd.DataFrame, variable_col: Optional[str], unit: str -) -> Union[xr.Dataset, xr.DataArray]: +def df_to_xr(df: pd.DataFrame, unit: str) -> Union[xr.Dataset, xr.DataArray]: df.columns = df.columns.rename("year").astype(int) - if variable_col is not None: - xr_data = df.stack().unstack(variable_col).to_xarray() - else: - xr_data = df.stack().to_xarray() + xr_data = df.stack().unstack("variable").to_xarray() country_code_mapping = utils.convert_valid_countries( xr_data.country_code.values, errors="ignore" @@ -81,7 +74,10 @@ def df_to_xr( xr_data, country_code_mapping, dim_name="country_code" ) - return xr_data.assign_attrs(unit=unit) + for var in xr_data.data_vars: + xr_data[var] = xr_data[var].assign_attrs(units=unit) + + return xr_data def process_sheets( @@ -116,7 +112,8 @@ def get_jrc_idees_production(sheet_name: str, file: str) -> pd.DataFrame: .str.replace("(kt ", "(", regex=False) .str.strip() ) - return df_processed.set_index(["country_code", "cat_name"], append=True) + df_processed = df_processed.assign(variable="production") + return df_processed.set_index(["variable", "country_code", "cat_name"], append=True) def get_jrc_idees_energy(sheet: str, file: str) -> pd.DataFrame: @@ -125,7 +122,7 @@ def get_jrc_idees_energy(sheet: str, file: str) -> pd.DataFrame: final_energy = _get_jrc_idees_energy_sheet(f"{sheet}_fec", xls) useful_energy = _get_jrc_idees_energy_sheet(f"{sheet}_ued", xls) return pd.concat( - [final_energy, useful_energy], names=["energy"], keys=["final", "useful"] + [final_energy, useful_energy], names=["variable"], keys=["final", "useful"] )