In [1]:
import requests

In [11]:
import pyspark
myConf=pyspark.SparkConf()
spark = pyspark.sql.SparkSession\
    .builder\
    .master("local")\
    .appName("myApp")\
    .config(conf=myConf)\
    .getOrCreate()

In [12]:
r=requests.get("https://raw.githubusercontent.com/jokecamp/FootballData/master/World%20Cups/all-world-cup-players.json")

print("Type of Response: ", type(r))

In [14]:
wc=r.json()

In [15]:
print (type(wc), type(wc[0]))

<class 'list'> <class 'dict'>


In [16]:
wc[0]

{'Competition': 'World Cup',
 'Year': 1930,
 'Team': 'Argentina',
 'Number': '',
 'Position': 'GK',
 'FullName': 'Ãngel Bossio',
 'Club': 'Club AtlÃ©tico Talleres de Remedios de Escalada',
 'ClubCountry': 'Argentina',
 'DateOfBirth': '1905-5-5',
 'IsCaptain': False}

In [21]:
_wcDf=spark.createDataFrame(wc)

별표 1개 *: 리스트에서 인자 풀어내기

예를 들어, range() 함수는 시작과 끝을 인자로 가지는 함수이다. 시작과 끝이라는 2개의 인자를 함수에 넘겨주어 출력해보자.

In [25]:
myList = [1, 6]

In [26]:
list(range(1,6))

[1, 2, 3, 4, 5]

In [28]:
list(range(myList))

TypeError: 'list' object cannot be interpreted as an integer

In [30]:
list(range(*myList))
#시작, 끝 변수가 따로 없을 경우, 리스트를 넘겨주고 여기서 
#풀어서 하나씩 인자를 사용할 경우 *를 사용한다.

[1, 2, 3, 4, 5]

In [31]:
def f(args):
    for i in args:
        print(i, end="~")

f(0, 1, 2, 3)
#복수의 인자를 받을경우 *사용

TypeError: f() takes 1 positional argument but 4 were given

In [32]:

def f(*args):
    for i in args:
        print(i, end="~")

f(0, 1, 2, 3)

0~1~2~3~

### 별표 2 개 **: dictionary에서 인자 풀어내기
dictionary를 넘겨주고 여기서 key, value를 하나씩 사용할 경우 **를 사용한다.

In [33]:

def printCapital(name, year):
    print(f"{name} in {year}")

myDict = {"name": "jsl", "year": 2020}
printCapital(**myDict)

jsl in 2020


dictionary인자를 풀어서 Row에 넘겨주기

In [23]:
from pyspark.sql import Row

wcDf = spark.createDataFrame(Row(**x) for x in wc)

In [24]:
wcDf.printSchema()

root
 |-- Competition: string (nullable = true)
 |-- Year: long (nullable = true)
 |-- Team: string (nullable = true)
 |-- Number: string (nullable = true)
 |-- Position: string (nullable = true)
 |-- FullName: string (nullable = true)
 |-- Club: string (nullable = true)
 |-- ClubCountry: string (nullable = true)
 |-- DateOfBirth: string (nullable = true)
 |-- IsCaptain: boolean (nullable = true)



In [34]:
wcDf.take(1)

[Row(Competition='World Cup', Year=1930, Team='Argentina', Number='', Position='GK', FullName='Ãngel Bossio', Club='Club AtlÃ©tico Talleres de Remedios de Escalada', ClubCountry='Argentina', DateOfBirth='1905-5-5', IsCaptain=False)]

## RDD생성
RDD를 통해 DF생성

In [35]:
wcRdd=spark.sparkContext.parallelize(wc)

In [36]:
wcRdd.take(1)

[{'Competition': 'World Cup',
  'Year': 1930,
  'Team': 'Argentina',
  'Number': '',
  'Position': 'GK',
  'FullName': 'Ãngel Bossio',
  'Club': 'Club AtlÃ©tico Talleres de Remedios de Escalada',
  'ClubCountry': 'Argentina',
  'DateOfBirth': '1905-5-5',
  'IsCaptain': False}]

In [37]:
wcDfFromRdd = spark.createDataFrame(wcRdd)
wcDfFromRdd.printSchema()

root
 |-- Club: string (nullable = true)
 |-- ClubCountry: string (nullable = true)
 |-- Competition: string (nullable = true)
 |-- DateOfBirth: string (nullable = true)
 |-- FullName: string (nullable = true)
 |-- IsCaptain: boolean (nullable = true)
 |-- Number: string (nullable = true)
 |-- Position: string (nullable = true)
 |-- Team: string (nullable = true)
 |-- Year: long (nullable = true)



## 결측 값
* null은 결측, 즉 "no value" 또는 "nothing"을 말한다.
* NaN은 "Not a Number", 즉 수학에서 0.0/0.0과 같이 의미가 없는 연산의 결과를 말한다.

