# Greenplum Demo - Part 3

This is Part 3 of Greenplum Demo, ***MPP Fundamentals and Partitioning***. 

- If you missed Part 1 (*Setup, Describe Input Dataset & Data Loading*) or wish to repeat, then click [here](GP-demo-1.ipynb).
- If you missed Part 2 (*Basic Table Functions*) or wish to repeat, then click [here](GP-demo-2.ipynb).

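```python
import os, re
from IPython.display import display_html

import pygments.lexers
from pygments import highlight
from pygments.formatters import HtmlFormatter

# Expected form: postgresql://user:password@host:port/dbname
CONNECTION_STRING = os.getenv('GPDBCONN')
if CONNECTION_STRING is None:
    raise RuntimeError('GPDBCONN environment variable is not set')

# Raw string so that \S reaches the regex engine unchanged (no invalid escape warnings)
cs = re.match(r'^postgresql://(\S+):(\S+)@(\S+):(\S+)/(\S+)$', CONNECTION_STRING)
if cs is None:
    raise ValueError('GPDBCONN is not a postgresql://user:password@host:port/dbname URI')

DB_USER   = cs.group(1)
DB_PWD    = cs.group(2)
DB_SERVER = cs.group(3)
DB_PORT   = cs.group(4)
DB_NAME   = cs.group(5)
```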

```python
%reload_ext sql
%sql $CONNECTION_STRING
```

u'Connected: gpadmin@gpadmin'

## 5. MPP Fundamentals

This topic provides an overview of how an MPP (*Massively Parallel Processing*) database such as Greenplum processes queries. Understanding this process can be useful when writing and tuning queries.

Users issue queries to Greenplum Database as they would to any database management system. They connect to the database instance on the Greenplum master host using a client application such as `psql` and submit SQL statements.

### 5.1. Understanding Parallel Query Execution

Greenplum creates a number of database processes to handle the work of a query. On the master, the query worker process is called the **query dispatcher** (QD). The QD is responsible for creating and dispatching the query plan. It also accumulates and presents the final results. On the segments, a query worker process is called a **query executor** (QE). A QE is responsible for completing its portion of work and communicating its intermediate results to the other worker processes.

There is at least one worker process assigned to each slice of the query plan. A worker process works on its assigned portion of the query plan independently. During query execution, each segment will have a number of processes working on the query in parallel.
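A toy model may help fix the QD/QE roles in mind. The Python sketch below is an illustration only, not Greenplum internals: threads stand in for the segment processes, in-memory lists stand in for the rows each segment stores locally, and the sample data is made up. Each "executor" counts only its own rows; the "dispatcher" hands the same work to every segment and merges the partial results.

```python
from collections import Counter
from concurrent.futures import ThreadPoolExecutor

# Hypothetical data: each inner list is the set of rows stored on one segment.
SEGMENTS = [
    ["books", "music", "books"],   # segment 0
    ["music", "video", "books"],   # segment 1
]

def query_executor(rows):
    """QE role: aggregate only the rows stored locally on this segment."""
    return Counter(rows)

def query_dispatcher(segments):
    """QD role: dispatch the same work to every segment, then merge the partial results."""
    with ThreadPoolExecutor(max_workers=len(segments)) as pool:
        partials = list(pool.map(query_executor, segments))
    total = Counter()
    for partial in partials:
        total.update(partial)   # combine per-segment counts into the final answer
    return dict(total)

if __name__ == "__main__":
    print(query_dispatcher(SEGMENTS))
```

Splitting the count into a local step and a merge step is the same idea as the two-level aggregation visible in the explain plan in section 5.4.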

### 5.2. Tuning SQL Queries

The Greenplum Database cost-based optimizer evaluates many strategies for executing a query and chooses the least costly method.

Like other RDBMS optimizers, the Greenplum optimizer takes into account factors such as the number of rows in tables to be joined, availability of indexes, and cardinality of column data when calculating the costs of alternative execution plans. The optimizer also accounts for the location of the data, preferring to perform as much of the work as possible on the segments and to minimize the amount of data that must be transmitted between segments to complete the query.

When a query runs slower than you expect, you can view the plan the optimizer selected as well as the cost it calculated for each step of the plan. This helps you determine which steps are consuming the most resources, so that you can modify the query or the schema to give the optimizer more efficient alternatives. Use the SQL `EXPLAIN` statement to view the plan for a query.

The optimizer produces plans based on statistics generated for tables, so it is important to have accurate, up-to-date statistics (collected with `ANALYZE`) to produce the best plan.

### 5.3. How to Generate Explain Plans

The `EXPLAIN` and `EXPLAIN ANALYZE` statements are useful tools to identify opportunities to improve query performance:

- `EXPLAIN` displays the query plan and estimated costs for a query, but does not execute the query. `EXPLAIN ANALYZE` executes the query in addition to displaying the query plan. 
- `EXPLAIN ANALYZE` discards any output from the `SELECT` statement; however, other operations in the statement are performed (for example, INSERT, UPDATE, or DELETE). To use `EXPLAIN ANALYZE` on a DML statement without letting the command affect the data, explicitly use `EXPLAIN ANALYZE` in a transaction (`BEGIN; EXPLAIN ANALYZE ...; ROLLBACK;`).
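The `BEGIN; EXPLAIN ANALYZE ...; ROLLBACK;` pattern is easy to get wrong by hand (for example, forgetting the `ROLLBACK`). A minimal sketch of a helper that builds the wrapped script; the function name is hypothetical, and the check for embedded semicolons is a deliberate simplification that would also reject a semicolon inside a string literal.

```python
def explain_analyze_without_side_effects(dml: str) -> str:
    """Wrap a single DML statement so EXPLAIN ANALYZE runs it inside a
    transaction that is rolled back, leaving the table data unchanged."""
    statement = dml.strip().rstrip(";").strip()
    if not statement:
        raise ValueError("empty statement")
    if ";" in statement:
        # Simplification: refuse anything that looks like several statements.
        raise ValueError("expected exactly one statement")
    return f"BEGIN;\nEXPLAIN ANALYZE {statement};\nROLLBACK;"
```

The returned text can be piped to `psql` in the same way the notebook cells in Step 5 pipe their SQL scripts.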

`EXPLAIN ANALYZE` runs the statement in addition to displaying the plan with additional information as follows:

- **Total elapsed time** (in milliseconds) to run the query
- **Number of workers** (segments) involved in a plan node operation
- **Maximum number of rows** returned by the segment (and its segment ID) that produced the most rows for an operation
- The **memory** used by the operation
- **Time** (in milliseconds) it took to retrieve the first row from the segment that produced the most rows, and the total time taken to retrieve all rows from that segment.

### 5.4. How to Read Explain Plans

An `EXPLAIN` plan is a report detailing the steps the Greenplum Database optimizer has determined it will follow to execute a query. The plan is a tree of nodes, read from bottom to top, with each node passing its result to the node directly above. Each node represents a step in the plan, and one line for each node identifies the operation performed in that step—for example, a scan, join, aggregation, or sort operation. The node also identifies the method used to perform the operation. The method for a scan operation, for example, may be a sequential scan or an index scan. A join operation may perform a hash join or nested loop join.


Following is an explain plan for a simple query. This query finds the number of rows in the contributions table stored at each segment.

```
gpadmin=# EXPLAIN SELECT gp_segment_id, count(*)
                  FROM contributions 
                  GROUP BY gp_segment_id;
                                 QUERY PLAN                        
--------------------------------------------------------------------------------
 Gather Motion 2:1  (slice2; segments: 2)  (cost=0.00..431.00 rows=2 width=12)
   ->  GroupAggregate  (cost=0.00..431.00 rows=1 width=12)
         Group By: gp_segment_id
         ->  Sort  (cost=0.00..431.00 rows=1 width=12)
               Sort Key: gp_segment_id
               ->  Redistribute Motion 2:2  (slice1; segments: 2)  (cost=0.00..431.00 rows=1 width=12)
                     Hash Key: gp_segment_id
                     ->  Result  (cost=0.00..431.00 rows=1 width=12)
                           ->  GroupAggregate  (cost=0.00..431.00 rows=1 width=12)
                                 Group By: gp_segment_id
                                 ->  Sort  (cost=0.00..431.00 rows=7 width=4)
                                       Sort Key: gp_segment_id
                                       ->  Table Scan on table1  (cost=0.00..431.00 rows=7 width=4)
 Optimizer status: PQO version 2.56.0
(14 rows)
```

This plan has eight nodes: Table Scan, Sort, GroupAggregate, Result, Redistribute Motion, Sort, GroupAggregate, and finally Gather Motion. Each node contains three cost estimates: cost (in sequential page reads), the number of rows, and the width of the rows.

The cost is a two-part estimate. A cost of 1.0 is equal to one sequential disk page read. The first part of the estimate is the start-up cost, which is the cost of getting the first row. The second estimate is the total cost, the cost of getting all of the rows.

The rows estimate is the number of rows output by the plan node. The number may be lower than the actual number of rows processed or scanned by the plan node, reflecting the estimated selectivity of WHERE clause conditions. The total cost assumes that all rows will be retrieved, which may not always be the case (for example, if you use a LIMIT clause).

The width estimate is the total width, in bytes, of all the columns output by the plan node.
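The three estimates can be read off each plan line mechanically. A minimal sketch that parses the `(cost=START..TOTAL rows=N width=W)` annotation from a single line of a plan such as the one above; the regular expression assumes the two-decimal cost format shown in that plan, and the function name is hypothetical.

```python
import re

# Matches the "(cost=START..TOTAL rows=N width=W)" annotation on a plan line.
COST_RE = re.compile(
    r"\(cost=(?P<startup>\d+\.\d+)\.\.(?P<total>\d+\.\d+)"
    r" rows=(?P<rows>\d+) width=(?P<width>\d+)\)"
)

def parse_plan_line(line: str):
    """Return (node, startup_cost, total_cost, rows, width), or None when the
    line carries no estimate (e.g. "Sort Key: ..." detail lines)."""
    m = COST_RE.search(line)
    if m is None:
        return None
    # Everything before the annotation, minus indentation and the "->" arrow.
    node = line[: m.start()].strip().lstrip("->").strip()
    return (node, float(m["startup"]), float(m["total"]), int(m["rows"]), int(m["width"]))
```

For the GroupAggregate line of the plan, this yields a start-up cost of 0.00, a total cost of 431.00, 1 row and a width of 12 bytes.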

The cost estimates in a node include the costs of all its child nodes, so the top-most node of the plan, usually a Gather Motion, has the estimated total execution cost for the plan. It is this number that the query planner seeks to minimize.

Scan operators scan through rows in a table to find a set of rows. There are different scan operators for different types of storage. They include the following:

- **Seq Scan** on heap tables: scans all rows in the table.
- **Append-only Scan**: scans rows in row-oriented append-only tables.
- **Append-only Columnar Scan**: scans rows in column-oriented append-only tables.
- **Index Scan**: traverses a B-tree index to fetch the rows from the table.
- **Bitmap Append-only Row-oriented Scan**: gathers pointers to rows in an append-only table from an index and sorts by location on disk.
- **Dynamic Table Scan**: chooses partitions to scan using a partition selection function.

Join operators include the following:

- **Hash Join**: builds a hash table from the smaller table with the join column(s) as hash key, then scans the larger table, calculating the hash key for the join column(s) and probing the hash table to find the rows with the same hash key. Hash joins are typically the fastest joins in Greenplum Database. The `Hash Cond` in the explain plan identifies the columns that are joined.
- **Nested Loop**: iterates through rows in the larger dataset, scanning the rows in the smaller dataset on each iteration. The Nested Loop join requires the broadcast of one of the tables so that all rows in one table can be compared to all rows in the other table. It performs well for small tables or tables that are limited by using an index. It is also used for Cartesian joins and range joins. There are performance implications when using a Nested Loop join with large tables. For plan nodes that contain a Nested Loop join operator, validate the SQL and ensure that the results are what is intended. Set the `enable_nestloop` server configuration parameter to OFF (default) to favor Hash Join.
- **Merge Join**: sorts both datasets and merges them together. A merge join is fast for pre-ordered data, but is rare in practice. To favor Merge Joins over Hash Joins, set the `enable_mergejoin` system configuration parameter to ON.
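To illustrate the build/probe structure of a Hash Join, here is a minimal single-process Python sketch. It ignores everything that makes the real operator interesting at scale (memory limits, spilling to disk, data spread across segments), and the `products`/`reviews` rows in the test are hypothetical.

```python
from collections import defaultdict

def hash_join(build_rows, probe_rows, build_key, probe_key):
    """Inner equi-join: build a hash table on the (smaller) build input,
    then probe it once per row of the (larger) probe input."""
    table = defaultdict(list)
    for row in build_rows:                      # build phase
        table[row[build_key]].append(row)
    result = []
    for row in probe_rows:                      # probe phase
        for match in table.get(row[probe_key], []):
            result.append({**match, **row})     # merge the matching rows
    return result
```

Each probe row costs one hash lookup instead of a pass over the other input, which is why the optimizer builds on the smaller input and probes with the larger one.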

Some query plan nodes specify motion operations. Motion operations move rows between segments when required to process the query. The node identifies the method used to perform the motion operation. Motion operators include the following:

- **Broadcast motion**: each segment sends its own rows to all other segments so that every segment instance has a complete local copy of the table. A Broadcast motion is often less efficient than a Redistribute motion, so the optimizer typically selects a Broadcast motion only for small tables; it is not acceptable for large tables. When data is not distributed on the join key, a dynamic redistribution of the needed rows from one of the tables to another segment is performed.
- **Redistribute motion**: each segment rehashes the data and sends the rows to the appropriate segments according to hash key.
- **Gather motion**: result data from all segments is assembled into a single stream. This is the final operation for most query plans.
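The difference between the two data-moving motions can be sketched in a few lines of Python. This is a conceptual model under stated assumptions: lists of dicts stand in for the rows held on each segment, and CRC32 stands in for Greenplum's internal distribution hash (the real hash function differs), so the segment a key lands on here says nothing about where Greenplum would place it.

```python
import zlib

def segment_for(key, num_segments):
    # Stand-in hash: Greenplum uses its own internal hash function, not CRC32.
    return zlib.crc32(str(key).encode("utf-8")) % num_segments

def redistribute(segments, key):
    """Redistribute Motion: each row moves to exactly one segment, chosen by hashing its key."""
    out = [[] for _ in segments]
    for rows in segments:
        for row in rows:
            out[segment_for(row[key], len(segments))].append(row)
    return out

def broadcast(segments):
    """Broadcast Motion: every segment receives a full copy of all rows."""
    everything = [row for rows in segments for row in rows]
    return [list(everything) for _ in segments]
```

After `redistribute`, all rows sharing a key sit on the same segment, so a join or aggregation on that key can proceed locally; `broadcast` achieves the same locality by multiplying the data by the number of segments, which is why it only suits small tables.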

Other operators that occur in query plans include the following:

- **Materialize**: the planner materializes a subselect once so it does not have to repeat the work for each top-level row.
- **InitPlan**: a pre-query, used in dynamic partition elimination, performed when the values the planner needs to identify partitions to scan are unknown until execution time.
- **Sort**: sorts rows in preparation for another operation requiring ordered rows, such as an Aggregation or Merge Join.
- **Group By**: groups rows by one or more columns.
- **Group/Hash Aggregate**: aggregates rows using a hash.
- **Append**: concatenates data sets, for example when combining rows scanned from partitions in a partitioned table.
- **Filter**: selects rows using criteria from a `WHERE` clause.
- **Limit**: limits the number of rows returned.

### 5.5. Optimizing Greenplum Queries

This topic describes Greenplum Database features and programming practices that can be used to enhance system performance in some situations.

To analyze query plans, first identify the plan nodes where the estimated cost to perform the operation is very high. Determine if the estimated number of rows and cost seems reasonable relative to the number of rows for the operation performed.

If using partitioning, validate that partition elimination is achieved. To achieve partition elimination, the query predicate (`WHERE` clause) must be the same as the partitioning criteria. Also, the `WHERE` clause should compare the partition key with explicit values and cannot contain a subquery.

Review the execution order of the query plan tree and the estimated number of rows. You want the execution order to build on the smaller tables or hash join results and probe with larger tables. Optimally, the largest table is used for the final join or probe, to reduce the number of rows being passed up the tree to the topmost plan nodes. If the analysis reveals that the build and/or probe order is not optimal, ensure that database statistics are up to date; running `ANALYZE` will likely address this and produce an optimal query plan.

Look for evidence of computational skew. Computational skew occurs during query execution when operators such as Hash Aggregate and Hash Join cause uneven execution on the segments: more CPU and memory are used on some segments than others, resulting in less than optimal execution. The cause could be joins, sorts, or aggregations on columns that have low cardinality or non-uniform distributions. You can detect computational skew in the output of the `EXPLAIN ANALYZE` statement for a query. Each node includes a count of the maximum rows processed by any one segment and the average rows processed by all segments. If the maximum row count is much higher than the average, at least one segment has performed much more work than the others, and computational skew should be suspected for that operator.
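The max-versus-average comparison can be expressed as a single ratio. A minimal sketch, with a hypothetical function name; the source gives no threshold for "much higher", so any cutoff you apply to the ratio is your own judgment call.

```python
def skew_ratio(rows_per_segment):
    """Ratio of the busiest segment's row count to the average across all segments.
    1.0 means perfectly even work; larger values point to computational skew."""
    if not rows_per_segment:
        raise ValueError("need at least one segment")
    avg = sum(rows_per_segment) / len(rows_per_segment)
    return max(rows_per_segment) / avg if avg else 0.0
```

For example, a 6024/6172 split across two segments gives a ratio of about 1.01 (well balanced), while a 2/5 split gives about 1.43; on tiny row counts like the latter, a high ratio matters little in absolute terms.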

Identify plan nodes where a Sort or Aggregate operation is performed. Hidden inside an Aggregate operation is a Sort. If the Sort or Aggregate operation involves a large number of rows, there is an opportunity to improve query performance. A HashAggregate operation is preferred over Sort and Aggregate operations when a large number of rows would otherwise need to be sorted. Usually a Sort operation is chosen by the optimizer because of the SQL construct, that is, the way the SQL is written. Most Sort operations can be replaced with a HashAggregate if the query is rewritten. To favor a HashAggregate operation over a Sort and Aggregate operation, ensure that the `enable_groupagg` server configuration parameter is set to ON.

When an explain plan shows a broadcast motion with a large number of rows, you should attempt to eliminate the broadcast motion. One way to do this is to use the gp_segments_for_planner server configuration parameter to increase the cost estimate of the motion so that alternatives are favored. The gp_segments_for_planner variable tells the query planner how many primary segments to use in its calculations. The default value is zero, which tells the planner to use the actual number of primary segments in estimates. Increasing the number of primary segments increases the cost of the motion, thereby favoring a redistribute motion over a broadcast motion. For example, setting gp_segments_for_planner = 100000 tells the planner that there are 100,000 segments. Conversely, to influence the optimizer to broadcast a table and not redistribute it, set gp_segments_for_planner to a low number, for example 2.


## Step 5. Partitioning

Table partitioning enables supporting very large tables, such as fact tables, by logically dividing them into smaller, more manageable pieces. Partitioned tables can improve query performance by allowing the Greenplum Database query optimizer to scan only the data needed to satisfy a given query instead of scanning all the contents of a large table.

### 1. Create a new copy of the original table, define a *PARTITION* pattern (by year) and load it.

After you create the partitioned table structure, top-level parent tables are empty. Data is routed to the bottom-level child table partitions. In a multi-level partition design, only the subpartitions at the bottom of the hierarchy can contain data.

Rows that cannot be mapped to a child table partition are rejected and the load fails. To avoid unmapped rows being rejected at load time, define your partition hierarchy with a DEFAULT partition. Any rows that do not match a partition's CHECK constraints load into the DEFAULT partition.
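A minimal sketch of how a row's partition key is matched against each child partition's range, with the DEFAULT partition as a catch-all. The partition map (assumed here to be 18 yearly ranges, 1998 through 2015) and the `route` function are hypothetical; in Greenplum the routing happens inside the database during `COPY` or `INSERT`.

```python
from datetime import date

# Hypothetical yearly partitions, each covering the half-open range [start, end).
PARTITIONS = {f"year{y}": (date(y, 1, 1), date(y + 1, 1, 1)) for y in range(1998, 2016)}

def route(row_date, partitions=PARTITIONS, has_default=True):
    """Return the name of the partition a row with this key would land in."""
    for name, (start, end) in partitions.items():
        if start <= row_date < end:        # plays the role of the CHECK constraint
            return name
    if has_default:
        return "default"                   # unmatched rows go to the DEFAULT partition
    raise ValueError(f"no partition for {row_date}; the load would fail")
```

Without a DEFAULT partition, a single out-of-range row raises an error here, mirroring the rejected load described above.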

At runtime, the query optimizer scans the entire table inheritance hierarchy and uses the CHECK table constraints to determine which of the child table partitions to scan to satisfy the query's conditions. The DEFAULT partition (if your hierarchy has one) is always scanned. DEFAULT partitions that contain data slow down the overall scan time.
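The CHECK-constraint based selection described above can be sketched as an interval-overlap test. This is a simplified model of static partition elimination, not the optimizer's actual code; it assumes a predicate of the form `lo <= key < hi` and a hypothetical map of 18 yearly partitions (1998 through 2015) like the one used in this demo.

```python
from datetime import date

# Hypothetical partition map: yearly [start, end) ranges, 1998 through 2015.
YEARLY = {f"year{y}": (date(y, 1, 1), date(y + 1, 1, 1)) for y in range(1998, 2016)}

def partitions_to_scan(lo, hi, partitions=YEARLY, has_default=True):
    """Static partition elimination for a predicate lo <= key < hi.

    A child partition is kept only if its [start, end) range (its CHECK
    constraint) can overlap the predicate range; all others are skipped."""
    selected = [name for name, (start, end) in partitions.items()
                if start < hi and lo < end]
    if has_default:
        selected.append("default")   # the DEFAULT partition is always scanned
    return selected
```

For a single day such as 2011-07-12 (`lo=date(2011, 7, 12)`, `hi=date(2011, 7, 13)`), only `year2011` and the default partition survive, which is the behaviour the partition-elimination examples in this step set out to confirm.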

When you use COPY or INSERT to load data into a parent table, the data is automatically rerouted to the correct partition, just like a regular table.

```python
sqlfilecode = !pygmentize -f html -O full,style=colorful -l postgres script/5-1-create-and-load-partition-table.sql
display_html('\n'.join(sqlfilecode), raw=True)
```

```python
query = !cat script/5-1-create-and-load-partition-table.sql
%sql $DB_USER@$DB_SERVER {''.join(query)}
```

Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
3453164 rows affected.
2 rows affected.


| row_count | tablename |
|---|---|
| 3453164 | demo.amzn_reviews |
| 3453164 | demo.amzn_reviews_2 |


### 2. Familiarize yourself with the Partitioned Table Design and Present Basic Demographics

#### 2.1. Retrieve Partitioned Table Design

```python
sqlfilecode = !pygmentize -f html -O full,style=colorful -l postgres script/5-2-1-partition-table-design.sql
display_html('\n'.join(sqlfilecode), raw=True)
```

```python
query = !cat script/5-2-1-partition-table-design.sql
%sql $DB_USER@$DB_SERVER {''.join(query)}
```

18 rows affected.


| partitionboundary | partitiontablename | partitionname | partitionlevel | partitionrank |
|---|---|---|---|---|
| `PARTITION year1998 START ('1998-01-01'::date) END ('1999-01-01'::date) EVERY ('1 year'::interval)` | amzn_reviews_2_1_prt_year1998 | year1998 | 0 | 1 |
| `PARTITION year1999 START ('1999-01-01'::date) END ('2000-01-01'::date) EVERY ('1 year'::interval)` | amzn_reviews_2_1_prt_year1999 | year1999 | 0 | 2 |
| `PARTITION year2000 START ('2000-01-01'::date) END ('2001-01-01'::date) EVERY ('1 year'::interval)` | amzn_reviews_2_1_prt_year2000 | year2000 | 0 | 3 |
| `PARTITION year2001 START ('2001-01-01'::date) END ('2002-01-01'::date) EVERY ('1 year'::interval)` | amzn_reviews_2_1_prt_year2001 | year2001 | 0 | 4 |
| `PARTITION year2002 START ('2002-01-01'::date) END ('2003-01-01'::date) EVERY ('1 year'::interval)` | amzn_reviews_2_1_prt_year2002 | year2002 | 0 | 5 |
| `PARTITION year2003 START ('2003-01-01'::date) END ('2004-01-01'::date) EVERY ('1 year'::interval)` | amzn_reviews_2_1_prt_year2003 | year2003 | 0 | 6 |
| `PARTITION year2004 START ('2004-01-01'::date) END ('2005-01-01'::date) EVERY ('1 year'::interval)` | amzn_reviews_2_1_prt_year2004 | year2004 | 0 | 7 |
| `PARTITION year2005 START ('2005-01-01'::date) END ('2006-01-01'::date) EVERY ('1 year'::interval)` | amzn_reviews_2_1_prt_year2005 | year2005 | 0 | 8 |
| `PARTITION year2006 START ('2006-01-01'::date) END ('2007-01-01'::date) EVERY ('1 year'::interval)` | amzn_reviews_2_1_prt_year2006 | year2006 | 0 | 9 |
| `PARTITION year2007 START ('2007-01-01'::date) END ('2008-01-01'::date) EVERY ('1 year'::interval)` | amzn_reviews_2_1_prt_year2007 | year2007 | 0 | 10 |
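The `EVERY` clause in these boundaries expands one `START`/`END` range into one child partition per interval, each child table named after the parent plus `_1_prt_` and the partition name, as the output shows. A sketch of that expansion, assuming the overall range runs from 1998-01-01 to 2016-01-01 (an inference: that range yields the 18 partitions the query reports, of which only the first ten are shown). The function is hypothetical and only mimics the naming visible above.

```python
from datetime import date

def expand_every_year(start_year, end_year, prefix="amzn_reviews_2_1_prt_"):
    """Expand START (start_year-01-01) END (end_year-01-01) EVERY (1 year) into
    (rank, partition name, child table name, start, end) tuples."""
    parts = []
    for rank, year in enumerate(range(start_year, end_year), start=1):
        name = f"year{year}"
        parts.append((rank, name, prefix + name, date(year, 1, 1), date(year + 1, 1, 1)))
    return parts
```

Each generated range is half-open (`START` inclusive, `END` exclusive), which is why one partition's `END` equals the next partition's `START` in the table above.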


#### 2.2. Row Count per Partition

```python
sqlfilecode = !pygmentize -f html -O full,style=colorful -l postgres script/5-2-2-row_count_per_partition.sql
display_html('\n'.join(sqlfilecode), raw=True)
```

```python
query = !cat script/5-2-2-row_count_per_partition.sql
%sql $DB_USER@$DB_SERVER {''.join(query)}
```

18 rows affected.


| partition_name | row_count |
|---|---|
| demo.amzn_reviews_2_1_prt_year1998 | 7 |
| demo.amzn_reviews_2_1_prt_year1999 | 1199 |
| demo.amzn_reviews_2_1_prt_year2000 | 6732 |
| demo.amzn_reviews_2_1_prt_year2001 | 9611 |
| demo.amzn_reviews_2_1_prt_year2002 | 12196 |
| demo.amzn_reviews_2_1_prt_year2003 | 14407 |
| demo.amzn_reviews_2_1_prt_year2004 | 15162 |
| demo.amzn_reviews_2_1_prt_year2005 | 17792 |
| demo.amzn_reviews_2_1_prt_year2006 | 22957 |
| demo.amzn_reviews_2_1_prt_year2007 | 49904 |


#### 2.3. Row Count per Partition & Segment

```python
sqlfilecode = !pygmentize -f html -O full,style=colorful -l postgres script/5-2-3-row-count-per-partition-segment.sql
display_html('\n'.join(sqlfilecode), raw=True)
```

```python
query = !cat script/5-2-3-row-count-per-partition-segment.sql
%sql $DB_USER@$DB_SERVER {''.join(query)}
```

36 rows affected.


| partition_name | segment_id | segment_count | partition_count |
|---|---|---|---|
| demo.amzn_reviews_2_1_prt_year1998 | 0 | 2 | 7 |
| demo.amzn_reviews_2_1_prt_year1998 | 1 | 5 | 7 |
| demo.amzn_reviews_2_1_prt_year1999 | 0 | 602 | 1199 |
| demo.amzn_reviews_2_1_prt_year1999 | 1 | 597 | 1199 |
| demo.amzn_reviews_2_1_prt_year2000 | 0 | 3348 | 6732 |
| demo.amzn_reviews_2_1_prt_year2000 | 1 | 3384 | 6732 |
| demo.amzn_reviews_2_1_prt_year2001 | 0 | 4779 | 9611 |
| demo.amzn_reviews_2_1_prt_year2001 | 1 | 4832 | 9611 |
| demo.amzn_reviews_2_1_prt_year2002 | 0 | 6024 | 12196 |
| demo.amzn_reviews_2_1_prt_year2002 | 1 | 6172 | 12196 |
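Because each partition is itself distributed across the segments, the per-segment counts for a partition should add up to that partition's total. A small check over the ten rows reproduced above (the query reports 36 rows; only these ten are shown); the function name is hypothetical.

```python
from collections import defaultdict

# (partition_name, segment_id, segment_count, partition_count), copied from the output above.
ROWS = [
    ("demo.amzn_reviews_2_1_prt_year1998", 0, 2, 7),
    ("demo.amzn_reviews_2_1_prt_year1998", 1, 5, 7),
    ("demo.amzn_reviews_2_1_prt_year1999", 0, 602, 1199),
    ("demo.amzn_reviews_2_1_prt_year1999", 1, 597, 1199),
    ("demo.amzn_reviews_2_1_prt_year2000", 0, 3348, 6732),
    ("demo.amzn_reviews_2_1_prt_year2000", 1, 3384, 6732),
    ("demo.amzn_reviews_2_1_prt_year2001", 0, 4779, 9611),
    ("demo.amzn_reviews_2_1_prt_year2001", 1, 4832, 9611),
    ("demo.amzn_reviews_2_1_prt_year2002", 0, 6024, 12196),
    ("demo.amzn_reviews_2_1_prt_year2002", 1, 6172, 12196),
]

def check_segment_totals(rows):
    """Map each partition to True if its per-segment counts sum to its reported total."""
    totals = defaultdict(int)
    expected = {}
    for partition, _segment_id, segment_count, partition_count in rows:
        totals[partition] += segment_count
        expected[partition] = partition_count
    return {partition: totals[partition] == expected[partition] for partition in expected}
```

All five partitions shown pass, and the roughly even split between segments 0 and 1 suggests the distribution key spreads the rows well.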


### 3. Partitioned Table Size and Disk Space Usage

As noted in step 1, top-level parent tables are empty after the partitioned table structure is created, because data is routed to the bottom-level child partitions. The output below therefore reports a table size of 0 bytes for the parent table `demo.amzn_reviews_2`, while the data itself is spread across its yearly partitions.

Compare the output below with the [Non-Partitioned Table Size and Disk Usage](GP-demo-2.ipynb#5.2.-Using-the-gp_toolkit-Administrative-Schema-(Greenplum-5.x)).


```python
sqlfilecode = !pygmentize -f html -O full,style=colorful -l postgres script/5-3-partitioned-table-size-disk.sql
display_html('\n'.join(sqlfilecode), raw=True)
```

```python
query = !cat script/5-3-partitioned-table-size-disk.sql
%sql $DB_USER@$DB_SERVER {''.join(query)}
```

18 rows affected.


tablename,tabledisksize,uncompressedsize,tablesize,indexsize,toastsize,othersize,partitionname,partitiontablesize,partitionindexsize
demo.amzn_reviews_2,96 kB,96 kB,0 bytes,0 bytes,96 kB,0 bytes,demo.amzn_reviews_2_1_prt_year1998,160 kB,0 bytes
demo.amzn_reviews_2,96 kB,96 kB,0 bytes,0 bytes,96 kB,0 bytes,demo.amzn_reviews_2_1_prt_year1999,1056 kB,0 bytes
demo.amzn_reviews_2,96 kB,96 kB,0 bytes,0 bytes,96 kB,0 bytes,demo.amzn_reviews_2_1_prt_year2000,6304 kB,0 bytes
demo.amzn_reviews_2,96 kB,96 kB,0 bytes,0 bytes,96 kB,0 bytes,demo.amzn_reviews_2_1_prt_year2001,9376 kB,0 bytes
demo.amzn_reviews_2,96 kB,96 kB,0 bytes,0 bytes,96 kB,0 bytes,demo.amzn_reviews_2_1_prt_year2002,11 MB,0 bytes
demo.amzn_reviews_2,96 kB,96 kB,0 bytes,0 bytes,96 kB,0 bytes,demo.amzn_reviews_2_1_prt_year2003,14 MB,0 bytes
demo.amzn_reviews_2,96 kB,96 kB,0 bytes,0 bytes,96 kB,0 bytes,demo.amzn_reviews_2_1_prt_year2004,16 MB,0 bytes
demo.amzn_reviews_2,96 kB,96 kB,0 bytes,0 bytes,96 kB,0 bytes,demo.amzn_reviews_2_1_prt_year2005,19 MB,0 bytes
demo.amzn_reviews_2,96 kB,96 kB,0 bytes,0 bytes,96 kB,0 bytes,demo.amzn_reviews_2_1_prt_year2006,23 MB,0 bytes
demo.amzn_reviews_2,96 kB,96 kB,0 bytes,0 bytes,96 kB,0 bytes,demo.amzn_reviews_2_1_prt_year2007,41 MB,0 bytes
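The size columns are human-readable strings in the style of PostgreSQL's `pg_size_pretty` (1 kB = 1024 bytes), which makes them awkward to add up. A minimal sketch that converts them back to bytes, assuming only the `bytes`/`kB`/`MB`/`GB`/`TB` units appear. Because the values are already rounded, any total computed from them is approximate, and summing the ten rows shown covers only 10 of the 18 partitions.

```python
import re

UNITS = {"bytes": 1, "kB": 1024, "MB": 1024**2, "GB": 1024**3, "TB": 1024**4}

def size_to_bytes(pretty: str) -> int:
    """Convert a string such as '96 kB' or '11 MB' back to a byte count."""
    m = re.fullmatch(r"(\d+)\s*(bytes|kB|MB|GB|TB)", pretty.strip())
    if m is None:
        raise ValueError(f"unrecognised size: {pretty!r}")
    return int(m.group(1)) * UNITS[m.group(2)]
```

Summing `partitiontablesize` for the ten rows shown gives 147,324,928 bytes, roughly 140 MB.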


### 4. Verify your Partition Strategy and Demonstrate *Partition Elimination* functionality

When a table is partitioned on the column used in the query predicate, you can use `EXPLAIN` to examine the query plan and verify that the query optimizer scans only the relevant partitions. For example, the `demo.amzn_reviews_2` table is date-range partitioned by year.

#### Example 1: `SELECT` data for a single day (`2011-07-12`):

```python
sqlfilecode = !pygmentize -f html -O full,style=colorful -l postgres script/5-4-explain-example-1.sql
display_html('\n'.join(sqlfilecode), raw=True)
```

The query plan for this query should show a table scan of only the following tables:

- the default partition returning 0-1 rows (if your partition design has one)
- the 2011 partition (`demo.amzn_reviews_2_1_prt_year2011`) returning ***some number*** of rows

To confirm, execute the `EXPLAIN` query and check the query plan:

```python
explain_output = !cat script/5-4-explain-example-1.sql \
    | psql $CONNECTION_STRING | pygmentize -f html -O full,style=colorful -l psql 
display_html('\n'.join(explain_output), raw=True)
```

#### Example 2: "Single-Partition" `SELECT`

##### Calculate Number of Reviews for the period, 1 January - 25 October 2012 over the Non-Partitioned Table (`demo.amzn_reviews`)

```python
sqlfilecode = !pygmentize -f html -O full,style=colorful -l postgres script/5-4-explain-example-2-1.sql
display_html('\n'.join(sqlfilecode), raw=True)
```

```python
explain_output = !cat script/5-4-explain-example-2-1.sql \
    | psql $CONNECTION_STRING | pygmentize -f html -O full,style=colorful -l psql 
display_html('\n'.join(explain_output), raw=True)
```

##### Calculate Number of Reviews for the period, 1 January - 25 October 2012 over the Partitioned Table (`demo.amzn_reviews_2`)

```python
sqlfilecode = !pygmentize -f html -O full,style=colorful -l postgres script/5-4-explain-example-2-2.sql
display_html('\n'.join(sqlfilecode), raw=True)
```

```python
explain_output = !cat script/5-4-explain-example-2-2.sql \
    | psql $CONNECTION_STRING | pygmentize -f html -O full,style=colorful -l psql 
display_html('\n'.join(explain_output), raw=True)
```