### titanic_train.csv 파일을 로드하고, 이를 DataFrame으로 변환

In [0]:
display(dbutils.fs)

In [0]:
#spark.read.csv() 메소드를 이용하여 csv 파일을 로드하고 DataFrame으로 변환. 
# pandas_df = pd.read_csv('/FileStore/tables/titanic_train.csv', header='infer')
titanic_sdf = spark.read.csv('/FileStore/train.csv', header=True, inferSchema=True)
print('titanic sdf type:', type(titanic_sdf))
# dbfs:/FileStore/train.csv
# Databricks에서는 DataFrame의 컬럼과 데이터를 formatting된 형태로 보기 위해서 display() 지원.  
display(titanic_sdf) #z.show(titanic_sdf)

In [0]:
import pandas as pd

# /FileStore는 DBFS 파일 시스템으로 Spark외에는 접근 불가하여 아래는 오류 발생. 
pandas_df = pd.read_csv('/FileStore/tables/titanic_train.csv', header='infer')

In [0]:
import pandas as pd

# pandas DataFrame을 spark DataFrame으로 부터 생성. 
titanic_pdf = titanic_sdf.select('*').toPandas()
print(type(titanic_pdf))

# display()는 pandas DataFrame에도 적용됨. 
display(titanic_pdf)

In [0]:
# spark DataFrame을 메모리에 cache
titianic_sdf = titanic_sdf.cache()

#### pandas DataFrame의 head()와 spark DataFrame head() 비교 및 print() 적용 시 차이
* pandas DataFrame의 head(N)는 Dataframe의 선두 N개 Record를 가지는 DataFrame을 반환
* spark DataFrame의 head(N)는 DataFrame의 선두 N개 Row Object를 list로 반환. 
* spark DataFrame의 limit(N)가 DataFrame의 선두 N개 Record를 가지는 DataFrame을 반환. 
* pandas DataFrame은 print() 적용 시 DataFrame의 내용을 출력하지만 spark DataFrame은 DataFrame의 schema만 출력

In [0]:
# pandas DataFrame.head(N)는 선두 N개 Record를 가지는 DataFrame 반환
print(type(titanic_pdf.head(10)))
# Pandas DataFrame은 print()로 DataFrame의 내용을 출력. 
print(titanic_pdf.head(10))


In [0]:
# spark DataFrame의 head(N)는 DataFrame의 선두 N개 Row Object를 list로 반환.
print(type(titanic_sdf.head(10)))
print(titanic_sdf.head(10))

In [0]:
titanic_sdf

In [0]:
# spark DataFrame의 limit(N)은 DataFrame의 선두 N개 Record를 가지는 DataFrame을 반환.
print(type(titanic_sdf.limit(10)))

# spark DataFrame은 pandas와 다르게 print() 호출 시 값을 출력하지 않고 DataFrame의 schema만 출력.  
print(titanic_sdf.limit(10))

In [0]:
# spark DataFrame의 내용을 출력하기 위해서는 show() 메소드를 사용해야 함. 
print(titanic_sdf.limit(10).show())
titanic_sdf.limit(10).show()

In [0]:
# Databricks의 display()는 spark DataFrame의 내용을 출력할 수 있음. 
display(titanic_sdf)

# 메모리 절약을 위해 limit() 사용이 바람직 
display(titanic_sdf.limit(10))

### pandas DataFrame의 info()에 대응하는 spark DataFrame 메소드와 로직.
* pandas DataFrame의 info()는 컬럼명, data type과 not null 건수를 함께 출력
* spark DataFrame의 info() 메소드가 없으며 대신 printSchema() 메소드로 스키마(컬럼명, data type)만 출력
* not null 건수를 위해서는 별도의 SQL성 쿼리 작성 필요.

In [0]:
titanic_pdf.info()

In [0]:
# data type은 java object type
titanic_sdf.printSchema()

In [0]:
# SQL의 count case when과 유사한 로직으로 null 값을 포함한 컬럼 확인하기 
from pyspark.sql.functions import count, isnan, when, col

titanic_sdf.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in titanic_sdf.columns]).show()

### pandas DataFrame describe()와 spark DataFrame describe() 비교
* spark dataframe은 pandas dataframe과 비슷하게 describe()를 통해 모든 컬럼값들의 건수/평균/표준편차/최소값/최대값 등의 정보를 확인할 수 있음. 다만 percentile값을 만들지 않음. 
* pandas dataframe과 다르게 describe()시 숫자형 컬럼 뿐만 아니라 문자형 컬럼에 대해서도 건수/평균/표준편차/최소값/최대값 조사

