In [15]:
# load spark
from pyspark import SparkContext
from pyspark.sql import SparkSession
import os

# Build (or reuse, if one already exists in this kernel) a local SparkSession.
spark = SparkSession.builder.master("local").appName("CsvReader").getOrCreate()

# Location of the 2015 flight summary CSV. Set the FLIGHT_DATA_2015_CSV
# environment variable to point at your own copy of the data; the default
# keeps the original author's machine-specific path for backward compatibility.
FLIGHT_DATA_2015_CSV = os.environ.get(
    "FLIGHT_DATA_2015_CSV",
    "C:/Users/MaciejGumulka/OneDrive/Dokumenty/Github/spark_the_definite_guide/Spark-The-Definitive-Guide/data/flight-data/csv/2015-summary.csv",
)

# read csv data: the first row holds column names, and column types are
# inferred from the data (which costs an extra pass over the file).
flightData2015 = (
    spark.read
    .option("inferSchema", "true")
    .option("header", "true")
    .csv(FLIGHT_DATA_2015_CSV)
)

In [17]:
# Peek at the first three rows of the loaded data (returned as a list of Row objects).
flightData2015.head(3)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]

In [None]:
# Print the physical plan Spark would use to sort by "count".
# explain() only describes the plan; it does not run the sort.
flightData2015.orderBy("count").explain()

In [21]:
# Make shuffles (such as the one this sort needs) produce 5 output partitions,
# then sort ascending by "count" and fetch the first 5 rows.
# NOTE(review): the sort has no tie-breaker, so which of the count=1 rows come
# back is not guaranteed to be the same on every run.
spark.conf.set(key="spark.sql.shuffle.partitions", value="5")
flightData2015.orderBy("count").take(5)

[Row(DEST_COUNTRY_NAME='Malta', ORIGIN_COUNTRY_NAME='United States', count=1),
 Row(DEST_COUNTRY_NAME='Saint Vincent and the Grenadines', ORIGIN_COUNTRY_NAME='United States', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Gibraltar', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1)]

In [38]:
# Expose the DataFrame to Spark SQL as a temporary view, then count the
# rows per destination country with a GROUP BY query.
flightData2015.createOrReplaceTempView("flight_data_2015")

sqlWay = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1)
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
""")

# The query has no ORDER BY, so these 5 groups are an arbitrary sample.
# (Call sqlWay.explain() to inspect the plan.)
sqlWay.head(5)

[Row(DEST_COUNTRY_NAME='Moldova', count(1)=1),
 Row(DEST_COUNTRY_NAME='Bolivia', count(1)=1),
 Row(DEST_COUNTRY_NAME='Algeria', count(1)=1),
 Row(DEST_COUNTRY_NAME='Turks and Caicos Islands', count(1)=1),
 Row(DEST_COUNTRY_NAME='Pakistan', count(1)=1)]

In [42]:
# Largest single value of the "count" column, computed in Spark SQL
# against the flight_data_2015 temporary view.
spark.sql("""
SELECT max(count) 
from flight_data_2015""").head(1)

[Row(max(count)=370002)]

In [9]:
# Same maximum, via the DataFrame API instead of SQL.
# Spark's max is imported under an alias so it does not shadow Python's
# built-in max() for every later cell in this kernel. The result column is
# still named "max(count)", because Spark names it after the function, not
# the Python alias.
from pyspark.sql.functions import max as spark_max
flightData2015.select(spark_max("count")).take(1)

[Row(max(count)=370002)]

In [10]:
# Top 5 destination countries ranked by total flights (sum of "count"),
# expressed in Spark SQL against the temporary view.
topDestinationsQuery = """
SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
"""
maxSql = spark.sql(topDestinationsQuery)
maxSql.show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [47]:
# Same top-5 ranking with the DataFrame API: total flights per destination,
# renamed to match the SQL column, sorted descending, first 5 rows printed.
from pyspark.sql.functions import desc
(
    flightData2015
    .groupBy("DEST_COUNTRY_NAME")
    .sum("count")
    .withColumnRenamed("sum(count)", "destination_total")
    .orderBy(desc("destination_total"))
    .limit(5)
    .show()
)

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [48]:
# Print the physical plan of the DataFrame-API top-5 query above.
# `desc` comes from the pyspark.sql.functions import in the previous cell.
(
    flightData2015
    .groupBy("DEST_COUNTRY_NAME")
    .sum("count")
    .withColumnRenamed("sum(count)", "destination_total")
    .orderBy(desc("destination_total"))
    .limit(5)
    .explain()
)

== Physical Plan ==
TakeOrderedAndProject(limit=5, orderBy=[destination_total#419L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#194,destination_total#419L])
+- *(2) HashAggregate(keys=[DEST_COUNTRY_NAME#194], functions=[sum(cast(count#196 as bigint))])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#194, 5)
      +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#194], functions=[partial_sum(cast(count#196 as bigint))])
         +- *(1) FileScan csv [DEST_COUNTRY_NAME#194,count#196] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/C:/Users/MaciejGumulka/OneDrive/Dokumenty/Github/spark_the_definite_guide..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>
