# Data Processing

## Import modules

#### 執行 row 2、4即可以跑

In [1]:
# Import SparkSession. Per the original note this is not needed in the native
# (interactive) PySpark shell, where running rows 2 and 4 is enough.
from pyspark.sql import SparkSession

In [11]:
import pyspark.sql.functions as fn  
from pyspark.sql.types import StringType,DoubleType,IntegerType

## Set spark session

In [3]:
# Only needed for standalone (lightweight) PySpark.
# Create the Spark session object; getOrCreate() reuses an existing session
# when one is already running instead of building a new one.
spark=SparkSession.builder.appName('data_processing').getOrCreate()

In [4]:
spark.sparkContext.appName # a value of 'PySparkShell' indicates the interactive Spark shell is in use

'PySparkShell'

## Load data

In [13]:
# Load the CSV dataset into a DataFrame.
df=spark.read.csv('data/sample_data.csv',inferSchema=True,header=True) 
# inferSchema and header both default to False. inferSchema=True makes Spark
# inspect the column data types; without it, numbers are read as strings.
# header=True treats the first row as the column header.
# Register the DataFrame as the temp view "dfTable" so spark.sql() can query it.
df.createOrReplaceTempView("dfTable")

In [4]:
# Alternative: df.explain() with no arguments.
# extended=True prints the parsed, analyzed and optimized logical plans
# followed by the physical plan.
df.explain(extended=True)

