# 타입생성기
### 동기
UDF 를 사용할때, 함수가 복잡한 타입을 리턴할때, 매번 udf 타입을 설정하기 번거롭다. <br>
만약.. 흔하지 않지만.. 다음과 같은 타입을 리턴하는 udf 를 사용한다면? 
```python
def myUdf(data):
  """
  ....연산 중략 ....
  """
  res = {
         "A":[1,2,3,345],
         "B":{"C":[1,2,3]},
         "E":{"A":1,"B":0.3,"C":[[1,2],[3,3]]},
         "F":["string1","string2"]
         }
  return res 

u_myUdf = F.udf(myUdf, T.StringType([..ㅜ_ㅜ ..])
```
무척 복잡한 타입을 설정해줘야 한다.<br>
만약 myUdf를 수정해서 리턴타입이 달라진다면, 또 번거로운 작업을 해야한다

### 문제접근
만약 myUdf가 리턴하는 데이터를 입력받고, pyspark의 F.udf 함수 인자들어가는 타입객체를 출력하는 프로그램이 있다면,<br>
유용하지 않을까 생각했다

In [3]:
import pyspark.sql.functions as F
import pyspark.sql.types as T
from datetime import datetime, timedelta

## 0. 타입을 추론하는 함수 
``` scala
def typeInferer[tree](myData : tree):String
```
udf함수가 리턴하는 데이터가 트리구조 라고 생각하고, 깊이 우선탐색을 하면서, 안에서 밖으로 문자열 형태의 타입객체를 만든다.

In [5]:
ptypePStypeHash = {list:"T.ArrayType",
             dict:"T.StructType",
             set:"T.ArrayType",
             "long":"T.LongType()",
             "int":"T.IntegerType()",
             float:"T.FloatType()",
             str:"T.StringType()",
             bool:"T.BooleanType()",
             "date":"T.DateType()",
             "datetime":"T.TimestampType()"
            }

def typeInfer(data):
  """
   depth 가 깊은 데이터도 타입을 가져올수 있음
  """
  try:
    re
  except:
    import re   
  typeStr = ""
  dataType = type(data)
  if dataType  == list or dataType  == set:
    typeStr = ptypePStypeHash[dataType] +"(" + typeInfer(data[0]) +",True)"
  elif dataType == dict:
    keys = data.keys()
    typeStr = ptypePStypeHash[dataType] + "(["
    lenKey = len(keys)
    for i, keyi in enumerate(keys):
        typeStr += "T.StructField("+'"'+str(keyi)+'"'+","
        typeStr += typeInfer(data[keyi])
        if i+1 == lenKey:
            typeStr += ",True)"
        else:
            typeStr += ",True),"
    typeStr += "])"
  
  elif dataType == int :
    if abs(data) >= 2**41 :
      typeStr += ptypePStypeHash["long"]
    else:
      typeStr += ptypePStypeHash["int"]
  elif re.match(".*(datetime')",str(dataType)):
      dataType = "datetime"
      typeStr = ptypePStypeHash[dataType]
  elif re.match(".*(date')",str(dataType)):
      dataType = "date"
      typeStr = ptypePStypeHash[dataType]
  else:
      try:
        typeStr = ptypePStypeHash[dataType]
      except:
        typeStr = ptypePStypeHash[str]
  return typeStr

#### 0-1.테스트데이터를 만들어본다

In [7]:
from random import randint
ranType = ["list","int","dict","float","long"]#,"date","datetime"]

def randomData(depth = 0):
  """
  테스트를 위해서 랜덤 데이터 생성
  """
  if depth  == 4:
    intOrLong = [randint(0,10000),2**42+100]
    return intOrLong[randint(0,1)]
  data = None
  if depth == 0:
    ranT = "dict"
  else:
    ranT = ranType[randint(0,len(ranType)-1)]
  if ranT == "list":
    data = [randomData(depth+1)]
  elif ranT ==  "dict":
    data = {chr(i+65): randomData(depth+1) for i in range(0,randint(3,10)) }
  elif ranT == "float":
    ranI,ranII = randint(1,100),randint(2,200)
    data = ranI/ranII
  elif ranT == "int":
    data = randint(1,100000)
  elif ranT == "date":
    data = datetime.now().date().strftime("%Y-%m-%d ")
  elif ranT == "datetime":
    data = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  else:
    data = 2**41 +10
  return data
    
  