IsCaptain 항목은 boolean 타입이라 isnan() 함수가 오류를 발생한다. 'isnan(`IsCaptain`)' due to data type mismatch: argument 1 requires (double or float) type, however, '`IsCaptain`' is of boolean type. 그래서 항목에서 제거하고 확인하기로 하자.

In [43]:
cols = wcDf.columns

In [44]:
cols.remove('IsCaptain')

In [45]:
from pyspark.sql.functions import isnan, when, count, col
wcDf.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in cols]).show()

+-----------+----+----+------+--------+--------+----+-----------+-----------+
|Competition|Year|Team|Number|Position|FullName|Club|ClubCountry|DateOfBirth|
+-----------+----+----+------+--------+--------+----+-----------+-----------+
|          0|   0|   0|     0|       0|       0|   0|          0|          0|
+-----------+----+----+------+--------+--------+----+-----------+-----------+



## 형변환
스키마를 자동으로 유추한 형식을 보면, DateOfBirth, Number 컬럼이 문자열로 인식되어 있어 만족스럽지 못하다. 컬럼 'DateOfBirth'는 'DoB'로 DateType(), 컬럼 'Number'는 'NumberInt'로 "Integer" 형으로 설정해보자.

* DateType 형변환

Python datetime을 사용한 DateType 컬럼 생성
Python datetime은 날짜 문자열을 다음과 같은 년, 월, 일 형식으로 변환해준다.

In [46]:
from datetime import datetime
print (datetime.strptime("11/25/1991", '%m/%d/%Y'))

1991-11-25 00:00:00


In [47]:
from pyspark.sql.functions import udf
from pyspark.sql.types import DateType
toDate = udf(lambda x: datetime.strptime(x, '%m/%d/%Y'), DateType())

In [51]:
wcDf = wcDf.withColumn('date1', toDate(wcDf['DateOfBirth']))

In [52]:
wcDf.take(1)

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "C:\Anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 604, in main
  File "C:\Anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 596, in process
  File "C:\Anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 211, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "C:\Anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 132, in dump_stream
    for obj in iterator:
  File "C:\Anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 200, in _batched
    for item in iterator:
  File "C:\Anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 450, in mapper
  File "C:\Anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 450, in <genexpr>
  File "C:\Anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 83, in <lambda>
  File "C:\Anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\util.py", line 73, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-47-96a4807fab93>", line 3, in <lambda>
  File "C:\Anaconda3\lib\_strptime.py", line 568, in _strptime_datetime
    tt, fraction, gmtoff_fraction = _strptime(data_string, format)
  File "C:\Anaconda3\lib\_strptime.py", line 349, in _strptime
    raise ValueError("time data %r does not match format %r" %
ValueError: time data '1905-5-5' does not match format '%m/%d/%Y'


In [53]:
wcDf = wcDf.drop('date1')

In [59]:
from pyspark.sql.functions import to_date

_wcDfCasted=wcDf.withColumn('date2', to_date(wcDf['DateOfBirth'], 'yyyy-MM-dd'))

In [60]:
wcDf.take(1)

[Row(Competition='World Cup', Year=1930, Team='Argentina', Number='', Position='GK', FullName='Ãngel Bossio', Club='Club AtlÃ©tico Talleres de Remedios de Escalada', ClubCountry='Argentina', DateOfBirth='1905-5-5', IsCaptain=False)]

In [58]:

from pyspark.sql.types import DateType

wcDfCasted = _wcDfCasted.withColumn('date3', _wcDfCasted['DateOfBirth'].cast(DateType()))
wcDfCasted = wcDfCasted.withColumn('NumberInt', wcDfCasted['Number'].cast("integer"))

In [57]:
wcDfCasted.printSchema()

root
 |-- Competition: string (nullable = true)
 |-- Year: long (nullable = true)
 |-- Team: string (nullable = true)
 |-- Number: string (nullable = true)
 |-- Position: string (nullable = true)
 |-- FullName: string (nullable = true)
 |-- Club: string (nullable = true)
 |-- ClubCountry: string (nullable = true)
 |-- DateOfBirth: string (nullable = true)
 |-- IsCaptain: boolean (nullable = true)
 |-- date2: date (nullable = true)
 |-- date3: date (nullable = true)
 |-- NumberInt: integer (nullable = true)




그대로 출력하면 오류가 발생할 수 있다. SparkUpgradeException: You may get a different result due to the upgrading of Spark 3.0라는 오류가 발생하면 spark.sql.legacy.timeParserPolicy=LEGACY라고 설정을 변경해주어야 한다.

아래와 같이 to_data() 또는 cast() 함수를 사용하여 일자 형변환 결과를 올바르게 출력하고 있다.

In [61]:
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")
wcDfCasted.take(1)

[Row(Competition='World Cup', Year=1930, Team='Argentina', Number='', Position='GK', FullName='Ãngel Bossio', Club='Club AtlÃ©tico Talleres de Remedios de Escalada', ClubCountry='Argentina', DateOfBirth='1905-5-5', IsCaptain=False, date2=datetime.date(1905, 5, 5), date3=datetime.date(1905, 5, 5), NumberInt=None)]

앞서 사용했던 _myDf를 parquet형식으로 저장해보자.

In [63]:
import os
import requests
from pyspark.sql import Row
from pyspark.sql.types import DateType

import pyspark
os.environ["PYSPARK_PYTHON"]="/usr/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"]="/usr/bin/python3"

