# 과제 - 4주차

## 데이터 포맷에 적합한 elasticsearch index 생성하기

### 기본코드

In [1]:
import argparse
from pyspark.sql import SparkSession
from datetime import datetime, timedelta
import sys
sys.path.append('/home/jovyan/jobs')
from base import read_input, init_df, df_with_meta
from filter import DailyStatFilter, TopRepoFilter, TopUserFilter, PytorchTopIssuerFilter
from es import Es

# SparkSession
spark = (SparkSession
    .builder
    .master("local")
    .appName("spark-sql")
    .config("spark.driver.extraClassPath", "/opt/bitnami/spark/resources/elasticsearch-spark-30_2.12-8.4.3.jar")
    .config("spark.jars", "/opt/bitnami/spark/resources/elasticsearch-spark-30_2.12-8.4.3.jar")
    # for jupyter
    .config("spark.driver.extraClassPath", "/home/jovyan/resources/elasticsearch-spark-30_2.12-8.4.3.jar")
    .config("spark.jars", "/home/jovyan/resources/elasticsearch-spark-30_2.12-8.4.3.jar")   
    # 옵션추가 시작
    .config("spark.executor.memory","3G")
    .config("spark.driver.memory","3G")
    .config("spark.executor.cores",2)
    # 옵션추가 끝
    .getOrCreate())

# 제출용 파일이므로 로그는 미출력되게 조정 (ALL,DEBUG,ERROR,FATAL,TRACE,WARN,INFO,OFF)
spark.sparkContext.setLogLevel("OFF")

class Args:
    def __init__(self):
        self.target_date = None
        self.input_path = None
        self.spark = None

args = Args()
args.spark = spark

24/09/03 14:42:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
# for Jupyter test
args.input_path = f"/home/jovyan/data/gh_archive/2024-08-24-*.json.gz"

### 실습코드 데이터 확인

In [None]:
df = read_input(args.spark, args.input_path)
df = init_df(df)

In [5]:
# DailyStatFilter 산출데이터 확인
stat_filter = DailyStatFilter(args)
stat_df = stat_filter.filter(df)
stat_df.show()

                                                                                

+------------+------------+----------+--------+----------+--------------------+
|d_user_count|d_repo_count|push_count|pr_count|fork_count|commit_comment_count|
+------------+------------+----------+--------+----------+--------------------+
|317111      |255241      |1103808   |106155  |21821     |1480                |
+------------+------------+----------+--------+----------+--------------------+





+------------+------------+----------+--------+----------+--------------------+
|d_user_count|d_repo_count|push_count|pr_count|fork_count|commit_comment_count|
+------------+------------+----------+--------+----------+--------------------+
|      317111|      255241|   1103808|  106155|     21821|                1480|
+------------+------------+----------+--------+----------+--------------------+



                                                                                

In [7]:
# DailyStatFilter 산출데이터 with metadata(timestamp) 확인
stat_df = df_with_meta(stat_df, args.target_date)
stat_df.show()

                                                                                

+------------+------------+----------+--------+----------+--------------------+----------+
|d_user_count|d_repo_count|push_count|pr_count|fork_count|commit_comment_count|@timestamp|
+------------+------------+----------+--------+----------+--------------------+----------+
|      317111|      255241|   1103808|  106155|     21821|                1480|      null|
+------------+------------+----------+--------+----------+--------------------+----------+



### eleasticsearch index생성할 데이터 확인

In [7]:
import pyspark.sql.functions as F

In [8]:
# Filter : repo_name = pytorch
base_df = df.filter(F.col('userid_and_repo_name') == 'pytorch/pytorch')

issues_event_exists = base_df.filter(base_df["type"] == "IssuesEvent").count() > 0
if issues_event_exists:
    filtered_df = base_df.filter(F.col('type') == 'IssuesEvent')
else:
    filtered_df is None

if filtered_df is not None:
    filtered_df.show()



