In [1]:
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

In [16]:
from datetime import date, datetime
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *


# 1. DataFrame 생성

- SparkSession 객체를 사용해 DataFrame을 생성할 수 있다.
- SparkSession 객체는 pyspark shell을 실행할 때 spark 라는 이름으로 미리 생성된다.



## Row 객체를 사용해 생성하기

- row : DataFrame에서의 한 행

In [2]:
# !pip install pandas
# !pip install pyarrow

import pandas as pd
from datetime import date, datetime
from pyspark.sql import *

# Row class의 생성자로 keyword args를 전달해 생성
df = spark.createDataFrame([
    Row(name='하명도', age=15, birth=date(2022,7,22)),
    Row(name='이제동', age=20, birth=date(2021,7,22)),
    Row(name='김명운', age=25, birth=date(2020,7,22))
])

df.show()



[Stage 1:>                                                          (0 + 1) / 1]

+------+---+----------+
|  name|age|     birth|
+------+---+----------+
|하명도| 15|2022-07-22|
|이제동| 20|2021-07-22|
|김명운| 25|2020-07-22|
+------+---+----------+



                                                                                

In [3]:
# 스키마 확인
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- birth: date (nullable = true)



## schema를 명시하여 DataFrame 생성

In [4]:
# 튜플에 데이터를 저장하고 스키마를 직접 지정

df2 = spark.createDataFrame([
    ('하명도', 15, date(2022,7,22)),
    ('이제동', 20, date(2021,7,22)),
    ('김명운', 25, date(2020,7,22))
],schema='name string, age int, birth date')

df2.show()

[Stage 2:>                                                          (0 + 1) / 1]                                                                                

+------+---+----------+
|  name|age|     birth|
+------+---+----------+
|하명도| 15|2022-07-22|
|이제동| 20|2021-07-22|
|김명운| 25|2020-07-22|
+------+---+----------+



## StructType 객체를 사용해 Schema 지정

In [5]:
from pyspark.sql.types import *

data = [
    ('하명도', 15, date(2022,7,22)),
    ('이제동', 20, date(2021,7,22)),
    ('김명운', 25, date(2020,7,22))
]

# 자바 문서나 스칼라 문서를 찾아볼 것
# 파이썬 문서는 상속계층이나 패키지별로 정리가 안되어있고 내용별로 정리되어 있어서 사람이 볼 것이 못 된다...
schema = StructType([
    StructField('name', StringType(), False),
    StructField('age', IntegerType(), False),
    StructField('birth', DateType(), False)
])

df3 = spark.createDataFrame(data=data, schema=schema)
df3.printSchema()
df3.show()


root
 |-- name: string (nullable = false)
 |-- age: integer (nullable = false)
 |-- birth: date (nullable = false)

+------+---+----------+
|  name|age|     birth|
+------+---+----------+
|하명도| 15|2022-07-22|
|이제동| 20|2021-07-22|
|김명운| 25|2020-07-22|
+------+---+----------+



## 중첩스키마적용

In [6]:
data = [
    ('하명도', 15, date(2022,7,22), ('010','1111','2222')),
    ('이제동', 20, date(2021,7,22), ('010','2222','3333')),
    ('김명운', 25, date(2020,7,22), ('010','4444','5555'))
]

schema = StructType([
    StructField('name', StringType(), False, {"desc":"이름"}),
    StructField('age', IntegerType(), False, {"desc":"나이"}),
    StructField('birth', DateType(), False, {"desc":"생일"}),
    StructField('phone', StructType([
        StructField('phone1', StringType(), True),
        StructField('phone2', StringType(), True),
        StructField('phone3', StringType(), True),
    
    ]), False, {"desc":"전화번호"})
])

df4 = spark.createDataFrame(data=data, schema=schema)
df4.printSchema()
df4.show()

# 스키마를 json으로 변환하여 확인
schema_json = df4.schema.json()

# 이름 => \uc774\ub984  유니코드로 나온다
print(schema_json)


print('-----------------------------------------------------')

# 바이트코드로 변환한 뒤 다시 문자열 디코딩을 할 때 unicode_escape 옵션을 추가
schema_json = schema_json.encode().decode('unicode_escape')
print(schema_json)                                          



root
 |-- name: string (nullable = false)
 |-- age: integer (nullable = false)
 |-- birth: date (nullable = false)
 |-- phone: struct (nullable = false)
 |    |-- phone1: string (nullable = true)
 |    |-- phone2: string (nullable = true)
 |    |-- phone3: string (nullable = true)

