In [1]:
import findspark
findspark.init()

In [2]:
from pyspark import SparkContext
from pyspark.conf import SparkConf

from pyspark.sql import SparkSession
from pyspark.sql import types 
from pyspark.sql.functions import col
from pyspark.sql.types import DateType
from pyspark.sql.functions import isnan, isnull, when, count
from pyspark.sql.functions import lower

In [3]:
sc = SparkContext()
spark = SparkSession(sc)

### 1. Đọc dữ liệu => df

In [4]:
df = spark.read.csv(["../../Data/AA_data"], header=True, inferSchema=True)

### 2. Cho biết dữ liệu có bao nhiêu dòng, in scheme. Hiển thị 5 dòng dữ liệu đầu tiên.

In [5]:
df.count()

583718

In [6]:
df.printSchema()

root
 |-- Date (MM/DD/YYYY): string (nullable = true)
 |-- Flight Number: integer (nullable = true)
 |-- Destination Airport: string (nullable = true)
 |-- Actual elapsed time (Minutes): integer (nullable = true)



In [7]:
df.show(5)

+-----------------+-------------+-------------------+-----------------------------+
|Date (MM/DD/YYYY)|Flight Number|Destination Airport|Actual elapsed time (Minutes)|
+-----------------+-------------+-------------------+-----------------------------+
|       01/01/2014|            5|                HNL|                          519|
|       01/01/2014|            7|                OGG|                          505|
|       01/01/2014|           35|                SLC|                          174|
|       01/01/2014|           43|                DTW|                          153|
|       01/01/2014|           52|                PIT|                          137|
+-----------------+-------------+-------------------+-----------------------------+
only showing top 5 rows



### 3. Kiểm tra dữ liệu NaN, null.

In [8]:
df.select([count(when(isnan(c), c)).alias(c) for c in df.columns]).toPandas().T

Unnamed: 0,0
Date (MM/DD/YYYY),0
Flight Number,0
Destination Airport,0
Actual elapsed time (Minutes),0


=> Không có dữ liệu NaN

In [9]:
df.select([count(when(isnull(c), c)).alias(c) for c in df.columns]).toPandas().T

Unnamed: 0,0
Date (MM/DD/YYYY),0
Flight Number,0
Destination Airport,0
Actual elapsed time (Minutes),0


=> Không có dữ liệu Null

### 4. Kiểm tra dữ liệu trùng. Xóa dữ liệu trùng.

In [10]:
num_rows = df.count()
num_dist_rows = df.distinct().count()
dup_rows = num_rows - num_dist_rows

In [11]:
dup_rows

0

=> Không có dữ liệu trùng

## Lazy processing operations

### 5. Trong df, thêm một cột 'airport' lấy dữ liệu từ cột 'Destination Airport'. định dạng chữ thường cho nội dung.

In [12]:
df = df.withColumn('airport', lower(df['Destination Airport']))
df = df.drop('Destination Airport')
df.show(5)

+-----------------+-------------+-----------------------------+-------+
|Date (MM/DD/YYYY)|Flight Number|Actual elapsed time (Minutes)|airport|
+-----------------+-------------+-----------------------------+-------+
|       01/01/2014|            5|                          519|    hnl|
|       01/01/2014|            7|                          505|    ogg|
|       01/01/2014|           35|                          174|    slc|
|       01/01/2014|           43|                          153|    dtw|
|       01/01/2014|           52|                          137|    pit|
+-----------------+-------------+-----------------------------+-------+
only showing top 5 rows



### 6. Trong df, thêm một cột 'date' lấy dữ liệu từ cột 'Date (MM/DD/YYYY)', sau đó xóa cột 'Date (MM/DD/YYYY)').

In [13]:
df = df.withColumn('date', lower(df['Date (MM/DD/YYYY)']))
df = df.drop('Date (MM/DD/YYYY)')
df.show(5)

+-------------+-----------------------------+-------+----------+
|Flight Number|Actual elapsed time (Minutes)|airport|      date|
+-------------+-----------------------------+-------+----------+
|            5|                          519|    hnl|01/01/2014|
|            7|                          505|    ogg|01/01/2014|
|           35|                          174|    slc|01/01/2014|
|           43|                          153|    dtw|01/01/2014|
|           52|                          137|    pit|01/01/2014|
+-------------+-----------------------------+-------+----------+
only showing top 5 rows



