In [1]:
import time
from smbus2 import SMBus, i2c_msg
from adafruit_ads1x15.ads1115 import ADS1115
from adafruit_ads1x15.analog_in import AnalogIn
from IPython.display import clear_output

class SMBusI2C:
    def __init__(self, bus_num: int):
        self._bus = SMBus(bus_num)
        self._locked = False

    # --- CircuitPython lock API ---
    def try_lock(self) -> bool:
        if self._locked:
            return False
        self._locked = True
        return True

    def unlock(self) -> None:
        self._locked = False

    # --- CircuitPython I2C API used by Adafruit drivers ---
    def writeto(self, address: int, buffer: bytes, *, start: int = 0, end: int = None, stop: bool = True) -> None:
        if end is None:
            end = len(buffer)
        data = bytes(buffer[start:end])
        if len(data) == 0:
            return
        self._bus.i2c_rdwr(i2c_msg.write(address, data))

    def readfrom_into(self, address: int, buffer: bytearray, *, start: int = 0, end: int = None, stop: bool = True) -> None:
        if end is None:
            end = len(buffer)
        n = end - start
        msg = i2c_msg.read(address, n)
        self._bus.i2c_rdwr(msg)
        buffer[start:end] = list(msg)

    def writeto_then_readfrom(
        self,
        address: int,
        out_buffer: bytes,
        in_buffer: bytearray,
        *,
        out_start: int = 0,
        out_end: int = None,
        in_start: int = 0,
        in_end: int = None,
        stop: bool = False
    ) -> None:
        if out_end is None:
            out_end = len(out_buffer)
        if in_end is None:
            in_end = len(in_buffer)

        out_data = bytes(out_buffer[out_start:out_end])
        n_in = in_end - in_start

        # Combined transaction: write register pointer, then read bytes
        msgs = [i2c_msg.write(address, out_data), i2c_msg.read(address, n_in)]
        self._bus.i2c_rdwr(*msgs)

        in_buffer[in_start:in_end] = list(msgs[1])

    def deinit(self) -> None:
        self._bus.close()

# ---- TEST ADS1115 on bus 0, addr 0x48 ----
print("1. Initializing I2C Bus 0 (/dev/i2c-0)...")
i2c = SMBusI2C(0)

print("2. Connecting to ADS1115 (0x48)...")
ads = ADS1115(i2c, address=0x48)
ads.gain = 1

ch0 = AnalogIn(ads, 0)

print("Success: Sensor initialized!")
time.sleep(1)

try:
    while True:
        clear_output(wait=True)
        print("Raw:", ch0.value, " Voltage:", f"{ch0.voltage:.4f} V")
        time.sleep(0.5)
except KeyboardInterrupt:
    pass
finally:
    i2c.deinit()


Raw: 946  Voltage: 0.1189 V


In [None]:
import time
import requests
from smbus2 import SMBus, i2c_msg
from adafruit_ads1x15.ads1115 import ADS1115
from adafruit_ads1x15.analog_in import AnalogIn
from IPython.display import clear_output

# --- Your Working I2C Wrapper ---
class SMBusI2C:
    def __init__(self, bus_num: int):
        self._bus = SMBus(bus_num)
        self._locked = False

    def try_lock(self) -> bool:
        if self._locked: return False
        self._locked = True
        return True

    def unlock(self) -> None:
        self._locked = False

    def writeto(self, address: int, buffer: bytes, *, start: int = 0, end: int = None, stop: bool = True) -> None:
        if end is None: end = len(buffer)
        data = bytes(buffer[start:end])
        if len(data) == 0: return
        self._bus.i2c_rdwr(i2c_msg.write(address, data))

    def readfrom_into(self, address: int, buffer: bytearray, *, start: int = 0, end: int = None, stop: bool = True) -> None:
        if end is None: end = len(buffer)
        n = end - start
        msg = i2c_msg.read(address, n)
        self._bus.i2c_rdwr(msg)
        buffer[start:end] = list(msg)

    def writeto_then_readfrom(self, address: int, out_buffer: bytes, in_buffer: bytearray, *, out_start: int = 0, out_end: int = None, in_start: int = 0, in_end: int = None, stop: bool = False) -> None:
        if out_end is None: out_end = len(out_buffer)
        if in_end is None: in_end = len(in_buffer)
        out_data = bytes(out_buffer[out_start:out_end])
        n_in = in_end - in_start
        msgs = [i2c_msg.write(address, out_data), i2c_msg.read(address, n_in)]
        self._bus.i2c_rdwr(*msgs)
        in_buffer[in_start:in_end] = list(msgs[1])

    def deinit(self) -> None:
        self._bus.close()

