In [1]:
import pyspark 
from pyspark import SparkContext

In [2]:
from pyspark.sql import SQLContext
from pyspark.conf import SparkConf
from pyspark.sql.session import SparkSession

In [4]:
# Reuse the SparkContext that is already running in this kernel, or create one if none exists,
# so the notebook also works after "Restart Kernel -> Run All" instead of relying on a
# pre-injected `sc` variable.
sc = SparkContext.getOrCreate()
sc

# Extract

In [5]:
# Entry point for reading data and running SQL. SQLContext is deprecated; PySpark recommends
# SparkSession.builder.getOrCreate(), which reuses the already-running SparkContext and exposes
# the same `read` and `sql` API used throughout this notebook.
spark = SparkSession.builder.getOrCreate()

In [23]:
# Extract: load one day of raw flight records (JSON) from GCS into a DataFrame.
# NOTE(review): the folder is spelled "fight-data" — confirm this matches the bucket layout.
# NOTE(review): the input date is hard-coded to 2019-05-06, while the output folders are
# stamped with date.today() — confirm that mismatch is intended.
FLIGHTS_SOURCE_PATH = "gs://amy_data_etl/fight-data/2019-05-06.json"
flights_data = spark.read.json(FLIGHTS_SOURCE_PATH)

In [24]:
# Expose the DataFrame to SQL as the temp view "flights_data" so spark.sql(...) can query it.
# createOrReplaceTempView replaces the deprecated registerTempTable (same replace-if-exists behavior).
flights_data.createOrReplaceTempView("flights_data")

## Explore the data

In [25]:
# Peek at the first 10 raw records to see the inferred columns.
spark.sql(sqlQuery="select * from flights_data limit 10").show()

+------------+-------+-------------+------------+---------------+--------------+-------------------+--------+-----------+----------+------+--------------+
|airline_code|airtime|arrival_delay|arrival_time|departure_delay|departure_time|destination_airport|distance|flight_date|flight_num|    id|source_airport|
+------------+-------+-------------+------------+---------------+--------------+-------------------+--------+-----------+----------+------+--------------+
|       20304|     88|           13|        1421|              8|          1228|                PHX|     598| 2019-05-06|      2916|200000|           MRY|
|       20304|     69|           -3|        1721|             -6|          1454|                GJT|     438| 2019-05-06|      2916|200001|           PHX|
|       20304|     62|            2|        2203|            -10|          2128|                PHX|     347| 2019-05-06|      2917|200002|           ELP|
|       20304|     67|          -15|        2039|             -8|     

In [26]:
# Sample of the distinct flight numbers (show() prints only the first 20 rows).
spark.sql(sqlQuery="select distinct flight_num from flights_data").show()

+----------+
|flight_num|
+----------+
|      5409|
|      1697|
|       474|
|      1806|
|      1950|
|       964|
|        29|
|      3764|
|      2250|
|      3506|
|        26|
|      1677|
|      2214|
|      2453|
|      2509|
|      2529|
|      2040|
|      4894|
|      5385|
|      2927|
+----------+
only showing top 20 rows



In [27]:
# Total number of flight records in the input file.
spark.sql(sqlQuery="select count(*) from flights_data").show()

+--------+
|count(1)|
+--------+
|  100001|
+--------+



In [28]:
# Largest flight distance in the dataset. The aggregate returns a single row, so the
# previous "limit 10" was a no-op and has been dropped.
spark.sql("select max(distance) from flights_data").show()

+-------------+
|max(distance)|
+-------------+
|         4983|
+-------------+



# Transform

## 1. Find the average delay by flight number and date

In [29]:
# Average arrival and departure delay (rounded to 2 decimals) per (flight_num, flight_date).
# The next cell runs this query; later cells reassign `qry`, so run these cells in order.
# NOTE(review): flight numbers are usually only unique within a carrier, and the data has an
# airline_code column — grouping by flight_num alone may merge flights of different airlines.
# Consider adding airline_code to the select and group by (the BigQuery table schema would
# need to match).
qry = """
        select 
            flight_date , 
            round(avg(arrival_delay),2) as avg_arrival_delay,
            round(avg(departure_delay),2) as avg_departure_delay,
            flight_num 
        from 
            flights_data 
        group by 
            flight_num , 
            flight_date 
      """

