In [None]:
import geopandas as gpd
from shapely.geometry import Point, Polygon
import os
import xml.etree.ElementTree as ET

def osm_to_shp(file_path, output_directory):
    print(f"正在处理文件: {file_path}")
    with open(file_path, 'r', encoding='utf-8') as f:
        data = f.read()

    root = ET.fromstring(data)
    nodes = {node.get('id'): (float(node.get('lon')), float(node.get('lat'))) for node in root.findall('node')}

    geometries = []
    attributes = []

    if root.findall('way'):
        # 处理多边形
        print("检测到多边形数据，开始处理...")
        for way in root.findall('way'):
            coords = [nodes.get(nd.get('ref')) for nd in way.findall('nd')]
            if len(coords) >= 4 and coords[0] == coords[-1]:
                geometries.append(Polygon(coords))
                tags = {tag.get('k'): tag.get('v') for tag in way.findall('tag')}
                tags["id"] = way.get('id')
                attributes.append(tags)
    else:
        # 处理点
        print("检测到点数据，开始处理...")
        for node_id, coords in nodes.items():
            geometries.append(Point(coords))
            attributes.append({'id': node_id})

    gdf = gpd.GeoDataFrame(attributes, geometry=geometries)

    if not os.path.exists(output_directory):
        os.makedirs(output_directory)

    filename = os.path.splitext(os.path.basename(file_path))[0]
    output_path = os.path.join(output_directory, filename + ".shp")
    gdf.to_file(output_path)
    print(f"{file_path} 已成功转换并保存为 SHP 文件: {output_path}")

# 指定文件夹路径
source_directory = r"D:\近期需要做的\shp数据\OSM_new\new_new_OSM"  
output_directory = r"D:\近期需要做的\shp数据\OSM_new\OSM_output_new"

for file in os.listdir(source_directory):
    if file.endswith(".osm"):
        osm_file_path = os.path.join(source_directory, file)
        try:
            osm_to_shp(osm_file_path, output_directory)
        except Exception as e:
            print(f"处理 {file} 时出错: {str(e)}")


In [3]:
import geopandas as gpd
from shapely.geometry import Point, Polygon
import os
import xml.etree.ElementTree as ET
import pandas as pd

ALL_TAGS = [
    "osm_type", "water", "type", "natural", "name", "name:zh", "name:en", "leisure", "landsure", "building:material",
    "building", "source:zoomlevel", "source:tracer", "source:position", "wetland", "water_type",
    "source:name", "place", "gns:dsg", "tidal", "alt_name", "old_name", "int_name", "ele", "name:zh-Hant",
    "name:zh-Hans", "alt_name:en", "salt", "intermittent", "boat", "addr:housename", "wikipedia", "wikidata",
    "name:zh_pinyin", "code", "alias", "source_ref"
]

def initialize_attributes():
    return {tag: pd.NA for tag in ALL_TAGS}

def osm_to_gdf(file_path):
    with open(file_path, 'r', encoding='utf-8') as f:
        data = f.read()

    root = ET.fromstring(data)
    nodes = {node.get('id'): (float(node.get('lon')), float(node.get('lat'))) for node in root.findall('node')}

    geometries = []
    attributes = []

    for way in root.findall('way'):
        coords = [nodes.get(nd.get('ref')) for nd in way.findall('nd')]
        if len(coords) >= 4 and coords[0] == coords[-1]:
            geometries.append(Polygon(coords))
            attr = initialize_attributes()
            attr.update({tag.get('k'): tag.get('v') for tag in way.findall('tag')})
            attr["id"] = way.get('id')
            attributes.append(attr)

    for node_id, coords in nodes.items():
        geometries.append(Point(coords))
        attr = initialize_attributes()
        attr['id'] = node_id
        attributes.append(attr)

    return gpd.GeoDataFrame(attributes, geometry=geometries)

# 指定文件夹路径
source_directory = r"D:\近期需要做的\shp数据\OSM_new\new_new_OSM"  
output_directory = r"D:\近期需要做的\shp数据\OSM_new\OSM_output_new"

all_gdfs = []
for file in os.listdir(source_directory):
    if file.endswith(".osm"):
        osm_file_path = os.path.join(source_directory, file)
        try:
            gdf = osm_to_gdf(osm_file_path)
            all_gdfs.append(gdf)
        except Exception as e:
            print(f"处理 {file} 时出错: {str(e)}")

if all_gdfs:
    combined_gdf = pd.concat(all_gdfs, ignore_index=True)
    if not os.path.exists(output_directory):
        os.makedirs(output_directory)
    output_path = os.path.join(output_directory, "combined_osm_data.shp")
    combined_gdf.to_file(output_path)
    print(f"所有 OSM 文件已合并并保存为 SHP 文件: {output_path}")


  combined_gdf.to_file(output_path)


RuntimeError: GDAL Error: Attempt to write non-polygon (POINT) geometry to POLYGON type shapefile.. Failed to write record: <fiona.model.Feature object at 0x0000018469553E80>

In [None]:
import geopandas as gpd
from shapely.geometry import Point, Polygon
import os
import xml.etree.ElementTree as ET

def collect_osm_data(file_path, all_nodes, all_ways):
    with open(file_path, 'r', encoding='utf-8') as f:
        data = f.read()

    root = ET.fromstring(data)
    nodes = {node.get('id'): (float(node.get('lon')), float(node.get('lat'))) for node in root.findall('node')}
    all_nodes.update(nodes)

    for way in root.findall('way'):
        coords = [nodes.get(nd.get('ref')) for nd in way.findall('nd')]
        if len(coords) >= 4 and coords[0] == coords[-1]:
            all_ways.append((Polygon(coords), way.attrib))

# 收集数据
all_nodes = {}
all_ways = []

source_directory = r"D:\近期需要做的\shp数据\OSM_new\new_OSM"
for file in os.listdir(source_directory):
    if file.endswith(".osm"):
        osm_file_path = os.path.join(source_directory, file)
        print(f"正在收集数据: {file}")
        collect_osm_data(osm_file_path, all_nodes, all_ways)

# 创建点的 GeoDataFrame
print("正在创建点的 SHP 文件...")
points = [Point(coord) for coord in all_nodes.values()]
point_gdf = gpd.GeoDataFrame([{'id': node_id} for node_id in all_nodes.keys()], geometry=points)

# 创建多边形的 GeoDataFrame
print("正在创建多边形的 SHP 文件...")
polygons = [way[0] for way in all_ways]
polygon_gdf = gpd.GeoDataFrame([way[1] for way in all_ways], geometry=polygons)

output_directory = r"D:\近期需要做的\shp数据\OSM_new\OSM_output"
if not os.path.exists(output_directory):
    os.makedirs(output_directory)

# 保存为 SHP 文件
point_gdf.to_file(os.path.join(output_directory, "OSM_points.shp"))
polygon_gdf.to_file(os.path.join(output_directory, "OSM_polygons.shp"))

print("SHP 文件已创建完毕.")
