### 1. 준비

In [None]:
### import pyspark
import time
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.functions import array, array_contains, col, concat_ws, explode, flatten, length, max, regexp_extract, regexp_replace, size, substring, when

In [None]:
### create spark session
spark = SparkSession.builder \
        .appName("medistream-05") \
        .getOrCreate()

In [None]:
### 경로 값을 할당한다.
json_path = json_path
save_path = save_path

In [None]:
### get start time
st = time.time()

In [None]:
### read json data
json_path = json_path
data = spark.read.json(json_path)

### 2. hospital key의 json object 전처리

In [None]:
### hospital key의 json object를 hospital_data의 변수에 할당했다. alias("h") 옵션을 주어 이름을 "h"로 변경했다.
hospital_data = data.select(explode("hospital").alias("h"))

### hospital_data에서 필요한 item을 가져와서 hospital_df의 변수에 할당했다.
hospital_df = hospital_data.select(
    col("h.id").alias("id"),
    col("h.name").alias("name"),
    col("h.category").alias("category"),
    col("h.category_code").alias("category_code"),
    col("h.category_code_list").alias("category_code_list"),
    col("h.category_count").alias("category_count"),
    col("h.description").alias("description"),
    col("h.road_address").alias("road_address"),
    col("h.road").alias("road"),
    col("h.rcode").alias("rcode"),
    col("h.virtual_phone").alias("virtual_phone"),
    col("h.phone").alias("phone"),
    col("h.payment_info").alias("payment_info"),
    col("h.conveniences").alias("conveniences"),
    col("h.review_setting.keyword").alias("review_keyword"),
    col("h.keywords").alias("keywords"),
    col("h.booking_business_id").alias("booking_business_id"),
    col("h.booking_display_name").alias("booking_display_name"),
    col("h.visitor_reviews_score").alias("visitor_reviews_score"),
    col("h.visitor_reviews_total").alias("visitor_reviews_total"),
    col("h.visitor_reviews_text_review_total").alias("visitor_reviews_text_review_total"),
    col("h.images").alias("images"),
    col("h.homepages.etc").alias("homepages_etc"),
    col("h.homepages.repr").alias("homepages_repr"),
    col("h.homepages.repr.url").alias("is_rep"), # isRep?
    col("h.booking_url").alias("booking_url"),
    col("h.talktalk_url").alias("talktalk_url"),
    col("h.coordinate.x").alias("lon"),
    col("h.coordinate.y").alias("lat"),
)


### hospital_df의 전처리 과정을 진행했다. 

# 문자열 변경
hospital_df = hospital_df.withColumn(
    "description",
    regexp_replace("description", "[\n\r*,]", "") 
).withColumn(
    "road",
    regexp_replace("road", "[\n\r*,]", "")
).withColumn(
    "review_keyword",
    regexp_replace("review_keyword", "[\\\"]", "")
)

# 기타 전처리
hospital_df = hospital_df.withColumn(
    # get description's length
    "description_length",
    length("description")
).withColumn(
    # count images
    "images_count", 
    size("images")
).withColumn(
    # get photo_review_ratio
    'photo_review_ratio',
    (col('visitor_reviews_total')-col('visitor_reviews_text_review_total'))/col('visitor_reviews_total')
).withColumn(
    # get homepages' urls
    'homepages_url', 
    flatten(array(array('homepages_repr.url'), 'homepages_etc.url'))
).withColumn(
    # get homepages' types
    'homepages_type', 
    flatten(array(array('homepages_repr.type'), 'homepages_etc.type'))
).withColumn(
    # get homepages' order
    'homepages_order', 
    when(
        col('homepages_repr.order').isNull(), 0
    ).otherwise(
        size(flatten(array(array('homepages_repr.order'), 'homepages_etc.order')))
    )
).withColumn(
    # get boolean of smart phone
    'is_smart_phone',
    col('phone').startswith('010')
).withColumn(
    # get boolean of zero pay
    'is_zero_pay',
    array_contains(col('payment_info'), '제로페이')
).withColumn(
    # get boolean of dead url
    'is_dead_url',
    flatten(array(array('homepages_repr.isDeadUrl'), 'homepages_etc.isDeadUrl'))
).withColumn(
    # get 1st keyword
    'keywords_1',
    col('keywords')[0]
).withColumn(
    # get 2nd keyword
    'keywords_2',
    col('keywords')[1]
).withColumn(
    # get 3rd keyword
    'keywords_3',
    col('keywords')[2]
).withColumn(
    # get 4th keyword
    'keywords_4',
    col('keywords')[3]
).withColumn(
    # get 5th keyword
    'keywords_5',
    col('keywords')[4]
)