+-----------------+--------------------+-------------------+-----------+-------------+----+-------------+-------+-----------+--------------------+--------------------+---------+
|        user_name|                 url|         created_at|         id|repository_id|size|distinct_size|comment|       type|            repo_url|userid_and_repo_name|repo_name|
+-----------------+--------------------+-------------------+-----------+-------------+----+-------------+-------+-----------+--------------------+--------------------+---------+
|         hyperkai|https://api.githu...|2024-08-24 16:21:24|41307929891|         null|null|         null|   null|IssuesEvent|https://api.githu...|     pytorch/pytorch|  pytorch|
|       phanicoder|https://api.githu...|2024-08-24 16:48:56|41308170756|         null|null|         null|   null|IssuesEvent|https://api.githu...|     pytorch/pytorch|  pytorch|
|samuele-bortolato|https://api.githu...|2024-08-24 14:56:17|41307163891|         null|null|         null|   nu

                                                                                

In [9]:
# groupby : 
result_df = filtered_df.groupBy('user_name').pivot('type').count()
result_df = result_df.cache()
result_df.where((~F.col('user_name').contains('[bot]'))) \
            .orderBy(F.desc('IssuesEvent')) \
            .limit(10)
result_df.show()

                                                                                

+-----------------+-----------+
|        user_name|IssuesEvent|
+-----------------+-----------+
|         hyperkai|          1|
|       phanicoder|          1|
|  pytorchmergebot|          3|
|       stevenvana|          1|
|samuele-bortolato|          1|
+-----------------+-----------+



In [21]:
result_df.printSchema()

root
 |-- user_name: string (nullable = true)
 |-- IssuesEvent: long (nullable = true)



### elasticsearch index 생성해보기

* GPT에서 스키마를 주고 index 생성
  * 강의내용에 따라 수정하고자 했으나, GPT의 의도가 내 사용목적에 부합함
    * user_name필드
      * `type:text`로 검색가능한(full-text search) 텍스트 데이터(analyzer 적용)
      * 정렬을 위한 `fields.keyword`사용(analyzer 미적용)
    * IssuesEvent필드
      * type:long
```
PUT /pytorch_top_issuer
{
  "mappings": {
    "properties": {
      "user_name": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "IssuesEvent": {
        "type": "long"
      }
    }
  }
}
```

In [10]:
# 만든 index넣어보기 (코드 by GPT)
import requests
import json

# Elasticsearch 클러스터의 URL 및 포트
es_host = 'localhost'
es_port = 9200
index_name = 'pytorch_top_issuer'

# 인덱스 생성에 사용할 JSON 데이터
mapping = {
  "mappings": {
    "properties": {
      "user_name": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "IssuesEvent": {
        "type": "long"
      }
    }
  }
}

# Elasticsearch에 인덱스 생성 요청
url = f'http://es:9200/{index_name}'
headers = {'Content-Type': 'application/json'}

response = requests.put(url, headers=headers, data=json.dumps(mapping))

# 응답 출력
print(response.status_code)
print(response.json())

400
{'error': {'root_cause': [{'type': 'resource_already_exists_exception', 'reason': 'index [pytorch_top_issuer/O-7q5f43QXiD1MbdoXBpHA] already exists', 'index_uuid': 'O-7q5f43QXiD1MbdoXBpHA', 'index': 'pytorch_top_issuer'}], 'type': 'resource_already_exists_exception', 'reason': 'index [pytorch_top_issuer/O-7q5f43QXiD1MbdoXBpHA] already exists', 'index_uuid': 'O-7q5f43QXiD1MbdoXBpHA', 'index': 'pytorch_top_issuer'}, 'status': 400}


## spark dataframe 을 elasticsearch 에 저장해보기

