# 텍스트 유사도

-------
## (1) 수치(벡터) 거리

In [None]:
# - 수치(벡터) 거리
# 1. 코사인 유사도(Cosine Similarity)
# 2. 유클리디언 거리 유사도(Euclidean Distance Similarity)
# 3. 맨하탄 거리 유사도(Manhattan Distance Similarity)
# 4. TS / SS / TS-SS (삼각형/부채꼴/혼합형)

## (2) 편집 거리

In [None]:
# - 편집 거리
# 1. 해밍 거리 유사도(Hamming Distance Similarity)
# 2. 자로윙클러 유사도(Jaro-Winkler Similarity)
# 3. 레벤슈타인 유사도(levenshtein Similarity)

-------

In [None]:
# 1. 코사인 유사도
# - 두 문장을 각각 벡터값으로 변환 후 코사인 각도를 구해 유사도 측정
# - 결과값 -1 ~ 1 : 1에 가까울수록 유사도가 높다고 해석
# - 동일한 문장 길이가 아니어도 문장 길이를 정규화하여 비교하는 로직

In [None]:
# 2. 유클리디언 거리 유사도
# - 두 문장을 각각 벡터값으로 변환 후 n차원 공간에서 두 점 사이 최단거리를 구해 유사도 측정
# - 결과값 0 ~ 무한대 : 0에 가까울수록 유사도가 높다고 해석
# - 결과값을 0 ~ 1 사이의 값으로 정규화 시켜 해석 진행
# - 수치 간 거리를 구하는 방식에는 좋지만, 정보간의 거리를 나타내는 것에는 좋지 않음

In [None]:
# 3. 맨하탄 거리 유사도
# - 유클리디언과 비슷한 로직이지만, 두 점 사이의 직선이 아닌 블록 단위의 최단거리를 구해 유사도 측정
# - 타 알고리즘에 비해 정확도가 떨어진다는 의견이 많음

In [None]:
# 4. 해밍 거리 유사도
# - 두 개의 텍스트의 길이가 같을 때, 각 문자가 서로 다를 때마다 거리를 1씩 늘려나가는 방식으로 계산.
#   즉, 같은 문자로 바꾸기 위해 몇 개의 글자를 바꿔줘야 하는지에 대한 계산 방식
# - 결과값 0 ~ 무한대 : 0에 가까울수록 유사도가 높다고 해석

In [None]:
# 5. 자로윙클러 유사도
# - 두 텍스트간 편집 횟수를 카운팅(Transposition만 고려)
# - 결과값 0 ~ 1 : 1에 가까울수록 유사도가 높다고 해석

In [None]:
# 6. 레벤슈타인 유사도
# - 두 텍스트간 편집 횟수를 카운팅(Transposition뿐만 아닌 삭제, 수정 등 고려)
# - 결과값 0 ~ 무한대 : 0에 가까울수록 유사도가 높다고 해석

In [None]:
# - 편집 거리의 유사도 고려사항
# ex. 미르건축사무소 & 아르건축사무소 => 유사도 80%
#     (전처리 후) 미르 & 아르 => 유사도 50%
# 즉, (전처리 이후 단어가 작아지므로) 한 단어라도 일치할 경우 편집 거리의 유사도는 높게 출력됨

-------

## Library Import

In [None]:
from pyhive import hive
from os import linesep, path, sep
from requests import Session
from requests.adapters import HTTPAdapter
from requests.exceptions import *
from sys import stdin, stderr, argv
from urllib.parse import quote_plus
import json
import traceback
import sys
import pandas as pd
import time
import ctypes
import multiprocessing
import re
from pyhive import hive
import os
import pickle
import numpy as np
from multiprocessing import Process, current_process, Pool
from functools import reduce
import pyspark.sql.functions as F
from pyspark.sql.functions import udf,col
from pyspark.sql.functions import lit

import affinegap as affi
import jellyfish as jell
import textdistance as td
# from korean_romanizer.romanizer import Romanizer

from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()

import warnings
warnings.filterwarnings('ignore')

from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.type import *
from pyspark.sql.functions import pandas_udf

## Spark Session

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.types import *
import re
import os
from os.path import join, abspath
import datetime
import time
from datetime import timedelta
from dateutil import relativedelta
import dlp

