# Query GS data
- Create a Hive table (`wdi_gs`) against the GS wdi_2016 data.
- Count the number of rows in the wdi_gs table

In [3]:
-- Note: semicolon is not allowed in Zeppelin
DROP TABLE IF EXISTS wdi_gs 


In [4]:
-- Create wdi_gs
CREATE EXTERNAL TABLE wdi_gs
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
LOCATION 'gs://jarvis_data_eng_homa/datasets/wdi_2016'
TBLPROPERTIES ("skip.header.line.count"="1")

In [5]:
-- Show table metadata
DESCRIBE FORMATTED wdi_gs

In [6]:
SELECT count(countryName) FROM wdi_gs;

In [7]:
DROP TABLE IF EXISTS wdi_csv_text;
CREATE EXTERNAL TABLE wdi_csv_text
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
LOCATION 'hdfs:///user/homaalmasieh/hive/wdi/wdi_csv_text'

In [8]:
-- Run a HiveQL statement that reads data from the wdi_gs table and writes it to wdi_csv_text
INSERT OVERWRITE TABLE wdi_csv_text
SELECT * FROM wdi_gs

## Check HDFS file size for wdi_csv_text file using bash
- hdfs dfs -ls -h /user/homaalmasieh/hive/wdi/wdi_csv_text
- hdfs dfs -du -s -h /user/homaalmasieh/hive/wdi/wdi_csv_text

In [10]:
-- Execute twice; the second run is faster
SELECT count(countryName) FROM wdi_csv_text

The number of rows (non-NULL countryName values, not unique ones) in the wdi_csv_text table is 21759408.

## Monitor Hadoop/YARN job

In [13]:
%sh

#SSH to master node
cd ~
hdfs  dfs -get  hdfs:///user/homaalmasieh/hive/wdi/wdi_csv_text .
cd wdi_csv_text
#calculate current directory size
du -ch .
#1.8G	total

#clear fs cache
echo 3 | sudo tee /proc/sys/vm/drop_caches
#bash row count
date +%s && cat * | wc && date +%s


 
Counting the rows with bash takes much longer because all the work runs on a single machine, whereas Hive distributes the query across multiple machines.


In [15]:
-- Run the following query and discuss what could be wrong with the indicatorcode column.
SELECT distinct(indicatorcode)
FROM wdi_csv_text
ORDER BY indicatorcode
LIMIT 20;


## Parsing issues
- Create an external table wdi_gs_debug with one column without any SerDe parsing
- Query the lines that have parsing issues
- Create a Table with OpenCSV SerDe
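
The parsing problem investigated in this section can be reproduced outside Hive. The following is a minimal Python sketch, not part of the original notebook: the sample line, the indicator name, and the code `XX.DEMO.CODE` are made up for illustration. It assumes the issue comes from a quoted field that contains a comma. Splitting on every comma (roughly what `FIELDS TERMINATED BY ','` does) shifts the columns, while a quote-aware CSV parser keeps them aligned.

```python
import csv
import io

# Hypothetical sample row: the quoted indicator name contains a comma.
SAMPLE_LINE = '2015,Canada,CAN,"Demo indicator, with comma (% of urban population)",XX.DEMO.CODE,12.5'


def split_naive(line):
    """Split on every comma and ignore quotes (delimited-text behaviour)."""
    return line.split(",")


def split_quote_aware(line):
    """Parse one line with a CSV reader that honours double quotes."""
    return next(csv.reader(io.StringIO(line)))


naive = split_naive(SAMPLE_LINE)
aware = split_quote_aware(SAMPLE_LINE)

# Column 5 (index 4) should hold the indicator code.
print(len(naive), repr(naive[4]))
print(len(aware), repr(aware[4]))
```

With the naive split, the fifth column holds the tail of the indicator name instead of the indicator code, which would explain unexpected values in `indicatorcode`.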

In [17]:
DROP TABLE IF EXISTS wdi_gs_debug

In [18]:
-- Create a debug table 
CREATE EXTERNAL TABLE wdi_gs_debug
(line STRING)
LOCATION 'gs://jarvis_data_eng_homa/datasets/wdi_2016'
TBLPROPERTIES ("skip.header.line.count"="1")


In [19]:
SELECT DISTINCT(line) FROM wdi_gs_debug
WHERE line like "%\(\% of urban population\)\"%"

In [20]:
DROP TABLE IF EXISTS wdi_opencsv_gs

In [21]:
-- Create an OpenCSV table
-- OpenCSVSerde treats all columns as type STRING
CREATE EXTERNAL TABLE wdi_opencsv_gs
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
LOCATION 'gs://jarvis_data_eng_homa/datasets/wdi_2016'
TBLPROPERTIES ("skip.header.line.count"="1")

In [22]:
SELECT DISTINCT(indicatorcode) from wdi_opencsv_gs

In [23]:
DROP TABLE IF EXISTS wdi_opencsv_text

In [24]:
-- Create wdi_opencsv_text table
-- Use OpenCSVSerde so that quoted fields containing commas survive the write/read round trip
CREATE EXTERNAL TABLE wdi_opencsv_text
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
LOCATION 'hdfs:///user/homaalmasieh/hive/wdi/wdi_opencsv_text'

In [25]:
-- Write a HiveQL statement that reads data from wdi_opencsv_gs and writes it to wdi_opencsv_text
INSERT OVERWRITE TABLE wdi_opencsv_text
SELECT * FROM wdi_opencsv_gs

In [26]:
-- Verify parsing from wdi_opencsv_text
SELECT DISTINCT(indicatorCode) from wdi_opencsv_text

In [27]:
-- Compare execution time
SELECT count(countryName) FROM wdi_opencsv_text

In [28]:
SELECT count(countryName) FROM wdi_csv_text

 
wdi_opencsv_text is significantly slower because OpenCSVSerde has to do extra work parsing the data for quotes and delimiters.

## OpenCSVSerde Limitation
OpenCSVSerde exposes every column as STRING. Create a view on top of the OpenCSVSerde table that casts each column to its proper data type.
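
As an illustration of what the casting view does, here is a minimal Python sketch (not part of the notebook). The function name `cast_row` and the sample values are hypothetical. It assumes each parsed row arrives as six strings and that an empty value should become a missing value, which is how Hive's `CAST` treats strings it cannot convert.

```python
def cast_row(row):
    """Cast the all-string fields of one parsed row to their intended types."""
    year, country_name, country_code, ind_name, ind_code, ind_value = row
    return (
        int(year),
        country_name,
        country_code,
        ind_name,
        ind_code,
        # An empty string becomes None, mirroring CAST returning NULL.
        float(ind_value) if ind_value != "" else None,
    )


# Hypothetical rows as a CSV SerDe would hand them over: every field is a string.
typed = cast_row(["2015", "Canada", "CAN", "Demo indicator", "XX.DEMO.CODE", "1.5"])
missing = cast_row(["2015", "Canada", "CAN", "Demo indicator", "XX.DEMO.CODE", ""])
print(typed)
print(missing)
```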

In [31]:
-- Show wdi_opencsv_text metadata
DESCRIBE FORMATTED wdi_opencsv_text


In [32]:
-- Show wdi_csv_text metadata
DESCRIBE FORMATTED wdi_csv_text

In [33]:
DROP VIEW IF EXISTS wdi_opencsv_text_view

In [34]:
-- Create a view on top of wdi_opencsv_text
CREATE VIEW IF NOT EXISTS wdi_opencsv_text_view
AS
SELECT CAST(year AS INTEGER) AS year, countryName, countryCode, indicatorName, indicatorCode, CAST(indicatorValue AS FLOAT) AS indicatorValue
FROM wdi_opencsv_text


In [35]:
DESCRIBE FORMATTED wdi_opencsv_text_view

## 2015 Canada GDP Growth HQL

In [37]:
-- Write an HQL query to find the correct indicator name/code
SELECT DISTINCT(indicatorCode), indicatorName
FROM wdi_opencsv_text
WHERE indicatorCode LIKE '%GDP%'

In [38]:
-- Find 2015 GDP growth (annual %) for Canada
SELECT indicatorValue, year, countryName
FROM wdi_opencsv_text
WHERE countryName = 'Canada' AND indicatorCode = 'NY.GDP.MKTP.KD.ZG' AND year = '2015'


Filtering this way forces Hive to scan the whole table, which is slow when the table is very large. We can optimize the query with Hive partitions.

 
## Hive Partitions
Optimize the previous query using the Hive partition feature

In [41]:
DROP TABLE IF EXISTS wdi_opencsv_text_partitions

In [42]:
-- Create the wdi_opencsv_text_partitions table, partitioned by year
CREATE EXTERNAL TABLE wdi_opencsv_text_partitions
(countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT) PARTITIONED BY (year INTEGER)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
LOCATION 'hdfs:///user/homaalmasieh/hive/wdi/wdi_opencsv_text_partitions'

In [43]:
-- Load data from wdi_opencsv_text to wdi_opencsv_text_partitions
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.dynamic.partition=true;
SET hive.stats.column.autogather=false;
INSERT OVERWRITE TABLE wdi_opencsv_text_partitions PARTITION (year)
SELECT countryname, countrycode, indicatorname, indicatorcode, indicatorvalue, year
FROM wdi_opencsv_text

In [44]:
SELECT * FROM wdi_opencsv_text_partitions

In [45]:
%sh
# Check how many partitions were created
hdfs  dfs -ls -h hdfs:///user/homaalmasieh/hive/wdi/wdi_opencsv_text_partitions

In [46]:
-- Retrieve 2015 GDP growth (annual %) for Canada
SELECT indicatorValue, year, countryName
FROM wdi_opencsv_text
WHERE countryName = 'Canada' AND indicatorCode = 'NY.GDP.MKTP.KD.ZG' AND year = '2015'

In [47]:
DROP VIEW IF EXISTS wdi_opencsv_gdp_partitions_view

In [48]:
CREATE VIEW IF NOT EXISTS wdi_opencsv_gdp_partitions_view
AS
SELECT CAST(indicatorValue AS FLOAT) AS GDP_growth_value, CAST(year AS INTEGER) AS year, countryName
FROM wdi_opencsv_text_partitions
WHERE countryName = 'Canada' AND indicatorName LIKE ('%GDP growth (annual \%)%')  AND year = '2015'

In [49]:
SELECT * FROM wdi_opencsv_gdp_partitions_view

The 2015 Canada GDP growth query runs significantly faster against wdi_opencsv_text_partitions than against wdi_opencsv_text, because Hive only has to read the year=2015 partition instead of scanning the whole data set.
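
The effect of partitioning can be sketched in plain Python. This is an illustration only, not how Hive is implemented: the rows and their values are placeholders rather than real WDI figures, and `full_scan`, `build_partitions`, and `pruned_scan` are hypothetical helper names. The idea is that grouping rows by year up front lets a query for one year skip every other group.

```python
from collections import defaultdict

GDP = "NY.GDP.MKTP.KD.ZG"

# Placeholder rows (year, countryName, indicatorCode, value); values are made up.
ROWS = [
    (2014, "Canada", GDP, 1.0),
    (2014, "Brazil", GDP, 2.0),
    (2015, "Canada", GDP, 3.0),
    (2015, "Brazil", GDP, 4.0),
    (2016, "Canada", GDP, 5.0),
    (2016, "Brazil", GDP, 6.0),
]


def full_scan(rows, year, country):
    """Unpartitioned table: every row is read and tested."""
    hits = [r for r in rows if r[0] == year and r[1] == country]
    return hits, len(rows)


def build_partitions(rows):
    """Group rows by year, like one directory per partition value."""
    parts = defaultdict(list)
    for row in rows:
        parts[row[0]].append(row)
    return parts


def pruned_scan(parts, year, country):
    """Partitioned table: only the rows of the requested year are read."""
    candidates = parts.get(year, [])
    hits = [r for r in candidates if r[1] == country]
    return hits, len(candidates)


partitions = build_partitions(ROWS)
print(full_scan(ROWS, 2015, "Canada"))          # reads all 6 rows
print(pruned_scan(partitions, 2015, "Canada"))  # reads only the 2 rows of 2015
```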

## Columnar File Optimization
Optimize the HQL query using a columnar file format.

In [52]:
DROP TABLE IF EXISTS wdi_csv_parquet 

In [53]:
-- Create the wdi_csv_parquet table
CREATE EXTERNAL TABLE wdi_csv_parquet
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
STORED AS PARQUET
LOCATION 'hdfs:///user/homaalmasieh/hive/wdi/wdi_csv_parquet'


In [54]:
-- Load data from wdi_opencsv_gs to wdi_csv_parquet
INSERT OVERWRITE TABLE wdi_csv_parquet
SELECT year, countryName, countryCode, indicatorName, indicatorCode, indicatorValue
FROM wdi_opencsv_gs

In [55]:
%sh
# Check the size of wdi_csv_parquet
hdfs dfs -du -s -h hdfs:///user/homaalmasieh/hive/wdi/wdi_csv_parquet

In [56]:
%sh
# Check the size of wdi_opencsv_text
hdfs dfs -du -s -h hdfs:///user/homaalmasieh/hive/wdi/wdi_opencsv_text

In [57]:
-- Compare runtime
SELECT count(countryName) FROM wdi_csv_parquet


In [58]:
SELECT count(countryName) FROM wdi_opencsv_text

In [59]:
-- Execute 2015 GDP Growth HQL against wdi_csv_parquet and wdi_opencsv_text tables
SELECT indicatorValue, year, countryName
FROM wdi_csv_parquet
WHERE countryName = 'Canada' AND indicatorCode = 'NY.GDP.MKTP.KD.ZG' AND year = '2015'

In [60]:
SELECT indicatorValue, year, countryName
FROM wdi_opencsv_text
WHERE countryName = 'Canada' AND indicatorCode = 'NY.GDP.MKTP.KD.ZG' AND year = '2015'

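Queries against wdi_csv_parquet are significantly faster than the same queries against wdi_opencsv_text, since a columnar format such as Parquet lets the engine read only the columns a query references.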
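
To make the row-versus-column difference concrete, here is a small Python sketch. It is a simplified model under stated assumptions, not Parquet's actual on-disk format: the rows are placeholders, and "values read" simply counts how many field values each layout has to touch to answer `count(countryName)`.

```python
COLUMNS = ["year", "countryName", "countryCode", "indicatorName", "indicatorCode", "indicatorValue"]

# Placeholder rows; the indicator name, code, and values are made up.
ROWS = [
    (2014, "Canada", "CAN", "Demo indicator", "XX.DEMO.CODE", 1.0),
    (2015, "Canada", "CAN", "Demo indicator", "XX.DEMO.CODE", 3.0),
    (2015, "Brazil", "BRA", "Demo indicator", "XX.DEMO.CODE", 4.0),
]


def to_columns(rows, names):
    """Pivot row tuples into one list per column (columnar layout)."""
    return {name: [row[i] for row in rows] for i, name in enumerate(names)}


def count_row_layout(rows, col_index):
    """Row layout: each whole row is read to inspect a single field."""
    values_read = 0
    count = 0
    for row in rows:
        values_read += len(row)
        if row[col_index] is not None:
            count += 1
    return count, values_read


def count_column_layout(columns, name):
    """Columnar layout: only the requested column is read."""
    column = columns[name]
    count = sum(1 for value in column if value is not None)
    return count, len(column)


columnar = to_columns(ROWS, COLUMNS)
print(count_row_layout(ROWS, COLUMNS.index("countryName")))
print(count_column_layout(columnar, "countryName"))
```

Both layouts return the same count, but the row layout touches every field of every row while the columnar layout touches one value per row.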

## Highest GDP Growth


In [63]:
-- Find the highest GDP growth (NY.GDP.MKTP.KD.ZG) year for each country
SELECT wdi.indicatorValue AS GDP_growth_value, wdi.year, wdi.countryName
FROM (
     SELECT max(indicatorValue) AS max_ind, countryName
     FROM wdi_csv_parquet
     WHERE indicatorCode = 'NY.GDP.MKTP.KD.ZG' AND indicatorValue != 0
     GROUP BY countryName
) maxValue
INNER JOIN wdi_csv_parquet AS wdi ON maxValue.max_ind = wdi.indicatorValue AND maxValue.countryName = wdi.countryName
-- Keep only GDP growth rows so other indicators with an equal value do not match
WHERE wdi.indicatorCode = 'NY.GDP.MKTP.KD.ZG'
ORDER BY GDP_growth_value DESC


In [64]:
%spark.sql
-- Use %spark.sql to switch to Spark interpreter

SELECT wdi.indicatorValue AS GDP_growth_value, wdi.year, wdi.countryName
FROM (
     SELECT max(indicatorValue) AS max_ind, countryName
     FROM wdi_csv_parquet
     WHERE indicatorCode = 'NY.GDP.MKTP.KD.ZG' AND indicatorValue != 0
     GROUP BY countryName
) maxValue
INNER JOIN wdi_csv_parquet AS wdi ON maxValue.max_ind = wdi.indicatorValue AND maxValue.countryName = wdi.countryName
-- Keep only GDP growth rows so other indicators with an equal value do not match
WHERE wdi.indicatorCode = 'NY.GDP.MKTP.KD.ZG'
ORDER BY GDP_growth_value DESC

### Compare the execution time with the Hive Tez query and spark.sql

The Hive (Tez) query took 1.5 minutes, which is significantly slower than the same query run through the Spark interpreter.
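
The logic of the "highest GDP growth year per country" query can be sketched in Python as a single pass that keeps the best value seen per country. This is an illustrative equivalent, not the notebook's method: the rows and values are placeholders, `highest_growth` is a hypothetical name, and unlike the SQL join it keeps only the first year when two years tie for the maximum.

```python
GDP = "NY.GDP.MKTP.KD.ZG"

# Placeholder rows (year, countryName, indicatorCode, value); values are made up.
ROWS = [
    (2014, "Canada", GDP, 1.0),
    (2015, "Canada", GDP, 3.0),
    (2016, "Canada", GDP, 0.0),          # zero values are skipped, as in the query
    (2015, "Canada", "XX.OTHER", 9.0),   # a different indicator must be ignored
    (2014, "Brazil", GDP, 4.0),
    (2015, "Brazil", GDP, 2.0),
]


def highest_growth(rows, code=GDP):
    """Return (value, year, country) for each country's best year, highest first."""
    best = {}
    for year, country, ind_code, value in rows:
        if ind_code != code or value is None or value == 0:
            continue
        if country not in best or value > best[country][0]:
            best[country] = (value, year)
    result = [(value, year, country) for country, (value, year) in best.items()]
    return sorted(result, reverse=True)


top = highest_growth(ROWS)
print(top)
```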

## Sort GDP by country and year 

In [67]:
-- Write a query that returns GDP growth for all countries. Sort by countryName and year.
SELECT countryName, year, indicatorCode, indicatorValue AS GDP_growth_value
FROM wdi_csv_parquet
WHERE indicatorCode = 'NY.GDP.MKTP.KD.ZG'
DISTRIBUTE BY countryName
SORT BY countryName ASC, year ASC
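
The combination of `DISTRIBUTE BY` and `SORT BY` used in the query above can be pictured with a short Python sketch. It is a simplified model with assumptions stated here: `zlib.crc32` stands in for Hive's own hash function, the number of reducers is arbitrary, and the rows are placeholders. All rows of one country land in the same bucket, and each bucket is sorted independently, so the output is ordered within each reducer rather than globally.

```python
import zlib


def distribute_and_sort(rows, num_reducers):
    """Route rows to buckets by country hash, then sort each bucket on its own."""
    buckets = [[] for _ in range(num_reducers)]
    for row in rows:
        country = row[0]
        # crc32 is a deterministic stand-in for the engine's hash function.
        index = zlib.crc32(country.encode("utf-8")) % num_reducers
        buckets[index].append(row)
    # Equivalent of SORT BY countryName ASC, year ASC inside every reducer.
    return [sorted(bucket, key=lambda r: (r[0], r[1])) for bucket in buckets]


# Placeholder rows (countryName, year, value); values are made up.
ROWS = [
    ("Canada", 2015, 3.0),
    ("Brazil", 2014, 4.0),
    ("Canada", 2014, 1.0),
    ("Brazil", 2015, 2.0),
    ("Chile", 2014, 5.0),
]

buckets = distribute_and_sort(ROWS, 2)
for i, bucket in enumerate(buckets):
    print(i, bucket)
```

If a single globally sorted result is needed, `ORDER BY` is the usual alternative, at the cost of funnelling the final sort through one reducer.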

