diff --git a/dispaset/preprocessing/utils.py b/dispaset/preprocessing/utils.py index 88dc6c8a..1c82dfad 100644 --- a/dispaset/preprocessing/utils.py +++ b/dispaset/preprocessing/utils.py @@ -729,9 +729,8 @@ def adjust_unit_capacity(SimData, u_idx, scaling=1, value=None, singleunit=False """ Function used to modify the installed capacities in the Dispa-SET generated input data The function update the Inputs.p file in the simulation directory at each call - :param SimData: Input data dictionary - :param u_idx: names of the units to be scaled + :param u_idx: names of the units to be scaled :param scaling: Scaling factor to be applied to the installed capacity :param value: Absolute value of the desired capacity (! Applied only if scaling != 1 !) :param singleunit: Set to true if the technology should remain lumped in a single unit @@ -744,7 +743,7 @@ def adjust_unit_capacity(SimData, u_idx, scaling=1, value=None, singleunit=False if scaling > 1E10: logging.warning('adjust_unit_capacity: scaling factor is too high (' + str(scaling) + ')') return SimData - + # find the units to be scaled: units = SimData['units'].loc[u_idx,:] cond = SimData['units'].index.isin(u_idx) @@ -787,19 +786,21 @@ def adjust_unit_capacity(SimData, u_idx, scaling=1, value=None, singleunit=False -def adjust_capacity(inputs, tech_fuel, scaling=1, value=None, singleunit=False, write_gdx=False, dest_path=''): +def adjust_capacity(inputs, tech_fuel, scaling=1, value=None, singleunit=False, sto_fp_time_range=None, write_gdx=False, dest_path='', temp_path='Inputs.gdx'): """ Function used to modify the installed capacities in the Dispa-SET generated input data The function update the Inputs.p file in the simulation directory at each call - - :param inputs: Input data dictionary OR path to the simulation directory containing Inputs.p - :param tech_fuel: tuple with the technology and fuel type for which the capacity should be modified - :param scaling: Scaling factor to be applied to the installed capacity - :param value: Absolute value of the desired capacity (! Applied only if scaling != 1 !) - :param singleunit: Set to true if the technology should remain lumped in a single unit - :param write_gdx: boolean defining if Inputs.gdx should be also overwritten with the new data - :param dest_path: Simulation environment path to write the new input data. If unspecified, no data is written! - :return: New SimData dictionary + :param inputs: Input data dictionary OR path to the simulation directory containing Inputs.p + :param tech_fuel: tuple with the technology and fuel type for which the capacity should be modified + :param scaling: Scaling factor to be applied to the installed capacity + :param value: Absolute value of the desired capacity (! Applied only if scaling != 1 !) + :param singleunit: Set to true if the technology should remain lumped in a single unit + :param sto_fp_time_range: Ignored for non storage units. Specifies the range of full power time contained in the storage + the units have to fit in. If none, infinite range is considered + :param temp_path: The path to the location where to temporary build the Inputs.gdx + :param write_gdx: boolean defining if Inputs.gdx should be also overwritten with the new data + :param dest_path: Simulation environment path to write the new input data. If unspecified, no data is written! + :return: New SimData dictionary """ import pickle @@ -820,9 +821,23 @@ def adjust_capacity(inputs, tech_fuel, scaling=1, value=None, singleunit=False, if not isinstance(tech_fuel, tuple): sys.exit('tech_fuel must be a tuple') + # find the units to be scaled: cond = (SimData['units']['Technology'] == tech_fuel[0]) & (SimData['units']['Fuel'] == tech_fuel[1]) + + # add condition to filter storage units outside the range of full power time + if tech_fuel[0] in ["BATS", "BEVS", "CAES", "P2GS", "THMS"]: + if sto_fp_time_range is None: + sto_fp_time_range = (-np.infty, np.infty) + elif not isinstance(sto_fp_time_range, tuple): + logging.error("The range of full power time is invalid (not a tuple)") + sys.exit(1) + + fp_time = 3600 * SimData["units"]["PowerCapacity"] / SimData["units"]["STOCapacity"] + cond &= (sto_fp_time_range[0] <= fp_time) & (fp_time < sto_fp_time_range[1]) + u_idx = SimData['units'][cond].index.tolist() + SimData = adjust_unit_capacity(SimData, u_idx, scaling=scaling, value=value, singleunit=singleunit) if dest_path == '': logging.info('Not writing any input data to the disk') @@ -834,11 +849,38 @@ def adjust_capacity(inputs, tech_fuel, scaling=1, value=None, singleunit=False, with open(os.path.join(dest_path, 'Inputs.p'), 'wb') as pfile: pickle.dump(SimData, pfile, protocol=pickle.HIGHEST_PROTOCOL) if write_gdx: - write_variables(SimData['config'], 'Inputs.gdx', [SimData['sets'], SimData['parameters']]) - shutil.copy('Inputs.gdx', dest_path + '/') - os.remove('Inputs.gdx') + write_variables(SimData['config'], temp_path, [SimData['sets'], SimData['parameters']]) + shutil.copy(temp_path, os.path.join(dest_path, 'Inputs.gdx')) + os.remove(temp_path) return SimData +def ranges_from_tresholds(thresholds, only_thresholds=False): + """ + Outputs a list of ranges as a list of tuples from a list of thresholds. + :param tresholds: A list of tresholds to separate ranges at + :param only_thresholds: If false, add a bottom 0 threshold and ceil +np.infty bound, else restrict to given values + """ + all = [] + if not only_thresholds: + if thresholds[0] != 0: + all.append(0) + + all += thresholds + + if all[-1] != np.inf: + all.append(np.inf) + else: + all = thresholds + + res = [] + for i in range(len(all)-1): + res.append((all[i], all[i+1])) + + # for x, y in zip(all[:-1], all[1:]): + # res.append((x, y)) + + return res + def adjust_flexibility(inputs, flex_units, slow_units, flex_ratio, singleunit=False, write_gdx=False, dest_path=''): """