-
Notifications
You must be signed in to change notification settings - Fork 116
Expand file tree
/
Copy pathutils.py
More file actions
134 lines (121 loc) · 4.79 KB
/
utils.py
File metadata and controls
134 lines (121 loc) · 4.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import csv
from functools import lru_cache
import sqlalchemy as sa
from geonature.utils.env import db
from pypnnomenclature.models import (
TNomenclatures as Nomenclature,
BibNomenclaturesTypes as NomenclatureType,
)
from .models import CorSensitivityCriteria, SensitivityRule
@lru_cache(maxsize=64)
def get_nomenclature(type_mnemonique, code):
# Retro-compatibility with freezed nomenclatures
if type_mnemonique == "STATUT_BIO" and code in ["6", "7", "8", "10", "11", "12"]:
type_mnemonique = "OCC_COMPORTEMENT"
return db.session.execute(
sa.select(Nomenclature).where(
Nomenclature.active == True, # noqa: E712
Nomenclature.nomenclature_type.has(NomenclatureType.mnemonique == type_mnemonique),
Nomenclature.cd_nomenclature == code,
)
).scalar_one()
def insert_sensitivity_referential(source, csvfile):
statut_biologique_nomenclature_type = db.session.execute(
sa.select(NomenclatureType).filter_by(mnemonique="STATUT_BIO")
).scalar_one()
behaviour_nomenclature_type = db.session.execute(
sa.select(NomenclatureType).filter_by(mnemonique="OCC_COMPORTEMENT")
).scalar_one()
defaults_nomenclatures = {
statut_biologique_nomenclature_type: set(
db.session.scalars(
sa.select(Nomenclature).where(
Nomenclature.nomenclature_type == statut_biologique_nomenclature_type,
Nomenclature.mnemonique.in_(["Inconnu", "Non renseigné", "Non Déterminé"]),
)
).all()
),
behaviour_nomenclature_type: set(
db.session.scalars(
sa.select(Nomenclature).where(
Nomenclature.nomenclature_type == behaviour_nomenclature_type,
Nomenclature.mnemonique.in_(["NSP", "1"]),
)
).all()
),
}
rules = []
criterias = set()
reader = csv.DictReader(csvfile, delimiter=";")
dep_col = next(
fieldname for fieldname in reader.fieldnames if fieldname in ["CD_DEP", "CD_DEPT"]
)
for row in reader:
sensi_nomenclature = get_nomenclature("SENSIBILITE", code=row["CD_SENSIBILITE"])
if row[dep_col] == "D3":
cd_dep = "973"
elif row[dep_col] == "D4":
cd_dep = "974"
else:
cd_dep = row[dep_col]
if row["DUREE"]:
duration = int(row["DUREE"])
else:
duration = 10000
rule = {
"cd_nom": int(row["CD_NOM"]),
"nom_cite": row["NOM_CITE"],
"id_nomenclature_sensitivity": sensi_nomenclature.id_nomenclature,
"sensitivity_duration": duration,
"sensitivity_territory": "Département",
"id_territory": cd_dep,
"source": f"{source}",
"comments": row["AUTRE"],
"active": True,
}
_criterias = set()
if row["STATUT_BIOLOGIQUE"]:
criteria = get_nomenclature("STATUT_BIO", code=row["STATUT_BIOLOGIQUE"])
_criterias |= {criteria} | defaults_nomenclatures[criteria.nomenclature_type]
if row["COMPORTEMENT"]:
criteria = get_nomenclature("OCC_COMPORTEMENT", code=row["COMPORTEMENT"])
_criterias |= {criteria} | defaults_nomenclatures[criteria.nomenclature_type]
for criteria in _criterias:
criterias.add((len(rules), criteria))
rules.append(rule)
results = db.session.execute(
sa.insert(SensitivityRule).values(rules).returning(SensitivityRule.id)
)
rules_indexes = [rule_index for rule_index, in results] # flattening singleton results
db.session.execute(
sa.insert(CorSensitivityCriteria).values(
[
{
"id_sensitivity": rules_indexes[rule_idx],
"id_type_nomenclature": nomenclature.id_type,
"id_criteria": nomenclature.id_nomenclature,
}
for rule_idx, nomenclature in criterias
]
)
)
# Populate cor_sensitivity_area
db.session.connection().execute(
sa.text(
"""
INSERT INTO gn_sensitivity.cor_sensitivity_area
SELECT DISTINCT id_sensitivity, id_area
FROM gn_sensitivity.t_sensitivity_rules s
JOIN ref_geo.l_areas a
ON
s.sensitivity_territory = 'Département'
AND a.id_type = (SELECT id_type FROM ref_geo.bib_areas_types WHERE type_code ='DEP')
AND regexp_replace(s.id_territory, '^([0-9])$', '0\\1') = a.area_code
WHERE s.source = :source
"""
),
source=source,
)
return len(rules)
def remove_sensitivity_referential(source):
return db.session.execute(sa.delete(SensitivityRule).where(SensitivityRule.source == source))