# 개요
* 참여중인 데이터 엔지니어링 스터디에서 배우는 내용 정리
  * 데이터 수집, 정제 : pyspark, airflow
  * 저장 : elasticsearch
  * 시각화 : kibana

* 4주차 과제 : elasticsearch관련 실습(index만들기, pyspark dataframe저장 등)

# 과제 - 4주차

## 데이터 포맷에 적합한 elasticsearch index 생성하기

### 기본코드

In [None]:
import argparse
from pyspark.sql import SparkSession
from datetime import datetime, timedelta
import sys
sys.path.append('/home/jovyan/jobs')
from base import read_input, init_df, df_with_meta
from filter import DailyStatFilter, TopRepoFilter, TopUserFilter, PytorchTopIssuerFilter
from es import Es

# SparkSession
spark = (SparkSession
    .builder
    .master("local")
    .appName("spark-sql")
    .config("spark.driver.extraClassPath", "/opt/bitnami/spark/resources/elasticsearch-spark-30_2.12-8.4.3.jar")
    .config("spark.jars", "/opt/bitnami/spark/resources/elasticsearch-spark-30_2.12-8.4.3.jar")
    # for jupyter
    .config("spark.driver.extraClassPath", "/home/jovyan/resources/elasticsearch-spark-30_2.12-8.4.3.jar")
    .config("spark.jars", "/home/jovyan/resources/elasticsearch-spark-30_2.12-8.4.3.jar")   
    # 옵션추가 시작
    .config("spark.executor.memory","3G")
    .config("spark.driver.memory","3G")
    .config("spark.executor.cores",2)
    # 옵션추가 끝
    .getOrCreate())

# 제출용 파일이므로 로그는 미출력되게 조정 (ALL,DEBUG,ERROR,FATAL,TRACE,WARN,INFO,OFF)
spark.sparkContext.setLogLevel("OFF")

class Args:
    def __init__(self):
        self.target_date = None
        self.input_path = None
        self.spark = None

args = Args()
args.spark = spark

24/09/05 14:06:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [None]:
# for Jupyter test
args.input_path = f"/home/jovyan/data/gh_archive/2024-08-24-*.json.gz"

### 실습코드 데이터 확인

In [None]:
df = read_input(args.spark, args.input_path)
df = init_df(df)

In [None]:
# DailyStatFilter 산출데이터 확인
stat_filter = DailyStatFilter(args)
stat_df = stat_filter.filter(df)
stat_df.show()

                                                                                

+------------+------------+----------+--------+----------+--------------------+
|d_user_count|d_repo_count|push_count|pr_count|fork_count|commit_comment_count|
+------------+------------+----------+--------+----------+--------------------+
|317111      |255241      |1103808   |106155  |21821     |1480                |
+------------+------------+----------+--------+----------+--------------------+





+------------+------------+----------+--------+----------+--------------------+
|d_user_count|d_repo_count|push_count|pr_count|fork_count|commit_comment_count|
+------------+------------+----------+--------+----------+--------------------+
|      317111|      255241|   1103808|  106155|     21821|                1480|
+------------+------------+----------+--------+----------+--------------------+




                                                                                

In [None]:
# DailyStatFilter 산출데이터 with metadata(timestamp) 확인
stat_df = df_with_meta(stat_df, args.target_date)
stat_df.show()

                                                                                

+------------+------------+----------+--------+----------+--------------------+----------+
|d_user_count|d_repo_count|push_count|pr_count|fork_count|commit_comment_count|@timestamp|
+------------+------------+----------+--------+----------+--------------------+----------+
|      317111|      255241|   1103808|  106155|     21821|                1480|      null|
+------------+------------+----------+--------+----------+--------------------+----------+



### eleasticsearch index생성할 데이터 확인

In [None]:
import pyspark.sql.functions as F

