Skip to content

Commit

Permalink
feat: Add INSERT_ONLY option to streams (#655)
Browse files Browse the repository at this point in the history
  • Loading branch information
momer committed Aug 23, 2021
1 parent 807c6ce commit c906e01
Show file tree
Hide file tree
Showing 9 changed files with 137 additions and 5 deletions.
50 changes: 50 additions & 0 deletions docs/data-sources/functions.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
---
# generated by https://github.com/hashicorp/terraform-plugin-docs
page_title: "snowflake_functions Data Source - terraform-provider-snowflake"
subcategory: ""
description: |-
---

# snowflake_functions (Data Source)



## Example Usage

```terraform
data "snowflake_functions" "current" {
database = "MYDB"
schema = "MYSCHEMA"
}
```

<!-- schema generated by tfplugindocs -->
## Schema

### Required

- **database** (String) The database from which to return the schemas from.
- **schema** (String) The schema from which to return the functions from.

### Optional

- **id** (String) The ID of this resource.

### Read-Only

- **functions** (List of Object) The functions in the schema (see [below for nested schema](#nestedatt--functions))

<a id="nestedatt--functions"></a>
### Nested Schema for `functions`

Read-Only:

- **argument_types** (List of String)
- **comment** (String)
- **database** (String)
- **name** (String)
- **return_type** (String)
- **schema** (String)


50 changes: 50 additions & 0 deletions docs/data-sources/procedures.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
---
# generated by https://github.com/hashicorp/terraform-plugin-docs
page_title: "snowflake_procedures Data Source - terraform-provider-snowflake"
subcategory: ""
description: |-
---

# snowflake_procedures (Data Source)



## Example Usage

```terraform
data "snowflake_procedures" "current" {
database = "MYDB"
schema = "MYSCHEMA"
}
```

<!-- schema generated by tfplugindocs -->
## Schema

### Required

- **database** (String) The database from which to return the schemas from.
- **schema** (String) The schema from which to return the procedures from.

### Optional

- **id** (String) The ID of this resource.

### Read-Only

- **procedures** (List of Object) The procedures in the schema (see [below for nested schema](#nestedatt--procedures))

<a id="nestedatt--procedures"></a>
### Nested Schema for `procedures`

Read-Only:

- **argument_types** (List of String)
- **comment** (String)
- **database** (String)
- **name** (String)
- **return_type** (String)
- **schema** (String)


2 changes: 2 additions & 0 deletions docs/resources/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ resource snowflake_stream stream {
on_table = "table"
append_only = false
insert_only = false
owner = "role1"
}
Expand All @@ -41,6 +42,7 @@ resource snowflake_stream stream {
- **append_only** (Boolean) Type of the stream that will be created.
- **comment** (String) Specifies a comment for the stream.
- **id** (String) The ID of this resource.
- **insert_only** (Boolean) Create an insert only stream type.
- **on_table** (String) Name of the table the stream will monitor.
- **show_initial_rows** (Boolean) Specifies whether to return all existing rows in the source table as row inserts the first time the stream is consumed.

Expand Down
1 change: 1 addition & 0 deletions examples/resources/snowflake_stream/resource.tf
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ resource snowflake_stream stream {

on_table = "table"
append_only = false
insert_only = false

owner = "role1"
}
14 changes: 14 additions & 0 deletions pkg/resources/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ var streamSchema = map[string]*schema.Schema{
Default: false,
Description: "Type of the stream that will be created.",
},
"insert_only": {
Type: schema.TypeBool,
Optional: true,
ForceNew: true,
Default: false,
Description: "Create an insert only stream type.",
},
"show_initial_rows": {
Type: schema.TypeBool,
Optional: true,
Expand Down Expand Up @@ -169,6 +176,7 @@ func CreateStream(d *schema.ResourceData, meta interface{}) error {
name := d.Get("name").(string)
onTable := d.Get("on_table").(string)
appendOnly := d.Get("append_only").(bool)
insertOnly := d.Get("insert_only").(bool)
showInitialRows := d.Get("show_initial_rows").(bool)

builder := snowflake.Stream(name, database, schema)
Expand All @@ -180,6 +188,7 @@ func CreateStream(d *schema.ResourceData, meta interface{}) error {

builder.WithOnTable(resultOnTable.DatabaseName, resultOnTable.SchemaName, resultOnTable.OnTableName)
builder.WithAppendOnly(appendOnly)
builder.WithInsertOnly(insertOnly)
builder.WithShowInitialRows(showInitialRows)

// Set optionals
Expand Down Expand Up @@ -247,6 +256,11 @@ func ReadStream(d *schema.ResourceData, meta interface{}) error {
return err
}

err = d.Set("insert_only", stream.InsertOnly)
if err != nil {
return err
}

err = d.Set("show_initial_rows", stream.ShowInitialRows)
if err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions pkg/resources/stream_acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func TestAcc_Stream(t *testing.T) {
resource.TestCheckResourceAttr("snowflake_stream.test_stream", "on_table", fmt.Sprintf("%s.%s.%s", accName, accName, "STREAM_ON_TABLE")),
resource.TestCheckResourceAttr("snowflake_stream.test_stream", "comment", "Terraform acceptance test"),
checkBool("snowflake_stream.test_stream", "append_only", false),
checkBool("snowflake_stream.test_stream", "insert_only", false),
checkBool("snowflake_stream.test_stream", "show_initial_rows", false),
),
},
Expand Down
4 changes: 3 additions & 1 deletion pkg/resources/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@ func TestStreamCreate(t *testing.T) {
"comment": "great comment",
"on_table": "target_db.target_schema.target_table",
"append_only": true,
"insert_only": false,
"show_initial_rows": true,
}
d := stream(t, "database_name|schema_name|stream_name", in)

WithMockDb(t, func(db *sql.DB, mock sqlmock.Sqlmock) {
mock.ExpectExec(`CREATE STREAM "database_name"."schema_name"."stream_name" ON TABLE "target_db"."target_schema"."target_table" COMMENT = 'great comment' APPEND_ONLY = true SHOW_INITIAL_ROWS = true`).WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec(`CREATE STREAM "database_name"."schema_name"."stream_name" ON TABLE "target_db"."target_schema"."target_table" COMMENT = 'great comment' APPEND_ONLY = true INSERT_ONLY = false SHOW_INITIAL_ROWS = true`).WillReturnResult(sqlmock.NewResult(1, 1))
expectStreamRead(mock)
err := resources.CreateStream(d, db)
r.NoError(err)
Expand Down Expand Up @@ -118,6 +119,7 @@ func TestStreamUpdate(t *testing.T) {
"comment": "new stream comment",
"on_table": "target_table",
"append_only": true,
"insert_only": false,
}

d := stream(t, "database_name|schema_name|stream_name", in)
Expand Down
9 changes: 9 additions & 0 deletions pkg/snowflake/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type StreamBuilder struct {
schema string
onTable string
appendOnly bool
insertOnly bool
showInitialRows bool
comment string
}
Expand Down Expand Up @@ -57,6 +58,11 @@ func (sb *StreamBuilder) WithAppendOnly(b bool) *StreamBuilder {
return sb
}

func (sb *StreamBuilder) WithInsertOnly(b bool) *StreamBuilder {
sb.insertOnly = b
return sb
}

func (sb *StreamBuilder) WithShowInitialRows(b bool) *StreamBuilder {
sb.showInitialRows = b
return sb
Expand Down Expand Up @@ -92,6 +98,8 @@ func (sb *StreamBuilder) Create() string {

q.WriteString(fmt.Sprintf(` APPEND_ONLY = %v`, sb.appendOnly))

q.WriteString(fmt.Sprintf(` INSERT_ONLY = %v`, sb.insertOnly))

q.WriteString(fmt.Sprintf(` SHOW_INITIAL_ROWS = %v`, sb.showInitialRows))

return q.String()
Expand Down Expand Up @@ -125,6 +133,7 @@ type descStreamRow struct {
Owner sql.NullString `db:"owner"`
Comment sql.NullString `db:"comment"`
AppendOnly bool `db:"append_only"`
InsertOnly bool `db:"insert_only"`
ShowInitialRows bool `db:"show_initial_rows"`
TableName sql.NullString `db:"table_name"`
Type sql.NullString `db:"type"`
Expand Down
11 changes: 7 additions & 4 deletions pkg/snowflake/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,19 @@ func TestStreamCreate(t *testing.T) {
s := Stream("test_stream", "test_db", "test_schema")

s.WithOnTable("test_db", "test_schema", "test_target_table")
r.Equal(s.Create(), `CREATE STREAM "test_db"."test_schema"."test_stream" ON TABLE "test_db"."test_schema"."test_target_table" APPEND_ONLY = false SHOW_INITIAL_ROWS = false`)
r.Equal(s.Create(), `CREATE STREAM "test_db"."test_schema"."test_stream" ON TABLE "test_db"."test_schema"."test_target_table" APPEND_ONLY = false INSERT_ONLY = false SHOW_INITIAL_ROWS = false`)

s.WithComment("Test Comment")
r.Equal(s.Create(), `CREATE STREAM "test_db"."test_schema"."test_stream" ON TABLE "test_db"."test_schema"."test_target_table" COMMENT = 'Test Comment' APPEND_ONLY = false SHOW_INITIAL_ROWS = false`)
r.Equal(s.Create(), `CREATE STREAM "test_db"."test_schema"."test_stream" ON TABLE "test_db"."test_schema"."test_target_table" COMMENT = 'Test Comment' APPEND_ONLY = false INSERT_ONLY = false SHOW_INITIAL_ROWS = false`)

s.WithShowInitialRows(true)
r.Equal(s.Create(), `CREATE STREAM "test_db"."test_schema"."test_stream" ON TABLE "test_db"."test_schema"."test_target_table" COMMENT = 'Test Comment' APPEND_ONLY = false SHOW_INITIAL_ROWS = true`)
r.Equal(s.Create(), `CREATE STREAM "test_db"."test_schema"."test_stream" ON TABLE "test_db"."test_schema"."test_target_table" COMMENT = 'Test Comment' APPEND_ONLY = false INSERT_ONLY = false SHOW_INITIAL_ROWS = true`)

s.WithAppendOnly(true)
r.Equal(s.Create(), `CREATE STREAM "test_db"."test_schema"."test_stream" ON TABLE "test_db"."test_schema"."test_target_table" COMMENT = 'Test Comment' APPEND_ONLY = true SHOW_INITIAL_ROWS = true`)
r.Equal(s.Create(), `CREATE STREAM "test_db"."test_schema"."test_stream" ON TABLE "test_db"."test_schema"."test_target_table" COMMENT = 'Test Comment' APPEND_ONLY = true INSERT_ONLY = false SHOW_INITIAL_ROWS = true`)

s.WithInsertOnly(true)
r.Equal(s.Create(), `CREATE STREAM "test_db"."test_schema"."test_stream" ON TABLE "test_db"."test_schema"."test_target_table" COMMENT = 'Test Comment' APPEND_ONLY = true INSERT_ONLY = true SHOW_INITIAL_ROWS = true`)
}

func TestStreamChangeComment(t *testing.T) {
Expand Down

0 comments on commit c906e01

Please sign in to comment.