In [1]:
# chap 4.1
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("spark://3efcebd71aef:7077").config('spark.sql.warehouse.dir', '/tmp/warehouse').appName("MyAppFiles2").getOrCreate()



In [2]:
def to_date_format_udf(d_str):
  l = [char for char in d_str]
  return "".join(l[0:2]) + "/" +  "".join(l[2:4]) + " " + " " +"".join(l[4:6]) + ":" + "".join(l[6:])

to_date_format_udf("02190925")

'02/19  09:25'

In [3]:
spark.udf.register("to_date_format_udf", to_date_format_udf, StringType())

<function __main__.to_date_format_udf(d_str)>

In [4]:
df = (spark.read.format("csv")
      .schema("date STRING, delay INT, distance INT, origin STRING, destination STRING")
      .option("header", "true")
      .option("path", "/opt/spark-data/departuredelays.csv")
      .load())

display(df)

DataFrame[date: string, delay: int, distance: int, origin: string, destination: string]

In [5]:
df.selectExpr("to_date_format_udf(date) as data_format").show(10, truncate=False)

+------------+
|data_format |
+------------+
|01/01  12:45|
|01/02  06:00|
|01/02  12:45|
|01/02  06:05|
|01/03  12:45|
|01/03  06:05|
|01/04  12:43|
|01/04  06:05|
|01/05  12:45|
|01/05  06:05|
+------------+
only showing top 10 rows



In [6]:
df.createOrReplaceTempView("us_delay_flights_tbl")

In [12]:
spark.sql('CACHE TABLE us_delay_flights_tbl')

DataFrame[]

In [13]:
spark.sql("SELECT *, date, to_date_format_udf(date) AS date_fm FROM us_delay_flights_tbl").show(10, truncate=False)

+--------+-----+--------+------+-----------+--------+------------+
|date    |delay|distance|origin|destination|date    |date_fm     |
+--------+-----+--------+------+-----------+--------+------------+
|01011245|6    |602     |ABE   |ATL        |01011245|01/01  12:45|
|01020600|-8   |369     |ABE   |DTW        |01020600|01/02  06:00|
|01021245|-2   |602     |ABE   |ATL        |01021245|01/02  12:45|
|01020605|-4   |602     |ABE   |ATL        |01020605|01/02  06:05|
|01031245|-4   |602     |ABE   |ATL        |01031245|01/03  12:45|
|01030605|0    |602     |ABE   |ATL        |01030605|01/03  06:05|
|01041243|10   |602     |ABE   |ATL        |01041243|01/04  12:43|
|01040605|28   |602     |ABE   |ATL        |01040605|01/04  06:05|
|01051245|88   |602     |ABE   |ATL        |01051245|01/05  12:45|
|01050605|9    |602     |ABE   |ATL        |01050605|01/05  06:05|
+--------+-----+--------+------+-----------+--------+------------+
only showing top 10 rows



In [15]:
spark.sql("SELECT COUNT(*) FROM us_delay_flights_tbl").show() 

+--------+
|count(1)|
+--------+
| 1391578|
+--------+



In [16]:
spark.sql("SELECT distance, origin, destination FROM us_delay_flights_tbl WHERE distance > 1000 ORDER BY distance DESC").show(10, truncate=False)

+--------+------+-----------+
|distance|origin|destination|
+--------+------+-----------+
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
+--------+------+-----------+
only showing top 10 rows



In [17]:
df.select("distance", "origin", "destination").where(col("distance") > 1000).orderBy(desc("distance")).show(10, truncate=False)

+--------+------+-----------+
|distance|origin|destination|
+--------+------+-----------+
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
+--------+------+-----------+
only showing top 10 rows



In [18]:
df.select("distance", "origin", "destination").where("distance > 1000").orderBy("distance", ascending=False).show(10)

+--------+------+-----------+
|distance|origin|destination|
+--------+------+-----------+
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
+--------+------+-----------+
only showing top 10 rows



In [19]:
df.select("distance", "origin", "destination").where("distance > 1000").orderBy(desc("distance")).show(10)

+--------+------+-----------+
|distance|origin|destination|
+--------+------+-----------+
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
+--------+------+-----------+
only showing top 10 rows



In [20]:
spark.sql("""
SELECT date, delay, origin, destination 
FROM us_delay_flights_tbl 
WHERE delay > 120 AND ORIGIN = 'SFO' AND DESTINATION = 'ORD' 
ORDER by delay DESC
""").show(10, truncate=False)

+--------+-----+------+-----------+
|date    |delay|origin|destination|
+--------+-----+------+-----------+
|02190925|1638 |SFO   |ORD        |
|01031755|396  |SFO   |ORD        |
|01022330|326  |SFO   |ORD        |
|01051205|320  |SFO   |ORD        |
|01190925|297  |SFO   |ORD        |
|02171115|296  |SFO   |ORD        |
|01071040|279  |SFO   |ORD        |
|01051550|274  |SFO   |ORD        |
|03120730|266  |SFO   |ORD        |
|01261104|258  |SFO   |ORD        |
+--------+-----+------+-----------+
only showing top 10 rows



In [21]:
spark.sql("""SELECT delay, origin, destination,
              CASE
                  WHEN delay > 360 THEN 'Very Long Delays'
                  WHEN delay > 120 AND delay < 360 THEN  'Long Delays '
                  WHEN delay > 60 AND delay < 120 THEN  'Short Delays'
                  WHEN delay > 0 and delay < 60  THEN   'Tolerable Delays'
                  WHEN delay = 0 THEN 'No Delays'
                  ELSE 'No Delays'
               END AS Flight_Delays
               FROM us_delay_flights_tbl
               ORDER BY origin, delay DESC""").show(10, truncate=False)

+-----+------+-----------+-------------+
|delay|origin|destination|Flight_Delays|
+-----+------+-----------+-------------+
|333  |ABE   |ATL        |Long Delays  |
|305  |ABE   |ATL        |Long Delays  |
|275  |ABE   |ATL        |Long Delays  |
|257  |ABE   |ATL        |Long Delays  |
|247  |ABE   |DTW        |Long Delays  |
|247  |ABE   |ATL        |Long Delays  |
|219  |ABE   |ORD        |Long Delays  |
|211  |ABE   |ATL        |Long Delays  |
|197  |ABE   |DTW        |Long Delays  |
|192  |ABE   |ORD        |Long Delays  |
+-----+------+-----------+-------------+
only showing top 10 rows



In [22]:
df1 =  spark.sql("SELECT date, delay, origin, destination FROM us_delay_flights_tbl WHERE origin = 'SFO'")

In [23]:
df1.createOrReplaceGlobalTempView("us_origin_airport_SFO_tmp_view")

In [24]:
spark.sql('DROP VIEW IF EXISTS global_temp.us_origin_airport_JFK_tmp_view')

DataFrame[]

In [25]:
df2 = spark.sql("SELECT date, delay, origin, destination from us_delay_flights_tbl WHERE origin = 'JFK'")

In [27]:
df2.createOrReplaceTempView("us_origin_airport_JFK_tmp_view")

In [29]:
spark.sql("DROP VIEW IF EXISTS us_origin_airport_JFK_tmp_view")

DataFrame[]

In [30]:
spark.catalog.listTables(dbName="global_temp")

[Table(name='us_origin_airport_SFO_tmp_view', catalog=None, namespace=['global_temp'], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='us_delay_flights_tbl', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]