# --- Configuration ---
WRITE_API_KEY = ""  # API Key for Thinkspeak
CHANNEL_FIELD = 1                   # We are updating Field 1 from Thinkspeak

# --- Setup Sensor ---
print("Initializing I2C")
i2c = SMBusI2C(0)
ads = ADS1115(i2c, address=0x48)
ads.gain = 1
ch0 = AnalogIn(ads, 0)

print("Sensor Ready. Starting ThingSpeak Loop")
time.sleep(1)


# --- Threshold Configuration ---
THRESHOLD_WARNING = 0.5  # Volts (Raw ~4000)
THRESHOLD_DANGER  = 1.5  # Volts (Raw ~12000)

# --- ThingSpeak Config ---
WRITE_API_KEY = ""

# --- Setup Sensor ---
print("Initializing I2C Bus 0...")
# Try Bus 1 if Bus 0 fails
try:
    i2c = SMBusI2C(0)
except:
    i2c = SMBusI2C(1)

ads = ADS1115(i2c, address=0x48)
ads.gain = 1
ch0 = AnalogIn(ads, 0)

print("Sensor Ready. Monitoring Air Quality...")
time.sleep(1)

# --- Main Loop ---
try:
    while True:
        # 1. Read Data
        voltage = ch0.voltage
        raw_value = ch0.value  # Getting the raw integer too
        
        # 2. Decide Status
        status_msg = "UNKNOWN"
        if voltage < THRESHOLD_WARNING:
            status_msg = "SAFE (No Smoke)"
        elif voltage < THRESHOLD_DANGER:
            status_msg = "WARNING (Low Level/Fake)"
        else:
            status_msg = "DANGER (Real Smoke Detected!)"

        # 3. Local Display
        clear_output(wait=True)
        print("-" * 30)
        print(f"Status:  {status_msg}")
        print(f"Voltage: {voltage:.4f} V")
        print(f"Raw:     {raw_value}")
        print("-" * 30)
        
        # 4. Send to ThingSpeak
        # We send Voltage to Field 1
        # Optional: You could send the 'raw_value' to Field 2 if you wanted
        url = f"https://api.thingspeak.com/update?api_key={WRITE_API_KEY}&field1={voltage}"
        
        try:
            r = requests.get(url, timeout=5)
            if r.status_code == 200 and int(r.text) > 0:
                print(f"Data Sent! Entry ID: {r.text}")
            else:
                print("ThingSpeak Update Failed.")
        except Exception as e:
            print(f"Network Error: {e}")

        # 5. Wait
        # ThingSpeak needs 15s. We wait 20s to be safe.
        # If 'DANGER' is detected, you might want to print faster locally, 
        # but you still can't send to ThingSpeak faster than 15s on free tier.
        for i in range(20, 0, -1):
            print(f"Next update in {i} seconds...", end='\r')
            time.sleep(1)

except KeyboardInterrupt:
    print("\nStopped by user.")
finally:
    i2c.deinit()

Initializing I2C
Sensor Ready. Starting ThingSpeak Loop


In [None]:
!pip install ultralytics
!pip3 install opencv-python

In [None]:
#COMPLETE EXECUTABLE CODE (ABOVE CODE IS FOR TEST INDIVIDUAL MODULES)

import time
import requests
import cv2
import os
from datetime import datetime
from ultralytics import YOLO
from smbus2 import SMBus, i2c_msg
from adafruit_ads1x15.ads1115 import ADS1115
from adafruit_ads1x15.analog_in import AnalogIn
from IPython.display import clear_output