+------+---+----------+-----------------+
|  name|age|     birth|            phone|
+------+---+----------+-----------------+
|하명도| 15|2022-07-22|{010, 1111, 2222}|
|이제동| 20|2021-07-22|{010, 2222, 3333}|
|김명운| 25|2020-07-22|{010, 4444, 5555}|
+------+---+----------+-----------------+

{"fields":[{"metadata":{"desc":"\uc774\ub984"},"name":"name","nullable":false,"type":"string"},{"metadata":{"desc":"\ub098\uc774"},"name":"age","nullable":false,"type":"integer"},{"metadata":{"desc":"\uc0dd\uc77c"},"name":"birth","nullable":false,"type":"date"},{"metadata":{"desc":"\uc804\ud654\ubc88\ud638"},"name":"phone","nullable":false,"type":{"fields":[{"metadata":{},"name":"phone1","nullable":true,"type":"string"},{"metad

## Pandas DataFrame으로 생성

In [7]:
pandas_df = pd.DataFrame({
    'name':['하명도','이제동','김명운'],
    'age':[20, 21, 22],
    'birth':[date(2022,7,1),date(2022,7,2),date(2022,7,3)]
})

type(pandas_df)

df3 = spark.createDataFrame(pandas_df)
df3.show()


pandas.core.frame.DataFrame

+------+---+----------+
|  name|age|     birth|
+------+---+----------+
|하명도| 20|2022-07-01|
|이제동| 21|2022-07-02|
|김명운| 22|2022-07-03|
+------+---+----------+



## DataFrame -> Pandas

In [8]:
pandas_df2 = df3.toPandas()
pandas_df2

# 스파크의 DataFrame을 사용하는 것이 성능상 더 이득
# 스파크는 병렬처리도 해주고... 쿼리실행 최적화도 해주고...
# 하지만 스파크 api가 Pandas에 비해 제공되는 기능이 적어서
# Pandas를 써야만 해결이 가능하다면 Pandas로 가공 이후 스파크 DataFrame으로 변환도 가능

Unnamed: 0,name,age,birth
0,하명도,20,2022-07-01
1,이제동,21,2022-07-02
2,김명운,22,2022-07-03


## DataFrame -> pyspark.pandas


In [9]:
# !pip install pyarrow
df3.to_pandas_on_spark()
type(df3)

                                                                                

Unnamed: 0,name,age,birth
0,하명도,20,2022-07-01
1,이제동,21,2022-07-02
2,김명운,22,2022-07-03


pyspark.sql.dataframe.DataFrame

## 외부파일을 사용해 DataFrame 생성

In [10]:
class_df = spark.read.csv('/dataframe/a_class_info.csv', header=True)
class_df.show(5)
class_df.show(1, vertical=True)


                                                                                

+--------+------+-------------+--------+-----------+-------------+
|class_cd|school|class_std_cnt|     loc|school_type|teaching_type|
+--------+------+-------------+--------+-----------+-------------+
|     6OL| ANKYI|           20|   Urban| Non-public|     Standard|
|     ZNS| ANKYI|           21|   Urban| Non-public|     Standard|
|     2B1| CCAAW|           18|Suburban| Non-public| Experimental|
|     EPS| CCAAW|           20|Suburban| Non-public| Experimental|
|     IQN| CCAAW|           15|Suburban| Non-public| Experimental|
+--------+------+-------------+--------+-----------+-------------+
only showing top 5 rows

-RECORD 0-------------------
 class_cd      | 6OL        
 school        | ANKYI      
 class_std_cnt | 20         
 loc           | Urban      
 school_type   | Non-public 
 teaching_type | Standard   
only showing top 1 row



## DataFrame 컬럼

- withColumn

In [30]:
data = [
    ('하명도', 15, date(2022,7,22), ('010','1111','2222')),
    ('이제동', 20, date(2021,7,22), ('010','2222','3333')),
    ('김명운', 25, date(2020,7,22), ('010','4444','5555')),
    ('홍진호', 36, date(2018,7,22), ('010','3333','4444'))
]

schema = StructType([
    StructField('name', StringType(), False, {"desc":"이름"}),
    StructField('age', IntegerType(), False, {"desc":"나이"}),
    StructField('birth', DateType(), False, {"desc":"생일"}),
    StructField('phone', StructType([
        StructField('phone1', StringType(), True),
        StructField('phone2', StringType(), True),
        StructField('phone3', StringType(), True),
    
    ]), False, {"desc":"전화번호"})
])

col_df = spark.createDataFrame(data, schema=schema)


In [32]:
# withColumn : 컬럼이름, 컬럼
# lit : column 객체를 literal로 만들어주는 함수

# 원하는 컬럼을 DataFrame에 추가
col_df.withColumn('우승여부', lit('')).show()

# 값을 지정해서 추가
col_df.withColumn('우승여부', lit('우승')).show()

# when - otherwise : 조건에 따라 원하는 컬럼객체를 반환
temp = col_df.withColumn('연령대', when(floor(col_df.age / 10) == 1, '10대')
                           .when(floor(col_df.age / 10) == 2, '20대')
                           .otherwise('30대 이상')
                 )

temp.show()

+------+---+----------+-----------------+--------+
|  name|age|     birth|            phone|우승여부|
+------+---+----------+-----------------+--------+
|하명도| 15|2022-07-22|{010, 1111, 2222}|        |
|이제동| 20|2021-07-22|{010, 2222, 3333}|        |
|김명운| 25|2020-07-22|{010, 4444, 5555}|        |
|홍진호| 36|2018-07-22|{010, 3333, 4444}|        |
+------+---+----------+-----------------+--------+

+------+---+----------+-----------------+--------+
|  name|age|     birth|            phone|우승여부|
+------+---+----------+-----------------+--------+
|하명도| 15|2022-07-22|{010, 1111, 2222}|    우승|
|이제동| 20|2021-07-22|{010, 2222, 3333}|    우승|
|김명운| 25|2020-07-22|{010, 4444, 5555}|    우승|
|홍진호| 36|2018-07-22|{010, 3333, 4444}|    우승|
+------+---+----------+-----------------+--------+

+------+---+----------+-----------------+---------+
|  name|age|     birth|            phone|   연령대|
+------+---+----------+-----------------+---------+
|하명도| 15|2022-07-22|{010, 1111, 2222}|     10대|
|이제동| 20|2021-07-22|{

### column  내용  변경

In [36]:
# when - otherwise : 조건에 따라 원하는 컬럼객체를 반환
temp = temp.withColumn('연령대', when(floor(col_df.age / 10) == 1, '어린이')
                           .when(floor(col_df.age / 10) == 2, '청년')
                           .when(floor(col_df.age / 10) == 3, '성인')
                 )

temp.show()

+------+---+----------+-----------------+------+
|  name|age|     birth|            phone|연령대|
+------+---+----------+-----------------+------+
|하명도| 15|2022-07-22|{010, 1111, 2222}|어린이|
|이제동| 20|2021-07-22|{010, 2222, 3333}|  청년|
|김명운| 25|2020-07-22|{010, 4444, 5555}|  청년|
|홍진호| 36|2018-07-22|{010, 3333, 4444}|  성인|
+------+---+----------+-----------------+------+



### column 이름 변경

In [39]:
temp = temp.withColumnRenamed('연령대','분류')
temp.show()

+------+---+----------+-----------------+------+
|  name|age|     birth|            phone|  분류|
+------+---+----------+-----------------+------+
|하명도| 15|2022-07-22|{010, 1111, 2222}|어린이|
|이제동| 20|2021-07-22|{010, 2222, 3333}|  청년|
|김명운| 25|2020-07-22|{010, 4444, 5555}|  청년|
|홍진호| 36|2018-07-22|{010, 3333, 4444}|  성인|
+------+---+----------+-----------------+------+



### column  삭제

In [40]:
temp = temp.drop('분류')
temp.show()

+------+---+----------+-----------------+
|  name|age|     birth|            phone|
+------+---+----------+-----------------+
|하명도| 15|2022-07-22|{010, 1111, 2222}|
|이제동| 20|2021-07-22|{010, 2222, 3333}|
|김명운| 25|2020-07-22|{010, 4444, 5555}|
|홍진호| 36|2018-07-22|{010, 3333, 4444}|
+------+---+----------+-----------------+



# 2. DataFrame 사용 하기

참고 : https://spark.apache.org/docs/3.2.0/api/scala/org/apache/spark/sql/Dataset.html 

- DataFrame의 메서드의 구분
 - transformation
 - action
 - Basic Dataset functions  
 
 
- DataFrame의 사용은 SQL 쿼리 구조를 따라간다