PHOENIX-5585: Add documentation for Phoenix-Spark Java example
ChinmaySKulkarni committed Nov 27, 2019
1 parent 7bb63dc commit 77e2ad3
Showing 1 changed file with 132 additions and 8 deletions: phoenix-spark/README.md
```sql
CREATE TABLE TABLE1 (ID BIGINT NOT NULL PRIMARY KEY, COL1 VARCHAR);
UPSERT INTO TABLE1 (ID, COL1) VALUES (1, 'test_row_1');
UPSERT INTO TABLE1 (ID, COL1) VALUES (2, 'test_row_2');
```

### Load as a DataFrame using the DataSourceV2 API
Scala example:
```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource

val spark = SparkSession
  .builder()
  .appName("phoenix-test")
  .master("local")
  .getOrCreate()

// Load data from TABLE1
val df = spark.sqlContext
  .read
  .format("phoenix")
  .options(Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> "phoenix-server:2181"))
  .load

df.filter(df("COL1") === "test_row_1" && df("ID") === 1L)
  .select(df("ID"))
  .show
```
Java example:
```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;

public class PhoenixSparkRead {

    public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        SQLContext sqlContext = new SQLContext(jsc);

        // Load data from TABLE1
        Dataset<Row> df = sqlContext
            .read()
            .format("phoenix")
            .option("table", "TABLE1")
            .option(ZOOKEEPER_URL, "phoenix-server:2181")
            .load();

        // Register the DataFrame as a temp view and query it with SQL
        df.createOrReplaceTempView("TABLE1");
        df = sqlContext.sql("SELECT * FROM TABLE1 WHERE COL1='test_row_1' AND ID=1");
        df.show();
        jsc.stop();
    }
}
```

## Saving to Phoenix

Given two Phoenix tables with the following DDL:
CREATE TABLE INPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
CREATE TABLE OUTPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
```
you can load from an input table and save to an output table as a DataFrame as follows in Scala:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource

val spark = SparkSession
  .builder()
  .appName("phoenix-test")
  .master("local")
  .getOrCreate()

// Load INPUT_TABLE
val df = spark.sqlContext
  .read
  .format("phoenix")
  .options(Map("table" -> "INPUT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "phoenix-server:2181"))
  .load

// Save to OUTPUT_TABLE
df.write
  .format("phoenix")
  .mode(SaveMode.Overwrite)
  .options(Map("table" -> "OUTPUT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "phoenix-server:2181"))
  .save()
```
Java example:
```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SQLContext;

import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;

public class PhoenixSparkWriteFromInputTable {

    public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        SQLContext sqlContext = new SQLContext(jsc);

        // Load INPUT_TABLE
        Dataset<Row> df = sqlContext
            .read()
            .format("phoenix")
            .option("table", "INPUT_TABLE")
            .option(ZOOKEEPER_URL, "phoenix-server:2181")
            .load();

        // Save to OUTPUT_TABLE, overwriting any existing rows
        df.write()
            .format("phoenix")
            .mode(SaveMode.Overwrite)
            .option("table", "OUTPUT_TABLE")
            .option(ZOOKEEPER_URL, "phoenix-server:2181")
            .save();
        jsc.stop();
    }
}
```

### Save from an external RDD with a schema to a Phoenix table

Given an output Phoenix table with the following DDL:
```sql
CREATE TABLE OUTPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
```
you can save a dataframe from an RDD as follows in Scala:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource

val spark = SparkSession
  .builder()
  .appName("phoenix-test")
  .master("local")
  .getOrCreate()

// Rows matching OUTPUT_TABLE's columns (ID, COL1, COL2)
val dataSet = List(Row(1L, "1", 1), Row(2L, "2", 2), Row(3L, "3", 3))

val schema = StructType(Seq(
  StructField("ID", LongType, nullable = false),
  StructField("COL1", StringType),
  StructField("COL2", IntegerType)))

val rowRDD = spark.sparkContext.parallelize(dataSet)

// Apply the schema to the RDD of rows
val df = spark.sqlContext.createDataFrame(rowRDD, schema)

df.write
  .format("phoenix")
  .options(Map("table" -> "OUTPUT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "phoenix-server:2181"))
  .mode(SaveMode.Overwrite)
  .save()
```
Java example:
```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;
import java.util.List;

import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;

public class PhoenixSparkWriteFromRDDWithSchema {

    public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        SQLContext sqlContext = new SQLContext(jsc);
        SparkSession spark = sqlContext.sparkSession();

        // Generate the schema based on the fields
        List<StructField> fields = new ArrayList<>();
        fields.add(DataTypes.createStructField("ID", DataTypes.LongType, false));
        fields.add(DataTypes.createStructField("COL1", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("COL2", DataTypes.IntegerType, true));
        StructType schema = DataTypes.createStructType(fields);

        // Generate rows that match the schema exactly
        List<Row> rows = new ArrayList<>();
        for (int i = 1; i < 4; i++) {
            rows.add(RowFactory.create(Long.valueOf(i), String.valueOf(i), i));
        }

        // Create a DataFrame from the rows and the specified schema, then save it
        Dataset<Row> df = spark.createDataFrame(rows, schema);
        df.write()
            .format("phoenix")
            .mode(SaveMode.Overwrite)
            .option("table", "OUTPUT_TABLE")
            .option(ZOOKEEPER_URL, "phoenix-server:2181")
            .save();

        jsc.stop();
    }
}
```

## Notes

Additional Phoenix/HBase client configurations can be passed from the driver to executors as a comma-separated list against the key `phoenixConfigs` (i.e. `PhoenixDataSourceReadOptions#PHOENIX_CONFIGS`), for example:
```scala
df = spark
  .sqlContext
  .read
  .format("phoenix")
  .options(Map("table" -> "Table1", "zkUrl" -> "phoenix-server:2181", "phoenixConfigs" -> "hbase.client.retries.number=10,hbase.client.pause=10000"))
  .load;
```
This list of properties is parsed and populated into a properties map which is passed to `DriverManager.getConnection(connString, propsMap)`.
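For parity with the Java examples above, the same settings can be supplied through per-key `option(...)` calls in the Java API. The following is a minimal sketch rather than an official example: the class name `PhoenixSparkReadWithConfigs` is illustrative, and the table name and quorum reuse the placeholders from this README.
```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

public class PhoenixSparkReadWithConfigs {

    public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        SQLContext sqlContext = new SQLContext(jsc);

        // HBase client settings are forwarded to executors via the
        // comma-separated "phoenixConfigs" option described above.
        Dataset<Row> df = sqlContext
            .read()
            .format("phoenix")
            .option("table", "Table1")
            .option("zkUrl", "phoenix-server:2181")
            .option("phoenixConfigs", "hbase.client.retries.number=10,hbase.client.pause=10000")
            .load();
        df.show();
        jsc.stop();
    }
}
```
Each `key=value` pair in `phoenixConfigs` ends up in the properties map passed to `DriverManager.getConnection`, per the note above.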

4 comments on commit 77e2ad3

@kiakaku:

Hi @ChinmaySKulkarni, how can I get this version of phoenix-spark? In the Maven repo I can only find 5.0.0-HBase-2.0.

@ChinmaySKulkarni (Contributor, Author):

@kiakaku connectors-1.0.0 is not released yet; it depends on Phoenix 4.15, which is scheduled for release soon (within the next couple of weeks at most). For now, you can build the Phoenix branch 4.x-HBase-1.4 locally and change the poms in the phoenix-connectors project to 4.15.0-HBase-1.4 instead of 4.15.0-HBase-1.4-SNAPSHOT (see this). Once Phoenix 4.15.0 is released, you will no longer need this kind of local change. Hope that helps.

@kiakaku:

@ChinmaySKulkarni Thank you so much.

@kiakaku:

Hi @ChinmaySKulkarni, do you have any plans for Phoenix on HBase 2?