In [30]:
# Build the per-flight-number average-delay DataFrame from the query defined above.
avg_delays_by_flight_nums = spark.sql(sqlQuery=qry)

In [32]:
# Preview the first 20 aggregated rows.
avg_delays_by_flight_nums.show(n=20)

+-----------+-----------------+-------------------+----------+
|flight_date|avg_arrival_delay|avg_departure_delay|flight_num|
+-----------+-----------------+-------------------+----------+
| 2019-05-06|              5.5|               6.17|      6430|
| 2019-05-06|            -2.48|                3.3|      4480|
| 2019-05-06|           -13.29|              -4.86|      4757|
| 2019-05-06|            11.76|              16.52|      5323|
| 2019-05-06|             4.17|               6.11|      5380|
| 2019-05-06|             5.64|               8.57|      3705|
| 2019-05-06|             6.27|              13.47|      1611|
| 2019-05-06|            -7.29|              -2.66|      1732|
| 2019-05-06|            14.33|               20.0|      1742|
| 2019-05-06|            -1.79|               7.69|       429|
| 2019-05-06|             -3.0|               4.67|       453|
| 2019-05-06|            -5.22|              -1.17|       789|
| 2019-05-06|            -1.56|               1.81|    

## 2. Find the average delay by distance (there are 1,196 distinct distance values, so bucket them first)

In [33]:
# How many distinct distance values exist — too many to group on directly, hence the buckets below.
spark.sql(sqlQuery="select count(distinct distance) from flights_data").show()

+------------------------+
|count(DISTINCT distance)|
+------------------------+
|                    1196|
+------------------------+



In [34]:
# Upper bound of the distance range, used to choose the bucket edges below. The aggregate
# returns a single row, so the previous "limit 10" was a no-op and has been dropped.
spark.sql("select max(distance) from flights_data").show()

+-------------+
|max(distance)|
+-------------+
|         4983|
+-------------+



## 2.1 Bucket the distances into a new `distance_category` column

In [37]:
# Add a distance_category column (1-6) to every flight row.
# Bucket edges are contiguous and upper-inclusive: [0, 500] -> 1, (500, 1000] -> 2,
# (1000, 2000] -> 3, (2000, 3000] -> 4, (3000, 4000] -> 5, (4000, 5000] -> 6.
# The previous BETWEEN ranges (0-500, 501-1000, ...) left gaps such as 500 < d < 501, so a
# non-integer distance fell into no bucket. Negative, > 5000, or NULL distances still yield
# a NULL category, as before.
qry = """
        select
            *,
            case
                when distance < 0 then null
                when distance <= 500 then 1
                when distance <= 1000 then 2
                when distance <= 2000 then 3
                when distance <= 3000 then 4
                when distance <= 4000 then 5
                when distance <= 5000 then 6
            end as distance_category
        from
            flights_data
        """

In [39]:
# Preview five rows with the new distance_category column.
spark.sql(qry).show(n=5)

+------------+-------+-------------+------------+---------------+--------------+-------------------+--------+-----------+----------+------+--------------+-----------------+
|airline_code|airtime|arrival_delay|arrival_time|departure_delay|departure_time|destination_airport|distance|flight_date|flight_num|    id|source_airport|distance_category|
+------------+-------+-------------+------------+---------------+--------------+-------------------+--------+-----------+----------+------+--------------+-----------------+
|       20304|     88|           13|        1421|              8|          1228|                PHX|     598| 2019-05-06|      2916|200000|           MRY|                2|
|       20304|     69|           -3|        1721|             -6|          1454|                GJT|     438| 2019-05-06|      2916|200001|           PHX|                1|
|       20304|     62|            2|        2203|            -10|          2128|                PHX|     347| 2019-05-06|      2917|200

