# 台北市餐廳爬蟲

使用網格化策略爬取台北市所有餐廳數據

In [2]:
import requests
import pandas as pd
import time
import json
from typing import List, Dict, Tuple
import numpy as np
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor, as_completed
import logging

# Configure logging: INFO level via basicConfig, plus a logger named after this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

ModuleNotFoundError: No module named 'pandas'

In [None]:
@dataclass
class Restaurant:
    """A single restaurant record collected from one of the data sources.

    ``name``, ``lat``, ``lng`` and ``address`` are required; every other
    field falls back to an empty/zero default.
    """

    name: str
    lat: float
    lng: float
    address: str
    phone: str = ""
    rating: float = 0.0
    price_level: int = 0
    # ``None`` is only a placeholder; ``__post_init__`` swaps it for a new list.
    types: List[str] = None
    source: str = ""

    def __post_init__(self):
        """Replace the ``None`` placeholder in ``types`` with a fresh empty list."""
        self.types = [] if self.types is None else self.types

# Bounding box used for Taipei City, in decimal degrees.
TAIPEI_BOUNDS = dict(
    north=25.2,  # northern edge (latitude)
    south=24.9,  # southern edge (latitude)
    east=121.7,  # eastern edge (longitude)
    west=121.4,  # western edge (longitude)
)

# Grid cell size in degrees (about 1 km).
GRID_SIZE = 0.01

print(f"台北市邊界: {TAIPEI_BOUNDS}")
print(f"網格大小: {GRID_SIZE}° (約1km)")

In [None]:
def generate_grid(bounds: Dict[str, float] = None,
                  grid_size: float = None) -> List[Tuple[float, float, float, float]]:
    """Split a bounding box into square grid cells.

    Cell edges are computed as ``start + index * grid_size`` instead of by
    repeatedly adding ``grid_size``, so floating-point error does not
    accumulate and cannot add or drop a row/column at the far edge.

    Args:
        bounds: Mapping with 'north', 'south', 'east' and 'west' keys in
            degrees. Defaults to the module-level ``TAIPEI_BOUNDS``.
        grid_size: Cell edge length in degrees. Defaults to the module-level
            ``GRID_SIZE``.

    Returns:
        A list of ``(lat_min, lat_max, lng_min, lng_max)`` tuples ordered
        south-to-north and, within each row, west-to-east. When the box is
        not an exact multiple of ``grid_size`` the last row/column extends
        past the north/east bound so the whole box stays covered.

    Raises:
        ValueError: If ``grid_size`` is not positive.
    """
    import math  # local import keeps this block independent of the file header

    if bounds is None:
        bounds = TAIPEI_BOUNDS
    if grid_size is None:
        grid_size = GRID_SIZE
    if grid_size <= 0:
        # A non-positive step would never reach the far edge.
        raise ValueError("grid_size must be positive")

    def cell_count(start: float, end: float) -> int:
        """Return how many cells of ``grid_size`` are needed to span start..end."""
        ratio = (end - start) / grid_size
        nearest = round(ratio)
        # Treat values like 30.00000000000007 as the exact multiple they
        # represent; otherwise round up so a partial cell is still emitted.
        if abs(ratio - nearest) < 1e-9:
            return max(nearest, 0)
        return max(math.ceil(ratio), 0)

    lat_start = bounds['south']
    lng_start = bounds['west']
    rows = cell_count(lat_start, bounds['north'])
    cols = cell_count(lng_start, bounds['east'])

    return [
        (
            lat_start + row * grid_size,
            lat_start + (row + 1) * grid_size,
            lng_start + col * grid_size,
            lng_start + (col + 1) * grid_size,
        )
        for row in range(rows)
        for col in range(cols)
    ]

# Build the grid once at module level; RestaurantCrawler.crawl_all_grids,
# the test-crawl cell and full_crawl all read this `grids` list.
grids = generate_grid()
print(f"總共生成 {len(grids)} 個網格")

