Fix #169: MapJoin fails because the configuration and the input path are inconsistent #173
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
The original code fails when running a join of two tables that are both stored in Elasticsearch. The problem is that their configurations get mixed. Take the es.resource.read setting as an example: the EsStorageHandler of one table uses the job configuration to set es.resource.read='xxx', and the EsStorageHandler of the other table uses the same job configuration to set its own es.resource.read value. Because the job configuration is a global (shared) object, the later setting overwrites the earlier one. @costin, I think you have fixed this problem, but I think it would be much better to also handle the HiveValueWriter, HiveBytesConverter and HiveValueReader settings the same way. I fixed the issue following this idea and tested it with the complex case below.
My test case is as follows:
-- NOTE(review): TESTROOT is not referenced anywhere else in this script (the LOAD DATA
-- paths are hard-coded under /root) - confirm whether it is still needed.
set hiveconf:TESTROOT=/soak;
-- Table-name suffixes substituted into the statements below:
--   postfix  -> plain text staging tables loaded from local .tbl files
--   postfix2 -> Elasticsearch-backed (EsStorageHandler) tables
set hiveconf:postfix=for_es;
set hiveconf:postfix2=es;
-- Remove tables left over from a previous run.
-- IF EXISTS keeps the script runnable on a fresh metastore regardless of the
-- hive.exec.drop.ignorenonexistent setting.
-- NOTE(review): the postfix2 tables are EXTERNAL EsStorageHandler tables, so dropping them
-- presumably leaves the q9 Elasticsearch indices and their documents in place - confirm.
DROP TABLE IF EXISTS part_${hiveconf:postfix};
DROP TABLE IF EXISTS lineitem_${hiveconf:postfix};
DROP TABLE IF EXISTS supplier_${hiveconf:postfix};
DROP TABLE IF EXISTS orders_${hiveconf:postfix};
DROP TABLE IF EXISTS partsupp_${hiveconf:postfix};
DROP TABLE IF EXISTS nation_${hiveconf:postfix};
DROP TABLE IF EXISTS part_${hiveconf:postfix2};
DROP TABLE IF EXISTS lineitem_${hiveconf:postfix2};
DROP TABLE IF EXISTS supplier_${hiveconf:postfix2};
DROP TABLE IF EXISTS orders_${hiveconf:postfix2};
DROP TABLE IF EXISTS partsupp_${hiveconf:postfix2};
DROP TABLE IF EXISTS nation_${hiveconf:postfix2};
DROP TABLE IF EXISTS q9_product_type_profit_${hiveconf:postfix2};
-- Staging tables (TPC-H schema): pipe-delimited text tables, each created and then
-- filled from the matching local .tbl file under /root.
create table part_${hiveconf:postfix} (
    P_PARTKEY BIGINT,
    P_NAME STRING,
    P_MFGR STRING,
    P_BRAND STRING,
    P_TYPE STRING,
    P_SIZE BIGINT,
    P_CONTAINER STRING,
    P_RETAILPRICE DOUBLE,
    P_COMMENT STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|';
LOAD DATA LOCAL INPATH '/root/part.tbl' OVERWRITE INTO TABLE part_${hiveconf:postfix};

create table lineitem_${hiveconf:postfix} (
    L_ORDERKEY BIGINT,
    L_PARTKEY BIGINT,
    L_SUPPKEY BIGINT,
    L_LINENUMBER BIGINT,
    L_QUANTITY DOUBLE,
    L_EXTENDEDPRICE DOUBLE,
    L_DISCOUNT DOUBLE,
    L_TAX DOUBLE,
    L_RETURNFLAG STRING,
    L_LINESTATUS STRING,
    L_SHIPDATE STRING,
    L_COMMITDATE STRING,
    L_RECEIPTDATE STRING,
    L_SHIPINSTRUCT STRING,
    L_SHIPMODE STRING,
    L_COMMENT STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|';
LOAD DATA LOCAL INPATH '/root/lineitem.tbl' OVERWRITE INTO TABLE lineitem_${hiveconf:postfix};

create table orders_${hiveconf:postfix} (
    O_ORDERKEY BIGINT,
    O_CUSTKEY BIGINT,
    O_ORDERSTATUS STRING,
    O_TOTALPRICE DOUBLE,
    O_ORDERDATE STRING,
    O_ORDERPRIORITY STRING,
    O_CLERK STRING,
    O_SHIPPRIORITY BIGINT,
    O_COMMENT STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|';
LOAD DATA LOCAL INPATH '/root/orders.tbl' OVERWRITE INTO TABLE orders_${hiveconf:postfix};

create table supplier_${hiveconf:postfix} (
    S_SUPPKEY BIGINT,
    S_NAME STRING,
    S_ADDRESS STRING,
    S_NATIONKEY BIGINT,
    S_PHONE STRING,
    S_ACCTBAL DOUBLE,
    S_COMMENT STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|';
LOAD DATA LOCAL INPATH '/root/supplier.tbl' OVERWRITE INTO TABLE supplier_${hiveconf:postfix};

create table partsupp_${hiveconf:postfix} (
    PS_PARTKEY BIGINT,
    PS_SUPPKEY BIGINT,
    PS_AVAILQTY BIGINT,
    PS_SUPPLYCOST DOUBLE,
    PS_COMMENT STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|';
LOAD DATA LOCAL INPATH '/root/partsupp.tbl' OVERWRITE INTO TABLE partsupp_${hiveconf:postfix};

create table nation_${hiveconf:postfix} (
    N_NATIONKEY BIGINT,
    N_NAME STRING,
    N_REGIONKEY BIGINT,
    N_COMMENT STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|';
LOAD DATA LOCAL INPATH '/root/nation.tbl' OVERWRITE INTO TABLE nation_${hiveconf:postfix};
-- Elasticsearch-backed copies of the staging tables. They start empty and are populated
-- by the insert overwrite statements that follow. Each table points at its own q9/...
-- resource through TBLPROPERTIES, so joining several of them in one query exercises the
-- per-table configuration handling.
-- NOTE(review): supplier uses the generic es.resource property while the other tables use
-- es.resource.read / es.resource.write. The read/write variants presumably fall back to
-- es.resource, so this looks deliberate (it mixes both property styles in one query) -
-- confirm before normalizing it.
create external table part_${hiveconf:postfix2} (
    P_PARTKEY BIGINT,
    P_NAME STRING,
    P_MFGR STRING,
    P_BRAND STRING,
    P_TYPE STRING,
    P_SIZE BIGINT,
    P_CONTAINER STRING,
    P_RETAILPRICE DOUBLE,
    P_COMMENT STRING
)
stored by 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES (
    'es.resource.write'='q9/part',
    'es.resource.read'='q9/part',
    'es.index.auto.create'='true',
    'es.nodes' = 'localhost:9200'
);

create external table lineitem_${hiveconf:postfix2} (
    L_ORDERKEY BIGINT,
    L_PARTKEY BIGINT,
    L_SUPPKEY BIGINT,
    L_LINENUMBER BIGINT,
    L_QUANTITY DOUBLE,
    L_EXTENDEDPRICE DOUBLE,
    L_DISCOUNT DOUBLE,
    L_TAX DOUBLE,
    L_RETURNFLAG STRING,
    L_LINESTATUS STRING,
    L_SHIPDATE STRING,
    L_COMMITDATE STRING,
    L_RECEIPTDATE STRING,
    L_SHIPINSTRUCT STRING,
    L_SHIPMODE STRING,
    L_COMMENT STRING
)
stored by 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES (
    'es.resource.write'='q9/lineitem',
    'es.resource.read'='q9/lineitem',
    'es.index.auto.create'='true',
    'es.nodes' = 'localhost:9200'
);

create external table orders_${hiveconf:postfix2} (
    O_ORDERKEY BIGINT,
    O_CUSTKEY BIGINT,
    O_ORDERSTATUS STRING,
    O_TOTALPRICE DOUBLE,
    O_ORDERDATE STRING,
    O_ORDERPRIORITY STRING,
    O_CLERK STRING,
    O_SHIPPRIORITY BIGINT,
    O_COMMENT STRING
)
stored by 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES (
    'es.resource.write'='q9/orders',
    'es.resource.read'='q9/orders',
    'es.index.auto.create'='true',
    'es.nodes' = 'localhost:9200'
);

create external table supplier_${hiveconf:postfix2} (
    S_SUPPKEY BIGINT,
    S_NAME STRING,
    S_ADDRESS STRING,
    S_NATIONKEY BIGINT,
    S_PHONE STRING,
    S_ACCTBAL DOUBLE,
    S_COMMENT STRING
)
stored by 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES (
    'es.resource'='q9/supplier',
    'es.index.auto.create'='true',
    'es.nodes' = 'localhost:9200'
);

create external table partsupp_${hiveconf:postfix2} (
    PS_PARTKEY BIGINT,
    PS_SUPPKEY BIGINT,
    PS_AVAILQTY BIGINT,
    PS_SUPPLYCOST DOUBLE,
    PS_COMMENT STRING
)
stored by 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES (
    'es.resource.write'='q9/partsupp',
    'es.resource.read'='q9/partsupp',
    'es.index.auto.create'='true',
    'es.nodes' = 'localhost:9200'
);

create external table nation_${hiveconf:postfix2} (
    N_NATIONKEY BIGINT,
    N_NAME STRING,
    N_REGIONKEY BIGINT,
    N_COMMENT STRING
)
stored by 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES (
    'es.resource.write'='q9/nation',
    'es.resource.read'='q9/nation',
    'es.index.auto.create'='true',
    'es.nodes' = 'localhost:9200'
);
-- Copy every staging table into its Elasticsearch-backed counterpart.
-- Columns are listed explicitly instead of select star, so a change to either of the two
-- separately declared schemas cannot silently shift values into the wrong columns.
-- NOTE(review): insert overwrite into an EsStorageHandler table presumably indexes new
-- documents rather than truncating the q9 index. The external table drops at the top of
-- the script presumably do not delete the indices either, so re-running may duplicate
-- documents and inflate the Q9 sums - confirm, or delete the q9 indices before each run.
insert overwrite table part_${hiveconf:postfix2}
select P_PARTKEY, P_NAME, P_MFGR, P_BRAND, P_TYPE, P_SIZE, P_CONTAINER, P_RETAILPRICE, P_COMMENT
from part_${hiveconf:postfix};

insert overwrite table lineitem_${hiveconf:postfix2}
select L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE,
       L_DISCOUNT, L_TAX, L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE, L_COMMITDATE,
       L_RECEIPTDATE, L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT
from lineitem_${hiveconf:postfix};

insert overwrite table orders_${hiveconf:postfix2}
select O_ORDERKEY, O_CUSTKEY, O_ORDERSTATUS, O_TOTALPRICE, O_ORDERDATE, O_ORDERPRIORITY,
       O_CLERK, O_SHIPPRIORITY, O_COMMENT
from orders_${hiveconf:postfix};

insert overwrite table supplier_${hiveconf:postfix2}
select S_SUPPKEY, S_NAME, S_ADDRESS, S_NATIONKEY, S_PHONE, S_ACCTBAL, S_COMMENT
from supplier_${hiveconf:postfix};

insert overwrite table nation_${hiveconf:postfix2}
select N_NATIONKEY, N_NAME, N_REGIONKEY, N_COMMENT
from nation_${hiveconf:postfix};

insert overwrite table partsupp_${hiveconf:postfix2}
select PS_PARTKEY, PS_SUPPKEY, PS_AVAILQTY, PS_SUPPLYCOST, PS_COMMENT
from partsupp_${hiveconf:postfix};
-- Result table for the Q9 query, also stored in Elasticsearch (q9/product_type_profit).
-- NOTE(review): o_year is declared string but the query fills it from year(o_orderdate),
-- which yields an int in Hive - presumably converted implicitly on insert, confirm if the
-- resulting Elasticsearch field type matters.
create external table q9_product_type_profit_${hiveconf:postfix2} (
    nation string,
    o_year string,
    sum_profit double
)
stored by 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES (
    'es.resource.write'='q9/product_type_profit',
    'es.resource.read'='q9/product_type_profit',
    'es.index.auto.create'='true',
    'es.nodes' = 'localhost:9200'
);
-- Job tuning for the query below.
-- 1024000000 bytes (roughly 1 GB, decimal) of input per reducer.
set hive.exec.reducers.bytes.per.reducer=1024000000;
-- 536870912 bytes = 512 MiB minimum split size.
-- NOTE(review): every input of the query is an EsStorageHandler table, whose splits
-- presumably come from Elasticsearch shards rather than HDFS blocks - confirm this
-- setting still has an effect.
set mapred.min.split.size=536870912;
-- TPC-H Q9 style query: profit per nation and order year for parts whose name contains
-- green, joining all six EsStorageHandler tables in a single query.
-- The nesting and join order are kept as-is on purpose: the resulting join plan
-- (including any map-join conversion) is what this script is meant to exercise.
-- NOTE(review): whether Hive converts these joins to map joins presumably depends on
-- hive.auto.convert.join and table-size estimates - confirm the setting used for the repro.
insert overwrite table q9_product_type_profit_${hiveconf:postfix2}
select
    nation,
    o_year,
    sum(amount) as sum_profit
from (
    select
        n_name as nation,
        year(o_orderdate) as o_year,
        l_extendedprice * (1 - l_discount) - ps_supplycost * l_quantity as amount
    from orders_${hiveconf:postfix2} o
    join (
        select l_extendedprice, l_discount, l_quantity, l_orderkey, n_name, ps_supplycost
        from part_${hiveconf:postfix2} p
        join (
            select l_extendedprice, l_discount, l_quantity, l_partkey, l_orderkey,
                   n_name, ps_supplycost
            from partsupp_${hiveconf:postfix2} ps
            join (
                select l_suppkey, l_extendedprice, l_discount, l_quantity, l_partkey,
                       l_orderkey, n_name
                from (
                    select s_suppkey, n_name
                    from nation_${hiveconf:postfix2} n
                    join supplier_${hiveconf:postfix2} s
                        on n.n_nationkey = s.s_nationkey
                ) s1
                join lineitem_${hiveconf:postfix2} l
                    on s1.s_suppkey = l.l_suppkey
            ) l1
                on ps.ps_suppkey = l1.l_suppkey and ps.ps_partkey = l1.l_partkey
        ) l2
            on p.p_name like '%green%' and p.p_partkey = l2.l_partkey
    ) l3
        on o.o_orderkey = l3.l_orderkey
) profit
group by nation, o_year
order by nation, o_year desc;