### Spark 설정

In [2]:
import glob
import pyspark
from pyspark.sql import SparkSession
# Spark configuration object; no settings are overridden on it here
conf = pyspark.SparkConf()

# Get the "listings" application session, creating it if none exists yet
spark = (
    SparkSession.builder
    .appName("listings")
    .config(conf=conf)
    .getOrCreate()
)

In [1]:
# Options handed to the Elasticsearch Spark connector when a DataFrame is written.
# This cell only defines the settings; nothing is sent to Elasticsearch here.
es_conf = {
    # Where the cluster is reached
    "es.nodes": "http://127.0.0.1",
    "es.port": "9200",
    # Node discovery and data-node-only routing are both switched off
    "es.nodes.discovery": "false",
    "es.nodes.data.only": "false",
    # HTTP basic-auth credentials
    "es.net.http.auth.user": "elastic",
    "es.net.http.auth.pass": "password",
    # Index handling: allow auto-creation and take the document id from "item_id"
    "es.index.auto.create": "true",
    "es.mapping.id": "item_id",
}

### Elasticsearch 배치 색인 (특정 폴더의 JSON 파일 색인)

In [None]:
def index_json_to_elasticsearch_batch(spark, directory_path, index_name="listings", es_options=None):
    """
    Index every ``*.json`` file of a directory into Elasticsearch, one file at a time.

    Each file is read into a Spark DataFrame with ``spark.read.json`` and appended
    to the target index through the ``org.elasticsearch.spark.sql`` data source.
    Files are processed in sorted path order so repeated runs behave the same.
    When the directory contains no JSON file, a notice is printed and nothing is
    written.

    Args:
        spark: SparkSession object used to read the JSON files.
        directory_path: Directory that holds the JSON files (a trailing
            separator is accepted).
        index_name: Name of the target index. Defaults to "listings".
        es_options: Dictionary of connector options passed to the DataFrame
            writer. When omitted, the module-level ``es_conf`` is used.
    """
    import os  # local import so this cell does not depend on an edit to the import cell

    if es_options is None:
        es_options = es_conf

    # glob.escape keeps characters such as "[" in the directory name literal;
    # os.path.join avoids a doubled separator when the path already ends with one.
    pattern = os.path.join(glob.escape(directory_path), "*.json")
    json_files = sorted(glob.glob(pattern))

    if not json_files:
        print(f"No JSON files found in: {directory_path}")
        return

    # Index the files one after another
    for file_path in json_files:
        # Announce the file before reading it so a failing file is identifiable
        print(f"Processing file: {file_path}")
        df = spark.read.json(file_path)
        (
            df.write.mode("append")
            .format("org.elasticsearch.spark.sql")
            .options(**es_options)
            .save(index_name)
        )

In [None]:
# Index the JSON files found in the relative "data/" directory
index_json_to_elasticsearch_batch(spark, "data/")

In [5]:
pip install elasticsearch

Collecting elasticsearch
  Downloading elasticsearch-8.15.1-py3-none-any.whl.metadata (8.7 kB)
Collecting elastic-transport<9,>=8.13 (from elasticsearch)
  Downloading elastic_transport-8.15.1-py3-none-any.whl.metadata (3.7 kB)
Downloading elasticsearch-8.15.1-py3-none-any.whl (524 kB)
   ---------------------------------------- 0.0/524.6 kB ? eta -:--:--
   --------------------------------------- 524.6/524.6 kB 16.4 MB/s eta 0:00:00
Downloading elastic_transport-8.15.1-py3-none-any.whl (64 kB)
Installing collected packages: elastic-transport, elasticsearch
Successfully installed elastic-transport-8.15.1 elasticsearch-8.15.1
Note: you may need to restart the kernel to use updated packages.


In [36]:
from elasticsearch import Elasticsearch
# Client for the Elasticsearch HTTP endpoint at localhost:9200
es = Elasticsearch("http://localhost:9200")
# Index name passed to the update and search calls in the cells below
index_name = "listings"

### 인덱스의 특정 문서 업데이트

In [47]:
def update_product(es, index_name, doc_id, item_keywords, product_description):
    """
    Send a partial update for one document of an index.

    Args:
        es: Elasticsearch client object
        index_name: Name of the index that holds the document
        doc_id: ID of the document to update
        item_keywords: New list of keyword entries
        product_description: New product description text

    Returns:
        The response returned by ``es.update``
    """
    # Fields placed under the "doc" key of the update request body
    changed_fields = {
        "item_keywords": item_keywords,
        "product_description.value": product_description,
    }

    return es.update(index=index_name, id=doc_id, body={"doc": changed_fields})

In [49]:
doc_id = "B07RQL63MD"
# Every keyword entry carries its text under "value", the sub-field that
# search_products_by_keyword queries as "item_keywords.value".
item_keywords = [
    {"language_tag": "en_IN", "value": "handwash"},
    {"language_tag": "en_IN", "value": "liquid hand wash"},
]
product_description = "Solimo Handwash Liquid, Sea Minerals."

# Send the update and show the response returned by update_product
result = update_product(es, index_name, doc_id, item_keywords, product_description)
print(result)

{'_index': 'listings', '_id': 'B07RQL63MD', '_version': 7, 'result': 'updated', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 304646, '_primary_term': 2}


### 인덱스 검색 (특정 키워드를 포함하는 문서 검색)

In [16]:
def search_products_by_keyword(es, index_name, keyword):
    """
    Search an index for documents that match a keyword.

    The keyword is matched against "item_keywords.value" and
    "product_description.value", combined as "should" clauses of a bool query.

    Args:
        es: Elasticsearch client object
        index_name: Name of the index to search
        keyword: Keyword to look for

    Returns:
        The response returned by ``es.search``
    """
    searched_fields = ("item_keywords.value", "product_description.value")

    # One match clause per searched field
    should_clauses = [{"match": {field: keyword}} for field in searched_fields]
    request_body = {"query": {"bool": {"should": should_clauses}}}

    return es.search(index=index_name, body=request_body)


In [17]:
keyword = "mobile"
results = search_products_by_keyword(es, index_name, keyword)

# Print the stored source of every returned hit (example output)
for hit in results["hits"]["hits"]:
    print(hit["_source"])

{'brand': [{'language_tag': 'de_DE', 'value': 'EONO'}], 'bullet_point': [{'language_tag': 'nl_NL', 'value': 'White board met moderne whiteboard-standaard, geavanceerde Triple-Surface-Coat Technology biedt ultra-smooth, anti-glare whiteboard afwerking die schoon maakt met één wip, bestand tegen krassen, en is getest om vrij van stains en markeerresten te blijven.'}, {'language_tag': 'nl_NL', 'value': 'Smooth-rollende wielen – het whiteboard staat gemakkelijk op harde of harde oppervlakken en de wiel vergrendelt op zijn plaats wanneer je klaar bent om te starten met de wheel slot.'}, {'language_tag': 'nl_NL', 'value': 'Schrijven More, Faster - Met een snelle aanraking van de knop op de bord, de whiteboard-flips om je binnen enkele seconden toegang te geven tot de bovenkant van het board.'}, {'language_tag': 'nl_NL', 'value': 'Inclusief alle hardware en gereedschappen voor setup, zoals twee markers en meer. Het heavy-duty frame is gemakkelijk te monteren en is ontworpen om de meest voorko