In [1]:
import sqlite3
from datetime import datetime
import os 
import csv 
debug=1

In [2]:
# Register the adapter for datetime
sqlite3.register_adapter(datetime, lambda val: val.isoformat())
sqlite3.register_converter("DATETIME", lambda val: datetime.fromisoformat(val.decode("utf-8")))

In [3]:
class Database:
    def __init__(self,databse_name):
        # sqlite3.PARSE_DECLTYPES enables automatic type conversio
        self.conn = sqlite3.connect(databse_name,detect_types=sqlite3.PARSE_DECLTYPES)
        self.cursor = self.conn.cursor()

        #  This function should be only called ones
        # create the database and tables if they do not exist
        # id , ip_address, assigned_place, main_node
        self.cursor.execute('''
                        CREATE TABLE IF NOT EXISTS device_info (
                            id INTEGER PRIMARY KEY AUTOINCREMENT,
                            ip_address TEXT NOT NULL,
                            assigned_place TEXT NOT NULL,
                            main_node TEXT NOT NULL,
                            created_date DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
                        )
                        ''')
        # Create the timeseries_data table 
        self.cursor.execute('''
                    CREATE TABLE IF NOT EXISTS timeseries_data (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        device_id TEXT NOT NULL,
                        timestamp DATETIME NOT NULL,
                        voltage REAL NOT NULL,
                        FOREIGN KEY (device_id) REFERENCES device_info (id)
                    )
                    ''')
    # Function to insert a new device
    def insert_device(self,ip_address, assigned_place, main_node):
        self.cursor.execute('''
            INSERT INTO device_info ( ip_address, assigned_place, main_node)
            VALUES (?, ?, ?)
        ''', (ip_address,assigned_place, main_node))
        self.conn.commit()
    def upate_device_list(self,csv_file_name):
        with open(csv_file_name, 'r') as file:
            reader = csv.reader(file)
            for row in reader:
                print(row)
                if(self.device_exists(row[0])):
                    if(debug):
                        print(f'ip: {row[0]} is already exists')
                else:
                    self.insert_device(row[0], row[1], row[4])
                    if(debug):
                        print(f'ip: {row[0]} , place: {row[1]} , main_node {row[4]}')
                    print(f'Device Name: {row[0]}')
    def device_exists(self,ip_address):
        self.cursor.execute('''
        SELECT COUNT(*) FROM device_info WHERE ip_address = ?
        ''', (ip_address,))
        count = self.cursor.fetchone()[0]
        return count > 0
    # Function to insert timeseries data
    def insert_timeseries_data(self,device_id, voltage):
        # iam planning to use the ip as the device id .
        timestamp = datetime.now()
        self.cursor.execute('''
        INSERT INTO timeseries_data (device_id, timestamp, voltage)
        VALUES (?, ?, ?)
        ''', (device_id, timestamp, voltage))
        self.conn.commit()
    # device id ? 1 try using the ip as the device id?? 
    def get_timeseries_data(self,device_id):
        self.cursor.execute('''
        SELECT * FROM timeseries_data WHERE device_id = ?
        ''', (device_id,))
        rows = self.cursor.fetchall()
        for row in rows:
            print(row[3])
    '''also implement a function to retrive data acording to date , month , week etc .. args{device_id , something to specify the date range}'''
    def get_latest_data(self,device_id):
        self.cursor.execute('''
        SELECT * FROM timeseries_data WHERE device_id = ? ORDER BY timestamp DESC LIMIT 1
        ''', (device_id,))
        row = self.cursor.fetchone()
        return row
        # if row:
        #     print(row[3])
        # else:
        #     print("No data found for the given device ID.")

In [4]:
csvFile = 'devices.csv'
db = Database('battery_radings.db')
db.upate_device_list(csvFile)

