In [4]:
#Module
import requests
import json
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta


In [5]:
# WEATHER (API) => Decision -(if yes)> Make message by prompting Query and LLM -> Send message
#                      
#                       

In [14]:
class WeatherActivityAdvisor:
    def __init__(self, service_key):
        """
        기상청과 대기질 API를 활용한 야외활동 적합성 판단 클래스
        
        :param service_key: 공공데이터포털에서 발급받은 서비스키
        """
        self.service_key = service_key
        self.base_url_weather = "http://apis.data.go.kr/1360000/VilageFcstInfoService_2.0"
        self.base_url_air = "http://apis.data.go.kr/B552584/ArpltnInforInqireSvc"

    def get_weather_forecast(self, nx, ny):
    
        """
        기상청 단기예보 조회
        
        :param nx: 예보지점 X 좌표
        :param ny: 예보지점 Y 좌표
        :return: 기상 예보 데이터
        """
        now = datetime.now()
        base_date = now.strftime("%Y%m%d")
        
        # API는 매 3시간마다 업데이트되므로 현재 시간에 맞는 base_time 설정
        if now.hour < 2:
            base_date = (now - timedelta(days=1)).strftime("%Y%m%d")
            base_time = "2300"
        elif now.hour < 5:
            base_time = "0200"
        elif now.hour < 8:
            base_time = "0500"
        elif now.hour < 11:
            base_time = "0800"
        elif now.hour < 14:
            base_time = "1100"
        elif now.hour < 17:
            base_time = "1400"
        elif now.hour < 20:
            base_time = "1700"
        elif now.hour < 23:
            base_time = "2000"
        else:
            base_time = "2300"
        
        url = f"{self.base_url_weather}/getVilageFcst"
        params = {
            'serviceKey': self.service_key,
            'pageNo': '4',
            'numOfRows': '1000',
            'dataType': 'JSON',
            'base_date': base_date,
            'base_time': base_time,
            'nx': nx,
            'ny': ny
        }
        
        try:
            response = requests.get(url, params=params)
            print(f"날씨 API 응답 상태 코드: {response.status_code}")
            
            # 응답의 내용 일부 출력하여 디버깅
            print(f"응답 내용 일부: {response.text[:200]}...")
            
            if response.status_code == 200:
                
                # 응답 형식 확인
                content_type = response.headers.get('Content-Type', '')
                if 'xml' in content_type.lower():
                    # XML 응답 처리
                    print("XML 응답 처리 중...")
                    root = ET.fromstring(response.text)
                    items = []
                    for item in root.findall('.//item'):
                        item_dict = {}
                        for child in item:
                            item_dict[child.tag] = child.text
                        items.append(item_dict)
                    return items
                else:
                    # JSON 응답 처리
                    try:
                        data = response.json()
                        items = data.get('response', {}).get('body', {}).get('items', {}).get('item', [])
                        
                        three_days_later_forecast = [item for item in items if item['fcstDate'] == "20250322"]
                        print(three_days_later_forecast)
                        if not items:
                            print("날씨 API 응답에 데이터가 없습니다.")
                        return items
                    except json.JSONDecodeError as e:
                        print(f"JSON 파싱 오류: {str(e)}")
                        # JSON 파싱 실패 시 대체 처리
                        if "SERVICE_KEY" in response.text:
                            print("서비스키 관련 오류일 수 있습니다. 서비스키를 확인해주세요.")
                        
                        # 빈 리스트 반환 또는 기본값 설정
                        return []
            else:
                print(f"날씨 API 호출 실패: {response.status_code}, 응답: {response.text}")
                return []
        except Exception as e:
            print(f"날씨 API 호출 중 예외 발생: {str(e)}")
            return []
        
    def get_weather_forecast_by_date(self, nx, ny):
        """
        기상청 단기예보 조회 - 여러 페이지를 조회하여 날짜별로 정리
        
        :param nx: 예보지점 X 좌표
        :param ny: 예보지점 Y 좌표
        :return: 날짜별로 정리된 기상 예보 데이터
        """
        now = datetime.now()
        base_date = now.strftime("%Y%m%d")
        
        # API는 매 3시간마다 업데이트되므로 현재 시간에 맞는 base_time 설정
        if now.hour < 2:
            base_date = (now - timedelta(days=1)).strftime("%Y%m%d")
            base_time = "2300"
        elif now.hour < 5:
            base_time = "0200"
        elif now.hour < 8:
            base_time = "0500"
        elif now.hour < 11:
            base_time = "0800"
        elif now.hour < 14:
            base_time = "1100"
        elif now.hour < 17:
            base_time = "1400"
        elif now.hour < 20:
            base_time = "1700"
        elif now.hour < 23:
            base_time = "2000"
        else:
            base_time = "2300"
        
        # 여러 페이지의 데이터를 모두 수집
        all_items = []
        for page_no in range(1, 5):  # 페이지 1, 2, 3, 4를 조회
            url = f"{self.base_url_weather}/getVilageFcst"
            params = {
                'serviceKey': self.service_key,
                'pageNo': str(page_no),
                'numOfRows': '1000',
                'dataType': 'JSON',
                'base_date': base_date,
                'base_time': base_time,
                'nx': nx,
                'ny': ny
            }
            
            try:
                response = requests.get(url, params=params)
                print(f"날씨 API 페이지 {page_no} 응답 상태 코드: {response.status_code}")
                
                # 응답의 내용 일부 출력하여 디버깅
                print(f"페이지 {page_no} 응답 내용 일부: {response.text[:200]}...")
                
                if response.status_code == 200:
                    # 응답 형식 확인
                    content_type = response.headers.get('Content-Type', '')
                    if 'xml' in content_type.lower():
                        # XML 응답 처리
                        print(f"페이지 {page_no} XML 응답 처리 중...")
                        root = ET.fromstring(response.text)
                        for item in root.findall('.//item'):
                            item_dict = {}
                            for child in item:
                                item_dict[child.tag] = child.text
                            all_items.append(item_dict)
                    else:
                        # JSON 응답 처리
                        try:
                            data = response.json()
                            items = data.get('response', {}).get('body', {}).get('items', {}).get('item', [])
                            if items:
                                all_items.extend(items)
                            else:
                                print(f"페이지 {page_no} 날씨 API 응답에 데이터가 없습니다.")
                        except json.JSONDecodeError as e:
                            print(f"페이지 {page_no} JSON 파싱 오류: {str(e)}")
                            if "SERVICE_KEY" in response.text:
                                print("서비스키 관련 오류일 수 있습니다. 서비스키를 확인해주세요.")
                else:
                    print(f"페이지 {page_no} 날씨 API 호출 실패: {response.status_code}, 응답: {response.text}")
            except Exception as e:
                print(f"페이지 {page_no} 날씨 API 호출 중 예외 발생: {str(e)}")
        
        # 오늘부터 3일 후까지의 날짜를 계산
        today = now.date()
        dates = {
            "today": today.strftime("%Y%m%d"),
            "tomorrow": (today + timedelta(days=1)).strftime("%Y%m%d"),
            "day_after_tomorrow": (today + timedelta(days=2)).strftime("%Y%m%d"),
            "three_days_later": (today + timedelta(days=3)).strftime("%Y%m%d")
        }
        
        # 날짜별로 데이터 정리
        forecast_by_date = {
            "today": [],
            "tomorrow": [],
            "day_after_tomorrow": [],
            "three_days_later": []
        }
        
        # 각 아이템을 날짜별로 분류
        for item in all_items:
            if item['fcstDate'] == dates["today"]:
                forecast_by_date["today"].append(item)
            elif item['fcstDate'] == dates["tomorrow"]:
                forecast_by_date["tomorrow"].append(item)
            elif item['fcstDate'] == dates["day_after_tomorrow"]:
                forecast_by_date["day_after_tomorrow"].append(item)
            elif item['fcstDate'] == dates["three_days_later"]:
                forecast_by_date["three_days_later"].append(item)
        
        # 날짜별 데이터 요약 (카테고리별로 데이터 그룹화)
        summarized_forecast = {}
        for date_key, items in forecast_by_date.items():
            summary = {}
            for item in items:
                category = item['category']
                if category not in summary:
                    summary[category] = []
                summary[category].append({
                    "fcstTime": item['fcstTime'],
                    "fcstValue": item['fcstValue'],
                    "fcstDate": item['fcstDate']
                })
            summarized_forecast[date_key] = summary
        
        # 결과 출력
        for date_key, summary in summarized_forecast.items():
            print(f"\n===== {date_key} 날씨 예보 =====")
            for category, forecasts in summary.items():
                if forecasts:
                    print(f"{category}: {len(forecasts)}개 데이터")
        
        return summarized_forecast

    def get_air_quality(self, station_name):
        """
        측정소별 대기질 정보 조회
        
        :param station_name: 측정소명
        :return: 대기질 데이터
        """
        url = f"{self.base_url_air}/getMsrstnAcctoRltmMesureDnsty"
        params = {
            'serviceKey': self.service_key,
            'returnType': 'json',
            'stationName': station_name,
            'dataTerm': 'DAILY',
            'ver': '1.0'
        }
        
        try:
            response = requests.get(url, params=params)
            print(f"대기질 API 응답 상태 코드: {response.status_code}")
            
            if response.status_code == 200:
                content_type = response.headers.get('Content-Type', '')
                if 'xml' in content_type.lower():
                    # XML 응답 처리
                    root = ET.fromstring(response.text)
                    items = []
                    for item in root.findall('.//item'):
                        item_dict = {}
                        for child in item:
                            item_dict[child.tag] = child.text
                        items.append(item_dict)
                    return items
                else:
                    # JSON 응답 처리
                    try:
                        data = response.json()
                        return data.get('response', {}).get('body', {}).get('items', [])
                    except json.JSONDecodeError:
                        print(f"대기질 API JSON 파싱 오류")
                        return []
            else:
                print(f"대기질 API 호출 실패: {response.status_code}")
                return []
        except Exception as e:
            print(f"대기질 API 호출 중 예외 발생: {str(e)}")
            return []
    



