In [12]:
import requests
import json
import pandas as pd
from transformers import pipeline
from PIL import Image, ImageDraw
from io import BytesIO

In [13]:
# Fetch the current traffic-camera listing from data.gov.sg.
# The timeout stops the cell from hanging indefinitely on a stalled
# connection, and raise_for_status() surfaces HTTP errors right here instead
# of as an obscure JSON/KeyError in a later cell.
data = requests.get('https://api.data.gov.sg/v1/transport/traffic-images', timeout=30)
data.raise_for_status()
results = data.json()

In [14]:
cameras = results['items'][0]['cameras']

# Build one flat record per camera: the nested 'location' and
# 'image_metadata' dictionaries are lifted up into top-level columns.
camera_data = [
    {
        'timestamp': cam['timestamp'],
        'image': cam['image'],
        'latitude': cam['location']['latitude'],
        'longitude': cam['location']['longitude'],
        'camera_id': cam['camera_id'],
        'image_height': cam['image_metadata']['height'],
        'image_width': cam['image_metadata']['width'],
        'image_md5': cam['image_metadata']['md5'],
    }
    for cam in cameras
]

# One row per camera, one column per flattened field.
df = pd.DataFrame(camera_data)
print(df)

                    timestamp  \
0   2024-06-22T16:20:07+08:00   
1   2024-06-22T16:20:07+08:00   
2   2024-06-22T16:20:07+08:00   
3   2024-06-22T16:20:07+08:00   
4   2024-06-22T16:20:07+08:00   
..                        ...   
85  2024-06-22T16:20:07+08:00   
86  2024-06-22T16:20:07+08:00   
87  2024-06-22T16:20:07+08:00   
88  2024-06-22T16:20:07+08:00   
89  2024-06-22T15:50:46+08:00   

                                                image  latitude   longitude  \
0   https://images.data.gov.sg/api/traffic-images/...  1.295313  103.871146   
1   https://images.data.gov.sg/api/traffic-images/...  1.319541  103.878563   
2   https://images.data.gov.sg/api/traffic-images/...  1.323957  103.872858   
3   https://images.data.gov.sg/api/traffic-images/...  1.319536  103.875067   
4   https://images.data.gov.sg/api/traffic-images/...  1.363520  103.905394   
..                                                ...       ...         ...   
85  https://images.data.gov.sg/api/traffic-images/

In [15]:
## Hugging face api
images = df['image'].tolist()
response = requests.get(images[1])

image = Image.open(BytesIO(response.content)).convert('RGB')

checkpoint = "google/owlv2-base-patch16-ensemble"
detector = pipeline(model=checkpoint, task="zero-shot-object-detection")

In [16]:
# Run the detector on the downloaded frame with 'cars' as the only
# candidate label, then dump the raw detections for inspection.
prediction = detector(
    image,
    candidate_labels=['cars'],
)
print(f'prediction {prediction}')

