In [21]:
import os
import sys
import math
import json
import requests

from datetime import datetime

from dotenv import load_dotenv

"""
    https://www.data.go.kr/data/15084084/openapi.do#/tab_layer_prcuse_exame
"""

def setup_env():
    
    env_path = os.path.join(os.getcwd(), '../.env')

    if os.path.exists(env_path):
        
        load_dotenv(dotenv_path=env_path)
        
        print(f"Loaded environment variables from: \033[94m{env_path}\033[0m")
        
    else:
            print("\033[91mError: .env file not found. Please create one with your OPENAI_API_KEY.\033[0m")
            sys.exit(1)
            
setup_env()

Loaded environment variables from: [94m/home/ws/src/kumdori_rag_exp/src/../.env[0m


In [22]:
def lonlat_to_xy(lon, lat):
    """
    위도, 경도를 단기예보 격자 좌표(x, y)로 변환
    
    Args:
        lon (float): 경도 (degree)
        lat (float): 위도 (degree)
    
    Returns:
        tuple: (x, y) 격자 좌표
    """
    # 단기예보 지도 정보 (기상청 기준)
    RE = 6371.00877    # 지구반경 (km)
    GRID = 5.0         # 격자간격 (km)
    SLAT1 = 30.0       # 표준위도 1
    SLAT2 = 60.0       # 표준위도 2
    OLON = 126.0       # 기준점 경도
    OLAT = 38.0        # 기준점 위도
    XO = 210/GRID      # 기준점 X좌표
    YO = 675/GRID      # 기준점 Y좌표
    
    # 상수
    DEGRAD = math.pi / 180.0
    
    # Lambert Conformal Conic Projection 계산
    re = RE / GRID
    slat1 = SLAT1 * DEGRAD
    slat2 = SLAT2 * DEGRAD
    olon = OLON * DEGRAD
    olat = OLAT * DEGRAD
    
    sn = math.tan(math.pi * 0.25 + slat2 * 0.5) / math.tan(math.pi * 0.25 + slat1 * 0.5)
    sn = math.log(math.cos(slat1) / math.cos(slat2)) / math.log(sn)
    
    sf = math.tan(math.pi * 0.25 + slat1 * 0.5)
    sf = pow(sf, sn) * math.cos(slat1) / sn
    
    ro = math.tan(math.pi * 0.25 + olat * 0.5)
    ro = re * sf / pow(ro, sn)
    
    # 위경도를 격자 좌표로 변환
    ra = math.tan(math.pi * 0.25 + lat * DEGRAD * 0.5)
    ra = re * sf / pow(ra, sn)
    
    theta = lon * DEGRAD - olon
    if theta > math.pi:
        theta -= 2.0 * math.pi
    if theta < -math.pi:
        theta += 2.0 * math.pi
    theta *= sn
    
    x = ra * math.sin(theta) + XO
    y = ro - ra * math.cos(theta) + YO
    
    # 격자 좌표는 1부터 시작하므로 1.5를 더하고 정수로 변환
    return int(x + 1.5), int(y + 1.5)

def xy_to_lonlat(x, y):
    """
    단기예보 격자 좌표(x, y)를 위도, 경도로 변환
    
    Args:
        x (int): X 격자 좌표
        y (int): Y 격자 좌표
    
    Returns:
        tuple: (lon, lat) 경도, 위도
    """
    # 단기예보 지도 정보 (기상청 기준)
    RE = 6371.00877    # 지구반경 (km)
    GRID = 5.0         # 격자간격 (km)
    SLAT1 = 30.0       # 표준위도 1
    SLAT2 = 60.0       # 표준위도 2
    OLON = 126.0       # 기준점 경도
    OLAT = 38.0        # 기준점 위도
    XO = 210/GRID      # 기준점 X좌표
    YO = 675/GRID      # 기준점 Y좌표
    
    # 상수
    DEGRAD = math.pi / 180.0
    RADDEG = 180.0 / math.pi
    
    # Lambert Conformal Conic Projection 계산
    re = RE / GRID
    slat1 = SLAT1 * DEGRAD
    slat2 = SLAT2 * DEGRAD
    olon = OLON * DEGRAD
    olat = OLAT * DEGRAD
    
    sn = math.tan(math.pi * 0.25 + slat2 * 0.5) / math.tan(math.pi * 0.25 + slat1 * 0.5)
    sn = math.log(math.cos(slat1) / math.cos(slat2)) / math.log(sn)
    
    sf = math.tan(math.pi * 0.25 + slat1 * 0.5)
    sf = pow(sf, sn) * math.cos(slat1) / sn
    
    ro = math.tan(math.pi * 0.25 + olat * 0.5)
    ro = re * sf / pow(ro, sn)
    
    # 격자 좌표를 위경도로 변환
    x = x - 1
    y = y - 1
    
    xn = x - XO
    yn = ro - y + YO
    ra = math.sqrt(xn * xn + yn * yn)
    if sn < 0.0:
        ra = -ra
    
    alat = pow((re * sf / ra), (1.0 / sn))
    alat = 2.0 * math.atan(alat) - math.pi * 0.5
    
    if abs(xn) <= 0.0:
        theta = 0.0
    else:
        if abs(yn) <= 0.0:
            theta = math.pi * 0.5
            if xn < 0.0:
                theta = -theta
        else:
            theta = math.atan2(xn, yn)
    
    alon = theta / sn + olon
    
    lat = alat * RADDEG
    lon = alon * RADDEG
    
    return lon, lat

In [23]:
# 위경도를 격자 좌표로 변환 예제
# lon, lat = 126.929810, 37.488201
lon, lat = 127.405077, 36.424859
x, y = lonlat_to_xy(lon, lat)
print(f"경도: {lon}, 위도: {lat} -> X: {x}, Y: {y}")

# 격자 좌표를 위경도로 변환 예제
x, y = 59, 125
lon, lat = xy_to_lonlat(x, y)
print(f"X: {x}, Y: {y} -> 경도: {lon:.6f}, 위도: {lat:.6f}")

경도: 127.405077, 위도: 36.424859 -> X: 68, Y: 102
X: 59, Y: 125 -> 경도: 126.929809, 위도: 37.488200


In [24]:
from datetime import datetime, timezone, timedelta

def get_current_datetime():
    """
    현재 날짜와 시간을 'yyyyMMdd' 및 'HHMM' 형식으로 반환
    
    Returns:
        tuple: (date_str, time_str)
    """
    # 한국 표준시(KST, UTC+9)로 현재 시각을 얻고, 기준시는 2시간 전으로 설정
    now = datetime.now(timezone(timedelta(hours=9)))
    base_time = now - timedelta(hours=2)
    
    date_str = now.strftime("%Y%m%d")
    time_str = base_time.strftime("%H00")

    return date_str, time_str

In [25]:
lon, lat = 127.405077, 36.424859

nx, ny = lonlat_to_xy(lon, lat)

print(f"경도: {lon}, 위도: {lat} -> X: {nx}, Y: {ny}")

url = 'http://apis.data.go.kr/1360000/VilageFcstInfoService_2.0/getUltraSrtFcst'

serviceKey = os.getenv("WEATHER_API_KEY")

print(serviceKey)

date_str, time_str = get_current_datetime()

params = {'serviceKey' : serviceKey, 'pageNo' : '1', 'numOfRows' : '1000', 'dataType' : 'JSON', 'base_date' : date_str, 'base_time' : time_str, 'nx' : nx, 'ny' : ny }

경도: 127.405077, 위도: 36.424859 -> X: 68, Y: 102
d417e40c157976791ee87859af286371bea584cb2faec2a2598af12a34c92c51


In [27]:
def get_url_content(url, params):
    try:
        response = requests.get(url, params)
        if response.status_code == 200:
            return response.text
        else:
            response.raise_for_status()

    except requests.exceptions.RequestException as e:
        print(f"Request Exception: {e}")
        return None
    
result = get_url_content(url, params)

if result is not None:
    data = json.loads(result)
    print(data)
else:
    print("Failed to retrieve URL content.")

{'response': {'header': {'resultCode': '00', 'resultMsg': 'NORMAL_SERVICE'}, 'body': {'dataType': 'JSON', 'items': {'item': [{'baseDate': '20251112', 'baseTime': '1330', 'category': 'LGT', 'fcstDate': '20251112', 'fcstTime': '1400', 'fcstValue': '0', 'nx': 68, 'ny': 102}, {'baseDate': '20251112', 'baseTime': '1330', 'category': 'LGT', 'fcstDate': '20251112', 'fcstTime': '1500', 'fcstValue': '0', 'nx': 68, 'ny': 102}, {'baseDate': '20251112', 'baseTime': '1330', 'category': 'LGT', 'fcstDate': '20251112', 'fcstTime': '1600', 'fcstValue': '0', 'nx': 68, 'ny': 102}, {'baseDate': '20251112', 'baseTime': '1330', 'category': 'LGT', 'fcstDate': '20251112', 'fcstTime': '1700', 'fcstValue': '0', 'nx': 68, 'ny': 102}, {'baseDate': '20251112', 'baseTime': '1330', 'category': 'LGT', 'fcstDate': '20251112', 'fcstTime': '1800', 'fcstValue': '0', 'nx': 68, 'ny': 102}, {'baseDate': '20251112', 'baseTime': '1330', 'category': 'LGT', 'fcstDate': '20251112', 'fcstTime': '1900', 'fcstValue': '0', 'nx': 68,

In [None]:
def proc_weather(data, city_name_toshow, lang_code):
    
    cur_weather=[]
    
    try:
        
        response = requests.get(url, params=data, timeout=20)
        
        res = json.loads(response.text)
        
        informations = dict()
        
        print("informations:",informations)
        
        for items in res['response']['body']['items']['item']:
            
            cate = items['category']
            
            fcstTime = items['fcstTime']
            
            fcstValue = items['fcstValue']
            
            temp = dict()
            
            temp[cate] = fcstValue

            if fcstTime not in informations.keys():
                informations[fcstTime] = dict()
                
            informations[fcstTime][cate] = fcstValue

        print(informations)
        
        pyt_code = {0: '강수는 없습니다.', 1: '비가 옵니다.', 2: '비와 눈이 옵니다.',
                    3: '눈이 옵니다.', 5: '빗방울이 떨어집니다.', 6: '진눈깨비가 내립니다.', 7: '눈이 날립니다.'}
        
        sky_code = {1: '맑으며, ', 3: '구름이 많으며, ', 4: '흐리며, '}

        for key, val in zip(informations.keys(), informations.values()):
            template = ""
            if val['SKY']:
                sky_temp = sky_code[int(val['SKY'])]
                template += sky_temp + " "
            if val['PTY']:
                pty_temp = pyt_code[int(val['PTY'])]
                template += pty_temp
            if val['T1H']:
                t1h_temp = float(val['T1H'])
                template += f" 기온은 {t1h_temp} ℃이며, "
            if val['REH']:
                reh_temp = float(val['REH'])
                template += f" 습도는 {reh_temp} %"
            
            print(f'template:{template}')
            
            cur_weather.append(template)
            
        cur_weather[0] = f"{city_name_toshow}의 현재 날씨는 {cur_weather[0]} 입니다."
        
        print(cur_weather[0])
            
    except requests.Timeout:
        
        print('timeout')
        
        return
        
    return cur_weather[0]

In [16]:
def find_coordinates(self, province_value, city_value):
    """
        주어진 '1단계' 및 '2단계' 값에 해당하는 격자 좌표(X, Y)를 찾음.
    """    
    
    matched_rows = self.data[(self.data["1단계"] == province_value) & (
        self.data["2단계"].str.contains(city_value))]
    
    print("matched_rows : ", matched_rows)
    
    if not matched_rows.empty:
        self.x_coor = str(matched_rows.iloc[0]["격자 X"])
        self.y_coor = str(matched_rows.iloc[0]["격자 Y"])
        return self.x_coor, self.y_coor
    
    else:
        return None, None

In [None]:
def fetch_weather(self, city_weather, lang_code):
        
        print(f'lang_code of wf class: {lang_code}')
        
        try:
            
            if " " in city_weather:
                self.province, self.city = city_weather.split(
                    " ", 1)  # 추가: split의 결과를 최대 2개로 제한
                for element in self.province_list:
                    if self.province in element:
                        print(
                            f"{self.province}는 province_list의 {element[0]}에 포함됩니다.")
                        self.province = element[0]
                        self.search_mode = 'province'
                        
                        break
                for element in self.metropolitan_list:
                    if self.province in element:
                        print(
                            f"{self.province}는 metropolitan_list의 {element[0]}에 포함됩니다.")
                        self.province = element[0]
                        self.search_mode = 'metropolitan'
                        
                        break
                print("city with province")
                
            else:
                try:
                    self.city, self.search_mode = self.check_location(
                        city_weather, self.city, self.search_mode)
                    
                except Exception:
                    if lang_code == 'ko-kr':
                        self.cur_weather = '정확한 도, 시의 이름을 말씀해주세요.'
                    elif lang_code == 'en-us':
                        self.cur_weather = 'Please tell me the exact name of the city in Korea.'
                    print(f"Error: {e}. 정확한 도, 시의 이름을 말씀해주세요.")
                    return self.cur_weather
            print("결과 : ", self.province, self.city, self.search_mode)

            if self.search_mode == 'metropolitan_only':
                print(f'search_mode: {self.search_mode}')
                # 광역시(예: 대구광역시)가 1단계에 있는지 또는 2단계에 광역시 이름(예: 대구)이 포함되어 있는지 확인
                matched_rows = self.data[(self.data["1단계"] == self.city) | (self.data["2단계"].str.contains(self.city, na=False))]
                
                if not matched_rows.empty:
                    # 첫 번째 행의 '격자 X'와 '격자 Y' 값을 가져오기
                    self.x_coor = str(matched_rows.iloc[0]["격자 X"])
                    self.y_coor = str(matched_rows.iloc[0]["격자 Y"])
                    print(f"격자 X: {self.x_coor}, 격자 Y: {self.y_coor}")
                    self.params = self.set_params(
                        self.params, self.x_coor, self.y_coor)
                    print(f"self.params: {self.params}")
                    # self.cur_weather = f"{self.city}의 현재 날씨는 {self.proc_weather(self.params, lang_code)} 입니다."
                    self.cur_weather = self.proc_weather(self.params, self.city, lang_code)
                    print('metropolitan_only cur_weather : ', self.cur_weather)
                    return self.cur_weather
                elif matched_rows.empty:
                    if lang_code == 'ko-kr':
                        self.cur_weather = '정확한 도, 시의 이름을 말씀해주세요.'
                    elif lang_code == 'en-us':
                        self.cur_weather = 'Please tell me the exact name of the city in Korea.'
                    print(self.cur_weather)
                    return self.cur_weather

            elif self.search_mode == 'metropolitan':
                print(f'search_mode: {self.search_mode}')
                self.x_coor, self.y_coor = self.find_coordinates(
                    self.province, self.city)
                if self.x_coor is not None:
                    print(f"격자 X: {self.x_coor}, 격자 Y: {self.y_coor}")
                    self.params = self.set_params(
                        self.params, self.x_coor, self.y_coor)
                    print(f"self.params: {self.params}")
                    self.city_name_toshow = ' '.join([self.province, self.city])
                    # self.cur_weather = f"{self.province} {self.city}의 현재 날씨는 {self.proc_weather(self.params, lang_code)} 입니다."
                    self.cur_weather = self.proc_weather(self.params, self.city_name_toshow, lang_code)
                    print('metropolitan cur_weather : ', self.cur_weather)
                    return self.cur_weather
                elif self.x_coor is None:
                    if lang_code == 'ko-kr':
                        self.cur_weather = '정확한 도, 시의 이름을 말씀해주세요.'
                    elif lang_code == 'en-us':
                        self.cur_weather = 'Please tell me the exact name of the city in Korea.'
                    print(self.cur_weather)
                    return self.cur_weather

            elif self.search_mode == 'province':
                print(f'search_mode: {self.search_mode}')
                self.x_coor, self.y_coor = self.find_coordinates(
                    self.province, self.city)
                if self.x_coor is not None:
                    print(f"격자 X: {self.x_coor}, 격자 Y: {self.y_coor}")
                    self.params = self.set_params(
                        self.params, self.x_coor, self.y_coor)
                    print(f"self.params: {self.params}")
                    self.city_name_toshow = ' '.join([self.province, self.city])
                    # self.cur_weather = f"{self.province} {self.city}의 현재 날씨는 {self.proc_weather(self.params, lang_code)} 입니다."
                    self.cur_weather = self.proc_weather(self.params, self.city_name_toshow, lang_code)
                    print('province cur_weather : ', self.cur_weather)
                    return self.cur_weather
                elif self.x_coor is None:
                    if lang_code == 'ko-kr':
                        self.cur_weather = '정확한 도, 시의 이름을 말씀해주세요.'
                    elif lang_code == 'en-us':
                        self.cur_weather = 'Please tell me the exact name of the city in Korea.'
                    print(self.cur_weather)
                    return self.cur_weather

            elif self.search_mode == 'city_only':
                print(f'search_mode: {self.search_mode}')
                matched_rows = self.data[self.data["2단계"].str.contains(
                    self.city, na=False)]
                if not matched_rows.empty:
                    # 첫 번째 행의 '격자 X'와 '격자 Y' 값을 가져오기
                    self.x_coor = str(matched_rows.iloc[0]["격자 X"])
                    self.y_coor = str(matched_rows.iloc[0]["격자 Y"])
                    print(f"격자 X: {self.x_coor}, 격자 Y: {self.y_coor}")
                    self.params = self.set_params(
                        self.params, self.x_coor, self.y_coor)
                    print(f"self.params: {self.params}")
                    self.city_name_toshow = ' '.join([matched_rows.iloc[0]['1단계'], self.city])
                    # self.cur_weather = f"{matched_rows.iloc[0]['1단계']} {self.city}의 현재 날씨는 {self.proc_weather(self.params, lang_code)} 입니다."
                    self.cur_weather = self.proc_weather(self.params, self.city_name_toshow, lang_code)
                    print('city_only cur_weather : ', self.cur_weather)
                    return self.cur_weather
                
                elif matched_rows.empty:
                    if lang_code == 'ko-kr':
                        self.cur_weather = '정확한 도, 시의 이름을 말씀해주세요.'
                        print(f'ko-kr {self.cur_weather}')
                        
                    elif lang_code == 'en-us':
                        self.cur_weather = 'Please tell me the exact name of the city in Korea.'
                        
                    print(self.cur_weather)
                    
                    return self.cur_weather

            elif self.search_mode == 'province_only':
                if lang_code == 'ko-kr':
                    self.cur_weather = '정확한 도, 시의 이름을 말씀해주세요.'
                elif lang_code == 'en-us':
                    self.cur_weather = 'Please tell me the exact name of the city in Korea.'
                    
                print(self.cur_weather)
                
                return self.cur_weather
            else:
                if lang_code == 'ko-kr':
                    self.cur_weather = '정확한 도, 시의 이름을 말씀해주세요.'
                elif lang_code == 'en-us':
                    self.cur_weather = 'Please tell me the exact name of the city in Korea.'
                    
                print(self.cur_weather)
                
                print(f"Error: {e}. 정확한 도, 시의 이름을 말씀해주세요.")
                
                return self.cur_weather

        except ValueError as e:
            if lang_code == 'ko-kr':
                self.cur_weather = '정확한 도, 시의 이름을 말씀해주세요.'
            
            elif lang_code == 'en-us':
                self.cur_weather = 'Please tell me the exact name of the city in Korea.'
                
            print(f"Error: {e}. 정확한 도, 시의 이름을 말씀해주세요.")
            
            return self.cur_weather

In [None]:
def fetch_weather_coor(self, latitude, longtitude):
    # 소수점 한자리까지 반올림
    # 소수점 한자리까지 반올림
    rounded_latitude = round(latitude, 1)
    rounded_longtitude = round(longtitude, 1)
    print('rounded_latitude : ', rounded_latitude)
    print('rounded_longtitude : ', rounded_longtitude)

    # 데이터에서 해당 좌표와 일치하거나, 포함되는 값을 찾는 부분
    matched_rows = self.data[
        self.data["위도(초/100)"].astype(str).str.contains(str(rounded_latitude))
        & self.data["경도(초/100)"].astype(str).str.contains(str(rounded_longtitude))
    ]

    if matched_rows.empty:
        # 근사한 값을 찾는 부분
        distances = np.sqrt(
            (self.data["위도(초/100)"].astype(float) - rounded_latitude)**2
            + (self.data["경도(초/100)"].astype(float) -
                rounded_longtitude)**2
        )
        closest_index = distances.argmin()
        matched_rows = self.data.iloc[[closest_index]]
        print('matched_rows closest_index : ', matched_rows)

    if not matched_rows.empty:
        print('matched_rows not empty : ',
                matched_rows.iloc[0]["경도(초/100)"])
        print('matched_rows not empty : ',
                matched_rows.iloc[0]["위도(초/100)"])
        print('matched_rows not empty : ',
                matched_rows.iloc[0]["2단계"])
        print('matched_rows not empty : ',
                matched_rows.iloc[0]["3단계"])
        self.x_coor = str(matched_rows.iloc[0]["격자 X"])
        self.y_coor = str(matched_rows.iloc[0]["격자 Y"])
        self.city = matched_rows.iloc[0]["1단계"]
        print(f"격자 X: {self.x_coor}, 격자 Y: {self.y_coor}")
        self.params = self.set_params(
            self.params, self.x_coor, self.y_coor)
        self.cur_weather = f"{self.city}의 현재 날씨는 {self.proc_weather(self.params)} 입니다."
        print('metropolitan_only cur_weather : ', self.cur_weather)
        return self.cur_weather
    else:
        # 이 부분은 사실상 실행되지 않아야 함
        self.cur_weather = '123정확한 도, 시의 이름을 말씀해주세요.'
        print(self.cur_weather)
        return self.cur_weather

In [None]:
if __name__ == "__main__":
    # service = WeatherService()
    city_weather = input("어떤 도, 시의 날씨를 알려드릴까요? : ")
    # service.fetch_weather(city_weather)