In [None]:
import requests
import math
import csv
import os

In [None]:
class LocationAnalyze:
    """Collect location metrics for a single address through the AMap web API.

    The analysis covers: the address' longitude/latitude, the nearest metro
    station and bus stop within 1 km, the straight-line and walking distance
    to each of them, the walking duration, and the number of nearby points
    of interest (restaurants, colleges, office buildings, hospitals, gyms).

    Results are stored on the instance (see ``__init__``) and can be appended
    to a CSV file with :meth:`save_to_csv`. A value of ``None`` means the
    corresponding lookup did not produce a result.

    HTTP or JSON-decoding errors raised by ``requests`` are not caught here
    and propagate to the caller.
    """

    _GEOCODE_URL = "https://restapi.amap.com/v3/geocode/geo"
    _AROUND_URL = "https://restapi.amap.com/v3/place/around"
    _WALKING_URL = "https://restapi.amap.com/v5/direction/walking"
    # Search radius (metres) used for every "around" query.
    _SEARCH_RADIUS = 1000
    # Seconds to wait for a single HTTP call, so that one stalled request
    # cannot block a whole batch run forever.
    _REQUEST_TIMEOUT = 10

    def __init__(self, address, amap_api_key):
        """Store the address and API key and zero-initialise all results.

        Args:
            address (str): Address to analyse.
            amap_api_key (str): AMap web-service key sent with each request.
        """
        self.AMAP_API_KEY = amap_api_key
        self.ADDRESS = address
        # Coordinates of the input address and of the nearest stations.
        self.input_lng = 0
        self.input_lat = 0
        self.metro_lng = 0
        self.metro_lat = 0
        self.bus_lng = 0
        self.bus_lat = 0
        # Straight-line (haversine) distances in metres.
        self.metro_StraightDistance = 0
        self.bus_StraightDistance = 0
        # Walking distance / duration as reported by the walking-route API.
        self.metro_WalkingDistance = 0
        self.bus_WalkingDistance = 0
        self.metro_WalkingDuration = 0
        self.bus_WalkingDuration = 0
        # POI counts within the search radius.
        self.resturantPOI = 0
        self.collegePOI = 0
        self.officePOI = 0
        self.hospitalPOI = 0
        self.gymPOI = 0

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _get_json(self, url, params):
        """GET ``url`` with ``params`` plus the API key; return parsed JSON.

        Passing the query through ``params=`` lets ``requests`` URL-encode
        every value, so addresses containing ``&``, ``#`` or similar
        characters no longer corrupt the query string.
        """
        query = dict(params, key=self.AMAP_API_KEY)
        response = requests.get(
            url, params=query, timeout=self._REQUEST_TIMEOUT)
        return response.json()

    def _has_input_location(self):
        """Return True when the input address has usable coordinates."""
        return self.input_lng is not None and self.input_lat is not None

    def _search_around(self, type_code):
        """Query POIs of ``type_code`` around the input location."""
        return self._get_json(self._AROUND_URL, {
            "location": f"{self.input_lng},{self.input_lat}",
            "types": type_code,
            "radius": self._SEARCH_RADIUS,
        })

    def _find_nearest_poi(self, type_code):
        """Return ``(lng, lat)`` of the closest POI of ``type_code``.

        Returns ``(None, None)`` when the input location is unknown, the
        API reports a failure, or no POI is returned.
        """
        if not self._has_input_location():
            # Geocoding failed earlier: there is nothing to search around.
            return None, None

        data = self._search_around(type_code)
        if data["status"] != "1":
            return None, None

        candidates = [poi["location"].split(",") for poi in data["pois"]]
        # min() keeps the first of equally distant candidates, matching a
        # strict "<" comparison in a manual loop.
        nearest = min(
            candidates,
            key=lambda c: self.calculate_distance_between_locations(
                c[1], c[0]),
            default=None,
        )
        if nearest is None:
            return None, None
        lng, lat = nearest
        return lng, lat

    def _find_walking_info(self, dest_lng, dest_lat):
        """Return ``(distance, duration)`` for walking to the destination.

        Both values are ints taken from the first returned path. Returns
        ``(None, None)`` when either end point is unknown or the API does
        not report exactly one route.
        """
        if (not self._has_input_location()
                or dest_lng is None or dest_lat is None):
            return None, None

        data = self._get_json(self._WALKING_URL, {
            "isindoor": 1,
            "origin": f"{self.input_lng},{self.input_lat}",
            "destination": f"{dest_lng},{dest_lat}",
        })
        if data["status"] == "1" and data["count"] == "1":
            path = data["route"]["paths"][0]
            return int(path["distance"]), int(path["cost"]["duration"])
        return None, None

    def _count_pois(self, type_codes):
        """Sum the API-reported ``count`` over all ``type_codes``.

        Queries that report a failure contribute nothing. Returns 0 when
        the input location is unknown.
        """
        if not self._has_input_location():
            return 0

        total = 0
        for code in type_codes:
            data = self._search_around(code)
            if data["status"] == "1":
                total += int(data["count"])
        return total

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def find_geocode(self):
        """Look up the longitude/latitude of ``self.ADDRESS``.

        Sets ``input_lng`` / ``input_lat`` to the coordinate strings of the
        first geocoding match, or to ``None`` when there is no match.
        """
        data = self._get_json(self._GEOCODE_URL, {"address": self.ADDRESS})
        if data["status"] == "1" and int(data["count"]) > 0:
            location = data["geocodes"][0]["location"]
            self.input_lng, self.input_lat = location.split(",")
        else:
            self.input_lng, self.input_lat = None, None

    def find_nearest_metro_station(self):
        """Find the nearest metro station (type 150500) within 1 km.

        Sets ``metro_lng`` / ``metro_lat``; both are ``None`` if none found.
        """
        self.metro_lng, self.metro_lat = self._find_nearest_poi("150500")

    def find_nearest_bus_stop(self):
        """Find the nearest bus stop (type 150700) within 1 km.

        Sets ``bus_lng`` / ``bus_lat``; both are ``None`` if none found.
        """
        self.bus_lng, self.bus_lat = self._find_nearest_poi("150700")

    def calculate_distance_between_locations(self, lat2, lng2):
        """Return the haversine distance from the input address to a point.

        Args:
            lat2 (float | str): Latitude of the other point, in degrees.
            lng2 (float | str): Longitude of the other point, in degrees.

        Returns:
            float: Great-circle distance in metres.
        """
        # Convert both coordinate pairs from degrees to radians.
        lat1 = math.radians(float(self.input_lat))
        lng1 = math.radians(float(self.input_lng))
        lat2 = math.radians(float(lat2))
        lng2 = math.radians(float(lng2))

        d_lat = lat2 - lat1
        d_lng = lng2 - lng1

        # Haversine formula.
        a = math.sin(d_lat / 2) ** 2 + \
            math.cos(lat1) * math.cos(lat2) * math.sin(d_lng / 2) ** 2
        c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))

        # Mean Earth radius in kilometres; the result is scaled to metres.
        earth_radius = 6371.0
        return c * earth_radius * 1000

    def find_metro_walking_info(self):
        """Fetch walking distance and duration to the nearest metro station.

        Sets ``metro_WalkingDistance`` / ``metro_WalkingDuration`` (``None``
        when unavailable).
        """
        self.metro_WalkingDistance, self.metro_WalkingDuration = \
            self._find_walking_info(self.metro_lng, self.metro_lat)

    def find_bus_walking_info(self):
        """Fetch walking distance and duration to the nearest bus stop.

        Sets ``bus_WalkingDistance`` / ``bus_WalkingDuration`` (``None``
        when unavailable).
        """
        self.bus_WalkingDistance, self.bus_WalkingDuration = \
            self._find_walking_info(self.bus_lng, self.bus_lat)

    def search_nearby_resturant_pois(self):
        """Count catering POIs (type 050000) within 1 km.

        Uses the API-reported ``count`` like the other categories, rather
        than the length of the returned ``pois`` list, which only reflects
        the POIs included in that one response.
        """
        self.resturantPOI = self._count_pois(["050000"])

    def search_nearby_college_pois(self):
        """Count college POIs within 1 km and store them in ``collegePOI``."""
        college_types = [
            "141201",  # higher-education institutions
        ]
        # Stored under the lowercase name that __init__ and save_to_csv use.
        self.collegePOI = self._count_pois(college_types)

    def search_nearby_office_pois(self):
        """Count office-building POIs (type 120201) within 1 km."""
        self.officePOI = self._count_pois(["120201"])

    def search_nearby_hospital_pois(self):
        """Count hospital POIs within 1 km and store them in ``hospitalPOI``."""
        hospital_types = [
            "090100",  # general hospital
            "090101",  # grade-3A hospital
            "090102",  # health centre
            "090200",  # specialised hospital
            "090601",  # pharmacy
        ]
        # Stored under the lowercase name that __init__ and save_to_csv use.
        self.hospitalPOI = self._count_pois(hospital_types)

    def search_nearby_gym_pois(self):
        """Count gym / sports-venue POIs (type 080100) within 1 km."""
        self.gymPOI = self._count_pois(["080100"])

    def save_to_csv(self, file_path):
        """Append the analysis result as one row to a CSV file.

        The header row is written first when the file does not exist yet or
        is empty. Missing parent directories are created.

        Args:
            file_path (str): Path of the CSV file to append to.
        """
        headers = [
            "地址", "所在经度", "所在纬度",
            "最近的地铁站经度", "最近的地铁站纬度",
            "最近的公交站经度", "最近的公交站纬度",
            "地铁直线距离", "公交直线距离",
            "地铁步行距离", "公交步行距离",
            "地铁步行时长", "公交步行时长",
            "1km内餐厅", "1km内大学或大专", "1km内办公室", "1km内医院", "1km内健身房"
        ]

        row = [
            self.ADDRESS, self.input_lng, self.input_lat,
            self.metro_lng, self.metro_lat,
            self.bus_lng, self.bus_lat,
            self.metro_StraightDistance, self.bus_StraightDistance,
            self.metro_WalkingDistance, self.bus_WalkingDistance,
            self.metro_WalkingDuration, self.bus_WalkingDuration,
            self.resturantPOI, self.collegePOI, self.officePOI,
            self.hospitalPOI, self.gymPOI
        ]

        # Opening in append mode fails if the directory is missing.
        parent_dir = os.path.dirname(file_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        file_exists = os.path.isfile(
            file_path) and os.path.getsize(file_path) > 0

        with open(file_path, 'a', newline='', encoding='utf-8') as csvfile:
            writer = csv.writer(csvfile)
            if not file_exists:
                writer.writerow(headers)
            writer.writerow(row)

    def analyze(self):
        """Run every lookup for the address and fill in all result fields."""
        self.find_geocode()
        self.find_nearest_metro_station()
        self.find_metro_walking_info()
        self.find_nearest_bus_stop()
        self.find_bus_walking_info()
        self.search_nearby_resturant_pois()
        self.search_nearby_college_pois()
        self.search_nearby_office_pois()
        self.search_nearby_hospital_pois()
        self.search_nearby_gym_pois()

        if self.metro_lat is not None and self.metro_lng is not None:
            self.metro_StraightDistance = \
                self.calculate_distance_between_locations(
                    self.metro_lat, self.metro_lng)
        else:
            self.metro_StraightDistance = None

        if self.bus_lat is not None and self.bus_lng is not None:
            self.bus_StraightDistance = \
                self.calculate_distance_between_locations(
                    self.bus_lat, self.bus_lng)
        else:
            self.bus_StraightDistance = None

In [None]:
communities_list = "./input/我爱我家-小区.csv"

# newline='' is what the csv module recommends for files passed to
# csv.reader, so that line endings inside quoted fields are handled by the
# parser itself.
with open(communities_list, 'r', newline='', encoding='utf-8') as csvfile:
    reader = csv.reader(csvfile)
    # The address is taken from the first column of each row. Blank lines
    # (which csv.reader yields as empty lists) and rows whose first cell is
    # empty are skipped, since indexing row[0] on an empty row raises
    # IndexError and an empty cell would yield a bare city name.
    # Every remaining address is prefixed with "天津市".
    addresses = [
        "天津市" + row[0]
        for row in reader
        if row and row[0].strip()
    ]

In [None]:
# LocationAnalyze.__init__ requires the AMap API key as its second argument;
# calling it with the address alone raises TypeError. The key is read from
# the environment so that it does not have to be stored in the notebook.
amap_api_key = os.environ.get("AMAP_API_KEY")
if not amap_api_key:
    raise RuntimeError(
        "Set the AMAP_API_KEY environment variable to your AMap API key")

# The output path is the same for every address, so compute it once and make
# sure its directory exists before the first append.
file_path = "./output/Result.csv"
os.makedirs(os.path.dirname(file_path), exist_ok=True)

for address in addresses:
    a = LocationAnalyze(address, amap_api_key)
    a.analyze()
    a.save_to_csv(file_path)
    del a  # explicitly drop the instance before the next iteration