In [17]:
advisor=WeatherActivityAdvisor(WEATHER_API_SERVICE_KEY)
nx=62
ny=126
station_name="강남구"
weather_data=advisor.get_weather_forecast_by_date(nx,ny )
air_quality_data=advisor.get_air_quality(station_name)

print(weather_data)
print(air_quality_data)
with open("weather_data.json", "w", encoding="utf-8") as json_file:
        json.dump(weather_data, json_file, ensure_ascii=False, indent=4)
with open("air_quality_data.json", "w", encoding="utf-8") as json_file:
        json.dump(air_quality_data, json_file, ensure_ascii=False, indent=4)



날씨 API 페이지 1 응답 상태 코드: 200
페이지 1 응답 내용 일부: {"response":{"header":{"resultCode":"00","resultMsg":"NORMAL_SERVICE"},"body":{"dataType":"JSON","items":{"item":[{"baseDate":"20250318","baseTime":"1400","category":"TMP","fcstDate":"20250318","fcstT...
날씨 API 페이지 2 응답 상태 코드: 200
페이지 2 응답 내용 일부: {"response":{"header":{"resultCode":"00","resultMsg":"NORMAL_SERVICE"},"body":{"dataType":"JSON","items":{"item":[{"baseDate":"20250318","baseTime":"1400","category":"TMP","fcstDate":"20250318","fcstT...
날씨 API 페이지 3 응답 상태 코드: 200
페이지 3 응답 내용 일부: {"response":{"header":{"resultCode":"00","resultMsg":"NORMAL_SERVICE"},"body":{"dataType":"JSON","items":{"item":[{"baseDate":"20250318","baseTime":"1400","category":"TMP","fcstDate":"20250318","fcstT...
날씨 API 페이지 4 응답 상태 코드: 200
페이지 4 응답 내용 일부: {"response":{"header":{"resultCode":"00","resultMsg":"NORMAL_SERVICE"},"body":{"dataType":"JSON","items":{"item":[{"baseDate":"20250318","baseTime":"1400","category":"TMP","fcstDate":"20250318","fcstT...

===== today

In [None]:
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
#from gemini_api/.retriever import retrieve_documents
from gemini_api.config import GEMINI_API_KEY, GEMINI_MODEL_NAME, MAX_OUTPUT_TOKENS, TEMPERATURE, TOP_P, TOP_K

def create_rag_chain():
    """RAG 체인 생성"""
    # Gemini 1.5 Pro 모델 설정
    gemini_model = ChatGoogleGenerativeAI(
        model=GEMINI_MODEL_NAME,
        google_api_key=GEMINI_API_KEY,
        max_output_tokens=MAX_OUTPUT_TOKENS,
        temperature=TEMPERATURE,
        top_p=TOP_P,
        top_k=TOP_K
    )

    # 프롬프트 설정
    prompt = PromptTemplate.from_template(
        """당신은 의료 정보를 제공하는 AI입니다.
        다음 문서 내용을 참고하여 질문에 답하세요.
        만약 문서에서 답을 찾을 수 없다면, 모른다고 말하세요.

        # 문서 내용:
        {context}

        # 질문:
        {question}

        # 답변:"""
    )

    # RAG 체인 실행 가능하도록 수정
    retriever = retrieve_documents  # 검색 함수 연결

    rag_chain = (
        {"context": retriever, "question": RunnablePassthrough()}  # 검색 결과와 질문 전달
        | prompt
        | gemini_model
        | StrOutputParser()
    )

    return rag_chain  # 실행 가능한 체인 객체 반환


In [None]:
# LINE 메시지 보내기 함수
def send_line_message(channel_access_token, user_to_send_id, message):
    url = 'https://api.line.me/v2/bot/message/push'
    
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {channel_access_token}'
        #'X-Line-Retry-Key':'Uaecc6981aace6cd3c6788ffb6019f1ff'
    }
    
    data = {
        'to': user_to_send_id,
        'messages': [
            {
                'type': 'text',
                'text': message
            }
        ]
    }
    
    response = requests.post(url, headers=headers, data=json.dumps(data))
    print(response.text)
    
    return response