In [None]:
class GooglePlacesScraper:
    """Client for the Google Places Nearby Search JSON endpoint.

    Args:
        api_key: API key sent with every request.
        timeout: Seconds to wait for an HTTP response before giving up.
    """

    def __init__(self, api_key: str, timeout: float = 10.0):
        self.api_key = api_key
        self.base_url = "https://maps.googleapis.com/maps/api/place"
        self.timeout = timeout

    def search_restaurants(self, lat: float, lng: float, radius: int = 1000) -> List[Restaurant]:
        """Search for restaurants around a point.

        Only the first page of results is read (no ``next_page_token``
        follow-up). Errors are logged and never raised, so callers always
        get a list back.

        Args:
            lat: Latitude of the search centre.
            lng: Longitude of the search centre.
            radius: Search radius passed to the API (metres per the API docs).

        Returns:
            Restaurants parsed from the response; empty on any failure.
        """
        url = f"{self.base_url}/nearbysearch/json"
        params = {
            'location': f"{lat},{lng}",
            'radius': radius,
            'type': 'restaurant',
            'key': self.api_key
        }

        try:
            # Without a timeout a stalled connection would block forever.
            response = requests.get(url, params=params, timeout=self.timeout)
            response.raise_for_status()
            data = response.json()
        except (requests.RequestException, ValueError) as e:
            logger.error(f"Google Places API 錯誤: {e}")
            return []

        status = data.get('status') if isinstance(data, dict) else None
        if status != 'OK':
            # ZERO_RESULTS is a normal empty answer; any other status
            # (quota exceeded, denied key, ...) must not pass silently.
            if status != 'ZERO_RESULTS':
                detail = data.get('error_message', '') if isinstance(data, dict) else ''
                logger.error(f"Google Places API 錯誤: {status} {detail}")
            return []

        restaurants = []
        for place in data.get('results', []):
            try:
                location = place['geometry']['location']
                restaurant = Restaurant(
                    name=place.get('name', ''),
                    lat=location['lat'],
                    lng=location['lng'],
                    address=place.get('vicinity', ''),
                    rating=place.get('rating', 0.0),
                    price_level=place.get('price_level', 0),
                    types=place.get('types', []),
                    source='google_places'
                )
            except (KeyError, TypeError, AttributeError) as e:
                # Skip one malformed entry instead of losing the rest of the page.
                logger.warning(f"Google Places API 錯誤: 略過缺少座標的資料 ({e!r})")
                continue
            restaurants.append(restaurant)

        return restaurants

# 使用範例 (需要 API key)
# scraper = GooglePlacesScraper('YOUR_API_KEY')
# restaurants = scraper.search_restaurants(25.0330, 121.5654)  # 台北101

In [None]:
class OpenStreetMapScraper:
    """Fetch restaurants from OpenStreetMap through the Overpass API.

    Args:
        timeout: Seconds to wait for the HTTP response. The default is a
            little above the 25 s server-side timeout set in the query.
    """

    def __init__(self, timeout: float = 30.0):
        self.base_url = "https://overpass-api.de/api/interpreter"
        self.timeout = timeout

    def search_restaurants(self, lat_min: float, lat_max: float,
                          lng_min: float, lng_max: float) -> List[Restaurant]:
        """Search for restaurants inside a bounding box.

        Nodes, ways and relations tagged ``amenity=restaurant`` are all
        returned: the query uses ``out center`` so ways and relations carry
        a representative coordinate instead of being dropped. Errors are
        logged and never raised.

        Args:
            lat_min: Southern edge of the box.
            lat_max: Northern edge of the box.
            lng_min: Western edge of the box.
            lng_max: Eastern edge of the box.

        Returns:
            Restaurants parsed from the response; empty on any failure.
        """
        # Overpass QL query; the bbox order is (south, west, north, east).
        query = f"""
[out:json][timeout:25];
(
  node["amenity"="restaurant"]({lat_min},{lng_min},{lat_max},{lng_max});
  way["amenity"="restaurant"]({lat_min},{lng_min},{lat_max},{lng_max});
  relation["amenity"="restaurant"]({lat_min},{lng_min},{lat_max},{lng_max});
);
out center;
"""

        try:
            response = requests.get(self.base_url, params={'data': query},
                                    timeout=self.timeout)
            # Overload/rate-limit answers are non-JSON error pages; fail early.
            response.raise_for_status()
            data = response.json()
        except (requests.RequestException, ValueError) as e:
            logger.error(f"OpenStreetMap API 錯誤: {e}")
            return []

        elements = data.get('elements', []) if isinstance(data, dict) else []

        restaurants = []
        for element in elements:
            tags = element.get('tags')
            if not tags:
                continue

            # Nodes carry lat/lon directly; ways and relations expose them
            # under the 'center' key produced by `out center`.
            if element.get('type') == 'node':
                point = element
            else:
                point = element.get('center') or {}
            lat, lon = point.get('lat'), point.get('lon')
            if lat is None or lon is None:
                continue

            restaurants.append(Restaurant(
                name=tags.get('name', ''),
                lat=lat,
                lng=lon,
                address=tags.get('addr:street', ''),
                phone=tags.get('phone', ''),
                types=[tags.get('cuisine', '')] if tags.get('cuisine') else [],
                source='openstreetmap'
            ))

        return restaurants

