In [5]:
from elasticsearch import Elasticsearch
import warnings
warnings.filterwarnings('ignore')
#import json

In [2]:
# Elasticsearch 클라이언트 연결
es = Elasticsearch(['http://localhost:9200'])

# 조회할 인덱스 이름 설정
index_name = 'fire_station'

# 스크롤 쿼리 설정
body = {
    "_source": ["location"],  # 필요한 필드만 조회
    "query": {
        "match_all": {}
    },
    "size": 1000  # 한 번에 가져올 문서 수
}

# 스크롤 초기화
response = es.search(index=index_name, body=body, scroll='1m')

# 첫 번째 결과에서 _scroll_id와 hits 가져오기
scroll_id = response['_scroll_id']
hits = response['hits']['hits']

# 결과 저장 리스트
location_data = []

# 첫 번째 배치 데이터 저장
for hit in hits:
    location_data.append(hit['_source']['location'])

# 스크롤을 이용해 나머지 데이터 가져오기
while len(hits) > 0:
    response = es.scroll(scroll_id=scroll_id, scroll='1m')
    scroll_id = response['_scroll_id']
    hits = response['hits']['hits']
    for hit in hits:
        location_data.append(hit['_source']['location'])

# 결과 출력
print(f"총 {len(location_data)} 개의 location을 가져옴.")


총 241 개의 location을 가져옴.


In [4]:
#### 조회 반경 설정
distance = "10km"
## 조회 인덱스 지정
search_index = "real_mountain_fire"
# 필터링된 인덱스명 지정 (범위 외)
out_filter_index = "out_firestation_fire_data"
in_filter_index = "in_firestation_fire_data"

mapping = {
  "mappings": {
    "properties": {
      "location": {
        "type": "geo_point"
      },
      "FIFTH_FRTP": {
        "type": "keyword"
      },
      "지번주소": {
        "type": "keyword"
      },
      "피해금액": {
        "type": "double"
      },
      "사유": {
        "type": "text"
      },
      "진화시간": {
        "type": "integer"
      },
      "발생시각": {
        "type": "date"
      },
      "피해면적": {
        "type": "double"
      },
      "시/도명": {
        "type": "keyword"
      }
    }
  }
}

# 새 인덱스 생성 (범위 외)
if not es.indices.exists(index=out_filter_index):
  es.indices.create(index=out_filter_index, body=mapping)
  print(f'{out_filter_index} :: 범위 외 인덱스 생성 완료')
else : 
  # 이미 인덱스가 존재하면 삭제 후 생성
  es.indices.delete(index=out_filter_index)
  es.indices.create(index=out_filter_index, body=mapping)
  print(f'{out_filter_index} :: 범위 외 인덱스 삭제 후 생성 완료')
  
# 새 인덱스 생성 (범위 내)
if not es.indices.exists(index=in_filter_index):
  es.indices.create(index=in_filter_index, body=mapping)
  print(f'{in_filter_index} :: 범위 내 인덱스 생성 완료')
else : 
  # 이미 인덱스가 존재하면 삭제 후 생성
  es.indices.delete(index=in_filter_index)
  es.indices.create(index=in_filter_index, body=mapping)
  print(f'{in_filter_index} :: 범위 내 인덱스 삭제 후 생성 완료')

# 필터 쿼리문 초안 작성 (범위 외)
out_query = {
  "source": {
    "index": search_index, 
    "query": {
      "bool": {
        "must_not": []
      }
    }
  },
  "dest": {
    "index": out_filter_index
  }
}
# 필터 쿼리문 초안 작성 (범위 내)
in_query = {
  "source": {
    "index": search_index, 
    "query": {
      "bool": {
        "should": [],
        "minimum_should_match": 1
      }
    }
  },
  "dest": {
    "index": in_filter_index
  }
}
# location_data를 위도 경도로 나눔
for location in location_data :
  geo = str.split(location, ',')
  lat = geo[0]
  lon = geo[1]

  geo_filter = {
    "geo_distance": {
      "distance": distance,
      "location": {
        "lat": float(lat),
        "lon": float(lon)
      }
    }
  }

  # 쿼리 초안에 삽입
  out_query["source"]["query"]["bool"]["must_not"].append(geo_filter)
  in_query["source"]["query"]["bool"]["should"].append(geo_filter)

'''# JSON 형식으로 쿼리 저장
output_file = "geo_out_filter_query.txt"
with open(output_file, "w", encoding="utf-8") as file:
  # 첫 번째 줄에 POST _reindex 추가
  file.write("POST _reindex\n")
  # 작성된 쿼리문 추가
  json.dump(out_query, file, indent=2, ensure_ascii=False)

# JSON 형식으로 쿼리 저장
output_file = "geo_in_filter_query.txt"
with open(output_file, "w", encoding="utf-8") as file:
  # 첫 번째 줄에 POST _reindex 추가
  file.write("POST _reindex\n")
  # 작성된 쿼리문 추가
  json.dump(in_query, file, indent=2, ensure_ascii=False)'''

# _reindex API 실행 (범위 외, 범위 내 쿼리)
try:
  # 범위 외 데이터 필터링 (out_query)
  es.reindex(body=out_query)
  print("범위 외 데이터 필터링 완료")

  # 범위 내 데이터 필터링 (in_query)
  es.reindex(body=in_query)
  print("범위 내 데이터 필터링 완료")
except Exception as e:
  print(f"오류 발생: {e}")

out_filtered_fire_data :: 범위 외 인덱스 삭제 후 생성 완료
in_filtered_fire_data :: 범위 내 인덱스 삭제 후 생성 완료
범위 외 데이터 필터링 완료
범위 내 데이터 필터링 완료
