In [11]:
import requests
import pandas as pd
import json
import zipfile
import io
import os
from datetime import datetime, timedelta
import time
from typing import Dict, List, Optional, Tuple
import logging
import re
from pathlib import Path

In [12]:
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


In [25]:
class BAStTrafficScraper:
    """Scraper for BASt traffic data from German highways and federal roads"""
    
    def __init__(self, output_dir: str = "bast_data"):
        """
        Initialize the BASt traffic scraper
        
        Args:
            output_dir: Directory to save scraped data
        """
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(exist_ok=True)
        
        # Base URLs for BASt data
        self.base_url = "https://www.bast.de"
        self.data_url = f"{self.base_url}/DE/Publikationen/Daten/Verkehrstechnik"
        
        # Alternative API endpoints (based on research)
        self.autobahn_api_url = "https://verkehr.autobahn.de/o/autobahn/"
        
        # Headers for requests
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }
        
        # Station metadata storage
        self.stations_metadata = {}
        self.traffic_data = []
        
    def download_monthly_data(self, year: int, month: int) -> Optional[bytes]:
        """
        Download monthly traffic data ZIP file from BASt
        
        Args:
            year: Year of data
            month: Month of data
            
        Returns:
            Bytes content of ZIP file or None if download fails
        """
        # Format: BASt provides monthly ZIP files
        month_str = f"{month:02d}"
        year_str = str(year)
        
        # Try different URL patterns (BASt changes structure occasionally)
        url_patterns = [
            f"{self.data_url}/DZ-Richtung_{year_str}_{month_str}.zip",
            f"{self.data_url}/DZ_{year_str}_{month_str}.zip",
            f"{self.data_url}/Rohdaten_{year_str}_{month_str}.zip"
        ]
        
        for url in url_patterns:
            try:
                logger.info(f"Attempting to download from: {url}")
                response = requests.get(url, headers=self.headers, timeout=30)
                if response.status_code == 200:
                    logger.info(f"Successfully downloaded data for {year}-{month}")
                    return response.content
            except requests.RequestException as e:
                logger.warning(f"Failed to download from {url}: {e}")
                
        logger.error(f"Could not download data for {year}-{month}")
        return None
    
    def extract_station_metadata(self, zip_content: bytes) -> pd.DataFrame:
        """
        Extract station metadata including coordinates from ZIP file
        
        Args:
            zip_content: Content of ZIP file
            
        Returns:
            DataFrame with station metadata
        """
        metadata_list = []
        
        try:
            with zipfile.ZipFile(io.BytesIO(zip_content)) as zf:
                # Look for metadata CSV files
                for filename in zf.namelist():
                    if 'metadata' in filename.lower() or 'stammdaten' in filename.lower():
                        with zf.open(filename) as f:
                            # Read CSV with German encoding
                            df = pd.read_csv(f, encoding='iso-8859-1', sep=';', 
                                          decimal=',', on_bad_lines='skip')
                            
                            # Map common column names
                            column_mapping = {
                                'Zst': 'station_id',
                                'ZST': 'station_id',
                                'Name': 'station_name',
                                'Straße': 'road',
                                'Strasse': 'road',
                                'Nr': 'road_number',
                                'Bundesland': 'state',
                                'Land': 'state',
                                'Breite': 'latitude',
                                'Lat': 'latitude',
                                'Länge': 'longitude',
                                'Lng': 'longitude',
                                'Lon': 'longitude',
                                'X': 'longitude',
                                'Y': 'latitude',
                                'Rechtswert': 'x_coord',
                                'Hochwert': 'y_coord'
                            }
                            
                            df.rename(columns=column_mapping, inplace=True)
                            metadata_list.append(df)
                            
        except Exception as e:
            logger.error(f"Error extracting metadata: {e}")
            
        if metadata_list:
            return pd.concat(metadata_list, ignore_index=True)
        return pd.DataFrame()
    
    def get_station_coordinates(self) -> pd.DataFrame:
        """
        Get station coordinates from BASt interactive map or API
        
        Returns:
            DataFrame with station IDs and coordinates
        """
        stations = []
        
        # Try to get station list from API
        try:
            # This is a simplified example - actual implementation would need
            # to parse the BASt website or use their data endpoints
            api_url = f"{self.base_url}/DE/Verkehrstechnik/Fachthemen/v2-verkehrszaehlung/Daten/2024_1/Jawe2024.json"
            response = requests.get(api_url, headers=self.headers, timeout=30)
            
            if response.status_code == 200:
                data = response.json()
                for station in data.get('stations', []):
                    stations.append({
                        'station_id': station.get('id'),
                        'station_name': station.get('name'),
                        'road': station.get('road'),
                        'road_number': station.get('road_number'),
                        'latitude': station.get('lat'),
                        'longitude': station.get('lon'),
                        'state': station.get('state')
                    })
        except Exception as e:
            logger.warning(f"Could not fetch from API: {e}")
            
        # Fallback: Use predefined major counting stations with known coordinates
        # These are example stations - in production, you'd have a complete list
        if not stations:
            stations = self._get_example_stations()
            
        return pd.DataFrame(stations)
    
    def _get_example_stations(self) -> List[Dict]:
        """
        Get example counting stations with coordinates
        This is a sample - actual implementation would have complete data
        """
        return [
            {
                'station_id': 'A1-001',
                'station_name': 'Köln-West',
                'road': 'A1',
                'road_number': '1',
                'latitude': 50.9375,
                'longitude': 6.9603,
                'state': 'Nordrhein-Westfalen'
            },
            {
                'station_id': 'A2-001',
                'station_name': 'Berlin-West',
                'road': 'A2',
                'road_number': '2',
                'latitude': 52.5200,
                'longitude': 13.4050,
                'state': 'Berlin'
            },
            {
                'station_id': 'A3-001',
                'station_name': 'Frankfurt-Main',
                'road': 'A3',
                'road_number': '3',
                'latitude': 50.1109,
                'longitude': 8.6821,
                'state': 'Hessen'
            },
            {
                'station_id': 'A7-001',
                'station_name': 'Hamburg-Süd',
                'road': 'A7',
                'road_number': '7',
                'latitude': 53.5511,
                'longitude': 9.9937,
                'state': 'Hamburg'
            },
            {
                'station_id': 'A9-001',
                'station_name': 'München-Nord',
                'road': 'A9',
                'road_number': '9',
                'latitude': 48.1351,
                'longitude': 11.5820,
                'state': 'Bayern'
            },
            {
                'station_id': 'B1-001',
                'station_name': 'Magdeburg',
                'road': 'B1',
                'road_number': '1',
                'latitude': 52.1205,
                'longitude': 11.6276,
                'state': 'Sachsen-Anhalt'
            }
        ]
    
    def fetch_traffic_counts(self, station_id: str, date: str) -> Dict:
        """
        Fetch traffic count data for a specific station and date
        
        Args:
            station_id: Station identifier
            date: Date in YYYY-MM-DD format
            
        Returns:
            Dictionary with traffic count data
        """
        traffic_data = {
            'station_id': station_id,
            'date': date,
            'total_vehicles': 0,
            'cars': 0,
            'trucks': 0,
            'motorcycles': 0,
            'buses': 0,
            'hourly_counts': []
        }
        
        # This would be replaced with actual API calls or file parsing
        # For demonstration, generating sample data
        import random
        
        # Generate hourly counts (24 hours)
        for hour in range(24):
            count = {
                'hour': hour,
                'vehicles': random.randint(100, 2000),
                'speed_avg': random.randint(80, 130)
            }
            traffic_data['hourly_counts'].append(count)
            traffic_data['total_vehicles'] += count['vehicles']
        
        # Distribute vehicle types (approximate distribution)
        traffic_data['cars'] = int(traffic_data['total_vehicles'] * 0.75)
        traffic_data['trucks'] = int(traffic_data['total_vehicles'] * 0.20)
        traffic_data['buses'] = int(traffic_data['total_vehicles'] * 0.03)
        traffic_data['motorcycles'] = int(traffic_data['total_vehicles'] * 0.02)
        
        return traffic_data
    
    def scrape_autobahn_api(self) -> pd.DataFrame:
        """
        Scrape data from the Autobahn GmbH API
        
        Returns:
            DataFrame with traffic and construction data
        """
        data = []
        
        try:
            # Get list of autobahns
            response = requests.get(self.autobahn_api_url, headers=self.headers)
            if response.status_code == 200:
                autobahns = response.json().get('roads', [])
                
                for autobahn in autobahns[:5]:  # Limit for demonstration
                    # Get details for each autobahn
                    detail_url = f"{self.autobahn_api_url}{autobahn}"
                    detail_response = requests.get(detail_url, headers=self.headers)
                    
                    if detail_response.status_code == 200:
                        details = detail_response.json()
                        
                        # Extract relevant information
                        for item in details.get('items', []):
                            data.append({
                                'road': autobahn,
                                'type': item.get('type'),
                                'title': item.get('title'),
                                'subtitle': item.get('subtitle'),
                                'coordinate': item.get('coordinate'),
                                'extent': item.get('extent'),
                                'timestamp': datetime.now().isoformat()
                            })
                            
                    time.sleep(0.5)  # Be respectful with API calls
                    
        except Exception as e:
            logger.error(f"Error scraping Autobahn API: {e}")
            
        return pd.DataFrame(data)
    
    def save_to_csv(self, df: pd.DataFrame, filename: str):
        """Save DataFrame to CSV file"""
        filepath = self.output_dir / filename
        df.to_csv(filepath, index=False, encoding='utf-8')
        logger.info(f"Saved data to {filepath}")
    
    def save_to_json(self, df: pd.DataFrame, filename: str):
        """Save DataFrame to JSON file"""
        filepath = self.output_dir / filename
        df.to_json(filepath, orient='records', indent=2, force_ascii=False)
        logger.info(f"Saved data to {filepath}")
    
    def run(self, year: int = 2024, month: int = 1):
        """
        Main execution method
        
        Args:
            year: Year to scrape data for
            month: Month to scrape data for
        """
        logger.info("Starting BASt traffic data scraper...")
        
        # 1. Get station coordinates
        logger.info("Fetching station coordinates...")
        stations_df = self.get_station_coordinates()
        
        if not stations_df.empty:
            self.save_to_csv(stations_df, 'stations_with_coordinates.csv')
            self.save_to_json(stations_df, 'stations_with_coordinates.json')
            
            # 2. Fetch traffic data for each station
            logger.info("Fetching traffic count data...")
            traffic_records = []
            
            for _, station in stations_df.iterrows():
                # Get traffic data for the station
                date_str = f"{year}-{month:02d}-01"
                traffic_data = self.fetch_traffic_counts(
                    station['station_id'], 
                    date_str
                )
                
                # Combine with location data
                traffic_record = {
                    'station_id': station['station_id'],
                    'station_name': station['station_name'],
                    'road': station['road'],
                    'latitude': station['latitude'],
                    'longitude': station['longitude'],
                    'state': station['state'],
                    'date': date_str,
                    'total_vehicles': traffic_data['total_vehicles'],
                    'cars': traffic_data['cars'],
                    'trucks': traffic_data['trucks'],
                    'buses': traffic_data['buses'],
                    'motorcycles': traffic_data['motorcycles']
                }
                traffic_records.append(traffic_record)
                
            traffic_df = pd.DataFrame(traffic_records)
            self.save_to_csv(traffic_df, 'traffic_data_with_locations.csv')
            self.save_to_json(traffic_df, 'traffic_data_with_locations.json')
            
        # 3. Try to scrape Autobahn API data
        logger.info("Fetching data from Autobahn API...")
        autobahn_df = self.scrape_autobahn_api()
        if not autobahn_df.empty:
            self.save_to_csv(autobahn_df, 'autobahn_api_data.csv')
            
        logger.info("Scraping completed successfully!")
        
        # Create summary
        self._create_summary(stations_df, traffic_df if 'traffic_df' in locals() else pd.DataFrame())
    
    def _create_summary(self, stations_df: pd.DataFrame, traffic_df: pd.DataFrame):
        """Create a summary report of scraped data"""
        summary = {
            'timestamp': datetime.now().isoformat(),
            'total_stations': len(stations_df),
            'stations_with_coordinates': len(stations_df[stations_df['latitude'].notna()]),
            'total_traffic_records': len(traffic_df),
            'states_covered': stations_df['state'].nunique() if 'state' in stations_df.columns else 0,
            'roads_covered': stations_df['road'].nunique() if 'road' in stations_df.columns else 0
        }
        
        if not traffic_df.empty:
            summary['total_vehicles_counted'] = traffic_df['total_vehicles'].sum()
            summary['average_vehicles_per_station'] = traffic_df['total_vehicles'].mean()
        
        # Save summary
        summary_path = self.output_dir / 'scraping_summary.json'
        with open(summary_path, 'w', encoding='utf-8') as f:
            json.dump(summary, f, indent=2, ensure_ascii=False)
        
        logger.info(f"Summary saved to {summary_path}")
        logger.info(f"Summary: {summary}")