# Module-level scraper instance (RestaurantCrawler creates its own separately).
osm_scraper = OpenStreetMapScraper()
print("OpenStreetMap 爬蟲已初始化")

In [None]:
class TaipeiOpenDataScraper:
    """Scraper for the Taipei City Government open-data API.

    Args:
        timeout: Seconds to wait for an HTTP response before giving up.
    """

    # Placeholder value; a real dataset ID has to be looked up and supplied.
    PLACEHOLDER_DATASET_ID = "your_dataset_id"

    def __init__(self, timeout: float = 10.0):
        self.base_url = "https://data.taipei/api/v1/dataset"
        self.timeout = timeout

    def get_restaurants(self, dataset_id: str = PLACEHOLDER_DATASET_ID) -> List[Restaurant]:
        """Load restaurant records from a Taipei open-data dataset.

        No request is sent while ``dataset_id`` is still the placeholder.
        Records whose coordinates cannot be parsed are skipped one by one
        rather than aborting the whole load. Errors are logged, not raised.

        Args:
            dataset_id: ID of the dataset to query.

        Returns:
            Restaurants parsed from the response; empty on any failure.
        """
        if not dataset_id or dataset_id == self.PLACEHOLDER_DATASET_ID:
            logger.warning("台北開放資料 API 錯誤: 尚未設定 dataset_id")
            return []

        try:
            # 'resourceAquire' is spelled exactly as in the original request URL.
            response = requests.get(
                f"{self.base_url}/{dataset_id}",
                params={'scope': 'resourceAquire'},
                timeout=self.timeout,
            )
            response.raise_for_status()
            data = response.json()
        except (requests.RequestException, ValueError) as e:
            logger.error(f"台北開放資料 API 錯誤: {e}")
            return []

        result = data.get('result') if isinstance(data, dict) else None
        records = result.get('results', []) if isinstance(result, dict) else []

        restaurants = []
        for item in records:
            # NOTE(review): assumes records expose 'name', 'lat', 'lng',
            # 'address' and 'phone' keys — confirm against the real dataset.
            try:
                restaurant = Restaurant(
                    name=item.get('name', ''),
                    lat=float(item.get('lat', 0)),
                    lng=float(item.get('lng', 0)),
                    address=item.get('address', ''),
                    phone=item.get('phone', ''),
                    source='taipei_open_data'
                )
            except (AttributeError, TypeError, ValueError) as e:
                logger.warning(f"台北開放資料 API 錯誤: 略過無法解析的資料 ({e!r})")
                continue
            restaurants.append(restaurant)

        return restaurants

# NOTE: this only prints a status message; no TaipeiOpenDataScraper instance is created here.
print("台北開放資料爬蟲已初始化")

