In [None]:
import re
import overpy
import utils.ovp2geojson as o2p
import geojson
from geojson import FeatureCollection, Feature, Polygon

In [None]:
api = overpy.Overpass(max_retry_count=10, retry_timeout=10)
query = f"""
(
// boundary fehlt
wr["protect_class"="14"][!"boundary"];

// protection_title fehlt
wr["protect_class"="14"][!"protection_title"];

// acces Wert fehlt
wr["protect_class"="14"][!"access"][!"access:conditional"][!"access:offroad"][!"access:offroad:conditional"];

// protection_title Gebietsverbot ohne passende access Werte
wr["protect_class"="14"]["protection_title"="Gebietsverbot"]["access"!="no"]["access:conditional"!~"^no @"];

// access Werte deuten auf Gebietsverbot aber ohne passenden protection_title
wr["protect_class"="14"]["protection_title"!="Gebietsverbot"]["access"="no"];
wr["protect_class"="14"]["protection_title"!="Gebietsverbot"]["access:conditional"~"^no @"]; 

// protection_title Wegegebot ohne passende access Werte
wr["protect_class"="14"]["protection_title"="Wegegebot"]["access:offroad"!="no"]["access:offroad:conditional"!~"^no @"];

// access Werte deuten auf Wegegebot aber ohne passenden protection_title (unscharfes ~ wg. "Gebietsverbot und Wegegebot")
wr["protect_class"="14"]["protection_title"!~"Wegegebot"]["access:offroad"="no"];
wr["protect_class"="14"]["protection_title"!="Wegegebot"]["access:offroad:conditional"~"^no @"]; 

// protection_title Gebietsverbot und Wegegebot ohne passende access Werte
wr["protect_class"="14"]["protection_title"="Gebietsverbot und Wegegebot"]["access:offroad"!="no"]["access:offroad:conditional"!~"^no @"];

//  access Werte deuten auf Gebietsverbot und Wegegebot aber ohne passenden protection_title
wr["protect_class"="14"]["protection_title"!="Gebietsverbot und Wegegebot"]["access:offroad"="no"]["access:offroad:conditional"~"^no @"];

// protection_title Schongebiet ohne passende access Werte
wr["protect_class"="14"]["protection_title"="Schongebiet"]["access"!="discouraged"]["access:conditional"!~"^discouraged @"];

// access Werte deuten auf Schongebiet aber ohne passenden protection_title
wr["protect_class"="14"]["protection_title"!="Schongebiet"]["access"="discouraged"];
wr["protect_class"="14"]["protection_title"!="Schongebiet"]["access:conditional"~"^discouraged no @"]; 

// protection_title Wegempfehlung ohne passende access Werte
wr["protect_class"="14"]["protection_title"="Wegempfehlung"]["access:offroad"!="discouraged"]["access:offroad:conditional"!~"^discouraged @"];

//  access Werte deuten auf Wegempfehlung aber ohne passenden protection_title
wr["protect_class"="14"]["protection_title"!="Wegempfehlung"]["access:offroad"="discouraged"];
wr["protect_class"="14"]["protection_title"!="Wegempfehlung"]["access:offroad:conditional"~"^discouraged @"];

);

(._; >;);

// out;
out meta;
"""
result = api.query(query)

In [None]:
def parse_ovp(line):
    """
    translate an overpass query into a python condition statement
    """
    line = re.sub(r'^wr\["','', re.sub(r'"\];\s*','', line))
    conditions = []
    for cond in line.split(']['):
        cond = re.sub(r'^"', '', re.sub(r'"$', '', cond))
        notStr = ""
        var = re.sub(r'\".*','', cond)
        if var == "!":
            val = re.sub(r'.*\"(.*)', r'\1', cond)
            condStr = f"not '{val}' in tags"
        else:
            op = re.sub(r'.*\"(.*)\".*', r'\1', cond).replace("=","==").replace("!==", "!=")
            val = re.sub(r'.*\".*\"(.*)', r'\1', cond)
            if op != "~" and op != "!~":
                condStr = f"tags['{var}'] {op} \"{val}\""
            else:
                condStr = f"re.match(r'{val}', tags['{var}'])"
            # also handled by try/except for eval...
            if op == "==" or op == "~":
                condStr = f"'{var}' in tags and "+condStr
            if op == "!~":
                condStr = "not "+condStr
        conditions.append(condStr)
    return(' and '.join(conditions))