In [0]:
titanic_pdf.describe()

In [0]:
# spark DataFrame의 describe() 역시 DataFrame을 반환하므로 내용 출력을 위해서는 show() 메소드를 호출하거나 Databricks의 display()로 감싸야 함. 
display(titanic_sdf.describe())

In [0]:
titanic_sdf.dtypes

In [0]:
# number형 컬럼들에 대해서만 describe()수행할 수 있도록 select 컬럼 filtering 적용. 
number_columns = [column_name for column_name, dtype in titanic_sdf.dtypes if dtype != 'string']
print(number_columns)

titanic_sdf.select(number_columns).describe().show()

### pandas DataFrame의 shape에 이에 대응하는 spark Dataframe 로직
* pandas DataFrame의 shape는 row건수와 column 개수를 매우 빠르게 제공.
* spark DataFrame은 shape를 제공하지 않음. column 개수는 spark DataFrame의 columns로 확인. row건수는 count() 메소드(쿼리성)로 확인.

In [0]:
titanic_pdf.shape

In [0]:
# spark DataFrame은 columns 속성으로 컬럼명을 list로 반환. 
print('column들:', titanic_sdf.columns)
print('column개수:', len(titanic_sdf.columns))

In [0]:
# spark DataFrame은 count() 메소드로 전체 row 건수를 제공. 
print(titanic_sdf.count())
print(type(titanic_sdf.count()))

In [0]:
print('titanic_sdf shape:', (titanic_sdf.count(), len(titanic_sdf.columns)))

### Spark DataFrame의 select() 메소드 알아보기
* select() 메소드는 SQL의 Select 절과 유사하게 한개 이상의 컬럼들의 값을 DataFrame형태로 반환. 
* 한개의 컬럼명, 또는 여러개의 컬럼명을 인자로 입력할 수 있음.
* 개별 컬럼명을 문자열 형태로 또는 DataFrame의 컬럼 속성으로 지정
* DataFrame의 컬럼 속성으로 지정시에는 DataFrame.컬럼명, DataFrame[컬럼명], col(컬럼명) 으로 지정 가능.

In [0]:
dict_01 = {'Name': ['Chulmin', 'Wansoo','Myunghyun','Hyunjoo', 'Chulman'],
           'Year': [2011, 2016, 2015, 2015, 2011],
           'Gender': ['Male', 'Male', 'Male', 'Female', 'Male']
          }
# 딕셔너리를 DataFrame으로 변환
data_pdf = pd.DataFrame(dict_01)

# pandas DataFrame은 spark DataFrame으로 변환
data_sdf = spark.createDataFrame(data_pdf)

print(type(data_sdf))
display(data_sdf)

In [0]:
print('pandas DataFrame의 단일 컬럼값을 출력')
print(data_pdf['Name'])

print('\npandas DataFrame의 여러개 컬럼값들을 출력')
print(data_pdf[['Name', 'Year']])

In [0]:
# select() 메소드는 DataFrame을 반환. Name 컬럼을 가지는 DataFrame을 반환. 
data_sdf.select('Name').show() # select name from data_sdf;

In [0]:
# 여러개의 컬럼명을 select 시에도 개별 컬럼명을 인자로 넣어줌. pandas와 다르게 list를 사용하지 않음. 
data_sdf.select('Name', 'Year').show() # select Name, Year from data_sdf

# 모든 컬럼을 선택시 SQL과 유사하게 '*' 사용 가능. 
data_sdf.select('*').show() # select * from data_sdf

In [0]:
select_columns = data_sdf.columns
# * list 는 list내부의 원소를 unpack함. 
print('select_columns:', select_columns, *select_columns)

data_sdf.select(*select_columns).show()
#버전 upgrade가 되면서 select 절에 list를 입력해도 여러개의 컬럼을 반환 가능. 하지만 원칙적으로 개별 컬럼들을 인자로 입력하는 것이 바람직. 
data_sdf.select(select_columns).show()

select_columns.remove('Name')
data_sdf.select(*select_columns).show()

In [0]:
# 컬럼 속성으로 지정하여 select 의 인자로 사용 가능. pandas DataFrame의 ['column']는 컬럼값을 포함한 array를 지칭하나 spark DataFrame의 ['column']는 컬럼 자체를 지정함. 
data_sdf.select(data_sdf.Name, data_sdf.Year).show()
data_sdf.select(data_sdf['Name'], data_sdf['Year']).show()