In [22]:
# 강의에 사용된 elasticsearch 저장용 코드 그대로 사용
class Es(object):
    def __init__(self, es_hosts, mode="append", write_operation="overwrite"):
        self.es_hosts = es_hosts
        self.es_mode = mode
        self.es_write_operation = write_operation
        self.es_index_auto_create = "yes"
        # self.es_mapping_id

    def write_df(self, df, es_resource):
        df.write.format("org.elasticsearch.spark.sql") \
          .mode(self.es_mode) \
          .option("es.nodes", self.es_hosts) \
          .option("es.index.auto.create", self.es_index_auto_create) \
          .option("es.resource", es_resource) \
          .save()

# 호스트 지정 후 저장함수 활용
es = Es("http://es:9200")
es.write_df(result_df, "daily-stats-2024")

                                                                                

In [None]:
# 앞서 만들어본 elasticsearch index 적용시의 코드
## 위 실습에서 만들어본 pytorch_top_issuer 인덱스로 테스트. 이후부터는 auto_create옵션으로 진행할 예정
class Es(object):
    def __init__(self, es_hosts, mode="append", write_operation="overwrite"):
        self.es_hosts = es_hosts
        self.es_mode = mode
        self.es_write_operation = write_operation
        self.es_index_auto_create = "no"  # 기존 인덱스 사용을 위해 "no"로 설정
        # self.es_mapping_id

    def write_df(self, df, es_resource):
        df.write.format("org.elasticsearch.spark.sql") \
          .mode(self.es_mode) \
          .option("es.nodes", self.es_hosts) \
          .option("es.index.auto.create", self.es_index_auto_create) \
          .option("es.resource", es_resource) \
          .save()

# 호스트 지정 후 저장함수 활용
es = Es("http://es:9200")
es.write_df(df, "pytorch_top_issuer")

In [None]:
# 앞서 만들어본 elasticsearch index 적용시의 코드
## 위 실습에서 만들어본 pytorch_top_issuer 인덱스로 테스트. 이후부터는 auto_create옵션으로 진행할 예정
## 결과 확인을 위해 spark의 로그레벨을 다시 조정

class Es(object):
    def __init__(self, es_hosts, mode="append", write_operation="overwrite"):
        self.es_hosts = es_hosts
        self.es_mode = mode
        self.es_write_operation = write_operation
        self.es_index_auto_create = "no"  # 기존 인덱스 사용을 위해 "no"로 설정
        # self.es_mapping_id

    def write_df(self, df, es_resource):
        df.write.format("org.elasticsearch.spark.sql") \
          .mode(self.es_mode) \
          .option("es.nodes", self.es_hosts) \
          .option("es.index.auto.create", self.es_index_auto_create) \
          .option("es.resource", es_resource) \
          .save()
        
# 결과확인을 위해 임시로 로그레벨 조정 (ALL,DEBUG,ERROR,FATAL,TRACE,WARN,INFO,OFF)
spark.sparkContext.setLogLevel("WARN")

# 호스트 지정 후 저장함수 활용
es = Es("http://es:9200")
es.write_df(df, "pytorch_top_issuer")

