-
Notifications
You must be signed in to change notification settings - Fork 0
/
factor.py
executable file
·255 lines (218 loc) · 8.36 KB
/
factor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
#
# SPDX-FileCopyrightText: 2023 Simularia s.r.l. <info@simualaria.it>
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#
import logging
import pandas as pd
import numpy as np
import sys
import math
def odour(met, conf, ind):
"""Odour scheme for rescaling emissions."""
logger = logging.getLogger()
logger.debug("{}".format(odour.__doc__))
gamma = 0.5
dt = {
"rural": [0.07, 0.07, 0.1, 0.15, 0.35, 0.55],
"urban": [0.15, 0.15, 0.2, 0.25, 0.3, 0.3],
}
tab = pd.DataFrame(data=dt, index=["A", "B", "C", "D", "E", "F"])
if "vref" in conf["sources"][ind].keys():
vref = conf["sources"][ind]["vref"]
logger.debug("Reference velocity available from the")
logger.debug("configuration toml file.")
else:
vref = 0.3
logger.debug("Reference velocity not available from the")
logger.debug(f"configuration toml file: {vref} default value.")
rat = conf["sources"][ind]["height"] / met["z"]
if "terrain" in conf["sources"][ind].keys() and "stabclass" in met.keys():
logger.debug("Terrain type and stability class information")
logger.debug("are available: computing beta.")
beta = pd.DataFrame(
columns=["val"],
data=tab[conf["sources"][ind]["terrain"]][met["stabclass"]].to_list(),
index=met.index.values.tolist(),
)
tmp = (met["ws"] * (rat.pow(beta["val"])) / vref) ** gamma
else:
beta = 0.55 # default value
logger.debug("Terrain type or stability class information")
logger.debug(f"are missing: default beta value = {beta}.")
tmp = (met["ws"] * (rat.pow(beta)) / vref) ** gamma
colname = str(conf["sources"][ind]["id"]) + "_" + conf["sources"][ind]["species"][0]
met.insert(len(met.columns), colname, round(tmp, 2))
return met
def sympar(sym, alpha):
if sym:
ppsa = np.array([40.0, 48.0, 12.0, 0.0])
if not sym:
ppsa = np.empty((len(alpha), 4), dtype=float)
for ind in range(0, len(alpha)):
val = alpha[ind]
if (val >= 0.0) and (val <= 20.0):
ppsa[ind, :] = np.array([36.0, 50.0, 14.0, 0.0])
if (val > 20.0) and (val <= 40.0):
ppsa[ind, :] = np.array([31.0, 51.0, 15.0, 3.0])
if (val > 40.0) and (val <= 90.0):
ppsa[ind, :] = np.array([28.0, 54.0, 14.0, 4.0])
return ppsa
def inc2alpha(input):
if input > 360.0:
input = input - 360.0
if input < 0.0:
input = input + 360.0
if input >= 270.0:
alpha = input - 270.0
if (input < 270.0) and (input > 180):
alpha = 270 - input
if (input <= 180.0) and (input > 90):
alpha = input - 90.0
if input <= 90.0:
alpha = 90.0 - input
return alpha
def asymsurface(major, minor, height):
"""Computation of asymmetric shape (trapezoidal prism)."""
logger = logging.getLogger()
logger.debug("{}".format(asymsurface.__doc__))
if height >= (minor / 2):
logger.info("Invalid geometrical values of the cumulus")
logger.info(f"({height} >= {minor/2}) causes")
logger.info("lateral slope over 45°: exit.")
sys.exit()
top = minor / 2 - height
slope = 180.0 * math.atan(height / ((minor - top) / 2)) / math.pi
logger.info(f"Trapezoidal top side is {round(top,1)} meters and")
logger.info(f"the lateral slope is {round(slope,1)} degrees.")
oblique = math.sqrt(height**2 + ((minor - height) / 2) ** 2)
s = height * (minor + top) + (2 * oblique + top) * major
return s
def scheme2(met, conf, ind):
"""Cumulus scheme to set emissions."""
logger = logging.getLogger()
logger.debug("{}".format(scheme2.__doc__))
sou = conf["sources"][ind]
if set(sou["species"]) != set(["PM25", "PM10", "PTS"]):
logger.info(f"Invalid species in source {sou['id']}: exit.")
sys.exit()
listasym = ["major", "minor", "angle", "height"]
asymmetric = all(item in sou.keys() for item in listasym)
listcon = ["radius", "height"]
conical = all(item in sou.keys() for item in listcon)
if (asymmetric + conical) != 1:
logger.info(f"Undefined shape of source {sou['id']}: exit.")
sys.exit()
if asymmetric:
logger.debug(f"Source {sou['id']} has asymmetric shape.")
major = sou["major"]
minor = sou["minor"]
if abs(sou["angle"]) > 90.0:
logger.info(f"Bad definition of angle {sou['angle']}: exit.")
sys.exit()
if major <= minor:
logger.info("Bad definition of geometry (major < minor),")
logger.info(f"{major} < {minor}: exit.")
sys.exit()
# ap1 = math.sqrt((major / 5)**2 + sou['height']**2)
# ap2 = math.sqrt((minor / 3)**2 + sou['height']**2)
# s = 8 * major * ap2 / 5 + 4 * minor * ap1 / 3
s = asymsurface(major, minor, sou["height"])
base = minor
# computing angle to select EPA case
if sou["angle"] < 0.0:
ainc = met["wd"] - (-90.0 - sou["angle"])
else:
ainc = met["wd"] - (90.0 - sou["angle"])
alpha = ainc.apply(inc2alpha)
ppsa = sympar(False, alpha.to_numpy())
elif conical:
r = sou["radius"]
h = sou["height"]
s = math.pi * r * math.sqrt(r**2 + h**2)
base = 2 * r
ppsa = sympar(True, 1.0)
ppsa = np.repeat(a=ppsa, repeats=len(met["ws"]), axis=0)
logger.debug(f"Source {sou['id']} has conical shape.")
else:
logger.info(f"Undefined shape of source {sou['id']}: exit.")
sys.exit()
if sou["height"] / base <= 0.2:
psba = [1, 1, 1, 1]
else:
psba = [0.2, 0.6, 0.9, 1.1]
# roughness in cm to meters
if "roughness" in sou.keys():
z0 = sou["roughness"] / 100.0
else:
z0 = 0.005
# scale wind speed to 10 m height
ws10 = met["ws"] * (np.log(10.0 / z0)) / (np.log(met["z"] / z0))
# from wind speed to fastest mile
a = 1.6
b = 0.43
fm = a * ws10 + b
# computing friction velocity
ust = np.empty((len(fm), len(psba)))
for idx, psbai in enumerate(psba):
ust[:, idx] = 0.4 * fm / np.log(0.25 / z0) * psbai
tfv = sou["tfv"] * np.ones((len(fm), len(psba)))
ust = np.where(ust > tfv, ust, tfv)
# erosion potential
p = np.zeros_like(ust)
p = 58 * (ust - tfv) ** 2 + 25 * (ust - tfv)
# parameter for PTS, PM25, PM10
k25 = 0.075
k10 = 0.5
kpts = 1.0
# building the emission in mcg
ptot = np.zeros(len(p[:, 0]), float)
for ind in range(0, len(p)):
ptot[ind] = np.dot(p[ind, :], ppsa[ind, :]) * (s / 100.0) * 10**6.0
# building species name for met dataframe
pm25 = str(sou["id"]) + "_" + "PM25"
pm10 = str(sou["id"]) + "_" + "PM10"
pts = str(sou["id"]) + "_" + "PTS"
met.insert(len(met.columns), pm25, np.around(k25 * ptot, 2))
met.insert(len(met.columns), pm10, np.around(k10 * ptot, 2))
met.insert(len(met.columns), pts, np.around(kpts * ptot, 2))
return met
def scheme3(met, conf, ind):
"""Cumulus scheme to set emissions in absence of wind data."""
logger = logging.getLogger()
logger.debug("{}".format(scheme2.__doc__))
sou = conf["sources"][ind]
if set(sou["species"]) != set(["PM25", "PM10", "PTS"]):
logger.info(f"Invalid species in source {sou['id']}: exit.")
sys.exit()
listnw = ["radius", "height", "movh"]
conic = all(item in sou.keys() for item in listnw)
if conic:
r = sou["radius"]
h = sou["height"]
s = math.pi * r * math.sqrt(r**2 + h**2)
movh = sou["movh"]
else:
logger.info(f"Missing parameters in source {sou['id']}: exit.")
sys.exit()
if h / (2 * r) > 0.2:
logger.debug("High mounds case.")
efpm25 = 1.26e-06
efpm10 = 7.9e-06
efpts = 1.6e-05
else:
logger.debug("Low mounds case.")
efpm25 = 3.8e-05
efpm10 = 2.5e-04
efpts = 5.1e-04
epm25 = (10**9) * efpm25 * s * movh
epm10 = (10**9) * efpm10 * s * movh
epts = (10**9) * efpts * s * movh
# building species name for met dataframe
pm25 = str(sou["id"]) + "_" + "PM25"
pm10 = str(sou["id"]) + "_" + "PM10"
pts = str(sou["id"]) + "_" + "PTS"
met.insert(len(met.columns), pm25, np.around(epm25, 2))
met.insert(len(met.columns), pm10, np.around(epm10, 2))
met.insert(len(met.columns), pts, np.around(epts, 2))
return met