### titanic_train.csv 파일을 로드하고, 이를 DataFrame으로 변환

In [0]:
spark.sql("USE dev")
spark.sql("SHOW TABLES")

titanic_sdf = spark.table("titanic_train")
titanic_pdf = titanic_sdf.select('*').toPandas()

display(titanic_sdf.show(5))

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| NULL|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+------

In [0]:
titanic_sdf.printSchema()

root
 |-- PassengerId: long (nullable = true)
 |-- Survived: long (nullable = true)
 |-- Pclass: long (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: long (nullable = true)
 |-- Parch: long (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



### spark DataFrame의 orderBy() 알아보기
* spark DataFrame의 orderBy() 메소드는 1개 이상의 컬럼순으로 정렬할 수 있는 기능. orderBy() 결과는 DataFrame으로 반환. 
* 정렬 컬럼은 문자열, 또는 컬럼 형태로 입력할 수 있으며, 정렬 컬럼이 여러개일 경우 개별 컬럼을 인자로 넣거나 list로도 넣을 수 있음. 
* 오름차순, 내림차순 구분은 ascending=True/False로 구분
* 정렬 컬럼이 여러개 일때 개별 컬럼별로 서로 다른 정렬 옵션을 적용할 경우(예를 들어 컬럼1은 오름차순, 컬럼2는 내림차순) ascending=[True, False]와 같은 형태로 이용.

In [0]:
titanic_pdf_sorted_01 = titanic_pdf.sort_values(by=['Name'], ascending=True)

titanic_pdf_sorted_02 = titanic_pdf.sort_values(by=['Pclass', 'Name'], ascending=False)

titanic_pdf_sorted_03 = titanic_pdf.sort_values(by=['Pclass', 'Name'], ascending=[True, False])

display(titanic_pdf_sorted_01)
display(titanic_pdf_sorted_02)
display(titanic_pdf_sorted_03)

In [0]:
from pyspark.sql.functions import col

titanic_sdf.orderBy(col("Pclass").asc(), col("Name").asc()).show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+------+--------+-------+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|Ticket|    Fare|  Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+------+--------+-------+--------+
|        731|       1|     1|Allen, Miss. Elis...|female|29.0|    0|    0| 24160|211.3375|     B5|       S|
|        306|       1|     1|Allison, Master. ...|  male|0.92|    1|    2|113781|  151.55|C22 C26|       S|
|        298|       0|     1|Allison, Miss. He...|female| 2.0|    1|    2|113781|  151.55|C22 C26|       S|
|        499|       0|     1|Allison, Mrs. Hud...|female|25.0|    1|    2|113781|  151.55|C22 C26|       S|
|        461|       1|     1| Anderson, Mr. Harry|  male|48.0|    0|    0| 19952|   26.55|    E12|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+------+--------+-------+--------+
only showing top 5 rows


In [0]:
# 3 ways to use orderBy to sort by more than one column
from pyspark.sql.functions import col 

titanic_sdf.orderBy('Pclass', 'Name', ascending=[True, False]).show()

titanic_sdf.orderBy(col('Pclass'), col('Name'), ascending=[True, False]).show()

titanic_sdf.orderBy(col('Pclass').asc(), col('Name').desc()).show() # select * from titanic_sdf order by Pclass asc, Name desc

In [0]:
# orderBy() == sort()
titanic_sdf.sort(col('Pclass').asc(), col('Name').desc()).show()

In [0]:
# select Pclass, Name from titanic_sdf order by Pclass asc, Name desc
display(titanic_sdf.select(col('Pclass'), col('Name')).orderBy(col('Pclass').asc(), col('Name').desc()).limit(5))

#select Pclass, Name from (select * from titanic_sdf order by Pclass asc, Name desc)
display(titanic_sdf.orderBy(col('Pclass').asc(), col('Name').desc()).select(col('Pclass'), col('Name')).limit(5))


Pclass,Name
1,"Young, Miss. Marie Grice"
1,"Wright, Mr. George"
1,"Woolner, Mr. Hugh"
1,"Williams-Lambert, Mr. Fletcher Fellows"
1,"Williams, Mr. Charles Duane"


Pclass,Name
1,"Young, Miss. Marie Grice"
1,"Wright, Mr. George"
1,"Woolner, Mr. Hugh"
1,"Williams-Lambert, Mr. Fletcher Fellows"
1,"Williams, Mr. Charles Duane"


### spark DataFrame에 aggregation 메소드 적용하기
* pandas DataFrame은 DataFrame 객체에서 aggregation 메소드를 많이 가질 수 있음(DataFrame.count(), DataFrame.max())
* pandas DataFrame은 DataFrame 객체에 aggregation 메소드를 적용 시 DataFrame에 속한 전체 컬럼들에 모두 aggregation 메소드를 적용
* spark DataFrame은 DataFrame 객체에서 aggregation 메소드를 별로 가지고 있지 않음. count() 메소드 정도... 
* spark DataFrame에 aggregation 메소드를 적용 시에는 pyspark.sql.functions 모듈의 max, min, sum 등의 함수를 이용해야함.

In [0]:
print('#### pandas dataframe count() aggregation ####')
print(titanic_pdf.count())

print('#### pandas dataframe max() aggregation ####')
print(titanic_pdf.max())

print('#### pandas dataframe count() aggregation type ####')
print(type(titanic_pdf.count()))

#### pandas dataframe count() aggregation ####
PassengerId    891
Survived       891
Pclass         891
Name           891
Sex            891
Age            714
SibSp          891
Parch          891
Ticket         891
Fare           891
Cabin          204
Embarked       889
dtype: int64
#### pandas dataframe max() aggregation ####
PassengerId                            891
Survived                                 1
Pclass                                   3
Name           van Melkebeke, Mr. Philemon
Sex                                   male
Age                                   80.0
SibSp                                    8
Parch                                    6
Ticket                           WE/P 5735
Fare                              512.3292
dtype: object
#### pandas dataframe count() aggregation type ####
<class 'pandas.core.series.Series'>


  print(titanic_pdf.max())


In [0]:
print(titanic_pdf[['Pclass', 'Age']].max())

Pclass     3.0
Age       80.0
dtype: float64


In [0]:
# spark DataFrame에 count() aggregation을 적용하면 DataFrame의 Record 건수 반환. 
print('count() = Number of rows:', titanic_sdf.count()) # select count(*) from titanic_sdf

count() = Number of rows: 891


In [0]:
from pyspark.sql.functions import max, sum, min

# spark DataFrame에 count()를 제외하고 max(), min(), sum(), avg()와 같은 aggregate 메소드를 바로 호출할 수 없으며, select()메소드 내에서 호출되어야 함. 
titanic_sdf_max = titanic_sdf.select(max('Age')) # select max(Age) from titanic_sdf
print(titanic_sdf_max.show())
print(type(titanic_sdf_max)) # max() aggregation은 단 한개의 값을 반환하지만 DataFrame으로 반환. 

+--------+
|max(Age)|
+--------+
|    80.0|
+--------+

None
<class 'pyspark.sql.connect.dataframe.DataFrame'>


### spark DataFrame의 groupBy() 알아 보기
* pandas DataFrame의 groupby(by='group_by_컬럼명') 수행 시 group_by_컬럼명 레벨로 group by 된 DataFrameGroupBy 객체 반환하고 여기에 aggregation 메소드 적용. 
* spark DataFrame도 groupBy('group_by_컬럼명') 수행 시 group_by_컬럼명 레벨로 group by 된 GroupedData 객체 반환하고 여기에 aggregation 메소드 적용.
* pandas DataFrameGroupBy 객체에 agg() 메소드를 이용하여 서로 다른 컬럼에 서로 다른 aggregation 함수 적용 가능
* spark GroupedData 객체도 agg() 메소드를 이용하여 서로 다른 컬럼에 서로 다른 aggregation 함수 적용 가능
* spark groupBy()는 pandas groupby()의 특징과 SQL의 특징을 함께 가짐.

In [0]:
# pandas DataFrame에 groupby()메소드 호출 시 DataFrameGroupBy 객체 반환. 
titanic_pdf_groupby = titanic_pdf.groupby(by='Pclass')
print('pandas DataFrame의 groupby() 적용 결과 type:', type(titanic_pdf_groupby))

# Group by 된 pandas DataFrameGroupBy 객체에 count()를 적용 시 group by 된 컬럼값 레벨로 모든 컬럼들의 count() 수행. 
print('\n#### group by 레벨로 모든 컬럼에 count 적용 #### ')
print(titanic_pdf.groupby(by='Pclass').count())

print('\n#### group by 레벨로 특정 컬럼에 aggregation 적용 #### ')
# Group by 된 pandas DataFrameGroupBy 객체에 특정 컬럼에 aggregation 을 적용하려면 해당 컬럼을 ['컬럼명'] 추출하여 aggregation 함수 적용. 
print(titanic_pdf.groupby(by='Pclass')['Age'].max()) # select max(Age) from titanic_pdf group by Pclass

# pandas DataFrameGroupBy 객체에 여러 컬럼에 동일 aggregation 을 적용하려면 해당 컬럼들을 [['컬럼명1', '컬럼명2']]로 추출하여 aggregation 함수 적용. 
print('\n####  group by 레벨로 여러 컬럼에 동일 aggregation 적용 #### ')
print(titanic_pdf.groupby(by='Pclass')[['Age', 'Fare']].max()) # select max(Age), max(Fare) from titanic_pdf group by Pclass

# Group by 된 DataFrameGroupBy 객체에 서로 다른 컬럼에 서로 다른 aggregation 함수를 적용하려면 agg() 메소드를 사용. 
# agg()메소드 내부에 인자는 dictionary 형태로 적용 컬럼명과 적용 aggregation 함수 기재
print('\n####  group by 레벨로 여러개의 aggregation 함수를 서로 다른 컬럼에 적용 #### ')
agg_format = {
    'Age':'max', 
    'SibSp':'sum', 
    'Fare':'mean'
}
print(titanic_pdf.groupby(by='Pclass').agg(agg_format))

pandas DataFrame의 groupby() 적용 결과 type: <class 'pandas.core.groupby.generic.DataFrameGroupBy'>

#### group by 레벨로 모든 컬럼에 count 적용 #### 
        PassengerId  Survived  Name  Sex  ...  Ticket  Fare  Cabin  Embarked
Pclass                                    ...                               
1               216       216   216  216  ...     216   216    176       214
2               184       184   184  184  ...     184   184     16       184
3               491       491   491  491  ...     491   491     12       491

[3 rows x 11 columns]

#### group by 레벨로 특정 컬럼에 aggregation 적용 #### 
Pclass
1    80.0
2    70.0
3    74.0
Name: Age, dtype: float64

####  group by 레벨로 여러 컬럼에 동일 aggregation 적용 #### 
         Age      Fare
Pclass                
1       80.0  512.3292
2       70.0   73.5000
3       74.0   69.5500

####  group by 레벨로 여러개의 aggregation 함수를 서로 다른 컬럼에 적용 #### 
         Age  SibSp       Fare
Pclass                        
1       80.0     90  84.154687
2       70.0     74  20.662

In [0]:
# pandas DataFrame의 value_counts()는 Series에 적용시 해당 series내의 값 별로 건수를 구함. 
print(titanic_pdf['Pclass'].value_counts())

3    491
1    216
2    184
Name: Pclass, dtype: int64


In [0]:
# pandas 의 value_counts()의 대응될 수 있는 groupBy() 메소드. Spark DataFrame에 groupBy() 적용 시 GroupedData Object 반환.
# GroupedData Object에 count()외에 min(), max(), avg(), sum() 등 다양한 aggregation 메소드를 호출하여 group by, aggregation 결과 DataFrame 반환. 
titanic_sdf.groupBy('Pclass').count().show() # select pclass, count(*) from titanic_sdf group by pclass

print('spark DataFrame groupBy type:', type(titanic_sdf.groupBy('Pclass')))
print('spark GroupedData의 aggregation 메소드 적용 결과 type:', titanic_sdf.groupBy('Pclass').count()) 

+------+-----+
|Pclass|count|
+------+-----+
|     1|  216|
|     3|  491|
|     2|  184|
+------+-----+

spark DataFrame groupBy type: <class 'pyspark.sql.group.GroupedData'>
spark GroupedData의 aggregation 메소드 적용 결과 type: DataFrame[Pclass: int, count: bigint]


In [0]:
# spark DataFrame의 orderBy()메소드를 적용하여 group by 결과 건수 descending 으로 정렬 
titanic_sdf.groupBy('Pclass').count().orderBy(col('count').desc()).show()

+------+-----+
|Pclass|count|
+------+-----+
|     3|  491|
|     1|  216|
|     2|  184|
+------+-----+



In [0]:
#GroupedData 에 count()가 아니고 다른 aggregation 메소드를 적용 시 pandas DataFrame의 groupby와 유사하게 group by된 컬럼 레벨로 전체 컬럼에 대해서 aggregation을 적용. 
titanic_sdf.groupBy('Pclass').max().show() 

+------+----------------+-------------+-----------+--------+----------+----------+---------+
|Pclass|max(PassengerId)|max(Survived)|max(Pclass)|max(Age)|max(SibSp)|max(Parch)|max(Fare)|
+------+----------------+-------------+-----------+--------+----------+----------+---------+
|     1|             890|            1|          1|    80.0|         3|         4| 512.3292|
|     3|             891|            1|          3|    74.0|         8|         6|    69.55|
|     2|             887|            1|          2|    70.0|         3|         3|     73.5|
+------+----------------+-------------+-----------+--------+----------+----------+---------+



In [0]:
# group by 레벨로 특정 컬럼에 aggregation 적용. max('컬럼명')과 같이 aggregation 메소드 내부에 인자로 컬러명 입력
titanic_sdf.groupBy('Pclass').max('Age').show() # select max(Age) from titainic_sdf group by Pclass

#GroupedData에서 aggregation 메소드 호출 시 오직 문자열 컬럼명만 가능. 컬럼형 인자 입력은 오류 발생. 
titanic_sdf.groupBy('Pclass').max(col('Age')).show()

+------+--------+
|Pclass|max(Age)|
+------+--------+
|     1|    80.0|
|     3|    74.0|
|     2|    70.0|
+------+--------+



[0;31m---------------------------------------------------------------------------[0m
[0;31mTypeError[0m                                 Traceback (most recent call last)
[0;32m<command-1567257586276313>[0m in [0;36m<module>[0;34m[0m
[1;32m      3[0m [0;34m[0m[0m
[1;32m      4[0m [0;31m#GroupedData에서 aggregation 메소드 호출 시 오직 문자열 컬럼명만 가능. 컬럼형 인자 입력은 오류 발생.[0m[0;34m[0m[0;34m[0m[0;34m[0m[0m
[0;32m----> 5[0;31m [0mtitanic_sdf[0m[0;34m.[0m[0mgroupBy[0m[0;34m([0m[0;34m'Pclass'[0m[0;34m)[0m[0;34m.[0m[0mmax[0m[0;34m([0m[0mcol[0m[0;34m([0m[0;34m'Age'[0m[0;34m)[0m[0;34m)[0m[0;34m.[0m[0mshow[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m/databricks/spark/python/pyspark/sql/group.py[0m in [0;36m_api[0;34m(self, *cols)[0m
[1;32m     39[0m     [0;32mdef[0m [0m_api[0m[0;34m([0m[0mself[0m[0;34m,[0m [0;34m*[0m[0mcols[0m[0;34m)[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[1;32m     40[0m         [0mname[0m

In [0]:
# 여러 컬럼으로 Group by 규정할 때 개별 컬럼명을 입력하거나, list 형태로 입력 가능. 
titanic_sdf.groupBy('Pclass', 'Sex').max('Age').show() # select max(Age) from titanic_sdf group by Pclass, Sex
titanic_sdf.groupBy(['Pclass', 'Sex']).max('Age').show()

+------+------+--------+
|Pclass|   Sex|max(Age)|
+------+------+--------+
|     2|female|    57.0|
|     3|  male|    74.0|
|     1|  male|    80.0|
|     3|female|    63.0|
|     1|female|    63.0|
|     2|  male|    70.0|
+------+------+--------+

+------+------+--------+
|Pclass|   Sex|max(Age)|
+------+------+--------+
|     2|female|    57.0|
|     3|  male|    74.0|
|     1|  male|    80.0|
|     3|female|    63.0|
|     1|female|    63.0|
|     2|  male|    70.0|
+------+------+--------+



In [0]:
### 여러개의 aggregation 함수를 적용할 경우는 agg()메소드 내에서 개별 aggregation 함수를 명시 해야함. 

from pyspark.sql.functions import max, avg, sum, min

# select max(age), min(age), sum(age), avg(age) from titanic_sdf group by pclass
titanic_sdf.groupBy('Pclass').agg(max('Age'), min('Age'), sum('Age'), avg('Age')).show() # select max(age), min(age), sum(age), avg(age) from titanic_sdf group by pclass

+------+--------+--------+--------+------------------+
|Pclass|max(Age)|min(Age)|sum(Age)|          avg(Age)|
+------+--------+--------+--------+------------------+
|     2|    70.0|    0.67| 5168.83| 29.87763005780347|
|     1|    80.0|    0.92| 7111.42|38.233440860215055|
|     3|    74.0|    0.42| 8924.92| 25.14061971830986|
+------+--------+--------+--------+------------------+



In [0]:
#아래와 같이 개별 aggregation 함수 결과 컬럼에 별도의 컬럼명을 alias('새로운 컬럼명')을 활용하여 부여 할 수 있음. 
# agg() 메소드 내에서 aggregation 함수 적용 시에는 col('컬럼명')과 같은 컬럼형으로 컬럼명을 지정해도 됨. 
# select max(age) as max_age, min(age) as min_age, sum(age) as sum_age, avg(age) as avg_age from titanic_sdf group by pclass
titanic_sdf.groupBy('Pclass').agg(
    max(col('Age')).alias('max_age'), 
    min('Age').alias('min_age'), \
    sum('Age').alias('sum_age'), 
    avg('Age').alias('avg_age') \
    ).orderBy(col('Pclass')).show()

+------+-------+-------+-------+------------------+
|Pclass|max_age|min_age|sum_age|           avg_age|
+------+-------+-------+-------+------------------+
|     1|   80.0|   0.92|7111.42|38.233440860215055|
|     2|   70.0|   0.67|5168.83| 29.87763005780347|
|     3|   74.0|   0.42|8924.92| 25.14061971830986|
+------+-------+-------+-------+------------------+



In [0]:
titanic_sdf.createOrReplaceTempView("titanic_sdf")

In [0]:
%sql

select Pclass, max(Age) as max_age, min(Age) as min_age, sum(Age) as total_age, avg(Age) as avg_age
from titanic_sdf
group by 1
order by 1 asc


Pclass,max_age,min_age,total_age,avg_age
1,80.0,0.92,7111.42,38.233440860215055
2,70.0,0.67,5168.83,29.87763005780347
3,74.0,0.42,8924.92,25.14061971830986


In [0]:
# 아래와 같이 filter()를 적용하여 group by의 aggregation 결과 값을 기준으로 filtering 적용할 수 있음. 
'''
select max(age) as max_age, min(age) as min_age, sum(age) as sum_age, avg(age) as avg_age from titanic_sdf group by pclass having max(age) > 70
또는 
select max_age, min_age, sum_avg, avg_age 
from (
      select max(age) as max_age, min(age) as min_age, sum(age) as sum_age, avg(age) as avg_age from titanic_sdf group by pclass
) where max_age > 70
'''
titanic_sdf.groupBy('Pclass').agg(max(col('Age')).alias('max_age'), min('Age').alias('min_age') , \
                                 sum('Age').alias('sum_age'), avg('Age').alias('avg_age') \
                                 ).where(col('max_age') > 70).show()

+------+-------+-------+-------+------------------+
|Pclass|max_age|min_age|sum_age|           avg_age|
+------+-------+-------+-------+------------------+
|     1|   80.0|   0.92|7111.42|38.233440860215055|
|     3|   74.0|   0.42|8924.92| 25.14061971830986|
+------+-------+-------+-------+------------------+

