#![Spark Logo](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/general/logo_spark.png)

**Objective:**
Analyze Desktop vs Mobile traffic to English Wikipedia

**Time to Complete:**
30 mins

**Data Source:**
pageviews_by_second (<a href="http://datahub.io/en/dataset/english-wikipedia-pageviews-by-second" target="_blank">255 MB</a>)

**Business Questions:**
* Question # 1) How many rows in the table refer to mobile vs desktop?

**Engineering Questions:**
* How is the data partitioned? Why is it partitioned the way it is?

**Technical Accomplishments:**
- Upload a file to Databricks using the Tables UI (optional)
- Learn how Actions kick off Jobs + Stages
- Understand how DataFrame partitions relate to compute tasks
- Use Spark UI to monitor details of Job execution (input read, Shuffle, Storage UI, SQL visualization)
- Cache a DataFrame to memory (and learn how to unpersist it)
- Use the following transformations: `orderBy()`, `filter()`
- Catalyst Optimizer: How DataFrame queries are converted from a Logical plan -> Physical plan
- Configuration Option: `spark.sql.shuffle.partitions`

**NOTE** Please run this notebook in a Spark 2.0 cluster.

First attach this notebook to your cluster, then restart the cluster to clear out old memory caches and return to a default, standard environment. The restart should take 1 to 2 minutes.

![Restart Cluster](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/general/restart_cluster.png)

####![Wikipedia Logo Tiny](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/general/logo_wikipedia_tiny.png) **Introduction: Pageviews By Second**

Wikipedia.org is the 7th most popular website (measured by page views and unique visitors):

#![Top Ten Global Websites2](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/pageviews/top_ten_websites.png)

Source: Alexa/Aug 2015: <a href="https://en.wikipedia.org/wiki/List_of_most_popular_websites" target="_blank">List_of_most_popular_websites</a>

In this notebook, we will analyze the traffic patterns to the desktop vs. mobile editions of English Wikipedia.

The Wikimedia Foundation has released 41 days of pageviews data starting March 16, 2015 at midnight. Two rows are collected every second:
- Desktop requests
- Mobile requests

####![Databricks Logo Tiny](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/general/logo_databricks_tiny.png) **Let's use a premounted version of the data (from S3)**
Let's start by taking another look at what is on our file system. Run the following cell to see all the datasets from Amazon S3 that are mounted into our shard:

```python
display(dbutils.fs.ls("/mnt/wikipedia-readonly/"))
```

```
path,name,size
dbfs:/mnt/wikipedia-readonly/clickstream/,clickstream/,0
dbfs:/mnt/wikipedia-readonly/clickstream_parquet/,clickstream_parquet/,0
dbfs:/mnt/wikipedia-readonly/edits/,edits/,0
dbfs:/mnt/wikipedia-readonly/en_wikipedia/,en_wikipedia/,0
dbfs:/mnt/wikipedia-readonly/gx_clickstream/,gx_clickstream/,0
dbfs:/mnt/wikipedia-readonly/ipgeocode/,ipgeocode/,0
dbfs:/mnt/wikipedia-readonly/models/,models/,0
dbfs:/mnt/wikipedia-readonly/pagecounts/,pagecounts/,0
dbfs:/mnt/wikipedia-readonly/pagecounts_april_20_2015_3pm_gz/,pagecounts_april_20_2015_3pm_gz/,0
dbfs:/mnt/wikipedia-readonly/pagecounts_april_20_2015_3pm_parquet/,pagecounts_april_20_2015_3pm_parquet/,0
```


Next, let's take a look in our *pageviews* folder:

```python
display(dbutils.fs.ls("/mnt/wikipedia-readonly/pageviews"))
```

```
path,name,size
dbfs:/mnt/wikipedia-readonly/pageviews/pageviews_by_second.parquet/,pageviews_by_second.parquet/,0
dbfs:/mnt/wikipedia-readonly/pageviews/pageviews_by_second.tsv,pageviews_by_second.tsv,262099389
```


We can import this file directly with the following command:

```python
temp_df = (spark.read
   .option("header", "true")        # Use first line of all files as header
   .option("inferSchema", "true")   # Automatically infer data types (costs extra time: Spark makes an extra pass over the records)
   .option("delimiter", "\t")       # Use tab delimiter (default is comma-separator)
   .csv("/mnt/wikipedia-readonly/pageviews/pageviews_by_second.tsv")
)
temp_df.createOrReplaceTempView("pageviews_by_second")  # Register a view that SQL commands can query as if it were a table

# Note: In versions of Spark prior to 2.0, do this, instead:
#temp_df = (sqlContext.read
#   .format("com.databricks.spark.csv")
#   .option("header", "true")        # Use first line of all files as header
#   .option("inferSchema", "true")   # Automatically infer data types
#   .option("delimiter", "\t")       # Use tab delimiter (default is comma-separator)
#   .load("/mnt/wikipedia-readonly/pageviews/pageviews_by_second.tsv")
#)
#temp_df.registerTempTable("pageviews_by_second")
```

Lastly, we can verify that the "table" exists by querying it with `spark.sql`, and then use `spark` to create the `pageviews_df` DataFrame from the temporary view "pageviews_by_second".

**NOTE** When we use plain Python pandas or R data frames, for example, we are working on the Spark driver machine only, so the processing is not parallelized. Spark SQL, on the other hand, has no performance difference compared to the PySpark DataFrame API.

```python
spark.sql("select * from pageviews_by_second limit 10").show()
```

```python
# Create a DataFrame from the temporary view (lazy: no data is read yet)
pageviews_df = spark.read.table("pageviews_by_second")
```

And then we can take a look at the first 10 records.

```python
pageviews_df.show(10)
```

Next, take note that the timestamps and/or sites are out of order. We will dig into this more later.

Click the down arrows in the cell above to see that the `show()` action kicked off 1 job and 1 stage.

We will learn more about Jobs and Stages later in this lab.

`printSchema()` prints out the schema for the table, the data types for each column and whether a column can be null:

```python
pageviews_df.printSchema()
```

Notice above that the first 2 columns are typed as `string`, while the requests column holds an `integer`.

####![Spark Logo Tiny](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/general/logo_spark_tiny.png) **Partitions and Tasks**

DataFrames are made of one or more partitions.  To see the number of partitions a DataFrame is made of:

```python
pageviews_df.rdd.getNumPartitions()
```

The cell above converts the DataFrame to an RDD and then calls `getNumPartitions()` on the RDD. We will learn more about RDDs in a future lab. For now, just remember this handy trick to find the number of partitions in a DataFrame.

Why this many partitions? The number reflects how the file was divided into input splits when Spark read it. We will look at those splits below, when we examine the tasks launched by `count()`.

Here is what the DataFrame looks like:

![4 partitions](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/pageviews/4_partitions_dashed.png)

The dashed borders indicate that the DataFrame is still on disk and has not been cached in memory.

Count the number of records (rows) in the DataFrame:

```python
pageviews_df.count()
```

Let's understand how Spark is actually computing the result of 7.2 million for the count action. It is important to understand the relationship between the number of partitions in a DataFrame and the number of tasks required to process a DataFrame.

In the cell above, where you ran the count, expand the Spark Jobs and Stages:

![Expand Jobs and Stages](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/pageviews/count_jobs_stages_tasks.png)

Each Spark action (like count) kicks off one or more Jobs. Above we see that one job (Job #2) was launched. *(your specific job # may be different)*

Each job consists of one or more Stages. Above we see that two stages (Stage #2 and #3) were launched to compute the result.

To learn more details about the Job and Stages, open the Spark UI in a new tab:

![Open Spark UI](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/pageviews/view-spark-ui.png)

When you go to the new "Spark UI" tab, you should see the Jobs page, with a few completed jobs. Click on the link under Description for the Job # used to run the count action:

![Two Completed Jobs](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/pageviews/three_completed_jobs.png)

On the "Details for Job #" page, you can now see several metrics about the count Job:

![Two Stages Colored](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/pageviews/two_stages_colored.png)

In the screenshot above, we can see (in purple) that Stage 2 (the first Stage for the count job) took 21 seconds to run, while Stage 3 only took 0.2 seconds.

Under the "Input" column (in green), notice that Stage 2 read about 250 MB of data, and (in orange) that it wrote 168 bytes of shuffle data.

Under the "Shuffle Read" column, we can also see that Stage 3 (in orange) read the 168 bytes of data that Stage 2 had written.

To learn more about the details of Stage 2, click the link (in red) under the Description column for Stage 2:

On the "Details for Stage 2" page, scroll all the way to the bottom until you see the 4 Tasks (the number of tasks may differ on your cluster):

![Stage-1, 4 tasks](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/pageviews/stageone_4tasks.png)

In the Tasks screenshot above, we can see (in green) that the first 3 tasks each read 64 MB of the file, while the last task read 58 MB. Also notice (in green) that each of the 64 MB splits read by the first 3 tasks contained about 1.8 million records, while the last task's 58 MB split contained only about 1.6 million records.

We can also see (in purple) that each task emitted a single 42 Byte record as the Shuffle Write.

In this run, Spark read the CSV file from S3 in 64 MB input splits and launched a separate task/thread to read each split.

In this case, after reading the first three input splits, only 58 MB remain, so the last task reads 58 MB:
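The split arithmetic can be checked with a few lines of plain Python. This is a sketch that assumes a fixed 64 MB (64 × 1024 × 1024 bytes) split size and uses the 262,099,389-byte size of `pageviews_by_second.tsv` reported by `dbutils.fs.ls` above; the helper `input_splits` is hypothetical, not a Spark API.

```python
def input_splits(file_size, split_size):
    """Return the size in bytes of each input split of a file (hypothetical helper)."""
    full, remainder = divmod(file_size, split_size)
    sizes = [split_size] * full
    if remainder:
        sizes.append(remainder)  # the last split holds whatever is left over
    return sizes

MB = 1024 * 1024
FILE_SIZE = 262_099_389  # bytes, from the dbutils.fs.ls listing above
splits = input_splits(FILE_SIZE, 64 * MB)

print(len(splits))                      # one task per split: 4
print([round(s / MB) for s in splits])  # [64, 64, 64, 58]
```

Four splits means four tasks in the first stage, which matches the 4 partitions reported by `getNumPartitions()`.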

![64 MB input split](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/pageviews/input_split.png)

Click back in your browser to return to the "Details of Job #" page, then click on the link under Description to see the details of the next Stage (Stage #3):

![Click Stage 2](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/pageviews/two_stages_clickstagetwo.png)

Once again, scroll all the way to the bottom till you see the 1 Task for Stage 3:

![Stage two, 1 task](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/pageviews/stage2_onetask.png)

Notice in the screenshot above that the single task in Stage 3 read 168 bytes of data (4 records).

The diagram below explains what's going on. The count Job is kicking off two stages.

Stage 2 (the first stage of the job) has 4 tasks, and each task reads between 1.6 million and 1.8 million records.

Each task in Stage 2 emits one record with the aggregated count that it saw in its local partition of data.

Then all four tasks in Stage 2 complete.

Stage 3 (the second stage of the job) starts with only one task. The task reads the 4 records from Stage 2 and performs a final aggregation and emits the number 7.2 million back to our Notebook cell as the final result of the computation!
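To make the two stages concrete, here is a plain-Python model of the same idea (not Spark code): each "task" in the first stage counts the rows of its own partition and emits one partial count, and a single task in the second stage sums those partial counts. The four toy partitions below are made-up stand-ins for the four input splits.

```python
from typing import List

def stage1_local_counts(partitions: List[list]) -> List[int]:
    """First stage: each task emits a single record, the row count of its partition."""
    return [sum(1 for _ in partition) for partition in partitions]

def stage2_final_count(partial_counts: List[int]) -> int:
    """Second stage: one task reads the shuffled partial counts and adds them up."""
    return sum(partial_counts)

# Four toy partitions standing in for the four input splits (hypothetical data).
partitions = [
    [("2015-03-16T00:00:00", "mobile")] * 3,
    [("2015-03-16T00:00:01", "desktop")] * 3,
    [("2015-03-16T00:00:02", "mobile")] * 3,
    [("2015-03-16T00:00:03", "desktop")] * 2,
]

partial = stage1_local_counts(partitions)
print(partial)                      # [3, 3, 3, 2]: four small records go through the shuffle
print(stage2_final_count(partial))  # 11
```

Only the four partial counts cross the shuffle, which is why the shuffle write in the real job is just a few bytes.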

![Count, Physical Model](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/pageviews/count_physicalmodelwjob.png)

####![Spark Logo Tiny](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/general/logo_spark_tiny.png) **Transformation: `orderBy()`**

The `orderBy()` transformation can be used to order the table by the timestamp column:

```python
(pageviews_df
  .orderBy("timestamp")  # transformation
  .show(10)              # action
)
```

The first 2 rows show data from March 16, 2015 at **00:00:00** (midnight).

The 3rd and 4th rows show data from a second after midnight, **00:00:01**.

The DataFrame contains 2 rows for every second, one for desktop and one for mobile.

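Notice that the first 6 rows in the DataFrame are ordered `desktop`, then `mobile`, but the last 4 rows are ordered `mobile`, then `desktop`. Sorting only by `timestamp` leaves the two rows for each second tied, and Spark does not guarantee any particular order among tied rows: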

![Out of Order](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/pageviews/out_of_order.png)

To get a consistent order, the following command sorts the rows first by timestamp (ascending), then by site (descending), and shows the first 10 rows again:

```python
pageviews_df.orderBy(pageviews_df["timestamp"], pageviews_df["site"].desc()).show(10)
```
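The difference between the two orderings can be reproduced in plain Python on a few made-up rows (this is a model, not Spark's sort). Python's `sorted` is stable, so ties keep their input order; sorting by the secondary key (site, descending) first and then by the primary key (timestamp) gives the same result as a two-key sort.

```python
# Made-up rows: (timestamp, site, requests)
rows = [
    ("2015-03-16T00:00:01", "mobile", 1),
    ("2015-03-16T00:00:00", "desktop", 2),
    ("2015-03-16T00:00:01", "desktop", 3),
    ("2015-03-16T00:00:00", "mobile", 4),
]

# Like orderBy("timestamp"): rows with the same timestamp stay in arrival order.
by_timestamp = sorted(rows, key=lambda r: r[0])

# Like orderBy(timestamp asc, site desc): sort by the secondary key first, then
# by the primary key; stability keeps "mobile" before "desktop" within each second.
by_site_desc = sorted(rows, key=lambda r: r[1], reverse=True)
by_timestamp_then_site = sorted(by_site_desc, key=lambda r: r[0])

print([site for _, site, _ in by_timestamp])            # ['desktop', 'mobile', 'mobile', 'desktop']
print([site for _, site, _ in by_timestamp_then_site])  # ['mobile', 'desktop', 'mobile', 'desktop']
```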

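Running this `orderBy()` followed by the `show()` action takes 20 to 30 seconds against the 255 MB pageviews file on S3 (the transformation itself is lazy; the time is spent when `show()` triggers the sort).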

####![Spark Logo Tiny](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/general/logo_spark_tiny.png) **Reading from Disk vs Memory**

The 255 MB pageviews file is currently on S3, which means each time you scan through it, your Spark cluster has to read the 255 MB of data remotely over the network.

Once again, use the `count()` action to scan the entire 255 MB file from disk and count how many total records (rows) there are:

```python
pageviews_df.count()
```

As we saw earlier, the pageviews DataFrame contains 7.2 million rows.

Hmm, that took at least 20 seconds. Let's cache the DataFrame in memory to speed it up.

```python
# pageviews_by_second is the temporary view created at the start of the notebook
spark.table("pageviews_by_second").cache()
```

Caching is a lazy operation (meaning it doesn't take effect until you call an action that needs to read all of the data). So let's call the `count()` action again:

```python
# During this count() action, the data is read from S3 and counted, and also cached
# Note that when the count action has to also cache data, it takes longer than simply a count (like above)

pageviews_df.count()
```

The DataFrame should now be cached. Let's run another `count()` to see the speed increase:

```python
pageviews_df.count()
```

Notice that scanning the DataFrame is now significantly faster!
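The lazy behavior of `cache()` can be modelled in plain Python. In the sketch below, `LazyTable` is a hypothetical class (not a Spark API and not how Spark implements caching): `cache()` only sets a flag, the first action after it reads the source and fills the cache, and later actions are served from memory. The `source_reads` counter stands in for the slow scan of the file on S3.

```python
class LazyTable:
    """Toy model of a lazily cached table (hypothetical, not Spark's implementation)."""

    def __init__(self, load_rows):
        self._load_rows = load_rows  # function that "scans" the source
        self._cache_requested = False
        self._cached_rows = None
        self.source_reads = 0        # how many times the source was scanned

    def cache(self):
        # Lazy: nothing is read here, we only remember that caching was requested.
        self._cache_requested = True
        return self

    def unpersist(self):
        self._cache_requested = False
        self._cached_rows = None
        return self

    def _rows(self):
        if self._cached_rows is not None:
            return self._cached_rows  # served from memory
        self.source_reads += 1
        rows = self._load_rows()      # slow remote scan
        if self._cache_requested:
            self._cached_rows = rows  # materialized by the first action after cache()
        return rows

    def count(self):
        # An action: it forces the rows to be produced.
        return len(self._rows())


table = LazyTable(lambda: list(range(1000)))
table.count()              # scans the source (1 read)
table.cache()              # no read happens here
table.count()              # scans the source and fills the cache (2 reads)
table.count()              # served from the cache, still 2 reads
print(table.source_reads)  # 2
```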

Now that the pageviews DataFrame is cached in memory, if you go to the Spark UI tab and click on "Storage" (1 in image below) you'll see the "pageviews_by_second" DataFrame in memory:

![Storage UI](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/pageviews/storage_ui.png)

Notice above that the DataFrame is made of 4 partitions totaling 192 MB in size.

The Storage Level for DataFrames is actually the new Tungsten Binary format.

Click on the DataFrame name link under the RDD Name column (2 in image above) to see more details about the DataFrame.

![Storage UI](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/pageviews/storage_ui_details.png)

Although the first 3 input splits read from S3 were 64 MB, when they got cached in memory using the Tungsten binary format, they became 49 MB each. The last 58 MB input split became a 44 MB partition in memory:
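A quick plain-Python check of these sizes (in MB, taken from the text above): the four cached partitions add up to the 192 MB shown in the Storage tab, give or take rounding of the per-partition figures, and each partition takes roughly three quarters of the space of its input split.

```python
input_splits_mb = [64, 64, 64, 58]  # sizes read from S3
cached_parts_mb = [49, 49, 49, 44]  # sizes reported by the Storage tab

print(sum(input_splits_mb))  # 250
print(sum(cached_parts_mb))  # 191, i.e. the ~192 MB total after rounding

# Cached size relative to input size, per partition
ratios = [c / i for c, i in zip(cached_parts_mb, input_splits_mb)]
print([round(r, 2) for r in ratios])  # [0.77, 0.77, 0.77, 0.76]
```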

![df in memory](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/pageviews/df_in_mem.png)

####![Wikipedia + Spark Logo Tiny](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/general/wiki_spark_small.png) Q-1) How many rows in the table refer to mobile vs desktop?

Use the `filter()` transformation to keep only the rows where the site column is equal to mobile:

```python
pageviews_df.filter(pageviews_df['site'] == 'mobile').count()
```

**Challenge 1:**

Open the Spark UI and answer the following questions:

 * How much time did the previous job take?
 * How many stages did it start?
 * How many bytes were read by the second stage?
 * How much time did the task in the second stage take to complete?

```python
# TODO
# Type your answers here...

# Total time taken: 0.4s
# Number of stages started: 2
# Shuffle bytes read in the second stage: 472 bytes
# Time taken for the task in the second stage to complete: 8ms
```

Expand the Spark Jobs above and notice that even though we added a `filter()` transformation, the Job still requires 2 Stages with 4 tasks in the first Stage and 1 task in the second Stage:

![Filter Count Expand](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/pageviews/filter_count_expand.png)

**Challenge 2:** How many rows refer to desktop?

```python
# TODO
# Type your answer here...
pageviews_df.filter(pageviews_df['site'] == 'desktop').count()
```

So, 3.6 million rows refer to the mobile page views and 3.6 million rows refer to desktop page views.
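Instead of running two separate `filter()` + `count()` jobs, both numbers can be computed in a single pass with `pageviews_df.groupBy("site").count()`. The plain-Python sketch below (not Spark code, with made-up rows) models what such an aggregation does across partitions: each partition is first reduced to per-site counts, and those small partial results are then merged, mirroring the two stages seen in the `count()` job.

```python
from collections import Counter
from typing import Iterable, List

def partial_site_counts(partition: Iterable[tuple]) -> Counter:
    """First stage: count rows per site inside one partition."""
    return Counter(site for _timestamp, site, _requests in partition)

def merge_counts(partials: List[Counter]) -> Counter:
    """Second stage: merge the per-partition counters into the final answer."""
    total = Counter()
    for partial in partials:
        total.update(partial)
    return total

# Made-up rows: (timestamp, site, requests), spread over three partitions
partitions = [
    [("t0", "desktop", 10), ("t0", "mobile", 7)],
    [("t1", "mobile", 5), ("t1", "desktop", 9)],
    [("t2", "desktop", 8)],
]
result = merge_counts([partial_site_counts(p) for p in partitions])
print(dict(result))  # {'desktop': 3, 'mobile': 2}
```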

Let's compare the above `filter()` + `count()` from a Logical Model vs Physical Model perspective.

Reading a DataFrame from a Databricks table and running a filter() on it are both lazy operations, so technically no work is done yet:

![Logical Model: Filter](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/pageviews/filter_lazy.png)

However, when you call the `count()` action, it triggers the read from S3 and runs the `filter()` and `count()`:

![Logical Model: Filter and Count](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/pageviews/filter_count_run.png)

The Physical Model looks different. The filter() + count() job kicks off 2 Stages:

![Physical Model: Filter and Count](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/pageviews/filter_physical_model.png)

Each of the four tasks in the 1st Stage is actually doing 4 things:
- Read input split from S3
- Filter for just mobile or desktop traffic
- Do a local aggregation on the input split partition
- Write a single record to local SSD with the count # seen in the partition

![Pipelining](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/pageviews/pipelining.png)

####![Spark Logo Tiny](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/general/logo_spark_tiny.png) **SQL Query Plan Visualization & the Catalyst Optimizer**

Recall that the last command we just ran above was:

```python
# pageviews_df.filter(pageviews_df['site'] == 'desktop').count()
```

To see the SQL Query Plan for the `filter()` + `count()` query, click on the SQL tab in the Spark UI:

![SQL viz](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/pageviews/sql_viz.png)

In the diagram above, you can see that 7.2 million records are read from memory (green) to create the DataFrame, then filtered for just desktop (or mobile) traffic. The 3.6 million rows that pass the filter are projected out to an aggregator, which outputs 4 records to the Shuffle.

Everything above the TungstenExchange shuffle (in purple) is part of the 1st Stage. After the shuffle, in the 2nd stage, an aggregation is done on 4 input rows to emit 1 output row.

You can expand the "Details" in the SQL visualization UI to see the logical and physical plans:

![SQL details](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/pageviews/sql_details.png)

At the core of Spark SQL is the Catalyst optimizer, which all DataFrame, SQL and Dataset queries flow through to generate a physical plan that gets executed using RDDs:

![Catalyst Optimizer](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/pageviews/catalyst.png)

Catalyst is one of the newest and most technically involved components of Spark. It leverages advanced programming language features (e.g. Scala's pattern matching and quasiquotes) in a novel way to build an extensible query optimizer.

The main data type in Catalyst is a tree composed of zero or more child node objects. Trees can be manipulated using rules (functions that turn a tree into a new tree). You can read more about how Catalyst works in this Databricks blog post: <a href="https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html" target="_blank">April 2015: Deep Dive into Spark SQL's Catalyst Optimizer</a>
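The tree-plus-rules idea can be illustrated with a tiny expression tree in Python, in the spirit of the constant-folding example (`x + (1 + 2)` becoming `x + 3`) from the blog post above. The classes and the `fold_constants` rule below are hypothetical stand-ins; Catalyst itself is written in Scala and applies rules through pattern matching on its own tree nodes. Here the rule rewrites any `Add` whose children are both literals into a single literal, and `transform` applies it bottom-up.

```python
from dataclasses import dataclass
from typing import Callable, Union

@dataclass(frozen=True)
class Literal:
    value: int

@dataclass(frozen=True)
class Attribute:
    name: str

@dataclass(frozen=True)
class Add:
    left: "Expr"
    right: "Expr"

Expr = Union[Literal, Attribute, Add]

def transform(node: Expr, rule: Callable[[Expr], Expr]) -> Expr:
    """Apply a rule bottom-up: rewrite the children first, then the node itself."""
    if isinstance(node, Add):
        node = Add(transform(node.left, rule), transform(node.right, rule))
    return rule(node)

def fold_constants(node: Expr) -> Expr:
    """Rule: Add(Literal, Literal) -> Literal; any other node is returned unchanged."""
    if isinstance(node, Add) and isinstance(node.left, Literal) and isinstance(node.right, Literal):
        return Literal(node.left.value + node.right.value)
    return node

# x + (1 + 2)  becomes  x + 3
tree = Add(Attribute("x"), Add(Literal(1), Literal(2)))
print(transform(tree, fold_constants))  # Add(left=Attribute(name='x'), right=Literal(value=3))
```

Because the rule returns nodes it does not recognize unchanged, many such small rules can be applied one after another, each simplifying only the patterns it knows about.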

**You can always check the logical and physical plans of how Spark will calculate a DataFrame in the notebook, too**:

```python
pageviews_df.filter(pageviews_df["site"] == "desktop").explain(True)
```

####![Spark Logo Tiny](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/general/logo_spark_tiny.png) **Memory persistence and Shuffle Partitions**

Recall from the first notebook that your Spark local mode cluster is running with 3 slots:

![Notebook + Micro Cluster](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/book_intro/notebook_microcluster.png)

For best performance, we should cache DataFrames into memory with a number of partitions that is a multiple of the number of slots (3 or 6 or 9, etc).

For example, here is a DataFrame in memory (orange) with 3 partitions:

![Arch 3 slots](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/pageviews/arch_3slots.png)

When running transformations on the DataFrame, all 3 partitions can be analyzed in parallel:

![Arch 3 tasks](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/pageviews/arch_3tasks.png)

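With 3 slots, three partitions is a better fit than four: with four partitions, one task has to wait for a slot to free up, so the stage needs two rounds of tasks instead of one.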
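The same reasoning as arithmetic: with `slots` task slots, a stage with `partitions` tasks runs in ceil(partitions / slots) rounds ("waves"), and any slot left empty in the last wave is wasted. The helpers below are a plain-Python sketch that assumes every task takes the same time, which real tasks only approximate.

```python
import math

def waves(partitions: int, slots: int) -> int:
    """Rounds needed to run all tasks when each slot runs one task at a time."""
    return math.ceil(partitions / slots)

def idle_slot_fraction(partitions: int, slots: int) -> float:
    """Fraction of slot-turns left unused, assuming equal-length tasks."""
    return 1 - partitions / (waves(partitions, slots) * slots)

for p in (3, 4, 6):
    print(p, waves(p, 3), round(idle_slot_fraction(p, 3), 2))
# 3 partitions -> 1 wave,  no idle slots       (prints: 3 1 0.0)
# 4 partitions -> 2 waves, 2 of 6 turns idle   (prints: 4 2 0.33)
# 6 partitions -> 2 waves, no idle slots       (prints: 6 2 0.0)
```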

First, unpersist the original base DataFrame, `pageviews_df`. Then re-read the 255 MB file from S3, order it by the timestamp column, and re-cache it with 3 partitions:

```python
pageviews_df.unpersist()
```

The Storage UI will now be empty:

![Storage UI empty](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/pageviews/storage_empty.png)

**Challenge 3:** Reload the table from S3, order it by the timestamp and site columns (like above), and cache it:

Hint: Name the new DataFrame `pageviews_ordered_df`

```python
# TODO
# Type your answer here...
pageviews_ordered_df = pageviews_df.orderBy(pageviews_df['timestamp'], pageviews_df['site'].desc()).cache()
```

```python
# Materialize the cache
pageviews_ordered_df.count()
```

How many partitions are in the new DataFrame?

```python
pageviews_ordered_df.rdd.getNumPartitions()
```

#### **200 Partitions!**

What could have happened? Expand the Job details in the `count()` command above by clicking the two down arrows:

![200 tasks](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/pageviews/200tasks.png)

The first stage (Stage 17 above) is reading the 4 input splits from S3. The next Stage seems to be using 200 tasks to do the `orderBy()` transformation (in purple). This is when the DataFrame is being snapshotted and cached into memory. The final stage (Stage 19 above) is doing the final aggregation for the count.

By clicking on the Jobs tab in the Spark UI and then clicking into the details for the last job, you can see the same 3 stages:

![3stages_200partitions.png](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/pageviews/3stages_200partitions.png)

Notice that the first stage read 250 MB of input data (in green) and wrote 110 MB of shuffle data (in purple).

The second stage read the 110 MB of shuffle data from the earlier stage and wrote 8.2 KB of shuffle data (in orange).

The third stage read the 8.2 KB of shuffle data from the middle stage.

The trick to understanding what's going on lies in the following Spark SQL configuration option.

```python
spark.conf.get("spark.sql.shuffle.partitions")
```

The option is set to 200. It configures the number of partitions to use when shuffling data for joins or aggregations, and it also applies to the shuffle behind `orderBy()`, as the 200 tasks above show.
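A shuffle redistributes rows into exactly `spark.sql.shuffle.partitions` output partitions, and the next stage runs one task per output partition whether or not that partition received any rows. The sketch below models this with hash partitioning in plain Python. It is only a model: `zlib.crc32` is used to get a deterministic hash and is not what Spark uses (Spark's hash partitioning uses Murmur3), and `orderBy()` actually uses range partitioning rather than hashing.

```python
import zlib
from collections import defaultdict

def shuffle(keys, num_partitions):
    """Assign each key to an output partition by hash; return {partition_id: [keys]}."""
    buckets = defaultdict(list)
    for key in keys:
        pid = zlib.crc32(key.encode("utf-8")) % num_partitions
        buckets[pid].append(key)
    # Every output partition exists (and gets a task), even if it received no rows.
    return {pid: buckets.get(pid, []) for pid in range(num_partitions)}

keys = [f"2015-03-16T00:00:{s:02d}" for s in range(60)]  # 60 example timestamps
for n in (200, 8):
    parts = shuffle(keys, n)
    empty = sum(1 for rows in parts.values() if not rows)
    print(f"{n} partitions -> {n} tasks, {empty} of them empty")
```

With 60 keys and 200 partitions, at least 140 tasks must process an empty partition, which is pure scheduling overhead on a small cluster.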

We can change this value programmatically. What should we change it to? A small multiple of the number of available threads in our cluster is a good starting point. Since we're using a local mode cluster, we can easily see how many threads we have:

```python
spark.conf.get("spark.master")
```
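In local mode the master URL encodes the thread count: `local` means one thread, `local[N]` means N threads (optionally `local[N,F]`, where F is a number of task retries), and `local[*]` means one thread per logical core. The helper below is a hypothetical plain-Python sketch, not part of Spark, that turns such a string into a thread count so it can be multiplied into a shuffle-partition setting; any other master URL raises an error.

```python
import os
import re

def local_threads(master: str) -> int:
    """Thread count for a local-mode master URL (hypothetical helper, not a Spark API)."""
    if master == "local":
        return 1  # plain "local" runs a single worker thread
    match = re.fullmatch(r"local\[(\*|\d+)(?:,\d+)?\]", master)
    if match is None:
        raise ValueError(f"not a local-mode master URL: {master!r}")
    if match.group(1) == "*":
        return os.cpu_count() or 1  # one thread per logical core
    return int(match.group(1))

# Example: a master of "local[8]" with a multiple of 1 suggests 8 shuffle partitions.
multiple = 1
suggested = local_threads("local[8]") * multiple
print(suggested)  # 8
```

The result would then be passed to Spark as a string, e.g. `spark.conf.set("spark.sql.shuffle.partitions", str(suggested))`.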

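Let's start with a multiple of 1, that is, one shuffle partition per thread. This notebook was run on a cluster with 8 threads, so we set the number of shuffle partitions to 8 (use your own thread count if it differs):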

```python
spark.conf.set("spark.sql.shuffle.partitions", "8")
```

Verify the change:

```python
spark.conf.get("spark.sql.shuffle.partitions")
```

Unpersist the DataFrame and re-run the read/orderBy/cache/count to store the DataFrame in memory with the new number of partitions:

```python
pageviews_ordered_df.unpersist()
```

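```python
from pyspark.sql.functions import col

# Re-read the view and reference columns by name, so the sort does not depend
# on the DataFrame that was just unpersisted
pageviews_ordered_df = (
  spark
    .read
    .table("pageviews_by_second")
    .orderBy(col("timestamp"), col("site").desc())
    .cache()
)
```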

```python
pageviews_ordered_df.count()
```

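Expand the Spark Jobs and Job # details in the cell above and note that this time the middle stage used only as many tasks as there are shuffle partitions (the screenshot below, taken on a 3-slot cluster, shows 3 tasks):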

![Middle Stage, 3 Tasks](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/pageviews/middlestage_3tasks.png)

Check the size of the DataFrame now:

```python
pageviews_ordered_df.rdd.getNumPartitions()
```

If you drill into the details of the Spark UI's Storage tab and click on the RDD name, you will now see the DataFrame in memory with the new number of partitions (3 in the screenshot below):

![Storage UI, 3 tasks](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/pageviews/storage_3partitions.png)

You can learn more about the different configuration options in Spark SQL in the <a href="https://spark.apache.org/docs/latest/sql-programming-guide.html#other-configuration-options" target="_blank">Apache Spark Docs</a>.