### 1. 라이브러리 불러오기 및 세션 생성

In [1]:
import json
import pandas as pd
from pyspark import SparkContext, SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, BooleanType, IntegerType
from pyspark.sql.functions import explode, map_keys, col, first, get_json_object, array, to_json, struct, regexp_replace, split

In [2]:
# Spark 세션 생성
spark = SparkSession \
        .builder \
        .appName("processingJson") \
        .getOrCreate()

23/11/20 16:08:48 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


### 2. 데이터 불러오기

In [3]:
# 분할된 JSON 파일 경로 선택하는 함수
def nth_json_path(n):
    return f'/Users/b06/Desktop/yeardream/medi-05/data/naverplace_meta/naverplace_meta_{n}.json'

In [4]:
# 첫번째 JSON 파일 데이터 불러오기
n = 1
data = spark.read.json(nth_json_path(n))

23/11/20 16:08:50 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


### 3. 변수

In [5]:
columns = data.columns

In [6]:
hospital_bases = [c for c in columns if "HospitalBase" in c]

In [7]:
target_columns = [
    'id',
    'name', 
    'road', 
    'reviewSettings', 
    'conveniences', 
    'keywords', 
    'phone', 
    'virtualPhone', 
    'naverBookingUrl', 
    'talktalkUrl', 
    'paymentInfo', 
    'homepages',
    'visitorReviewsTotal',
    'description',
    'Images'
]

In [8]:
string_columns = [
    'id',
    'name', 
    'road',
    'phone',
    'virtualPhone',
    'naverBookingUrl',
    'talktalkUrl',
    'visitorReviewsTotal',
    'description'
]

In [9]:
struct_columns = [
    'id',
    'reviewSettings',
    'homepages'
]

In [10]:
review_keyword_columns = [
    'id',
    'reviewSettings.keyword'
]

In [11]:
homepages_columns = [
    'id',
    'homepages.repr.url',
    'homepages.repr.type',
    'homepages.repr.isDeadUrl',
    'homepages.repr.landingUrl'
]

In [12]:
conveniences_columns = [
    'id',
    'conveniences'
]

In [13]:
keywords_columns = [
    'id',
    'keywords'
]

In [14]:
paymentInfo_columns = [
    'id',
    'paymentInfo'
]

In [15]:
string_columns_schema = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("road", StringType(), True),
    StructField("phone", StringType(), True),
    StructField("virtualPhone", StringType(), True),
    StructField("naverBookingUrl", StringType(), True),
    StructField("talktalkUrl", StringType(), True),
    StructField("visitorReviewsTotal", IntegerType(), True),
    StructField("description", StringType(), True)
])

In [16]:
review_keyword_columns_schema = StructType([
    StructField("id", StringType(), True),
    StructField("review_keyword", StringType(), True)
])

In [17]:
homepages_columns_schema = StructType([
    StructField("id", StringType(), True),
    StructField("url", StringType(), True),
    StructField("type", StringType(), True),
    StructField("isDeadUrl", BooleanType(), True),
    StructField("landingUrl", StringType(), True)
])

In [18]:
conveniences_columns_schema = StructType([
    StructField("id", StringType(), True),
    StructField("conveniences", StringType(), True),
])

In [19]:
keywords_columns_schema = StructType([
    StructField("id", StringType(), True),
    StructField("keywords", StringType(), True),
])

In [20]:
string_table = spark.createDataFrame([], string_columns_schema)

In [21]:
review_keyword_table = spark.createDataFrame([], review_keyword_columns_schema)

In [22]:
homepages_table = spark.createDataFrame([], homepages_columns_schema)

In [23]:
conveniences_table = spark.createDataFrame([], conveniences_columns_schema)

In [24]:
keywords_table = spark.createDataFrame([], keywords_columns_schema)

### 4. 함수

In [25]:
def get_table(df, columns, table):
    get_columns = df.select(columns)
    row = remove_null(get_columns)
    return table.union(row)

In [26]:
def remove_null(df):
    return df.filter(~col('name').isNull())

In [27]:
def preprocessing_review_keyword(review_keyword_row):
    review_keyword_row = review_keyword_row.withColumnRenamed("keyword", "review_keyword")
    review_keyword_row = review_keyword_row.withColumn("review_keyword", regexp_replace("review_keyword", " & ", ", "))
    review_keyword_row = review_keyword_row.withColumn("review_keyword", regexp_replace("review_keyword", "[()]", ""))
    review_keyword_row = review_keyword_row.withColumn("review_keyword", explode(split(col("review_keyword"), ", ")))
    return review_keyword_row