['IP', 'Assigned_Place', 'Status', 'Date of Creation', 'Main_Node', 'Nearby_Nodes']
ip: IP is already exists
['192.168.1.2', 'Parassini_Kadavu', 'Active', '2024-10-01', 'Kannur', 'Dharmassala']
ip: 192.168.1.2 is already exists
['192.168.1.3', 'Kannur', 'Inactive', '2024-10-02', 'Kannur', 'Kannur']
ip: 192.168.1.3 is already exists
['192.168.1.4', 'Chittariparamba', 'Active', '2024-10-03', 'Kannur', 'Kuthuparamba']
ip: 192.168.1.4 is already exists
['192.168.1.5', 'Kuthuparamba', 'Inactive', '2024-10-04', 'Kannur', 'Kannur']
ip: 192.168.1.5 is already exists
['192.168.1.6', 'Dharmassala', 'Active', '2024-10-05', 'Kannur', 'Kannur']
ip: 192.168.1.6 is already exists
['192.168.1.7', 'Vattoli', 'Active', '2024-10-05', 'Kannur', 'Kuthuparamba']
ip: 192.168.1.7 is already exists
['192.168.1.8', 'Kolmotta', 'Active', '2024-10-05', 'Kannur', 'Dharmassala']
ip: 192.168.1.8 is already exists
['192.168.1.9', 'Podikkundu', 'Active', '2024-10-05', 'Kannur', 'Caltex']
ip: 192.168.1.9 is already exi

# Test Insert and retrieve

In [16]:
# Insert some test data

for i in range(100):
    db.insert_timeseries_data(device_id='192.168.1.1', voltage=i)


In [17]:
# Retrieve some data

db.get_timeseries_data(device_id='192.168.1.1')



3.7
0.0
1.0
2.0
3.0
4.0
5.0
6.0
7.0
8.0
9.0
10.0
11.0
12.0
13.0
14.0
15.0
16.0
17.0
18.0
19.0
20.0
21.0
22.0
23.0
24.0
25.0
26.0
27.0
28.0
29.0
30.0
31.0
32.0
33.0
34.0
35.0
36.0
37.0
38.0
39.0
40.0
41.0
42.0
43.0
44.0
45.0
46.0
47.0
48.0
49.0
50.0
51.0
52.0
53.0
54.0
55.0
56.0
57.0
58.0
59.0
60.0
61.0
62.0
63.0
64.0
65.0
66.0
67.0
68.0
69.0
70.0
71.0
72.0
73.0
74.0
75.0
76.0
77.0
78.0
79.0
80.0
81.0
82.0
83.0
84.0
85.0
86.0
87.0
88.0
89.0
90.0
91.0
92.0
93.0
94.0
95.0
96.0
97.0
98.0
99.0
0.0
1.0
2.0
3.0
4.0
5.0
6.0
7.0
8.0
9.0
10.0
11.0
12.0
13.0
14.0
15.0
16.0
17.0
18.0
19.0
20.0
21.0
22.0
23.0
24.0
25.0
26.0
27.0
28.0
29.0
30.0
31.0
32.0
33.0
34.0
35.0
36.0
37.0
38.0
39.0
40.0
41.0
42.0
43.0
44.0
45.0
46.0
47.0
48.0
49.0
50.0
51.0
52.0
53.0
54.0
55.0
56.0
57.0
58.0
59.0
60.0
61.0
62.0
63.0
64.0
65.0
66.0
67.0
68.0
69.0
70.0
71.0
72.0
73.0
74.0
75.0
76.0
77.0
78.0
79.0
80.0
81.0
82.0
83.0
84.0
85.0
86.0
87.0
88.0
89.0
90.0
91.0
92.0
93.0
94.0
95.0
96.0
97.0
98.0
99.0
0.0
1.0
2.0
3.0


In [18]:
# Retrieve latest data 
latest =  db.get_latest_data(device_id='192.168.1.1')

In [19]:
VOLT_INDEX=3
print(latest)
print(latest[VOLT_INDEX])

(501, '192.168.1.1', datetime.datetime(2025, 5, 1, 18, 20, 33, 506896), 99.0)
99.0


