In [24]:
import pandas as pd
import numpy as np
import random
import time

# Set seed for reproducibility (both the stdlib and NumPy global RNGs)
random.seed(42)
np.random.seed(42)

# Parameters
total_devices = 200  # Total number of devices
percentage_down = 0.01  # Fraction of devices that are down (1%; target band 0.5-1.5%)
percentage_abnormal = 0.015  # Fraction of devices that are abnormal (1.5%; target band 1-2%)

# max(1, ...) guarantees at least one down and one abnormal device even for small fleets
num_down_devices = max(1, int(total_devices * percentage_down))
num_abnormal_devices = max(1, int(total_devices * percentage_abnormal))
num_healthy_devices = total_devices - num_down_devices - num_abnormal_devices

# History is sampled every 30 minutes. Step counts are derived from the window
# lengths so they cannot drift from them (24 h at 30 min is 48 steps, 3 days is 144).
sample_interval_minutes = 30
steps_per_day = 24 * 60 // sample_interval_minutes  # 48 samples per day
num_time_steps_healthy = 1 * steps_per_day  # Number of time steps for healthy devices (24 hours)
num_time_steps_abnormal = 3 * steps_per_day  # Number of time steps for abnormal devices (3 days)
num_time_steps_down = 7 * steps_per_day  # Number of time steps for down devices (7 days)

# Generate time steps as Unix timestamps (seconds), newest first: index 0 is "now"
current_time = int(time.time())
sample_interval_seconds = sample_interval_minutes * 60
time_steps_healthy = [current_time - i * sample_interval_seconds for i in range(num_time_steps_healthy)]  # 24 hours
time_steps_abnormal = [current_time - i * sample_interval_seconds for i in range(num_time_steps_abnormal)]  # 3 days
time_steps_down = [current_time - i * sample_interval_seconds for i in range(num_time_steps_down)]  # 7 days

# Function to generate device data with varying historical scores
def generate_device_data(num_devices, score_range, num_time_steps, abnormal_probability=0.05, down_probability=0.02):
    """Generate a synthetic per-device score history as a DataFrame.

    Randomness comes from NumPy's global RNG, so results are reproducible after
    ``np.random.seed(...)``.

    Args:
        num_devices: Number of rows (devices) to generate; IDs are 1..num_devices.
        score_range: Inclusive ``(low, high)`` bounds for baseline scores.
            ``(0, 0)`` marks down devices, whose history is all zeros.
        num_time_steps: Number of historical score columns per device.
        abnormal_probability: Per-device chance of overwriting 10% of the history
            with abnormal scores (1-94 inclusive). Not applied to down devices.
        down_probability: Per-device chance of overwriting 5% of the history with
            0 (down). Not applied to down devices.

    Returns:
        pd.DataFrame with columns ``device_id``, ``current_op_score`` and
        ``past_time_1`` .. ``past_time_{num_time_steps}``.
    """
    low, high = score_range
    is_down = score_range == (0, 0)
    data = []
    for i in range(num_devices):
        device_id = i + 1

        if is_down:
            # Handled separately: randint(0, 0) would be an empty range
            past_scores = np.zeros(num_time_steps, dtype=int)
        else:
            # np.random.randint's upper bound is exclusive; +1 makes score_range inclusive
            past_scores = np.random.randint(low, high + 1, size=num_time_steps)

            # Introduce historical anomalies for non-down devices
            if np.random.rand() < abnormal_probability:
                anomaly_indices = np.random.choice(num_time_steps, size=int(0.1 * num_time_steps), replace=False)
                # One independent abnormal score (1-94 inclusive) per selected index
                past_scores[anomaly_indices] = np.random.randint(1, 95, size=anomaly_indices.size)

            if np.random.rand() < down_probability:
                down_indices = np.random.choice(num_time_steps, size=int(0.05 * num_time_steps), replace=False)
                past_scores[down_indices] = 0  # Down score

        # The last column is taken as the latest score.
        # NOTE(review): the time_steps_* lists in this notebook put the current time
        # at index 0 (newest first) — confirm which end is meant to be "latest".
        current_op_score = past_scores[-1]
        data.append([device_id, current_op_score] + list(past_scores))

    columns = ['device_id', 'current_op_score'] + [f'past_time_{i+1}' for i in range(num_time_steps)]
    return pd.DataFrame(data, columns=columns)