In [28]:
def get_review_keyword_table(struct_df, review_keyword_columns, review_keyword_df):
    get_review_keyword_columns = struct_df.select(review_keyword_columns)
    review_keyword_row = remove_null(get_review_keyword_columns)
    review_keyword_row = preprocessing_review_keyword(review_keyword_row)
    return review_keyword_df.union(review_keyword_row)

In [29]:
def check_null(df, column):
    cnt = df.filter(col(column).isNull()).count()
    return True if cnt == 10 else False

In [30]:
def get_homepages_table(struct_df, homepages_columns, homepages_table):
    if check_null(struct_df, 'homepages.repr'):
        return homepages_table
    else:
        return get_table(struct_df, homepages_columns, homepages_table)

In [31]:
def get_table_and_explode(df, columns, table, column):
    if check_null(df, 'column'):
        return table
    else:
        get_columns = df.select(columns)
        row = remove_null(get_columns)
        rows = row.withColumn(column, explode(row[column]))
        return table.union(rows)

### 5. 데이터 전처리

In [32]:
for hospital_base in hospital_bases:    
    hospital_base_data = data.select(hospital_base)

    get_columns = [col(hospital_base + "." + t).alias(t) for t in target_columns]
    df = hospital_base_data.select(get_columns)
    
    string_table = get_table(df, string_columns, string_table)
    struct_df = df.select(struct_columns)
    review_keyword_table = get_review_keyword_table(struct_df, review_keyword_columns, review_keyword_table)
    homepages_table = get_homepages_table(struct_df, homepages_columns, homepages_table)
    conveniences_table = get_table_and_explode(df, conveniences_columns, conveniences_table, 'conveniences')
    keywords_table = get_table_and_explode(df, keywords_columns, keywords_table, 'keywords')

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `column` cannot be resolved. Did you mean one of the following? [`id`, `name`, `road`, `phone`, `Images`].;
'Filter isnull('column)
+- Project [HospitalBase:11779766#79.id AS id#556, HospitalBase:11779766#79.name AS name#557, HospitalBase:11779766#79.road AS road#558, HospitalBase:11779766#79.reviewSettings AS reviewSettings#559, HospitalBase:11779766#79.conveniences AS conveniences#560, HospitalBase:11779766#79.keywords AS keywords#561, HospitalBase:11779766#79.phone AS phone#562, HospitalBase:11779766#79.virtualPhone AS virtualPhone#563, HospitalBase:11779766#79.naverBookingUrl AS naverBookingUrl#564, HospitalBase:11779766#79.talktalkUrl AS talktalkUrl#565, HospitalBase:11779766#79.paymentInfo AS paymentInfo#566, HospitalBase:11779766#79.homepages AS homepages#567, HospitalBase:11779766#79.visitorReviewsTotal AS visitorReviewsTotal#568L, HospitalBase:11779766#79.description AS description#569, HospitalBase:11779766#79.Images AS Images#570]
   +- Project [HospitalBase:11779766#79]
      +- Relation [BaseNaverBlog:betbetter#8,BaseNaverBlog:bondiolsc#9,BaseNaverBlog:dainhani#10,BaseNaverBlog:kundaeclinic#11,BaseNaverBlog:memeetsworld#12,BusStation:104094#13,BusStation:104154#14,BusStation:104172#15,BusStation:104181#16,BusStation:104212#17,BusStation:104222#18,BusStation:104231#19,BusStation:104321#20,BusStation:104459#21,BusStation:104500#22,BusStation:104532#23,BusStation:104554#24,BusStation:104573#25,BusStation:104578#26,BusStation:104582#27,BusStation:104602#28,BusStation:104773#29,BusStation:123595#30,BusStation:123596#31,... 229 more fields] json


In [None]:
# string_table.show(50)
# review_keyword_table.show(50)
# homepages_table.show(50)
conveniences_table.show(50)
keywords_table.show(50)

### Test
---

In [33]:
hb = hospital_bases[2]
hb_data = data.select(hb)
get_columns = [col(hb + "." + t).alias(t) for t in target_columns]
df = hb_data.select(get_columns)

In [34]:
print(hospital_bases[0])

HospitalBase:11779766


In [36]:
get_columns = df.select(conveniences_columns)
get_columns.show()

+--------+------------+
|      id|conveniences|
+--------+------------+
|    NULL|        NULL|
|    NULL|        NULL|
|    NULL|        NULL|
|    NULL|        NULL|
|    NULL|        NULL|
|    NULL|        NULL|
|    NULL|        NULL|
|    NULL|        NULL|
|    NULL|        NULL|
|12857046|        NULL|
+--------+------------+



In [None]:
row = row.withColumn('conveniences', explode(row['conveniences']))
row.show()