In [0]:
data_pdf.Year

In [0]:
from pyspark.sql.functions import col

# pyspark.sql.functions의 col() 함수를 이용하여 명시적으로 컬럼명을 지정할 수 있음. 
data_sdf.select(col('Name'), col('Year')).show()

In [0]:
print(type(data_sdf.Name), type(data_sdf['Name']))
print(type(col('Name')))

In [0]:
from pyspark.sql.functions import upper, lower, col

# select()에서 컬럼 데이터 가공 후 생성 가능. 
data_sdf.select("*", upper(col('Name'))).show() # Select A.*, upper(Name) from data_view A
data_sdf.select(upper(col('Name')).alias('Cap_Name')).show() # Select upper(Name) as Cap_Name from data_view

### spark DataFrame filter() 메소드 알아보기
* filter()는 SQL의 where와 유사하게 DataFrame내의 특정 조건을 만족하는 레코드를 DataFrame으로 반환. 
* filter()내의 조건 컬럼은 컬럼 속성으로 지정 가능. 조건문 자체는 SQL 과 유사한 문자열로 지정 할 수 있음(조건 컬럼은 문자열 지정이 안됨. )
* where() 메소드는 filter()의 alias이며 SQL where와 직관적인 동일성을 간주하기 위해 생성. 
* 복합 조건 and는 & 를, or를 | 를 사용. 개별 조건은 ()로 감싸야 함.

In [0]:
dict_01 = {'Name': ['Chulmin', 'Wansoo','Myunghyun','Hyunjoo', 'Chulman'],
           'Year': [2011, 2016, 2015, 2015, 2011],
           'Gender': ['Male', 'Male', 'Male', 'Female', 'Male']
          }
# 딕셔너리를 DataFrame으로 변환
data_pdf = pd.DataFrame(dict_01)

# pandas DataFrame은 spark DataFrame으로 변환
data_sdf = spark.createDataFrame(data_pdf)

print(type(data_sdf))
display(data_sdf)

In [0]:
data_sdf.filter(data_sdf['Name'] == 'Chulmin').show() # select * from data_sdf where Name = 'Chulmin'

print('조건 컬럼을 문자열로 지정할 수 없으므로 아래는 오류를 발생 시킵니다. ')
data_sdf.filter('Name' == 'Chulmin').show()

In [0]:
data_sdf.where(data_sdf['Name'] == 'Chulmin').show()

In [0]:
# 단일 컬럼은 string으로 사용할 수 없으며 filter 절의 조건 자체를 SQL의 where 조건절 구문과 유사하게 string으로 만들어야함. 단 SQL과 다르게 동일 비교 시 = 가 아니라 == 이 사용되어야 함.
# 또한 SQL 내부 문자열은 '' 으로 표시해야함. 
data_sdf.filter("Name == 'Chulmin'").show() # select * from data_sdf where Name = 'Chulmin'

In [0]:
data_sdf.filter( (data_sdf['Gender'] == 'Male') & (col('Year') > 2011) ).show() # select * from data_sdf where Gender = 'Male' and Year > 2011
data_sdf.filter( (data_sdf['Gender'] == 'Male') | (col('Year') < 2011) ).show() # select * from data_sdf where Gender = 'Male' or Year < 2011

In [0]:
# 문자열 컬럼의 like 조건 수행. 
print('문자열 컬럼 like() filter() 수행. ')
data_sdf.filter(col('Name').like('Chul%')).show() # select * from data_sdf where Name like 'Chul%'

print('SQL의 Like 조건문을 string으로 filter() 수행. ')
data_sdf.filter(" Name like 'Chul%' ").show()
data_sdf.filter(" Name like 'C%' ").show() # startswith('C') where Name like 'C%'
data_sdf.filter(" Name like '%u%' ").show() # contains('u')
data_sdf.filter(" upper(Name) like '%M%' ").show() #별도의 pyspark.sql.functions 호출없이 수행 가능.  select * from data_sdf where upper(Name) like '%M%'

In [0]:
from pyspark.sql.functions import upper

data_sdf.filter(upper(data_sdf['Name']).like('%M%')).show() #select * from data_sdf where upper(Name) like '%M%'

In [0]:
# filtering 후에 특정 컬럼으로만 DataFrame 생성하려면 select() 적용. 
data_sdf.filter(upper(data_sdf['Name']).like('%M%')).select('Year', 'Gender').show()