In [None]:
# Filter : repo_name = pytorch
base_df = df.filter(F.col('userid_and_repo_name') == 'pytorch/pytorch')

issues_event_exists = base_df.filter(base_df["type"] == "IssuesEvent").count() > 0
if issues_event_exists:
    filtered_df = base_df.filter(F.col('type') == 'IssuesEvent')
else:
    filtered_df is None

if filtered_df is not None:
    filtered_df.show()



+-----------------+--------------------+-------------------+-----------+-------------+----+-------------+-------+-----------+--------------------+--------------------+---------+
|        user_name|                 url|         created_at|         id|repository_id|size|distinct_size|comment|       type|            repo_url|userid_and_repo_name|repo_name|
+-----------------+--------------------+-------------------+-----------+-------------+----+-------------+-------+-----------+--------------------+--------------------+---------+
|         hyperkai|https://api.githu...|2024-08-24 16:21:24|41307929891|         null|null|         null|   null|IssuesEvent|https://api.githu...|     pytorch/pytorch|  pytorch|
|       phanicoder|https://api.githu...|2024-08-24 16:48:56|41308170756|         null|null|         null|   null|IssuesEvent|https://api.githu...|     pytorch/pytorch|  pytorch|
|samuele-bortolato|https://api.githu...|2024-08-24 14:56:17|41307163891|         null|null|         null|   nu


                                                                                

In [None]:
# groupby : 
result_df = filtered_df.groupBy('user_name').pivot('type').count()
result_df = result_df.cache()
result_df.where((~F.col('user_name').contains('[bot]'))) \
            .orderBy(F.desc('IssuesEvent')) \
            .limit(10)
result_df.show()

                                                                                

+-----------------+-----------+
|        user_name|IssuesEvent|
+-----------------+-----------+
|         hyperkai|          1|
|       phanicoder|          1|
|  pytorchmergebot|          3|
|       stevenvana|          1|
|samuele-bortolato|          1|
+-----------------+-----------+



In [None]:
result_df.printSchema()

root
 |-- user_name: string (nullable = true)
 |-- IssuesEvent: long (nullable = true)



### elasticsearch index 생성해보기

* GPT에서 스키마를 주고 index 생성
  * 강의내용에 따라 수정하고자 했으나, GPT의 의도가 내 사용목적에 부합함
    * user_name필드
      * `type:text`로 검색가능한(full-text search) 텍스트 데이터(analyzer 적용)
      * 정렬을 위한 `fields.keyword`사용(analyzer 미적용)
    * IssuesEvent필드
      * type:long
```
PUT /pytorch_top_issuer
{
  "mappings": {
    "properties": {
      "user_name": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "IssuesEvent": {
        "type": "long"
      }
    }
  }
}
```

In [None]:
# 만든 index넣어보기 (코드 by GPT)
import requests
import json

# Elasticsearch 클러스터의 URL 및 포트
es_host = 'localhost'
es_port = 9200
index_name = 'pytorch_top_issuer'

# 인덱스 생성에 사용할 JSON 데이터
mapping = {
  "mappings": {
    "properties": {
      "user_name": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "IssuesEvent": {
        "type": "long"
      }
    }
  }
}

# Elasticsearch에 인덱스 생성 요청
url = f'http://es:9200/{index_name}'
headers = {'Content-Type': 'application/json'}

response = requests.put(url, headers=headers, data=json.dumps(mapping))

# 응답 출력
print(response.status_code)
print(response.json())

200
{'acknowledged': True, 'shards_acknowledged': True, 'index': 'pytorch_top_issuer'}


## spark dataframe 을 elasticsearch 에 저장해보기

