# Working with Complex Data Formats in Spark DataFrames

References
* [Working with Complex Data Formats with Structured Streaming in Apache Spark 2.1](https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html)

In [1]:
import os
from pyspark.sql import SparkSession

# If you hit the Python version mismatch error shown below, set the interpreter paths directly via os.environ and run again.
# Exception: Python in worker has different version 2.7 than that in driver 3.5, 
# PySpark cannot run with different minor versions.Please check environment variables 
# PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

# os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
# os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"

spark_home = os.environ.get('SPARK_HOME', None)
print(spark_home)

# Build the session in a single chained expression instead of reassigning the builder.
spark = (
    SparkSession.builder.master("local[*]").appName("spark")
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "4g")
    .config("spark.python.worker.memory", "4g")
    .getOrCreate()
)

sc = spark.sparkContext

/usr/local/spark-2.4.1


## DataFrame Complex Data Type

JSON, which originated on the web, has become a popular raw data format. As a result, it is one of the formats you handle most often when doing data engineering with Spark. This section shows how to work with the complex data types that commonly appear in JSON.

![complex data format](https://databricks.com/wp-content/uploads/2017/02/various-data-types.png)

In [2]:
# Import the PySpark classes and modules used in the cells below
from pyspark.sql import Row
from pyspark.sql import functions as f
from pyspark.sql import types as t

json_str_struct = """
{
  "a_struct": {
     "field_1": 1,
     "field_2": "b",
     "field_3": {
         "key_1": 1,
         "key_2": 2,
         "key_3": 3
     }
  }
}
"""

rdd_struct = sc.parallelize([json_str_struct])
df_struct = spark.read.json(rdd_struct)
# printSchema() prints the schema itself and returns None, so call it directly.
df_struct.printSchema()

root
 |-- a_struct: struct (nullable = true)
 |    |-- field_1: long (nullable = true)
 |    |-- field_2: string (nullable = true)
 |    |-- field_3: struct (nullable = true)
 |    |    |-- key_1: long (nullable = true)
 |    |    |-- key_2: long (nullable = true)
 |    |    |-- key_3: long (nullable = true)


In [3]:
# A JSON object is parsed as a struct type by default, so to read it as a map type you must supply a schema.
schema_map = t.StructType([
    t.StructField("a_map", t.MapType(t.StringType(), t.LongType()), False)
])

json_str_map = """
{
  "a_map": {
     "key_1": 1,
     "key_2": 2,
     "key_3": 3
  }
}
"""

rdd_map = sc.parallelize([json_str_map])
df_map = spark.read.json(rdd_map, schema_map)
# printSchema() prints the schema itself and returns None, so call it directly.
df_map.printSchema()

root
 |-- a_map: map (nullable = false)
 |    |-- key: string
 |    |-- value: long (valueContainsNull = true)


In [4]:
json_str_array = """
{
  "an_array": [
      "Alice", 
      "Bob", 
      "Chris"
  ]
}
"""

rdd_array = sc.parallelize([json_str_array])
df_array = spark.read.json(rdd_array)
# printSchema() prints the schema itself and returns None, so call it directly.
df_array.printSchema()

root
 |-- an_array: array (nullable = true)
 |    |-- element: string (containsNull = true)


## Selecting from nested columns
Use the '.' character to access nested columns of a struct or map.

In [5]:
json_str = """
{
  "a": {
     "b": 1
  }
}
"""

rdd = sc.parallelize([json_str])
df = spark.read.json(rdd)
df.createOrReplaceTempView("view")

df_res = df.select("a.b")
df_res.printSchema()
df_res.show()

df_res = spark.sql("select a.b from view")
df_res.printSchema()
df_res.show()

root
 |-- b: long (nullable = true)

+---+
|  b|
+---+
|  1|
+---+

root
 |-- b: long (nullable = true)

+---+
|  b|
+---+
|  1|
+---+



## Flattening structs

To select every subfield of a struct, use the '*' character.

In [6]:
json_str = """
{
  "a": {
     "b": 1,
     "c": 2
  }
}
"""

rdd = sc.parallelize([json_str])
df = spark.read.json(rdd)
df.createOrReplaceTempView("view")

df_res = df.select("a.*")
df_res.printSchema()
df_res.show()

df_res = spark.sql("select a.* from view")
df_res.printSchema()
df_res.show()

root
 |-- b: long (nullable = true)
 |-- c: long (nullable = true)

+---+---+
|  b|  c|
+---+---+
|  1|  2|
+---+---+

root
 |-- b: long (nullable = true)
 |-- c: long (nullable = true)

+---+---+
|  b|  c|
+---+---+
|  1|  2|
+---+---+



## Nesting Columns

The struct function creates a new struct from existing columns.

In [7]:
json_str = """
{
  "a": 1,
  "b": 2,
  "c": 3
}
"""

rdd = sc.parallelize([json_str])
df = spark.read.json(rdd)
df.createOrReplaceTempView("view")

df_res = df.select(f.struct(f.col("a").alias("y")).alias("x"))
df_res.printSchema()
df_res.show()

df_res = spark.sql('select named_struct("y", a) as x from view')
df_res.printSchema()
df_res.show()

root
 |-- x: struct (nullable = false)
 |    |-- y: long (nullable = true)

+---+
|  x|
+---+
|[1]|
+---+

root
 |-- x: struct (nullable = false)
 |    |-- y: long (nullable = true)

+---+
|  x|
+---+
|[1]|
+---+



## Nesting all columns

To pack all columns into a single struct, use the '*' character.

In [8]:
json_str = """
{
  "a": 1,
  "b": 2
}
"""

rdd = sc.parallelize([json_str])
df = spark.read.json(rdd)
df.createOrReplaceTempView("view")

df_res = df.select(f.struct("*").alias("x"))
df_res.printSchema()
df_res.show()

df_res = spark.sql('select struct(*) as x from view')
df_res.printSchema()
df_res.show()

root
 |-- x: struct (nullable = false)
 |    |-- a: long (nullable = true)
 |    |-- b: long (nullable = true)

+------+
|     x|
+------+
|[1, 2]|
+------+

root
 |-- x: struct (nullable = false)
 |    |-- a: long (nullable = true)
 |    |-- b: long (nullable = true)

+------+
|     x|
+------+
|[1, 2]|
+------+



## Selecting a single array or map element

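The getItem function retrieves an element from an array (by index) or a map (by key). It also works on struct fields: in the second example, the JSON object is read without a schema, so column `a` is inferred as a struct rather than a map.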

In [9]:
json_str = """
{
  "a": [1, 2]
}
"""

rdd = sc.parallelize([json_str])
df = spark.read.json(rdd)
df.createOrReplaceTempView("view")

df_res = df.select(f.col("a").getItem(0).alias("x"))
df_res.printSchema()
df_res.show()

df_res = spark.sql('select a[0] as x from view')
df_res.printSchema()
df_res.show()

root
 |-- x: long (nullable = true)

+---+
|  x|
+---+
|  1|
+---+

root
 |-- x: long (nullable = true)

+---+
|  x|
+---+
|  1|
+---+



In [10]:
json_str = """
{
  "a": {
    "b": 1
  }
}
"""

rdd = sc.parallelize([json_str])
df = spark.read.json(rdd)
df.createOrReplaceTempView("view")

df_res = df.select(f.col("a").getItem("b").alias("x"))
df_res.printSchema()
df_res.show()

df_res = spark.sql("select a['b'] as x from view")
df_res.printSchema()
df_res.show()

root
 |-- x: long (nullable = true)

+---+
|  x|
+---+
|  1|
+---+

root
 |-- x: long (nullable = true)

+---+
|  x|
+---+
|  1|
+---+



## Creating a row for each array or map element

The explode function creates a new row for each element of an array, or for each key-value pair of a map.

In [11]:
json_str = """
{
  "a": [1, 2]
}
"""

rdd = sc.parallelize([json_str])
df = spark.read.json(rdd)
df.createOrReplaceTempView("view")

df_res = df.select(f.explode("a").alias("x"))
df_res.printSchema()
df_res.show()

df_res = spark.sql("select explode(a) as x from view")
df_res.printSchema()
df_res.show()

root
 |-- x: long (nullable = true)

+---+
|  x|
+---+
|  1|
|  2|
+---+

root
 |-- x: long (nullable = true)

+---+
|  x|
+---+
|  1|
|  2|
+---+



In [12]:
json_str = """
{
  "a": {
    "b": 1,
    "c": 2
  }
}
"""
schema = t.StructType([
    t.StructField("a", t.MapType(t.StringType(), t.LongType()), False)
])

rdd = sc.parallelize([json_str])
df = spark.read.json(rdd, schema)
df.createOrReplaceTempView("view")

df_res = df.select(f.explode("a").alias("x", "y"))
df_res.printSchema()
df_res.show()

df_res = spark.sql("select explode(a) as (x, y) from view")
df_res.printSchema()
df_res.show()

root
 |-- x: string (nullable = false)
 |-- y: long (nullable = true)

+---+---+
|  x|  y|
+---+---+
|  b|  1|
|  c|  2|
+---+---+

root
 |-- x: string (nullable = false)
 |-- y: long (nullable = true)

+---+---+
|  x|  y|
+---+---+
|  b|  1|
|  c|  2|
+---+---+
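
As a mental model for the output above, here is a plain-Python sketch of what explode does to each input row. It does not use Spark, and `explode_array` and `explode_map` are hypothetical helper names introduced only for illustration.

```python
def explode_array(rows, column):
    """Yield one output row per element of the array stored in `column`."""
    for row in rows:
        for element in row[column]:
            yield {"x": element}


def explode_map(rows, column):
    """Yield one output row per key-value pair of the map stored in `column`."""
    for row in rows:
        for key, value in row[column].items():
            yield {"x": key, "y": value}


# Same input data as the two Spark cells above.
array_rows = list(explode_array([{"a": [1, 2]}], "a"))
map_rows = list(explode_map([{"a": {"b": 1, "c": 2}}], "a"))

print(array_rows)
print(map_rows)
```

In this sketch an empty array or map simply yields no rows, which matches how explode drops such rows in Spark.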



## Collecting multiple rows into an array

The collect_list and collect_set functions gather values from multiple rows (or from expressions computed over them) into a single array.

In [13]:
json_str = """
[{ "x": 1 }, { "x": 2 }]
"""

rdd = sc.parallelize([json_str])
df = spark.read.json(rdd)
df.createOrReplaceTempView("view")

df_res = df.select(f.collect_list("x").alias("x"))
df_res.printSchema()
df_res.show()

df_res = spark.sql("select collect_list(x) as x from view")
df_res.printSchema()
df_res.show()

root
 |-- x: array (nullable = true)
 |    |-- element: long (containsNull = true)

+------+
|     x|
+------+
|[1, 2]|
+------+

root
 |-- x: array (nullable = true)
 |    |-- element: long (containsNull = true)

+------+
|     x|
+------+
|[1, 2]|
+------+



In [14]:
json_str = """
[{ "x": 1, "y": "a" }, { "x": 2, "y": "b" }]
"""

rdd = sc.parallelize([json_str])
df = spark.read.json(rdd)
df.createOrReplaceTempView("view")

df_res = df.groupBy("y").agg(f.collect_list("x").alias("x"))
df_res.printSchema()
df_res.show()

df_res = spark.sql("select y, collect_list(x) as x from view group by y")
df_res.printSchema()
df_res.show()

root
 |-- y: string (nullable = true)
 |-- x: array (nullable = true)
 |    |-- element: long (containsNull = true)

+---+---+
|  y|  x|
+---+---+
|  b|[2]|
|  a|[1]|
+---+---+

root
 |-- y: string (nullable = true)
 |-- x: array (nullable = true)
 |    |-- element: long (containsNull = true)

+---+---+
|  y|  x|
+---+---+
|  b|[2]|
|  a|[1]|
+---+---+
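
The cells above only exercise collect_list. The difference from collect_set is that collect_list keeps duplicates while collect_set removes them. The plain-Python sketch below illustrates that difference without Spark; the sample rows (including the duplicated row) are made up for this illustration.

```python
from collections import defaultdict

# Hypothetical sample data: the first row appears twice on purpose.
rows = [{"x": 1, "y": "a"}, {"x": 1, "y": "a"}, {"x": 2, "y": "b"}]

as_list = defaultdict(list)  # analogous to collect_list: keeps duplicates
as_set = defaultdict(set)    # analogous to collect_set: drops duplicates

for row in rows:
    as_list[row["y"]].append(row["x"])
    as_set[row["y"]].add(row["x"])

collected_list = dict(as_list)
# Sort the sets only to get a stable printout.
collected_set = {key: sorted(values) for key, values in as_set.items()}

print(collected_list)
print(collected_set)
```

In Spark, the order of elements inside the collected array is not guaranteed, so downstream code should not rely on it.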



## Selecting one field from each item in an array

Using the '.' character on an array of structs returns the selected field's values as an array.

In [15]:
json_str = """
{
  "a": [
    {"b": 1},
    {"b": 2}
  ]
}
"""

rdd = sc.parallelize([json_str])
df = spark.read.json(rdd)
df.createOrReplaceTempView("view")

df_res = df.select(f.col("a.b"))
df_res.printSchema()
df_res.show()

df_res = spark.sql("select a.b from view")
df_res.printSchema()
df_res.show()

root
 |-- b: array (nullable = true)
 |    |-- element: long (containsNull = true)

+------+
|     b|
+------+
|[1, 2]|
+------+

root
 |-- b: array (nullable = true)
 |    |-- element: long (containsNull = true)

+------+
|     b|
+------+
|[1, 2]|
+------+



# Power of to_json() and from_json()

The to_json and from_json functions make it easy to encode and decode JSON data held in DataFrame columns.

## Encode a struct as json

In [16]:
json_str = """
{
  "a": {
    "b": 1
  }
}
"""

rdd = sc.parallelize([json_str])
df = spark.read.json(rdd)

# Encode the struct column as a JSON string
df_res = df.select(f.to_json("a").alias("a"))
df_res.printSchema()
df_res.show()

root
 |-- a: string (nullable = true)

+-------+
|      a|
+-------+
|{"b":1}|
+-------+



## Decode json column as a struct 

In [17]:
json_str = """{\"b\":1}"""

# Wrap the raw JSON text in a single string column named "a"
rdd = sc.parallelize([json_str])
row_rdd = rdd.map(lambda x: Row(x))
schema = t.StructType().add("a", t.StringType())

df = spark.createDataFrame(row_rdd, schema)
df.show()

# Decode the string column into a struct using an explicit schema
schema = t.StructType().add("b", t.IntegerType())
df_res = df.select(f.from_json("a", schema).alias("c"))
df_res.printSchema()
df_res.show()

+-------+
|      a|
+-------+
|{"b":1}|
+-------+

root
 |-- c: struct (nullable = true)
 |    |-- b: integer (nullable = true)

+---+
|  c|
+---+
|[1]|
+---+



In [18]:
json_str = """{\"b\":{\"x\":1,\"y\":{\"z\":2}}}"""

rdd = sc.parallelize([json_str])
row_rdd = rdd.map(lambda x: Row(x))
schema = t.StructType().add("a", t.StringType())

df = spark.createDataFrame(row_rdd, schema)
df.show(1, False)

schema = t.StructType().add("b", t.StructType().add("x", t.IntegerType()).add("y", t.StringType()))
df_res = df.select(f.from_json("a", schema).alias("c"))
df_res.printSchema()
df_res.show()

+-------------------------+
|a                        |
+-------------------------+
|{"b":{"x":1,"y":{"z":2}}}|
+-------------------------+

root
 |-- c: struct (nullable = true)
 |    |-- b: struct (nullable = true)
 |    |    |-- x: integer (nullable = true)
 |    |    |-- y: string (nullable = true)

+--------------+
|             c|
+--------------+
|[[1, {"z":2}]]|
+--------------+
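
In the output above, field `y` was declared as a string, so its nested object stays as raw JSON text. The plain-Python sketch below uses the standard `json` module rather than Spark to show the same two-stage idea: keep the inner object as text first, then decode it in a second pass. In Spark, the second pass would presumably be another from_json call on that field with its own schema.

```python
import json

raw = '{"b":{"x":1,"y":{"z":2}}}'

# First pass: decode the outer document.
outer = json.loads(raw)

# Keep "y" as compact JSON text, mirroring a field declared as a string.
y_text = json.dumps(outer["b"]["y"], separators=(",", ":"))

# Second pass: decode the text only when the nested structure is needed.
y_struct = json.loads(y_text)

print(y_text)
print(y_struct)
```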



## Parse a set of fields from a column containing JSON

If a string value is itself JSON-formatted, the json_tuple function can parse it and return the requested field values directly.

In [19]:
json_str = """{\"b\":1}"""

rdd = sc.parallelize([json_str])
row_rdd = rdd.map(lambda x: Row(x))
schema = t.StructType().add("a", t.StringType())

df = spark.createDataFrame(row_rdd, schema)
df.createOrReplaceTempView("view")
df.show(1, False)

df_res = df.select(f.json_tuple("a", "b").alias("c"))
df_res.printSchema()
df_res.show()

df_res = spark.sql('select json_tuple(a, "b") as c from view')
df_res.printSchema()
df_res.show()

+-------+
|a      |
+-------+
|{"b":1}|
+-------+

root
 |-- c: string (nullable = true)

+---+
|  c|
+---+
|  1|
+---+

root
 |-- c: string (nullable = true)

+---+
|  c|
+---+
|  1|
+---+



## Parse a well-formed string column

Use regexp_extract to pull the desired text out of a string-typed JSON value.

In [20]:
json_str = """
[{ "a": "x: 1" }, { "a": "y: 2" }]
"""

rdd = sc.parallelize([json_str])
df = spark.read.json(rdd)
df.createOrReplaceTempView("view")

df_res = df.select(f.regexp_extract("a", "([a-z]):", 1).alias("c"))
df_res.printSchema()
df_res.show()

df_res = spark.sql('select regexp_extract(a, "([a-z]):", 1) as c from view')
df_res.printSchema()
df_res.show()

root
 |-- c: string (nullable = true)

+---+
|  c|
+---+
|  x|
|  y|
+---+

root
 |-- c: string (nullable = true)

+---+
|  c|
+---+
|  x|
|  y|
+---+
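
To experiment with the pattern outside Spark, the sketch below mimics the behavior with Python's `re` module. The local `regexp_extract` helper is a hypothetical stand-in, not the Spark function, and the third input value is made up to show the no-match case. Spark uses Java regular expressions, which behave the same way for simple patterns like these.

```python
import re


def regexp_extract(text, pattern, idx):
    """Return capture group `idx` of the first match, or "" when nothing matches."""
    match = re.search(pattern, text)
    return match.group(idx) if match else ""


values = ["x: 1", "y: 2", "no key here"]

# Group 1 of "([a-z]):" is the single lowercase letter before the colon.
keys = [regexp_extract(v, r"([a-z]):", 1) for v in values]

# A second, assumed pattern that captures the digits after the colon.
numbers = [regexp_extract(v, r": (\d+)", 1) for v in values]

print(keys)
print(numbers)
```

The empty string for the non-matching value mirrors how Spark's regexp_extract reports a failed match, so an empty result does not by itself indicate an error.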



In [21]:
spark.stop()