In [2]:
import findspark
findspark.init()

from pyspark import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

import warnings
warnings.filterwarnings('ignore')

from pyspark.sql.functions import *

In [3]:
sc = SparkContext()

23/05/16 13:04:50 WARN Utils: Your hostname, Mums-MacBook.local resolves to a loopback address: 127.0.0.1; using 10.40.58.22 instead (on interface en0)
23/05/16 13:04:50 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/16 13:04:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
spark = SparkSession(sc)

## I. DATA CLEANING

In [5]:
df = spark.read.csv('/Users/tranhuonggiang/Documents/BI_DA/KHTN/KHTN_BigData in ML/b5/2017_StPaul_MN_Real_Estate.csv', inferSchema = True, header = True)

In [6]:
df.toPandas().shape

23/05/14 13:38:02 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


(5000, 74)

In [7]:
df.toPandas().columns

Index(['No', 'MLSID', 'StreetNumberNumeric', 'streetaddress', 'STREETNAME',
       'PostalCode', 'StateOrProvince', 'City', 'SalesClosePrice', 'LISTDATE',
       'LISTPRICE', 'LISTTYPE', 'OriginalListPrice', 'PricePerTSFT',
       'FOUNDATIONSIZE', 'FENCE', 'MapLetter', 'LotSizeDimensions',
       'SchoolDistrictNumber', 'DAYSONMARKET', 'offmarketdate', 'Fireplaces',
       'RoomArea4', 'roomtype', 'ROOF', 'RoomFloor4', 'PotentialShortSale',
       'PoolDescription', 'PDOM', 'GarageDescription', 'SQFTABOVEGROUND',
       'Taxes', 'RoomFloor1', 'RoomArea1', 'TAXWITHASSESSMENTS', 'TAXYEAR',
       'LivingArea', 'UNITNUMBER', 'YEARBUILT', 'ZONING', 'STYLE', 'ACRES',
       'CoolingDescription', 'APPLIANCES', 'backonmarketdate',
       'ROOMFAMILYCHAR', 'RoomArea3', 'EXTERIOR', 'RoomFloor3', 'RoomFloor2',
       'RoomArea2', 'DiningRoomDescription', 'BASEMENT', 'BathsFull',
       'BathsHalf', 'BATHQUARTER', 'BATHSTHREEQUARTER', 'Class', 'BATHSTOTAL',
       'BATHDESC', 'RoomArea5', 'RoomF

In [8]:
df_sub = df.select('StreetNumberNumeric','Fireplaces','LotSizeDimensions','LISTTYPE','ACRES','AssumableMortgage',
                   'SalesClosePrice','LISTPRICE','DAYSONMARKET','YEARBUILT')

In [9]:
df_sub.toPandas().head(2)

Unnamed: 0,StreetNumberNumeric,Fireplaces,LotSizeDimensions,LISTTYPE,ACRES,AssumableMortgage,SalesClosePrice,LISTPRICE,DAYSONMARKET,YEARBUILT
0,11511,0,279X200,Exclusive Right,1.28,,143000,139900,10,1950
1,11200,0,100x140,Exclusive Right,0.32,,190000,210000,4,1971


Xoá các thuộc tính không liên quan

In [10]:
df_sub = df_sub.drop(*['StreetNumberNumeric','LotSizeDimensions'])
df_sub.toPandas().head(2)

Unnamed: 0,Fireplaces,LISTTYPE,ACRES,AssumableMortgage,SalesClosePrice,LISTPRICE,DAYSONMARKET,YEARBUILT
0,0,Exclusive Right,1.28,,143000,139900,10,1950
1,0,Exclusive Right,0.32,,190000,210000,4,1971


Lọc dữ liệu

In [11]:
df_sub.select('AssumableMortgage').distinct().show()

+-------------------+
|  AssumableMortgage|
+-------------------+
|  Yes w/ Qualifying|
| Information Coming|
|               null|
|Yes w/No Qualifying|
|      Not Assumable|
+-------------------+



In [12]:
yes_values = ['Yes w/ Qualifying','Yes w/No Qualifying']

#filter yes but keep null value
text_filter = ~df_sub['AssumableMortgage'].isin(yes_values) | df_sub['AssumableMortgage'].isNull()
df_sub = df_sub.where(text_filter)

df_sub.count()

4976

Xoá outliers theo phân phối chuẩn

In [13]:
df_sub = df_sub.withColumn('log_SalesClosePrice', log(col('SalesClosePrice')))

In [14]:
#cal values used for outlier filtering 
mean_val = df_sub.agg({'log_SalesClosePrice':'mean'}).collect()[0][0]
stddev_val = df_sub.agg({'log_SalesClosePrice':'stddev'}).collect()[0][0]

#create lower and upper bounds 
low_bound = mean_val - (3*stddev_val) 
up_bound = mean_val + (3*stddev_val) 

#filter data betwen upper & lower bound 
df_sub = df_sub.where((df_sub['log_SalesClosePrice'] < up_bound) & (df_sub['log_SalesClosePrice'] > low_bound))

In [15]:
df_sub.count()

4946

xoá dữ liệu null & nan

In [16]:
#drop records if both ListPrice and SalesClosePrice are NULL 
df_sub = df_sub.dropna(how='all', subset = ['ListPrice','SalesClosePrice'])
df_sub.count()

4946

In [17]:
#drop records where at least two columns have NULL values
df_sub = df_sub.dropna(thresh = 2)
df_sub.count()

4946

Xoá dữ liệu trùng lắp

In [18]:
df_sub = df_sub.drop_duplicates()
df_sub.count()
# có trùng 1 row

4945

Scale dữ liệu:
1. Minmax scaling: khi data không có pp chuẩn 
2. Standard scaling: khi data có pp chuẩn

In [19]:
#define min max value 
max_price = df_sub.agg({'SalesClosePrice':'max'}).collect()[0][0]
min_price = df_sub.agg({'SalesClosePrice':'min'}).collect()[0][0]

#create column based on the scaled data 
df_sub = df_sub.withColumn('scaled_price', 
                           (df_sub['SalesClosePrice'] - min_price)/(max_price - min_price))

df_sub.select('scaled_price').show(5)

+-------------------+
|       scaled_price|
+-------------------+
| 0.7718277066356228|
|0.15832363213038417|
|0.21071012805587894|
| 0.1210710128055879|
|0.21769499417927823|
+-------------------+
only showing top 5 rows



In [20]:
#cal values used for outlier filtering 
mean_days = df_sub.agg({'DAYSONMARKET':'mean'}).collect()[0][0]
stddev_days = df_sub.agg({'DAYSONMARKET':'stddev'}).collect()[0][0]

#create column based on the scaled data
df_sub = df_sub.withColumn('standardscaled_days', 
                           (df_sub['DAYSONMARKET'] - mean_days)/stddev_days)
df_sub.select('scaled_price').show(5)

+-------------------+
|       scaled_price|
+-------------------+
| 0.7718277066356228|
|0.15832363213038417|
|0.21071012805587894|
| 0.1210710128055879|
|0.21769499417927823|
+-------------------+
only showing top 5 rows



**Fill null value**
1. Drop column: nếu data thiếu nhiều hoặc thiếu 1 cách ngẫu nhiên 
2. Rule Based: giá trị điền dựa trên business logic
3. Statistics Based: gía trị điền có thể là mean, median...
4. Model Based: gía trị điền là giá trị dự đoán từ model 

## II. DATA FEATURING

Đã xem rồi, chuyển qua làm bài tập luôn cũng được

## III. EXERCISES

#### Exercise 1

1. Đọc dữ liệu => df

In [22]:
df = spark.read.csv(('/Users/tranhuonggiang/Documents/BI_DA/KHTN/KHTN_BigData in ML/b5/voters_data/DallasCouncilVoters.csv'), inferSchema = True, header = True)

2. Cho biết dữ liệu có bao nhiêu dòng, in scheme. Hiển thị 5 dòng dữ liệu đầu tiên.

In [23]:
df.count()

44625

In [24]:
df.printSchema()

root
 |-- DATE: string (nullable = true)
 |-- TITLE: string (nullable = true)
 |-- VOTER_NAME: string (nullable = true)



In [25]:
df.show(5)

+----------+-------------+-------------------+
|      DATE|        TITLE|         VOTER_NAME|
+----------+-------------+-------------------+
|02/08/2017|Councilmember|  Jennifer S. Gates|
|02/08/2017|Councilmember| Philip T. Kingston|
|02/08/2017|        Mayor|Michael S. Rawlings|
|02/08/2017|Councilmember|       Adam Medrano|
|02/08/2017|Councilmember|       Casey Thomas|
+----------+-------------+-------------------+
only showing top 5 rows



3. Kiểm tra dữ liệu NaN, null. Nếu dòng nào 'VOTER_NAME' có dữ liệu null thì xóa hết các dòng đó.

In [26]:
from pyspark.sql.functions import col, udf
from pyspark.sql.functions import isnan, when, count, col

In [32]:
#Kiểm tra dữ liệu NaN => KHONG CO
df.select([count(when(isnan(c), c)).alias(c) for c in df.columns]).toPandas().T

Unnamed: 0,0
DATE,0
TITLE,0
VOTER_NAME,0


In [34]:
#Kiểm tra dữ liệu NULL 
df.select([count(when(isnull(c), c)).alias(c) for c in df.columns]).toPandas().T

Unnamed: 0,0
DATE,0
TITLE,195
VOTER_NAME,503


In [35]:
#del null value in voter_name 
df = df.dropna(subset = 'VOTER_NAME')

In [36]:
df.count()

44122

4. Kiểm tra dữ liệu trùng. Xóa dữ liệu trùng.

In [37]:
num_rows = df.count()
num_dist_rows = df.distinct().count()
dup_rows = num_rows - num_dist_rows

In [38]:
display(num_rows, num_dist_rows, dup_rows)

44122

1273

42849

In [39]:
df = df.drop_duplicates()

In [40]:
df.count()

1273

5. Tìm các VOTER_NAME duy nhất và hiển thị 10 thông tin đầu tiên.

In [43]:
df.select(df['VOTER_NAME']).distinct().show(10)

+--------------------+
|          VOTER_NAME|
+--------------------+
|      Tennell Atkins|
|  the  final   20...|
|        Scott Griggs|
|       Scott  Griggs|
|       Sandy Greyson|
| Michael S. Rawlings|
| the final 2018 A...|
|        Kevin Felder|
|        Adam Medrano|
|       Casey  Thomas|
+--------------------+
only showing top 10 rows



6. Lọc dữ liệu theo điều kiện 'VOTER_NAME' có chiều dài từ 1-20 ký tự.

In [44]:
from pyspark.sql.functions import *

In [49]:
df = df.filter('length(VOTER_NAME) > 0 and length(VOTER_NAME) < 20')

In [50]:
df.show(2)

+----------+--------------------+---------------+
|      DATE|               TITLE|     VOTER_NAME|
+----------+--------------------+---------------+
|04/11/2018|Deputy Mayor Pro Tem|   Adam Medrano|
|02/14/2018|       Councilmember|Lee M. Kleinman|
+----------+--------------------+---------------+
only showing top 2 rows



7. Loại bỏ các dữ liệu mà trong 'VOTER_NAME' có chứa dấu '_' (underscore)

In [51]:
df = df.filter(~ col('VOTER_NAME').contains('_'))

8. Tạo cột 'splits' chứa thông tin được cắt theo khoảng trắng từ 'VOTER_NAME'

In [52]:
df = df.withColumn('split', split(df.VOTER_NAME, '/s+')) 
df.show(1)

+----------+--------------------+------------+--------------+
|      DATE|               TITLE|  VOTER_NAME|         split|
+----------+--------------------+------------+--------------+
|04/11/2018|Deputy Mayor Pro Tem|Adam Medrano|[Adam Medrano]|
+----------+--------------------+------------+--------------+
only showing top 1 row



9. Tạo cột 'first_name' lấy dữ liệu từ phần tử đầu tiên trong cột 'splits'

In [54]:
df = df.withColumn('first_name', df.split.getItem(0))
df.show(1)

+----------+--------------------+------------+--------------+------------+
|      DATE|               TITLE|  VOTER_NAME|         split|  first_name|
+----------+--------------------+------------+--------------+------------+
|04/11/2018|Deputy Mayor Pro Tem|Adam Medrano|[Adam Medrano]|Adam Medrano|
+----------+--------------------+------------+--------------+------------+
only showing top 1 row



10. Tạo cột 'last_name' lấy dữ liệu từ phần tử cuối cùng trong cột 'splits'

In [59]:
df = df.withColumn('last_name', df.split.getItem(size('split') - 1))
df.show(1)

+----------+--------------------+------------+--------------+------------+------------+
|      DATE|               TITLE|  VOTER_NAME|         split|  first_name|   last_name|
+----------+--------------------+------------+--------------+------------+------------+
|04/11/2018|Deputy Mayor Pro Tem|Adam Medrano|[Adam Medrano]|Adam Medrano|Adam Medrano|
+----------+--------------------+------------+--------------+------------+------------+
only showing top 1 row



11. Tạo cột 'random_val' theo điều kiện: nếu cột 'TITLE' có nội dung là 'Councilmember' thì
'random_val' sẽ có giá trị rand(), nếu có nội dung là 'Mayor' thì 'random_val' sẽ có giá trị là 2,
ngược lại sẽ có giá trị là 0.

In [60]:
df = df.withColumn('random_val', when(df.TITLE == 'Councilmember', rand()))
df.show(1)

+----------+--------------------+------------+--------------+------------+------------+----------+
|      DATE|               TITLE|  VOTER_NAME|         split|  first_name|   last_name|random_val|
+----------+--------------------+------------+--------------+------------+------------+----------+
|04/11/2018|Deputy Mayor Pro Tem|Adam Medrano|[Adam Medrano]|Adam Medrano|Adam Medrano|      null|
+----------+--------------------+------------+--------------+------------+------------+----------+
only showing top 1 row



In [62]:
df = df.withColumn('random_val',
 when(df.TITLE == 'Councilmember', rand())
 .when(df.TITLE == 'Mayor', 2)
 .otherwise(0))
df.show(1)

+----------+--------------------+------------+--------------+------------+------------+----------+
|      DATE|               TITLE|  VOTER_NAME|         split|  first_name|   last_name|random_val|
+----------+--------------------+------------+--------------+------------+------------+----------+
|04/11/2018|Deputy Mayor Pro Tem|Adam Medrano|[Adam Medrano]|Adam Medrano|Adam Medrano|       0.0|
+----------+--------------------+------------+--------------+------------+------------+----------+
only showing top 1 row



12. Lọc các dòng dữ liệu có 'random_val' = 0. Hiển thị.

In [64]:
df.filter(df.random_val == 0).show(1)

+----------+--------------------+------------+--------------+------------+------------+----------+
|      DATE|               TITLE|  VOTER_NAME|         split|  first_name|   last_name|random_val|
+----------+--------------------+------------+--------------+------------+------------+----------+
|04/11/2018|Deputy Mayor Pro Tem|Adam Medrano|[Adam Medrano]|Adam Medrano|Adam Medrano|       0.0|
+----------+--------------------+------------+--------------+------------+------------+----------+
only showing top 1 row



13. Xây dựng function: getFirstAndMiddle(names) trả về kết quả gồm First và Middle (names).
Khai báo function vừa viết dưới dạng udf đặt tên là udfFirstAndMiddle.

In [65]:
from pyspark.sql.types import *

In [66]:
def getFirstAndMiddle(names):
 # Return a space separated string of names
 return ' '.join(names[:-1])

In [67]:
udfFirstAndMiddle = udf(getFirstAndMiddle, StringType())

14. Tạo cột first_and_middle_name bằng cách gọi udf trên với tham số truyền vào là cột 'splits'. In
kết quả.

In [69]:
df = df.withColumn('first_and_middle_name', udfFirstAndMiddle(df.split))

15. Xóa bỏ các cột 'first_name', 'splits'. In kết quả.

In [70]:
df = df.drop('first_name')
df = df.drop('splits')

In [71]:
df.show(3)

+----------+--------------------+---------------+-----------------+---------------+-------------------+---------------------+
|      DATE|               TITLE|     VOTER_NAME|            split|      last_name|         random_val|first_and_middle_name|
+----------+--------------------+---------------+-----------------+---------------+-------------------+---------------------+
|04/11/2018|Deputy Mayor Pro Tem|   Adam Medrano|   [Adam Medrano]|   Adam Medrano|                0.0|                     |
|02/14/2018|       Councilmember|Lee M. Kleinman|[Lee M. Kleinman]|Lee M. Kleinman|  0.368239777636053|                     |
|04/25/2018|       Councilmember| Tennell Atkins| [Tennell Atkins]| Tennell Atkins|0.32299455596231186|                     |
+----------+--------------------+---------------+-----------------+---------------+-------------------+---------------------+
only showing top 3 rows



16. Thêm cột 'ROW_ID' bằng phương thức: monotonically_increasing_id() (trong
pyspark.sql.functions).

In [72]:
df = df.withColumn('ROW_ID', monotonically_increasing_id())

17. Hiển thị 10 dòng đầu của dữ liệu với ROW_ID tăng dần.

In [73]:
df.orderBy(df.ROW_ID.desc()).show(10)

+----------+-------------+-------------------+--------------------+-------------------+------------------+---------------------+------+
|      DATE|        TITLE|         VOTER_NAME|               split|          last_name|        random_val|first_and_middle_name|ROW_ID|
+----------+-------------+-------------------+--------------------+-------------------+------------------+---------------------+------+
|11/22/2016|Councilmember|       Mark Clayton|      [Mark Clayton]|       Mark Clayton|0.7285887795180894|                     |  1258|
|05/17/2017|Councilmember|       Lee Kleinman|      [Lee Kleinman]|       Lee Kleinman|0.2247095866492842|                     |  1257|
|03/22/2017|Councilmember| Rickey D. Callahan|[Rickey D. Callahan]| Rickey D. Callahan|0.0553367594734675|                     |  1256|
|01/25/2017|Councilmember|      Sandy Greyson|     [Sandy Greyson]|      Sandy Greyson|0.9525553632611952|                     |  1255|
|01/24/2018|Mayor Pro Tem|  Dwaine R. Caraway| [

#### Exercise 2

In [5]:
df = spark.read.csv(('/Users/tranhuonggiang/Documents/BI_DA/KHTN/KHTN_BigData in ML/b5/AA_data/AA_DFW_2017_Departures_Short.csv'), inferSchema = True, header = True)

In [6]:
df.show(5)

+-----------------+-------------+-------------------+-----------------------------+
|Date (MM/DD/YYYY)|Flight Number|Destination Airport|Actual elapsed time (Minutes)|
+-----------------+-------------+-------------------+-----------------------------+
|       01/01/2017|            5|                HNL|                          537|
|       01/01/2017|            7|                OGG|                          498|
|       01/01/2017|           37|                SFO|                          241|
|       01/01/2017|           43|                DTW|                          134|
|       01/01/2017|           51|                STL|                           88|
+-----------------+-------------+-------------------+-----------------------------+
only showing top 5 rows



5. Trong df, thêm cột 'airport' lấy dữ liệu từ cột 'Destination Airport', định dạng chữ thường cho
nội dung

In [7]:
from pyspark.sql.functions import *

In [9]:
df = df.withColumn('airport', lower(df['Destination Airport']))

6. Trong df, thêm cột 'date' lấy dữ liệu từ cột 'Date (MM/DD/YYYY)', sau đó xóa bỏ cột 'Date
(MM/DD/YYYY)'

In [13]:
df = df.withColumn('date', df['Date (MM/DD/YYYY)'])
df = df.drop('Date (MM/DD/YYYY)')

7. Trong df, đổi tên cột "Flight Number" thành "flight_num", cột "Actual elapsed time (Minutes)"
thành "actual_time"

In [11]:
df = df.withColumnRenamed("Flight Number", "flight_num")
df = df.withColumnRenamed("Actual elapsed time (Minutes)", "actual_time")

8. Lưu df dưới dạng Parquet format với tên là "AA_DFW_ALL.parquet"

In [14]:
df.show(3)

+----------+-------------------+-----------+-------+----------+
|flight_num|Destination Airport|actual_time|airport|      date|
+----------+-------------------+-----------+-------+----------+
|         5|                HNL|        537|    hnl|01/01/2017|
|         7|                OGG|        498|    ogg|01/01/2017|
|        37|                SFO|        241|    sfo|01/01/2017|
+----------+-------------------+-----------+-------+----------+
only showing top 3 rows



In [15]:
df.write.parquet('AA_DFW_ALL.parquet.1', mode='overwrite')

                                                                                

9. Đọc parquet "AA_DFW_ALL.parquet" => df_new

In [16]:
df_new = spark.read.parquet('AA_DFW_ALL.parquet.1')

In [17]:
print(df_new.count())

139358


In [18]:
df_new.show(2)

+----------+-------------------+-----------+-------+----------+
|flight_num|Destination Airport|actual_time|airport|      date|
+----------+-------------------+-----------+-------+----------+
|         5|                HNL|        537|    hnl|01/01/2017|
|         7|                OGG|        498|    ogg|01/01/2017|
+----------+-------------------+-----------+-------+----------+
only showing top 2 rows



10. Tạo một bảng tạm 'flights'. Cho biết trung bình của 'actual_time' trong 'flight'

In [19]:
df_new.createOrReplaceTempView('flights')

In [20]:
avg_duration = spark.sql('SELECT avg(actual_time) from flights').collect()[0]
print('The average flight time is: %d' % avg_duration)

The average flight time is: 151


11. Caching các dòng dữ liệu duy nhất của df_new. Đếm số dòng. Cho biết thời gian thực hiện
các công việc này.

In [21]:
import time

In [22]:
start_time = time.time()
# Add caching to the unique rows in df_new
df_new = df_new.distinct().cache()
# Count the unique rows in df_new, noting how long the operation takes
print("Counting %d rows took %f seconds" %
 (df_new.count(), time.time() - start_time))

Counting 139358 rows took 0.934001 seconds


12. Đếm lại số dòng. Cho biết thời gian thực hiện các công việc này.

In [23]:
# Count the rows again, noting the variance in time of a cached DataFrame
start_time = time.time()
print("Counting %d rows again took %f seconds" %
 (df_new.count(), time.time() - start_time))

Counting 139358 rows again took 0.214738 seconds


13. Kiểm tra xem df_new có trong cache hay không? Nếu có thì bỏ df_new ra khỏi cache.

In [24]:
# Determine if df_new is in the cache
print("Is df_new cached?: %s" % df_new.is_cached)
print("Removing df_new from cache")
# Remove df_new from the cache
df_new.unpersist()
# Check the cache status again
print("Is df_new cached?: %s" % df_new.is_cached)

Is df_new cached?: True
Removing df_new from cache
Is df_new cached?: False
