In [2]:
import sys
import json
import requests 
import datetime 
import pandas as pd
from pyspark.sql.functions import udf
from pyspark.sql.functions import col, desc, when
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = 'all'
from pyspark.sql.types import StringType, StructType, StructField, FloatType

In [3]:
# 데이터 이동 함수 선언
conf_dw = {
      'url':'jdbc:mysql://localhost:3306/etlmysql?characterEncoding=utf8&serverTimezone=Asia/Seoul'
     ,'props':{
      'user':'bigMysql',
      'password':'bigMysql1234@'   
      }
}
conf_dm = {
      'url':'jdbc:mysql://localhost:3306/etlmysqlDM?characterEncoding=utf8&serverTimezone=Asia/Seoul'
     ,'props':{
      'user':'bigDM',
      'password':'bigDM1234@'   
      }
}
def find_data(config, table_name) :
    return spark.read.jdbc(url= config['url'], table=table_name, properties=config['props'])
def save_data(config, df, table_name) :
    return df.write.jdbc(url= config['url'], table=table_name, mode='overwrite' , properties=config['props'])   


In [4]:
# 데이터 프레임 생성
read_restaurant_info = find_data(conf_dw, 'RESTAURANT_LIST')

In [5]:
# 크롤링한 데이터에서 서울이 아닌 다른지역 음식점 데이터 제거
read_restaurant_info = read_restaurant_info.filter(col("addr").startswith("서울"))
read_restaurant_info.count()
read_restaurant_info.printSchema()

2054

root
 |-- addr: string (nullable = true)
 |-- food: string (nullable = true)
 |-- re_rank1: string (nullable = true)
 |-- re_rank2: string (nullable = true)
 |-- re_rank3: string (nullable = true)
 |-- re_rank4: string (nullable = true)
 |-- re_rank5: string (nullable = true)
 |-- re_visitor: string (nullable = true)
 |-- store_name: string (nullable = true)
 |-- tel: string (nullable = true)
 |-- id: long (nullable = true)



In [6]:
# 카카오 API연결
KAKAO_API_KEY = "18b9e741e7c823f8ba016456591a5d00" #카카오 restapi 키 등록

In [7]:
## 카카오서버에 요청 후 응답받은 데이터를 반환하는 함수
def json_request(url):
    headers = {'Authorization': 'KakaoAK {}'.format(KAKAO_API_KEY)}
    res = requests.get(url, headers=headers)
    return res.text

In [8]:
# 주소 -> 좌표 변환 요청 주소 https://dapi.kakao.com/v2/local/search/address.${FORMAT}
def addr_lat_lon(addr):
    url = 'https://dapi.kakao.com/v2/local/search/address.json?query={address}'.format(address=addr)
    try : 
        res_json = json_request(url)
        res = json.loads(res_json)
        match_adr = res['documents'][0]['address']
    except :
        return 'NaN','NaN'
    
    return float(match_adr['x']), float(match_adr['y'])

In [9]:
# 주소 바꾸기 작업을 위해 pandas로 변환
pd_df = read_restaurant_info.toPandas()

In [10]:
pd_df.head()

Unnamed: 0,addr,food,re_rank1,re_rank2,re_rank3,re_rank4,re_rank5,re_visitor,store_name,tel,id
0,서울 강동구 상일로6길 39 지하1층,간장게장,음식이 맛있어요 404,매장이 넓어요 170,양이 많아요 141,단체모임 하기 좋아요 131,가성비가 좋아요 125,게장이 짜지도않고너무 맛있어요부모님 모시고 가기 너무 좋아요먹고나오는데 매실차도 인...,강동반상,02-429-2733,1088
1,서울 강남구 선릉로131길 17 1층,간장게장,음식이 맛있어요 230,재료가 신선해요 164,매장이 청결해요 159,친절해요 147,특별한 메뉴가 있어요 80,비리지 않고 정갈하게 차려진 게장한상 외국분들 진짜 많이 와요 ㅎㅎ 🤩😝,게방식당,010-8479-1107,1092
2,서울 강남구 강남대로84길 33 대우디오빌플러스 지하1층 107호,간장게장,음식이 맛있어요 117,친절해요 80,재료가 신선해요 67,특별한 메뉴가 있어요 48,매장이 청결해요 38,"간장게장을 좋아하는 우리 딸이 강남역 부근의 맛집을 찾았어요.오늘 처음 방문했는데,...",게새장터,0507-1344-8455,1105
3,서울 강서구 공항대로 269-15 힐스테이트에코 마곡 2층 220-1호 (A동 방향),간장게장,음식이 맛있어요 1182,친절해요 697,재료가 신선해요 672,매장이 청결해요 601,가성비가 좋아요 482,점심시간때 같더니 가격도 할인되고 간장게장도 안짜고 너무 맛있고 살이 꽉 차있고 반...,게장게장게장 마곡본점,0507-1365-0490,1081
4,서울 서대문구 연희로26길 28,간장게장,음식이 맛있어요 413,친절해요 157,재료가 신선해요 155,가성비가 좋아요 118,단체모임 하기 좋아요 96,고미정에 주말 점심시간에 방문해서 쌀밥 정식과 게장 정식을 먹었는데 너무 만족스러웠...,고미정,010-2105-0155,1078