In [None]:
def parse_queries(query):
    """
    parse a multiline overpass query, needs to be like
    // rule title
    wr[...][...]
    wr[...][...]

    // rule title
    wr[...][...]
    """
    rules = []
    title = ""
    qList = query.split("\n")
    for line in qList:
        if re.match(r'^//.*', line):
            title = re.sub(r'^//\s+', '', line)
            checks = []
        elif re.match(r'^wr.*', line):
            pyStr = parse_ovp(line)
            checks.append(pyStr)
        elif title != "":
            rules.append({"title": title, "checks": checks})
            title = ""
    return rules

In [None]:
rules = parse_queries(query)

def check_rules(rules, tags):
    """
    check a dict of tags against a set of rules
    """
    fails = []
    for rule in rules:
        for check in rule['checks']:
            try:
                if eval(check):
                    fails.append(rule['title'])
                else:
                    pass
            except:
                fails.append(rule['title'])
    return fails

# print(check_rules(rules, {"protect_class": "14", "protection_title": "Gebietsverbot", "boundary": "true", "access": "no"}))

In [None]:
allResults = []         # this will be a list of relations and ways which are not part of a relation - i.e. the list we want to check
allRelationWayIds = []  # list of wayIds which are part of a relation (which can be skipped form result.ways here)

for rel in result.relations:
    if 'boundary' in rel.tags:
        fails = check_rules(rules, rel.tags)
        if (len(fails)>0):
            rel.tags['FEHLER'] = fails
        allResults.append(rel)
        for member in rel.members:
            if (type(member) == overpy.RelationWay):
                way = result.get_way(member.ref, resolve_missing=True)
                allRelationWayIds.append(way.id)

for way in result.ways:
    if not way.id in allRelationWayIds and 'boundary' in way.tags:
        fails = check_rules(rules, way.tags)
        if (len(fails)>0):
            way.tags['FEHLER'] = fails
        allResults.append(way)

with open("data/TagErrorsCount.txt", "w") as file:
    file.write(str(len(allResults)))

In [None]:
outFile = "data/SchongebieteTagFehler.geojson"
style = {"stroke": "#FF0000", "stroke-width": 2, "stroke-opacity": 1, "fill": "#FF0000", "fill-opacity": 0.6}
features = []
for wayRel in allResults:
    wayRel.tags.update(style)
    lonLats = []
    multiLonLats = []
    if (type(wayRel) == overpy.Way):
        wayRel.tags['id'] = f"way/{wayRel.id}"
        for node in wayRel.nodes:
            lonLats.append((float(node.lon),float(node.lat)))
    else:
        wayRel.tags['id'] = f"relation/{wayRel.id}"
        if 'type' in wayRel.tags and wayRel.tags['type'] == "multipolygon":
            multiLonLats = o2p.createMultiPoly(result, wayRel)
        else:
            lonLats += o2p.resortWays(result, wayRel)
    if (len(lonLats)>0 or len(multiLonLats)>0):
        if (len(multiLonLats)>0):
            for lonLats in multiLonLats:
                features.append(Feature(geometry=Polygon([lonLats]),properties=wayRel.tags))
        else:
            features.append(Feature(geometry=Polygon([lonLats]),properties=wayRel.tags))
feature_collection = FeatureCollection(features)

with open(outFile, "w") as gFile:
    geojson.dump(feature_collection, gFile)

