<a href="https://colab.research.google.com/github/Apekshaa2908/From-Clicks-to-Deliveries-Maximizing-E-commerce-Performance-with-Real-Time-Data-Integration/blob/main/Trucks.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!pip install boto3



In [8]:
import boto3

# Initialize DynamoDB resource
dynamodb = boto3.resource('dynamodb',region_name='ap-southeast-2',
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key)

# Create the table with Truck_ID as partition key and Effective_Date as sort key
table_name = 'TrucksData'
table = dynamodb.create_table(
    TableName=table_name,
    KeySchema=[
        {'AttributeName': 'Truck_ID', 'KeyType': 'HASH'},  # Partition key
        {'AttributeName': 'Effective_Date', 'KeyType': 'RANGE'}  # Sort key
    ],
    AttributeDefinitions=[
        {'AttributeName': 'Truck_ID', 'AttributeType': 'S'},
        {'AttributeName': 'Effective_Date', 'AttributeType': 'S'}
    ],
    ProvisionedThroughput={
        'ReadCapacityUnits': 5,
        'WriteCapacityUnits': 5
    }
)

# Wait until the table exists before proceeding
table.meta.client.get_waiter('table_exists').wait(TableName=table_name)
print(f"Table {table_name} created successfully.")


Table TrucksData created successfully.


In [None]:
# Lambda Code
import json
import boto3
from datetime import datetime
from decimal import Decimal

def lambda_handler(event, context):
    try:
        if 'body' in event:
            # Parse the body as JSON
            body = json.loads(event['body'], parse_float=Decimal)

            # Initialize DynamoDB
            dynamodb = boto3.resource('dynamodb')
            table = dynamodb.Table('TrucksData')

            # Track Expiration Dates for old records
            Expiration_date = dict()
            for truck in body.get("trucks", []):
                Expiration_date[truck["Truck_ID"]] = truck["Effective_Date"]

            truck_ids = ["TRK001", "TRK002", "TRK003"]

            # Query and update existing records (SCD Type 2)
            for truck_id in truck_ids:
                response = table.query(
                    KeyConditionExpression="Truck_ID = :truck_id",
                    FilterExpression="is_active = :active",
                    ExpressionAttributeValues={':truck_id': truck_id, ':active': True}
                )

                if response['Items']:
                    current_record = response['Items'][0]
                    table.update_item(
                        Key={'Truck_ID': truck_id, 'Effective_Date': current_record["Effective_Date"]},
                        UpdateExpression='SET Expiration_Date = :d, is_active = :a',
                        ExpressionAttributeValues={':d': Expiration_date[truck_id], ':a': False}
                    )

            # Insert new records
            for data in body.get("trucks", []):
                table.put_item(Item=data)

            return {
                'statusCode': 200,
                'body': json.dumps({"message": "Data successfully updated"})
            }
        else:
            return {
                'statusCode': 400,
                'body': json.dumps({"message": "No body found in the request"})
            }
    except Exception as e:
        return {
            'statusCode': 500,
            'body': json.dumps(f'Error: {str(e)}')
        }


In [14]:
import json
import random
import time
import requests
from datetime import datetime

# API Gateway URL (replace with your actual API Gateway endpoint)
API_URL = 'API Gateway URL'

# Sample truck IDs
truck_ids = ["TRK001", "TRK002", "TRK003"]

# Function to generate random GPS location data
def generate_gps_location():
    return {
        "latitude": round(random.uniform(-90.0, 90.0), 6),
        "longitude": round(random.uniform(-180.0, 180.0), 6),
        "altitude": round(random.uniform(0, 1000), 2),
        "speed": random.randint(0, 120)  # Vehicle speed in km/h
    }

# Function to generate random engine diagnostics data
def generate_engine_diagnostics():
    return {
        "engine_rpm": random.randint(500, 3000),
        "fuel_level": round(random.uniform(0, 100), 2),
        "temperature": random.randint(60, 120),  # Engine temperature in degrees Celsius
        "oil_pressure": round(random.uniform(20, 50), 2),
        "battery_voltage": round(random.uniform(11.5, 14.5), 2)
    }

# Function to generate random vehicle health and maintenance data
def generate_vehicle_health():
    return {
        "brake_status": random.choice(["Good", "Worn", "Replace"]),
        "tire_pressure": {
            "front_left": round(random.uniform(30, 35), 2),
            "front_right": round(random.uniform(30, 35), 2),
            "rear_left": round(random.uniform(30, 35), 2),
            "rear_right": round(random.uniform(30, 35), 2)
        },
        "transmission_status": random.choice(["Operational", "Needs Service", "Faulty"])
    }

# Function to generate random environmental conditions
def generate_environmental_conditions():
    return {
        "temperature": random.randint(-20, 50),  # Temperature in Celsius
        "humidity": random.randint(0, 100),  # Humidity percentage
        "atmospheric_pressure": random.randint(900, 1100)  # Atmospheric pressure in hPa
    }

# Function to generate telemetry data for a truck
def generate_truck_data(truck_id):
    return {
        "Truck_ID": truck_id,
        "gps_location": generate_gps_location(),
        "vehicle_speed": random.randint(0, 120),  # Vehicle speed in km/h
        "engine_diagnostics": generate_engine_diagnostics(),
        "odometer_reading": round(random.uniform(50000, 150000), 2),  # Odometer reading in km
        "fuel_consumption": round(random.uniform(5, 20), 2),  # Fuel consumption in liters/100km
        "vehicle_health_and_maintenance": generate_vehicle_health(),
        "environmental_conditions": generate_environmental_conditions(),
        "Effective_Date": datetime.now().isoformat(),
        "Expiration_Date": None,
        "is_active": True
    }

# Function to send data to the API endpoint
def send_truck_data(data):
    headers = {'Content-Type': 'application/json'}
    response = requests.post(API_URL, headers=headers, data=json.dumps(data))
    return response.status_code, response.text

# Generate and send truck telemetry data every minute
if __name__ == "__main__":
    try:
        while True:
            telemetry_data = {"trucks": [generate_truck_data(truck_id) for truck_id in truck_ids]}
            status_code, response_text = send_truck_data(telemetry_data)
            print(f"Status: {status_code}, Response: {response_text}")
            time.sleep(60)  # Sleep for 1 minute before sending the next batch
    except KeyboardInterrupt:
        print("Data generation stopped.")


Status: 200, Response: {"message": "Data successfully updated"}
Data generation stopped.
