In [1]:
import sqlite3
import os
import pandas as pd
from shapely.geometry import LineString
from shapely.geometry.polygon import Polygon
from shapely.geometry import Point
import geopandas as gpd
from shapely import wkt
import folium

from shapely.geometry import MultiPolygon, Polygon
from shapely.wkt import loads
import re

import numpy as np


spatialite_path = "C:/Users/user/Documents/mod_spatialite-5.1.0-win-amd64"
os.environ["PATH"] = spatialite_path + ";" + os.environ["PATH"]

# create a new sqlite data base
conn = sqlite3.connect("BPN_1.sqlite")
conn.enable_load_extension(True)
conn.load_extension("mod_spatialite")

# mod_spatialite (recommended)
conn.execute('SELECT load_extension("mod_spatialite")')

# Create a cursor object to execute SQL queries
cursor = conn.cursor()

# initiate a spatial metadata for the sqlite database 
cursor.execute("SELECT InitSpatialMetadata(1);")



<sqlite3.Cursor at 0x2d55ccfdc00>

In [2]:
# Checking spatialite version 
res = cursor.execute("SELECT spatialite_version()")
print(res.fetchone())

# Checking Sqlite Version
res1 = cursor.execute("SELECT sqlite_version()")
print(res1.fetchone())

('5.1.0',)
('3.45.1',)


In [7]:
# Input coordinate of a parcel data 
persil= [[[740533.66898967,
        302351.947062026],
        [740524.360462776,
        302340.555654623],
        [740529.366394628,
        302337.687403908],
        [740534.690234851,
        302332.098999219],
        [740543.635490223,
        302342.565829852],
        [740533.66898967,
        302351.947062026]]]

# Your initial list
EgLinestring = [[[0, 0],[2, 2], [2, 4],[3, 5],[6,9]], [[2, 0], [3, 0], [0,2]], 
                [[15,20],[25,20],[25,10],[15,10],[15,5]], [[15,20],[25,20],[25,10],[15,10],[15,20]], 
                [[20, 20], [30, 20], [30, 10], [20, 10], [10, 30], [10, 40]], [[0.5, 0],[1, 0]], 
               [[10, 10],[20, 10], [15, 0], [15, 20]]]

EgPolygon= [[[15, 20], [25, 20], [25, 10], [15, 10], [15, 20]], 
            [[10, 20], [20, 20], [20, 10], [10, 10], [10, 20]],
           [[20, 20], [30, 20], [30, 10], [20, 10], [20, 20]], 
            [[20, 20], [30, 20], [30, 10], [20, 10], [10, 30], [10, 40]],
           [[20, 20], [30, 20], [30, 10], [20, 10], [30, 30], [30, 40]], 
            [[40, 20], [20, 20], [50, 10], [40, 10]]]

EgPolygon2= [[[10, 20], [20, 20], [20, 10], [10, 10], [10, 20]], 
            [[18, 30], [30, 30], [30, 15], [18, 15], [18, 30]],
            [[40, 30], [50, 30], [50, 15], [40, 15], [40, 30]],
            [[10, 10], [20,10], [20, 0], [10,0], [10, 10]],
            [[15, 20], [15,10], [0,10], [0,20], [15,20]]]


In [4]:
# The coordinate system here is TM 3 thus the SRID is 32651
# SELECT CreateTopology('topotest', 32651, 0, 0);

class Topology:
    
    # The constructor for the topology class should be the coordinate, topology name, and srid
    def __init__(self, topoName, Coords, SRID):
        self.Coords = Coords
        self.SRID = SRID
        self.topoName = topoName
        
#     method to check duplicate and return the location and value of the duplicate data 
    def checkDuplicate(self):
#         to store the unique value 
        seen_elements = set()

#     accessing the index through enumerate
        for i, sublist in enumerate(self.Coords):
            for j, element in enumerate(sublist):
                if tuple(element) in seen_elements:
                    print(f'Duplicate found at location ({i}, {j}): {element}')
                else:
                    seen_elements.add(tuple(element))
                    