## 2.2 Add `distance_category` to the dataset and re-register it as the `flights_data` temp view

In [40]:
# Replace flights_data with the bucketed result of the distance-category query held in `qry`.
# NOTE(review): not idempotent. The next cell re-registers this result as the "flights_data"
# view, so re-running this cell afterwards selects * (which already contains distance_category)
# plus a second distance_category column. It also assumes `qry` still holds the bucketing query.
flights_data = spark.sql(sqlQuery=qry)

In [42]:
# Re-register the bucketed DataFrame under the same view name so later SQL can use distance_category.
# createOrReplaceTempView replaces the deprecated registerTempTable (same replace-if-exists behavior).
flights_data.createOrReplaceTempView("flights_data")

## 2.3 Calculate the average delays per distance category

In [43]:
# Average arrival and departure delay (rounded to 2 decimals) per (distance_category, flight_date).
# Uses its own variable name instead of reassigning the shared `qry`: overwriting `qry` here
# made the earlier "add distance_category" cell run this aggregate instead of the bucketing
# query if it was re-executed.
avg_by_distance_qry = """
        select 
            flight_date , 
            round(avg(arrival_delay),2) as avg_arrival_delay,
            round(avg(departure_delay),2) as avg_departure_delay,
            distance_category 
        from 
            flights_data 
        group by 
            distance_category , 
            flight_date 
      """

avg_delays_by_distance_category = spark.sql(avg_by_distance_qry)

In [44]:
# Show the per-distance-category averages (at most 20 rows are printed).
avg_delays_by_distance_category.show(n=20)

+-----------+-----------------+-------------------+-----------------+
|flight_date|avg_arrival_delay|avg_departure_delay|distance_category|
+-----------+-----------------+-------------------+-----------------+
| 2019-05-06|            -5.42|               3.96|                6|
| 2019-05-06|            -0.02|               7.56|                4|
| 2019-05-06|             6.72|               9.91|                2|
| 2019-05-06|             6.16|               8.11|                1|
| 2019-05-06|            -4.92|               5.52|                5|
| 2019-05-06|              5.1|               9.75|                3|
+-----------+-----------------+-------------------+-----------------+



# Load: write the outputs to the GCS bucket, stamped with today's date

In [45]:
# Date stamp (YYYY-MM-DD, the driver's local date) used to name this run's output folders.
# NOTE(review): this is the run date, not the flight date of the input file (2019-05-06) —
# confirm which one the downstream load expects.
from datetime import date

current_date = date.today()
file_name = current_date.isoformat()

In [53]:
# Load: write each aggregate as JSON to <bucket>/flights_data_output/<file_name>_<suffix>.
# coalesce(1) produces a single part file per output folder (assumes the aggregates are small
# enough to be written by one task).
# mode("overwrite") makes the cell re-runnable: the default save mode raises an error when the
# target folder already exists, e.g. on a second run on the same day.
bucket_name = "gs://amy_data_etl"
output_prefix = bucket_name + "/flights_data_output/" + file_name
output_flight_nums = output_prefix + "_flight_nums"
output_distance_category = output_prefix + "_distance_category"

avg_delays_by_flight_nums.coalesce(1).write.mode("overwrite").json(output_flight_nums)
avg_delays_by_distance_category.coalesce(1).write.mode("overwrite").json(output_distance_category)


In [48]:
# Display the GCS folder the per-flight-number averages were written to.
output_flight_nums

'gs://amy_data_etl/flights_data_output/2021-11-20_flight_nums'

In [49]:
# Display the GCS folder the per-distance-category averages were written to.
output_distance_category

'gs://amy_data_etl/flights_data_output/2021-11-20_distance_category'

# Create the tables and load the outputs from GCS into BigQuery

## Scripts are in `bigquery-sparksql-batch-etl/spark-sql/` of the gcp-data-engineering project

### 1. create_tables.sh
### 2. load_json_partitioned.sh