### 1. 준비

In [1]:
from functools import reduce
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, StringType, ArrayType
from pyspark.sql.functions import col, concat_ws, split, regexp_replace, regexp_extract, when, length, get_json_object, explode, size, array_contains, array, flatten

In [2]:
# Obtain a SparkSession for this notebook: getOrCreate() hands back the
# already-running session when there is one, otherwise it starts a new one
# registered under the application name "medi_test".
spark = (
    SparkSession.builder
    .appName("medi_test")
    .getOrCreate()
)

23/12/07 00:19:01 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
# Record the wall-clock start time (seconds since the epoch); the final cell
# subtracts this from its own timestamp to report the total execution time.
import time
st = time.time()

In [4]:
# Filesystem layout used by the rest of the notebook.
# NOTE(review): root_path is an absolute path on one developer machine; anyone
# else running this notebook has to edit it here.
root_path = '/Users/b06/Desktop/yeardream/medi-05'

# Directory holding the naverplace_meta_*.json input files.
json_root_path = root_path + '/data/naverplace_meta'

# Text file inside the spark-scala-project directory.
text_root_path = root_path + '/spark-scala-project/test.txt'

In [5]:
# Single input file used for this test run (file number 1 of the metadata set).
test_json_path = json_root_path + '/naverplace_meta_1.json'

In [6]:
# The input is read with the "multiline" option enabled, i.e. a JSON record is
# allowed to span several lines instead of one record per line.
multiline_reader = spark.read.option("multiline", "true")
data = multiline_reader.json(test_json_path)

23/12/07 00:19:03 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [7]:
# Turn the top-level "hospital" array into one row per element, exposed as
# the column "h" for the field selection that follows.
hospital_data = data.select(explode(col("hospital")).alias("h"))

In [8]:
# (path under "h", output column name) pairs, listed in output-column order.
# Nested paths are renamed to flat column names.
hospital_field_aliases = [
    ("id", "id"),
    ("name", "name"),
    ("category", "category"),
    ("category_code", "category_code"),
    ("category_code_list", "category_code_list"),
    ("category_count", "category_count"),
    ("description", "description"),
    ("road_address", "road_address"),
    ("road", "road"),
    ("rcode", "rcode"),
    ("virtual_phone", "virtual_phone"),
    ("phone", "phone"),
    ("payment_info", "payment_info"),
    ("conveniences", "conveniences"),
    ("review_setting.keyword", "review_keyword"),
    ("keywords", "keywords"),
    ("booking_business_id", "booking_business_id"),
    ("booking_display_name", "booking_display_name"),
    ("visitor_reviews_score", "visitor_reviews_score"),
    ("visitor_reviews_total", "visitor_reviews_total"),
    ("visitor_reviews_text_review_total", "visitor_reviews_text_review_total"),
    ("images", "images"),
    ("homepages.etc", "homepages_etc"),
    ("homepages.repr", "homepages_repr"),
    # NOTE(review): "is_rep" is filled with the representative homepage URL,
    # not a flag; the original author marked this line "isRep?" - confirm
    # which source field was intended.
    ("homepages.repr.url", "is_rep"),
    ("booking_url", "booking_url"),
    ("talktalk_url", "talktalk_url"),
    ("coordinate.x", "lon"),
    ("coordinate.y", "lat"),
]

# Project the exploded hospital struct onto flat, explicitly named columns.
hospital_df = hospital_data.select(
    *[
        col(f"h.{source_path}").alias(output_name)
        for source_path, output_name in hospital_field_aliases
    ]
)

In [9]:
# hospital_df.columns

In [10]:
# Character class shared by the free-text columns: line feed, carriage return,
# asterisk and comma are removed.
# NOTE(review): presumably so the values stay on one line / in one field of
# the CSV written at the end - confirm.
line_noise_pattern = "[\n\r*,]"

hospital_df = (
    hospital_df
    .withColumn("description", regexp_replace("description", line_noise_pattern, ""))
    .withColumn("road", regexp_replace("road", line_noise_pattern, ""))
    # Strip double-quote characters from the review keyword text.
    .withColumn("review_keyword", regexp_replace("review_keyword", "[\\\"]", ""))
)

In [11]:
# Derive per-hospital feature columns from the raw columns selected earlier,
# then drop the nested source columns that have been consumed.


def _homepage_values(field):
    """Return an array column combining one attribute of all homepages.

    The result holds `homepages_repr.<field>` followed by the `<field>` of
    every entry in `homepages_etc`.

    NOTE(review): flatten() produces NULL when one of the inner arrays is
    NULL, so a row whose `homepages_etc` is NULL would lose the
    representative value too - confirm whether such rows occur in the data.
    """
    return flatten(array(array(f'homepages_repr.{field}'), f'homepages_etc.{field}'))


visitor_reviews_total = col('visitor_reviews_total')
text_review_total = col('visitor_reviews_text_review_total')

