# In [7]:  (Jupyter notebook cell prompt, kept as a comment so the file parses as Python)
import pandas as pd
import requests
from pymongo import MongoClient
import time
from datetime import datetime
import schedule

# Connect to the local MongoDB server and resolve the handles used below:
# the "water_management" database and its "sensor_data" collection.
client = MongoClient("mongodb://localhost:27017/")
db = client.water_management
collection = db.sensor_data

def fetch_and_store_data(
    csv_path="/home/ubuntu/Data_Science/Big_Data/Water-Resource-Management-Platform/data/index_of_sensors.csv",
    timeout=30,
):
    """Fetch readings for every sensor in the index CSV and store them.

    Each CSV row is read positionally: columns 0-4 are passed on as latitude,
    longitude, site, water body and unit of measure, and the second-to-last
    column is used as the endpoint URL. The endpoint is expected to return
    JSON with a ``features`` list whose items carry parallel
    ``properties.data.timestamp`` / ``properties.data.value`` lists; every
    (timestamp, value) pair is handed to ``process_and_store_data``.

    A failing endpoint (network error, non-200 status, malformed body) is
    reported with ``print`` and skipped so one bad sensor cannot abort the
    whole run or the scheduler loop that calls this function.

    Args:
        csv_path: Path of the sensor index CSV.
        timeout: Seconds to wait for each HTTP request before giving up.
    """
    df = pd.read_csv(csv_path)

    for _, row in df.iterrows():
        # The row Series is indexed by column name, so positional access has
        # to go through .iloc; plain row[0] / row[-2] relies on a deprecated
        # integer-key fallback.
        endpoint_url = row.iloc[-2]  # Assuming the endpoint URL is in the second last column
        latitude = row.iloc[0]
        longitude = row.iloc[1]
        site = row.iloc[2]
        water_body = row.iloc[3]
        uom = row.iloc[4]

        try:
            # Without a timeout a single unresponsive endpoint would block
            # this job, and every later scheduled run, indefinitely.
            response = requests.get(endpoint_url, timeout=timeout)
        except requests.RequestException as exc:
            print(f"Failed to fetch data from {endpoint_url}: {exc}")
            continue

        if response.status_code != 200:
            print(f"Failed to fetch data from {endpoint_url}, status code: {response.status_code}")
            continue

        try:
            features = response.json()["features"]
        except (ValueError, KeyError, TypeError) as exc:
            # ValueError: body is not JSON; KeyError/TypeError: no "features".
            print(f"Unexpected response body from {endpoint_url}: {exc!r}")
            continue

        for feature in features:
            try:
                data = feature["properties"]["data"]
                timestamps_epoch = data["timestamp"]
                values = data["value"]
            except (KeyError, TypeError) as exc:
                print(f"Data format issue in {endpoint_url}: missing field {exc!r}.")
                continue

            # Ensure that timestamps and values are lists and have the same length
            if (
                isinstance(timestamps_epoch, list)
                and isinstance(values, list)
                and len(timestamps_epoch) == len(values)
            ):
                for timestamp_epoch, value in zip(timestamps_epoch, values):
                    process_and_store_data(timestamp_epoch, value, latitude, longitude, site, water_body, uom)
            else:
                print(f"Data format issue in {endpoint_url}: timestamps and values are not lists or their lengths do not match.")

def process_and_store_data(timestamp_epoch, value, latitude, longitude, site, water_body, uom):
    """Upsert a single sensor reading into the MongoDB ``collection``.

    The epoch timestamp is rendered as a UTC ``YYYY-MM-DD HH:MM:SS`` string
    and stored together with the reading and the sensor metadata. The upsert
    is keyed on (timestamp, site, value), so storing the same reading again
    updates the existing document instead of inserting a duplicate.

    Args:
        timestamp_epoch: POSIX timestamp in seconds.
        value: The reading taken at that timestamp.
        latitude: Stored under ``latitude``.
        longitude: Stored under ``longitude``.
        site: Stored under ``site``; also part of the upsert key.
        water_body: Stored under ``water_body``.
        uom: Stored under ``unit_of_measure``.

    Errors are handled best-effort: any exception raised while converting or
    writing is printed and swallowed so the caller's loop keeps going.
    """
    # Imported here because the module only imports ``datetime`` itself.
    from datetime import timezone

    try:
        # Convert epoch to human-readable timestamp. utcfromtimestamp() is
        # deprecated since Python 3.12; the timezone-aware call below formats
        # to the same UTC string.
        timestamp_human = datetime.fromtimestamp(timestamp_epoch, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S')

        # Prepare document
        document = {
            "timestamp": timestamp_human,
            "value": value,
            "latitude": latitude,
            "longitude": longitude,
            "site": site,
            "water_body": water_body,
            "unit_of_measure": uom,
        }

        # Insert the document, or refresh it if this reading is already stored
        collection.update_one(
            {"timestamp": timestamp_human, "site": site, "value": value},
            {"$set": document},
            upsert=True,
        )
    except Exception as e:
        print(f"Error processing data: {e}")

# Schedule the job to run every 45 minutes.
# The job is registered before the first manual run below, so the statement
# order here is deliberate.
schedule.every(45).minutes.do(fetch_and_store_data)

# Run the job immediately at startup (synchronously, in this thread) instead
# of waiting for the first scheduled slot.
fetch_and_store_data()

# Block forever, checking once per second whether a scheduled run is due.
while True:
    schedule.run_pending()
    time.sleep(1)


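# Notebook output residue: the lines below repeat statements from
# fetch_and_store_data (they appear to have been echoed in the cell output)
# and are not executable code.
#   endpoint_url = row[-2]  # Assuming the endpoint URL is in the second last column
#   latitude = row[0]
#   longitude = row[1]
#   site = row[2]
#   water_body = row[3]
#   uom = row[4]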
