<a href="https://colab.research.google.com/github/geosensing/streetsense2/blob/main/cloud_vision_road_condition_coder.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import cv2
from tqdm import tqdm
import json
import requests
import io
import base64

class CloudVisionTokenAnalyzer:
    """
    Analyzes street conditions from Street View images using Google Cloud Vision API with API token.
    Focuses on potholes, garbage, and overall street condition.

    Each image is sent to the ``images:annotate`` endpoint for label detection
    and object localization. The returned annotations are matched against
    keyword lists to derive pothole and garbage severities (0-3) and an
    overall 0-10 condition score with a textual rating.
    """

    # Ratings from best to worst; used to order the bars of the rating chart.
    RATING_ORDER = ["Excellent", "Good", "Fair", "Poor", "Very Poor"]

    # Columns of the per-image summary table built by analyze_directory.
    SUMMARY_COLUMNS = [
        "image_path",
        "has_road",
        "pothole_count",
        "pothole_severity",
        "pothole_confidence",
        "garbage_count",
        "garbage_severity",
        "garbage_confidence",
        "overall_score",
        "overall_rating",
    ]

    def __init__(self, api_key=None, timeout=30):
        """
        Initialize the Cloud Vision API Analyzer.

        Args:
            api_key (str): Google Cloud API key
            timeout (float): Seconds to wait for each API request before
                giving up on that image
        """
        self.api_key = api_key
        self.api_url = "https://vision.googleapis.com/v1/images:annotate"
        self.timeout = timeout

        # Define which labels might indicate problems
        self.pothole_keywords = [
            'pothole', 'hole', 'crack', 'cracked', 'damaged', 'damage',
            'broken', 'asphalt damage', 'road damage', 'pavement damage'
        ]

        self.garbage_keywords = [
            'garbage', 'trash', 'litter', 'waste', 'rubbish', 'debris',
            'bottle', 'can', 'plastic', 'paper', 'bag', 'junk'
        ]

        print("Initialized Cloud Vision Token Analyzer")

    @staticmethod
    def _contains_keyword(text, keywords):
        """
        Return True if any keyword occurs in ``text`` as a whole word or phrase.

        Matching is case-insensitive and also accepts a trailing "s" on the
        keyword (e.g. "cracks" for "crack"). Whole-word matching is used
        because short keywords such as "can", "hole" or "bag" would otherwise
        match unrelated annotations like "Canopy", "Manhole" or "Handbag".
        """
        # Replace punctuation with spaces, collapse whitespace and pad both
        # ends so every word is delimited by single spaces.
        normalized = "".join(ch if ch.isalnum() else " " for ch in (text or "").lower())
        padded = " " + " ".join(normalized.split()) + " "

        for keyword in keywords:
            needle = keyword.lower()
            if f" {needle} " in padded or f" {needle}s " in padded:
                return True
        return False

    def analyze_image(self, image_path):
        """
        Analyze a single street view image using Cloud Vision API.

        Args:
            image_path (str): Path to the image file

        Returns:
            dict: Analysis results containing detected issues. On any failure
                (missing key, missing file, HTTP error, per-image API error,
                or an exception) a dict with a single "error" key is returned
                instead.
        """
        if not self.api_key:
            return {"error": "API key not provided"}

        if not os.path.exists(image_path):
            return {"error": f"Image file not found: {image_path}"}

        try:
            # Read and encode image
            with open(image_path, 'rb') as image_file:
                encoded_image = base64.b64encode(image_file.read()).decode('UTF-8')

            # Create request payload
            request_json = {
                "requests": [
                    {
                        "image": {
                            "content": encoded_image
                        },
                        "features": [
                            {"type": "LABEL_DETECTION", "maxResults": 20},
                            {"type": "OBJECT_LOCALIZATION", "maxResults": 20}
                        ]
                    }
                ]
            }

            # Send the key in a header rather than the query string: request
            # exceptions include the URL in their message, and that message is
            # returned (and printed by analyze_directory) below.
            # The timeout keeps one stalled connection from hanging a batch.
            response = requests.post(
                self.api_url,
                headers={"X-Goog-Api-Key": self.api_key},
                json=request_json,
                timeout=self.timeout,
            )

            if response.status_code != 200:
                return {"error": f"API Error: {response.status_code} - {response.text}"}

            # Parse response
            responses = response.json().get("responses")
            if not responses:
                return {"error": "API Error: no responses returned by Cloud Vision"}
            api_response = responses[0]

            # A 200 reply can still carry a per-image error object; without
            # this check such an image would be reported as "no road".
            api_error = api_response.get("error")
            if api_error:
                return {"error": f"API Error: {api_error.get('code')} - {api_error.get('message')}"}

            # Extract annotations
            labels = api_response.get("labelAnnotations", [])
            objects = api_response.get("localizedObjectAnnotations", [])

            # Analyze the annotations
            results = {
                "image_path": image_path,
                "has_road": self._check_if_road(labels, objects),
                "potholes": self._detect_potholes(labels, objects),
                "garbage": self._detect_garbage(labels, objects),
                "raw_labels": [{"description": label.get("description", ""), "score": label.get("score", 0)} for label in labels],
                "raw_objects": [{"name": obj.get("name", ""), "score": obj.get("score", 0)} for obj in objects]
            }

            # Calculate overall street condition score
            self._calculate_overall_condition(results)

            return results

        except Exception as e:
            # Deliberately broad: this method reports failures as an error
            # dict so a batch run can log them and move on to the next image.
            return {"error": f"Error analyzing image with Cloud Vision: {str(e)}"}

    def _check_if_road(self, labels, objects):
        """
        Check if the image contains a road.

        Returns True when any label description or object name contains one of
        the road keywords. This check intentionally stays a substring match so
        compound terms that contain a road keyword still count.
        """
        road_keywords = ['road', 'street', 'highway', 'lane', 'asphalt', 'pavement']

        texts = [label.get("description", "") for label in labels]
        texts += [obj.get("name", "") for obj in objects]

        return any(
            keyword in text.lower()
            for text in texts
            for keyword in road_keywords
        )

    def _detect_potholes(self, labels, objects):
        """
        Detect potholes in the image using Cloud Vision annotations.

        Args:
            labels: Label annotations from Cloud Vision
            objects: Object localization annotations from Cloud Vision
                (accepted for a uniform signature; not used here)

        Returns:
            dict: Pothole detection results with keys "count" (number of
                matching labels), "severity" (0-3, derived from the highest
                matching score) and "confidence" (that highest score)
        """
        # Look for pothole keywords in labels
        pothole_scores = [
            label.get("score", 0)
            for label in labels
            if self._contains_keyword(label.get("description", ""), self.pothole_keywords)
        ]

        if not pothole_scores:
            return {
                "count": 0,
                "severity": 0,
                "confidence": 0
            }

        max_score = max(pothole_scores)

        # Determine severity (1-3) from the confidence of the best match
        if max_score < 0.6:
            severity = 1  # Low confidence
        elif max_score < 0.8:
            severity = 2  # Medium confidence
        else:
            severity = 3  # High confidence

        return {
            "count": len(pothole_scores),
            "severity": severity,
            "confidence": max_score
        }

    def _detect_garbage(self, labels, objects):
        """
        Detect garbage in the image using Cloud Vision annotations.

        Args:
            labels: Label annotations from Cloud Vision
            objects: Object localization annotations from Cloud Vision

        Returns:
            dict: Garbage detection results with keys "count" (matching
                objects, or matching labels when no object matched),
                "severity" (0-3, derived from the count) and "confidence"
                (highest score among all matches)
        """
        # Look for garbage keywords in labels
        garbage_scores = [
            label.get("score", 0)
            for label in labels
            if self._contains_keyword(label.get("description", ""), self.garbage_keywords)
        ]

        # Look for garbage objects
        object_scores = [
            obj.get("score", 0)
            for obj in objects
            if self._contains_keyword(obj.get("name", ""), self.garbage_keywords)
        ]

        all_scores = garbage_scores + object_scores
        if not all_scores:
            return {
                "count": 0,
                "severity": 0,
                "confidence": 0
            }

        # Localized objects are individual detections, so prefer counting
        # them; fall back to the number of matching labels otherwise.
        count = len(object_scores) if object_scores else len(garbage_scores)

        if count < 3:
            severity = 1  # Minor litter
        elif count < 6:
            severity = 2  # Moderate litter
        else:
            severity = 3  # Severe litter

        return {
            "count": count,
            "severity": severity,
            "confidence": max(all_scores)
        }

    def _calculate_overall_condition(self, results):
        """
        Calculate overall street condition score.

        Args:
            results (dict): Analysis results with individual conditions; must
                contain "has_road" and the "severity" of "potholes" and
                "garbage"

        Returns:
            None: Updates results dict in-place by setting
                "overall_condition" to a dict with "score" and "rating"
        """
        # If there's no road in the image, can't properly assess
        if not results["has_road"]:
            results["overall_condition"] = {
                "score": None,
                "rating": "Not applicable - No road detected"
            }
            return

        # Points deducted per severity level; severity 0 deducts nothing.
        pothole_penalty = {1: 1, 2: 3, 3: 5}
        garbage_penalty = {1: 0.5, 2: 1.5, 3: 3}

        # Start with a perfect score and subtract based on issues
        score = 10.0
        score -= pothole_penalty.get(results["potholes"]["severity"], 0)
        score -= garbage_penalty.get(results["garbage"]["severity"], 0)

        # Ensure score is in range 0-10
        score = max(0, min(10, score))

        # Assign a rating
        if score >= 9:
            rating = "Excellent"
        elif score >= 7:
            rating = "Good"
        elif score >= 5:
            rating = "Fair"
        elif score >= 3:
            rating = "Poor"
        else:
            rating = "Very Poor"

        results["overall_condition"] = {
            "score": score,
            "rating": rating
        }

    def analyze_directory(self, images_dir, output_csv=None):
        """
        Analyze all images in a directory and output results to CSV.

        The directory is searched recursively for .jpg, .jpeg and .png files,
        which are processed in sorted path order. Images whose analysis fails
        are reported on stdout and left out of the results.

        Args:
            images_dir (str): Directory containing images
            output_csv (str): Path to save CSV results

        Returns:
            pd.DataFrame: Analysis results for all images (one row per
                successfully analyzed image), or None when the API key is
                missing or the directory does not exist
        """
        if not self.api_key:
            print("Error: API key not provided")
            return None

        if not os.path.isdir(images_dir):
            print(f"Error: Directory not found: {images_dir}")
            return None

        # Create output directory if it doesn't exist
        if output_csv:
            output_dir = os.path.dirname(output_csv)
            if output_dir:
                os.makedirs(output_dir, exist_ok=True)

        # Find all image files; sorted so runs are reproducible regardless of
        # the order in which the filesystem lists entries.
        image_extensions = ('.jpg', '.jpeg', '.png')
        image_files = sorted(
            os.path.join(root, file)
            for root, _, files in os.walk(images_dir)
            for file in files
            if file.lower().endswith(image_extensions)
        )

        print(f"Found {len(image_files)} images to analyze")

        all_results = []

        # Process each image
        for image_file in tqdm(image_files, desc="Analyzing images with Cloud Vision"):
            results = self.analyze_image(image_file)

            if "error" in results:
                print(f"Error analyzing {image_file}: {results['error']}")
                continue

            # Create summary for DataFrame with only pothole, garbage and overall ratings
            overall = results.get("overall_condition", {})
            all_results.append({
                "image_path": image_file,
                "has_road": results.get("has_road", False),
                "pothole_count": results["potholes"].get("count", 0),
                "pothole_severity": results["potholes"].get("severity", 0),
                "pothole_confidence": results["potholes"].get("confidence", 0),
                "garbage_count": results["garbage"].get("count", 0),
                "garbage_severity": results["garbage"].get("severity", 0),
                "garbage_confidence": results["garbage"].get("confidence", 0),
                "overall_score": overall.get("score"),
                "overall_rating": overall.get("rating", "Unknown")
            })

        # Fix the columns explicitly so an empty result still has the
        # expected schema (and the CSV still gets a header row).
        results_df = pd.DataFrame(all_results, columns=self.SUMMARY_COLUMNS)

        # Save results to CSV if path is provided
        if output_csv:
            results_df.to_csv(output_csv, index=False)
            print(f"Results saved to {output_csv}")

        return results_df

    @staticmethod
    def _save_bar_chart(counts, title, xlabel, color, output_path):
        """Plot ``counts`` as a bar chart and save it to ``output_path``."""
        plt.figure(figsize=(10, 6))
        counts.plot(kind='bar', color=color)
        plt.title(title)
        plt.xlabel(xlabel)
        plt.ylabel('Number of Images')
        plt.grid(axis='y', linestyle='--', alpha=0.7)
        plt.tight_layout()
        plt.savefig(output_path)
        plt.close()

    def generate_summary_charts(self, results_df, output_dir):
        """
        Generate summary charts based on the analysis results.

        Writes condition_distribution.png, pothole_severity.png and
        garbage_severity.png into ``output_dir``, using only the rows where a
        road was detected.

        Args:
            results_df (pd.DataFrame): Analysis results
            output_dir (str): Directory to save charts

        Returns:
            None
        """
        os.makedirs(output_dir, exist_ok=True)

        if results_df is None or results_df.empty:
            print("No data to generate charts")
            return

        # Filter only images with roads
        road_df = results_df[results_df['has_road'] == True].copy()

        if road_df.empty:
            print("No road images found in the analysis")
            return

        # 1. Overall condition distribution, ordered best to worst instead of
        # alphabetically; any unexpected rating values are appended at the end.
        rating_counts = road_df['overall_rating'].value_counts()
        known = [rating for rating in self.RATING_ORDER if rating in rating_counts.index]
        other = sorted(
            (rating for rating in rating_counts.index if rating not in self.RATING_ORDER),
            key=str,
        )
        self._save_bar_chart(
            rating_counts.reindex(known + other),
            'Street Condition Distribution',
            'Condition Rating',
            'skyblue',
            os.path.join(output_dir, 'condition_distribution.png'),
        )

        # 2. Pothole severity distribution
        self._save_bar_chart(
            road_df['pothole_severity'].value_counts().sort_index(),
            'Pothole Severity Distribution',
            'Severity Level (0-3)',
            'tomato',
            os.path.join(output_dir, 'pothole_severity.png'),
        )

        # 3. Garbage severity distribution
        self._save_bar_chart(
            road_df['garbage_severity'].value_counts().sort_index(),
            'Garbage Severity Distribution',
            'Severity Level (0-3)',
            'olivedrab',
            os.path.join(output_dir, 'garbage_severity.png'),
        )

        print(f"Charts saved to {output_dir}")