# drop unnecessary columns
hospital_df = hospital_df.drop(
    "images", 
    "keywords", 
    "homepages_repr", 
    "homepages_etc"
).withColumn(
    # type casting
    "description",
    col("description").cast(StringType())
).withColumn(
    # type casting
    "road",
    col("road").cast(StringType())
).withColumn(
    # type casting
    "road_address",
    col("road_address").cast(StringType())   
).withColumn(
    # type casting
    "is_smart_phone", 
    col("is_smart_phone").cast(StringType())
).withColumn(
    # type casting
    "is_zero_pay", 
    col("is_zero_pay").cast(StringType())
)


### 배열 삽입을 위한 전처리 과정

# get array type columns
arr_col_lst = [field.name for field in hospital_df.schema.fields if isinstance(field.dataType, ArrayType)]

# concat_ws to array type columns
for arr_col in arr_col_lst:
    hospital_df = hospital_df.withColumn(arr_col, concat_ws(",", arr_col))

### 3. root key의 json object 전처리

In [None]:
### root key의 json object를 불러온다. root key의 alias는 r, 변수명은 root_data이다.
root_data = data.select(explode("root").alias("r"))

### root key의 json object에서 필요한 item을 가져와서 root_df에 할당했다.
root_df = root_data.select(
    col("r.hospital.base.__ref").alias("root_id"),
    col("r.hospital.fsasReviews.total").alias("fsas_reviews_count"),
    col("r.hospital.kinQna.answerCount").alias("kin_qna_count")
)

### root_df의 문자열 전처리를 진행했다.
root_df = root_df.withColumn(
    "root_id",
    regexp_extract("root_id", "HospitalBase:([\\w]+)", 1)
)

### 4. merge dataframes

In [None]:
### hospital_df와 root_df를 left outer join해서 df로 만들었다.

# 올바른 값이 가져오기 위해 id를 비교한다.
df = hospital_df.join(root_df, hospital_df.id == root_df.root_id, "left_outer")

# 불필요해진 root_id 값은 drop한다.
df = df.drop("root_id")

### 5. save merged dataframe

In [None]:
### df를 저장한다.

# 중복열을 제거한다.
df.dropDuplicates()

# parquet type으로 df를 저장한다. option은 overwrite이다.
df.write.mode('overwrite').parquet(save_path)

### 6. check task time

In [None]:
### 시간을 측정한다.
ft = time.time()
print(f"pyspark task time: {ft - st}")

### 7. upload to redshift

In [None]:
### 필요한 변수들을 할당한다.
jdbc_url = jdbc_url
temp_dir = temp_dir
db_table = db_table

In [None]:
### df를 redshift에 적재한다.
df.write \
  .format("io.github.spark_redshift_community.spark.redshift") \  # df.write의 format 설정
  .option("driver", "com.amazon.redshift.jdbc42.Driver") \        # df.write의 driver 설정
  .option("forward_spark_s3_credentials", True) \                 # df.write의 forward_spark_s3_credentials 설정
  .option("url", jdbc_url) \
  .option("dbtable", db_table) \
  .option("tempdir", temp_dir) \
  .mode("overwrite") \
  .save()

In [None]:
# spark의 사용을 종료한다.
spark.stop()