## Final code for AWS S3, MongoDB, and Telegram alerts

In [None]:
from kafka import KafkaConsumer
from datetime import datetime
import pymongo
from pymongo import MongoClient
import json
from haversine import haversine
import requests
import boto3

# Kafka consumer settings: broker address, consumer group and offset reset policy
consumer_conf = dict(
    bootstrap_servers='localhost:29092',
    group_id='vehicle-speed',
    auto_offset_reset='earliest',
)

# Topic created by Kafka Connect; names generally look like 'source.public.table_name'
topic = '<put your kafka topic created by kafka connect>'

# Subscribe to the topic with the settings above
consumer = KafkaConsumer(topic, **consumer_conf)

# Last seen (latitude, longitude, timestamp) tuple, keyed by vehicle_id
prev_data = dict()

# MongoDB connection settings (server address, then database / collection names)
db_host, db_port = 'localhost', 27017
db_name = '<put your mongodb database name here>'
collection_name = '<put your mongodb collection name here>'

# Open the client and resolve the database and collection handles used below
client = MongoClient(host=db_host, port=db_port)
mongo_db = client[db_name]
mongo_collection = mongo_db[collection_name]

# Telegram settings: the chat that receives alerts and the bot token used to send them
chat_id = "<put your chat_id here>"
telegram_bot_token = "<put your telegram_bot_token here>"

# S3 destination: target bucket and the key prefix under which batches are written
s3_bucket_name = '<Enter your s3 bucket name>'
s3_prefix = 'vehicle-data/'

# S3 client built with explicit credentials.
# NOTE(review): the keys are inlined here as placeholders; avoid committing real
# credentials to source control.
s3_client = boto3.client(
    's3',
    aws_access_key_id='<put your aws_access_key_id>',
    aws_secret_access_key='<put your aws_secret_access_key>',
)

# Buffer of records waiting to be uploaded to S3 as one batch
s3_upload_records = list()

# Function to send a Telegram alert
def send_telegram_alert(vehicle_id, speed):
    """Send a speeding alert for *vehicle_id* through the Telegram Bot API.

    Posts a text message to ``chat_id`` using ``telegram_bot_token``. Failures
    (transport errors or a non-200 response) are printed, never raised, so a
    Telegram outage cannot interrupt the caller.

    Args:
        vehicle_id: Identifier of the vehicle, interpolated into the message.
        speed: Computed speed in km/h, interpolated into the message.
    """
    message = f"Alert: This Vehicle from RTD-Denver: {vehicle_id} exceeded speed limit! Current Speed: {speed} km/h"
    telegram_api_url = f"https://api.telegram.org/bot{telegram_bot_token}/sendMessage"
    params = {'chat_id': chat_id, 'text': message}
    try:
        # Without a timeout, requests waits indefinitely on a stalled
        # connection, which would block the whole consumer loop.
        response = requests.post(telegram_api_url, params=params, timeout=10)
    except requests.RequestException as e:
        # Print only the exception type: the exception text can include the
        # request URL, and the URL contains the bot token.
        print(f"Failed to send Telegram alert: {type(e).__name__}")
        return
    if response.status_code != 200:
        print(f"Failed to send Telegram alert: {response.text}")

# Function to upload records to S3
def upload_records_to_s3(records):
    """Upload *records* to S3 as a single JSON object.

    The object is written to ``s3_bucket_name`` under ``s3_prefix`` with a key
    derived from the current local time. Does nothing when *records* is empty
    or ``None``.

    Args:
        records: JSON-serializable list of record dicts to upload.
    """
    if not records:
        return

    # Include microseconds in the key: with one-second resolution, two batches
    # uploaded within the same second would share a key and the second upload
    # would silently overwrite the first.
    timestamp = datetime.now().strftime('%Y-%m-%d_%H-%M-%S-%f')
    s3_object_key = f"{s3_prefix}vehicle_data_{timestamp}.json"
    s3_client.put_object(Bucket=s3_bucket_name, Key=s3_object_key, Body=json.dumps(records))
    print(f"Uploaded {len(records)} records to S3: s3://{s3_bucket_name}/{s3_object_key}")

# Consume messages from the topic. For every vehicle that has been seen before,
# derive distance / elapsed time / speed from its previous position, store the
# reading in MongoDB, alert on speeds above 60 km/h and batch readings to S3.
for msg in consumer:
    # A record can carry a null value (e.g. a tombstone); there is nothing to
    # decode, and calling .decode() on None would crash the consumer.
    if msg.value is None:
        continue

    # Decode message value as string and parse the JSON payload
    data = json.loads(msg.value.decode('utf-8'))
    vehicle_id = data["id"]
    timestamp = data["timestamp"]
    latitude = float(data["latitude"])
    longitude = float(data["longitude"])
    bearing = data["bearing"]

    speed = 0.0

    # Only a vehicle with a previous reading has a distance/speed to compute
    if vehicle_id in prev_data:
        prev_lat, prev_lon, prev_time = prev_data[vehicle_id]

        # Great-circle distance between the previous and current position
        distance = haversine((prev_lat, prev_lon), (latitude, longitude), unit='km')

        # Time difference in seconds
        # NOTE(review): dividing by 1,000,000 assumes timestamps are in
        # microseconds - confirm against the producer.
        time_diff = (timestamp - prev_time) / 1000000

        # Speed in km/h. Only computed when the current timestamp is later than
        # the previous one: a zero gap would divide by zero and an out-of-order
        # message would yield a negative speed.
        if time_diff > 0:
            speed = (distance / time_diff) * 3600

        record = {
            'vehicle_id': vehicle_id,
            'timestamp': timestamp,
            'location': {
                'latitude': latitude,
                'longitude': longitude
            },
            'time_diff': time_diff,
            'distance': distance,
            'speed': speed
        }

        try:
            # insert_one adds an '_id' (ObjectId) to the dict it is given;
            # insert a shallow copy so the buffered record stays
            # JSON-serializable for the S3 upload.
            mongo_collection.insert_one(dict(record))
        except Exception as e:
            print(f"Error Inserting the data into the database: {e}")
        else:
            try:
                # Buffer the record first so that a failed alert cannot drop it
                # from the S3 batch.
                s3_upload_records.append(record)

                # Check if speed exceeds 60 km/h then send a Telegram alert
                if speed > 60:
                    send_telegram_alert(vehicle_id, speed)

                # Upload once 100 records are buffered. The buffer is only
                # cleared after a successful upload, so a failed batch is
                # retried on the next message.
                if len(s3_upload_records) >= 100:
                    upload_records_to_s3(s3_upload_records)
                    s3_upload_records = []
            except Exception as e:
                print(f"Error sending the alert or uploading to S3: {e}")

    # Update dictionary with current location and timestamp
    prev_data[vehicle_id] = (latitude, longitude, timestamp)