#     method to check duplicate and return the location and value of the duplicate data         
    def remove_duplicates(self):
        result_list = []
        for sublist in self.Coords:
            seen_elements = set()
            unique_sublist = []
            for element in sublist:
                if tuple(element) not in seen_elements:
                    unique_sublist.append(element)
                    seen_elements.add(tuple(element))
            result_list.append(unique_sublist)
        return result_list
                    
                    
    # method to create new topology 
    def createTopo(self): 
        # create a new topology 
        try:
            print(f"Creating topology with name: {self.topoName}")
            cursor.execute(f"SELECT CreateTopology('{self.topoName}', {self.SRID},0 ,0);")
            res1 = cursor.fetchone()
            print(res1)
        # return if there is error 
        except sqlite3.Error as e:
            print("Error:", e)
            
            
            
    #     method to create nodes from input data
    def makeNode(self):
        for i, sublist in enumerate(self.Coords):
            for j, element in enumerate(sublist):
                print(f"SELECT ST_AddIsoNode('{self.topoName}', Null, MakePoint({sublist[j][0]}, {sublist[j][1]}, {self.SRID}));")
#                 cursor.execute(f"SELECT ST_AddIsoNode('{self.topoName}', Null, MakePoint({sublist[j][0]}, {sublist[j][1]}, {self.SRID}));")
        print("Node has been created")
    
    
#     method to create isolated edge that cannot be connected to another edge. Thcerefore, face cannot be created this way
#     def makeEdge(self):
#         edgeId = 1
#         for i in range(0, len(self.Coords)-4, 2):
# #             print(f"SELECT ST_AddIsoEdge('{self.topoName}', {edgeId}, {edgeId+1}, ST_GeomFromText('LINESTRING({self.Coords[i]} {self.Coords[i+1]}, {self.Coords[i+2]} {self.Coords[i+3]})', {self.SRID}));")
#             cursor.execute(f"SELECT ST_AddIsoEdge('{self.topoName}', {edgeId}, {edgeId+1}, ST_GeomFromText('LINESTRING({self.Coords[i]} {self.Coords[i+1]}, {self.Coords[i+2]} {self.Coords[i+3]})', {self.SRID}));")
#             edgeId+=1
# #         print(f"SELECT ST_AddIsoEdge('{self.topoName}', {edgeId}, 1, ST_GeomFromText('LINESTRING({self.Coords[len(self.Coords)-4]} {self.Coords[len(self.Coords)-3]}, {self.Coords[len(self.Coords)-2]} {self.Coords[len(self.Coords)-1]})', {self.SRID}));")
#         cursor.execute(f"SELECT ST_AddIsoEdge('{self.topoName}', {edgeId}, 0, ST_GeomFromText('LINESTRING({self.Coords[len(self.Coords)-4]} {self.Coords[len(self.Coords)-3]}, {self.Coords[len(self.Coords)-2]} {self.Coords[len(self.Coords)-1]})', {self.SRID}));")
#         print("Edge has been created")
    
    
#     method to automatically create edge and face 
    def makeFace(self):
        nodeId = 1
        for i, sublist in enumerate(data):
            for j in range(len(sublist) - 2):  # Adjust loop bounds
                print(f"SELECT ST_AddEdgeNewFaces('{self.topoName}', {nodeId}, {nodeId+1}, ST_GeomFromText('LINESTRING({sublist[j][0]} {sublist[j][1]}, {sublist[j+1][0]} {sublist[j+1][1]})', {self.SRID}));")
                cursor.execute(f"SELECT ST_AddEdgeNewFaces('{self.topoName}', {nodeId}, {nodeId+1}, ST_GeomFromText('LINESTRING({sublist[j][0]} {sublist[j][1]}, {sublist[j+1][0]} {sublist[j+1][1]})', {self.SRID}));")
                nodeId += 1
            print(f"SELECT ST_AddEdgeNewFaces('{self.topoName}', {nodeId}, {nodeId-(len(sublist) - 2)}, ST_GeomFromText('LINESTRING({sublist[j+1][0]} {sublist[j+1][1]}, {sublist[j+2][0]} {sublist[j+2][1]})', {self.SRID}));")
            cursor.execute(f"SELECT ST_AddEdgeNewFaces('{self.topoName}', {nodeId}, {nodeId-(len(sublist) - 2)}, ST_GeomFromText('LINESTRING({sublist[j+1][0]} {sublist[j+1][1]}, {sublist[j+2][0]} {sublist[j+2][1]})', {self.SRID}));")
            print("next")  
        