# get all ip address 


In [37]:
ips = []
with open("devices.csv", newline="") as csvfile:
    reader = csv.DictReader(csvfile)
    for row in reader:
        ip = row["IP"]
        ips.append(row["IP"])
print(ips)

['192.168.1.2', '192.168.1.3', '192.168.1.4', '192.168.1.5', '192.168.1.6', '192.168.1.7', '192.168.1.8', '192.168.1.9', '192.168.1.10', '192.168.1.11', '192.168.1.22']


In [41]:
import random
for ip in ips:
    for i in range(100):
        random_number = random.uniform(8.7, 11)
        round_random_number = round(random_number, 2)
        db.insert_timeseries_data(device_id=ip, voltage=round_random_number)

In [None]:

def get_data_by_date_range(self, device_id, start_date, end_date):
    db.cursor.execute('''
    SELECT * FROM timeseries_data 
    WHERE device_id = ? AND timestamp BETWEEN ? AND ?
    ''', (device_id, start_date, end_date))
    rows = db.cursor.fetchall()
    return rows
get_data_by_date_range(device_id='192.168.1.2')


In [20]:
def get_timeseries_data(device_id):
    db.cursor.execute('''
    SELECT * FROM timeseries_data WHERE device_id = ?
    ''', (device_id,))
    rows = db.cursor.fetchall()
    for row in rows:
        print(row[3])

In [None]:
# get the data from the database 
data = []
ip = '192.168.1.2 '.strip()
get_timeseries_data(ip)

In [None]:
# to get the todays all data 
from datetime import datetime
ip = '192.168.1.1'
def get_todays_data(device_id):
    data = {}
    db.cursor.execute('''
    SELECT * FROM timeseries_data WHERE device_id = ? AND timestamp >= date('now' + 
    '1 days')
    ''', (device_id,))
    rows = db.cursor.fetchall()
    for row in rows:
        # print(row[2])
        # row[2].split(' ')[1]
        data = {
            'timestamp': datetime.strptime(str(row[2]), '%Y-%m-%d %H:%M:%S.%f'),
            'voltage': row[3]
        }
        
        # print(data)
        
        # rw = datetime.strptime(str(row[2]), '%Y-%m-%d %H:%M:%S.%f')
        # print(rw.date())
        print(data['timestamp'].date())
        
get_todays_data(ip)



2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01
2025-05-01

In [None]:
from datetime import datetime

def get_todays_data(device_id,date):
    
    data = {}
    db.cursor.execute('''
    SELECT * FROM timeseries_data WHERE device_id = ? AND timestamp >= date(?)
    ''', (device_id,date))
    rows = db.cursor.fetchall()
    for row in rows:
        # print(row[2])
        # row[2].split(' ')[1]
        data = {
            'timestamp': datetime.strptime(str(row[2]), '%Y-%m-%d %H:%M:%S.%f'),
            'voltage': row[3]
        }
        
        # print(data)
        
        # rw = datetime.strptime(str(row[2]), '%Y-%m-%d %H:%M:%S.%f')
        # print(rw.date())
        print(data['voltage'])
        
get_todays_data(ip,date='2025-02-25')


In [None]:
def get_todays_data(device_id,date):
    
    data = {}
    db.cursor.execute('''
    SELECT * FROM timeseries_data WHERE device_id = ? AND timestamp >= date(?)
    ''', (device_id,date))
    rows = db.cursor.fetchall()
    for row in rows:
        # print(row[2])
        # row[2].split(' ')[1]
        data = {
            'timestamp': datetime.strptime(str(row[2]), '%Y-%m-%d %H:%M:%S.%f'),
            'voltage': row[3]
        }
        
        # print(data)
        
        # rw = datetime.strptime(str(row[2]), '%Y-%m-%d %H:%M:%S.%f')
        # print(rw.date())
        print(data['voltage'])
        
get_todays_data(ip,date='2025-02-25')


In [None]:
print(datetime.now().date())