# Build one frame per health category. Keep this call order: the healthy and
# abnormal generators draw from the shared global NumPy RNG, so reordering the
# calls would change the generated values.
healthy_devices_df = generate_device_data(
    num_healthy_devices,
    score_range=(95, 100),
    num_time_steps=num_time_steps_healthy,
)
abnormal_devices_df = generate_device_data(
    num_abnormal_devices,
    score_range=(1, 94),
    num_time_steps=num_time_steps_abnormal,
)
down_devices_df = generate_device_data(
    num_down_devices,
    score_range=(0, 0),
    num_time_steps=num_time_steps_down,
)

# Persist each category to its own CSV, without the pandas index column
for frame, csv_path in (
    (healthy_devices_df, 'healthy_devices.csv'),
    (abnormal_devices_df, 'abnormal_devices.csv'),
    (down_devices_df, 'down_devices.csv'),
):
    frame.to_csv(csv_path, index=False)

# Summary of what was written
print("Synthetic data generated and saved as CSV files:")
print(f"- healthy_devices.csv: {num_healthy_devices} devices")
print(f"- abnormal_devices.csv: {num_abnormal_devices} devices")
print(f"- down_devices.csv: {num_down_devices} devices")




Synthetic data generated and saved as CSV files:
- healthy_devices.csv: 195 devices
- abnormal_devices.csv: 3 devices
- down_devices.csv: 2 devices


In [26]:
# Inspect the generated down-device frame; every score column is expected to be 0
down_devices_df

Unnamed: 0,device_id,current_op_score,past_time_1,past_time_2,past_time_3,past_time_4,past_time_5,past_time_6,past_time_7,past_time_8,...,past_time_327,past_time_328,past_time_329,past_time_330,past_time_331,past_time_332,past_time_333,past_time_334,past_time_335,past_time_336
0,1,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
1,2,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


In [None]:
import aiohttp
import asyncio
import psycopg2
from psycopg2.extras import execute_values
import os

# Database connection setup: every connection parameter comes from a POSTGRES_*
# environment variable (os.getenv yields None for any that are unset).
_db_params = {
    'dbname': os.getenv('POSTGRES_DB'),
    'user': os.getenv('POSTGRES_USER'),
    'password': os.getenv('POSTGRES_PASSWORD'),
    'host': os.getenv('POSTGRES_HOST'),
    'port': os.getenv('POSTGRES_PORT'),
}
conn = psycopg2.connect(**_db_params)
# Single shared cursor used by the database upsert helpers in this cell
cursor = conn.cursor()

# Step 1: Function to fetch the list of devices
async def get_device_list(session, condition, headers):
    """Fetch the device list matching ``condition`` and return the decoded JSON body.

    Args:
        session: aiohttp ClientSession (or compatible object) used for the request.
        condition: Filter value, sent as the ``condition`` query parameter.
        headers: HTTP headers for the request (e.g. Authorization).

    Returns:
        The parsed JSON payload; callers in this notebook read its ``'devices'`` list.

    Raises:
        Whatever ``response.raise_for_status()`` raises on a non-2xx status
        (aiohttp.ClientResponseError for a real aiohttp session).
    """
    url = "https://api.example.com/get_device_list"
    # Send the filter via `params` so it is URL-encoded. Interpolating it into the URL
    # breaks or alters the query when it contains '&', '#', spaces, etc.
    async with session.get(url, headers=headers, params={"condition": str(condition)}) as response:
        response.raise_for_status()  # Ensure we catch HTTP errors
        return await response.json()