#     checking the topology using table
    def nodeTopo(self):
        cnx = sqlite3.connect('BPN_Topology.sqlite')
        df = pd.read_sql_query(f"SELECT * FROM {self.topoName}_node", cnx)
        print(df)
    def edgeTopo(self):
        cnx = sqlite3.connect('BPN_Topology.sqlite')
        df2 = pd.read_sql_query(f"SELECT * FROM {self.topoName}_edge", cnx)
        print(df2)
    def edgeTopo(self):
        cnx = sqlite3.connect('BPN_Topology.sqlite')
        df3 = pd.read_sql_query(f"SELECT * FROM {self.topoName}_face", cnx)
        print(df3)
        
    def checkTopo(self, Fetch):
        self.Fetch = Fetch
        cursor.execute(Fetch)
        res = cursor.fetchall()
        print(res)
        return res
    
    def checkTopoNode(self):
        cursor.execute(f"select *, asText(geom) from {self.topoName}_node")
        res = cursor.fetchall()
        print(res)
        return res
        
    def checkTopoEdge(self):
        cursor.execute(f"select *, asText(geom) from {self.topoName}_edge")
        res = cursor.fetchall()
        print(res)
        return res
    
    def checkTopoFace(self):
        cursor.execute(f"select *, asText(geom) from {self.topoName}_face")
        res = cursor.fetchall()
        print(res)
        return res
    

In [None]:
# instanciate the class and methods of Topology Class
# input all the parameters needed into this constructor 
# Data1 = Topology("trial5",persil, 4326)

# create a topology with trial 5 as the name 
# Data1 = Data1.createTopo()

# create node from the data given 
# Data1.makeNode()

# create face and edge add the same time from the created node 
# Data1.makeFace()

# check the node table 
# Data1.nodeTopo()

# check the edge table 
# Data1.nodeTopo()

# check the face table 
# Data1.nodeTopo()

# check the node table 
# Data1.checkTopoNode()

# check the edge table 
# Data1.checkTopoEdge()

# check the face table 
# Data1.checkTopoFace()

conn.commit()



In [50]:
# this is the class for geometry
class Geometry:
###     Phase initialization 
#     constructor for geometry
    def __init__(self, tableName, Coords, SRID):
        self.Coords = Coords
        self.SRID = SRID
        self.tableName = tableName
    
    def initTable(self):
        cursor.execute(f"CREATE TABLE IF NOT EXISTS {self.tableName} (id INTEGER PRIMARY KEY, geotypes VARCHAR, geom BLOB);")
        conn.commit()
        print(f'Table successfully created with name {self.tableName} and SRID {self.SRID}')

###     Phase creating point, line, and polygon
    def convertPoint(self):
        Point = ['Point({} {})'.format(x, y) for sublist in self.Coords for x, y in sublist]
        for x in Point:
            print(f"INSERT INTO {self.tableName} (geotypes, geom) VALUES ('Point', ST_GeomFromText('{x}',{self.SRID}));")
#             cursor.execute(f"INSERT INTO {self.tableName} (geotypes, geom) VALUES ('Point', ST_GeomFromText('{x}',{self.SRID}));")
        conn.commit()
        print(f"Point successfully inserted into {self.tableName}")
        return self

    def convertLinestring(self):
        Line = [LineString(points) for points in self.Coords]
        for x in Line:
            print(f"INSERT INTO {self.tableName} (geotypes, geom) VALUES ('Linestring', ST_GeomFromText('{x}',{self.SRID}));")
            cursor.execute(f"INSERT INTO {self.tableName} (geotypes, geom) VALUES ('Linestring', ST_GeomFromText('{x}',{self.SRID}));")
        conn.commit()
        print(f"LineString successfully inserted into {self.tableName}")
        return self

    def convertPolygon(self):
        Poly = [Polygon(points) for points in self.Coords]
        for x in Poly:
            print(f"INSERT INTO {self.tableName} (geotypes, geom) VALUES ('Polygon', ST_GeomFromText('{x}',{self.SRID}));")
            cursor.execute(f"INSERT INTO {self.tableName} (geotypes, geom) VALUES ('Polygon', ST_GeomFromText('{x}',{self.SRID}));")
        conn.commit()
        print(f"Polygon successfully inserted into {self.tableName}")
        return self
    
###     Custom Query and Fetch        
    def customQuery(self, Query): 
        self.Query = Query
        database = sqlite3.connect('BPN_1.sqlite')
        dataframe = pd.read_sql_query(self.Query, database)
        return dataframe
    
    def customFetchOne(self, Fetch):
        self.Fetch = Fetch
        cursor.execute(self.Fetch)
        res = cursor.fetchone()
        return res[0]
    
    def customFetchAll(self, Fetch):
        self.Fetch = Fetch
        cursor.execute(self.Fetch)
        self.res = cursor.fetchall()
        return self.res


