The following code was executed inside our lambda function.
It fetches weather and traffic data from the respective API's.
The traffic and weather data are stored as a JSON in our S3 bucket 'lakebucketv3' and simultaneously loaded into a 'traffic_data' and 'weather_data' table respectively in our PostgresQL database in RDS.

The required Lambda layer was created in a VM in Cloud9, the following Python packages were installed: Requests, Boto3, Psycopg2
Environment Variables are used for authentication.
This lambda function was executed every 20 minutes, beginning Saturday, April 13th and ending Friday, May 10th.


In [None]:
import json
import os
import requests
import boto3
import time
import psycopg2
import re

def lambda_handler(event, context):
    # Generate a single timestamp to use for both traffic and weather data
    timestamp = str(int(time.time()))

    # Weather fetch and S3 upload
    api_key = os.environ['OPENWEATHER_API']
    city = "Zurich"
    units = "metric"
    url = f"https://api.openweathermap.org/data/2.5/weather?q={city}&appid={api_key}&units={units}"
    response = requests.get(url)

    if response.status_code == 200:
        data = response.json()
        weather_data = {
            "timestamp": timestamp,
            "temperature": data['main']['temp'],
            "feels_like": data['main']['feels_like'],
            "description": data['weather'][0]['description'],
            "humidity": data['main']['humidity'],
            "wind_speed": data['wind']['speed']
        }
        upload_to_s3(weather_data, "weather")

    else:
        print("Error fetching weather data:", response.status_code)

    # Traffic fetch and S3 upload
    here_api_key = os.environ['HERE_API']
    base_url = "https://data.traffic.hereapi.com/v7/flow"
    coordinates = "47.3769,8.5417"
    radius = 4000  # Radius in Meters around our coordinates
    params = {
        "apiKey": here_api_key,
        "locationReferencing": "shape",
        "in": f"circle:{coordinates};r={radius}"
    }

    response = requests.get(base_url, params=params)

    if response.status_code == 200:
        traffic_data = response.json()  # Get traffic data
        traffic_data["timestamp"] = timestamp
        upload_to_s3(traffic_data, "traffic")

    else:
        print("Error fetching traffic data:", response.status_code)

    # Process uploaded files and store in database
    s3_client = boto3.client('s3')
    process_files_and_store(timestamp, s3_client)

def upload_to_s3(data, prefix):
    json_data = json.dumps(data)
    try:
        s3 = boto3.client('s3')
        bucket_name = "lakebucketv3"
        file_name = f"{prefix}_data_time_{data['timestamp']}.json"
        s3.put_object(
            Body=json_data,
            Bucket=bucket_name,
            Key=file_name
        )
        print(f"{prefix.capitalize()} data uploaded to S3 as {file_name}")

    except boto3.exceptions.ClientError as e:
        print(f"Error uploading {prefix} data to S3: {e}")

def process_files_and_store(timestamp, s3_client):
    bucket_name = 'lakebucketv3'
    paginator = s3_client.get_paginator('list_objects_v2')

    # Process Weather data
    response_iterator_weather = paginator.paginate(Bucket=bucket_name, Prefix='weather')
    process_s3_files(response_iterator_weather, 'weather', timestamp, s3_client, bucket_name)

    # Process Traffic data
    response_iterator_traffic = paginator.paginate(Bucket=bucket_name, Prefix='traffic')
    process_s3_files(response_iterator_traffic, 'traffic', timestamp, s3_client, bucket_name)

def process_s3_files(response_iterator, prefix, timestamp, s3_client, bucket_name):
    for response in response_iterator:
        for obj in response.get('Contents', []):
            key = obj['Key']
            if timestamp in key:
                local_file_path = f"/tmp/{prefix}.json"
                s3_client.download_file(bucket_name, key, local_file_path)
                print(f"File downloaded successfully to: {local_file_path}")

                # Read and parse the downloaded JSON file
                with open(local_file_path, 'r') as file:
                    data = json.load(file)
                    print(f"Content of the latest {prefix.capitalize()} JSON file:")
                    print(json.dumps(data, indent=2))

                    # Connect to the database
                    conn = None
                    try:
                        conn = psycopg2.connect(
                            database=os.environ['DB_NAME'],
                            user=os.environ['USERNAME'],
                            password=os.environ['PASSWORD'],
                            host=os.environ['ENDPOINT'],
                            port='5432'
                        )
                        print("Database connection established.")
                        create_table_if_not_exists(conn, prefix)

                        # Insert data into the database
                        insert_data_into_db(conn, data, timestamp, prefix)

                    except psycopg2.Error as e:
                        print(f"Database error: {e}")
                    finally:
                        if conn:
                            conn.close()
                            print("Database connection closed.")

def create_table_if_not_exists(conn, prefix):
    cur = conn.cursor()
    try:
        if prefix == 'weather':
            cur.execute("""
                CREATE TABLE IF NOT EXISTS weather_data (
                    timestamp BIGINT PRIMARY KEY,
                    temperature NUMERIC,
                    feels_like NUMERIC,
                    description TEXT,
                    humidity INTEGER,
                    wind_speed NUMERIC
                )
            """)
            print("Weather table created successfully if not exists.")
        elif prefix == 'traffic':
            cur.execute("""
                CREATE TABLE IF NOT EXISTS traffic_data (
                    id SERIAL PRIMARY KEY,
                    timestamp BIGINT,
                    description TEXT,
                    length FLOAT,
                    speed FLOAT,
                    jam_factor FLOAT,
                    confidence FLOAT,
                    traversability TEXT
                )
            """)
            print("Traffic table created successfully if not exists.")
        else:
            print("Invalid prefix provided.")
        conn.commit()
    except psycopg2.Error as e:
        print(f"Error creating table: {e}")
    finally:
        cur.close()

def insert_data_into_db(conn, data, timestamp, prefix):
    cur = conn.cursor()
    try:
        if prefix == 'weather':
            cur.execute("""
                INSERT INTO weather_data (timestamp, temperature, feels_like, description, humidity, wind_speed)
                VALUES (%s, %s, %s, %s, %s, %s)
            """, (
                timestamp,
                data['temperature'],
                data['feels_like'],
                data['description'],
                data['humidity'],
                data['wind_speed']
            ))
            print("Record inserted successfully into Weather table.")
        elif prefix == 'traffic':
            for entry in data['results']:
                current_flow = entry.get('currentFlow', {})
                location = entry.get('location', {})
                description = location.get('description', 'no_description')  # Use a placeholder if description is missing
                length = location.get('length')
                speed = current_flow.get('speed', 0)  # Default to 0 if speed is not available
                jam_factor = current_flow.get('jamFactor', 0)  # Default to 0 if jamFactor is not available
                confidence = current_flow.get('confidence', 0)  # Default to 0 if confidence is not available
                traversability = current_flow.get('traversability', 'open')

                # Insert data into the database
                cur.execute("""
                    INSERT INTO traffic_data (timestamp, description, length, speed, jam_factor, confidence, traversability)
                    VALUES (%s, %s, %s, %s, %s, %s, %s)
                """, (timestamp, description, length, speed, jam_factor, confidence, traversability))
                print("Record inserted successfully into Traffic table.")
        else:
            print("Invalid prefix provided.")
        conn.commit()
    except psycopg2.Error as e:
        print(f"Error inserting record into database: {e}")
    finally:
        cur.close()