# Main function to process all images in a directory
def analyze_street_images(api_key, images_dir, output_csv=None, generate_charts=False, charts_dir=None):
    """
    Run the Cloud Vision street analysis over a directory of images and
    print a short summary of the outcome.

    Args:
        api_key (str): Google Cloud API key
        images_dir (str): Directory containing street view images
        output_csv (str): Path to save CSV results
        generate_charts (bool): Whether to generate summary charts
        charts_dir (str): Directory to save charts if generated; charts are
            skipped when this is not set

    Returns:
        pd.DataFrame: Analysis results, exactly as returned by the analyzer's
            analyze_directory (may be None or empty)
    """
    analyzer = CloudVisionTokenAnalyzer(api_key=api_key)
    results_df = analyzer.analyze_directory(images_dir, output_csv)

    # Without any rows there is nothing to chart or summarize.
    if results_df is None or results_df.empty:
        return results_df

    if generate_charts and charts_dir:
        analyzer.generate_summary_charts(results_df, charts_dir)

    # Headline numbers over every analyzed image
    road_flags = results_df['has_road']
    print("\nSummary Statistics:")
    print(f"Total images analyzed: {len(results_df)}")
    print(f"Images with road visible: {road_flags.sum()} ({road_flags.mean()*100:.1f}%)")

    # The remaining statistics only consider images where a road was found
    road_df = results_df[road_flags.eq(True)]
    if road_df.empty:
        return results_df

    with_potholes = road_df['pothole_count'] > 0
    with_garbage = road_df['garbage_count'] > 0
    print(f"Images with potholes: {with_potholes.sum()} ({with_potholes.mean()*100:.1f}%)")
    print(f"Images with garbage: {with_garbage.sum()} ({with_garbage.mean()*100:.1f}%)")
    print(f"Average condition score: {road_df['overall_score'].mean():.2f}/10")

    # Share of road images per rating
    print("\nStreet Condition Distribution:")
    road_total = len(road_df)
    for rating, count in road_df['overall_rating'].value_counts().items():
        print(f"{rating}: {count} ({count/road_total*100:.1f}%)")

    return results_df


# Example usage
if __name__ == "__main__":
    # Credentials: replace the placeholder with a real Google Cloud API key
    API_KEY = "YOUR_API_KEY"

    # Input: folder that is searched for Street View images
    IMAGES_DIR = "street_view_images"

    # Output: CSV file receiving one row per analyzed image
    OUTPUT_CSV = "street_condition_results.csv"

    # Output: whether to draw summary charts, and where to put them
    GENERATE_CHARTS = True
    CHARTS_DIR = "condition_charts"

    # Kick off the full pipeline with the settings above
    results = analyze_street_images(
        api_key=API_KEY,
        images_dir=IMAGES_DIR,
        output_csv=OUTPUT_CSV,
        generate_charts=GENERATE_CHARTS,
        charts_dir=CHARTS_DIR,
    )

Initialized Cloud Vision Token Analyzer
Found 6010 images to analyze


Analyzing images with Cloud Vision:  18%|█▊        | 1068/6010 [11:26<59:21,  1.39it/s]