# --- 1. CRITICAL I2C HELPER ---
class SMBusI2C:
    def __init__(self, bus_num: int):
        self._bus = SMBus(bus_num)
        self._locked = False
    def try_lock(self) -> bool:
        if self._locked: return False
        self._locked = True
        return True
    def unlock(self) -> None:
        self._locked = False
    def writeto(self, address: int, buffer: bytes, *, start: int = 0, end: int = None, stop: bool = True) -> None:
        if end is None: end = len(buffer)
        data = bytes(buffer[start:end])
        self._bus.i2c_rdwr(i2c_msg.write(address, data))
    def readfrom_into(self, address: int, buffer: bytearray, *, start: int = 0, end: int = None, stop: bool = True) -> None:
        if end is None: end = len(buffer)
        n = end - start
        msg = i2c_msg.read(address, n)
        self._bus.i2c_rdwr(msg)
        buffer[start:end] = list(msg)
    def writeto_then_readfrom(self, address: int, out_buffer: bytes, in_buffer: bytearray, *, out_start: int = 0, out_end: int = None, in_start: int = 0, in_end: int = None, stop: bool = False) -> None:
        if out_end is None: out_end = len(out_buffer)
        if in_end is None: in_end = len(in_buffer)
        out_data = bytes(out_buffer[out_start:out_end])
        n_in = in_end - in_start
        msgs = [i2c_msg.write(address, out_data), i2c_msg.read(address, n_in)]
        self._bus.i2c_rdwr(*msgs)
        in_buffer[in_start:in_end] = list(msgs[1])
    def deinit(self) -> None:
        try: self._bus.close()
        except: pass

# --- 2. CONFIGURATION ---
WRITE_API_KEY = "APIHERE"    
WHATSAPP_PHONE = "WHATSAPPNUMHERE"        
WHATSAPP_API_KEY = "CALLMEBOTAPI"           
MODEL_PATH = "best.pt"                
CONFIDENCE_THRESHOLD = 0.5            

THRESHOLD_WARNING = 0.5 
THRESHOLD_DANGER  = 1.5
PHOTO_DIR = "alert_photos"
os.makedirs(PHOTO_DIR, exist_ok=True)

# --- 3. HARDWARE SETUP ---
print("Initializing System...")
try: i2c = SMBusI2C(0)
except: i2c = SMBusI2C(1)

ads = ADS1115(i2c, address=0x48)
ads.gain = 1
ch0 = AnalogIn(ads, 0)

print("Loading AI Model...")
model = YOLO(MODEL_PATH)

# --- 4. HELPER FUNCTIONS ---

def send_whatsapp(message):
    url = f"https://api.callmebot.com/whatsapp.php?phone={WHATSAPP_PHONE}&text={message}&apikey={WHATSAPP_API_KEY}"
    try:
        requests.get(url, timeout=10)
        return True
    except:
        return False

def print_dashboard(voltage, status, cam_status, last_photo, ts_status):
    clear_output(wait=True)
    icon = "^"
    if "WARNING" in status: icon = "!!!"
    if "CRITICAL" in status: icon = "#"
    
    # Voltage Bar
    bar_length = 20
    filled_len = int(bar_length * voltage / 3.3)
    bar = '--' * filled_len + '-' * (bar_length - filled_len)

    print(f"""
         HDSS (MONITORING MODE)
    SENSOR STATUS
    Current State    : {icon} {status}
    MQ-2 Voltage     : [{bar}] {voltage:.4f} V
    
    CAMERA OPERATION
    Action           : {cam_status}
    Last Evidence    : {last_photo}
    
    CLOUD (ThingSpeak)
    Status           : {ts_status}
    """)