###     Phase geometry analysis 
    #     method to check duplicate and return the location and value of the duplicate data 
    def checkDuplicate(self):
        self.duplicate = []
#       to store the unique value 
        self.seen_elements = set()
#     accessing the index through enumerate
        for i, sublist in enumerate(self.Coords):
            for j, element in enumerate(sublist):
                if tuple(element) in self.seen_elements and tuple(element) != tuple(sublist[len(sublist)-1]):
                    print(f'Duplicate found at location ({i}, {j}): {element}')
                    self.duplicate.append(tuple(element))
                else:
                    self.seen_elements.add(tuple(element))
                    
        self.Lduplicate= len(self.duplicate)
        
        return self.Lduplicate

    def checkCrosses(self, types): 
        self.resCrosses = []
        self.result_geom = []
        self.dataCross = []
        df = self.customQuery(f"select id from {self.tableName} where geotypes='{types}'")
        col_list = df["id"].values.tolist()
        for i in range(len(col_list)):
            for j in range(i + 1, len(col_list)):
                test = self.customFetchOne(f"select ST_Overlaps(r1.geom, r2.geom) as check1 from {self.tableName} r1 cross join {self.tableName} r2 where r1.id = {col_list[i]} and r2.id = {col_list[j]};")
                if test[0] == 1:
#                     print("intersect in polygon id")
#                     print(col_list[i], col_list[j])
                    data_intersect = tuple((col_list[i], col_list[j]))
                    self.dataCross.append(data_intersect)
                    self.resCrosses.append(col_list[i])

        for i in self.resCrosses:      
            dataFetch = self.customFetchOne(f"select id, asText(geom) from {self.tableName} where id={i}")
            self.result_geom.append(dataFetch[1])