In [None]:
# class Spark_Session(object):
    
#     """
#         Spark Session 생성 Class
#         Args:
#         Returns:
#         Exception:
#     """
    
#     def __init__(self,
#                 appname,
#                 config_dict = None,
#                 dynamic = True
#                 verbose = False):
#         """
#             Spark_Session Class의 생성자
            
#             예시)
#                 1) 일반적으로 spark 세션을 분석 환경에서 열어서 사용하고 싶을 때
#                     (* start() Method를 사용해야함)
#                     spark = Spark_Session(appname = "DonationDeduction", config_dict = config_dict, dynamic = False).start()
                    
#                 2) Context Manager를 사용해 with 구문 안에서 작업하고 싶을 때
#                     (* start() Method를 사용하지 않아도 됨)
#                     with Spark_Session(appname = "DonationDeduction", config_dict = config_dict, verbose = True) as sp:
#                     sp.sql("select * from tmpr.table_nm limit 100").show()
                    
#                     Args:
#                         appname : 사용할 App Name (String)
#                         config_dict : Spark Config 관련 정보 (Dictionary)
#                             ex) config_dict = {"spark.driver.memory" : "32g",
#                                                 "spark.executor.instances" : "8",
#                                                 "spark.executor.cores" : "8",
#                                                 "spark.executor.memory " : "32g"}
#                         dynamic : dynamicAllocation을 사용할지 여부(Logical)
#                         verbose : 만들어진 소스 코드를 출력할지 여부(Logical)
                        
#                     Returns:
#                         Spark Session을 시작하는 코드(String)
#                     Exception: None
#         """
        
#         self.appname = appname
#         self.config_dict = config_dict
#         self.verbose = verbose
#         self.dynamic = dynamic
        
#         # set environment_variable
#         self.driverHost = os.environ["NODEPORT_IP"]
#         self.driverPort = os.environ["SERVICE_PORT_SPARK1"]
#         self.blockMngPort = os.environ["SERVICE_PORT_SPARK2"]
#         self.driverBindAddress = "0.0.0.0"
#         self.warehouse_location = abspath('/warehouse/tablespace/managed/hive')
        
#         # 주어진 설정 정보를 활용하여 소스 코드를 생성함
#         Self.__create_session_start_code()
        
#     def __create_session_start_code(self):
#         """
#             Spark Session을 시작하는 코드(String) 생성하는 함수
#             Args : None
#             Returns :
#                 Spark Session을 시작하는 코드(String)
#             Exception : None
#         """
        
#         # Spark Session 설정 코드 생성
#         # --> 이후 start Method에서 exec으로 실행
#         self.temp_code = """_spark = (SparkSession
#                                         .builder
#                                         .master('yarn')
#                                         .appName('""" + self.appname + """')
#                                         .config('spark.driver.host', '""" + self.driverHost + """')
#                                         .config('spark.driver.port', '""" + self.driverPort + """')
#                                         .config('spark.driver.blockManager.port', '""" + self.blockMngPort + """')
#                                         .config('spark.driver.bindAddress', '""" + self.driverBindAddress + """')
#                                         .config('spark.sql.warehouse.dir', '""" + self.warehouse_location + """')
#                                         .config("hive.exec.dynamic.partition", "true")
#                                         .config("hive.exec.dynamic.partition.mode", "nonstrict")
#                                         .config("spark.sql.broadcastTimeout","36000")
#                                         )"""
#         # Dynamic Allocation 관련 설정
#         if self.dynamic == True:
#             dynamic_code = """.config("spark.shuffle.service.enabled","true")
#                                 .config("spark.dynamicAllocation.enabled","true")
#                                 .config("spark.dynamicAllocation.minExecutors","8")
#                                 .config("spark.dynamicAllocation.initialExecutors","12")
#                                 .config("spark.dynamicAllocation.maxExecutors","32")
#                             """
#             self.temp_code = self.temp_code + dynamic_code
            
#         if self.config_dict is not None:
#             for key, value in self.config_dict.items():
#                 self.temp_code = self.temp_code + """.config('""" + key + """','""" + value + """')\n"""
                
#                 self.temp_code = self.temp_code + """.enableHiveSupport()
#                                                     .getOrCreate() )"""