In [26]:
def main():
    """Main execution function"""
    # Create scraper instance
    scraper = BAStTrafficScraper(output_dir="bast_traffic_data")
    
    # Run scraper for current year and month
    current_date = datetime.now()
    scraper.run(year=current_date.year, month=current_date.month)
    
    # Example: Scrape specific period
    # scraper.run(year=2024, month=10)
    
    print("\n=== Scraping Complete ===")
    print(f"Data saved to: {scraper.output_dir}")
    print("\nFiles created:")
    for file in scraper.output_dir.glob("*"):
        print(f"  - {file.name}")


if __name__ == "__main__":
    main()


2025-11-08 18:17:38,783 - INFO - Starting BASt traffic data scraper...
2025-11-08 18:17:38,785 - INFO - Fetching station coordinates...
2025-11-08 18:17:41,220 - INFO - Saved data to bast_traffic_data/stations_with_coordinates.csv
2025-11-08 18:17:41,225 - INFO - Saved data to bast_traffic_data/stations_with_coordinates.json
2025-11-08 18:17:41,226 - INFO - Fetching traffic count data...
2025-11-08 18:17:41,229 - INFO - Saved data to bast_traffic_data/traffic_data_with_locations.csv
2025-11-08 18:17:41,230 - INFO - Saved data to bast_traffic_data/traffic_data_with_locations.json
2025-11-08 18:17:41,230 - INFO - Fetching data from Autobahn API...
2025-11-08 18:17:48,347 - INFO - Scraping completed successfully!


TypeError: Object of type int64 is not JSON serializable