#         print("objects that are crossed:")
#         print(self.result_geom)
#         print(self.dataCross)
        self.LcheckCrosses= len(self.dataCross)

        return [self.result_geom, self.dataCross, self.LcheckCrosses]
    

    def checkIsclosed(self):
        dataList = self.customFetchAll(f"SELECT id, ST_IsClosed(geom) as close, asText(geom) from {self.tableName} where geotypes='Linestring'")
        self.resClose = []
        self.resCpoint = []
         # Iterate over each tuple in the list
        for item in dataList:
            # Check if the second value (index 1) is less than 1
            if item[1] == 0:
                # If it is, append the entire tuple to the result list
                self.resClose.append(item)
        print("The line is not closed, possibility for short or over degenerate")
        print(self.resClose)

        # Iterate over each tuple in the list
        for item in self.resClose:
            # Extract the LINESTRING string
            linestring = item[2]

            # Remove the 'LINESTRING(' and ')' parts from the string
            coordinates_part = linestring.replace('LINESTRING(', '').replace(')', '')

            # Split the string by commas and spaces to get coordinates
            coordinates = coordinates_part.split(', ')

            # Take the last coordinate and split it to get x and y
            x, y = map(float, coordinates[-1].split())

            # Create a Point object
            point = Point(x, y)

            # Append the Point object to the list
            self.resCpoint.append(point)
            
        self.LcheckIsclosed= len(self.resCpoint)
        print("The end points of each unclose linestring:")
        print(self.resCpoint)
        return self.resCpoint, self.LcheckIsclosed
            
            
    def checkIsValid(self, geotypes):
        dataList = self.customFetchAll(f"SELECT id, ST_IsValid(geom), ST_IsValidReason(geom), ST_AsText(geom) FROM {self.tableName} where geotypes='{geotypes}';")
        resValid = []
        points = []
        self.return_data = []
        for item in dataList:
            if item[1] == 0:
                resValid.append((item))
                self.return_data.append(tuple((item[0], item[3])))
        print("Polygon that has error")
        print(resValid)

        # Iterate over each tuple in the list
        for item in resValid:
            # Extract the polygon string
            polygon_string = item[3]
            # Remove the 'POLYGON((' and '))' parts from the string
            coordinates_part = polygon_string.replace('POLYGON((', '').replace('))', '')
            # Split the string by commas and spaces to get coordinates
            coordinates = coordinates_part.split(', ')
            # Take the last coordinate and split it to get x and y
            x, y = map(float, coordinates[-1].split())
            # Create a Point object
            point = Point(x, y)
            # Append the Point object to the list
            points.append(point)
        
        self.LcheckIscloseds= len(self.return_data)
        print("The end points of each unclose polygon:")
        print(points)
        return self.return_data, self.LcheckIscloseds
    
    def checkShortlength(self):
        dataList = self.customFetchAll(f"SELECT id, ST_Length(geom, {self.SRID}) as length, asText(geom) from {self.tableName} where geotypes='Linestring'")    
        # Initialize an empty list to store the tuples
        self.resLength = []
        # Iterate over each tuple in the list
        for item in dataList:
            # Check if the second value (index 1) is less than 1
            if item[1] < 1:
                # If it is, append the entire tuple to the result list
                self.resLength.append(item)
                
        self.LcheckShortlength = lef(self.resLength)
        print("List of Linestring that is below 1 meter")
        print(self.resLength)
        return self.resLength, self.LcheckShortlength
    
    def checkShortArea(self):
        dataList = self.customFetchAll(f"SELECT id, ST_Area(geom, {self.SRID}) as area, asText(geom) from {self.tableName} where geotypes='Polygon'")    
        print(dataList)
        # Initialize an empty list to store the tuples
        self.resArea = []
        # Iterate over each tuple in the list
        for item in dataList:
            # Check if the second value (index 1) is less than 1
            if item[1] < 10:
                # If it is, append the entire tuple to the result list
                self.resArea.append(item)
        self.LcheckShortArea = lef(self.resArea)
        print("List of Polygon that is below 1 sqm")
        print(self.resArea)
        return self.resArea
    
    def checkCluster(self):
        cluster_data = self.customFetchAll(f"SELECT a.id AS point_id, b.id AS cluster_id, ST_Distance(a.geom, b.geom) AS distance FROM {self.tableName} AS a, {self.tableName} AS b WHERE a.id != b.id AND ST_Distance(a.geom, b.geom) < 1 AND ST_Distance(a.geom, b.geom) > 0.1")
        # Create a set to store unique tuples
        unique_tuples = set()

        # Iterate over the input data
        for tuple_value in cluster_data:
            # Check if the tuple meets the condition
            if (tuple_value[0], tuple_value[1], tuple_value[2]) not in unique_tuples and \
               (tuple_value[1], tuple_value[0], tuple_value[2]) not in unique_tuples:
                # Add the tuple to the set
                unique_tuples.add((tuple_value[0], tuple_value[1], tuple_value[2]))

        # Convert the set back to a list
        self.unique_data = list(unique_tuples)
        
        print("Object in a cluster within 1m: ", self.unique_data)
        self.LcheckCluster = lef(sself.unique_data)
        return self.unique_data
    
    
    # Function to calculate the angle between three points
    def calculate_angle(self, p1, p2, p3):
        # Create vectors
        v1 = (p1[0] - p2[0], p1[1] - p2[1])
        v2 = (p3[0] - p2[0], p3[1] - p2[1])
        # Calculate dot product and magnitudes
        dot_product = np.dot(v1, v2)
        magnitude_v1 = np.linalg.norm(v1)
        magnitude_v2 = np.linalg.norm(v2)

        # Check if magnitudes are zero to avoid division by zero
        if magnitude_v1 == 0 or magnitude_v2 == 0:
            return np.nan  # Return NaN or some other value indicating undefined angle

        # Calculate the angle
        cos_angle = dot_product / (magnitude_v1 * magnitude_v2)
        # Avoid numerical issues due to floating-point arithmetic
        cos_angle = np.clip(cos_angle, -1.0, 1.0)
        angle = np.arccos(cos_angle)
        return np.degrees(angle)


    def parse_wkt(self, wkt_polygon):

        # Regular expression to extract coordinates from WKT polygon
        coord_pattern = re.compile(r'(\d+(\.\d+)?)\s+(\d+(\.\d+)?)')

        # Find all matches of the coordinate pattern in the WKT polygon
        matches = coord_pattern.findall(wkt_polygon)

        # Convert each match to a tuple of numerical values (x, y)
        points = [(float(x), float(y)) for x, _, y, _ in matches]

        return points

    def pseudo_node(self, geotypes):
        selected_data = self.customFetchAll(f"select id, AsText(geom) from {self.tableName} where geotypes='{geotypes}'")
        # Assuming you have a table "polygons" with a geometry column "geom"
        polygon_pseudo = []
        pseudo_nodes = []

        # Fetch the WKT representation of the polygon
        for wkt in selected_data: 
            # # Assuming you have a function to parse WKT and extract points
            # # This needs to be implemented based on your specific WKT format
            points = self.parse_wkt(wkt[1])  # Implement this function to extract points from WKT

            pseudo_node_count = 0
            # Adjust the angle threshold to detect angles significantly less than 180 degrees
            angle_threshold = 10  # Degrees

            # Initialize a list to store the indices of pseudo nodes
            
            

            # Calculate angles and identify pseudo nodes
            for i in range(len(points)):
                p1 = points[i - 1]  # Previous point
                p2 = points[i]      # Current point
                p3 = points[(i + 1) % len(points)]  # Next point, wrapping around using modulo

                angle = self.calculate_angle(p1, p2, p3)
                # Look for angles significantly less than 180 degrees
