In [None]:
from bs4 import BeautifulSoup
import asyncio
import aiohttp
import os
import json
from asyncio import Semaphore
import time
import cv2
from ultralytics import YOLO
from jupyter_dash import JupyterDash  # Instead of Dash, use JupyterDash
import dash
from dash import dcc, html
from dash.dependencies import Input, Output
import plotly.graph_objs as go
import shutil
from tqdm import tqdm
import plotly.express as px

# Main traffic-camera page. It is fetched to discover the cameras, and each
# per-camera page URL is derived from it by replacing the ".html" suffix.
url = "https://onemotoring.lta.gov.sg/content/onemotoring/home/driving/traffic_information/traffic-cameras.html"

In [2]:
class PeriodicScraper:
    """Scrape the traffic-camera pages and mirror their snapshot images to disk.

    One call to :meth:`scrape` fetches the main page to discover the cameras,
    then for every camera fetches its page, writes a ``metadata.json`` into
    ``<data_path>/<camera id>/`` and downloads the snapshot images that are
    new, changed, or missing on disk.
    """

    def __init__(self, num_parallel_requests, data_path):
        """Store the configuration.

        Args:
            num_parallel_requests: Maximum number of HTTP requests in flight.
            data_path: Directory under which one sub-directory per camera
                is created.
        """
        self.num_parallel_requests = num_parallel_requests
        # Shared by page fetches and image downloads to cap concurrency.
        self.semaphore = Semaphore(num_parallel_requests)
        # Maps camera id (the button id on the main page) -> camera page URL.
        self.main_page_mapping = {}
        self.data_path = data_path

    @staticmethod
    def _safe_filename(alt):
        """Turn an image ``alt`` text into a file-name stem ("/" and " " -> "_")."""
        return "_".join(alt.replace("/", "_").split(" "))

    @staticmethod
    def _load_previous_images(metadata_path):
        """Return the ``images`` mapping of an earlier ``metadata.json``.

        A missing, unreadable or malformed file yields an empty dict so that
        a damaged file only causes a re-download instead of aborting the scrape.
        """
        try:
            with open(metadata_path, "r") as f:
                previous = json.load(f)
        except FileNotFoundError:
            return {}
        except (OSError, json.JSONDecodeError) as e:
            print(f"Ignoring unreadable metadata {metadata_path}: {e}")
            return {}
        if not isinstance(previous, dict):
            return {}
        images = previous.get("images", {})
        return images if isinstance(images, dict) else {}

    @staticmethod
    def _extract_timestamp(card):
        """Return the timestamp token shown on a snapshot card, or ``None``.

        The value is the fourth space-separated token of the text of the
        ``<span>`` inside the card's ``div.timestamp``. If the text has fewer
        tokens, the whole text is returned so it can still be compared
        between runs.
        """
        holder = card.find("div", {"class": "timestamp"})
        if holder is None:
            return None
        span = holder.find("span")
        if span is None:
            return None
        tokens = span.text.split(" ")
        return tokens[3] if len(tokens) > 3 else span.text

    def _parse_main_page(self, soup):
        """Fill ``main_page_mapping`` from the buttons of the main page.

        Raises:
            Exception: If the ``#scroll`` container or its buttons are missing.
        """
        scroll_div = soup.find("div", {"id": "scroll"})
        if scroll_div is None:
            raise Exception("No scroll container found in the main page")
        buttons = scroll_div.find_all("button")
        if not buttons:
            raise Exception("No buttons found in the main page")
        base_url = url.split(".html")[0]
        for button in buttons:
            camera_name = button.get("id")
            if not camera_name:
                # A button without an id cannot be mapped to a camera page.
                continue
            self.main_page_mapping[camera_name] = (
                base_url + "/" + camera_name + ".html#trafficCameras"
            )

    async def _fetch(self, session, url):
        """GET ``url`` and return the body text, or ``None`` on any failure."""
        async with self.semaphore:
            try:
                async with session.get(url) as response:
                    if response.status == 404:
                        print(f"Error 404: {url} not found.")
                        return None
                    response.raise_for_status()
                    return await response.text()
            except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                print(f"Request failed for {url}: {e}")
                return None

    async def _main_page_scraper(self):
        """Fetch the main page and refresh ``main_page_mapping`` from it."""
        async with aiohttp.ClientSession() as session:
            html_content = await self._fetch(session, url)
            if html_content:
                soup = BeautifulSoup(html_content, "html.parser")
                self._parse_main_page(soup)

    async def _camera_page_scraper(self, session, camera_url, directory):
        """Scrape one camera page and persist its metadata.

        Returns:
            ``{"camera": <header text>, "images": {alt: [src, timestamp,
            download]}}`` where ``download`` tells whether the image has to
            be fetched, or ``None`` when the page could not be fetched or
            lacks the expected elements.
        """
        previous_images = self._load_previous_images(f"{directory}/metadata.json")

        html_content = await self._fetch(session, camera_url)
        if not html_content:
            return None
        soup = BeautifulSoup(html_content, "html.parser")
        image_container = soup.find("div", {"class": "snapshots"})
        if not image_container:
            return None
        header = image_container.find_all("h2", {"id": "expressway-name"})
        if not header:
            return None
        cards = soup.find("div", {"class": "road-snapshots"})
        if not cards:
            return None

        metadata_entry = {}
        for card in cards.find_all("div", {"class": "card"}):
            image = card.find("img")
            if image is None:
                continue
            alt, src = image.get("alt"), image.get("src")
            if not alt or not src:
                continue
            timestamp = self._extract_timestamp(card)

            previous = previous_images.get(alt)
            has_previous_timestamp = isinstance(previous, list) and len(previous) > 1
            previous_timestamp = previous[1] if has_previous_timestamp else None

            # Skip only when the timestamp is known and unchanged AND the
            # file is really on disk; otherwise a failed earlier download
            # would never be retried.
            already_downloaded = (
                has_previous_timestamp
                and timestamp is not None
                and previous_timestamp == timestamp
                and os.path.exists(f"{directory}/{self._safe_filename(alt)}.jpg")
            )
            if has_previous_timestamp and previous_timestamp != timestamp:
                print("For image", alt, "Timestamp changed from", previous_timestamp, "to", timestamp)
            metadata_entry[alt] = [src, timestamp, not already_downloaded]

        metadata = {"camera": header[0].text, "images": metadata_entry}
        with open(f"{directory}/metadata.json", "w") as f:
            json.dump(metadata, f)
        return metadata

    async def _download_image(self, session, image_url, path, alt, download):
        """Download one snapshot to ``<path>/<safe alt>.jpg``.

        Does nothing when ``download`` is false. 404 responses and client
        errors are tolerated (the latter are printed).
        """
        if not download:
            return
        safe_filename = self._safe_filename(alt)
        # Protocol-relative sources ("//host/...") need a scheme prepended.
        full_url = "https:" + image_url if image_url.startswith("//") else image_url
        async with self.semaphore:
            try:
                async with session.get(full_url) as response:
                    if response.status == 404:
                        return
                    response.raise_for_status()
                    content = await response.read()
            except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                print(f"Failed to download {image_url}: {e}")
                return
        with open(f"{path}/{safe_filename}.jpg", "wb") as f:
            f.write(content)

    async def download_images(self, session, mapping, path):
        """Download every image of a camera metadata ``mapping`` into ``path``."""
        if not mapping or "images" not in mapping:
            return
        tasks = [
            self._download_image(session, image_url, path, alt, download)
            for alt, (image_url, timestamp, download) in mapping["images"].items()
        ]
        await asyncio.gather(*tasks)

    async def _scrape_camera(self, session, camera_name, camera_url):
        """Scrape one camera page and then download its images."""
        camera_directory = f"{self.data_path}/{camera_name}"
        os.makedirs(camera_directory, exist_ok=True)
        camera_data = await self._camera_page_scraper(session, camera_url, camera_directory)
        if not camera_data:
            print("Skipping camera", camera_name)
            return
        await self.download_images(session, camera_data, camera_directory)

    async def scrape(self):
        """Run one full scrape: main page, camera pages, then images.

        Cameras are handled concurrently; the semaphore keeps the number of
        simultaneous requests at ``num_parallel_requests``.
        """
        await self._main_page_scraper()
        async with aiohttp.ClientSession() as session:
            jobs = [
                self._scrape_camera(session, camera_name, camera_url)
                for camera_name, camera_url in list(self.main_page_mapping.items())
            ]
            await asyncio.gather(*jobs)

