### Load Required Libraries

In [None]:
import os
import openai # Import the 'openai' module directly
import json
import gradio as gr
import requests
import xml.etree.ElementTree as ET
import Jetson.GPIO as GPIO
import time
import math

In [None]:
# Set the API key in environment variables
# NOTE(review): the key is hard-coded here and overwrites any OPENAI_API_KEY
# that is already present in the environment — consider supplying it from
# outside the notebook instead.
os.environ['OPENAI_API_KEY'] = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx'

# Set the OpenAI API key (read back from the variable assigned just above)
openai.api_key = os.getenv("OPENAI_API_KEY")

# Example API call (Retrieve model list); the result is pretty-printed as JSON
# NOTE(review): `openai.Model.list()` presumably needs a pre-1.0 `openai`
# package — confirm the installed version.
response = openai.Model.list()
print(json.dumps(response, indent=2))

In [None]:
# Configure Air Korea API
# Service key; the request functions send it as the "serviceKey" query parameter.
API_KEY = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
# Base URL of the service; each request function appends its endpoint name to it.
BASE_URL = "http://apis.data.go.kr/B552584/ArpltnInforInqireSvc"

# Gradio Chatbot
## Function Definitions


*   Functions that fetch Daejeon fine dust (PM2.5) values from the Air Korea API: get_station_fine_dust(), get_all_fine_dust()
*   Function that takes a real-time fine dust measurement from the sensor: measure_pm25()
*   Function that processes user messages: process()



In [None]:
# Fetch fine dust (PM2.5) values from Air Korea API
# "http://apis.data.go.kr/B552584/ArpltnInforInqireSvc"

# Retrieve the fine dust (PM2.5) value of one station within a city/province
def get_station_fine_dust(city_name, station_name):
    """Return the PM2.5 value reported for a single station, or None.

    Calls the ``getCtprvnRltmMesureDnsty`` endpoint for ``city_name`` and
    scans the returned ``<item>`` elements for the one whose ``stationName``
    text equals ``station_name``.

    Args:
        city_name: Value sent as the ``sidoName`` query parameter.
        station_name: Station name compared for exact equality with each
            item's ``stationName`` text.

    Returns:
        The text of the matching item's ``pm25Value`` element (a string; it
        is not converted to a number here), or None when the request fails
        or times out, the status code is not 200, the body is not parseable
        XML, no station matches, or the matching item has no PM2.5 text.
    """
    endpoint = f"{BASE_URL}/getCtprvnRltmMesureDnsty"
    params = {
        "serviceKey": requests.utils.unquote(API_KEY),
        "returnType": "xml",
        "numOfRows": 100,
        "pageNo": 1,
        "sidoName": city_name,
        "ver": "1.3",
    }

    # A timeout keeps a stalled server from blocking the caller forever;
    # network failures are reported the same way as a non-200 status.
    try:
        response = requests.get(endpoint, params=params, timeout=10)
    except requests.RequestException:
        return None
    if response.status_code != 200:
        return None

    try:
        root = ET.fromstring(response.text)
    except ET.ParseError:
        return None

    for item in root.findall(".//item"):
        # findtext() yields None for a missing child instead of raising
        # AttributeError the way `item.find(...).text` would.
        if item.findtext("stationName") == station_name:
            return item.findtext("pm25Value") or None
    return None

# Retrieve fine dust data for every station of a city/province
def get_all_fine_dust(city_name):
    """Return a human-readable PM2.5 listing for every station in a city.

    Calls the ``getCtprvnRltmMesureDnsty`` endpoint for ``city_name`` and
    formats one line per returned ``<item>`` element.

    Args:
        city_name: Value sent as the ``sidoName`` query parameter.

    Returns:
        A newline-joined string with one "station, PM2.5" line per item.
        On failure (network error, timeout, non-200 status, unparseable XML
        or an empty item list) a short message describing the problem is
        returned instead; this function always returns a string.
    """
    endpoint = f"{BASE_URL}/getCtprvnRltmMesureDnsty"
    params = {
        "serviceKey": requests.utils.unquote(API_KEY),
        "returnType": "xml",
        "numOfRows": 100,
        "pageNo": 1,
        "sidoName": city_name,
        "ver": "1.3",
    }

    # A timeout keeps a stalled server from blocking the chat handler forever.
    try:
        response = requests.get(endpoint, params=params, timeout=10)
    except requests.RequestException as exc:
        return f"API 요청 실패: {exc}"
    if response.status_code != 200:
        return f"API 요청 실패: {response.status_code}"

    try:
        root = ET.fromstring(response.text)
    except ET.ParseError as exc:
        return f"API 응답 파싱 실패: {exc}"

    fine_dust_data = []
    for item in root.findall(".//item"):
        # findtext() tolerates missing children; "-" stands in for absent values.
        station_name = item.findtext("stationName") or "-"
        pm25_value = item.findtext("pm25Value") or "-"
        fine_dust_data.append(
            f"측정소: {station_name}, 미세먼지(PM2.5): {pm25_value}㎍/㎥"
        )

    if not fine_dust_data:
        # Avoid handing the chat UI an empty message when nothing was returned.
        return "측정 데이터를 찾을 수 없습니다."
    return "\n".join(fine_dust_data)

In [None]:
# Real-time fine dust measurement (Jetson GPIO)
def measure_pm25(pin=8, sample_time_ms=10000):
    """Sample the dust sensor on a GPIO pin and return a concentration value.

    The pin is polled for ``sample_time_ms`` milliseconds while the time it
    spends LOW is accumulated in microseconds. ``ratio`` is that LOW time as
    a percentage of the sampling window, and the result is a cubic polynomial
    of ``ratio`` scaled by 1e-4.
    NOTE(review): the polynomial coefficients and the 0.22 / 1e-4 factors are
    taken as-is from the original code — confirm them against the sensor's
    calibration data.

    Args:
        pin: Channel number, interpreted in BCM numbering mode.
        sample_time_ms: Length of the sampling window in milliseconds.

    Returns:
        The concentration rounded to two decimals, or a string starting with
        "센서 오류:" when any step of the measurement raises.
    """
    low_pulse_occupancy = 0.0

    try:
        # Set up inside the try block so configuration failures are reported
        # through the same error-string path as measurement failures.
        GPIO.setmode(GPIO.BCM)
        GPIO.setup(pin, GPIO.IN)

        # monotonic() cannot jump when the wall clock is adjusted mid-sample.
        deadline = time.monotonic() + sample_time_ms / 1000.0

        while time.monotonic() <= deadline:
            pulse_start = time.monotonic()
            # Bound the busy-wait by the deadline: a pin stuck LOW must not
            # keep this loop (and therefore the caller) spinning forever.
            while GPIO.input(pin) == GPIO.LOW and time.monotonic() < deadline:
                pass
            low_pulse_occupancy += (time.monotonic() - pulse_start) * 1e6

        # microseconds / (milliseconds * 10) == percentage of the window spent LOW
        ratio = low_pulse_occupancy / (sample_time_ms * 10.0)
        concentration = (
            1.1 * 0.22 * math.pow(ratio, 3) -
            3.8 * 0.22 * math.pow(ratio, 2) +
            520 * 0.22 * ratio +
            0.62
        ) * 10**-4
        return round(concentration, 2)
    except Exception as e:
        return f"센서 오류: {e}"
    finally:
        GPIO.cleanup()

In [None]:
# Process user messages
def process(user_message, chat_history, state):
    """Answer one chat message about fine dust and append it to the history.

    Routing, based on keywords found in ``user_message``:
      * contains "미세먼지" and "실시간": report the live sensor reading;
      * contains "미세먼지" and a known station name: report that station's
        value and compare it with the live sensor reading;
      * contains "미세먼지" only: list every station of Daejeon;
      * otherwise: reply with a usage hint.

    The sensor is sampled only in the branches that use its value, because
    a measurement blocks for the whole sampling window.

    Args:
        user_message: Text typed by the user (None is treated as empty).
        chat_history: List of (user, bot) tuples; appended to in place. A
            new list is created when None is passed.
        state: Opaque session state, returned unchanged.

    Returns:
        A tuple ``("", chat_history, state)``; the empty string clears the
        input textbox.
    """
    # Station names that can be recognised inside the user's message
    station_list = ["읍내동", "문평동", "문창동", "구성동", "노은동", "상대동(대전)", "관평동", "대흥동1", "성남동1", "대성동", "정림동", "둔산동", "월평동"]

    text = user_message or ""
    if chat_history is None:
        chat_history = []

    if "미세먼지" not in text:
        fine_dust_info = "미세먼지 데이터 요청 또는 실시간 데이터를 입력해주세요."
    elif "실시간" in text:
        # Return only real-time measurement data
        sensor_pm25_value = measure_pm25()
        if isinstance(sensor_pm25_value, (int, float)):
            fine_dust_info = f"실시간 측정된 미세먼지 농도는 {sensor_pm25_value}㎍/㎥입니다."
        else:
            # A non-numeric result is an error report, not a concentration.
            fine_dust_info = "실시간 센서 데이터에 오류가 있습니다."
            if sensor_pm25_value:
                fine_dust_info += f" ({sensor_pm25_value})"
    else:
        station_name = next((s for s in station_list if s in text), None)

        if station_name is None:
            # Show data for the entire Daejeon area if no specific district is selected
            fine_dust_info = get_all_fine_dust("대전")
        else:
            station_pm25_str = get_station_fine_dust("대전", station_name)
            if station_pm25_str is None:
                fine_dust_info = f"'{station_name}'에 대한 데이터를 찾을 수 없습니다."
            else:
                # The API value is text and may be non-numeric
                try:
                    station_pm25_value = float(station_pm25_str)
                except ValueError:
                    fine_dust_info = f"{station_name} 측정값을 변환하는 중 오류가 발생했습니다."
                else:
                    fine_dust_info = f"{station_name}의 미세먼지(PM2.5) 농도는 {station_pm25_value}㎍/㎥입니다.\n"

                    # Compare with the real-time measurement; sampled only now
                    # that there is a station value to compare against.
                    sensor_pm25_value = measure_pm25()
                    if isinstance(sensor_pm25_value, (int, float)):
                        diff = round(sensor_pm25_value - station_pm25_value, 2)
                        if diff > 0:
                            fine_dust_info += f"현재 실내 측정값이 더 높습니다. 미세먼지 농도의 차이값이 {diff}㎍/㎥ 차이가 나며 환기가 필요합니다."
                        else:
                            fine_dust_info += f"현재 실내 측정값이 더 낮거나 같습니다. 미세먼지 농도의 차이값은 {diff}㎍/㎥ 입니다."
                    else:
                        fine_dust_info += "실시간 센서 데이터에 오류가 있습니다."

    chat_history.append((user_message, fine_dust_info))
    return "", chat_history, state

In [None]:
# Build the Gradio UI: a chat window, a question box and a state holder
with gr.Blocks() as demo:
    chatbot = gr.Chatbot(label="미세먼지 챗봇")
    user_textbox = gr.Textbox(label="질문 입력")
    state = gr.State()

    # Submitting the textbox runs process(); its three return values are
    # written back to the same three components that supplied the inputs.
    handler_inputs = [user_textbox, chatbot, state]
    handler_outputs = [user_textbox, chatbot, state]
    user_textbox.submit(fn=process, inputs=handler_inputs, outputs=handler_outputs)

demo.launch(share=True, debug=True)


# Notifications Using Discord


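*   Send an hourly notification through the Discord app reporting whether the indoor fine dust (PM2.5) level is above the 20 ㎍/㎥ threshold (warning) or at/below it (normal); the current value can also be requested with the `!check_pm25` command.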



In [None]:
!pip install discord
!pip install python-dotenv

In [None]:
import Jetson.GPIO as GPIO
import time
import math
import discord
from discord.ext import commands
import asyncio
import nest_asyncio
from dotenv import load_dotenv
import os

In [None]:
# Apply nest_asyncio to make it compatible with Jupyter Notebook's event loop
nest_asyncio.apply()

# Load environment variables (from a .env file, if one exists)
load_dotenv()

# Take the values loaded above when they are defined, so the secrets do not
# have to be written into the notebook; the literals are only fallbacks.
DISCORD_CHANNEL_ID = os.getenv("DISCORD_CHANNEL_ID", "xxxxxxxxxxxxxxx")  # Channel ID
DISCORD_BOT_TOKEN = os.getenv("DISCORD_BOT_TOKEN", "xxxxxxxxxxxxxxxxxx")  # Bot token

In [None]:
# Function to measure PM2.5 levels
def measure_pm25(pin=8, sample_time_ms=30000):
    """
    Measure the PM2.5 concentration and return it.

    The pin is polled for ``sample_time_ms`` milliseconds while the time it
    spends LOW is accumulated in microseconds. ``ratio`` is that LOW time as
    a percentage of the sampling window, and the result is a cubic polynomial
    of ``ratio`` scaled by 1e-4.
    NOTE(review): the polynomial coefficients and the 0.22 / 1e-4 factors are
    taken as-is from the original code — confirm them against the sensor's
    calibration data.

    Args:
        pin: Channel number, interpreted in BCM numbering mode.
        sample_time_ms: Sampling window in milliseconds (30 seconds by default).

    Returns:
        The concentration rounded to two decimals, or None when any step of
        the measurement raises (the error is printed).
    """
    low_pulse_occupancy = 0.0

    try:
        # Initialize GPIO (inside the try block so that setup failures are
        # reported as a failed measurement as well)
        GPIO.setmode(GPIO.BCM)
        GPIO.setup(pin, GPIO.IN)

        # monotonic() cannot jump when the wall clock is adjusted mid-sample.
        deadline = time.monotonic() + sample_time_ms / 1000.0

        while time.monotonic() <= deadline:
            pulse_start = time.monotonic()
            # Bound the busy-wait by the deadline: a pin stuck LOW must not
            # keep this loop (and therefore the caller) spinning forever.
            while GPIO.input(pin) == GPIO.LOW and time.monotonic() < deadline:
                pass
            pulse_end = time.monotonic()

            # Calculate the duration of the LOW state
            pulse_duration = (pulse_end - pulse_start) * 1e6  # In microseconds
            low_pulse_occupancy += pulse_duration

        # Calculate PM2.5 concentration
        # microseconds / (milliseconds * 10) == percentage of the window spent LOW
        ratio = low_pulse_occupancy / (sample_time_ms * 10.0)
        concentration = (
            1.1 * 0.22 * math.pow(ratio, 3)
            - 3.8 * 0.22 * math.pow(ratio, 2)
            + 520 * 0.22 * ratio
            + 0.62
        ) * 10**-4
        return round(concentration, 2)

    except Exception as e:
        print(f"Error during measurement: {e}")
        return None

    finally:
        GPIO.cleanup()

In [None]:
# Configure Discord bot
intents = discord.Intents.default()
# Opt in to message content on top of the default intents.
# NOTE(review): presumably required for reading "!"-prefixed commands and may
# also need enabling in the bot's application settings — confirm.
intents.message_content = True
# Commands are invoked with a "!" prefix.
bot = commands.Bot(command_prefix='!', intents=intents)

@bot.event
async def on_ready():
    """Resolve the notification channel and start the hourly PM2.5 alert task.

    The configured channel ID is converted to ``int`` before the lookup,
    because a string ID never matches a channel. The event can be delivered
    more than once, so the alert task is started only when no previous one
    is still running.
    """
    print('Bot is ready')

    try:
        channel_id = int(DISCORD_CHANNEL_ID)
    except (TypeError, ValueError):
        print(f'Invalid DISCORD_CHANNEL_ID: {DISCORD_CHANNEL_ID!r}')
        return

    channel = bot.get_channel(channel_id)
    if channel is None:
        print('채널이 없어요 😂')
        return
    print(f'봇이 {channel}에서 준비되었습니다! 😊')

    # Keep a reference on the bot: it prevents a second alert loop from being
    # started on a repeated on_ready and keeps the task object alive.
    running_task = getattr(bot, "_pm25_alert_task", None)
    if running_task is not None and not running_task.done():
        return
    bot._pm25_alert_task = bot.loop.create_task(pm25_alert(channel))  # Start PM2.5 alert task

# Check PM2.5 levels and send notifications

# Serialises sensor access between the hourly task and the command; created
# lazily so that it is bound to the event loop that is actually running.
_pm25_measure_lock = None


def _format_pm25_message(pm25_concentration):
    """Build the message for one measurement; None means it failed."""
    if pm25_concentration is None:
        return "❌ PM2.5 농도를 측정할 수 없습니다."
    if pm25_concentration > 20:
        return (
            f"🚨 경고: PM2.5 농도가 {pm25_concentration}㎍/㎥로 위험 수준입니다!\n"
            f"🌬️ 환기가 필요합니다."
        )
    return f"✅ 정상: PM2.5 농도가 {pm25_concentration}㎍/㎥로 정상 수준입니다."


async def _measure_pm25_async():
    """Run the blocking measure_pm25() in a worker thread, one call at a time."""
    global _pm25_measure_lock
    if _pm25_measure_lock is None:
        _pm25_measure_lock = asyncio.Lock()

    # measure_pm25() busy-waits for its whole sampling window; running it on
    # the event loop would stall every other coroutine of the bot meanwhile.
    async with _pm25_measure_lock:
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, measure_pm25)


async def pm25_alert(channel):
    """Measure PM2.5 once per hour, forever, and post the result to ``channel``.

    Args:
        channel: Object with an awaitable ``send(message)`` method.
    """
    while True:
        try:
            pm25_concentration = await _measure_pm25_async()
            await channel.send(_format_pm25_message(pm25_concentration))
        except Exception as e:
            # One failed measurement or send must not end the hourly loop.
            print(f"Error in PM2.5 alert loop: {e}")

        await asyncio.sleep(3600)  # Send notifications every hour


# Check measurements using bot commands
@bot.command()
async def check_pm25(ctx):
    """Measure PM2.5 on demand and reply in the invoking context."""
    pm25_concentration = await _measure_pm25_async()
    await ctx.send(_format_pm25_message(pm25_concentration))

# Start the bot using the configured token
bot.run(DISCORD_BOT_TOKEN)