Filter, Extract, Transform Polygon Shapes into GeoJson Standard - Output: GeoJson FeatureCollection

In [None]:
import osmnx as ox
import geopandas as gpd
import json
from shapely.geometry import MultiPolygon, Polygon

# Set the default CRS to EPSG:25832 for consistent spatial reference
ox.settings.default_crs = "epsg:25832"

# Initialize a GeoDataFrame ('gdf') and a list ('mongo_data_list') for data storage
gdf = gpd.GeoDataFrame()
mongo_data_list = []

def create_postal_code_gdf(place_name, tags):
    global gdf, mongo_data_list  # Utilize global GeoDataFrame and list

    # Fetch place polygons using OpenStreetMap data
    place_polygons = ox.features_from_place(place_name, tags)

    # Filter for valid geometries: Polygons and MultiPolygons
    polygon_mask = place_polygons['geometry'].apply(lambda geom: isinstance(geom, (Polygon, MultiPolygon)))

    # Create a GeoDataFrame containing only rows with valid geometries and postal codes
    gdf = gpd.GeoDataFrame(place_polygons[polygon_mask].dropna(subset=['postal_code', 'geometry']))

    # Keep only the necessary columns
    columns_to_keep = ['geometry', 'postal_code']
    gdf = gdf[columns_to_keep]

    # List to store dictionaries with postal codes and GeoJSON geometries
    for idx, row in gdf.iterrows():
        postal_code = row['postal_code']
        geometry = row['geometry']

        # Convert Shapely geometry to GeoJSON format
        geojson_geometry = json.loads(gpd.GeoSeries(geometry).to_json())['features'][0]['geometry']

        # Create a dictionary with postal code and GeoJSON geometry
        postal_code_data = {
            "type": "Feature",
            "properties": {
                "postal_code": postal_code
            },
            "geometry": geojson_geometry
        }

        # Add the dictionary to the list
        mongo_data_list.append(postal_code_data)

# Example usage of the function
place_name = "Germany"
tags = {"boundary": "postal_code"}

# Call the function to populate the GeoDataFrame and the list
create_postal_code_gdf(place_name, tags)

# GeoDataFrame 'gdf' now contains valid geometries and associated postal codes
# List 'mongo_data_list' holds dictionaries with postal codes and GeoJSON geometries

# Save GeoJSON data to a file for further use
with open(f'{place_name}_postal_codes.geojson', 'w') as file:
    json.dump({"type": "FeatureCollection", "features": mongo_data_list}, file, indent=2)

# Plot the GeoDataFrame for visualization
gdf.plot()

# Print the length of the list 'mongo_data_list'
print(len(mongo_data_list))


Connect to MongoDB Local, Integrate Data in Chunks

In [None]:
from pymongo import MongoClient, GEO2D
import json

class MongoDBConnector:
    def __init__(self, database_name, collection_name, host='localhost', port=27017):
        # Initialize MongoDB connection parameters
        self.client = MongoClient(host, port)
        self.db = self.client[database_name]
        self.collection = self.db[collection_name]

    def connect_to_local_mongodb(self):
        # Connect to local MongoDB and return the client and collection instances
        print("Connected to local MongoDB")
        return self.client, self.collection

    def insert_geojson_data(self, geojson_file_path, chunk_size=1000):
        try:
            # Read GeoJSON file
            with open(geojson_file_path, 'r') as file:
                geojson_data = json.load(file)

            # Extract features from GeoJSON
            features = geojson_data.get('features', [])

            # Insert data into MongoDB in chunks
            for i in range(0, len(features), chunk_size):
                chunk = features[i:i + chunk_size]
                self.collection.insert_many(chunk)
                print(f"Inserted {len(chunk)} features into local MongoDB collection '{self.collection.name}'.")

        except Exception as e:
            print(f"Error inserting data into MongoDB: {e}")

    def create_2d_spatial_index(self):
        # Create a 2D spatial index on the "geometry" field
        self.collection.create_index([("geometry", "2dsphere")])
        print("Created 2D spatial index on the 'geometry' field.")

    def close_connection(self):
        try:
            # Close MongoDB connection
            self.client.close()
            print("Closed MongoDB connection.")
        except Exception as e:
            print(f"Error closing MongoDB connection: {e}")

# Example usage of the class
if __name__ == "__main__":
    # User input for the database and collection names
    database_name = "ZIP_Poly_DB_GeoJson_Test"
    collection_name = "ZIP_Poly_Collection_GeoJson_Test"

    # Create an instance of MongoDBConnector
    mongo_connector = MongoDBConnector(database_name, collection_name)

    # Connect to MongoDB
    mongo_client, mongo_collection = mongo_connector.connect_to_local_mongodb()

    # Example GeoJSON file path
    geojson_file_path = 'Germany_postal_codes.geojson'  # Update the file path accordingly

    # Insert GeoJSON data into MongoDB
    mongo_connector.insert_geojson_data(geojson_file_path, chunk_size=1000)

    # Create a 2D spatial index on the "geometry" field
    mongo_connector.create_2d_spatial_index()

    # Close MongoDB connection when done
    mongo_connector.close_connection()


---

Poly Functions


Connect to MongoDB, get Polygon Shape & Data - Input: Postal Code - Output: Polygon Shape & Data

In [None]:
import osmnx as ox
import geopandas as gpd
from pymongo import MongoClient
from shapely.geometry import shape, Polygon, MultiPolygon
import matplotlib.pyplot as plt