== Parsed Logical Plan ==
Relation[ratings#16,age#17,experience#18,family#19,mobile#20] csv

== Analyzed Logical Plan ==
ratings: int, age: int, experience: double, family: int, mobile: string
Relation[ratings#16,age#17,experience#18,family#19,mobile#20] csv

== Optimized Logical Plan ==
Relation[ratings#16,age#17,experience#18,family#19,mobile#20] csv

== Physical Plan ==
FileScan csv [ratings#16,age#17,experience#18,family#19,mobile#20] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/home/hadoop/sparkcodes/data/sample_data.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<ratings:int,age:int,experience:double,family:int,mobile:string>



## Inspect data

In [6]:
# List the column names of the DataFrame.
df.columns

['ratings', 'age', 'experience', 'family', 'mobile']

In [7]:
# Number of columns in the DataFrame.
len(df.columns)

5

In [8]:
# Number of records (rows) in the DataFrame.
df.count()

33

In [9]:
# Shape of the dataset as a (row count, column count) tuple.
df.count(),len(df.columns)

(33, 5)

In [10]:
# Print the DataFrame schema: column names, types and nullability.
df.printSchema()

root
 |-- ratings: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- experience: double (nullable = true)
 |-- family: integer (nullable = true)
 |-- mobile: string (nullable = true)



In [6]:
# Display the first few rows of the DataFrame; df.show() defaults to 20 rows.
# df.show()
df.show(10)

+-------+---+----------+------+-------+
|ratings|age|experience|family| mobile|
+-------+---+----------+------+-------+
|      3| 32|       9.0|     3|   Vivo|
|      3| 27|      13.0|     3|  Apple|
|      4| 22|       2.5|     0|Samsung|
|      4| 37|      16.5|     4|  Apple|
|      5| 27|       9.0|     1|     MI|
|      4| 27|       9.0|     0|   Oppo|
|      5| 37|      23.0|     5|   Vivo|
|      5| 37|      23.0|     5|Samsung|
|      3| 22|       2.5|     0|  Apple|
|      3| 27|       6.0|     0|     MI|
+-------+---+----------+------+-------+
only showing top 10 rows



In [5]:
# truncate=True => strings longer than 20 characters are truncated (the default);
# truncate=3 => each column is limited to 3 characters.
df.show(10, truncate=True) 

+-------+---+----------+------+-------+
|ratings|age|experience|family| mobile|
+-------+---+----------+------+-------+
|      3| 32|       9.0|     3|   Vivo|
|      3| 27|      13.0|     3|  Apple|
|      4| 22|       2.5|     0|Samsung|
|      4| 37|      16.5|     4|  Apple|
|      5| 27|       9.0|     1|     MI|
|      4| 27|       9.0|     0|   Oppo|
|      5| 37|      23.0|     5|   Vivo|
|      5| 37|      23.0|     5|Samsung|
|      3| 22|       2.5|     0|  Apple|
|      3| 27|       6.0|     0|     MI|
+-------+---+----------+------+-------+
only showing top 10 rows



In [12]:
# Return the first 5 rows of the DataFrame as a list of Row objects.
df.head(5)

[Row(ratings=3, age=32, experience=9.0, family=3, mobile='Vivo'),
 Row(ratings=3, age=27, experience=13.0, family=3, mobile='Apple'),
 Row(ratings=4, age=22, experience=2.5, family=0, mobile='Samsung'),
 Row(ratings=4, age=37, experience=16.5, family=4, mobile='Apple'),
 Row(ratings=5, age=27, experience=9.0, family=1, mobile='MI')]

In [13]:
# Return the last 5 rows of the DataFrame as a list of Row objects.
df.tail(5)

[Row(ratings=2, age=32, experience=16.5, family=2, mobile='Oppo'),
 Row(ratings=3, age=27, experience=6.0, family=0, mobile='MI'),
 Row(ratings=3, age=27, experience=6.0, family=0, mobile='MI'),
 Row(ratings=4, age=22, experience=6.0, family=1, mobile='Oppo'),
 Row(ratings=4, age=37, experience=6.0, family=0, mobile='Vivo')]

In [14]:
# Return the first row of the DataFrame as a single Row.
df.first()

Row(ratings=3, age=32, experience=9.0, family=3, mobile='Vivo')

In [15]:
# This only works because df.createOrReplaceTempView("dfTable") was called earlier.
spark.sql('''select * from dfTable limit 3''').show()

+-------+---+----------+------+-------+
|ratings|age|experience|family| mobile|
+-------+---+----------+------+-------+
|      3| 32|       9.0|     3|   Vivo|
|      3| 27|      13.0|     3|  Apple|
|      4| 22|       2.5|     0|Samsung|
+-------+---+----------+------+-------+



## Descriptive statistics

In [7]:
# Descriptive statistics of the DataFrame (count, mean, stddev, min, max).
# Original note: df.describe() on its own only builds the plan — TODO confirm.
df.describe().show() # show() is the action that displays the result

+-------+------------------+------------------+------------------+------------------+------+
|summary|           ratings|               age|        experience|            family|mobile|
+-------+------------------+------------------+------------------+------------------+------+
|  count|                33|                33|                33|                33|    33|
|   mean|3.5757575757575757|30.484848484848484|10.303030303030303|1.8181818181818181|  null|
| stddev|1.1188806636071336|  6.18527087180309| 6.770731351213326|1.8448330794164254|  null|
|    min|                 1|                22|               2.5|                 0| Apple|
|    max|                 5|                42|              23.0|                 5|  Vivo|
+-------+------------------+------------------+------------------+------------------+------+



In [17]:
# Summary statistics of the DataFrame; unlike describe(), the output also
# includes the 25% / 50% / 75% rows.
df.summary().show()

+-------+------------------+------------------+------------------+------------------+------+
|summary|           ratings|               age|        experience|            family|mobile|
+-------+------------------+------------------+------------------+------------------+------+
|  count|                33|                33|                33|                33|    33|
|   mean|3.5757575757575757|30.484848484848484|10.303030303030303|1.8181818181818181|  null|
| stddev|1.1188806636071336|  6.18527087180309| 6.770731351213326|1.8448330794164254|  null|
|    min|                 1|                22|               2.5|                 0| Apple|
|    25%|                 3|                27|               6.0|                 0|  null|
|    50%|                 4|                27|               6.0|                 1|  null|
|    75%|                 4|                37|              16.5|                 3|  null|
|    max|                 5|                42|              23.0|    

## Select columns

In [18]:
# Select only the 'age' and 'mobile' columns and show the first 5 rows.
df.select('age','mobile').show(5)

+---+-------+
|age| mobile|
+---+-------+
| 32|   Vivo|
| 27|  Apple|
| 22|Samsung|
| 37|  Apple|
| 27|     MI|
+---+-------+
only showing top 5 rows



In [19]:
# Same selection expressed in Spark SQL against the "dfTable" temp view.
spark.sql('select age, mobile from dfTable limit 5').show()

+---+-------+
|age| mobile|
+---+-------+
| 32|   Vivo|
| 27|  Apple|
| 22|Samsung|
| 37|  Apple|
| 27|     MI|
+---+-------+



In [8]:
df.select('age','mobile').explain(extended=True) # optimizer: the intermediate plans differ from the SQL version, but the Physical Plan is the same

== Parsed Logical Plan ==
'Project [unresolvedalias('age, None), unresolvedalias('mobile, None)]
+- Relation[ratings#16,age#17,experience#18,family#19,mobile#20] csv

== Analyzed Logical Plan ==
age: int, mobile: string
Project [age#17, mobile#20]
+- Relation[ratings#16,age#17,experience#18,family#19,mobile#20] csv

== Optimized Logical Plan ==
Project [age#17, mobile#20]
+- Relation[ratings#16,age#17,experience#18,family#19,mobile#20] csv

== Physical Plan ==
FileScan csv [age#17,mobile#20] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/home/hadoop/sparkcodes/data/sample_data.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<age:int,mobile:string>



In [9]:
spark.sql('select age, mobile from dfTable').explain(extended=True) # optimizer: the intermediate plans differ from the DataFrame version, but the Physical Plan is the same

== Parsed Logical Plan ==
'Project ['age, 'mobile]
+- 'UnresolvedRelation [dfTable], [], false

== Analyzed Logical Plan ==
age: int, mobile: string
Project [age#17, mobile#20]
+- SubqueryAlias dftable
   +- Relation[ratings#16,age#17,experience#18,family#19,mobile#20] csv

== Optimized Logical Plan ==
Project [age#17, mobile#20]
+- Relation[ratings#16,age#17,experience#18,family#19,mobile#20] csv

== Physical Plan ==
FileScan csv [age#17,mobile#20] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/home/hadoop/sparkcodes/data/sample_data.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<age:int,mobile:string>



In [14]:
# Reference columns through three different pyspark.sql.functions helpers.
# On the Spark release this notebook was recorded with, fn.column('mobile')
# raises "TypeError: 'str' object is not callable". The error is the point of
# the demonstration, so it is caught and printed instead of aborting a
# Restart & Run All; on releases where fn.column works the rows are shown.
try:
    df.select(
        fn.expr('ratings'),
        fn.col('family'),
        fn.column('mobile'))\
    .show(3)
except TypeError as err:
    print('TypeError:', err)

TypeError: 'str' object is not callable

In [16]:
fn.column? # => cannot accept a string argument (the parameter is currently mis-written; wait for a version update): return col(col)

[0;31mSignature:[0m [0mfn[0m[0;34m.[0m[0mcolumn[0m[0;34m([0m[0mcol[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;31mDocstring:[0m
Returns a :class:`~pyspark.sql.Column` based on the given column name.'

.. versionadded:: 1.3
[0;31mFile:[0m      /usr/local/spark/python/pyspark/sql/functions.py
[0;31mType:[0m      function


In [17]:
# Reference columns through pyspark.sql.functions helpers, using fn.col
# (rather than fn.column) for 'mobile'.
df.select(
    fn.expr('ratings'), 
    fn.col('family'), 
    fn.col('mobile'))\
.show(3)

+-------+------+-------+
|ratings|family| mobile|
+-------+------+-------+
|      3|     3|   Vivo|
|      3|     3|  Apple|
|      4|     0|Samsung|
+-------+------+-------+
only showing top 3 rows



In [15]:
df.select(fn.expr('age+1 AS age1')).show(5) # original note: fn.expr performs best, so in practice it can replace fn.col / fn.column — NOTE(review): performance claim is not demonstrated here; verify

+----+
|age1|
+----+
|  33|
|  28|
|  23|
|  38|
|  28|
+----+
only showing top 5 rows



In [22]:
# selectExpr takes the SQL expression string directly.
df.selectExpr('age+1 AS age1').show(5)

+----+
|age1|
+----+
|  33|
|  28|
|  23|
|  38|
|  28|
+----+
only showing top 5 rows



In [23]:
# df.selectExpr() is equivalent to df.select(fn.expr()).
df.selectExpr(
'*',  # all original columns
'(age>=30) as over30')\
.show(5)

+-------+---+----------+------+-------+------+
|ratings|age|experience|family| mobile|over30|
+-------+---+----------+------+-------+------+
|      3| 32|       9.0|     3|   Vivo|  true|
|      3| 27|      13.0|     3|  Apple| false|
|      4| 22|       2.5|     0|Samsung| false|
|      4| 37|      16.5|     4|  Apple|  true|
|      5| 27|       9.0|     1|     MI| false|
+-------+---+----------+------+-------+------+
only showing top 5 rows



In [24]:
# Same derived boolean column expressed in Spark SQL.
spark.sql('SELECT *, (age>=30) as over30 FROM dfTable LIMIT 5').show()

+-------+---+----------+------+-------+------+
|ratings|age|experience|family| mobile|over30|
+-------+---+----------+------+-------+------+
|      3| 32|       9.0|     3|   Vivo|  true|
|      3| 27|      13.0|     3|  Apple| false|
|      4| 22|       2.5|     0|Samsung| false|
|      4| 37|      16.5|     4|  Apple|  true|
|      5| 27|       9.0|     1|     MI| false|
+-------+---+----------+------+-------+------+



In [18]:
# Print the plans for the selectExpr version of the over30 column.
df.selectExpr(
'*',  # all original columns
'(age>=30) as over30')\
.explain(extended=True)

== Parsed Logical Plan ==
'Project [*, ('age >= 30) AS over30#446]
+- Relation[ratings#409,age#410,experience#411,family#412,mobile#413] csv

== Analyzed Logical Plan ==
ratings: int, age: int, experience: double, family: int, mobile: string, over30: boolean
Project [ratings#409, age#410, experience#411, family#412, mobile#413, (age#410 >= 30) AS over30#446]
+- Relation[ratings#409,age#410,experience#411,family#412,mobile#413] csv

== Optimized Logical Plan ==
Project [ratings#409, age#410, experience#411, family#412, mobile#413, (age#410 >= 30) AS over30#446]
+- Relation[ratings#409,age#410,experience#411,family#412,mobile#413] csv

== Physical Plan ==
*(1) Project [ratings#409, age#410, experience#411, family#412, mobile#413, (age#410 >= 30) AS over30#446]
+- FileScan csv [ratings#409,age#410,experience#411,family#412,mobile#413] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/home/hadoop/sparkcodes/data/sample_data.csv], PartitionFilters: [], PushedFi

In [19]:
# Print the plans for the Spark SQL version of the over30 column.
spark.sql('SELECT *, (age>=30) as over30 FROM dfTable').explain(extended=True)

== Parsed Logical Plan ==
'Project [*, ('age >= 30) AS over30#453]
+- 'UnresolvedRelation [dfTable], [], false

== Analyzed Logical Plan ==
ratings: int, age: int, experience: double, family: int, mobile: string, over30: boolean
Project [ratings#409, age#410, experience#411, family#412, mobile#413, (age#410 >= 30) AS over30#453]
+- SubqueryAlias dftable
   +- Relation[ratings#409,age#410,experience#411,family#412,mobile#413] csv

== Optimized Logical Plan ==
Project [ratings#409, age#410, experience#411, family#412, mobile#413, (age#410 >= 30) AS over30#453]
+- Relation[ratings#409,age#410,experience#411,family#412,mobile#413] csv

== Physical Plan ==
*(1) Project [ratings#409, age#410, experience#411, family#412, mobile#413, (age#410 >= 30) AS over30#453]
+- FileScan csv [ratings#409,age#410,experience#411,family#412,mobile#413] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/home/hadoop/sparkcodes/data/sample_data.csv], PartitionFilters: [], PushedFilt

## Add a column

In [23]:
# Add a derived column with withColumn (age plus 10).
df.withColumn('age_after_10_yrs',(df['age']+10)).show(10,False) # show(10, False): the second argument is truncate; the visible difference here is left- vs right-aligned columns

+-------+---+----------+------+-------+----------------+
|ratings|age|experience|family|mobile |age_after_10_yrs|
+-------+---+----------+------+-------+----------------+
|3      |32 |9.0       |3     |Vivo   |42              |
|3      |27 |13.0      |3     |Apple  |37              |
|4      |22 |2.5       |0     |Samsung|32              |
|4      |37 |16.5      |4     |Apple  |47              |
|5      |27 |9.0       |1     |MI     |37              |
|4      |27 |9.0       |0     |Oppo   |37              |
|5      |37 |23.0      |5     |Vivo   |47              |
|5      |37 |23.0      |5     |Samsung|47              |
|3      |22 |2.5       |0     |Apple  |32              |
|3      |27 |6.0       |0     |MI     |37              |
+-------+---+----------+------+-------+----------------+
only showing top 10 rows



In [24]:
# Convert a column's data type: cast 'age' to double in a new column.
df.withColumn('age_double',df['age'].cast(DoubleType())).show(10,False) # show(10, False): the second argument is truncate; the visible difference here is left- vs right-aligned columns

+-------+---+----------+------+-------+----------+
|ratings|age|experience|family|mobile |age_double|
+-------+---+----------+------+-------+----------+
|3      |32 |9.0       |3     |Vivo   |32.0      |
|3      |27 |13.0      |3     |Apple  |27.0      |
|4      |22 |2.5       |0     |Samsung|22.0      |
|4      |37 |16.5      |4     |Apple  |37.0      |
|5      |27 |9.0       |1     |MI     |27.0      |
|4      |27 |9.0       |0     |Oppo   |27.0      |
|5      |37 |23.0      |5     |Vivo   |37.0      |
|5      |37 |23.0      |5     |Samsung|37.0      |
|3      |22 |2.5       |0     |Apple  |22.0      |
|3      |27 |6.0       |0     |MI     |27.0      |
+-------+---+----------+------+-------+----------+
only showing top 10 rows



In [27]:
# Same cast written as a SQL expression through selectExpr.
df.selectExpr(
'*',  # all original columns
'cast(age as double) as age_double')\
.show(5)

+-------+---+----------+------+-------+----------+
|ratings|age|experience|family| mobile|age_double|
+-------+---+----------+------+-------+----------+
|      3| 32|       9.0|     3|   Vivo|      32.0|
|      3| 27|      13.0|     3|  Apple|      27.0|
|      4| 22|       2.5|     0|Samsung|      22.0|
|      4| 37|      16.5|     4|  Apple|      37.0|
|      5| 27|       9.0|     1|     MI|      27.0|
+-------+---+----------+------+-------+----------+
only showing top 5 rows



In [26]:
# Print the plans for the withColumn + cast version.
df.withColumn('age_double',df['age'].cast(DoubleType())).explain(extended=True)

== Parsed Logical Plan ==
Project [ratings#409, age#410, experience#411, family#412, mobile#413, cast(age#410 as double) AS age_double#652]
+- Relation[ratings#409,age#410,experience#411,family#412,mobile#413] csv

== Analyzed Logical Plan ==
ratings: int, age: int, experience: double, family: int, mobile: string, age_double: double
Project [ratings#409, age#410, experience#411, family#412, mobile#413, cast(age#410 as double) AS age_double#652]
+- Relation[ratings#409,age#410,experience#411,family#412,mobile#413] csv

== Optimized Logical Plan ==
Project [ratings#409, age#410, experience#411, family#412, mobile#413, cast(age#410 as double) AS age_double#652]
+- Relation[ratings#409,age#410,experience#411,family#412,mobile#413] csv

== Physical Plan ==
*(1) Project [ratings#409, age#410, experience#411, family#412, mobile#413, cast(age#410 as double) AS age_double#652]
+- FileScan csv [ratings#409,age#410,experience#411,family#412,mobile#413] Batched: false, DataFilters: [], Format: CSV

In [25]:
# Print the plans for the selectExpr cast version.
df.selectExpr(
'*',  # all original columns
'cast(age as double) as age_double')\
.explain(extended=True)

== Parsed Logical Plan ==
'Project [*, cast('age as double) AS age_double#645]
+- Relation[ratings#409,age#410,experience#411,family#412,mobile#413] csv

== Analyzed Logical Plan ==
ratings: int, age: int, experience: double, family: int, mobile: string, age_double: double
Project [ratings#409, age#410, experience#411, family#412, mobile#413, cast(age#410 as double) AS age_double#645]
+- Relation[ratings#409,age#410,experience#411,family#412,mobile#413] csv

== Optimized Logical Plan ==
Project [ratings#409, age#410, experience#411, family#412, mobile#413, cast(age#410 as double) AS age_double#645]
+- Relation[ratings#409,age#410,experience#411,family#412,mobile#413] csv

== Physical Plan ==
*(1) Project [ratings#409, age#410, experience#411, family#412, mobile#413, cast(age#410 as double) AS age_double#645]
+- FileScan csv [ratings#409,age#410,experience#411,family#412,mobile#413] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/home/hadoop/sparkcodes/da

### Reference:
https://spark.apache.org/docs/latest/api/sql/

https://spark.apache.org/docs/latest/sql-ref-datatypes.html

## Delete column

In [28]:
# ** df_new is reused (reassigned) across cells ** => to keep memory under control (per the original note)
# Delete a single column; drop() returns a new DataFrame and leaves df unchanged.
df_new=df.drop('mobile')
df_new.show(5)

+-------+---+----------+------+
|ratings|age|experience|family|
+-------+---+----------+------+
|      3| 32|       9.0|     3|
|      3| 27|      13.0|     3|
|      4| 22|       2.5|     0|
|      4| 37|      16.5|     4|
|      5| 27|       9.0|     1|
+-------+---+----------+------+
only showing top 5 rows



In [29]:
# ** df_new is reused (reassigned) across cells ** => to keep memory under control (per the original note)
# Delete several columns in one call.
df_new=df.drop('age', 'mobile')
df_new.show(5)

+-------+----------+------+
|ratings|experience|family|
+-------+----------+------+
|      3|       9.0|     3|
|      3|      13.0|     3|
|      4|       2.5|     0|
|      4|      16.5|     4|
|      5|       9.0|     1|
+-------+----------+------+
only showing top 5 rows



## Filter data

In [30]:
# Filter the records with a Column condition: keep rows where mobile is 'Vivo'.
df.filter(df['mobile']=='Vivo').show()

+-------+---+----------+------+------+
|ratings|age|experience|family|mobile|
+-------+---+----------+------+------+
|      3| 32|       9.0|     3|  Vivo|
|      5| 37|      23.0|     5|  Vivo|
|      4| 37|       6.0|     0|  Vivo|
|      5| 37|      13.0|     1|  Vivo|
|      4| 37|       6.0|     0|  Vivo|
+-------+---+----------+------+------+



In [27]:
# Print the plans for the 'Vivo' filter.
df.filter(df['mobile']=='Vivo').explain(extended=True)

== Parsed Logical Plan ==
Filter (mobile#413 = Vivo)
+- Relation[ratings#409,age#410,experience#411,family#412,mobile#413] csv

== Analyzed Logical Plan ==
ratings: int, age: int, experience: double, family: int, mobile: string
Filter (mobile#413 = Vivo)
+- Relation[ratings#409,age#410,experience#411,family#412,mobile#413] csv

== Optimized Logical Plan ==
Filter (isnotnull(mobile#413) AND (mobile#413 = Vivo))
+- Relation[ratings#409,age#410,experience#411,family#412,mobile#413] csv

== Physical Plan ==
*(1) Filter (isnotnull(mobile#413) AND (mobile#413 = Vivo))
+- FileScan csv [ratings#409,age#410,experience#411,family#412,mobile#413] Batched: false, DataFilters: [isnotnull(mobile#413), (mobile#413 = Vivo)], Format: CSV, Location: InMemoryFileIndex[file:/home/hadoop/sparkcodes/data/sample_data.csv], PartitionFilters: [], PushedFilters: [IsNotNull(mobile), EqualTo(mobile,Vivo)], ReadSchema: struct<ratings:int,age:int,experience:double,family:int,mobile:string>



In [31]:
# Filter the records, then keep only three columns.
df.filter(df['mobile']=='Vivo').select('age','ratings','mobile').show()

+---+-------+------+
|age|ratings|mobile|
+---+-------+------+
| 32|      3|  Vivo|
| 37|      5|  Vivo|
| 37|      4|  Vivo|
| 37|      5|  Vivo|
| 37|      4|  Vivo|
+---+-------+------+



In [32]:
# Same filter written as a SQL-style condition string.
df.filter("mobile=='Vivo'").select('age','ratings','mobile').show()

+---+-------+------+
|age|ratings|mobile|
+---+-------+------+
| 32|      3|  Vivo|
| 37|      5|  Vivo|
| 37|      4|  Vivo|
| 37|      5|  Vivo|
| 37|      4|  Vivo|
+---+-------+------+



In [33]:
# Filter the records with Spark SQL against the "dfTable" temp view.
spark.sql("""select age, ratings, mobile from dfTable where mobile=='Vivo'""").show()

+---+-------+------+
|age|ratings|mobile|
+---+-------+------+
| 32|      3|  Vivo|
| 37|      5|  Vivo|
| 37|      4|  Vivo|
| 37|      5|  Vivo|
| 37|      4|  Vivo|
+---+-------+------+



In [34]:
# Multiple conditions by chaining two filter() calls.
df.filter(df['mobile']=='Vivo').filter(df['experience'] >10).show()

+-------+---+----------+------+------+
|ratings|age|experience|family|mobile|
+-------+---+----------+------+------+
|      5| 37|      23.0|     5|  Vivo|
|      5| 37|      13.0|     1|  Vivo|
+-------+---+----------+------+------+



In [35]:
# Multiple conditions combined with & in a single filter(); each condition
# is wrapped in its own parentheses.
df.filter((df['mobile']=='Vivo')&(df['experience'] >10)).show()

+-------+---+----------+------+------+
|ratings|age|experience|family|mobile|
+-------+---+----------+------+------+
|      5| 37|      23.0|     5|  Vivo|
|      5| 37|      13.0|     1|  Vivo|
+-------+---+----------+------+------+



In [36]:
# Multiple conditions written as one SQL-style condition string.
df.filter("mobile=='Vivo' and experience>10").show()

+-------+---+----------+------+------+
|ratings|age|experience|family|mobile|
+-------+---+----------+------+------+
|      5| 37|      23.0|     5|  Vivo|
|      5| 37|      13.0|     1|  Vivo|
+-------+---+----------+------+------+



In [37]:
# Multiple conditions with Spark SQL against the "dfTable" temp view.
spark.sql('''select * from dfTable where mobile=='Vivo' and experience>10''').show()

+-------+---+----------+------+------+
|ratings|age|experience|family|mobile|
+-------+---+----------+------+------+
|      5| 37|      23.0|     5|  Vivo|
|      5| 37|      13.0|     1|  Vivo|
+-------+---+----------+------+------+



## Compare Plan Efficiency

In [28]:
# Plans for the chained-filter form (two filter() calls).
df.filter(df['mobile']=='Vivo').filter(df['experience'] >10).explain(extended=True)

== Parsed Logical Plan ==
'Filter (experience#411 > 10)
+- Filter (mobile#413 = Vivo)
   +- Relation[ratings#409,age#410,experience#411,family#412,mobile#413] csv

== Analyzed Logical Plan ==
ratings: int, age: int, experience: double, family: int, mobile: string
Filter (experience#411 > cast(10 as double))
+- Filter (mobile#413 = Vivo)
   +- Relation[ratings#409,age#410,experience#411,family#412,mobile#413] csv

== Optimized Logical Plan ==
Filter (((isnotnull(mobile#413) AND isnotnull(experience#411)) AND (mobile#413 = Vivo)) AND (experience#411 > 10.0))
+- Relation[ratings#409,age#410,experience#411,family#412,mobile#413] csv

== Physical Plan ==
*(1) Filter (((isnotnull(mobile#413) AND isnotnull(experience#411)) AND (mobile#413 = Vivo)) AND (experience#411 > 10.0))
+- FileScan csv [ratings#409,age#410,experience#411,family#412,mobile#413] Batched: false, DataFilters: [isnotnull(mobile#413), isnotnull(experience#411), (mobile#413 = Vivo), (experience#411 > 10.0)], Format: CSV, Locat

In [29]:
# Plans for the single filter() with conditions combined by &.
df.filter((df['mobile']=='Vivo')&(df['experience'] >10)).explain(extended=True)

== Parsed Logical Plan ==
'Filter ((mobile#413 = Vivo) AND (experience#411 > 10))
+- Relation[ratings#409,age#410,experience#411,family#412,mobile#413] csv

== Analyzed Logical Plan ==
ratings: int, age: int, experience: double, family: int, mobile: string
Filter ((mobile#413 = Vivo) AND (experience#411 > cast(10 as double)))
+- Relation[ratings#409,age#410,experience#411,family#412,mobile#413] csv

== Optimized Logical Plan ==
Filter (((isnotnull(mobile#413) AND isnotnull(experience#411)) AND (mobile#413 = Vivo)) AND (experience#411 > 10.0))
+- Relation[ratings#409,age#410,experience#411,family#412,mobile#413] csv

== Physical Plan ==
*(1) Filter (((isnotnull(mobile#413) AND isnotnull(experience#411)) AND (mobile#413 = Vivo)) AND (experience#411 > 10.0))
+- FileScan csv [ratings#409,age#410,experience#411,family#412,mobile#413] Batched: false, DataFilters: [isnotnull(mobile#413), isnotnull(experience#411), (mobile#413 = Vivo), (experience#411 > 10.0)], Format: CSV, Location: InMemoryF

In [30]:
# Plans for the SQL-style condition string form.
df.filter("mobile=='Vivo' and experience>10").explain(extended=True)

== Parsed Logical Plan ==
'Filter (('mobile = Vivo) AND ('experience > 10))
+- Relation[ratings#409,age#410,experience#411,family#412,mobile#413] csv

== Analyzed Logical Plan ==
ratings: int, age: int, experience: double, family: int, mobile: string
Filter ((mobile#413 = Vivo) AND (experience#411 > cast(10 as double)))
+- Relation[ratings#409,age#410,experience#411,family#412,mobile#413] csv

== Optimized Logical Plan ==
Filter (((isnotnull(mobile#413) AND isnotnull(experience#411)) AND (mobile#413 = Vivo)) AND (experience#411 > 10.0))
+- Relation[ratings#409,age#410,experience#411,family#412,mobile#413] csv

== Physical Plan ==
*(1) Filter (((isnotnull(mobile#413) AND isnotnull(experience#411)) AND (mobile#413 = Vivo)) AND (experience#411 > 10.0))
+- FileScan csv [ratings#409,age#410,experience#411,family#412,mobile#413] Batched: false, DataFilters: [isnotnull(mobile#413), isnotnull(experience#411), (mobile#413 = Vivo), (experience#411 > 10.0)], Format: CSV, Location: InMemoryFileInd

In [31]:
# Plans for the Spark SQL form of the same two-condition filter.
spark.sql('''select * from dfTable where mobile=='Vivo' and experience>10''').explain(extended=True)

== Parsed Logical Plan ==
'Project [*]
+- 'Filter (('mobile = Vivo) AND ('experience > 10))
   +- 'UnresolvedRelation [dfTable], [], false

== Analyzed Logical Plan ==
ratings: int, age: int, experience: double, family: int, mobile: string
Project [ratings#409, age#410, experience#411, family#412, mobile#413]
+- Filter ((mobile#413 = Vivo) AND (experience#411 > cast(10 as double)))
   +- SubqueryAlias dftable
      +- Relation[ratings#409,age#410,experience#411,family#412,mobile#413] csv

== Optimized Logical Plan ==
Filter (((isnotnull(mobile#413) AND isnotnull(experience#411)) AND (mobile#413 = Vivo)) AND (experience#411 > 10.0))
+- Relation[ratings#409,age#410,experience#411,family#412,mobile#413] csv

== Physical Plan ==
*(1) Filter (((isnotnull(mobile#413) AND isnotnull(experience#411)) AND (mobile#413 = Vivo)) AND (experience#411 > 10.0))
+- FileScan csv [ratings#409,age#410,experience#411,family#412,mobile#413] Batched: false, DataFilters: [isnotnull(mobile#413), isnotnull(exper

## Distinct Values

In [38]:
# Distinct values in a single column.
df.select('mobile').distinct().show()

+-------+
| mobile|
+-------+
|     MI|
|   Oppo|
|Samsung|
|   Vivo|
|  Apple|
+-------+



In [32]:
# From here on the physical plan contains HashAggregate and an Exchange with hashpartitioning.
df.select('mobile').distinct().explain(extended=True)

== Parsed Logical Plan ==
Deduplicate [mobile#413]
+- Project [mobile#413]
   +- Relation[ratings#409,age#410,experience#411,family#412,mobile#413] csv

== Analyzed Logical Plan ==
mobile: string
Deduplicate [mobile#413]
+- Project [mobile#413]
   +- Relation[ratings#409,age#410,experience#411,family#412,mobile#413] csv

== Optimized Logical Plan ==
Aggregate [mobile#413], [mobile#413]
+- Project [mobile#413]
   +- Relation[ratings#409,age#410,experience#411,family#412,mobile#413] csv

== Physical Plan ==
*(2) HashAggregate(keys=[mobile#413], functions=[], output=[mobile#413])
+- Exchange hashpartitioning(mobile#413, 200), ENSURE_REQUIREMENTS, [id=#275]
   +- *(1) HashAggregate(keys=[mobile#413], functions=[], output=[mobile#413])
      +- FileScan csv [mobile#413] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/home/hadoop/sparkcodes/data/sample_data.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<mobile:string>



In [39]:
# Distinct combinations of values across two columns.
df.select('ratings', 'mobile').distinct().show()

+-------+-------+
|ratings| mobile|
+-------+-------+
|      5|Samsung|
|      5|     MI|
|      4|  Apple|
|      3|  Apple|
|      2|   Oppo|
|      4|   Oppo|
|      1|     MI|
|      3|     MI|
|      4|Samsung|
|      5|   Vivo|
|      4|   Vivo|
|      3|   Vivo|
|      2|Samsung|
+-------+-------+



In [40]:
# Count of distinct values. Note: count() returns a single value, not a table.
df.select('mobile').distinct().count()

5

In [41]:
# Distinct values with Spark SQL. Note: the result is a table (DataFrame).
spark.sql('''select distinct(mobile) from dfTable''').show()

+-------+
| mobile|
+-------+
|     MI|
|   Oppo|
|Samsung|
|   Vivo|
|  Apple|
+-------+



In [42]:
# Distinct count with Spark SQL; the result is a one-row table with column 'counts'.
spark.sql('''select count(distinct(mobile)) as counts from dfTable''').show()

+------+
|counts|
+------+
|     5|
+------+



## Sort rows

In [43]:
# Sort rows by 'age' (ascending) and show the first 10.
df.sort('age').show(10)

+-------+---+----------+------+-------+
|ratings|age|experience|family| mobile|
+-------+---+----------+------+-------+
|      4| 22|       6.0|     1|   Oppo|
|      4| 22|       2.5|     0|Samsung|
|      5| 22|       2.5|     0|Samsung|
|      3| 22|       2.5|     0|  Apple|
|      4| 22|       6.0|     1|   Oppo|
|      5| 27|       6.0|     0|     MI|
|      5| 27|       6.0|     2|Samsung|
|      5| 27|       9.0|     1|     MI|
|      4| 27|       6.0|     1|  Apple|
|      3| 27|      13.0|     3|  Apple|
+-------+---+----------+------+-------+
only showing top 10 rows



In [44]:
# Order by 'ratings' first, then by 'family' (both ascending).
df.orderBy('ratings', 'family').show(10)

+-------+---+----------+------+-------+
|ratings|age|experience|family| mobile|
+-------+---+----------+------+-------+
|      1| 37|      23.0|     5|     MI|
|      2| 27|       6.0|     2|   Oppo|
|      2| 32|      16.5|     2|   Oppo|
|      2| 42|      23.0|     2|   Oppo|
|      2| 27|       9.0|     2|Samsung|
|      2| 27|       6.0|     2|   Oppo|
|      3| 22|       2.5|     0|  Apple|
|      3| 27|       6.0|     0|     MI|
|      3| 27|       6.0|     0|     MI|
|      3| 27|       6.0|     0|     MI|
+-------+---+----------+------+-------+
only showing top 10 rows



In [33]:
# Plans for ordering by ratings and family => the physical plan shows an
# Exchange with rangepartitioning before the Sort.
df.orderBy('ratings', 'family').explain(extended=True)

== Parsed Logical Plan ==
'Sort ['ratings ASC NULLS FIRST, 'family ASC NULLS FIRST], true
+- Relation[ratings#409,age#410,experience#411,family#412,mobile#413] csv

== Analyzed Logical Plan ==
ratings: int, age: int, experience: double, family: int, mobile: string
Sort [ratings#409 ASC NULLS FIRST, family#412 ASC NULLS FIRST], true
+- Relation[ratings#409,age#410,experience#411,family#412,mobile#413] csv

== Optimized Logical Plan ==
Sort [ratings#409 ASC NULLS FIRST, family#412 ASC NULLS FIRST], true
+- Relation[ratings#409,age#410,experience#411,family#412,mobile#413] csv

== Physical Plan ==
*(1) Sort [ratings#409 ASC NULLS FIRST, family#412 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(ratings#409 ASC NULLS FIRST, family#412 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [id=#286]
   +- FileScan csv [ratings#409,age#410,experience#411,family#412,mobile#413] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/home/hadoop/sparkcodes/data/sample_data

In [45]:
# Descending order on 'experience' via ascending=False.
df.orderBy('experience', ascending=False).show(10)

+-------+---+----------+------+-------+
|ratings|age|experience|family| mobile|
+-------+---+----------+------+-------+
|      5| 37|      23.0|     5|   Vivo|
|      5| 37|      23.0|     5|Samsung|
|      1| 37|      23.0|     5|     MI|
|      3| 42|      23.0|     5|     MI|
|      2| 42|      23.0|     2|   Oppo|
|      3| 37|      16.5|     5|  Apple|
|      2| 32|      16.5|     2|   Oppo|
|      4| 37|      16.5|     4|  Apple|
|      3| 37|      16.5|     5|  Apple|
|      3| 27|      13.0|     3|  Apple|
+-------+---+----------+------+-------+
only showing top 10 rows



In [46]:
# Descending sort expressed in Spark SQL: query the `dfTable` temp view,
# keep three columns, and order by `age` from highest to lowest.
spark.sql("""select ratings, age, mobile from dfTable 
        order by age desc""").show(10)

+-------+---+-------+
|ratings|age| mobile|
+-------+---+-------+
|      2| 42|   Oppo|
|      3| 42|     MI|
|      4| 37|  Apple|
|      5| 37|   Vivo|
|      3| 37|  Apple|
|      1| 37|     MI|
|      5| 37|Samsung|
|      4| 37|Samsung|
|      4| 37|   Vivo|
|      3| 37|  Apple|
+-------+---+-------+
only showing top 10 rows



## Null values

In [35]:
# Build a small employee DataFrame. The None entries show up as nulls, giving
# the following cells missing values to fill or drop.
data = [
    ('Alice', 22, 52),
    ('John', None, 68),
    ('Mary', 24, 55),
    ('Alan', None, None),
    ('Jane', 32, 48),
]

emp = spark.createDataFrame(data, schema=['name', 'age', 'weight'])
emp.show()

+-----+----+------+
| name| age|weight|
+-----+----+------+
|Alice|  22|    52|
| John|null|    68|
| Mary|  24|    55|
| Alan|null|  null|
| Jane|  32|    48|
+-----+----+------+



In [48]:
# Replace the nulls in `age` and `weight` with the same constant, 1.
emp.fillna(1).show()

+-----+---+------+
| name|age|weight|
+-----+---+------+
|Alice| 22|    52|
| John|  1|    68|
| Mary| 24|    55|
| Alan|  1|     1|
| Jane| 32|    48|
+-----+---+------+



In [49]:
# Fill nulls per column: the dict maps a column name to its replacement
# value (null `age` -> 30, null `weight` -> 50).
val={'age': 30, 'weight': 50}
emp.fillna(val).show()

+-----+---+------+
| name|age|weight|
+-----+---+------+
|Alice| 22|    52|
| John| 30|    68|
| Mary| 24|    55|
| Alan| 30|    50|
| Jane| 32|    48|
+-----+---+------+



In [50]:
# Drop every row that contains at least one null (the default how='any');
# John and Alan are removed.
emp.dropna().show()

+-----+---+------+
| name|age|weight|
+-----+---+------+
|Alice| 22|    52|
| Mary| 24|    55|
| Jane| 32|    48|
+-----+---+------+



In [51]:
# Drop rows that have fewer than 2 non-null values: Alan (only `name` set)
# is dropped, while John (only `age` is null) is kept.
emp.dropna(thresh=2).show()

+-----+----+------+
| name| age|weight|
+-----+----+------+
|Alice|  22|    52|
| John|null|    68|
| Mary|  24|    55|
| Jane|  32|    48|
+-----+----+------+



In [36]:
# IPython help: show the signature and docstring of `dropna`.
emp.dropna?

[0;31mSignature:[0m [0memp[0m[0;34m.[0m[0mdropna[0m[0;34m([0m[0mhow[0m[0;34m=[0m[0;34m'any'[0m[0;34m,[0m [0mthresh[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m [0msubset[0m[0;34m=[0m[0;32mNone[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;31mDocstring:[0m
Returns a new :class:`DataFrame` omitting rows with null values.
:func:`DataFrame.dropna` and :func:`DataFrameNaFunctions.drop` are aliases of each other.

.. versionadded:: 1.3.1

Parameters
----------
how : str, optional
    'any' or 'all'.
    If 'any', drop a row if it contains any nulls.
    If 'all', drop a row only if all its values are null.
thresh: int, optional
    default None
    If specified, drop rows that have less than `thresh` non-null values.
    This overwrites the `how` parameter.
subset : str, tuple or list, optional
    optional list of column names to consider.

Examples
--------
>>> df4.na.drop().show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 10|    80|Alice|
+---+-----

## Duplicate data

In [37]:
# Build a small customer DataFrame that contains one fully duplicated row
# ('Alice', 22, 'USA') for the de-duplication examples.
data = [
    ('Alice', 22, 'USA'),
    ('John', 18, 'Japan'),
    ('Mary', 24, 'Germany'),
    ('Alice', 22, 'USA'),
    ('Jane', 24, 'Germany'),
]

cust = spark.createDataFrame(data, schema=['name', 'age', 'country'])
cust.show()

+-----+---+-------+
| name|age|country|
+-----+---+-------+
|Alice| 22|    USA|
| John| 18|  Japan|
| Mary| 24|Germany|
|Alice| 22|    USA|
| Jane| 24|Germany|
+-----+---+-------+



In [53]:
# Drop rows that are exact duplicates across all columns: the repeated
# ('Alice', 22, 'USA') row collapses to one. Note the output row order
# differs from the input order.
cust.dropDuplicates().show()

+-----+---+-------+
| name|age|country|
+-----+---+-------+
|Alice| 22|    USA|
| Jane| 24|Germany|
| Mary| 24|Germany|
| John| 18|  Japan|
+-----+---+-------+



In [38]:
cust.dropDuplicates? # the signature only takes `subset`; there is no `keep` option

[0;31mSignature:[0m [0mcust[0m[0;34m.[0m[0mdropDuplicates[0m[0;34m([0m[0msubset[0m[0;34m=[0m[0;32mNone[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;31mDocstring:[0m
Return a new :class:`DataFrame` with duplicate rows removed,
optionally only considering certain columns.

For a static batch :class:`DataFrame`, it just drops duplicate rows. For a streaming
:class:`DataFrame`, it will keep all data across triggers as intermediate state to drop
duplicates rows. You can use :func:`withWatermark` to limit how late the duplicate data can
be and system will accordingly limit the state. In addition, too late data older than
watermark will be dropped to avoid any possibility of duplicates.

:func:`drop_duplicates` is an alias for :func:`dropDuplicates`.

.. versionadded:: 1.4.0

Examples
--------
>>> from pyspark.sql import Row
>>> df = sc.parallelize([ \
...     Row(name='Alice', age=5, height=80), \
...     Row(name='Alice', age=5, height=80), \
...     Row(name='Alice', age=1

In [54]:
# Drop duplicates based on `age` and `country` only: rows sharing both values
# collapse to one, so of Mary/Jane (24, Germany) only one survives.
# NOTE(review): which of the duplicate rows is kept is presumably not
# guaranteed — confirm before relying on it.
cust.dropDuplicates(subset=['age', 'country']).show()

+-----+---+-------+
| name|age|country|
+-----+---+-------+
|Alice| 22|    USA|
| John| 18|  Japan|
| Mary| 24|Germany|
+-----+---+-------+



In [55]:
# Spark SQL version of full-row de-duplication: register `cust` as the temp
# view `custTable`, then select the distinct rows.
cust.createOrReplaceTempView("custTable")
spark.sql('''select distinct * from custTable''').show()

+-----+---+-------+
| name|age|country|
+-----+---+-------+
|Alice| 22|    USA|
| Jane| 24|Germany|
| Mary| 24|Germany|
| John| 18|  Japan|
+-----+---+-------+