In [None]:
class RestaurantCrawler:
    """Crawl restaurants grid by grid and post-process the results."""

    # Column order of the CSV written by save_to_csv.
    CSV_COLUMNS = ['name', 'lat', 'lng', 'address', 'phone',
                   'rating', 'price_level', 'types', 'source']

    def __init__(self):
        self.osm_scraper = OpenStreetMapScraper()
        self.taipei_scraper = TaipeiOpenDataScraper()
        # Filled with the latest result of crawl_all_grids.
        self.all_restaurants = []

    def crawl_grid(self, grid: Tuple[float, float, float, float]) -> List[Restaurant]:
        """Crawl the restaurants of a single grid cell.

        Args:
            grid: ``(lat_min, lat_max, lng_min, lng_max)`` of the cell.

        Returns:
            Restaurants found in the cell by the OpenStreetMap scraper.
        """
        lat_min, lat_max, lng_min, lng_max = grid
        restaurants = []

        osm_restaurants = self.osm_scraper.search_restaurants(lat_min, lat_max, lng_min, lng_max)
        restaurants.extend(osm_restaurants)

        logger.info(f"網格 ({lat_min:.4f}, {lng_min:.4f}) 找到 {len(osm_restaurants)} 家餐廳")

        return restaurants

    def _crawl_grid_throttled(self, grid: Tuple[float, float, float, float],
                              delay: float) -> List[Restaurant]:
        """Crawl one cell, then pause the calling worker for ``delay`` seconds."""
        try:
            return self.crawl_grid(grid)
        finally:
            if delay > 0:
                time.sleep(delay)

    def crawl_all_grids(self, max_workers: int = 5,
                        grid_list: List[Tuple[float, float, float, float]] = None,
                        request_delay: float = 0.1) -> List[Restaurant]:
        """Crawl many grid cells in parallel.

        The pause between requests runs inside each worker thread, after
        its request. A sleep in the result-collection loop would only delay
        reading results of tasks that were already submitted, so it would
        not space the requests out at all.

        Args:
            max_workers: Number of worker threads.
            grid_list: Cells to crawl. Defaults to the module-level ``grids``.
            request_delay: Seconds each worker waits after every cell.

        Returns:
            All restaurants found; also stored on ``self.all_restaurants``.
        """
        if grid_list is None:
            grid_list = grids

        all_restaurants = []

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_grid = {
                executor.submit(self._crawl_grid_throttled, grid, request_delay): grid
                for grid in grid_list
            }

            for future in as_completed(future_to_grid):
                grid = future_to_grid[future]
                try:
                    all_restaurants.extend(future.result())
                except Exception as e:
                    # One failing cell must not abort the whole crawl.
                    logger.error(f"網格 {grid} 爬取失敗: {e}")

        self.all_restaurants = all_restaurants
        return all_restaurants

    def save_to_csv(self, restaurants: List[Restaurant], filename: str = "taipei_restaurants.csv"):
        """Write restaurants to a CSV file (UTF-8 with BOM).

        Args:
            restaurants: Records to save.
            filename: Output path.
        """
        data = [
            {
                'name': restaurant.name,
                'lat': restaurant.lat,
                'lng': restaurant.lng,
                'address': restaurant.address,
                'phone': restaurant.phone,
                'rating': restaurant.rating,
                'price_level': restaurant.price_level,
                # str() guards against non-string entries in `types`.
                'types': ','.join(map(str, restaurant.types or [])),
                'source': restaurant.source
            }
            for restaurant in restaurants
        ]

        # Explicit columns keep the header row even when there are no records.
        df = pd.DataFrame(data, columns=self.CSV_COLUMNS)
        df.to_csv(filename, index=False, encoding='utf-8-sig')
        print(f"已保存 {len(restaurants)} 家餐廳到 {filename}")

    def remove_duplicates(self, restaurants: List[Restaurant]) -> List[Restaurant]:
        """Drop duplicate restaurants, keeping the first occurrence.

        Two records are duplicates when they share the same name and the
        same latitude/longitude after rounding to four decimal places.

        Args:
            restaurants: Records that may contain duplicates.

        Returns:
            The unique records in their original order.
        """
        seen = set()
        unique_restaurants = []

        for restaurant in restaurants:
            key = (restaurant.name, round(restaurant.lat, 4), round(restaurant.lng, 4))
            if key not in seen:
                seen.add(key)
                unique_restaurants.append(restaurant)

        return unique_restaurants

# Shared crawler instance used by the test-crawl cell and by full_crawl.
crawler = RestaurantCrawler()
print("餐廳爬蟲已初始化")

In [None]:
# Start crawling (small-scale test run).
print("開始爬取台北市餐廳...")

# Try only the first 5 grid cells.
test_grids = grids[:5]
test_restaurants = []

for grid in test_grids:
    restaurants = crawler.crawl_grid(grid)
    test_restaurants.extend(restaurants)
    time.sleep(1)  # avoid sending requests too frequently

print(f"測試爬取完成，找到 {len(test_restaurants)} 家餐廳")

# Remove duplicates.
unique_restaurants = crawler.remove_duplicates(test_restaurants)
print(f"去重後剩餘 {len(unique_restaurants)} 家餐廳")

# Show the first 10 restaurants.
for i, restaurant in enumerate(unique_restaurants[:10]):
    print(f"{i+1}. {restaurant.name} - {restaurant.address} ({restaurant.lat:.4f}, {restaurant.lng:.4f})")

