In [None]:
# Libraries
import requests
import json
import geopandas as gpd
import pandas as pd

In [None]:
#====================================================================================================================================
# Dictionnary containing the name of the layer, the geometry keyword (geom  or the_geom) and the column containing the info we need
#====================================================================================================================================

layers_dictionnary = {
          "ZNIEFF1": {"layer_name":"patrinat_znieff1:znieff1",
                      "geometry_key":"geom",
                      "column1":"nom_site"},
          "ZNIEFF1 Mer":{"layer_name":"patrinat_znieff1_mer:znieff1_mer",
                         "geometry_key":"geom", 
                         "column1":"nom_site"},
          "ZNIEFF2": {"layer_name":"patrinat_znieff2:znieff2", 
                       "geometry_key":"geom", 
                       "column1":"nom_site"},
          "ZNIEFF2 Mer": {"layer_name":"patrinat_znieff2_mer:znieff2_mer", 
                           "geometry_key":"geom", 
                           "column1":"nom_site"},
          "Natura2000 - ZPS" : {"layer_name":"PROTECTEDAREAS.ZPS:zps", 
                                "geometry_key":"geom", 
                                "column1":"sitename"},
          "Natura2000 - SIC" : {"layer_name":"PROTECTEDAREAS.SIC:sic", 
                                "geometry_key":"geom", 
                                "column1":"sitename"},
          "Parcs nationaux": {"layer_name":"PROTECTEDAREAS.PN:pn", 
                              "geometry_key":"geom", 
                              "column1":"nom"},
          "Parcs naturels regionaux":{"layer_name":"PROTECTEDAREAS.PNR:pnr", 
                                      "geometry_key":"geom", 
                                      "column1":"nom"},
          "Parcs naturels marins":{"layer_name":"PROTECTEDAREAS.PNM:pnm", 
                                   "geometry_key":"geom",
                                   "column1":"nom"},
          "Zonage urbain": {"layer_name":"wfs_du:zone_urba",
                            "geometry_key":"the_geom",
                            "column1":"libelle"},
          "Patrimoine": {"layer_name":"wfs_sup:assiette_sup_s",  # Monuments historiques, Protection aux abords, Site patrimonial remarquable, sites inscrits et classés. ZPPA?
                         "geometry_key":"the_geom", 
                         "column1":"nomass"}#"typeass", 
          }


In [None]:
#=============================================================================================================
# Creating a function for the xml filter. The filter to be applied changes depending on the type of geometry
#=============================================================================================================
def xml_filter_function(study_area, layer_geometry):
    #study_area_geometry = study_area.geom_type.unique()[0] # Return the type of geometry (point/multipoint, polygon/multi-polygon, line/multiline) to selecct the appropriate filter
    
    study_area_geometry = study_area.geometry.geom_type
    
    # Filter for point geometry
    if study_area_geometry == "Point":
        point = study_area.geometry
        coordinates = list(point.coords)
        swapped_coords = [(lat, lon) for lon, lat in coordinates]
        poslist = " ".join(f"{lat} {lon}" for lat, lon in swapped_coords)

        xml_filter = f"""<fes:Filter xmlns:fes="http://www.opengis.net/fes/2.0"
                                xmlns:gml="http://www.opengis.net/gml/3.2">
        <fes:Intersects>
            <fes:ValueReference>{layer_geometry}</fes:ValueReference>
            <gml:Point srsName="urn:ogc:def:crs:EPSG::4326">
            <gml:pos>{poslist}</gml:pos>
            </gml:Point>
        </fes:Intersects>
        </fes:Filter>"""

        return xml_filter
    
    # Filter for polygon geometry
    if study_area_geometry == "Polygon":
        polygon = study_area.geometry
        coordinates = list(polygon.exterior.coords)
        swapped_coords = [(lat, lon) for lon, lat in coordinates]
        poslist = " ".join(f"{lat} {lon}" for lat, lon in swapped_coords)

        xml_filter = f"""<fes:Filter xmlns:fes="http://www.opengis.net/fes/2.0"
                    xmlns:gml="http://www.opengis.net/gml/3.2">
            <fes:Intersects>
                <fes:ValueReference>{layer_geometry}</fes:ValueReference>
                <gml:Polygon srsName="urn:ogc:def:crs:EPSG::4326">
                    <gml:exterior>
                        <gml:LinearRing>
                            <gml:posList>{poslist}</gml:posList>
                        </gml:LinearRing>
                    </gml:exterior>
                </gml:Polygon>
            </fes:Intersects>
        </fes:Filter>"""

        return xml_filter


    # Filter for line geometry
    if study_area_geometry == "LineString":
        line = study_area.geometry
        coordinates = list(line.coords)
        swapped_coords = [(lat, lon) for lon, lat in coordinates]
        poslist = " ".join(f"{lat} {lon}" for lat, lon in swapped_coords)

        xml_filter = f"""<fes:Filter xmlns:fes="http://www.opengis.net/fes/2.0"
                    xmlns:gml="http://www.opengis.net/gml/3.2">
        <fes:Intersects>
            <fes:ValueReference>{layer_geometry}</fes:ValueReference>
            <gml:LineString srsName="urn:ogc:def:crs:EPSG::4326">
            <gml:posList>
                {poslist}
            </gml:posList>
            </gml:LineString>
        </fes:Intersects>
        </fes:Filter>"""

        return xml_filter

    