In [None]:
# 강의에 사용된 elasticsearch 저장용 코드 그대로 사용
class Es(object):
    def __init__(self, es_hosts, mode="append", write_operation="overwrite"):
        self.es_hosts = es_hosts
        self.es_mode = mode
        self.es_write_operation = write_operation
        self.es_index_auto_create = "yes"
        # self.es_mapping_id

    def write_df(self, df, es_resource):
        df.write.format("org.elasticsearch.spark.sql") \
          .mode(self.es_mode) \
          .option("es.nodes", self.es_hosts) \
          .option("es.index.auto.create", self.es_index_auto_create) \
          .option("es.resource", es_resource) \
          .save()

# 호스트 지정 후 저장함수 활용
es = Es("http://es:9200")
es.write_df(result_df, "pytorch_top_issuer")

                                                                                

In [None]:
# 앞서 만들어본 elasticsearch index 적용시의 코드
## 위 실습에서 만들어본 pytorch_top_issuer 인덱스로 테스트. 이후부터는 auto_create옵션으로 진행할 예정
class Es(object):
    def __init__(self, es_hosts, mode="append", write_operation="overwrite"):
        self.es_hosts = es_hosts
        self.es_mode = mode
        self.es_write_operation = write_operation
        self.es_index_auto_create = "no"  # 기존 인덱스 사용을 위해 "no"로 설정
        # self.es_mapping_id

    def write_df(self, df, es_resource):
        df.write.format("org.elasticsearch.spark.sql") \
          .mode(self.es_mode) \
          .option("es.nodes", self.es_hosts) \
          .option("es.index.auto.create", self.es_index_auto_create) \
          .option("es.resource", es_resource) \
          .save()

# 호스트 지정 후 저장함수 활용
es = Es("http://es:9200")
es.write_df(df, "pytorch_top_issuer")

In [None]:
# 앞서 만들어본 elasticsearch index 적용시의 코드
## 위 실습에서 만들어본 pytorch_top_issuer 인덱스로 테스트. 이후부터는 auto_create옵션으로 진행할 예정
## 결과 확인을 위해 spark의 로그레벨을 다시 조정

class Es(object):
    def __init__(self, es_hosts, mode="append", write_operation="overwrite"):
        self.es_hosts = es_hosts
        self.es_mode = mode
        self.es_write_operation = write_operation
        self.es_index_auto_create = "no"  # 기존 인덱스 사용을 위해 "no"로 설정
        # self.es_mapping_id

    def write_df(self, df, es_resource):
        df.write.format("org.elasticsearch.spark.sql") \
          .mode(self.es_mode) \
          .option("es.nodes", self.es_hosts) \
          .option("es.index.auto.create", self.es_index_auto_create) \
          .option("es.resource", es_resource) \
          .save()
        
# 결과확인을 위해 임시로 로그레벨 조정 (ALL,DEBUG,ERROR,FATAL,TRACE,WARN,INFO,OFF)
spark.sparkContext.setLogLevel("WARN")

# 호스트 지정 후 저장함수 활용
es = Es("http://es:9200")
es.write_df(df, "pytorch_top_issuer")

## 저장된 데이터 확인해보기

In [None]:
from elasticsearch import Elasticsearch

In [None]:
# Connect to Elasticsearch by GPT
es = Elasticsearch(['http://es:9200'])

# Define the index
index_name = 'pytorch_top_issuer'

# Example search query to retrieve all documents (with a size limit)
query = {
    "query": {
        "match_all": {}
    }
}

# Execute the search query
response = es.search(index=index_name, body=query, size=10)  # Adjust 'size' to retrieve more documents

  response = es.search(index=index_name, body=query, size=10)  # Adjust 'size' to retrieve more documents


In [None]:
# Parse and print the search results
for hit in response['hits']['hits']:
    print(hit['_source'])

{'user_name': 'hyperkai', 'IssuesEvent': 1}
{'user_name': 'phanicoder', 'IssuesEvent': 1}
{'user_name': 'pytorchmergebot', 'IssuesEvent': 3}
{'user_name': 'stevenvana', 'IssuesEvent': 1}
{'user_name': 'samuele-bortolato', 'IssuesEvent': 1}