#                 print(angle)
                if angle < angle_threshold or angle == 180:
                    pseudo_nodes.append(i)
                    polygon_pseudo.append(wkt)

            # The count of pseudo nodes is the length of the pseudo_nodes list

            print("Indices of pseudo nodes:", pseudo_nodes)
            print("Number of pseudo nodes:", pseudo_node_count)
        
        self.Lpseudo_node = len(pseudo_nodes)
        return [self.Lpseudo_node, polygon_pseudo]

    
## Phase: Cleaning
    
    def removeShortArea(self,data):
        for x in data:
            cursor.execute(f"delete from {self.tableName} where id={x[0]}")
        conn.commit()
        print("The polygon that below 10sqm has been deleted")
        
    def removeShortLength(self,data):
        for x in data:
            cursor.execute(f"delete from {self.tableName} where id={x[0]}")
        conn.commit()
        print("The polygon that below 10sqm has been deleted")
    
    def splitPolygons(self,data):
        for r1_id, r2_id in data:
            split_data = self.customFetchOne(f"SELECT ST_AsText(ST_Intersection(r1.geom, r2.geom)) FROM {self.tableName} r1 CROSS JOIN {self.tableName} r2 WHERE r1.id = {r1_id} AND r2.id = {r2_id};")
            remain_data1 = self.customFetchOne(f"select ST_AsText(ST_Difference(r2.geom, r1.geom)) from {self.tableName} r1 cross join {self.tableName} r2 where r1.id = {r1_id} and r2.id = {r2_id};")
            remain_data2 = self.customFetchOne(f"select ST_AsText(ST_Difference(r1.geom, r2.geom)) from {self.tableName} r1 cross join {self.tableName} r2 where r1.id = {r1_id} and r2.id = {r2_id};")
    
