### 1주차 과제 진행
* client.ipynb는 카프카에 업데이트 요청을 보내기 위한 코드입니다.
* 테스트 시 server.ipynb를 먼저 실행하고, 이어서 client.ipynb를 실행하면 색인이 진행됩니다.

1. 제품 listing 전체를 Elasticsearch에 인덱스
2. 제품 인덱스의 “item_keywords”와 “product_description”을 업데이트할 수 있는 API
3. item_keywords와 product_description 정보로 제품을 검색
---
세부사항
Kafka
Elasticsearch
- 로컬환경에서 작동되는 인프라를 구축합니다.(텍스트 전처리는 python의 nltk를 씁니다: https://www.nltk.org/)
- 다량의 업데이트를 지원할 수 있는 시스템 구축을 가정합니다. (Flink: https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/python/overview/)
- language_tag가 en_US 로 된 제품만 인덱스 합니다. (Kafka: https://docs.confluent.io/kafka-clients/python/current/overview.html)

In [1]:
from functools import lru_cache

import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer, WordNetLemmatizer

# ~/nltk_data에 말뭉치 저장.
# nltk.download('stopwords')
# nltk.download('punkt_tab')
# nltk.download('wordnet')

@lru_cache(maxsize=1)
def _english_stopwords():
    """Return NLTK's English stopword list as a frozenset, loaded once and reused."""
    return frozenset(stopwords.words('english'))


def analyze_document_field(doc_field):
    """Normalize a free-text field for indexing/search.

    Tokenizes ``doc_field`` with NLTK's ``word_tokenize``, drops English
    stopwords (compared case-insensitively), lemmatizes each remaining token
    with WordNet, and joins the lemmas with single spaces.

    Requires the NLTK ``stopwords``, ``punkt_tab`` and ``wordnet`` corpora to
    be downloaded (see the commented ``nltk.download`` calls above).

    Args:
        doc_field: The text to analyze, or ``None``.

    Returns:
        str: The space-joined lemmatized tokens. ``None`` yields ``""`` so the
        result is always a string (it is still falsy, like the previous ``[]``).
    """
    if doc_field is None:
        return ""

    stop_words = _english_stopwords()
    lemmatizer = WordNetLemmatizer()

    # Stopword matching is case-insensitive; surviving tokens keep their case.
    kept_tokens = [w for w in word_tokenize(doc_field) if w.lower() not in stop_words]

    # Only lemmatization feeds the output. Porter stems were previously
    # computed here but never used, so that dead work has been removed.
    return ' '.join(lemmatizer.lemmatize(w) for w in kept_tokens)

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,explode, expr

# SparkSession 생성
# Create the SparkSession used by every cell below (reuses an existing one if
# the notebook already started it). maxToStringFields is raised so wide
# listing schemas are not truncated in plan/debug output.
spark = (
    SparkSession.builder
    .appName("Read GZ Compressed JSON Files with PySpark")
    .config("spark.sql.debug.maxToStringFields", "1000")
    .getOrCreate()
)




24/10/11 13:57:49 WARN Utils: Your hostname, HG380-MacBookAir.local resolves to a loopback address: 127.0.0.1; using 192.168.182.66 instead (on interface en0)
24/10/11 13:57:49 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/10/11 13:57:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# Read every gzip-compressed JSON listing file matched by the glob below.
# Spark picks the gzip codec from the ".gz" extension automatically.
json_path = "abo-listings/listings/metadata/*.json.gz"
df = spark.read.format("json").load(json_path)

                                                                                

In [4]:
# Bulk-write the listings DataFrame into the "amazon-berkeley" index, using
# item_id as the Elasticsearch document _id.
# NOTE(review): the task spec says only en_US products should be indexed, but
# no language_tag filter is applied to `df` before this write — confirm intent.
es_conf = {
    # Connection: single local node, no cluster discovery.
    "es.nodes": "http://127.0.0.1",
    "es.port": "9200",
    "es.nodes.wan.only": "true",
    "es.nodes.discovery": "false",
    "es.nodes.data.only": "false",
    # Authentication. NOTE(review): hard-coded credentials; move to config/env.
    "es.net.http.auth.user": "elastic",
    "es.net.http.auth.pass": "password",
    # Index / document mapping.
    "es.index.auto.create": "true",
    "es.mapping.id": "item_id",
}

(
    df.write
    .format('org.elasticsearch.spark.sql')
    .mode("append")
    .options(**es_conf)
    .save("amazon-berkeley")
)

                                                                                

In [7]:
from pyspark.sql.functions import from_json, col, explode, collect_list
from pyspark.sql.types import StructType, StructField, StringType, ArrayType

# A localized text field in an update message: an array of
# {language_tag, value} structs.
_localized_text_type = ArrayType(StructType([
    StructField("language_tag", StringType(), True),
    StructField("value", StringType(), True),
]))

# Expected JSON payload of each Kafka update message.
schema = StructType([
    StructField("item_id", StringType(), True),
    StructField("item_keywords", _localized_text_type, True),
    StructField("product_description", _localized_text_type, True),
])

# Subscribe to the update topic. Without a checkpoint, reading starts from the
# latest offsets; missing offsets do not fail the query (failOnDataLoss=false).
kafka_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "amazon_berkeley_update")
    .option("startingOffsets", "latest")
    .option("failOnDataLoss", "false")
    .load()
)