def run_continuous_monitoring(voltage_trigger):
    """
    Keeps camera open for 60 seconds.
    - Saves image every 15 seconds.
    - Scans for fire continuously.
    - Returns True if fire was detected at ANY point.
    """
    monitor_duration = 60  # seconds
    save_interval = 15     # seconds
    
    start_time = time.time()
    next_save_time = start_time # Save immediately at start
    
    fire_confirmed = False
    alert_sent_this_session = False
    last_saved_file = "None"

    # Open Camera
    cap = cv2.VideoCapture(0)
    if not cap.isOpened(): cap = cv2.VideoCapture(1)
    if not cap.isOpened(): return False, "Error: No Cam"

    print("CAMERA OPEN: VISIUAL CONFIRMATION FOR 60s...")

    while (time.time() - start_time) < monitor_duration:
        ret, frame = cap.read()
        if not ret: break

        # 1. Run AI Inference
        results = model(frame, verbose=False)
        current_frame_has_fire = False
        
        # Check results
        for result in results:
            for box in result.boxes:
                conf = float(box.conf[0])
                if conf > CONFIDENCE_THRESHOLD:
                    current_frame_has_fire = True
                    fire_confirmed = True
                    
                    # Draw box for evidence
                    x1, y1, x2, y2 = map(int, box.xyxy[0])
                    cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 2)
                    cv2.putText(frame, f"FIRE {conf:.2f}", (x1, y1-10), 
                               cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0,0,255), 2)

        # 2. Alert Logic (Send once per session)
        if current_frame_has_fire and not alert_sent_this_session:
            msg = f"EMERGENCY: Fire Confirmed by AI! Gas Level: {voltage_trigger:.2f}V"
            send_whatsapp(msg)
            alert_sent_this_session = True
            print(">> ALERT SENT TO WHATSAPP!")

        # 3. Save Image Logic (Every 30 seconds OR if it's the specific frame that triggered alert)
        now = time.time()
        
        # Save if 30s interval passed OR this is the first moment we saw fire
        should_save = False
        if now >= next_save_time:
            should_save = True
            next_save_time += save_interval
        elif current_frame_has_fire and last_saved_file == "None":
             # Ensure we catch the very first fire frame even if not at :30
            should_save = True

        if should_save:
            timestamp = datetime.now().strftime("%H%M%S")
            filename = f"{PHOTO_DIR}/monitor_{timestamp}.jpg"
            cv2.imwrite(filename, frame)
            last_saved_file = filename
            print(f">> Evidence Saved: {filename}")

        # Small delay to reduce CPU load slightly
        time.sleep(0.1)

    cap.release()
    return fire_confirmed, last_saved_file

# --- 5. MAIN LOOP ---
last_photo_name = "None"
ts_status_msg = "Waiting..."

try:
    while True:
        # 1. Read Sensor
        voltage = ch0.voltage
        
        # 2. Logic
        sys_status = "SAFE"
        cam_status = "Standby"

        # Update ThingSpeak FIRST so we don't lose data during the 60s scan
        try:
            url = f"https://api.thingspeak.com/update?api_key={WRITE_API_KEY}&field1={voltage}"
            r = requests.get(url, timeout=2)
            ts_status_msg = "Sent" if r.status_code == 200 else "Error"
        except:
            ts_status_msg = "Network"

        # Check Threshold
        if voltage > THRESHOLD_WARNING:
            sys_status = "GAS DETECTED - VERIFING"
            cam_status = "VISIUAL CONFIRMATION (60s)"
            
            # Show dashboard before entering the blocking loop
            print_dashboard(voltage, sys_status, cam_status, last_photo_name, ts_status_msg)
            
            # --- START 60 SECOND MONITORING ---
            fire_found, evidence_file = run_continuous_monitoring(voltage)
            
            if evidence_file:
                last_photo_name = evidence_file

            if fire_found:
                sys_status = "FIRE CONFIRMATION LOGGING"
                print_dashboard(voltage, sys_status, "Finished Scan", last_photo_name, ts_status_msg)
                # Logic: If fire found, maybe wait a bit before scanning again?
            else:
                sys_status = "Scan Complete: No Fire"
                print_dashboard(voltage, sys_status, "Finished Scan", last_photo_name, ts_status_msg)

        else:
            # Normal Safe Mode
            print_dashboard(voltage, sys_status, cam_status, last_photo_name, ts_status_msg)
            time.sleep(20)

except KeyboardInterrupt:
    print("\nStopped.")
finally:
    i2c.deinit()


         HDSS (MONITORING MODE)
    SENSOR STATUS
    Current State    : GAS DETECTED - VERIFING
    MQ-2 Voltage     : [@@@@@--------] 2.0945 V

    CAMERA OPERATION
    Action           : VISIUAL CONFIRMATION (60s)
    Last Evidence    : alert_photos/monitor_102635.jpg

    CLOUD (ThingSpeak)
    Status           : Sent
    
CAMERA OPEN: VISIUAL CONFIRMATION FOR 60s...
>> ALERT SENT TO WHATSAPP!
>> Evidence Saved: alert_photos/monitor_102638.jpg
>> Evidence Saved: alert_photos/monitor_102651.jpg

Stopped.
