In [None]:
discovery.get_pis().items()

In [None]:
from zeroconf import Zeroconf, ServiceBrowser, ServiceStateChange
import socket


class PiDiscovery:
    def __init__(self):
        self.pis = {}
        self.zeroconf = Zeroconf()
        self.browser = ServiceBrowser(
            self.zeroconf,
            "_pivideo._tcp.local.",
            handlers=[self.on_service_state_change],
        )

    def on_service_state_change(self, zeroconf, service_type, name, state_change):
        if state_change is ServiceStateChange.Added:
            info = zeroconf.get_service_info(service_type, name)
            if info:
                ip = socket.inet_ntoa(info.addresses[0])
                hostname = info.properties.get(b"hostname", b"").decode("utf-8")
                self.pis[hostname] = ip
        elif state_change is ServiceStateChange.Removed:
            self.pis.pop(name, None)

    def get_pis(self):
        return [{"name": name, "host": host} for name, host in self.pis.items()]


if __name__ == "__main__":
    import time

    discovery = PiDiscovery()

    try:
        print("Scanning for Pis...")
        while True:
            time.sleep(5)
            print("Discovered Pis:", discovery.get_pis())
    except KeyboardInterrupt:
        print("Stopping discovery.")
    finally:
        discovery.zeroconf.close()


In [None]:
import os
time.ctime(os.path.getmtime("videos/ah shit here we go again.mp4"))

In [None]:
from zeroconf import ServiceBrowser, Zeroconf
import socket
import time
from typing import Dict, Optional
import asyncio
from datetime import datetime

class PiVideoListener:
    def __init__(self):
        self.pi_services: Dict[str, dict] = {}
    
    def remove_service(self, zc: Zeroconf, type_: str, name: str) -> None:
        """Called when a service is removed from the network."""
        print(f"Service {name} removed")
        if name in self.pi_services:
            del self.pi_services[name]
    
    def add_service(self, zc: Zeroconf, type_: str, name: str) -> None:
        """Called when a service is discovered on the network."""
        info = zc.get_service_info(type_, name)
        if info:
            addresses = [socket.inet_ntoa(addr) for addr in info.addresses]
            self.pi_services[name] = {
                'hostname': info.properties.get(b'name', b'unknown').decode('utf-8'),
                'addresses': addresses,
                'port': info.port,
                'version': info.properties.get(b'version', b'unknown').decode('utf-8'),
                'last_seen': datetime.now()
            }
            print(f"\nNew Pi discovered!")
            print(f"Name: {name}")
            print(f"Addresses: {addresses}")
            print(f"Port: {info.port}")
            print(f"Version: {info.properties.get(b'version', b'unknown').decode('utf-8')}\n")

    def update_service(self, zc: Zeroconf, type_: str, name: str) -> None:
        """Called when a service is updated on the network."""
        info = zc.get_service_info(type_, name)
        if info and name in self.pi_services:
            self.pi_services[name]['last_seen'] = datetime.now()

def main():
    zeroconf = Zeroconf()
    listener = PiVideoListener()
    browser = ServiceBrowser(zeroconf, "_pivideo._tcp.local.", listener)
    
    try:
        print("Searching for Pi Video services on the network...")
        print("Press Ctrl+C to stop\n")
        
        while True:
            time.sleep(1)
            # Optionally print current status periodically
            if listener.pi_services:
                print("\nCurrent active Pi services:")
                for name, service in listener.pi_services.items():
                    print(f"- {service['hostname']}: {service['addresses']} (Port: {service['port']})")
            
    except KeyboardInterrupt:
        pass
    finally:
        print("\nShutting down...")
        zeroconf.close()

if __name__ == "__main__":
    main()

In [None]:
!pip install zeroconf