In [1]:
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

In [2]:
from datetime import date, datetime
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *


# 1. DataFrame 생성

### SparkSession.createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)
- data : RDD or iterable
- scheam : pyspark.sql.types.DataType, str or list, optional
- samplingRatio : the sample ratio of rows used for inferring
- verifySchema : verify data types of every row against schema. Enabled by default

- SparkSession 객체를 사용해 DataFrame을 생성할 수 있다.
- SparkSession 객체는 pyspark shell을 실행할 때 spark 라는 이름으로 미리 생성된다.

## Row 객체를 사용해 생성하기

- row : DataFrame에서의 한 행

In [1]:
# !pip install pandas
# pyarrow 아파치가 제공한 빅데이터용 인메모리 분석 플랫
# !pip install pyarrow

In [3]:
import pandas as pd
from datetime import date, datetime
from pyspark.sql import *

In [6]:
##Spark.Row 클래스
??Row

[0;31mInit signature:[0m [0mRow[0m[0;34m([0m[0;34m*[0m[0margs[0m[0;34m,[0m [0;34m**[0m[0mkwargs[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;31mSource:[0m        
[0;32mclass[0m [0mRow[0m[0;34m([0m[0mtuple[0m[0;34m)[0m[0;34m:[0m[0;34m[0m
[0;34m[0m[0;34m[0m
[0;34m[0m    [0;34m"""[0m
[0;34m    A row in :class:`DataFrame`.[0m
[0;34m    The fields in it can be accessed:[0m
[0;34m[0m
[0;34m    * like attributes (``row.key``)[0m
[0;34m    * like dictionary values (``row[key]``)[0m
[0;34m[0m
[0;34m    ``key in row`` will search through row keys.[0m
[0;34m[0m
[0;34m    Row can be used to create a row object by using named arguments.[0m
[0;34m    It is not allowed to omit a named argument to represent that the value is[0m
[0;34m    None or missing. This should be explicitly set to None in this case.[0m
[0;34m[0m
[0;34m    .. versionchanged:: 3.0.0[0m
[0;34m        Rows created from named arguments no longer have[0m
[0;34m      

In [None]:
a = 1, 2, 3
type(a)

In [2]:
# spark.Row는 명명된 인수를 사용하여 행 개체를 만드는 데 사용할 수 있음
row = Row(name = '김철수', age = 15, birth = date(2022,7,17))
row

In [None]:
row['name']

In [3]:
# Row class의 생성자로 keyword args를 전달해 생성
df = spark.createDataFrame([
    Row(name = '김철수', age = 15, birth = date(2022,7,22)),
    Row(name = '이제동', age = 20, birth = date(2021,7,22)),
    Row(name = '김명운', age = 55, birth = date(1998,7,22)),
    ]) # 지연연산

df # df 객체 변수명만 출력하면 df의 컬럼과 컬럼의 type만 반환이 된다.
# spark.df 의 내용(원소 값들)을 확인하고자 한다면 show() 함수를 사용한다.
# 단, show() 함수는 모든 값을 다 보여주는 것은 아니다. head 랑 비슷한 함수라 위쪽 20개 data만 출력해준다.
# show(n) n개만 반환해줄 수 있다.
df.show()

- 논리계획 최적화를 위해 스키마를 생성
- 스키마 지정하지 않으면 자동 생성 된다.
- 스키마(구조) 확인 : df.printSchema()

In [4]:
# 스키마(구조) 확인
df.printSchema()

## schema를 명시하여 DataFrame 생성

In [5]:
# 튜플에 데이터를 저장하고 스키마(pands df의 column)를 직접 지정
df2 = spark.createDataFrame([
       Row(name='김철수',age=15,birth=date(2022,7,22)),
       Row(name='이제동',age=20,birth=date(2021,7,22)),
       Row(name='김명운',age=25,birth=date(1998,7,22))
            ], schema='name string, age int, birth date')
df2.show()
df2.printSchema()

## StructType 객체를 사용해 Schema 지정

In [13]:
??StructField

[0;31mInit signature:[0m [0mStructField[0m[0;34m([0m[0mname[0m[0;34m,[0m [0mdataType[0m[0;34m,[0m [0mnullable[0m[0;34m=[0m[0;32mTrue[0m[0;34m,[0m [0mmetadata[0m[0;34m=[0m[0;32mNone[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;31mSource:[0m        
[0;32mclass[0m [0mStructField[0m[0;34m([0m[0mDataType[0m[0;34m)[0m[0;34m:[0m[0;34m[0m
[0;34m[0m    [0;34m"""A field in :class:`StructType`.[0m
[0;34m[0m
[0;34m    Parameters[0m
[0;34m    ----------[0m
[0;34m    name : str[0m
[0;34m        name of the field.[0m
[0;34m    dataType : :class:`DataType`[0m
[0;34m        :class:`DataType` of the field.[0m
[0;34m    nullable : bool, optional[0m
[0;34m        whether the field can be null (None) or not.[0m
[0;34m    metadata : dict, optional[0m
[0;34m        a dict from string to simple type that can be toInternald to JSON automatically[0m
[0;34m[0m
[0;34m    Examples[0m
[0;34m    --------[0m
[0;34m    >>> (StructField("f1", 

In [None]:
from pyspark.sql.types import *

In [None]:
data = [
    ('김철수', 15, date(2022,7,22)),
    ('이제동', 20, date(2021,7,22)),
    ('김명운', 25, date(2020,7,22))
]
    
# 스키마 생성 StructType 객체 이용
schema = StructType([
    StructField('name', StringType(), False),
    StructField('age', IntegerType(), False),
    StructField('birth', DateType(), False),
])

df3 = spark.createDataFrame(data=data, schema = schema)

In [None]:
df3.printSchema()
df3.show()

## 중첩스키마적용

In [14]:
data = [
    ('김철수', 15, date(2022,7,22), ('010','1111','2222')),
    ('이제동', 20, date(2021,7,22), ('010','2222','3333')),
    ('김명운', 25, date(2020,7,22), ('010','4444','5555'))
]
# StructField(name, dataType, nullable=True, metadata=None) metadata 는 설명문들을 저장, {}(dict) 로 구성해야 한다.

schema = StructType([
    StructField('name', StringType(), False, {'desc' : '이름'}),
    StructField('age', IntegerType(), False, {'desc' : '나이'}),
    StructField('birth', DateType(), False,  {'desc' : '생일'}),
    StructField('phone', StructType([
        StructField('phone1', StringType(), True),
        StructField('phone2', StringType(), True),
        StructField('phone3', StringType(), True)]), False, {'desc' : '전화번호'}) # 중첩스키마
])

df4 = spark.createDataFrame(data = data, schema = schema)

In [None]:
df4.printSchema()
df4.show()

In [6]:

# 스키마를 json으로 변환하여 확인
sch_json = df4.schema.json()

# 이름 => \uc774\ub984  유니코드로 나온다
print(sch_json)
print('-----------------------------------------------------')
schema_json = sch_json.encode().decode('unicode_escape')
# 바이트코드로 변환한 뒤 다시 문자열 디코딩을 할 때 unicode_escape 옵션을 추가
print(schema_json)      

-----------------------------------------------------


## Pandas DataFrame으로 생성
- 스키마를 포함하고 있는 spark dataframe 은 pandas dataframe 으로 변환 가능
    - spark df 는 pandas df 보다 기능이 적게 구성되어 있음
    - pandas 의 기능이 필요할 때 변환해서 사용
    - sparkDF.toPandas(): sparkDF -> pandasDF
    - spark.createDataFrame(pandasDF) : pandasDF -> sparkDF
        - sparkDF로 변환 시 pd.DataFrame.iteritems 속성값을 전달해야 함
            - pandas 2.0 이상 버전에서는 pd.DataFrame.iteritems 이 빈 값으로 존재
            - pd.DataFrame.items 라는 속성에 iteritems 값이 들어 있다.

In [21]:
pandas_df = pd.DataFrame({
    'name':['김철수','이제동','김명운'],
    'age':[20, 21, 22],
    'birth':[date(2022,7,1),date(2022,7,2),date(2022,7,3)]
})

type(pandas_df)
pandas_df

pandas.core.frame.DataFrame

In [7]:
## pandas 2.0 버전 이상부터 iteritems atrr이 items로 변경됨
# sprk.createDataFrame은 pd.DataFrame.iteritems를 사용하므로 변경 반영 후 사용해야 함
pd.DataFrame.iteritems = pd.DataFrame.items
df_pd_sp = spark.createDataFrame(pandas_df)
df_pd_sp.show()

## DataFrame -> Pandas

In [8]:
# 스파크의 DataFrame을 사용하는 것이 성능상 더 이득
# 스파크는 병렬처리도 해주고... 쿼리실행 최적화도 해주고...
# 하지만 스파크 api가 Pandas에 비해 제공되는 기능이 적어서
# Pandas를 써야만 해결이 가능하다면 Pandas로 가공 이후 스파크 DataFrame으로 변환도 가능
df_pd_sp = df_pd_sp.toPandas()
df_pd_sp # pandas df 로 변환이 됨

## DataFrame -> pyspark.pandas


In [None]:
# !pip install pyarrow

In [None]:
import os
os.environ['PYARROW_IGNORE_TIMEZONE']='1'
# pyarrow 설정에 셋팅을 맞춰주는 코드

In [None]:
pd_df = df_pd_sp.to_pandas_on_spark()

In [None]:
type(pd_df)
pd_df

## 외부파일을 사용해 DataFrame 생성

In [None]:
class_df = spark.read.csv('/dataframe/a_class_info.csv', header = True)

### spark.dataframe.show()
- def show( : 20개의 행을 표시)
- def show(numRows : scala.Int) : 정해진 수 만큼 행 표시
- def show(truncate : scala.Boolean) : 열값이 길어 모두 표현되지 않을경우 표현 여부
    - truncate : True -> 열값을 자르고 표시 / False -> 열값을 모두 표시
- def show(numRows : scala.Int, truncate : scala.Boolean) : 표현할 행과 열값을 자를것인지의 여부
- def show(numRows : scala.Int, truncate : scala.Int ) : 표현할 행과 열값을 몇 글자 보여줄 것인지 여
- def show(numRows : scala.Int, truncate : scala.Int, vertical : scala.Boolean) : 레코드별로 세로로 표시할 것인지의 여부

In [None]:
class_df.show(1)
class_df.show(2,truncate=2) # 각 열의 값을 2글자만 표현
class_df.show(2,truncate=False) # 각 열의 값이 잘리는것을 방지
class_df.show(2, vertical=True) # df를 레코드별로 세로로 표시
class_df.show(2, vertical=False) # 기본값 False

In [None]:
# row 형태로 보여준다.
class_df.show(3, vertical = True)

## DataFrame 컬럼
- 컬럼 추가 변경 삭제 등## DataFrame 컬럼

- withColumn

In [3]:
data = [
    ('김철수', 15, date(2022,7,22), ('010','1111','2222')),
    ('이제동', 20, date(2021,7,22), ('010','2222','3333')),
    ('김명운', 25, date(2020,7,22), ('010','4444','5555')),
    ('홍진호', 36, date(2018,7,22), ('010','3333','4444'))
]

schema = StructType([
    StructField('name',StringType(),False,{'desc':'이름'}),
    StructField('age',IntegerType(),False,{'desc':'나이'}),    
    StructField('birth',DateType(),False,{'desc' :'생일'}),
    StructField('phone', StructType([
        StructField('phone1',StringType(),True),
        StructField('phone2',StringType(),True),
        StructField('phone3',StringType(),True)]),False,{'desc':'전화번호'}) # 중첩스키마
])

col_df = spark.createDataFrame(data, schema = schema)

In [None]:
col_df.show()

In [10]:
# withColumn : 컬럼이름, 컬럼
# lit : column 객체를 literal로 만들어주는 함수
# spark df의 컬럼은 liter
# 원하는 컬럼을 DataFrame에 추가
col_df.withColumn('우승여부',lit('')).show()
col_df.show()

In [11]:
# 값을 지정해서 추가
col_df.withColumn('우승여부',lit('우승')).show()

- pyspark 의 dataframe 에서,
- if 문처럼 사용할 수 있는 SQL function 인 case-when 과 비슷한
    - when (+ otherwise )을 사용할 수 있음

In [12]:
# age 값에 따라 연령대 추가, age를 10으로 나눈 몫에 따라 연령대 축
# when - otherwise : 조건에 따라 원하는 컬럼객체를 반환
type(col_df.age)


In [None]:
# pyspark.sql.column.Column 객체 // 연산자 사용 불가
temp = col_df.withColumn('연령대', when(floor(col_df.age/10)==1,'10대')
                                   .when(floor(col_df.age/10)==2,'20대')
                                   .otherwise('30대 이상') )
temp.show()

### column  내용  변경
- withColumn('컬럼명',값)함수는 컬럼명이 df 에 있으면 컬럼내용수정, 없으면 추가

In [13]:
# when - otherwise : 조건에 따라 원하는 컬럼객체를 반환
temp = col_df.withColumn('연령대', when(floor(col_df.age/10)==1,'청소년')
                                   .when(floor(col_df.age/10)==2,'청년')
                                   .otherwise('성인') )
temp.show()

### column 이름 변경
- widthColumnRenamed(원이름,변경이름)

In [None]:
temp = temp.withColumnRenamed('연령대','분류')
temp.show()

### column  삭제
- spark.df.drop(컬럼명)

# 2. DataFrame 사용 하기

참고 : https://spark.apache.org/docs/3.2.0/api/scala/org/apache/spark/sql/Dataset.html 

- DataFrame의 메서드의 구분
 - transformation
 - action
 - Basic Dataset functions  
 
 
- DataFrame의 사용은 SQL 쿼리 구조를 따라간다