#### 0-2.무작위 DataFrame을 생성한다

In [9]:
import json
j = randomData()
a=[json.dumps(j)]
jsonRDD = sc.parallelize(a)
dfjosn = spark.read.json(jsonRDD)
display(dfjosn)

A,B,C,D,E,F,G,H,I
2199023255562,29546,2199023255562,45937,"List(1.8333333333333333, 2199023255562, 2199023255562, List(List(4247), 1.9795918367346939, 95036, 13854, 54294, List(4398046511204), 81873, List(362), 99259, 2199023255562), List(2199023255562, List(4398046511204, 4398046511204, 4398046511204, 4398046511204, 1894, 4737, 4398046511204, 1837, 4398046511204, 4398046511204), 61423, List(4007)), 2199023255562, List(List(8641, 4398046511204, 4209, 8151, 3947, 2652, 4398046511204)), List(List(4398046511204, 8400, 6014, 5197, 2346), 0.29906542056074764, List(4398046511204), List(4398046511204), List(4398046511204), 10006, 42406), List(29329))",0.1558441558441558,7.7,2199023255562,0.6240601503759399


#### 0-3.테스트함수
##### 되도록 복잡한 타입을 리턴하는 함수를 만든다.
1. flatJson
  - 깊이가있는 json을 flatten한다.(하위 구조체는 . 으로 구분)

2. combineJson
  - flatten 된 각 Array&lt;json&gt;을 묶는다

In [11]:
def flatJson(data):
  typeStr = str(type(data))
  if typeStr == "<class 'list'>":
    return [flatJson(i) for i in data]  
  elif typeStr == "<class 'pyspark.sql.types.Row'>":  
    res = {}
    rowDict = data.asDict()
    cols = rowDict.keys()
    for col in cols:
      val = rowDict[col]
      valTypeStr = str(type(val))
      if valTypeStr == "<class 'pyspark.sql.types.Row'>":
        val = flatJson(val)
        for colj in val.keys():
          res.update({col+"."+colj: val[colj] })
      else:
        res.update({col:flatJson(val) })
    return res
  else:
    return data

def combineColumn(*cols):
  res =  {chr(i+65):flatJson(col) for i,col in enumerate(cols) }
  res.update({"createdatetime":datetime.now(),"createdate":datetime.now().date()})
  return res

## 1. 테스트 출력데이터 생성
테스트데이터를 입력해서 테스트-리턴데이터를 받는다.<br>
__테스트-리턴데이터는 타입을 추론하는 함수의 입력데이터로 쓰인다.__

In [13]:
cols  = [i for i in dfjosn.collect()[0]]
sampleFuncResult = combineColumn(*cols)

#### 테스트함수가 반환하는 데이터샘플

In [15]:
sampleFuncResult

## 2. 타입객체생성
타입객체를 생성한다. F.udf함수의 두번째 인자로 쓰일것이다.

In [17]:
ReturnTypeString = typeInfer(sampleFuncResult)
ReturnType = eval(ReturnTypeString)

#### 생성된 타입객체

In [19]:
ReturnType

## 3. UDF테스트 
복잡한 형태를 반환하는 udf지만, 알맞게 타입이 추론되었다.

In [21]:
dfjsonCols = [F.col(col) for col in dfjosn.columns]

In [22]:
resulfdf = dfjosn.withColumn("funcResult",F.udf(combineColumn, ReturnType )(*dfjsonCols)).select("funcResult")

In [23]:
display(resulfdf)

funcResult
"List(2199023255562, 0.62406015, 2018-11-24, 2199023255562, 29546, 2018-11-24T17:33:21.744+0000, 0.15584415, 7.7, 2199023255562, 45937, List(4398046511204, List(362), 61423, 10006, 4398046511204, 4398046511204, List(List(4209, 4398046511204, 2652, 4398046511204, 8641, 8151, 3947)), 4398046511204, 8400, List(4247), 95036, 1894, 4737, 6014, List(4398046511204), 5197, List(4007), 99259, 0.2990654, 1837, 1.9795918, List(4398046511204), 2199023255562, 4398046511204, List(4398046511204), 2199023255562, 13854, 2199023255562, 2346, 4398046511204, 81873, List(4398046511204), 2199023255562, 42406, 4398046511204, 4398046511204, 1.8333334, List(29329), 54294, 2199023255562))"
