#DataFrame Column Expressions

**Data Source**
* One hour of Pagecounts from the English Wikimedia projects captured August 5, 2016, at 12:00 PM UTC.
* Size on Disk: ~23 MB
* Type: Compressed Parquet File
* More Info: <a href="https://dumps.wikimedia.org/other/pagecounts-raw" target="_blank">Page view statistics for Wikimedia projects</a>

**Technical Accomplishments:**
* Continue exploring the `DataFrame` set of APIs.
* Continue to work with the `Column` class and introduce the `Row` class.
* Introduce the transformations...
  * `orderBy(..)`
  * `sort(..)`
  * `filter(..)`
  * `where(..)`
* Introduce the actions...
  * `collect()`
  * `take(n)`
  * `first()`
  * `head()`

##![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) Getting Started

Run the following cell to configure our "classroom."

In [0]:
%run "./Includes/Classroom-Setup"

##![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) **The Data Source**

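The original data source is the same one used in our previous notebook.

The cells below, however, read the `Greene.csv` sample from `/databricks-datasets` instead, so the Azure setup is left commented out. With that in place, we can go ahead and create our initial `DataFrame`.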

In [0]:
# (source, sasEntity, sasToken) = getAzureDataSource()
# spark.conf.set(sasEntity, sasToken)

# parquetFile = source + "/wikipedia/pagecounts/staging_parquet_en_only_clean/"

In [0]:
%fs head /databricks-datasets/Rdatasets/data-001/csv/car/Greene.csv

In [0]:
file = '/databricks-datasets/Rdatasets/data-001/csv/car/Greene.csv'

from pyspark.sql.functions import col

# Read the CSV with a header row; without a schema, every column is read as a string.
df = spark.read.option("header", "true").csv(file)

# Sort by nation, descending.
sorted_df = df.orderBy(df["nation"].desc())
display(sorted_df)

# Other orderBy(..) forms, shown with placeholder column names:
# sorted_df = df.orderBy(["column1", "column2"])  # sort by column1 ascending, then column2 ascending
# sorted_df = df.orderBy(df.column1.desc(), "column2")  # sort by column1 descending, then column2 ascending

_c0,judge,nation,rater,decision,language,location,success
15,Heald,Sri.Lanka,no,no,English,Toronto,-0.75377
76,Pratte,Sri.Lanka,no,no,English,Toronto,-0.75377
89,Iacobucci,Sri.Lanka,no,no,English,Toronto,-0.75377
120,Hugessen,Sri.Lanka,no,no,French,Montreal,-0.75377
263,Mahoney,Sri.Lanka,no,yes,English,Toronto,-0.75377
282,MacGuigan,Sri.Lanka,yes,yes,English,Toronto,-0.75377
285,MacGuigan,Sri.Lanka,yes,yes,English,Toronto,-0.75377
313,Iacobucci,Sri.Lanka,yes,no,English,Toronto,-0.75377
348,MacGuigan,Sri.Lanka,yes,no,English,Toronto,-0.75377
477,MacGuigan,Sri.Lanka,yes,yes,English,other,-0.75377


Let's look at the data once more...

In [0]:
# value_counts - ascending
sorted_df.groupby("nation").count().orderBy('count').display()

nation,count
Fiji,1
India,3
Pakistan,4
Argentina,5
Guatemala,5
Nicaragua,6
Nigeria,7
Ghana,9
Poland,11
Iran,16


In [0]:
# value_counts - descending
df= spark.read.option("header", "true").csv(file).groupby("nation").count()
sorted_df = df.orderBy(df["count"].desc())
display(sorted_df)

nation,count
Lebanon,71
China,68
Sri.Lanka,63
Bulgaria,36
Somalia,29
El.Salvador,26
Czechoslovakia,24
Iran,16
Poland,11
Ghana,9


##![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) filter(..) & where(..)

If you look at the API docs, `filter(..)` and `where(..)` are described like this:
> Filters rows using the given condition.

Both `filter(..)` and `where(..)` return a new dataset containing only those records for which the specified condition is true.
* Like `distinct()` and `dropDuplicates()`, `filter(..)` and `where(..)` are aliases for each other.
  * `filter(..)` appeals to functional programmers.
  * `where(..)` appeals to developers with an SQL background.
* Like `orderBy(..)` there are two variants of these two methods:
  * `filter(Column)`
  * `filter(String)`
  * `where(Column)`
  * `where(String)`
* Unlike `orderBy(String)` which requires a column name, `filter(String)` and `where(String)` both expect an SQL expression.

Let's start by looking at the variant using an SQL expression:

### filter(..) & where(..) w/SQL Expression

In [0]:
# show(..) returns None, so its result is not assigned to a variable.
spark.read.option("header", "true").csv(file).where("nation = 'Poland'").show(10, False)

+----+--------+------+-----+--------+--------+--------+--------+
|_c0 |judge   |nation|rater|decision|language|location|success |
+----+--------+------+-----+--------+--------+--------+--------+
|83  |Stone   |Poland|yes  |no      |English |other   |-1.81529|
|161 |Hugessen|Poland|yes  |no      |French  |Montreal|-1.81529|
|366 |Heald   |Poland|no   |no      |English |other   |-1.81529|
|480 |Hugessen|Poland|yes  |no      |French  |Montreal|-1.81529|
|522 |Marceau |Poland|no   |no      |French  |Montreal|-1.81529|
|1004|Hugessen|Poland|no   |no      |French  |Montreal|-1.81529|
|1032|Urie    |Poland|no   |no      |English |other   |-1.81529|
|1494|Hugessen|Poland|no   |no      |French  |Montreal|-1.81529|
|1743|Pratte  |Poland|no   |no      |English |Toronto |-1.81529|
|1886|Marceau |Poland|no   |no      |French  |Montreal|-1.81529|
+----+--------+------+-----+--------+--------+--------+--------+
only showing top 10 rows



### filter(..) & where(..) w/Column

In [0]:
# show(..) returns None, so its result is not assigned to a variable.
spark.read.option("header", "true").csv(file).filter(col("nation") == 'China').show(5, False)


+---+---------+------+-----+--------+--------+--------+--------+
|_c0|judge    |nation|rater|decision|language|location|success |
+---+---------+------+-----+--------+--------+--------+--------+
|52 |Pratte   |China |no   |no      |English |other   |-0.99462|
|77 |MacGuigan|China |no   |no      |French  |Montreal|-0.99462|
|323|Urie     |China |no   |no      |English |Toronto |-0.99462|
|328|MacGuigan|China |no   |no      |French  |Montreal|-0.99462|
|365|Marceau  |China |no   |no      |English |Toronto |-0.99462|
+---+---------+------+-----+--------+--------+--------+--------+
only showing top 5 rows



In [0]:
df = spark.read.option("header", "true").csv(file)
filtered_df = df.filter(col("nation").startswith("Ni"))
filtered_df.show(5)


+---+---------+---------+-----+--------+--------+--------+--------+
|_c0|    judge|   nation|rater|decision|language|location| success|
+---+---------+---------+-----+--------+--------+--------+--------+
|105|   Pratte|Nicaragua|  yes|      no| English|   other|-1.58563|
|107|Iacobucci|  Nigeria|   no|      no| English| Toronto|-1.20831|
|115| Hugessen|Nicaragua|  yes|      no| English| Toronto|-1.58563|
|507|  Mahoney|  Nigeria|  yes|     yes| English| Toronto|-1.20831|
|533|MacGuigan|  Nigeria|   no|      no| English| Toronto|-1.20831|
+---+---------+---------+-----+--------+--------+--------+--------+
only showing top 5 rows



##![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) first() & head()

If you look at the API docs, both `first(..)` and `head(..)` are described like this:
> Returns the first row.

Just like `distinct()` & `dropDuplicates()` are aliases for each other, so are `first(..)` and `head(..)`.

However, unlike `distinct()` & `dropDuplicates()`, which are **transformations**, `first(..)` and `head(..)` are **actions**.

Once all processing is done, these methods return the object backing the first record.

In the case of `DataFrames` (both Scala and Python) that object is a `Row`.

In the case of `Datasets` (the strongly typed version of `DataFrames` in Scala and Java), the object may be a `Row`, a `String`, a `Customer`, a `PendingApplication` or any number of custom objects.

Focusing strictly on the `DataFrame` API for now, let's take a look at a call with `first()`, followed by `head(n)`:

In [0]:
filtered_df.first()

Out[62]: Row(_c0='105', judge='Pratte', nation='Nicaragua', rater='yes', decision='no', language='English', location='other', success='-1.58563')

In [0]:
filtered_df.head(5)

Out[64]: [Row(_c0='105', judge='Pratte', nation='Nicaragua', rater='yes', decision='no', language='English', location='other', success='-1.58563'),
 Row(_c0='107', judge='Iacobucci', nation='Nigeria', rater='no', decision='no', language='English', location='Toronto', success='-1.20831'),
 Row(_c0='115', judge='Hugessen', nation='Nicaragua', rater='yes', decision='no', language='English', location='Toronto', success='-1.58563'),
 Row(_c0='507', judge='Mahoney', nation='Nigeria', rater='yes', decision='yes', language='English', location='Toronto', success='-1.20831'),
 Row(_c0='533', judge='MacGuigan', nation='Nigeria', rater='no', decision='no', language='English', location='Toronto', success='-1.20831')]

##![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) The Row Class

Now that we have a reference to the object backing the first row (or any row), we can use it to extract the data for each column.

Before we do, let's take a look at the API docs for the `Row` class.

At the heart of it, we are simply going to ask for the value in column N: `Row.get(i)` in Scala, or `row[i]` (or `row['name']`) in Python.

Because Python is dynamically typed, the declared return type is of little consequence.

However, Scala is going to return an object of type `Any`. In Java, this would be an object of type `Object`.

In Scala, especially when the data type matters (for example, when performing mathematical operations on the value), we need to call one of the typed methods instead:
* `getAs[T](i):T`
* `getDate(i):Date`
* `getString(i):String`
* `getInt(i):Int`
* `getLong(i):Long`

We can now put it all together to read individual column values from the first row:

In [0]:
firstRow = filtered_df.first()
firstRow['nation']

Out[67]: 'Nicaragua'

In [0]:

firstRow = filtered_df.first()
nation = firstRow['nation']
success = firstRow['success']

print(nation, success)

Nicaragua -1.58563


##![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) collect()

If you look at the API docs, `collect(..)` is described like this:
> Returns an array that contains all of Rows in this Dataset.

`collect()` returns a collection of the specific type backing each record of the `DataFrame`.
* In the case of Python, this is always the `Row` object.
* In the case of Scala, this is also a `Row` object.
* If the `DataFrame` was converted to a `Dataset` the backing object would be the user-specified object.

Building on our last example, let's take the first 10 records and print them out.

In [0]:
rows = (filtered_df
  .limit(10)           # We only want the first 10 records.
  .collect()           # The action returning all records in the DataFrame
)

# rows is a Python list of Row objects. Now in the driver,
# we can just loop over the list and print 'em out.

listItems = ""
for row in rows:
  nation = row['nation']
  total = row['success']
  listItems += f"    <li><b>{nation}</b> {total}</li>\n"
  
html = """
<body>
  <h1>Top 10 Articles</h1>
  <ol>
    %s
  </ol>
</body>
""" % (listItems.strip())

print(html)




<body>
  <h1>Top 10 Articles</h1>
  <ol>
    <li><b>Nicaragua</b> -1.58563</li>
    <li><b>Nigeria</b> -1.20831</li>
    <li><b>Nicaragua</b> -1.58563</li>
    <li><b>Nigeria</b> -1.20831</li>
    <li><b>Nigeria</b> -1.20831</li>
    <li><b>Nicaragua</b> -1.58563</li>
    <li><b>Nicaragua</b> -1.58563</li>
    <li><b>Nicaragua</b> -1.58563</li>
    <li><b>Nigeria</b> -1.20831</li>
    <li><b>Nicaragua</b> -1.58563</li>
  </ol>
</body>



In [0]:
# Render the HTML for a prettier presentation
displayHTML(html)

##![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) take(n)

If you look at the API docs, `take(n)` is described like this:
> Returns the first n rows in the Dataset.

`take(n)` returns a collection of the first N records of the specific type backing each record of the `DataFrame`.
* In the case of Python, this is always the `Row` object.
* In the case of Scala, this is also a `Row` object.
* If the `DataFrame` was converted to a `Dataset` the backing object would be the user-specified object.

In short, it's the same basic function as `collect()` except you specify as the first parameter the number of records to return.

In [0]:
rows = filtered_df.take(10)

# rows is a Python list of Row objects. Now in the driver,
# we can just loop over the list and print 'em out.

listItems = ""
for row in rows:
  nation = row['nation']
  total = row['success']
  listItems += f"    <li><b>{nation}</b> {total}</li>\n"
  
html = """
<body>
  <h1>Top 10 Articles</h1>
  <ol>
    %s
  </ol>
</body>
""" % (listItems.strip())

print(html)



<body>
  <h1>Top 10 Articles</h1>
  <ol>
    <li><b>Nicaragua</b> -1.58563</li>
    <li><b>Nigeria</b> -1.20831</li>
    <li><b>Nicaragua</b> -1.58563</li>
    <li><b>Nigeria</b> -1.20831</li>
    <li><b>Nigeria</b> -1.20831</li>
    <li><b>Nicaragua</b> -1.58563</li>
    <li><b>Nicaragua</b> -1.58563</li>
    <li><b>Nicaragua</b> -1.58563</li>
    <li><b>Nigeria</b> -1.20831</li>
    <li><b>Nicaragua</b> -1.58563</li>
  </ol>
</body>



In [0]:

# Render the HTML for a prettier presentation
displayHTML(html)