In [None]:
# Save the test results.
# Runs only when `unique_restaurants` exists and is non-empty
# (presumably guards against the test-crawl cell not having been run).
if 'unique_restaurants' in locals() and unique_restaurants:
    crawler.save_to_csv(unique_restaurants, "taipei_restaurants_test.csv")
    
    # Build a DataFrame of the saved records for the summary below.
    df = pd.DataFrame([{
        'name': r.name,
        'lat': r.lat,
        'lng': r.lng,
        'address': r.address,
        'source': r.source
    } for r in unique_restaurants])
    
    # Print the record count, per-source counts and the coordinate range.
    print("\n統計信息:")
    print(f"總餐廳數: {len(df)}")
    print(f"數據源分布:\n{df['source'].value_counts()}")
    print(f"\n經緯度範圍:")
    print(f"緯度: {df['lat'].min():.4f} - {df['lat'].max():.4f}")
    print(f"經度: {df['lng'].min():.4f} - {df['lng'].max():.4f}")

In [None]:
# 完整爬取 (謹慎使用，可能需要很長時間)
def full_crawl():
    """Crawl every grid cell, de-duplicate the results and save them to CSV.

    Works on the module-level ``grids`` and ``crawler`` objects and writes
    ``taipei_restaurants_full.csv``.

    Returns:
        The de-duplicated list of restaurants.
    """
    print("開始完整爬取...")
    print(f"總共 {len(grids)} 個網格")

    # Three worker threads crawl the cells in parallel.
    crawled = crawler.crawl_all_grids(max_workers=3)
    print(f"爬取完成，總共找到 {len(crawled)} 家餐廳")

    # Drop records that appear more than once.
    deduped = crawler.remove_duplicates(crawled)
    print(f"去重後剩餘 {len(deduped)} 家餐廳")

    # Persist the final list.
    crawler.save_to_csv(deduped, "taipei_restaurants_full.csv")
    return deduped

# 取消註釋來執行完整爬取
# full_results = full_crawl()

In [None]:
# 數據分析和可視化
import matplotlib.pyplot as plt

def analyze_restaurants(restaurants: List[Restaurant]):
    """Plot and print summary statistics for a list of restaurants.

    Draws a 2x2 figure (location scatter, per-source bar chart, rating
    histogram, density hexbin) and prints basic statistics. A rating of
    0 is the dataclass default for "no rating", so only ratings above 0
    enter the histogram and the average; otherwise unrated records would
    drag the mean down.

    Args:
        restaurants: Records to analyse. An empty list only prints a notice.
    """
    if not restaurants:
        print("沒有餐廳數據可分析")
        return

    df = pd.DataFrame([{
        'name': r.name,
        'lat': r.lat,
        'lng': r.lng,
        'address': r.address,
        'rating': r.rating,
        'source': r.source
    } for r in restaurants])

    # Coerce so a missing/non-numeric rating becomes NaN instead of raising,
    # then keep only real ratings (0 means "not rated").
    ratings = pd.to_numeric(df['rating'], errors='coerce')
    rated = ratings[ratings > 0]
    source_counts = df['source'].value_counts()

    plt.figure(figsize=(15, 10))

    # 1. Restaurant locations
    plt.subplot(2, 2, 1)
    plt.scatter(df['lng'], df['lat'], alpha=0.6, s=10)
    plt.title('台北市餐廳分布')
    plt.xlabel('經度')
    plt.ylabel('緯度')
    plt.grid(True)

    # 2. Records per data source
    plt.subplot(2, 2, 2)
    source_counts.plot(kind='bar')
    plt.title('數據源分布')
    plt.xticks(rotation=45)

    # 3. Rating distribution (panel stays empty when nothing is rated)
    plt.subplot(2, 2, 3)
    if not rated.empty:
        rated.hist(bins=20)
        plt.title('餐廳評分分布')
        plt.xlabel('評分')
        plt.ylabel('數量')

    # 4. Density heat map
    plt.subplot(2, 2, 4)
    plt.hexbin(df['lng'], df['lat'], gridsize=20, cmap='YlOrRd')
    plt.title('餐廳密度熱圖')
    plt.xlabel('經度')
    plt.ylabel('緯度')

    plt.tight_layout()
    plt.show()

    # Summary statistics
    print("\n=== 統計信息 ===")
    print(f"總餐廳數: {len(df)}")
    print(f"數據源分布:\n{source_counts}")
    if not rated.empty:
        print(f"平均評分: {rated.mean():.2f}")
        print(f"最高評分: {rated.max():.2f}")
    print(f"經緯度範圍: 緯度({df['lat'].min():.4f}, {df['lat'].max():.4f}), 經度({df['lng'].min():.4f}, {df['lng'].max():.4f})")

# Analyse the test data when `unique_restaurants` exists and is non-empty.
if 'unique_restaurants' in locals() and unique_restaurants:
    analyze_restaurants(unique_restaurants)