# 2. Read CSV Data and Sort, Count and View Shuffles and Execution Plans, Filter Operations #
Examples based on *Spark: The Definitive Guide: Big Data Processing Made Simple*, by Matei Zaharia and Bill Chambers - Chapter 2.  

In this example, sample flight data for 2010 to 2015 is processed and the execution plan for a wide transformation (shuffle) is demonstrated.  Simple Sort and Count operations are performed against the data.  **Prerequisite:** The sample data can be downloaded to `./datain/flight-data` with the `data-download.ipynb` notebook.


In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.\
        builder.\
        appName("pyspark-nb-2-execplan").\
        master("spark://spark-master:7077").\
        config("spark.executor.memory", "512m").\
        config("spark.eventLog.enabled", "true").\
        config("spark.eventLog.dir", "file:///opt/workspace/events").\
        getOrCreate()      

22/01/02 18:08:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


Read all the flight-data CSV files in the sample `../datain/flight-data` directory:

In [2]:
flightData = spark.read.option("inferSchema", True).option("header", True).csv("/opt/workspace/datain/flight-data/*.csv")

                                                                                

## Explore Data ##

View the Schema

In [3]:
flightData.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: integer (nullable = true)



View a small sample of the data-set:

In [4]:
flightData.take(5)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=264),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=69),
 Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=24),
 Row(DEST_COUNTRY_NAME='Equatorial Guinea', ORIGIN_COUNTRY_NAME='United States', count=1)]

Get summary statistics about the data

In [5]:
flightData.describe().show()

+-------+-----------------+-------------------+------------------+
|summary|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|             count|
+-------+-----------------+-------------------+------------------+
|  count|             1502|               1502|              1502|
|   mean|             null|               null|1718.3189081225032|
| stddev|             null|               null|22300.368619668898|
|    min|      Afghanistan|        Afghanistan|                 1|
|    max|         Zimbabwe|           Zimbabwe|            370002|
+-------+-----------------+-------------------+------------------+



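There are 1502 rows in the data-set, each with a destination country, an origin country and a count value.  The 1502 figure is the number of non-null values in each column, not the number of distinct countries.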

## Viewing Execution Plans ##
The Spark `explain()` method can be used to show the execution strategy that will be chosen by Spark to execute a statement.  
  
In the example below, `sort()` is a wide transformation: rows from all partitions have to be compared, which causes a *shuffle* (AKA *partition exchange*), shown in the execution plan as *Exchange rangepartitioning*.  The exchange happens after the *FileScan* operation, which reads in all the data to be processed.

In [6]:
flightData.sort("count").explain()

== Physical Plan ==
*(2) Sort [count#12 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(count#12 ASC NULLS FIRST, 200)
   +- *(1) FileScan csv [DEST_COUNTRY_NAME#10,ORIGIN_COUNTRY_NAME#11,count#12] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/opt/workspace/datain/flight-data/2010-summary.csv, file:/opt/workspace/da..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>


In the execution plan, the operator
```
 +- Exchange rangepartitioning(count#12 ASC NULLS FIRST, 200)
```
shows that the data is shuffled into 200 range partitions (the default value of `spark.sql.shuffle.partitions`) to perform the sort.
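
To make the *rangepartitioning* step more concrete, here is a minimal pure-Python sketch of the idea, not Spark's actual implementation.  Rows are routed to partitions by comparing the sort key against boundary values, and each partition is then sorted locally.  The function names, the sample rows and the hard-coded boundary are illustrative assumptions; Spark works out its boundaries by sampling the data.

```python
from bisect import bisect_left

def range_partition(rows, boundaries, key):
    """Route each row to a partition by comparing key(row) with sorted boundaries.

    Partition i receives keys <= boundaries[i]; the last partition receives the rest.
    """
    partitions = [[] for _ in range(len(boundaries) + 1)]
    for row in rows:
        partitions[bisect_left(boundaries, key(row))].append(row)
    return partitions

def distributed_sort(rows, boundaries, key):
    """Shuffle rows into ranges (the 'Exchange'), then sort each partition locally."""
    partitions = range_partition(rows, boundaries, key)
    return [sorted(p, key=key) for p in partitions]

# Hypothetical (country, count) rows, loosely based on the sample output above
rows = [("Romania", 1), ("Ireland", 264), ("India", 69), ("Egypt", 24)]
parts = distributed_sort(rows, boundaries=[50], key=lambda r: r[1])

# Concatenating the partitions in order yields a globally sorted result
globally_sorted = [row for part in parts for row in part]
print(globally_sorted)
```

Because every key in partition 0 is smaller than every key in partition 1, no further comparison between partitions is needed once the exchange has happened.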

In [7]:
flightData.sort("count",ascending=False).take(5)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='United States', count=370002),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='United States', count=358354),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='United States', count=352742),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='United States', count=348113),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='United States', count=347452)]

#### Query Plan Exchange operations ####
The term `Exchange` means a *shuffle* of data between executors, which may be on different physical nodes in the cluster.  Shuffles move data over the network, so in general try to reduce the number of partition exchanges.
*Exchange* is typically triggered by:
+ **Repartition** - re-organising data in *n* partitions - triggers *RoundRobinPartitioning Exchange*  
+ **Coalesce** - This reduces the number of partitions without a full re-partition, so `coalesce()` on its own is a narrow operation.  Gathering all data into a single partition, EG to combine the results of a global aggregate, is shown as *Exchange SinglePartition*  
+ **Sort** - to sort the output data, a *RangePartitioning Exchange* is used  
+ **Join** - when a shuffled join occurs (EG a sort-merge join or shuffled hash join), a *HashPartitioning Exchange* is triggered.
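
When checking a plan for exchanges, it can help to pull the *Exchange* lines out of the plan text.  The sketch below is a small pure-Python helper that works on plan text pasted in as a string; `list_exchanges` and the regular expression are hypothetical names introduced here, and the pattern assumes the plan layout printed in this notebook (other Spark versions may format plans differently).

```python
import re

# Matches an "Exchange <partitioning>" operator at the start of a plan line,
# after any tree-drawing prefix such as "+- " or ":  ".
EXCHANGE_RE = re.compile(r"^[\s:+\-|]*Exchange\s+(\w+)")

def list_exchanges(plan_text):
    """Return the partitioning name of every Exchange operator in a plan."""
    found = []
    for line in plan_text.splitlines():
        match = EXCHANGE_RE.match(line)
        if match:
            found.append(match.group(1))
    return found

# Plan text abbreviated from the sort example above (FileScan details left out)
plan = """== Physical Plan ==
*(2) Sort [count#12 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(count#12 ASC NULLS FIRST, 200)
   +- *(1) FileScan csv [DEST_COUNTRY_NAME#10,ORIGIN_COUNTRY_NAME#11,count#12]
"""
print(list_exchanges(plan))
```

The pattern only matches operators whose name starts with `Exchange`, so other operator names would need their own pattern.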


#### Set the Shuffle Partition Configuration ####
The `spark.sql.shuffle.partitions` parameter specifies how many partitions are used when data is shuffled.  The default is 200, which is far more than this small data-set needs - we probably only need 2 for a 2 node cluster.

In [8]:
spark.conf.set("spark.sql.shuffle.partitions", "2")
flightData.sort("count").explain()

== Physical Plan ==
*(2) Sort [count#12 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(count#12 ASC NULLS FIRST, 2)
   +- *(1) FileScan csv [DEST_COUNTRY_NAME#10,ORIGIN_COUNTRY_NAME#11,count#12] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/opt/workspace/datain/flight-data/2010-summary.csv, file:/opt/workspace/da..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>


Now we see
```
+- Exchange rangepartitioning(count#12 ASC NULLS FIRST, 2)
```
in the execution plan (the data is now shuffled into only 2 partitions).
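
If the setting should only apply to a few statements, one option is to change it temporarily and restore the previous value afterwards.  The sketch below is one way to do that with a context manager; `shuffle_partitions` is a hypothetical helper name, and it assumes `spark` is an active SparkSession exposing `conf.get` and `conf.set` as used in the cell above.

```python
from contextlib import contextmanager

SHUFFLE_KEY = "spark.sql.shuffle.partitions"

@contextmanager
def shuffle_partitions(spark, n):
    """Temporarily set spark.sql.shuffle.partitions, restoring the old value on exit.

    `spark` is assumed to be an active SparkSession (any object exposing
    conf.get / conf.set behaves the same way here).
    """
    previous = spark.conf.get(SHUFFLE_KEY)
    spark.conf.set(SHUFFLE_KEY, str(n))
    try:
        yield spark
    finally:
        # Runs even if the body raises, so the session is left as it was found
        spark.conf.set(SHUFFLE_KEY, previous)

# Intended usage in this notebook (not executed here):
# with shuffle_partitions(spark, 2):
#     flightData.sort("count").explain()
```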

In [9]:
# explain() only prints the plan - due to Spark "lazy execution", the sort is executed when an action such as take() is called
flightData.sort("count",ascending=False).take(5)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='United States', count=370002),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='United States', count=358354),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='United States', count=352742),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='United States', count=348113),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='United States', count=347452)]

#### Example of a Narrow Execution followed by Wide  - sum up all the counts ####
The sum can be computed within each partition (a Narrow operation - no shuffle, shown as *partial_sum* in the plan), then the partial results are combined into a single total (a Wide operation / Shuffle, shown as *Exchange SinglePartition*).

In [10]:
from pyspark.sql import functions as F
flightData.select(F.sum("count")).explain()

== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[sum(cast(count#12 as bigint))])
+- Exchange SinglePartition
   +- *(1) HashAggregate(keys=[], functions=[partial_sum(cast(count#12 as bigint))])
      +- *(1) FileScan csv [count#12] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/opt/workspace/datain/flight-data/2010-summary.csv, file:/opt/workspace/da..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<count:int>
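
The two *HashAggregate* steps in this plan can be pictured with a minimal pure-Python sketch.  It is only an illustration of the partial/final aggregation idea, not how Spark implements it; the function names and the way the values are split across partitions are assumptions made for the example.

```python
def partial_sum(partition):
    """Narrow step: each partition sums its own values with no data movement."""
    return sum(partition)

def final_sum(partials):
    """Wide step: the partial results are gathered into one partition and combined."""
    return sum(partials)

# Hypothetical split of some count values across three partitions
partitions = [[1, 264, 69], [24, 1], [25, 54, 44]]

partials = [partial_sum(p) for p in partitions]  # one small value per partition
total = final_sum(partials)                      # only these values are exchanged
print(partials, total)
```

Only one number per partition crosses the exchange, which is why the shuffle for a global aggregate is cheap compared with the shuffle for a sort.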


In [11]:
from pyspark.sql import functions as F
flights = flightData.select(F.sum("count")).withColumnRenamed("sum(count)", "flights_count").collect()

In [12]:
print(flights)
print(flights[0][0])

[Row(flights_count=2580915)]
2580915


In [13]:
spark.stop()

## Filter Rows and Columns ##
Filter out all the countries where Destination is not United States and then drop the DEST_COUNTRY_NAME column

In [14]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.\
        builder.\
        appName("pyspark-notebook-2-filter-count").\
        master("spark://spark-master:7077").\
        config("spark.executor.memory", "512m").\
        config("spark.eventLog.enabled", "true").\
        config("spark.eventLog.dir", "file:///opt/workspace/events").\
        getOrCreate()      

In [15]:
flightData = spark.read.option("inferSchema", True).option("header", True).csv("/opt/workspace/datain/flight-data/*.csv")

                                                                                

#### Filtering Data ####

Rows in DataFrames can be filtered using the `.filter` method in the Spark SQL API.  
Use `pyspark.sql.functions.col()` to reference a column to be checked in a filter expression.   
Columns in DataFrames can be selected using the `select` method or dropped using the `drop` method.  
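
When several conditions are combined in one filter, each comparison needs its own parentheses.  The sketch below shows this with a hypothetical helper, `flights_to`; it references columns with `df["name"]` indexing rather than `F.col("name")` so that the block needs no imports, and it assumes `df` has the flight-data schema shown earlier.

```python
def flights_to(df, destination, min_count=0):
    """Keep rows for one destination with count above min_count, then select two columns.

    `df` is assumed to be a DataFrame with the flight-data schema shown earlier.
    """
    # Each comparison needs its own parentheses: & binds tighter than == and >.
    # df["count"] is used because df.count is the DataFrame's count() method.
    condition = (df["DEST_COUNTRY_NAME"] == destination) & (df["count"] > min_count)
    return df.filter(condition).select("ORIGIN_COUNTRY_NAME", "count")

# Intended usage in this notebook (not executed here):
# flights_to(flightData, "United States", min_count=100).take(5)
```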

#### Physical and Logical Execution Plans ####
Setting the explain mode to *extended*, by providing a Boolean `True` argument to the `explain()` operation, causes the logical execution plans to be displayed as well as the physical plan derived from them.
  
In *Spark 3*, additional modes can be specified with the `mode` argument of `explain()` - *simple*, *extended*, *codegen*, *cost*, *formatted*.
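
As a rough sketch of how the Spark 3 modes could be compared side by side, the hypothetical helper below calls `explain(mode=...)` once per mode.  It assumes Spark 3.0 or later, where `DataFrame.explain` accepts a `mode` keyword; `explain_all_modes` is a name introduced here for illustration.

```python
# Mode names accepted by DataFrame.explain(mode=...) in Spark 3.0 and later
EXPLAIN_MODES = ("simple", "extended", "codegen", "cost", "formatted")

def explain_all_modes(df, modes=EXPLAIN_MODES):
    """Print the plan of `df` once per explain mode, with a header for each."""
    for mode in modes:
        print(f"===== explain(mode={mode!r}) =====")
        df.explain(mode=mode)

# Intended usage in this notebook (not executed here, requires Spark 3):
# explain_all_modes(flightData.sort("count"), modes=("simple", "formatted"))
```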


In [16]:
# Use Spark SQL API syntax to filter - use the col function to reference a column by name
# (each condition needs wrapping in parentheses when several are combined with & or |)
# To drop multiple columns, create a list and unpack it in the function call, EG drop(*my_list_of_columns)
# explain() prints the plans and returns None, so its result is not assigned to a variable
flightData.filter((F.col("DEST_COUNTRY_NAME") == "United States")).drop("DEST_COUNTRY_NAME").explain(True)

== Parsed Logical Plan ==
Project [ORIGIN_COUNTRY_NAME#244, count#245]
+- Filter (DEST_COUNTRY_NAME#243 = United States)
   +- Relation[DEST_COUNTRY_NAME#243,ORIGIN_COUNTRY_NAME#244,count#245] csv

== Analyzed Logical Plan ==
ORIGIN_COUNTRY_NAME: string, count: int
Project [ORIGIN_COUNTRY_NAME#244, count#245]
+- Filter (DEST_COUNTRY_NAME#243 = United States)
   +- Relation[DEST_COUNTRY_NAME#243,ORIGIN_COUNTRY_NAME#244,count#245] csv

== Optimized Logical Plan ==
Project [ORIGIN_COUNTRY_NAME#244, count#245]
+- Filter (isnotnull(DEST_COUNTRY_NAME#243) && (DEST_COUNTRY_NAME#243 = United States))
   +- Relation[DEST_COUNTRY_NAME#243,ORIGIN_COUNTRY_NAME#244,count#245] csv

== Physical Plan ==
*(1) Project [ORIGIN_COUNTRY_NAME#244, count#245]
+- *(1) Filter (isnotnull(DEST_COUNTRY_NAME#243) && (DEST_COUNTRY_NAME#243 = United States))
   +- *(1) FileScan csv [DEST_COUNTRY_NAME#243,ORIGIN_COUNTRY_NAME#244,count#245] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/opt/workspace/d

In [17]:
flightDataToUSA = flightData.filter((F.col("DEST_COUNTRY_NAME") == "United States")).drop("DEST_COUNTRY_NAME")

In [18]:
flightDataToUSA.take(10)

[Row(ORIGIN_COUNTRY_NAME='Romania', count=1),
 Row(ORIGIN_COUNTRY_NAME='Ireland', count=264),
 Row(ORIGIN_COUNTRY_NAME='India', count=69),
 Row(ORIGIN_COUNTRY_NAME='Singapore', count=25),
 Row(ORIGIN_COUNTRY_NAME='Grenada', count=54),
 Row(ORIGIN_COUNTRY_NAME='Marshall Islands', count=44),
 Row(ORIGIN_COUNTRY_NAME='Sint Maarten', count=53),
 Row(ORIGIN_COUNTRY_NAME='Afghanistan', count=2),
 Row(ORIGIN_COUNTRY_NAME='Russia', count=156),
 Row(ORIGIN_COUNTRY_NAME='Federated States of Micronesia', count=48)]

In [19]:
spark.stop()