# Kafka's `value` column is binary: cast it to a string, parse it with
# `schema`, and flatten the parsed struct into top-level columns.
json_stream = (
    kafka_stream
    .selectExpr("CAST(value AS STRING)")
    .select(from_json("value", schema).alias("data"))
    .select("data.*")
)

In [None]:
# Elasticsearch connector settings for partial updates of existing documents.
es_conf = {
    "es.nodes.discovery": "false",
    "es.nodes.data.only": "false",
    # NOTE(review): hard-coded credentials; move to config/env.
    "es.net.http.auth.user": "elastic",
    "es.net.http.auth.pass": "password",
    "es.nodes": "http://127.0.0.1",
    "es.port": "9200",
    # item_id becomes the document _id and is not written into the body.
    "es.mapping.id": "item_id",
    "es.mapping.exclude": "item_id",
    # "update" modifies existing documents only; es-hadoop reports an error
    # when the _id does not exist yet.
    "es.write.operation": "update",
    "es.nodes.wan.only": "true",
}

# Dedicated checkpoint directory for the streaming query. Using "/tmp/" itself
# would scatter Spark's offsets/commits/metadata directly into a shared
# directory, where another query could collide with them.
UPDATE_CHECKPOINT_DIR = "/tmp/amazon-berkeley-update-checkpoint"


def _en_us_entries(df, field):
    """Return (item_id, field) rows where `field` keeps only its en_US entries.

    Items with no en_US entry in `field` (or a null `field`) produce no row.
    """
    return (
        df.withColumn(field, explode(col(field)))
        .filter(col(f"{field}.language_tag") == "en_US")
        .groupBy("item_id")
        .agg(collect_list(field).alias(field))
    )


def filter_en_us(df, epoch_id):
    """foreachBatch handler: write the en_US parts of each update to Elasticsearch.

    Args:
        df: One micro-batch of parsed update messages (item_id, item_keywords,
            product_description).
        epoch_id: Micro-batch id supplied by Spark (unused).
    """
    keywords_df = _en_us_entries(df, "item_keywords")
    description_df = _en_us_entries(df, "product_description")

    # Full outer join: a message that updates only one of the two fields must
    # still reach Elasticsearch (an inner join silently dropped it). The
    # missing side is null, and es-hadoop does not write null DataFrame fields
    # by default (es.spark.dataframe.write.null=false), so that field is left
    # unchanged in the stored document.
    final_df = keywords_df.join(description_df, "item_id", "full_outer")

    (
        final_df.write
        .mode("append")
        .format('org.elasticsearch.spark.sql')
        .options(**es_conf)
        .save('amazon-berkeley')
    )


# Run the streaming query. The checkpoint belongs to the streaming writer, not
# to the batch Elasticsearch writer. foreachBatch performs the sink write
# itself, so no output path is passed to start().
query = (
    json_stream.writeStream
    .outputMode("append")
    .foreachBatch(filter_en_us)
    .option("checkpointLocation", UPDATE_CHECKPOINT_DIR)
    .start()
)

query.awaitTermination()


24/10/11 14:02:29 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/10/11 14:02:33 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                