#             insert into the table
            cursor.execute(f"INSERT INTO {self.tableName} (geotypes, geom) VALUES ('Polygon', ST_GeomFromText('{split_data[0]}', {self.SRID}));")
            cursor.execute(f"INSERT INTO {self.tableName} (geotypes, geom) VALUES ('Polygon', ST_GeomFromText('{remain_data1[0]}', {self.SRID}));")
            cursor.execute(f"INSERT INTO {self.tableName} (geotypes, geom) VALUES ('Polygon', ST_GeomFromText('{remain_data2[0]}', {self.SRID}));") 
        conn.commit()
        print("Polygon successfully splitted") 
        return self
        
    def sanitizeGeom(self, data):
        for identity in data:
            sanitize_data = self.customFetchOne(f"select ST_AsText(SanitizeGeometry(geom)) from {self.tableName} where id={identity}")
    #         new_geom = self.customFetchOne(f"INSERT INTO spatial_data (geotypes, geom) VALUES ('Polygon', ST_GeomFromText('{sanitize_data}',4326));")
            self.customFetchOne(f"UPDATE {self.tableName} SET geom=ST_GeomFromText('{sanitize_data}') WHERE id={identity}")
        
    def closingRing(self,data):
        for identity in data:
            closing_data = self.customFetchOne(f"select ST_AsText(EnsureClosedRings(geom)) from {self.tableName} where id={identity}")
    #         self.customFetchOne(f"INSERT INTO spatial_data (geotypes, geom) VALUES ('Polygon', ST_GeomFromText('{closing_data}',4326));")
            self.customFetchOne(f"UPDATE {self.tableName} SET geom=ST_GeomFromText('{closing_data}') WHERE id={identity}")
    
    def repeatPoints(self):
        df_id = self.customQuery(f"select id from {self.tableName}")
        l_id = df_id['id'].tolist()
        for item in l_id:
            repeated = self.customFetchOne(f"select ST_AsText(RemoveRepeatedPoints(geom)) from {self.tableName} where id={item}")
            self.customFetchOne(f"UPDATE {self.tableName} SET geom=ST_GeomFromText('{repeated}') WHERE id={item}")
        print("all the duplicate point has been removed") 
        
    def parseMulti2Poly(self, Data):
        # Define the multipolygon string
        multipolygon_str = str(Data)
        # Parse the multipolygon string into a Shapely geometry object
        multipolygon = loads(multipolygon_str)
        # Get the first polygon from the multipolygon
        first_polygon = list(multipolygon.geoms)[0]  # Access the first geometry object within the MultiPolygon
        return first_polygon

    def selfIntersect(self):
        short_degenarate = []
        self_int = []
        all_poly = self.customFetchAll("select id, ST_AsText(EnsureClosedRings(geom)) as CloseGeom from spatial_data where geotypes='Polygon'")
        for x in all_poly:
            C_short_inter = self.customFetchOne(f"SELECT ST_AsText(ST_IsValidDetail(ST_GeomFromText('{x[1]}'))) as selfinter;")
            if C_short_inter is None:
                short_degenarate.append(x)
            else: 
                self_int.append(x)
        ## this section repair the self intersect polygon 
        for y in self_int:
            closing_data = self.customFetchOne(f"select ST_AsText(EnsureClosedRings(ST_GeomFromText('{y[1]}'))) as closeRing")
            makevalid_Test = self.customFetchOne(f"SELECT ST_AsText(ST_MakeValid(ST_GeomFromText('{closing_data}'))) AS split_result")
            one_poly = parseMulti2Poly(makevalid_Test)
            cursor.execute(f"UPDATE spatial_data SET geom=ST_GeomFromText('{one_poly}',4326) WHERE id={y[0]}")

        ## this section repair the short degenerate
        for z in short_degenarate:
            sanitize_data = self.customFetchOne(f"select ST_AsText(SanitizeGeometry(ST_GeomFromText('{z[1]}'))) as sanitizeRing")
    #         update data
            cursor.execute(f"UPDATE spatial_data SET geom=ST_GeomFromText('{sanitize_data}',4326) WHERE id={z[0]}")

        conn.commit()
        print("all the short degenerate and self intersect has been solved")
    
    def cleanPseudoNode(self, geotypes):
        selecteddata = self.customFetchAll(f"select id from test_data where geotypes= '{geotypes}'")
        for i in selecteddata:
            fix_data = self.customFetchOne(f"SELECT AsText(ST_SimplifyPreserveTopology(geom, 0.5)) FROM test_data where id={i[0]}")
            cursor.execute(f"update test_data set geom = ST_GeomFromText('{fix_data}',4326) where id={i[0]};")
        conn.commit()
#             print(fix_data)
        print("all data which contain pseudo node has been fixed")


###     Phase checking result and delete
    def inspectTable(self, data):
        self.data = data
        print(self.data)
        
    def deleteTable(self):
        cursor.execute(f"DROP TABLE IF EXISTS {self.tableName}")
        conn.commit()
        print(f'{self.tableName} table successfully delete')
        return self
    
    def wkt2map(self, resultQuery):
#         data = self.customFetchAll(f"select id, geotypes, ST_AsText(geom) as geometry from {self.tableName} where geotypes='{geotypes}'")
        data = resultQuery
        # Fix WKT strings to ensure polygons are properly closed
        data = [(id, typ, wkt_string.rstrip(')') + ', {})'.format(wkt_string.split('((')[-1])) for id, typ, wkt_string in data]
        # Parse WKT strings and create Shapely geometry objects
        geometries = [wkt.loads(wkt_string) for _, _, wkt_string in data]
        # Create GeoDataFrame
        gdf = gpd.GeoDataFrame(data, columns=['ID', 'Type', 'WKT'], geometry=geometries, crs='EPSG:4326')
        # Create a Folium map centered at a specific location
        m = folium.Map(location=[15, 20], zoom_start=4)
        # Add GeoDataFrame to the map
        folium.GeoJson(gdf).add_to(m)
        # Display the map
        return m
    
    

In [51]:
BPNGeom_Poly = Geometry("BPN",EgPolygon2,"4326")
BPNGeom_Poly.checkDuplicate()

# EgPolygon5= [[[10, 20], [20, 20], [20, 10], [10, 10], [10, 20]], 
#             [[18, 30], [30, 30], [30, 15], [18, 15], [18, 30]],
#             [[40, 30], [50, 30], [50, 15], [40, 15], [40, 30]],
#             [[10, 10], [20,10], [20, 0], [10,0], [10, 10]],
#             [[15, 20], [15,10], [0,10], [0,20], [15,20]]]

# print(len(EgPolygon5)-1)

# if (10, 20) == (10, 20):
#     print("good")