# Step 2: Function to fetch details for a specific device
async def get_device_details(session, device_id, headers):
    """Fetch the details for one device and return the decoded JSON body.

    Args:
        session: aiohttp ClientSession (or compatible object) used for the request.
        device_id: Device identifier, sent as the ``device_id`` query parameter.
        headers: HTTP headers for the request (e.g. Authorization).

    Returns:
        The parsed JSON payload; in this notebook it is later iterated with
        ``.items()`` and stored as key/value rows.

    Raises:
        Whatever ``response.raise_for_status()`` raises on a non-2xx status
        (aiohttp.ClientResponseError for a real aiohttp session).
    """
    url = "https://api.example.com/get_device_details"
    # `params` URL-encodes the ID; str() keeps the f-string's formatting of non-str IDs
    async with session.get(url, headers=headers, params={"device_id": str(device_id)}) as response:
        response.raise_for_status()
        return await response.json()

# Step 3: Insert or update device list in the database
def update_device_list_in_db(device_list):
    """Upsert the devices from a device-list payload into the ``device_list`` table.

    Uses the module-level ``conn`` and ``cursor``. ``last_updated`` is set by the
    database (``NOW()``) for every inserted or updated row.

    Args:
        device_list: Payload whose ``'devices'`` entry is an iterable of dicts with
            ``'id'``, ``'name'`` and ``'condition'`` keys (presumably the response of
            get_device_list — confirm against the API).

    Raises:
        Any error from execute_values propagates after the transaction is rolled back.
    """
    insert_query = """
        INSERT INTO device_list (device_id, device_name, condition, last_updated)
        VALUES %s
        ON CONFLICT (device_id) DO UPDATE
        SET device_name = EXCLUDED.device_name,
            condition = EXCLUDED.condition,
            last_updated = EXCLUDED.last_updated;
    """
    data = [(device['id'], device['name'], device['condition']) for device in device_list['devices']]
    # The INSERT names 4 columns but each tuple carries 3 values; the template
    # supplies the 4th column (last_updated) on the server side.
    template = "(%s, %s, %s, NOW())"
    # `with conn` commits on success and rolls back on error, so a failed upsert does
    # not leave the shared connection stuck in an aborted transaction.
    with conn:
        execute_values(cursor, insert_query, data, template=template)

# Step 4: Insert or update device details in the database
def update_device_details_in_db(device_id, details):
    """Upsert one device's details as key/value rows into the ``device_details`` table.

    Uses the module-level ``conn`` and ``cursor``. ``last_updated`` is set by the
    database (``NOW()``) for every inserted or updated row.

    Args:
        device_id: ID of the device the details belong to.
        details: Mapping of detail key -> value (presumably the JSON object returned by
            get_device_details — confirm). NOTE(review): psycopg2 does not adapt dict
            values by default; wrap nested values (e.g. psycopg2.extras.Json) if the
            API returns them.

    Raises:
        Any error from execute_values propagates after the transaction is rolled back.
    """
    insert_query = """
        INSERT INTO device_details (device_id, detail_key, detail_value, last_updated)
        VALUES %s
        ON CONFLICT (device_id, detail_key) DO UPDATE
        SET detail_value = EXCLUDED.detail_value,
            last_updated = EXCLUDED.last_updated;
    """
    data = [(device_id, key, value) for key, value in details.items()]
    # The INSERT names 4 columns but each tuple carries 3 values; the template
    # supplies the 4th column (last_updated) on the server side.
    template = "(%s, %s, %s, NOW())"
    # `with conn` commits on success and rolls back on error, keeping the shared
    # connection usable for the next device.
    with conn:
        execute_values(cursor, insert_query, data, template=template)

