Resolved: TableCatalog doesn't support multiple columns from multiple column families #45

Closed · wants to merge 4 commits
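Context for the change (not part of the diff): the kind of catalog this pull request is about maps several DataFrame columns onto more than one HBase column family. Below is a minimal Scala sketch of that scenario, assuming the hbase-spark DataFrame source and the HBaseTableCatalog JSON layout used elsewhere in this diff; the table name, the families, and the sqlContext in scope are illustrative.

import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog

// Illustrative catalog: cf1 and cf2 each back more than one DataFrame column.
val catalog = s"""{
                 |"table":{"namespace":"default", "name":"table1"},
                 |"rowkey":"key",
                 |"columns":{
                 |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
                 |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
                 |"col2":{"cf":"cf1", "col":"col2", "type":"double"},
                 |"col3":{"cf":"cf2", "col":"col3", "type":"float"},
                 |"col4":{"cf":"cf2", "col":"col4", "type":"int"}
                 |}
                 |}""".stripMargin

// Read through the catalog, mirroring the withCatalog helpers in the examples below.
val df = sqlContext.read
  .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
  .format("org.apache.hadoop.hbase.spark")
  .load()
df.show()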
@@ -66,7 +66,7 @@ public static void main(String[] args) {

JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);

-hbaseContext.bulkGet(TableName.valueOf(tableName), 2, rdd, new GetFunction(),
+hbaseContext.bulkGet(tableName, 2, rdd, new GetFunction(),
new ResultFunction());
} finally {
jsc.stop();
@@ -112,4 +112,4 @@ public String call(Result result) throws Exception {
return b.toString();
}
}
-}
+}
@@ -119,14 +119,14 @@ object AvroSource {
.save()

val df = withCatalog(catalog)
-df.show
+df.show()
df.printSchema()
df.registerTempTable("ExampleAvrotable")
val c = sqlContext.sql("select count(1) from ExampleAvrotable")
-c.show
+c.show()

val filtered = df.select($"col0", $"col1.favorite_array").where($"col0" === "name001")
-filtered.show
+filtered.show()
val collected = filtered.collect()
if (collected(0).getSeq[String](1)(0) != "number1") {
throw new UserCustomizedSampleException("value invalid")
@@ -141,18 +141,18 @@ object AvroSource {
.format("org.apache.hadoop.hbase.spark")
.save()
val newDF = withCatalog(avroCatalogInsert)
-newDF.show
+newDF.show()
newDF.printSchema()
if(newDF.count() != 256) {
throw new UserCustomizedSampleException("value invalid")
}

df.filter($"col1.name" === "name005" || $"col1.name" <= "name005")
.select("col0", "col1.favorite_color", "col1.favorite_number")
-.show
+.show()

df.filter($"col1.name" <= "name005" || $"col1.name".contains("name007"))
.select("col0", "col1.favorite_color", "col1.favorite_number")
-.show
+.show()
}
}
@@ -32,7 +32,7 @@ object UserCustomizedSampleException {
}

case class IntKeyRecord(
-col0: Integer,
+col0: Int,
col1: Boolean,
col2: Double,
col3: Float,
@@ -100,56 +100,56 @@ object DataType {
// test less than 0
val df = withCatalog(cat)
val s = df.filter($"col0" < 0)
-s.show
+s.show()
if(s.count() != 16){
throw new UserCustomizedSampleException("value invalid")
}

//test less or equal than -10. The number of results is 11
val num1 = df.filter($"col0" <= -10)
-num1.show
+num1.show()
val c1 = num1.count()
println(s"test result count should be 11: $c1")

//test less or equal than -9. The number of results is 12
val num2 = df.filter($"col0" <= -9)
-num2.show
+num2.show()
val c2 = num2.count()
println(s"test result count should be 12: $c2")

//test greater or equal than -9". The number of results is 21
val num3 = df.filter($"col0" >= -9)
-num3.show
+num3.show()
val c3 = num3.count()
println(s"test result count should be 21: $c3")

//test greater or equal than 0. The number of results is 16
val num4 = df.filter($"col0" >= 0)
-num4.show
+num4.show()
val c4 = num4.count()
println(s"test result count should be 16: $c4")

//test greater than 10. The number of results is 10
val num5 = df.filter($"col0" > 10)
-num5.show
+num5.show()
val c5 = num5.count()
println(s"test result count should be 10: $c5")

// test "and". The number of results is 11
val num6 = df.filter($"col0" > -10 && $"col0" <= 10)
-num6.show
+num6.show()
val c6 = num6.count()
println(s"test result count should be 11: $c6")

//test "or". The number of results is 21
val num7 = df.filter($"col0" <= -10 || $"col0" > 10)
-num7.show
+num7.show()
val c7 = num7.count()
println(s"test result count should be 21: $c7")

//test "all". The number of results is 32
val num8 = df.filter($"col0" >= -100)
-num8.show
+num8.show()
val c8 = num8.count()
println(s"test result count should be 32: $c8")

@@ -89,13 +89,13 @@ object HBaseSource {
.save()

val df = withCatalog(cat)
-df.show
+df.show()
df.filter($"col0" <= "row005")
-.select($"col0", $"col1").show
+.select($"col0", $"col1").show()
df.filter($"col0" === "row005" || $"col0" <= "row005")
-.select($"col0", $"col1").show
+.select($"col0", $"col1").show()
df.filter($"col0" > "row250")
-.select($"col0", $"col1").show
+.select($"col0", $"col1").show()
df.registerTempTable("table1")
val c = sqlContext.sql("select count(col1) from table1 where col0 < 'row050'")
c.show()
@@ -31,7 +31,7 @@ import org.apache.spark.SparkConf
object HBaseBulkDeleteExample {
def main(args: Array[String]) {
if (args.length < 1) {
println("HBaseBulkDeletesExample {tableName} ")
println("HBaseBulkDeleteExample - {tableName} missing at argument")
return
}

@@ -60,4 +60,4 @@ object HBaseBulkDeleteExample {
sc.stop()
}
}
-}
+}
@@ -26,13 +26,13 @@ import org.apache.hadoop.hbase.client.Result
import org.apache.spark.SparkConf

/**
-* This is a simple example of getting records in HBase
+* This is a simple example of getting records from HBase
* with the bulkGet function.
*/
object HBaseBulkGetExample {
def main(args: Array[String]) {
if (args.length < 1) {
println("HBaseBulkGetExample {tableName}")
println("HBaseBulkGetExample - {tableName} missing at argument")
return
}

@@ -58,7 +58,7 @@ object HBaseBulkGetExample {
val hbaseContext = new HBaseContext(sc, conf)

val getRdd = hbaseContext.bulkGet[Array[Byte], String](
-TableName.valueOf(tableName),
+tableName,
2,
rdd,
record => {
@@ -31,7 +31,7 @@ import org.apache.spark.SparkConf
object HBaseBulkPutExample {
def main(args: Array[String]) {
if (args.length < 2) {
println("HBaseBulkPutExample {tableName} {columnFamily}")
println("HBaseBulkPutExample - {tableName} {columnFamily} missing at argument")
return
}

@@ -72,4 +72,4 @@ object HBaseBulkPutExample {
sc.stop()
}
}
-}
+}
@@ -35,7 +35,7 @@ import org.apache.spark.SparkConf
object HBaseBulkPutExampleFromFile {
def main(args: Array[String]) {
if (args.length < 3) {
println("HBaseBulkPutExampleFromFile {tableName} {columnFamily} {inputFile}")
println("HBaseBulkPutExampleFromFile - {tableName} {columnFamily} {inputFile} are missing at argument")
return
}

@@ -32,7 +32,7 @@ import org.apache.spark.SparkConf
object HBaseBulkPutTimestampExample {
def main(args: Array[String]) {
if (args.length < 2) {
System.out.println("HBaseBulkPutTimestampExample {tableName} {columnFamily}")
System.out.println("HBaseBulkPutTimestampExample - {tableName} {columnFamily} are missing at arguments")
return
}

@@ -24,12 +24,12 @@ import org.apache.hadoop.hbase.client.Scan
import org.apache.spark.SparkConf
/**
* This is a simple example of scanning records from HBase
-* with the hbaseRDD function.
+* with the hbaseRDD function in a distributed fashion.
*/
object HBaseDistributedScanExample {
def main(args: Array[String]) {
if (args.length < 1) {
println("GenerateGraphs {tableName}")
println("HBaseDistributedScanExample - {tableName} is missing at argument")
return
}

@@ -52,10 +52,10 @@ object HBaseDistributedScanExample {

println("Length: " + getRdd.map(r => r._1.copyBytes()).collect().length);

-//.collect().foreach(v => println(Bytes.toString(v._1.get())))
-} finally {
+}
+finally {
sc.stop()
}
}

-}
+}
@@ -33,7 +33,7 @@ object HBaseStreamingBulkPutExample {
def main(args: Array[String]) {
if (args.length < 4) {
println("HBaseStreamingBulkPutExample " +
"{host} {port} {tableName} {columnFamily}")
"{host} {port} {tableName} {columnFamily} are missing at argument")
return
}

@@ -42,7 +42,7 @@ object HBaseStreamingBulkPutExample {
val tableName = args(2)
val columnFamily = args(3)

val sparkConf = new SparkConf().setAppName("HBaseBulkPutTimestampExample " +
val sparkConf = new SparkConf().setAppName("HBaseStreamingBulkPutExample " +
tableName + " " + columnFamily)
val sc = new SparkContext(sparkConf)
try {
@@ -71,4 +71,4 @@ object HBaseStreamingBulkPutExample {
sc.stop()
}
}
-}
+}
@@ -31,7 +31,7 @@ import org.apache.spark.{SparkContext, SparkConf}
object HBaseBulkDeleteExample {
def main(args: Array[String]) {
if (args.length < 1) {
println("HBaseBulkDeletesExample {tableName} ")
println("HBaseBulkDeletesExample - {tableName} missing at argument")
return
}

@@ -61,4 +61,4 @@ object HBaseBulkDeleteExample {
sc.stop()
}
}
-}
+}
@@ -24,13 +24,13 @@ import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
import org.apache.spark.{SparkContext, SparkConf}

/**
-* This is a simple example of getting records in HBase
+* This is a simple example of getting records from HBase
* with the bulkGet function.
*/
object HBaseBulkGetExample {
def main(args: Array[String]) {
if (args.length < 1) {
println("HBaseBulkGetExample {tableName}")
println("HBaseBulkGetExample - {tableName} is missing in argument")
return
}

@@ -31,7 +31,7 @@ import org.apache.spark.{SparkConf, SparkContext}
object HBaseBulkPutExample {
def main(args: Array[String]) {
if (args.length < 2) {
println("HBaseBulkPutExample {tableName} {columnFamily}")
println("HBaseBulkPutExample - {tableName} {columnFamily} missing at arguments")
return
}

@@ -73,4 +73,4 @@ object HBaseBulkPutExample {
sc.stop()
}
}
-}
+}
@@ -31,7 +31,7 @@ import org.apache.spark.{SparkContext, SparkConf}
object HBaseForeachPartitionExample {
def main(args: Array[String]) {
if (args.length < 2) {
println("HBaseBulkPutExample {tableName} {columnFamily}")
println("HBaseBulkPutExample - {tableName} {columnFamily} missing at arguments")
return
}

@@ -31,7 +31,7 @@ import org.apache.spark.{SparkContext, SparkConf}
object HBaseMapPartitionExample {
def main(args: Array[String]) {
if (args.length < 1) {
println("HBaseBulkGetExample {tableName}")
println("HBaseMapPartitionExample - {tableName} missing at arguments")
return
}

@@ -150,7 +150,7 @@ case class HBaseTableCatalog(
def getRowKey: Seq[Field] = row.fields
def getPrimaryKey= row.keys(0)
def getColumnFamilies = {
-sMap.fields.map(_.cf).filter(_ != HBaseTableCatalog.rowKey)
+sMap.fields.map(_.cf).filter(_ != HBaseTableCatalog.rowKey).toSeq.distinct
}

def get(key: String) = params.get(key)
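The one-line change above is the core of the fix: previously getColumnFamilies returned one entry per mapped column, so a catalog with several columns in the same family produced duplicate family names. A simplified, self-contained sketch of the effect (Field here is only a stand-in for the catalog's internal field type, not the real class):

case class Field(colName: String, cf: String)

val rowKeyCf = "rowkey"
val fields = Seq(
  Field("col0", "rowkey"),
  Field("col1", "cf1"),
  Field("col2", "cf1"),  // a second column in the same family
  Field("col3", "cf2"))

val before = fields.map(_.cf).filter(_ != rowKeyCf)           // Seq("cf1", "cf1", "cf2")
val after  = fields.map(_.cf).filter(_ != rowKeyCf).distinct  // Seq("cf1", "cf2")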
@@ -812,9 +812,9 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
|"col2":{"cf":"cf2", "col":"col2", "type":"double"},
|"col2":{"cf":"cf1", "col":"col2", "type":"double"},
|"col3":{"cf":"cf3", "col":"col3", "type":"float"},
|"col4":{"cf":"cf4", "col":"col4", "type":"int"},
|"col4":{"cf":"cf3", "col":"col4", "type":"int"},
|"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
|"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
|"col7":{"cf":"cf7", "col":"col7", "type":"string"},
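The test catalog above is adjusted so that cf1 and cf3 each hold two columns, which exercises the deduplicated family list. One place a duplicated list would surface is when a consumer builds an HBase table from the catalog's families, since a table descriptor rejects adding the same family twice. A hedged sketch of that pattern (only the descriptor calls are standard HBase client API; the surrounding wiring and the family list are illustrative):

import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, TableName}

// `columnFamilies` stands for the result of the catalog's getColumnFamilies.
def buildDescriptor(table: String, columnFamilies: Seq[String]): HTableDescriptor = {
  val desc = new HTableDescriptor(TableName.valueOf(table))
  // With duplicates in the list, addFamily would be called twice for the same
  // family and be rejected; the distinct list from the fix avoids that.
  columnFamilies.foreach(cf => desc.addFamily(new HColumnDescriptor(cf)))
  desc
}

val desc = buildDescriptor("table1", Seq("cf1", "cf3", "cf5", "cf6", "cf7"))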