In [1]:
# Build the SparkSession that the later cells use for reading data and running SQL.
from pyspark.sql import SparkSession

# "local[*]" runs Spark inside this process, with one worker thread per logical core.
session_builder = SparkSession.builder.master("local[*]").appName("Introduction to Spark")

# getOrCreate() hands back an already-running session if there is one, else starts a new one.
spark = session_builder.getOrCreate()
spark

In [2]:
# Load the 2015 flight summary CSV into a DataFrame.
import os

# The path is resolved relative to the current working directory.
filepath = os.path.join(os.curdir, "data", "flight-data", "csv", "2015-summary.csv")

# header: take column names from the first line; inferSchema: let Spark work out column types.
flightData2015 = spark.read.csv(filepath, header="true", inferSchema="true")

In [3]:
# An action materialises the data: keep only 5 rows, then collect them into a Python list of Row objects.
flightData2015.limit(5).collect()

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344),
 Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=62)]

In [4]:
# Wide transformation (sort): nothing is executed here, because a transformation
# only describes the computation; work starts once an action is called.
flightData2015_sorted = flightData2015.orderBy("count")

# Show the physical plan Spark built: the top line is the end result, the bottom line is the source.
flightData2015_sorted.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#19 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(count#19 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=33]
      +- FileScan csv [DEST_COUNTRY_NAME#17,ORIGIN_COUNTRY_NAME#18,count#19] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/e:/Intern/Spark-Learning/data/flight-data/csv/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>




In [14]:
# Same aggregation expressed in SQL: count the rows per destination country.
# Register the DataFrame under a view name so SQL statements can refer to it.
flightData2015.createOrReplaceTempView("flight_data_2015")

destination_counts_sql = """
    SELECT DEST_COUNTRY_NAME, count(1)
    FROM flight_data_2015
    GROUP BY DEST_COUNTRY_NAME
"""
sql_query = spark.sql(destination_counts_sql)

# Print the physical plan first, then run the query and display the first rows.
sql_query.explain()
sql_query.show()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#17, 5), ENSURE_REQUIREMENTS, [plan_id=202]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#17] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/e:/Intern/Spark-Learning/data/flight-data/csv/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>


+--------------------+--------+
|   DEST_COUNTRY_NAME|count(1)|
+--------------------+--------+
|             Moldova|       1|
|             Bolivia|       1|
|             Algeria|       1|
|Turks and Caicos ...|       1|
|            Pakistan|       1|
|    Marshall Islands|       1|
|            Suriname|       1|
|              Panama|       1|
|         New Zealand|       1|
|             Liberia|     

In [15]:
# Same aggregation through the DataFrame API: group by destination, then count rows per group.
rows_by_destination = flightData2015.groupBy("DEST_COUNTRY_NAME")
df_query = rows_by_destination.count()

# Print the physical plan first, then run the query and display the first rows.
df_query.explain()
df_query.show()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#17, 5), ENSURE_REQUIREMENTS, [plan_id=260]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#17] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/e:/Intern/Spark-Learning/data/flight-data/csv/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>


+--------------------+-----+
|   DEST_COUNTRY_NAME|count|
+--------------------+-----+
|             Moldova|    1|
|             Bolivia|    1|
|             Algeria|    1|
|Turks and Caicos ...|    1|
|            Pakistan|    1|
|    Marshall Islands|    1|
|            Suriname|    1|
|              Panama|    1|
|         New Zealand|    1|
|             Liberia|    1|
|             Ireland|    1|
|    

In [None]:
# some statistics information

##  maximum number of flights to and from any given location
####  sql way (queries the "flight_data_2015" temp view, which must already be registered)
maximum_flight_sql = spark.sql("SELECT max(count) FROM flight_data_2015")
maximum_flight_sql.explain()
maximum_flight_sql.show()

####  dataframe way
# Import the functions module under an alias rather than `from ... import max`:
# the bare import would rebind Python's built-in max() for the rest of the kernel session.
from pyspark.sql import functions as F
maximum_flight_df = flightData2015.select(F.max("count"))
maximum_flight_df.explain()
maximum_flight_df.show()


##  top five destination countries
top_5_sql = spark.sql.

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[max(count#19)])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=408]
      +- HashAggregate(keys=[], functions=[partial_max(count#19)])
         +- FileScan csv [count#19] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/e:/Intern/Spark-Learning/data/flight-data/csv/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<count:int>


+----------+
|max(count)|
+----------+
|    370002|
+----------+

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[max(count#19)])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=453]
      +- HashAggregate(keys=[], functions=[partial_max(count#19)])
         +- FileScan csv [count#19] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/e:/Intern/Spark-Learning/data/flight-data/csv/2015-summary.c