prediction [{'score': 0.3506307005882263, 'label': 'cars', 'box': {'xmin': 58, 'ymin': 164, 'xmax': 89, 'ymax': 202}}, {'score': 0.29211434721946716, 'label': 'cars', 'box': {'xmin': 129, 'ymin': 127, 'xmax': 151, 'ymax': 150}}, {'score': 0.23054367303848267, 'label': 'cars', 'box': {'xmin': 15, 'ymin': 113, 'xmax': 39, 'ymax': 159}}, {'score': 0.22281529009342194, 'label': 'cars', 'box': {'xmin': 194, 'ymin': 50, 'xmax': 225, 'ymax': 66}}, {'score': 0.2053665965795517, 'label': 'cars', 'box': {'xmin': 136, 'ymin': 129, 'xmax': 150, 'ymax': 143}}, {'score': 0.18327496945858002, 'label': 'cars', 'box': {'xmin': 212, 'ymin': 62, 'xmax': 217, 'ymax': 67}}, {'score': 0.18087032437324524, 'label': 'cars', 'box': {'xmin': 213, 'ymin': 75, 'xmax': 218, 'ymax': 80}}, {'score': 0.17261463403701782, 'label': 'cars', 'box': {'xmin': 99, 'ymin': 75, 'xmax': 105, 'ymax': 84}}, {'score': 0.15984855592250824, 'label': 'cars', 'box': {'xmin': 218, 'ymin': 58, 'xmax': 224, 'ymax': 64}}, {'score': 0.158

In [17]:
def count_verified_cars(image, threshold=0.12):
    """Count detections whose confidence score is strictly above ``threshold``.

    Args:
        image: Iterable of detection dicts, each with a numeric ``'score'``
            key. Despite the name this is the detector output, not a PIL
            image; the parameter name is kept so existing callers still work.
        threshold: Minimum score (exclusive) for a detection to be counted.
            Defaults to 0.12, the cut-off this function originally hard-coded.

    Returns:
        int: Number of detections with ``score > threshold``.
    """
    return sum(1 for car in image if car['score'] > threshold)

# Count the detections that clear the score cut-off applied inside
# count_verified_cars, then print the total.
verified_cars = count_verified_cars(prediction)
print(verified_cars)

19


In [18]:
# Draw every detection onto the frame and save the annotated copy to disk.
# NOTE: this draws directly on `image`, so the in-memory frame is modified.
draw = ImageDraw.Draw(image)

for p in prediction:
    box = p['box']
    label = p['label']
    score = p['score']

    # Read the corners by key rather than unpacking box.values(), which
    # silently depends on the dictionary's insertion order.
    xmin, ymin, xmax, ymax = box['xmin'], box['ymin'], box['xmax'], box['ymax']
    draw.rectangle((xmin, ymin, xmax, ymax), outline='red', width=1)
    # Caption each box with its label and score rounded to two decimals.
    draw.text((xmin, ymin), f'{label}: {round(score, 2)}', fill='white')

image_filename = 'annotated_image.jpg'
image.save(image_filename)


In [19]:
# from flask import Flask, request, jsonify
# import requests

# app = Flask(__name__)

# @app.route('/congestion', methods=['POST'])
# def get_congestion_data():
#     data = request.json
#     lon, lat = data['longitude'], data['latitude']
    
#     # Call data.gov.sg API
#     traffic_data = requests.get('https://api.data.gov.sg/v1/transport/traffic-images').json()
#     image_url = get_image_url(traffic_data, lon, lat)  # Implement this function to get relevant image URL

#     # Call AI Model for car count
#     car_count = call_ai_model(image_url)  # Implement this function to use OpenAI/HuggingFace API
    
#     # Determine congestion level
#     congestion_level = categorize_congestion(car_count)  # Implement this function to categorize
    
#     # Call Location API for address
#     location = get_location(lon, lat)  # Implement this function to call reverse geocoding API
    
#     return jsonify({
#         'car_count': car_count,
#         'congestion_level': congestion_level,
#         'location': location
#     })

# def get_image_url(traffic_data, lon, lat):
#     # Logic to find the closest traffic camera image based on lon/lat
#     pass

# def call_ai_model(image_url):
#     # Logic to send image to AI model and get car count
#     pass

# def categorize_congestion(car_count):
#     if car_count < 10:
#         return 1
#     elif car_count < 20:
#         return 2
#     elif car_count < 30:
#         return 3
#     else:
#         return 4

# def get_location(lon, lat):
#     response = requests.get(f'https://api.location.service/reverse-geocode?lon={lon}&lat={lat}')
#     return response.json()['location']

# if __name__ == '__main__':
#     app.run(debug=True)


In [20]:
# Re-fetch the traffic-camera listing from data.gov.sg.
# The timeout stops the cell from hanging indefinitely on a stalled
# connection, and raise_for_status() reports HTTP errors immediately rather
# than letting them surface later as a JSON/KeyError.
data = requests.get('https://api.data.gov.sg/v1/transport/traffic-images', timeout=30)
data.raise_for_status()
results = data.json()

In [21]:
## Hugging face api
images = df['image'].tolist()
response = requests.get(images[1])

image = Image.open(BytesIO(response.content)).convert('RGB')

checkpoint = "google/owlv2-base-patch16-ensemble"
detector = pipeline(model=checkpoint, task="zero-shot-object-detection")

In [22]:
# Draw every detection onto the frame and save the annotated copy to disk.
# NOTE: this draws directly on `image`, so the in-memory frame is modified.
draw = ImageDraw.Draw(image)

for p in prediction:
    box = p['box']
    label = p['label']
    score = p['score']

    # Read the corners by key rather than unpacking box.values(), which
    # silently depends on the dictionary's insertion order.
    xmin, ymin, xmax, ymax = box['xmin'], box['ymin'], box['xmax'], box['ymax']
    draw.rectangle((xmin, ymin, xmax, ymax), outline='red', width=1)
    # Caption each box with its label and score rounded to two decimals.
    draw.text((xmin, ymin), f'{label}: {round(score, 2)}', fill='white')

image_filename = 'annotated_image.jpg'
image.save(image_filename)
