In [12]:
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session. The shuffle-partition count is fixed
# here, at session creation, so that every plan printed further down is
# produced with 5 shuffle partitions on a fresh "Restart & Run All" rather
# than depending on configuration that is only applied in a later cell.
spark = (
    SparkSession.builder
    .appName("Flight Data Analysis")
    .config("spark.sql.shuffle.partitions", "5")
    .getOrCreate()
)

# Load the 2015 flight summary CSV: the first line is treated as a header and
# column types are inferred from the data.
flightData2015 = (
    spark.read
    .option("inferSchema", "true")
    .option("header", "true")
    .csv("./data/flight-data/csv/2015-summary.csv")
)

# Quick sanity check of the first three rows.
flightData2015.show(3)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
+-----------------+-------------------+-----+
only showing top 3 rows



In [13]:
# Print the physical plan Spark builds for ordering the rows by "count".
flightData2015.orderBy("count").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#98 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(count#98 ASC NULLS FIRST, 5), ENSURE_REQUIREMENTS, [plan_id=134]
      +- FileScan csv [DEST_COUNTRY_NAME#96,ORIGIN_COUNTRY_NAME#97,count#98] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/apple/Documents/GitHub/DataBricksLearning/spark/data/fligh..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>




In [14]:
# Use 5 partitions for shuffles triggered from this point on.
spark.conf.set("spark.sql.shuffle.partitions", "5")

# Return the two rows with the smallest "count" values as a list of Row objects.
flightData2015.orderBy("count").take(2)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

In [15]:
# Register the DataFrame as a temporary view so it can be queried through
# spark.sql(...).
flightData2015.createOrReplaceTempView(name="flightData2015")

In [16]:
# DataFrame-API version: number of rows per destination country.
data_frameWay = (
    flightData2015
    .groupBy("DEST_COUNTRY_NAME")
    .count()
)
data_frameWay.show()

+--------------------+-----+
|   DEST_COUNTRY_NAME|count|
+--------------------+-----+
|             Moldova|    1|
|             Bolivia|    1|
|             Algeria|    1|
|Turks and Caicos ...|    1|
|            Pakistan|    1|
|    Marshall Islands|    1|
|            Suriname|    1|
|              Panama|    1|
|         New Zealand|    1|
|             Liberia|    1|
|             Ireland|    1|
|              Zambia|    1|
|            Malaysia|    1|
|               Japan|    1|
|    French Polynesia|    1|
|           Singapore|    1|
|             Denmark|    1|
|               Spain|    1|
|             Bermuda|    1|
|            Kiribati|    1|
+--------------------+-----+
only showing top 20 rows



In [18]:
# SQL version of the per-destination row count, run against the temp view.
sqlWay = spark.sql(
    """SELECT DEST_COUNTRY_NAME, count(1) FROM flightdata2015 GROUP BY DEST_COUNTRY_NAME"""
)

In [19]:
# Print both physical plans for side-by-side comparison:
# the SQL query first, then the DataFrame-API query.
for query in (sqlWay, data_frameWay):
    query.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#96], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#96, 5), ENSURE_REQUIREMENTS, [plan_id=201]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#96], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#96] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/apple/Documents/GitHub/DataBricksLearning/spark/data/fligh..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#96], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#96, 5), ENSURE_REQUIREMENTS, [plan_id=214]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#96], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#96] Batched: false, DataFilters: [], Format: CSV, Location: InMe

In [21]:
# Largest "count" value in the view, computed with SQL. take(1) returns a list
# holding the single result Row.
max_count_query = "SELECT max(count) from flightdata2015"
sql_like = spark.sql(max_count_query).take(1)
sql_like

[Row(max(count)=370002)]

In [22]:
# Import the functions module under an alias instead of
# `from pyspark.sql.functions import max`: that form rebinds the name `max`
# and hides Python's built-in max() from every cell that runs afterwards.
from pyspark.sql import functions as F

# Largest "count" value, computed with the DataFrame API; the result column
# is still named "max(count)".
flightData2015.select(F.max("count")).take(1)

[Row(max(count)=370002)]

In [26]:
# Top five destination countries ranked by the sum of "count", via SQL.
top_destinations_query = "SELECT DEST_COUNTRY_NAME, sum(count) as destination_total FROM flightdata2015 GROUP BY DEST_COUNTRY_NAME ORDER BY sum(count) DESC LIMIT 5"
maxSql = spark.sql(top_destinations_query)
maxSql.show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [30]:
from pyspark.sql.functions import desc

# DataFrame-API version of the top-five query: sum "count" per destination,
# rename the aggregate column, sort descending and keep the first five rows.
maxDataFrame = (
    flightData2015
    .groupBy("DEST_COUNTRY_NAME")
    .sum("count")
    .withColumnRenamed("sum(count)", "destination_total")
    .sort(desc("destination_total"))
    .limit(5)
)
maxDataFrame.show()


+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [31]:
# Explain the plan of maxDataFrame (the top-five destinations frame defined in
# an earlier cell) instead of re-typing the whole
# groupBy/sum/rename/sort/limit chain, so the plan printed here always
# belongs to the frame that was actually displayed.
maxDataFrame.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- TakeOrderedAndProject(limit=5, orderBy=[destination_total#284L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#96,destination_total#284L])
   +- HashAggregate(keys=[DEST_COUNTRY_NAME#96], functions=[sum(count#98)])
      +- Exchange hashpartitioning(DEST_COUNTRY_NAME#96, 5), ENSURE_REQUIREMENTS, [plan_id=461]
         +- HashAggregate(keys=[DEST_COUNTRY_NAME#96], functions=[partial_sum(count#98)])
            +- FileScan csv [DEST_COUNTRY_NAME#96,count#98] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/apple/Documents/GitHub/DataBricksLearning/spark/data/fligh..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>


