# Chapter 5. Basic Structured Operations

This chapter focuses exclusively on fundamental
DataFrame operations and avoids aggregations, window functions, and joins. These are
discussed in subsequent chapters.

**DataFrame**:
- `records` : each record is of type `Row`
- `columns` : represent a computation expression that can be performed on each individual record in the Dataset
- `Schemas` : the *name* and *type* of data in each column
- `Partitioning` : the layout of the DataFrame's (or Dataset's) physical distribution across the cluster
- `partitioning scheme` : defines how the partitions are allocated, either based on values in a certain column or nondeterministically

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder\
    .master('local[4]')\
    .appName('Cha5')\
    .getOrCreate()

In [6]:
df = spark.read.format("json").load("../Spark-The-Definitive-Guide-master/data/flight-data/json/2015-summary.json")

In [8]:
df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



## Schemas
Defines: 
- column names
- column types

Two ways to define schemas:
- let the data source define the schema (*schema-on-read*)
- define it explicitly

**Warning**:  
Deciding whether you need to define a schema prior to reading in your data depends on your use case.
For ad hoc analysis, schema-on-read usually works just fine (although at times it can be a bit slow with
plain-text file formats like CSV or JSON). However, this can also lead to precision issues like a long
type incorrectly set as an integer when reading in a file. When using Spark for production `Extract, Transform, and Load (ETL)`, it is often a good idea to define your schemas manually, especially when
working with untyped data sources like CSV and JSON because schema inference can vary depending
on the type of data that you read in.

In [11]:
spark.read.format('json').load('../Spark-The-Definitive-Guide-master/data/flight-data/json/2015-summary.json').schema

StructType(List(StructField(DEST_COUNTRY_NAME,StringType,true),StructField(ORIGIN_COUNTRY_NAME,StringType,true),StructField(count,LongType,true)))

**Schema**: 
- *StructType*  
    - `StructField`( *name*, *type*, *Boolean flag*, *metadata (optional)*)
    - `StructField`( *name*, *type*, *Boolean flag*, *metadata (optional)*)
    - ...
    
**Boolean flag**: specifies whether that column can contain missing or `null` values

If the types in the data (at runtime) do not match
the schema, Spark will throw an error.

In [22]:
# create a specified schema
from pyspark.sql.types import StructField, StructType, StringType, LongType
myManualSchema = StructType([
    StructField('DEST_COUNTRY_NAME', StringType(), True),
    StructField('ORIGIN_COUNTRY_NAME', StringType(), True),
    StructField('count', LongType(), False, metadata = {'hello' : 'world'})
])

df = spark.read.format('json').schema(myManualSchema)\
    .load('../Spark-The-Definitive-Guide-master/data/flight-data/json/2015-summary.json')

In [23]:
df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)

**Note**: `count` is still reported as `nullable = true` even though the manual schema declared it with `False`. File-based sources such as JSON treat every column as nullable when reading.



## Columns and Expressions

### **Columns**
Construct and refer to a column with either function:
- `column`
- `col` (the two are equivalent)

In [19]:
from pyspark.sql.functions import col, column
col('someColumnName')

Column<b'someColumnName'>

In [20]:
column('someColumnName')

Column<b'someColumnName'>

**Note**: this column might or might not exist in our *DataFrames*. Columns are not resolved until Spark compares the column names with
those it maintains in the `catalog`. Column and table resolution happens in the `analyzer` phase, as discussed in Chapter 4.

**Side note for Scala**:

We just mentioned two different ways of referring to columns. Scala has some unique language
features that allow for more shorthand ways of referring to columns. The following bits of syntactic
sugar perform the exact same thing, namely creating a column, but provide no performance
improvement:

```scala
// in Scala
$"myColumn"
'myColumn
```

The $ allows us to designate a string as a special string that should refer to an expression. The tick
mark (') is a special thing called a symbol; this is a Scala-specific construct of referring to some
identifier. They both perform the same thing and are shorthand ways of referring to columns by name.
You’ll likely see all of the aforementioned references when you read different people’s Spark code.
We leave it to you to use whatever is most comfortable and maintainable for you and those with whom
you work.

### Explicit column references
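Refer to a specific DataFrame’s column: in Scala, use the `col` method on the DataFrame; in Python, index the DataFrame (`df['count']`) or use attribute access (`df.DEST_COUNTRY_NAME`).  
**An added benefit**: Spark does not need to resolve this column itself (during the analyzer phase) because we did that for Spark.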


In [71]:
df['count']

Column<b'count'>

In [69]:
df.DEST_COUNTRY_NAME

Column<b'DEST_COUNTRY_NAME'>

### Expressions
Columns are expressions.

**Expression**: a set of transformations on one or more values in a record in a DataFrame.

In the simplest case, an expression created via the `expr` function is just a DataFrame column
reference: `expr("someCol")` is equivalent to `col("someCol")`.

In [78]:
from pyspark.sql.functions import expr
expr('count') 

Column<b'count'>

In [77]:
col('count')

Column<b'count'>

#### **Columns as expressions**
Columns provide a subset of expression functionality. If you use `col()` and want to perform
transformations on that column, you must perform those on that column reference. When using
an expression, the `expr` function can actually parse transformations and column references from
a string and can subsequently be passed into further transformations. 

Examples:

- `expr('someCol - 5')` 
- `col('someCol') - 5`
- `expr("someCol") - 5`

They are equivalent because Spark compiles these to a logical tree specifying the order of operations.

Key points:
- Columns are just expressions.
- Columns and transformations of those columns compile to the same logical plan as
parsed expressions.

Example 
```
(((col("someCol") + 5) * 200) - 6) < col("otherCol")
```

![](images/log-tree.png)

In [81]:
expr('(((someCol + 5) * 200)  - 6) < OtherCol')

Column<b'((((someCol + 5) * 200) - 6) < OtherCol)'>

---

SQL expression and DataFrame code compile to the same underlying logical tree prior to execution.

This means that you can write your expressions as DataFrame code or as SQL expressions and get the exact **same performance characteristics**.

#### Accessing a DataFrame’s columns


In [82]:
## .columns returns the column names of a DataFrame as a list
df.columns

['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']

## Records and Rows
A single record is a `Row`.

`Row` : represented internally as arrays of bytes

In [89]:
df.first()

Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15)

### Creating Rows
If you create a `Row` manually, you must specify the values in the same order as the schema of the DataFrame.

In [94]:
from pyspark.sql import Row
myRow = Row('Hello', None, 1, False)

In [100]:
print(
    myRow[0],
    "\n",
    myRow[2]
)

Hello 
 1


## DataFrame Transformations
core operations:
- add rows or columns
- remove rows or columns
- transform a row into a column (or vice versa)
- change the order of rows based on the values in columns
![](images/trans.png)

### Creating DataFrames (more details in Chapter 9)

In [103]:
## create DataFrame from data source
df = spark.read.format('json').load('../Spark-The-Definitive-Guide-master/data/flight-data/json/2015-summary.json')
df.createOrReplaceTempView('dfTable')

In [108]:
## create on the fly
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, StringType, LongType

# specify schemas
myManualSchema = StructType([
    StructField("some", StringType(), True),
    StructField("col", StringType(), True),
    StructField("names", LongType(), False)
])

# create rows
myRow1 = Row("Hello", None, 1)
myRow2 = Row("World", None, 2)

# create DataFrame with Rows and Schemas
myDf = spark.createDataFrame([myRow1, myRow2], myManualSchema)

myDf.show()

+-----+----+-----+
| some| col|names|
+-----+----+-----+
|Hello|null|    1|
|World|null|    2|
+-----+----+-----+



**Three most useful transformations**:
- `select` : with columns and expressions
- `selectExpr` : with expressions in strings
- `pyspark.sql.functions` : a group of functions 

#### **select and selectExpr**

`select` and `selectExpr` allow you to do the DataFrame **equivalent of SQL queries** on a table of data:

```sql
-- 1.
-- in SQL
SELECT DEST_COUNTRY_NAME 
FROM dfTable 
LIMIT 2
```

equivalent to 

In [110]:
df.select('DEST_COUNTRY_NAME').show(2)

+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
|    United States|
+-----------------+
only showing top 2 rows



```sql
-- 2.
-- in SQL
SELECT DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME 
FROM dfTable 
LIMIT 2
```
equivalent to

In [111]:
df.select('DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME').show(2)

+-----------------+-------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
+-----------------+-------------------+
|    United States|            Romania|
|    United States|            Croatia|
+-----------------+-------------------+
only showing top 2 rows



In [113]:
## refer to columns in a number of different ways
df.select(
    'DEST_COUNTRY_NAME',
    expr('DEST_COUNTRY_NAME'),
    col("DEST_COUNTRY_NAME"),
    column('DEST_COUNTRY_NAME')
).show(2)

+-----------------+-----------------+-----------------+-----------------+
|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|
+-----------------+-----------------+-----------------+-----------------+
|    United States|    United States|    United States|    United States|
|    United States|    United States|    United States|    United States|
+-----------------+-----------------+-----------------+-----------------+
only showing top 2 rows



`expr` is the most flexible. It can refer to:
- plain column
- a string manipulation of a column

```sql
-- in SQL
SELECT DEST_COUNTRY_NAME as destination 
FROM dfTable 
LIMIT 2
```
equivalent to 

In [118]:
df.select(expr('DEST_COUNTRY_NAME as destination')).show(2)

+-------------+
|  destination|
+-------------+
|United States|
|United States|
+-------------+
only showing top 2 rows



In [126]:
df.select(
    expr('DEST_COUNTRY_NAME as destination')\
    .alias('change_to_something_else')
).show(2)

+------------------------+
|change_to_something_else|
+------------------------+
|           United States|
|           United States|
+------------------------+
only showing top 2 rows



`selectExpr` : combines `select` with `expr`; the most convenient interface for everyday use

In [129]:
df.selectExpr('DEST_COUNTRY_NAME as destination', 'DEST_COUNTRY_NAME').show(2)

+-------------+-----------------+
|  destination|DEST_COUNTRY_NAME|
+-------------+-----------------+
|United States|    United States|
|United States|    United States|
+-------------+-----------------+
only showing top 2 rows



`selectExpr` : a simple way to build up complex expressions

```sql
-- in SQL
SELECT *, (DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry
FROM dfTable
LIMIT 2
```

In [148]:
# adds a new column withinCountry
df.selectExpr(
    '*',
    '(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry'    
).show(2)

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows



**Aggregation**
```sql
-- in SQL
SELECT avg(count), count(distinct(DEST_COUNTRY_NAME)) 
FROM dfTable 
LIMIT 2
```

In [133]:
df.selectExpr(
    'avg(count)',
    'count(distinct(DEST_COUNTRY_NAME))'
).show(2)

+-----------+---------------------------------+
| avg(count)|count(DISTINCT DEST_COUNTRY_NAME)|
+-----------+---------------------------------+
|1770.765625|                              132|
+-----------+---------------------------------+



#### **Converting to Spark Types (Literals)**

**`literals`** : explicit values passed to Spark that are just a value (**rather than a new column**). This might be a constant value or something we’ll need to compare to later on.   
Adding a literal as a column is similar to **`mutate()`** in R.

using `lit()`

```sql
-- in SQL
SELECT *, 1 as One 
FROM dfTable 
LIMIT 2
```

In [137]:
from pyspark.sql.functions import lit
df.select(
    expr('*'),
    lit(1).alias('One')
).show(2)

+-----------------+-------------------+-----+---+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|One|
+-----------------+-------------------+-----+---+
|    United States|            Romania|   15|  1|
|    United States|            Croatia|    1|  1|
+-----------------+-------------------+-----+---+
only showing top 2 rows



In [138]:
df.selectExpr(
    '*',
    '1 as One'
).show(2)

+-----------------+-------------------+-----+---+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|One|
+-----------------+-------------------+-----+---+
|    United States|            Romania|   15|  1|
|    United States|            Croatia|    1|  1|
+-----------------+-------------------+-----+---+
only showing top 2 rows



#### **Adding Columns**
The more formal way is the `.withColumn` method.

`.withColumn` takes two arguments:
- column name
- expression

```sql
-- in SQL
SELECT *, 1 as numberOne 
FROM dfTable 
LIMIT 2
```

In [142]:
df.withColumn(
    'numberOne', lit(1)
).show(2)

+-----------------+-------------------+-----+---------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|numberOne|
+-----------------+-------------------+-----+---------+
|    United States|            Romania|   15|        1|
|    United States|            Croatia|    1|        1|
+-----------------+-------------------+-----+---------+
only showing top 2 rows



In [149]:
## another more complicated one
df.withColumn(
    'withinCountry',
    expr('ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME')
).show(2)

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows



In [160]:
# copy a column under a new name (the original column is kept, as the output shows)
df.withColumn(
    'Destination',
    expr('DEST_COUNTRY_NAME')
).columns

['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count', 'Destination']

#### **Renaming Columns**
`.withColumnRenamed`


In [161]:
df.withColumnRenamed(
    'DEST_COUNTRY_NAME',
    'dest'
).columns

['dest', 'ORIGIN_COUNTRY_NAME', 'count']

#### **Reserved Characters and Keywords**
Escape reserved characters (such as spaces or dashes) in column names with backtick ( **\`** ) characters.

In [163]:
dfWithLongColName = df.withColumn(
    'This Long Column-Name',
    expr('ORIGIN_COUNTRY_NAME')
)
dfWithLongColName.show(2)

+-----------------+-------------------+-----+---------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|This Long Column-Name|
+-----------------+-------------------+-----+---------------------+
|    United States|            Romania|   15|              Romania|
|    United States|            Croatia|    1|              Croatia|
+-----------------+-------------------+-----+---------------------+
only showing top 2 rows



We don’t need escape characters here because the first argument to `withColumn` is just a string
for the new column name. In the next example, however, we need to use backticks because we’re
referencing a column in an expression:
```sql
-- in SQL
SELECT `This Long Column-Name`, `This Long Column-Name` as `new col`
FROM dfTableLong 
LIMIT 2
```


In [165]:
dfWithLongColName.selectExpr(
    '`This Long Column-Name`',
    '`This Long Column-Name` as `new col`'
).show(2)

# create sql temp view
dfWithLongColName.createOrReplaceTempView('dfTableLong')

+---------------------+-------+
|This Long Column-Name|new col|
+---------------------+-------+
|              Romania|Romania|
|              Croatia|Croatia|
+---------------------+-------+
only showing top 2 rows



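- **col()** : *an explicit string-to-column reference; the string is interpreted literally as the column name* - **no escaping needed**
- **expr()** : *the string is parsed as an expression* - **reserved characters must be escaped with backticks**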

In [169]:
# using `col()`
dfWithLongColName.select(col("This Long Column-Name")).columns

['This Long Column-Name']

In [170]:
# using `expr()`
dfWithLongColName.select(expr("`This Long Column-Name`")).columns

['This Long Column-Name']

#### **Case Sensitivity**

By default Spark is case insensitive; however, you can make Spark case sensitive by setting the
configuration:

```sql
-- in SQL
set spark.sql.caseSensitive true
```

#### **Removing Columns**
`.drop`

In [174]:
df.drop('ORIGIN_COUNTRY_NAME').columns

['DEST_COUNTRY_NAME', 'count']

In [176]:
dfWithLongColName.drop("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").columns

['count', 'This Long Column-Name']

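#### **Changing a Column’s Type (cast)**
Cast a column from one type to another, for example from an `integer` to a `long`. In this DataFrame `count` is already a `bigint`, so the first cast below does not change the type.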
```sql
-- in SQL
SELECT *, cast(count as long) AS count2 
FROM dfTable
```

In [184]:
df.withColumn(
    'count2',
    col('count').cast('long')
)

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint, count2: bigint]

In [183]:
## convert to string
df.withColumn(
    'count2',
    col('count').cast('string')
)

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint, count2: string]

#### **Filtering Rows**
Two equivalent methods:
- `.filter`
- `.where`

**Note**  
When using the Dataset API from either *Scala or Java*, filter also accepts an **arbitrary function** that
Spark will apply to each record in the Dataset. See Chapter 11 for more information.

```sql
-- in SQL
SELECT * 
FROM dfTable 
WHERE count < 2 
LIMIT 2
```

In [187]:
df.where('count < 2').show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



In [194]:
df.filter(expr('count < 2')).show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



In [195]:
df.filter(col('count') < 2).show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



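**Multiple filters**: to apply several AND filters, chain them sequentially and let Spark handle the rest.

*Putting them into one expression is possible but not always useful, because Spark automatically performs all filtering operations at
the same time regardless of the filter ordering.*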

```sql
-- in SQL
SELECT * 
FROM dfTable 
WHERE count < 2 AND ORIGIN_COUNTRY_NAME != "Croatia"
LIMIT 2
```

In [273]:
df.where(expr('count < 2'))\
    .where(col('ORIGIN_COUNTRY_NAME') != "Croatia").show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|          Singapore|    1|
|          Moldova|      United States|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



#### **Getting Unique Rows**
- `.distinct()`

```sql
-- in SQL
SELECT COUNT(DISTINCT(ORIGIN_COUNTRY_NAME, DEST_COUNTRY_NAME)) 
FROM dfTable
```

In [202]:
df.select(
    'ORIGIN_COUNTRY_NAME',
    'DEST_COUNTRY_NAME'
).distinct().count()

256

```sql
-- in SQL
SELECT COUNT(DISTINCT ORIGIN_COUNTRY_NAME) 
FROM dfTable
```


In [203]:
df.select(
    'ORIGIN_COUNTRY_NAME'
).distinct().count()

125

#### **Random Samples**
- `.sample()`

In [210]:
seed = 5
withReplacement = False
fraction = 0.5
df.sample(withReplacement, fraction, seed).count()

126

#### **Random Splits**
- `.randomSplit()`

In [212]:
dataFrames = df.randomSplit([0.25, 0.75], seed = 5)
dataFrames[0].count() > dataFrames[1].count()

False

In [222]:
print(type(dataFrames))
dataFrames

<class 'list'>


[DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint],
 DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]]

In [217]:
df.count() == dataFrames[0].count() + dataFrames[1].count()

True

#### **Concatenating and Appending Rows (Union)**
DataFrames are `immutable`, so you cannot append rows to an existing DataFrame.

Solution: `union` the original DataFrame with the new DataFrame.

Make sure the two DataFrames have:
- the same schema
- the same number of columns

**WARNING**  
`Unions` are currently performed based on column position, not on the schema. This means that columns will not automatically line up the way you think they might.

In [231]:
from pyspark.sql import Row

# copy same schema
schema = df.schema

# create new rows
newRows = [
    Row("New Country", "Other Country", 5),
    Row("New Country 2", "Other Country 3", 1)
]

# parallelize the rows
parallelizedRows = spark.sparkContext.parallelize(newRows)

# create new DF
newDF = spark.createDataFrame(parallelizedRows, schema)

In [233]:
df.union(newDF)\
    .where('count = 1')\
    .where(col('ORIGIN_COUNTRY_NAME') != 'United States')\
    .show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
|    United States|          Gibraltar|    1|
|    United States|             Cyprus|    1|
|    United States|            Estonia|    1|
|    United States|          Lithuania|    1|
|    United States|           Bulgaria|    1|
|    United States|            Georgia|    1|
|    United States|            Bahrain|    1|
|    United States|   Papua New Guinea|    1|
|    United States|         Montenegro|    1|
|    United States|            Namibia|    1|
|    New Country 2|    Other Country 3|    1|
+-----------------+-------------------+-----+



You’ll need to use this new DataFrame reference in order to refer to the DataFrame
with the newly appended rows.   

A common way to do this:
- make the DataFrame into a view, or
- register it as a table so that you can reference it more dynamically in your code.

#### **Sorting Rows**
Two equivalent operations (*default = ascending*):
- `sort`
- `orderBy`

Both accept:
- column expressions
- strings

In [237]:
df.sort('count').show(5)

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|               Malta|      United States|    1|
|Saint Vincent and...|      United States|    1|
|       United States|            Croatia|    1|
|       United States|          Gibraltar|    1|
|       United States|          Singapore|    1|
+--------------------+-------------------+-----+
only showing top 5 rows



In [238]:
df.orderBy('count', 'DEST_COUNTRY_NAME').show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|     Burkina Faso|      United States|    1|
|    Cote d'Ivoire|      United States|    1|
|           Cyprus|      United States|    1|
|         Djibouti|      United States|    1|
|        Indonesia|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



In [239]:
df.orderBy(
    col('count'), 
    col('DEST_COUNTRY_NAME')
).show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|     Burkina Faso|      United States|    1|
|    Cote d'Ivoire|      United States|    1|
|           Cyprus|      United States|    1|
|         Djibouti|      United States|    1|
|        Indonesia|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



To specify sorting directions:
- `desc`
- `asc`

```sql
-- in SQL
SELECT * 
FROM dfTable 
ORDER BY count DESC, DEST_COUNTRY_NAME ASC 
LIMIT 2
```


In [243]:
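from pyspark.sql.functions import desc, asc
# Pitfall: expr('count desc') is parsed as the column `count` aliased to `desc`,
# so this call sorts in ASCENDING order, as the output below shows.
# Use desc('count') or col('count').desc() for a descending sort.
df.orderBy(
    expr('count desc')
).show(2)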

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|          Moldova|      United States|    1|
|    United States|            Croatia|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



In [246]:
df.sort(
    col('count').desc(), col('DEST_COUNTRY_NAME').asc()
).show(2)

+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|    United States|      United States|370002|
|    United States|             Canada|  8483|
+-----------------+-------------------+------+
only showing top 2 rows



In [247]:
df.sort(
    desc('count'), asc('DEST_COUNTRY_NAME')
).show(2)

+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|    United States|      United States|370002|
|    United States|             Canada|  8483|
+-----------------+-------------------+------+
only showing top 2 rows



Advanced tip: you can specify where null values should appear in an ordered DataFrame.

(Not supported in PySpark 2.3.0; a pull request adding these has been made, so check the newest version on GitHub.)
- `asc_nulls_first`, `desc_nulls_first`
- `asc_nulls_last`, `desc_nulls_last`

For optimization, it is sometimes advisable to sort within each partition before another set of transformations (more in Part III).
- `.sortWithinPartitions`

In [261]:
spark.read.format('json').load('../Spark-The-Definitive-Guide-master/data/flight-data/json/*-summary.json')\
    .sortWithinPartitions('count')

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

#### **Limit**
- `.limit`

restrict what you extract from a DataFrame. 

```sql
-- in SQL
SELECT * 
FROM dfTable 
LIMIT 6
```

In [270]:
df.limit(6).show()
# unlike `df.show(6)`, `limit(6)` returns a new DataFrame with at most 6 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+



```sql
-- in SQL
SELECT * 
FROM dfTable 
ORDER BY count desc 
LIMIT 6
```

In [271]:
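# Pitfall: expr('count desc') parses as `count` aliased to `desc`, so this sorts ASCENDING; use col('count').desc()
df.orderBy(expr('count desc')).limit(6).show()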

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|               Malta|      United States|    1|
|Saint Vincent and...|      United States|    1|
|       United States|            Croatia|    1|
|       United States|          Gibraltar|    1|
|       United States|          Singapore|    1|
|             Moldova|      United States|    1|
+--------------------+-------------------+-----+



#### **Repartition and Coalesce**

Another important optimization opportunity is to partition the data according to some frequently
filtered columns, which control the physical layout of data across the cluster including the
*partitioning scheme* and the *number of partitions*.

**Repartition** will incur a full shuffle of the data, regardless of whether one is necessary. This
means that you should typically only repartition when the future number of partitions is greater
than your current number of partitions or when you are looking to partition by a set of columns:

In [280]:
print('Current number of partitions : %d' % df.rdd.getNumPartitions())

Current number of partitions : 1


In [283]:
p = df.repartition(5).rdd.getNumPartitions()
print('Repartitioned number : %d' % p )

Repartitioned number : 5


If you will be filtering by a certain **column** frequently, it can be worth repartitioning based on that column.

In [289]:
df.repartition('DEST_COUNTRY_NAME')
df.repartition(col('DEST_COUNTRY_NAME'))

df.repartition(5, col('DEST_COUNTRY_NAME'))

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

**Coalesce** will not incur a full shuffle and will try to combine partitions. The following
operation shuffles the data into five partitions based on the destination country name, and
then coalesces them (without a full shuffle):

In [298]:
df.repartition(5, col('DEST_COUNTRY_NAME')).coalesce(2)\
    .rdd.getNumPartitions()

2

#### **Collecting Rows to the Driver**
Spark maintains the state of the cluster in the driver.

Several methods collect some of your data to the driver so that you can manipulate it on your local machine:
- `collect` : gets all data from the entire DataFrame
- `take` : selects the first N rows
- `show` : prints out N rows *nicely*
- `toLocalIterator` : collects partitions to the driver as an iterator
**WARNING**  
Any collection of data to the driver can be a very expensive operation! If you have a large dataset and
call `collect`, you can crash the driver. If you use `toLocalIterator` and have very large partitions,
you can easily crash the driver node and lose the state of your application. This is also expensive
because we can operate on a one-by-one basis, instead of running computation in parallel.

In [300]:
collectDF = df.limit(10)
collectDF.take(5) # take works with an Integer count

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344),
 Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=62)]

In [303]:
collectDF.collect()

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344),
 Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=62),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Grenada', count=62),
 Row(DEST_COUNTRY_NAME='Costa Rica', ORIGIN_COUNTRY_NAME='United States', count=588),
 Row(DEST_COUNTRY_NAME='Senegal', ORIGIN_COUNTRY_NAME='United States', count=40),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

In [301]:
collectDF.show() # this prints out nicely

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
|    United States|          Singapore|    1|
|    United States|            Grenada|   62|
|       Costa Rica|      United States|  588|
|          Senegal|      United States|   40|
|          Moldova|      United States|    1|
+-----------------+-------------------+-----+



In [307]:
collectDF.show(5, False)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |15   |
|United States    |Croatia            |1    |
|United States    |Ireland            |344  |
|Egypt            |United States      |15   |
|United States    |India              |62   |
+-----------------+-------------------+-----+
only showing top 5 rows



To iterate over the entire dataset:
- `toLocalIterator` : collects partitions to the driver as an iterator. This
method allows you to iterate over the entire dataset partition-by-partition in a serial manner.

In [309]:
collectDF.toLocalIterator()

<itertools.chain at 0x7fc2da001fd0>