24/09/03 14:51:13 TRACE BaseSessionStateBuilder$$anon$1: Fixed point reached for batch Substitution after 1 iterations.
24/09/03 14:51:13 TRACE PlanChangeLogger: Batch Substitution has no effect.
24/09/03 14:51:13 TRACE BaseSessionStateBuilder$$anon$1: Fixed point reached for batch Disable Hints after 1 iterations.
24/09/03 14:51:13 TRACE PlanChangeLogger: Batch Disable Hints has no effect.
24/09/03 14:51:13 TRACE BaseSessionStateBuilder$$anon$1: Fixed point reached for batch Hints after 1 iterations.
24/09/03 14:51:13 TRACE PlanChangeLogger: Batch Hints has no effect.
24/09/03 14:51:13 TRACE BaseSessionStateBuilder$$anon$1: Fixed point reached for batch Simple Sanity Check after 1 iterations.
24/09/03 14:51:13 TRACE PlanChangeLogger: Batch Simple Sanity Check has no effect.
24/09/03 14:51:13 TRACE Analyzer$ResolveReferences: Attempting to resolve SaveIntoDataSourceCommand org.elasticsearch.spark.sql.DefaultSource@2c6b4690, Map(es.nodes -> http://es:9200, es.index.auto.create -> no, es

24/09/03 14:51:13 DEBUG ContextCleaner: Got cleaning task CleanAccum(1261)
24/09/03 14:51:13 DEBUG ContextCleaner: Cleaning accumulator 1261
24/09/03 14:51:13 DEBUG ContextCleaner: Cleaned accumulator 1261
24/09/03 14:51:13 DEBUG ContextCleaner: Got cleaning task CleanAccum(1267)
24/09/03 14:51:13 DEBUG ContextCleaner: Cleaning accumulator 1267
24/09/03 14:51:13 DEBUG ContextCleaner: Cleaned accumulator 1267
24/09/03 14:51:13 DEBUG ContextCleaner: Got cleaning task CleanAccum(1277)
24/09/03 14:51:13 DEBUG ContextCleaner: Cleaning accumulator 1277
24/09/03 14:51:13 DEBUG ContextCleaner: Cleaned accumulator 1277
24/09/03 14:51:13 DEBUG ContextCleaner: Got cleaning task CleanAccum(1274)
24/09/03 14:51:13 DEBUG ContextCleaner: Cleaning accumulator 1274
24/09/03 14:51:13 DEBUG ContextCleaner: Cleaned accumulator 1274
24/09/03 14:51:13 DEBUG ContextCleaner: Got cleaning task CleanAccum(1297)
24/09/03 14:51:13 DEBUG ContextCleaner: Cleaning accumulator 1297
24/09/03 14:51:13 DEBUG ContextClea

24/09/03 14:51:13 TRACE PlanChangeLogger: 
=== Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveDeserializer ===
!'DeserializeToObject unresolveddeserializer(createexternalrow(getcolumnbyordinal(0, StructField(user_name,StringType,true), StructField(url,StringType,true), StructField(created_at,TimestampType,true), StructField(id,StringType,true), StructField(repository_id,LongType,true), StructField(size,LongType,true), StructField(distinct_size,LongType,true), StructField(comment,StructType(StructField(_links,StructType(StructField(html,StructType(StructField(href,StringType,true)),true), StructField(pull_request,StructType(StructField(href,StringType,true)),true), StructField(self,StructType(StructField(href,StringType,true)),true)),true), StructField(author_association,StringType,true), StructField(body,StringType,true), StructField(commit_id,StringType,true), StructField(created_at,StringType,true), StructField(diff_hunk,StringType,true), StructField(html_url,St

 +- Project [user_name#60, url#61, created_at#94, id#63, repository_id#64L, size#65L, distinct_size#66L, comment#67, type#68, repo_url#70, userid_and_repo_name#106, repo_name#120]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     

                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        

       +- Project [user_name#60, url#61, created_at#94, id#63, repository_id#64L, size#65L, distinct_size#66L, comment#67, type#68, name#69, repo_url#70, name#69 AS userid_and_repo_name#106]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          

     if (!isNull_262) {
/* 15694 */
/* 15695 */         Object funcResult_45 = null;
/* 15696 */         funcResult_45 = value_296.toString();
/* 15697 */         value_295 = (java.lang.String) funcResult_45;
/* 15698 */
/* 15699 */       }
/* 15700 */     }
/* 15701 */     globalIsNull_33 = isNull_262;
/* 15702 */     return value_295;
/* 15703 */   }
/* 15704 */
/* 15705 */
/* 15706 */   private long If_41(InternalRow i) {
/* 15707 */     boolean isNull_1329 = i.isNullAt(7);
/* 15708 */     InternalRow value_1543 = isNull_1329 ?
/* 15709 */     null : (i.getStruct(7, 29));
/* 15710 */     boolean isNull_1328 = true;
/* 15711 */     boolean value_1542 = false;
/* 15712 */     if (!isNull_1329) {
/* 15713 */
/* 15714 */
/* 15715 */       argValue_118 = 19;
/* 15716 */
/* 15717 */       isNull_1328 = false;
/* 15718 */       if (!isNull_1328) {
/* 15719 */
/* 15720 */         Object funcResult_221 = null;
/* 15721 */         funcResult_221 = value_1543.isNullAt(argValue_118);
/* 15722 *

24/09/03 14:51:20 TRACE HttpMethodBase: enter HttpMethodBase.readResponseHeaders(HttpState,HttpConnection)
24/09/03 14:51:20 TRACE HttpConnection: enter HttpConnection.getResponseInputStream()
24/09/03 14:51:20 TRACE HttpParser: enter HeaderParser.parseHeaders(InputStream, String)
24/09/03 14:51:20 TRACE HttpParser: enter HttpParser.readLine(InputStream, String)
24/09/03 14:51:20 TRACE HttpParser: enter HttpParser.readRawLine()
24/09/03 14:51:20 TRACE HttpParser: enter HttpParser.readLine(InputStream, String)
24/09/03 14:51:20 TRACE HttpParser: enter HttpParser.readRawLine()
24/09/03 14:51:20 TRACE HttpParser: enter HttpParser.readLine(InputStream, String)
24/09/03 14:51:20 TRACE HttpParser: enter HttpParser.readRawLine()
24/09/03 14:51:20 TRACE HttpParser: enter HttpParser.readLine(InputStream, String)
24/09/03 14:51:20 TRACE HttpParser: enter HttpParser.readRawLine()
24/09/03 14:51:20 TRACE HttpParser: enter HttpParser.readLine(InputStream, String)
24/09/03 14:51:20 TRACE HttpParser:

{"user_name":"ssfun","url":"https://api.github.com/users/ssfun","created_at":1724511602000,"id":"41307194441","comment":{"author_association":"OWNER","body":"**Resolved:** HF.SMGC CHAT2API is back up in [`34cd8ae`](https://github.com/ssfun/upptime/commit/34cd8aea79d12828561cff5d065653598cad1bc2\n) after 16 minutes.","created_at":"2024-08-24T15:00:00Z","html_url":"https://github.com/ssfun/upptime/issues/2988#issuecomment-2308422595","id":2308422595,"issue_url":"https://api.github.com/repos/ssfun/upptime/issues/2988","node_id":"IC_kwDOLyXe-s6Jl7vD","reactions":{"+1":0,"-1":0,"confused":0,"eyes":0,"heart":0,"hooray":0,"laugh":0,"rocket":0,"total_count":0,"url":"https://api.github.com/repos/ssfun/upptime/issues/comments/2308422595/reactions"},"updated_at":"2024-08-24T15:00:00Z","url":"https://api.github.com/repos/ssfun/upptime/issues/comments/2308422595","user":{"avatar_url":"https://avatars.githubusercontent.com/u/15905889?v=4","events_url":"https://api.github.com/users/ssfun/events{/priv

24/09/03 14:51:20 TRACE CommonsHttpTransport: Rx @[172.18.0.5] [200-OK] [{"took":42,"errors":false,"items":[{"index":{"_index":"pytorch_top_issuer","_id":"wHheuJEB_alrSNqWaOQ8","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":1775082,"_primary_term":1,"status":201}},{"index":{"_index":"pytorch_top_issuer","_id":"wXheuJEB_alrSNqWaOQ8","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":1775083,"_primary_term":1,"status":201}},{"index":{"_index":"pytorch_top_issuer","_id":"wnheuJEB_alrSNqWaOQ8","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":1775084,"_primary_term":1,"status":201}},{"index":{"_index":"pytorch_top_issuer","_id":"w3heuJEB_alrSNqWaOQ8","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":1775085,"_primary_term":1,"status":201}},{"index":{"_index":"pytorch_top_issuer","_id":"xHheuJEB_alrSNqWaOQ8","_version":1,"result":"