# Step 5: Task to update device list every 24 hours
async def update_device_list(session, condition, headers, interval_seconds=24 * 60 * 60):
    """Refresh the device list from the API and persist it, forever.

    Each pass stores the fetched payload in the module-level ``device_list`` global
    (read by update_device_details), upserts it into the database, then sleeps
    ``interval_seconds`` (default: 24 hours).

    A failed pass is logged and retried on the next interval instead of ending the
    task. If the fetch itself fails, the previously published ``device_list`` is kept.
    """
    global device_list
    while True:
        print("Updating device list...")
        try:
            device_list = await get_device_list(session, condition, headers)
            update_device_list_in_db(device_list)
            print(f"Device list updated with {len(device_list['devices'])} devices")
        except asyncio.CancelledError:
            # Never swallow cancellation (it subclasses Exception on Python 3.7)
            raise
        except Exception as exc:
            # Letting this escape would end the task and, via gather() in main, the whole poller
            print(f"Error updating device list: {exc}")
        await asyncio.sleep(interval_seconds)

# Step 6: Task to fetch device details every 20 minutes
async def update_device_details(session, headers, interval_seconds=20 * 60):
    """Periodically fetch and persist details for every device in ``device_list``.

    On each pass, if the module-level ``device_list`` global (assigned by
    update_device_list) is set and truthy, details for all of its devices are fetched
    concurrently. Fetch or database errors are logged per device and do not stop the
    loop. The task then sleeps ``interval_seconds`` (default: 20 minutes).
    """
    while True:
        # update_device_list may not have assigned the global yet on the first pass,
        # so look it up without raising NameError.
        current_list = globals().get('device_list')
        if current_list:
            print("Fetching device details...")
            # Snapshot the devices so results stay paired with their IDs even if
            # update_device_list replaces the global while we await below.
            devices = current_list['devices']
            tasks = [asyncio.create_task(get_device_details(session, device['id'], headers)) for device in devices]

            device_details = await asyncio.gather(*tasks, return_exceptions=True)

            # gather() preserves order, so zip pairs each result with its device
            for device, detail in zip(devices, device_details):
                device_id = device['id']
                if isinstance(detail, BaseException):
                    print(f"Error fetching details for device {device_id}: {detail}")
                    continue
                try:
                    update_device_details_in_db(device_id, detail)
                except Exception as exc:
                    # One failed write must not kill the polling loop for every device
                    print(f"Error saving details for device {device_id}: {exc}")
                else:
                    print(f"Device ID: {device_id}, Details updated")

        await asyncio.sleep(interval_seconds)

# Step 7: Main function to start both tasks
async def main(condition, headers):
    """Run the device-list and device-details pollers concurrently on one HTTP session.

    Neither poller returns on its own, so this coroutine runs until one of them raises
    or it is cancelled; the shared session is closed when it exits.
    """
    async with aiohttp.ClientSession() as session:
        list_poller = update_device_list(session, condition, headers)
        details_poller = update_device_details(session, headers)
        # gather() wraps each coroutine in a task, in this order, and waits on both
        await asyncio.gather(list_poller, details_poller)

# Entry point to run the asynchronous code
condition = "some_condition"
headers = {
    # Read the token from the environment rather than hard-coding a secret in the
    # notebook (same approach as the POSTGRES_* settings above).
    # NOTE(review): API_ACCESS_TOKEN is a suggested variable name — align with deployment.
    "Authorization": f"Bearer {os.getenv('API_ACCESS_TOKEN', 'YOUR_ACCESS_TOKEN')}",
    "Accept": "application/json",
    "Custom-Header": "CustomValue"
}

# update_device_details reads this global before update_device_list first assigns it
device_list = None

try:
    running_loop = asyncio.get_running_loop()
except RuntimeError:
    running_loop = None

if running_loop is None:
    # Plain Python: no event loop yet, so start one and block until main() finishes
    asyncio.run(main(condition, headers))
else:
    # Jupyter/IPython already runs an event loop, and asyncio.run() raises there.
    # Schedule main() on that loop instead, keeping a reference so the task is not
    # garbage-collected while it runs in the background.
    main_task = running_loop.create_task(main(condition, headers))