In [3]:
def process_images_and_get_stats(directory_path='data', assets_directory='assets'):
    """Detect vehicles in every camera image and write annotated copies.

    Each sub-directory of ``directory_path`` is treated as one camera. Its
    images are run through the YOLO model, boxes are drawn for cars, trucks,
    buses and motorbikes, and the annotated image is saved into
    ``assets_directory`` (which is emptied first).

    Args:
        directory_path: Directory holding one sub-directory per camera.
        assets_directory: Output directory for the annotated images.

    Returns:
        A list with one dict per camera that has images and readable
        metadata, with the keys ``camera_name``, ``car_count``,
        ``truck_count``, ``bus_count``, ``motorbike_count``, ``image_files``
        (names of the annotated files) and ``times`` (view -> timestamp from
        ``metadata.json``). Empty when there is nothing to process.
    """
    # The data directory may not exist yet (e.g. before the first scrape).
    if os.path.isdir(directory_path):
        directories = sorted(
            os.path.join(directory_path, d)
            for d in os.listdir(directory_path)
            if os.path.isdir(os.path.join(directory_path, d))
        )
    else:
        directories = []

    # Clear previously processed images so stale files are never served.
    if os.path.exists(assets_directory):
        shutil.rmtree(assets_directory)
    os.makedirs(assets_directory, exist_ok=True)

    if not directories:
        return []

    # Loaded only once there is something to process.
    model = YOLO('yolov10x.pt')

    # COCO class id -> (label, BGR colour used for the box and text).
    vehicle_classes = {
        2: ('car', (0, 255, 0)),          # green
        7: ('truck', (0, 0, 255)),        # red
        5: ('bus', (255, 0, 0)),          # blue
        3: ('motorbike', (0, 255, 255)),  # yellow (OpenCV colours are BGR)
    }

    camera_data = []
    for directory in tqdm(directories, desc='Processing images'):
        image_files = [f for f in os.listdir(directory) if f.endswith(('jpg', 'jpeg', 'png'))]
        if not image_files:
            continue

        # Camera name and timestamps come from the scraper's metadata file.
        try:
            with open(f'{directory}/metadata.json', 'r') as file:
                data = json.load(file)
        except (OSError, json.JSONDecodeError) as e:
            print(f"Skipping {directory}: cannot read metadata.json ({e})")
            continue
        camera_name = data.get('camera', 'Unknown Camera')
        times = {view: details[1] for view, details in data.get("images", {}).items()}

        counts = {label: 0 for label, _ in vehicle_classes.values()}
        processed_files = []
        for image_file in image_files:
            image = cv2.imread(os.path.join(directory, image_file))
            if image is None:
                # Unreadable or partially written file: nothing to annotate.
                continue

            # Run inference without per-image console output.
            results = model(image, verbose=False)

            for box in results[0].boxes:
                class_id = int(box.cls[0].cpu().numpy())
                if class_id not in vehicle_classes:
                    continue
                label, color = vehicle_classes[class_id]
                counts[label] += 1

                x1, y1, x2, y2 = map(int, box.xyxy[0])
                confidence = float(box.conf[0].cpu().numpy())
                cv2.rectangle(image, (x1, y1), (x2, y2), color, 2)
                cv2.putText(image, f"{label} {confidence:.2f}", (x1, y1 - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

            processed_name = f"processed_{camera_name}_{image_file}"
            cv2.imwrite(os.path.join(assets_directory, processed_name), image)
            processed_files.append(processed_name)

        camera_data.append({
            'camera_name': camera_name,
            'car_count': counts['car'],
            'truck_count': counts['truck'],
            'bus_count': counts['bus'],
            'motorbike_count': counts['motorbike'],
            'image_files': processed_files,  # Only images that were actually written
            'times': times
        })

    return camera_data



In [None]:
async def periodic_scrape(scraper, interval_seconds=60):
    """Run ``scraper.scrape()`` forever, pausing between runs.

    A failing run is reported and the loop continues, so a single bad scrape
    does not stop all later ones.

    Args:
        scraper: Object exposing an async ``scrape()`` method.
        interval_seconds: Pause after each run, in seconds.
    """
    while True:
        start_time = time.monotonic()
        try:
            await scraper.scrape()
        except Exception as e:
            # Top-level boundary of the background task: report and retry
            # on the next cycle instead of silently ending the task.
            print(f"Scraping failed after {time.monotonic() - start_time} seconds: {e!r}")
        else:
            print(f"Scraping completed in {time.monotonic() - start_time} seconds")
        await asyncio.sleep(interval_seconds)

# Dashboard application object
app = JupyterDash(__name__)

# Page layout: a title, a container that the callback fills, and a timer
# that fires every ten seconds to trigger the refresh.
app.layout = html.Div(
    children=[
        html.H1("Vehicle Counts Dashboard"),
        html.Div(id='vehicle-counts-text'),
        dcc.Interval(id='interval-component', interval=10_000, n_intervals=0),
    ]
)

# Callback to update the text with vehicle counts
@app.callback(
    Output('vehicle-counts-text', 'children'),
    [Input('interval-component', 'n_intervals')]
)
def update_text(n_intervals):
    """Rebuild the dashboard body on every interval tick.

    Re-processes the images, then returns a bar chart of the total vehicle
    counts followed by one section per camera with its counts and its
    annotated images.

    Args:
        n_intervals: Tick counter of the interval component (unused; it only
            triggers the callback).

    Returns:
        A list of Dash components for the ``vehicle-counts-text`` container.
    """
    camera_data = process_images_and_get_stats()

    # Totals over all cameras, in the order shown on the chart.
    vehicle_counts = {
        'Vehicle Type': ['Cars', 'Trucks', 'Buses', 'Motorbikes'],
        'Count': [
            sum(data[key] for data in camera_data)
            for key in ('car_count', 'truck_count', 'bus_count', 'motorbike_count')
        ]
    }
    fig = px.bar(vehicle_counts, x='Vehicle Type', y='Count', title='Total Vehicle Counts')

    # One cache-busting value per refresh forces the browser to reload the
    # images, whose file names stay the same between refreshes.
    timestamp = int(time.time())

    # Dash expects camelCased CSS property names in style dictionaries.
    image_style = {'width': '300px', 'height': 'auto', 'padding': '10px'}
    caption_style = {'textAlign': 'center', 'fontSize': '12px'}
    tile_style = {'display': 'inline-block', 'textAlign': 'center'}
    gallery_style = {'display': 'flex', 'flexWrap': 'wrap'}

    text_elements = []
    for data in camera_data:
        camera_name = data['camera_name']

        image_elements = []
        for image_file in data['image_files']:
            # Caption: the file name without prefix and extension, with
            # underscores shown as spaces.
            cleaned_name = image_file.replace(f"processed_{camera_name}_", "").replace(".jpg", "")
            cleaned_name = cleaned_name.replace("_", " ")
            image_url = f"/assets/{image_file}?v={timestamp}"

            image_elements.append(html.Div([
                html.Img(src=image_url, style=image_style),
                html.P(cleaned_name, style=caption_style)
            ], style=tile_style))

        text_elements.append(
            html.Div([
                html.H3(f"Camera: {camera_name}"),
                html.P(f"Cars: {data['car_count']}"),
                html.P(f"Trucks: {data['truck_count']}"),
                html.P(f"Buses: {data['bus_count']}"),
                html.P(f"Motorbikes: {data['motorbike_count']}"),
                html.Div(image_elements, style=gallery_style),  # Images side by side
                html.Hr()  # Separator between cameras
            ])
        )

    # Graph first, then the per-camera sections.
    return [
        dcc.Graph(figure=fig),
        *text_elements
    ]



# Start the background scraper and run the JupyterDash app
if __name__ == "__main__":
    scraper = PeriodicScraper(50, "data")
    loop = asyncio.get_event_loop()
    # Keep a reference to the task: the event loop only holds weak references
    # to tasks, so an unreferenced task may be garbage-collected mid-run.
    # NOTE(review): the task only makes progress while this loop is running
    # (presumably the notebook kernel's loop) — confirm before running as a script.
    scrape_task = loop.create_task(periodic_scrape(scraper))
    app.run(jupyter_mode="tab")  # Open the dashboard in a separate browser tab