hospital_df = (
    hospital_df
    .withColumn("description_length", length("description"))
    .withColumn(
        "images_count",
        # size() returns -1 for a NULL array under Spark's default settings;
        # report a hospital without images as 0 instead of -1.
        when(col("images").isNull(), 0).otherwise(size("images")),
    )
    .withColumn(
        'photo_review_ratio',
        # Share of visitor reviews that are not text reviews. The explicit
        # guard leaves the ratio NULL when there are no reviews instead of
        # depending on how the session handles division by zero.
        when(
            visitor_reviews_total != 0,
            (visitor_reviews_total - text_review_total) / visitor_reviews_total,
        ),
    )
    .withColumn('homepages_url', _homepage_values('url'))
    .withColumn('homepages_type', _homepage_values('type'))
    .withColumn(
        'homepages_order',
        # Despite the name this is a count: 0 when the representative homepage
        # has no `order`, otherwise the number of collected `order` entries.
        when(col('homepages_repr.order').isNull(), 0)
        .otherwise(size(_homepage_values('order'))),
    )
    # True when the listed phone number starts with the prefix '010'.
    .withColumn('is_smart_phone', col('phone').startswith('010'))
    # True when the payment_info array contains the entry '제로페이'.
    .withColumn('is_zero_pay', array_contains(col('payment_info'), '제로페이'))
    .withColumn('isDeadUrl', _homepage_values('isDeadUrl'))
)

# Spread the first five keywords into keywords_1 .. keywords_5.
for keyword_index in range(5):
    hospital_df = hospital_df.withColumn(
        f'keywords_{keyword_index + 1}',
        col('keywords')[keyword_index],
    )

# TODO: `is_blog_exposed` is not derived yet; the earlier draft was:
# hospital_df.withColumn(
#     'is_blog_exposed',
#     flatten(col('homepages_type')).contains('블로그')
# )

# The nested/array source columns are no longer needed once the features exist.
hospital_df = hospital_df.drop("images", "keywords", "homepages_repr", "homepages_etc")

In [12]:
# Names of all columns whose Spark type is an array; these are the columns the
# next cell converts to comma-joined strings.
arr_col_lst = []
for field in hospital_df.schema.fields:
    if isinstance(field.dataType, ArrayType):
        arr_col_lst.append(field.name)
arr_col_lst

['category_code_list',
 'payment_info',
 'conveniences',
 'homepages_url',
 'homepages_type',
 'isDeadUrl']

In [13]:
# Replace every array column, in place and under the same name, with its
# elements joined by "," - folding over the column list one withColumn at a time.
hospital_df = reduce(
    lambda frame, array_column: frame.withColumn(array_column, concat_ws(",", array_column)),
    arr_col_lst,
    hospital_df,
)

### 3. root dataframe

In [14]:
# Turn the top-level "root" array into one row per element, exposed as column "r".
root_data = data.select(explode(col("root")).alias("r"))
# root_df.select("root.base.__ref").show()
# root_df.select("root.fsasReviews.total").show()
# root_df.select("root.kinQna.answerCount").show()

In [15]:
# (path under "r.hospital", output column name) pairs for the root-level fields
# that are merged into the hospital table later.
root_field_aliases = [
    ("base.__ref", "root_id"),
    ("fsasReviews.total", "fsas_reviews_count"),
    ("kinQna.answerCount", "kin_qna_count"),
]

root_df = root_data.select(
    *[
        col(f"r.hospital.{source_path}").alias(output_name)
        for source_path, output_name in root_field_aliases
    ]
)

In [16]:
# The reference value is reduced to the run of word characters that follows
# "HospitalBase:" (capture group 1), so it can be matched against the hospital id.
root_id_pattern = "HospitalBase:([\\w]+)"
root_df = root_df.withColumn(
    "root_id",
    regexp_extract("root_id", root_id_pattern, 1),
)

In [17]:
# root_df.show()

### 4. join, save dataframe

In [18]:
# Attach the root-level counts to each hospital row. A left outer join keeps
# hospitals that have no matching root entry; the join key from the right-hand
# side is dropped afterwards because it duplicates "id".
id_matches_root = hospital_df["id"] == root_df["root_id"]
df = (
    hospital_df
    .join(root_df, id_matches_root, "left_outer")
    .drop("root_id")
)
# id_check = df.filter(col("root_id") != col("id"))
# id_check.show()

In [19]:
# save_root_path = f'{root_path}/spark-scala-project/output/pyspark/'
# save_path = '{save_root_path}/naverplace_{n}'
# df.write.parquet(save_path)

In [20]:
# print(n)
# n=1
# df.columns

['id',
 'name',
 'category',
 'category_code',
 'category_code_list',
 'category_count',
 'description',
 'road_address',
 'road',
 'rcode',
 'virtual_phone',
 'phone',
 'payment_info',
 'conveniences',
 'review_keyword',
 'booking_business_id',
 'booking_display_name',
 'visitor_reviews_score',
 'visitor_reviews_total',
 'visitor_reviews_text_review_total',
 'is_rep',
 'booking_url',
 'talktalk_url',
 'lon',
 'lat',
 'description_length',
 'images_count',
 'photo_review_ratio',
 'homepages_url',
 'homepages_type',
 'homepages_order',
 'is_smart_phone',
 'is_zero_pay',
 'isDeadUrl',
 'keywords_1',
 'keywords_2',
 'keywords_3',
 'keywords_4',
 'keywords_5',
 'fsas_reviews_count',
 'kin_qna_count']

In [21]:
# Output location for the joined table.
save_root_path = root_path + '/spark-scala-project/output/pyspark/test'
save_path = save_root_path + '/naverplace'

# Collapse to a single partition so one CSV part file is written, with a
# header row. NOTE: mode 'append' adds to whatever already exists under
# save_path, so re-running this cell writes the same rows again.
single_partition_writer = df.coalesce(1).write.mode('append').option("encoding", "utf-8")
single_partition_writer.csv(save_path, header=True)

                                                                                

In [22]:
# Report the wall-clock seconds elapsed since `st` was recorded near the top
# of the notebook.
et = time.time()
tt = et-st
print(f'Execution time: {tt}')

Execution time: 4.380701780319214