# Set the default CRS to EPSG:25832
ox.settings.default_crs = "epsg:25832"

def connect_to_mongodb(database_name, collection_name):
    """
    Connects to MongoDB and returns the MongoDB client and collection.
    """
    client = MongoClient('localhost', 27017)
    db = client[database_name]
    collection = db[collection_name]
    return client, collection

def retrieve_polygon_by_postal_code(collection, postal_code):
    """
    Retrieves the polygon for a given postal code from MongoDB using a geospatial query.
    """
    document = collection.find_one({"properties.postal_code": postal_code})
    
    if document:
        coordinates = document.get('geometry', {}).get('coordinates')[0]
        
        # Check if coordinates are nested (indicating a polygon with holes)
        if isinstance(coordinates[0][0], list):
            # Create a MultiPolygon
            return MultiPolygon([Polygon(part) for part in coordinates])
        else:
            # Create a Polygon
            return Polygon(coordinates)
    else:
        print(f"No polygon found for postal code: {postal_code}")
        return None

def plot_polygon(polygon):
    """
    Plots the given polygon.
    """
    if polygon:
        gdf = gpd.GeoDataFrame(geometry=[polygon])
        gdf.plot()
        plt.title("Postal Code Polygon")
        plt.xlabel("Longitude")
        plt.ylabel("Latitude")
        plt.show()

if __name__ == "__main__":
    # MongoDB configuration
    database_name = "ZIP_Poly_DB_GeoJson_Test"
    collection_name = "ZIP_Poly_Collection_GeoJson_Test"

    # Connect to MongoDB
    mongo_client, mongo_collection = connect_to_mongodb(database_name, collection_name)

    # Input the postal code you want to query
    input_postal_code = input("Enter a postal code: ")

    # Retrieve the polygon for the specified postal code
    selected_polygon = retrieve_polygon_by_postal_code(mongo_collection, input_postal_code)

    # Plot the retrieved polygon
    plot_polygon(selected_polygon)

    # Close MongoDB connection when done
    mongo_client.close()


---

Test Polygon Data: https://geojson.io/#map=13.14/51.31281/9.48916

---

Connect to MongoDB, use Coordinate and find nearest Postal Code - Input: Latitude, Longitude - Output: Postal Code, Polygon Shape & Data

In [None]:
import osmnx as ox
import geopandas as gpd
import json
from shapely.geometry import Point
import matplotlib.pyplot as plt
from pymongo import MongoClient

# Set the default CRS to EPSG:25832
ox.settings.default_crs = "epsg:25832"

def connect_to_mongodb(database_name, collection_name):
    """
    Connects to MongoDB and returns the MongoDB client and collection.
    """
    client = MongoClient('localhost', 27017)
    db = client[database_name]
    collection = db[collection_name]
    return client, collection

def retrieve_data_from_mongodb(collection, point):
    """
    Retrieves GeoJSON data from MongoDB collection using a geospatial query based on the specified point.
    """
    query = {
        "geometry": {
            "$geoIntersects": {
                "$geometry": {
                    "type": "Point",
                    "coordinates": [point.x, point.y]
                }
            }
        }
    }

    cursor = collection.find(query)
    geojson_data = {"type": "FeatureCollection", "features": list(cursor)}

    return geojson_data

def coordinates_to_geometry(coordinates):
    """
    Converts coordinates to a Shapely Polygon or MultiPolygon.
    """
    # Check if coordinates are nested (indicating a polygon with holes)
    if isinstance(coordinates[0][0], list):
        # Create a MultiPolygon
        return MultiPolygon([Polygon(part) for part in coordinates])
    else:
        # Create a Polygon
        return Polygon(coordinates)

def plot_polygon(polygon):
    """
    Plots a Shapely Polygon.
    """
    gdf = gpd.GeoDataFrame(geometry=[polygon])
    gdf.plot()
    plt.title("Postal Code Polygon")
    plt.xlabel("Longitude")
    plt.ylabel("Latitude")
    plt.show()

if __name__ == "__main__":
    # MongoDB configuration
    database_name = "ZIP_Poly_DB_GeoJson_Test"
    collection_name = "ZIP_Poly_Collection_GeoJson_Test"

    # Input latitude and longitude for the point
    latitude = float(51.316669)
    longitude = float(9.500000)

    # Create a Point object for the specified latitude and longitude
    point = Point(longitude, latitude)

    # Connect to MongoDB
    mongo_client, mongo_collection = connect_to_mongodb(database_name, collection_name)

    # Retrieve GeoJSON data from MongoDB based on the specified point
    geojson_data_from_mongo = retrieve_data_from_mongodb(mongo_collection, point)

    # Plot the retrieved polygon
    if geojson_data_from_mongo['features']:
        selected_polygon = coordinates_to_geometry(geojson_data_from_mongo['features'][0]['geometry']['coordinates'][0])
        plot_polygon(selected_polygon)
        print(f"The postal code for the point ({latitude}, {longitude}) is: {geojson_data_from_mongo['features'][0]['properties']['postal_code']}")
        # Access the selected polygon as needed
        print("Selected Polygon Information:")
        print(selected_polygon)
    else:
        print(f"No postal code found for the point ({latitude}, {longitude}).")

    # Close MongoDB connection when done
    mongo_client.close()