myConf=pyspark.SparkConf()
spark = pyspark.sql.SparkSession\
    .builder\
    .master("local")\
    .appName("myApp")\
    .config(conf=myConf)\
    .getOrCreate()

spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

# read url json
r=requests.get("https://raw.githubusercontent.com/jokecamp/FootballData/master/World%20Cups/all-world-cup-players.json")
wc=r.json()

# read dictionary into Row
wcDf = spark.createDataFrame(Row(**x) for x in wc)

# cast DoB string into date, Number string into integer
wcDfCasted = wcDf.withColumn('date3', wcDf['DateOfBirth'].cast(DateType()))
wcDfCasted = wcDfCasted.withColumn('NumberInt', wcDfCasted['Number'].cast("integer"))

wcDfCasted.take(1)

[Row(Competition='World Cup', Year=1930, Team='Argentina', Number='', Position='GK', FullName='Ãngel Bossio', Club='Club AtlÃ©tico Talleres de Remedios de Escalada', ClubCountry='Argentina', DateOfBirth='1905-5-5', IsCaptain=False, date3=datetime.date(1905, 5, 5), NumberInt=None)]

## S.5 DataFrame API 사용해 보기
빈 DataFrame 생성

비어있는 DataFrame을 생성하려면, 빈 schema를 설정하고 emptyRdd()를 사용해준다.

In [64]:
spark.range(0, 10, 2).show()

+---+
| id|
+---+
|  0|
|  2|
|  4|
|  6|
|  8|
+---+



In [65]:
tDf = spark\
    .read\
    .options(header='false', inferschema='true', delimiter='\t')\
    .csv(os.path.join('data', 'ds_spark_heightweight.txt'))

In [66]:
tDf.columns

['_c0', '_c1', '_c2']

In [67]:
tDf = tDf.withColumn("id", tDf._c0.cast("integer"))
tDf = tDf.withColumn("height", tDf['_c1'].cast("double"))
tDf = tDf.withColumn("weight", tDf['_c2'].cast("double"))

In [69]:
tDf.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- _c1: double (nullable = true)
 |-- _c2: double (nullable = true)
 |-- id: integer (nullable = true)
 |-- height: double (nullable = true)
 |-- weight: double (nullable = true)



In [70]:
tDf = tDf.drop('_c0').drop('_c1').drop('_c2')

In [71]:
tDf.printSchema()

root
 |-- id: integer (nullable = true)
 |-- height: double (nullable = true)
 |-- weight: double (nullable = true)



In [72]:
tDf.take(1)

[Row(id=1, height=65.78, weight=112.99)]


## 사용자정의 함수 udf
UDF User Defined Functions는 사용자 정의함수로서 DataFrame의 withColumn() 함수와 같이 사용되어 새로운 컬럼을만드는 경우 유용하다. 보통 함수와 같이 함수명과 반환 값을 미리 정의해서 lambda 함수 또는 다른 함수를 사용할 수 있다. 다른 함수를 직접 사용할 수 없고, udf()를 통해서 호출해야 한다. udf()는 코드가 복잡한 경우에 함수를 분리해서 처리하면 유용하겠다.

앞서 Pandas로 저장한 csv파일을 읽어서 DataFrame을 만들어 보자. header를 있는 그대로 가져오고, schema도 현재 데이터에서 추정하도록 설정한다.

In [73]:
myDf = spark\
        .read.format('com.databricks.spark.csv')\
        .options(header='true', inferschema='true')\
        .load(os.path.join('data','myDf.csv'))

In [74]:
myDf.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- height: integer (nullable = true)



In [75]:

myDf.show()

+---+----+-------+------+
|_c0|year|   name|height|
+---+----+-------+------+
|  0|   1|kim, js|   170|
|  1|   1|lee, sm|   175|
|  2|   2|lim, yg|   180|
|  3|   2|    lee|   170|
+---+----+-------+------+



In [76]:

def uppercase(s):
    return s.upper()

In [77]:

uppercase("a")

'A'

In [78]:
myDf = myDf.withColumn("nameUpper", uppercase(myDf.name))

TypeError: 'Column' object is not callable

In [79]:

from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

upperUdf = udf(uppercase, StringType())

In [80]:

myDf = myDf.withColumn("nameUpper", upperUdf(myDf['name']))

In [81]:

myDf.show()

+---+----+-------+------+---------+
|_c0|year|   name|height|nameUpper|
+---+----+-------+------+---------+
|  0|   1|kim, js|   170|  KIM, JS|
|  1|   1|lee, sm|   175|  LEE, SM|
|  2|   2|lim, yg|   180|  LIM, YG|
|  3|   2|    lee|   170|      LEE|
+---+----+-------+------+---------+