In [None]:
#================================================================
# Creating a function to construct the body of the post request
#=================================================================

def post_data_function(layer_name, filter_xml):
    body = f"""<?xml version="1.0" encoding="UTF-8"?>
<wfs:GetFeature service="WFS" version="2.0.0"
    xmlns:wfs="http://www.opengis.net/wfs/2.0"
    xmlns:fes="http://www.opengis.net/fes/2.0"
    xmlns:gml="http://www.opengis.net/gml/3.2"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.opengis.net/wfs/2.0
                        http://schemas.opengis.net/wfs/2.0/wfs.xsd">
  <wfs:Query typeNames="{layer_name}" srsName="EPSG:4326">
    {filter_xml}
  </wfs:Query>
</wfs:GetFeature>
"""
    return body

In [None]:
#============================
# Running the diagnostic
#===========================

wfs_url = "https://data.geopf.fr/wfs/ows"  # "?" is not needed at the end of the url for POST request

# Importing the shapefile
input_file = "C:/Users/ilung/Documents/Portfolio/diagnostic_tool/test_area/line_study_area.shp"
study_area = gpd.read_file(input_file)

# Checking if the study area is in WGS84. If not, a conversion is run
if study_area.crs.to_epsg() != 4326: 
    study_area = study_area.to_crs(epsg=4326)

# If there are features in the shapefile that have more than one entity, they will be exploded
study_area_exploded = study_area.explode(index_parts=True, ignore_index=True)

# The analysis will be done for each feature of the shapefile
analysis_results = {layer: [] for layer in layers_dictionnary} # The dictionnary where results will be stored
num_features = len(study_area_exploded)

for feature in range(0, num_features):

    study_area_final = study_area_exploded.iloc[feature]
    area_result = []

    for layer in layers_dictionnary:
        layer_name = layers_dictionnary[layer]["layer_name"]
        layer_geometry = layers_dictionnary[layer]["geometry_key"]
        information1 = layers_dictionnary[layer]["column1"]

        # Building and sending the POST request
        filter_xml = xml_filter_function(study_area_final, layer_geometry) # Get the OGC filter XML fragment
        headers = {"Content-Type": "text/xml"} # Preparing the headers
        data = post_data_function(layer_name, filter_xml) # Preparing the data part of the request
        params={"outputFormat": "application/json"}

        response = requests.post(wfs_url, headers=headers, data=data, params=params)  # Sending the POST request

        # Retrieving the results

        try:
            if response.status_code == 200:
                data_df = gpd.read_file(response.content)  # geopandas can read GeoJSON directly from bytes
                if information1 in data_df.columns:
                    analysis_results[layer].extend(data_df[information1].tolist())
                else:
                    analysis_results[layer].append("-")

            else:
                analysis_results[layer].append(f"Error {response.status_code}")
                print("Server error:", response.text[:200])

        except Exception as e:
            analysis_results[layer].append(f"Failed: {e}")

    print(f"Feature {feature + 1} of {num_features} completed")


In [None]:
#=================================================
# Exporting the result into a appropriate format
#=================================================

# Creating a dataframe
df = pd.DataFrame(list(analysis_results.items()), columns=["Perimètre", "Valeur"])
df.to_excel("results.xlsx", index=False)
print ("Export completed")