/
database_oracle.go
120 lines (101 loc) · 2.39 KB
/
database_oracle.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package gxutil
import (
"bytes"
"io/ioutil"
"os"
"os/exec"
"strings"
"time"
"github.com/spf13/cast"
"github.com/xo/dburl"
)
// OracleConn is a Postgres connection
type OracleConn struct {
BaseConn
URL string
}
// Init initiates the object
func (conn *OracleConn) Init() error {
conn.BaseConn = BaseConn{
URL: conn.URL,
Type: "oracle",
}
conn.BaseConn.SetProp("allow_bulk_import", "false")
return conn.BaseConn.Init()
}
// BulkImportStream bulk import stream
func (conn *OracleConn) BulkImportStream(tableFName string, ds Datastream) (count uint64, err error) {
_, err = exec.LookPath("sqlldr")
if err != nil {
Log("sqlldr not found in path. Using cursor...")
return conn.BaseConn.InsertStream(tableFName, ds)
}
if !cast.ToBool(conn.BaseConn.GetProp("allow_bulk_import")) {
return conn.BaseConn.InsertStream(tableFName, ds)
}
return conn.SQLLoad(tableFName, ds)
}
// SQLLoad uses sqlldr to Bulk Import
// cat test1.csv | sqlldr system/oracle@oracle.host:1521/xe control=sqlldr.ctl log=/dev/stdout bad=/dev/stderr
func (conn *OracleConn) SQLLoad(tableFName string, ds Datastream) (count uint64, err error) {
var stderr, stdout bytes.Buffer
url, err := dburl.Parse(conn.URL)
if err != nil {
Log("Error dburl.Parse(conn.URL)")
return
}
ctlPath := F(
"/tmp/oracle.sqlldr.%d.%s.ctl",
time.Now().Unix(),
RandString(alphaRunes, 3),
)
// write to ctlPath
ctlStr := R(
conn.BaseConn.GetTemplateValue("core.sqlldr"),
"table", tableFName,
"columns", strings.Join(ds.GetFields(), ",\n"),
)
err = ioutil.WriteFile(
ctlPath,
[]byte(ctlStr),
0755,
)
if err != nil {
Log("Error writing to "+ctlPath)
return
}
password, _ := url.User.Password()
hostPort := url.Host
sid := strings.ReplaceAll(url.Path, "/", "")
credHost := F(
"%s/%s@%s/%s", url.User.Username(),
password, hostPort, sid,
)
proc := exec.Command(
"sqlldr",
credHost,
"control="+ctlPath,
"data=/dev/stdin",
"log=/dev/stdout",
"bad=/dev/stderr",
)
proc.Stderr = &stderr
proc.Stdout = &stdout
proc.Stdin = ds.NewCsvReader(0)
// run and wait for finish
err = proc.Run()
// Delete ctrl file
os.Remove(ctlPath)
if err != nil {
cmdStr := strings.ReplaceAll(strings.Join(proc.Args, " "), password, "****")
println(stdout.String())
err = Error(
err,
F(
"Oracle Import Command -> %s\nOracle Import Error -> %s",
cmdStr, stderr.String(),
),
)
}
return ds.count, err
}