In [11]:
# 주소 바꿀 함수 
def cg_addr(df, store_name, new_address):
    df.loc[df['store_name'] == store_name, 'addr'] = new_address
    return df

In [12]:
# # 주소 존재하는지 확인용 코드
# filtered_df = pd[pd['store_name'].str.contains('육몽 홍대본점')]
# selected_columns_df = filtered_df[['store_name', 'addr']]
# print(selected_columns_df)
# a = '서울 마포구 양화로16길 19'
# addr_lat_lon(a)

In [13]:
# 주소 바꾸기 실행
pd_df = cg_addr(pd_df, '사대부집 곳간', '서울 영등포구 여의대로 24 전경련회관')
pd_df = cg_addr(pd_df, '무청감자탕 수유점', '서울 강북구 도봉로87길 47')
pd_df = cg_addr(pd_df, '화목순대국', '서울 영등포구 여의대방로 383')
pd_df = cg_addr(pd_df, '봉열소곱창', '서울 송파구 송파대로 111')
pd_df = cg_addr(pd_df, '한양양곱창 본점', '서울 성동구 마조로 17')
pd_df = cg_addr(pd_df, '백곰막걸리', '서울 강남구 압구정로48길 39')
pd_df = cg_addr(pd_df, '신가네해물닭갈비', '서울 강동구 구천면로24길 26')
pd_df = cg_addr(pd_df, '춘천골닭갈비', '서울 노원구 상계로 90 마블러스')
pd_df = cg_addr(pd_df, '종로계림닭도리탕 충무로직영점', '서울 중구 충무로2길 43')
pd_df = cg_addr(pd_df, '청계산장수촌', '서울 서초구 원터5길 14')
pd_df = cg_addr(pd_df, '모퉁이네', '서울 영등포구 국제금융로6길 33')
pd_df = cg_addr(pd_df, '족발야시장 은평뉴타운점', '서울 은평구 통일로 1020')
pd_df = cg_addr(pd_df, 'Gongi', '서울 용산구 이태원로45길 4')
pd_df = cg_addr(pd_df, '효미역 서래마을점', '서울 서초구 서래로 5')
pd_df = cg_addr(pd_df, '능동미나리', '서울 용산구 한강대로40길 28')
pd_df = cg_addr(pd_df, '방이별관 잠실방이점', '서울 송파구 올림픽로32길 33')
pd_df = cg_addr(pd_df, '육몽 홍대본점', '서울 마포구 양화로16길 19')

In [14]:
# pandas를 다시 pyspark로 변환
pd.DataFrame.iteritems = pd.DataFrame.items
df_spark = spark.createDataFrame(pd_df)

In [15]:
# 'addr_lat_lon' 함수를 PySpark UDF로 등록 -> 좌표를 구하는 함수를 각 row마다 실행시키기 위해
udf_addr_lat_lon = udf(addr_lat_lon, StructType([
    StructField("x", FloatType(), True),
    StructField("y", FloatType(), True)
]))

In [16]:
# 좌표컬럼을 추가, 튜플로 좌표가 들어감
# '좌표' 컬럼을 가지고 있는 df = res_point
res = df_spark
res_point = res.withColumn("좌표", udf_addr_lat_lon("addr"))\
                .persist()
# res_point.select('store_name', 'addr', '좌표').show(2, truncate=False)

In [17]:
# x, y좌표를 나누는 함수 작성
def extract_coordinates(coord):
    if coord:
        return coord.x, coord.y
    else:
        return None, None

# 'extract_coordinates' 함수를 PySpark UDF로 등록
udf_extract_coordinates = udf(extract_coordinates, StructType([
    StructField("x", FloatType(), True),
    StructField("y", FloatType(), True)
]))

In [18]:
# restaurant_df에 최종 x,y좌표를 나눠서 입력.
restaurant_df = res_point.withColumn("x좌표", udf_extract_coordinates("좌표").getField("x"))\
                    .withColumn("y좌표", udf_extract_coordinates("좌표").getField("y")).persist()


In [19]:
# 중복 행 제거
# result_df = restaurant_df.dropDuplicates(["store_name", "food", 'addr'])
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id

In [20]:
columns_to_drop = ['re_rank1', 're_rank2', 're_rank3', 're_rank4', 're_rank5', 're_visitor', '좌표']
restaurant_df = restaurant_df.drop(*columns_to_drop)

In [21]:
restaurant_df.printSchema()

root
 |-- addr: string (nullable = true)
 |-- food: string (nullable = true)
 |-- store_name: string (nullable = true)
 |-- tel: string (nullable = true)
 |-- id: long (nullable = true)
 |-- x좌표: float (nullable = true)
 |-- y좌표: float (nullable = true)



In [22]:
# restaurant_df.count()

In [None]:
save_data(conf_dm, restaurant_df, 'RESTAURANT_INFO')

In [None]:
restaurant_df.show()