Duplicate found at location (3, 1): [20, 10]


1

In [16]:
# instanciate the class and methods of geometry  Class

### first instanciate the geometry class by filling up the constructor
## the sequence is name of the table, coordinate, and the SRID 
# BPNGeom_line = Geometry("BPN",EgLinestring,"4326")
BPNGeom_Poly = Geometry("BPN",EgPolygon2,"4326")

## initiate the table 
# BPNGeom_line = BPNGeom_line.initTable()
# BPNGeom_Poly = BPNGeom_Poly.initTable()
 
## delete the current table 
# BPNGeom_line.deleteTable()
# BPNGeom_Poly.deleteTable()

## convert the coordinate into Point and insert the each line into the database
# BPNPoint = BPNGeom_line.convertPoint()

## convert the coordinate into linestring and insert the each line into the database
# BPNLine = BPNGeom_line.convertLinestring()

## convert the coordinate into Polygon and insert the each line into the database
# it only applied when all the coordinate inside the list is morethan 4 
# BPNPoly = BPNGeom_Poly.convertPolygon()

### Analysis part 

## these analysis are merely for linestring
## to check if there is any duplicate data point
# BPNGeom_line.checkDuplicate()

## to check the minimum length
# BPNGeom_line.checkShortlength()
## check if the linestring is not closed
# BPNGeom_line.checkIsclosed()
## to check if there is intersection between linestring 
# BPNGeom_line.checkCrosses("Linestring")

### these analysis are merely for Polygon
## to check if there is any duplicate data point
BPNGeom_Poly.checkDuplicate()

## to check the mininmum area
# BPNGeom_Poly.checkShortArea()

## to check whether the polygon close properly or not (if short degenerate or intersect or over degenerate) will return the last point 
# BPNGeom_Poly.checkIsValid("Polygon")

## to check if there is intersection between polygon 
# resultCrosses = BPNGeom_Poly.checkCrosses("Polygon")
# resultCrosses[1]

## to check if there is pseudo node
# BPNGeom_Poly.pseudo_node('Polygon')

## to check if there is object in a cluster within 1 meter 
# BPNGeom_Poly.checkCluster()


### these part are for analysis implementation(cleaning)
## to split intersect polygon
# BPNGeom_Poly.splitPolygons(resultCrosses[1])

## to resolve short degenerate and self intersect 
# BPNGeom_Poly.selfIntersect()

## to remove duplicate point 
# BPNGeom_Poly.repeatPoints()

## to clean pseudonode
# BPNGeom_Poly.cleanPseudoNode("Polygon")

## to clean the polygon that has area below 1sqm
# BPNGeom_Poly.removeShortArea

## to clean the LineString that has length below 1m
# BPNGeom_line.removeShortLength


# # inspect.table and visualization
# BPNGeom_line.inspectTable(BPNGeom_line.customQuery(f"SELECT * from BPN"))


# BPNGeom_Poly.inspectTable(BPNGeom_Poly.customQuery(f"SELECT * from BPN"))

data = BPNGeom_Poly.customFetchAll("select id, geotypes, ST_AsText(geom) as geometry from BPN where geotypes='Polygon'")
print(data)

# BPNGeom_Poly.wkt2map(data)

# data =[(1, 2), (1, 5)]
# for r1_id, r2_id in data:
#     print(r1_id, r2_id)

# cursor.execute("INSERT INTO BPN (geotypes, geom) VALUES ('Polygon', ST_GeomFromText('POLYGON((0 10, 0 20, 10 20, 10 10, 0 10))', 4326));")
# conn.commit()


Duplicate found at location (0, 4): [10, 20]
Duplicate found at location (1, 4): [18, 30]
Duplicate found at location (2, 4): [40, 30]
Duplicate found at location (3, 0): [10, 10]
Duplicate found at location (3, 1): [20, 10]
Duplicate found at location (3, 4): [10, 10]
Duplicate found at location (4, 4): [15, 20]
[(1, 'Polygon', 'POLYGON((10 20, 20 20, 20 10, 10 10, 10 20))'), (2, 'Polygon', 'POLYGON((18 30, 30 30, 30 15, 18 15, 18 30))'), (3, 'Polygon', 'POLYGON((40 30, 50 30, 50 15, 40 15, 40 30))'), (4, 'Polygon', 'POLYGON((10 10, 20 10, 20 0, 10 0, 10 10))'), (5, 'Polygon', 'POLYGON((15 20, 15 10, 0 10, 0 20, 15 20))')]