### 7. Trong df, đổi tên cột 'Flight Number' thành 'flight_num', cột 'Actual elapsed time (Minutes)' thành 'actual_time'.

In [14]:
df = df.withColumnRenamed('Flight Number', 'flight_num')
df = df.withColumnRenamed('Actual elapsed time (Minutes)', 'actual_time')
df.show(5)

+----------+-----------+-------+----------+
|flight_num|actual_time|airport|      date|
+----------+-----------+-------+----------+
|         5|        519|    hnl|01/01/2014|
|         7|        505|    ogg|01/01/2014|
|        35|        174|    slc|01/01/2014|
|        43|        153|    dtw|01/01/2014|
|        52|        137|    pit|01/01/2014|
+----------+-----------+-------+----------+
only showing top 5 rows



## Parquet Format

### 8. Lưu df dưới dạng Parquet format với tên là "AA_DFW_ALL.parquet".

In [15]:
# Save the df DataFrame in Parquet format
df.write.parquet('AA_DFW_ALL.parquet', mode='overwrite')

### 9. Đọc parquet "AA_DFW_ALL.parquet" => df_new.

In [16]:
df_new = spark.read.parquet('AA_DFW_ALL.parquet')

In [17]:
df_new.count()

583718

## SQL and Parquet

### 10. Tạo một bảng tạm 'flights'. Cho biết trung bình của 'actual_time' trong 'flights'.

In [18]:
df_new.createOrReplaceTempView('flights')

In [19]:
# Run a SQL query of the average Actual elapsed time
avg_duration = spark.sql('SELECT AVG(actual_time) FROM flights').collect()[0]
print("The average flight time is: %d" % avg_duration)

The average flight time is: 147


# Improve Performance
#### Caching a DataFrame
* Caching can improve performance when reusing DataFrame

In [20]:
import time

### 11. Caching các dòng dữ liệu duy nhất của df_new. Đếm số dòng. Cho biết thời gian thực hiện các công việc này.

In [21]:
start_time = time.time()
# Add caching to the unique rows in df_new
df_new = df_new.distinct().cache()
# Count the unique rows in df_new, nothing how long the operation takes
print("Counting %d rows took %f seconds" % (df_new.count(), time.time() - start_time))

Counting 583718 rows took 2.353077 seconds


### 12. Đếm lại số dòng. Cho biết thời gian thực hiện các công việc này.

In [22]:
# Count the rows again, noting the variance in time of a cached DataFrame
start_time = time.time()
print("Counting %d rows again took %f seconds" % (df_new.count(), time.time() - start_time))

Counting 583718 rows again took 0.580008 seconds


## Removing a DataFrame from cache

### 13. Kiểm tra xem df_new có trong cache hay không? Nếu có thì bỏ df_new khỏi cache.

In [23]:
# Determine if df_new is in the cache
print('Is df_new cached?: %s' % df_new.is_cached)

Is df_new cached?: True


In [24]:
print('Removing df_new from cache')
# Remove df_new from the cache
df_new.unpersist()
# Check the cache status again
print('Is df_new cached?: %s' % df_new.is_cached)

Removing df_new from cache
Is df_new cached?: False


* Note: Converting to a larger number of files with approximately equal quantity of rows lets Spark decide how best the data.

## Cluster configurations

In [25]:
# Name of the Spark application instance
app_name = spark.conf.get('spark.app.name')
# Driver TCP port
driver_tcp_port = spark.conf.get('spark.driver.port')
# Number of join partitions
num_partitions = spark.conf.get('spark.sql.shuffle.partitions')
# Show the results
print("Name: %s" % app_name)
print("Driver TCP port: %s" % driver_tcp_port)
print("Number of partitions: %s" % num_partitions)

Name: pyspark-shell
Driver TCP port: 61139
Number of partitions: 200


In [26]:
# Store the number of partitions in variable
before = df_new.rdd.getNumPartitions()
# Configure Spark to use 500 partitions
spark.conf.set('spark.sql.shuffle.partitions', 500)
# Recreate the DataFrame using the departure data file
df_new = spark.read.parquet('AA_DFW_ALL.parquet').distinct()
# Print the number of partitions for each instance
print("Partition count before change: %d" % before)
print("Partition count after change: %d" % df_new.rdd.getNumPartitions())

Partition count before change: 200
Partition count after change: 500


In [27]:
# Save data to json file
df_new.write.json('AA_DFW_ALL.json')