Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added logging with levels. #51

Merged
merged 7 commits into from Oct 16, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
65 changes: 0 additions & 65 deletions copy.go

This file was deleted.

4 changes: 2 additions & 2 deletions copy_test.go
Expand Up @@ -47,11 +47,11 @@ var createTablePostgres = `
)
`

var config Config
var config utils.Config
var rdb *redis.Client

func TestMain(m *testing.M) {
config = Config{
config = utils.Config{
SQLType: "",
SQLUser: "root",
SQLPassword: "password",
Expand Down
13 changes: 11 additions & 2 deletions redisql/main.go
Expand Up @@ -4,7 +4,8 @@ import (
"flag"
"os"

redisql "github.com/DGKSK8LIFE/redisql"
utils "github.com/DGKSK8LIFE/redisql/utils"
logging "github.com/DGKSK8LIFE/redisql/utils/logging"
)

var dataType *string
Expand All @@ -18,11 +19,19 @@ func init() {
}

func main() {
config, err := redisql.NewConfig(*file)
config, err := utils.NewConfig(*file)
if err != nil {
panic(err)
}

if config.LogLevel != nil {
logging.InitLogging(*config.LogLevel)
}

if config.LogFilename != nil {
logging.SetLogFile(*config.LogFilename)
}

switch *dataType {
case "string":
if err = config.CopyToString(); err != nil {
Expand Down
40 changes: 40 additions & 0 deletions utils/config.go
@@ -0,0 +1,40 @@
package utils

import (
"io/ioutil"
"gopkg.in/yaml.v2"
)

// Config is the configuration struct for redisql
type Config struct {
SQLType string `yaml:"sqltype"`
SQLUser string `yaml:"sqluser"`
SQLPassword string `yaml:"sqlpassword"`
SQLDatabase string `yaml:"sqldatabase"`
SQLHost string `yaml:"sqlhost"`
SQLPort string `yaml:"sqlport"`
SQLTable string `yaml:"sqltable"`
RedisAddr string `yaml:"redisaddr"`
RedisPass string `yaml:"redispass"`
LogLevel *uint32 `yaml:"log_level"`
LogFilename *string `yaml:"log_filename"`
}

// NewConfig initializes a new object of Config structure
func NewConfig(filePath string) (*Config, error) {
if err := ValidateFilePath(filePath); err != nil {
return nil, err
}

file, err := ioutil.ReadFile(filePath)
if err != nil {
return nil, err
}

var c Config
if err = yaml.Unmarshal(file, &c); err != nil {
return nil, err
}

return &c, nil
}
122 changes: 122 additions & 0 deletions utils/copy.go
@@ -0,0 +1,122 @@
package utils

import (
"database/sql"
"fmt"

"github.com/DGKSK8LIFE/redisql/utils/logging"
)

// CopyToString reads a desired SQL table's rows and writes them to Redis strings
func (c Config) CopyToString() error {
logging.Log("Starting CopyToString", 1)
if err := copyTable(c, "string"); err != nil {
return err
}
return nil
}

// CopyToList reads a desired SQL table's rows and writes them to Redis lists
func (c Config) CopyToList() error {
logging.Log("Starting CopyToList", 1)
if err := copyTable(c, "list"); err != nil {
return err
}
return nil
}

// CopyToHash reads a desired SQL table's rows and writes them to Redis hashes
func (c Config) CopyToHash() error {
logging.Log("Starting CopyToHash", 1)
if err := copyTable(c, "hash"); err != nil {
return err
}
return nil
}

// copyTable is an internal function for Copy methods
func copyTable(cfg Config, redisType string) error {

db, err := OpenDB(cfg)
if err != nil {
return err
}

rdb := OpenRedis(cfg.RedisAddr, cfg.RedisPass)

defer db.Close()
defer rdb.Close()

rows, err := db.Query(fmt.Sprintf(`SELECT * FROM %s`, cfg.SQLTable))
if err != nil {
return err
}

defer rows.Close()

columns, err := rows.Columns()
if err != nil {
return err
}

values := make([]sql.RawBytes, len(columns))
scanArgs := make([]interface{}, len(values))
for i := range values {
scanArgs[i] = &values[i]
}

index := 0
switch redisType {
case "string":
for rows.Next() {
if err = rows.Scan(scanArgs...); err != nil {
return err
}
for i, col := range values {
id := fmt.Sprintf("%s:%d:%s", cfg.SQLTable, index, columns[i])
err := rdb.Set(CTX, id, string(col), 0).Err()
if err != nil {
return err
}
}
index += 1
}
case "list":
for rows.Next() {
if err = rows.Scan(scanArgs...); err != nil {
return err
}
fields := []string{}
for _, col := range values {
fields = append(fields, string(col))
}
id := fmt.Sprintf("%s:%d", cfg.SQLTable, index)
err := rdb.RPush(CTX, id, fields).Err()
if err != nil {
return err
}
index += 1
}
case "hash":
for rows.Next() {
if err = rows.Scan(scanArgs...); err != nil {
return err
}
rowMap := make(map[string]string)
for i, col := range values {
rowMap[columns[i]] = string(col)
}
id := fmt.Sprintf("%s:%d", cfg.SQLTable, index)
err := rdb.HSet(CTX, id, rowMap).Err()
if err != nil {
return err
}
index += 1
}
if err = rows.Err(); err != nil {
return err
}
}
logging.Log("Copying done", 1)
return nil
}
94 changes: 9 additions & 85 deletions utils/db.go
Expand Up @@ -56,100 +56,24 @@ func OpenPostgres(user, password, database, host, port string) (*sql.DB, error)
return db, err
}

// Convert is an internal function for Copy methods
func Convert(redisType, sqlUser, sqlPassword, sqlDatabase, sqlHost, sqlPort, sqlTable, redisAddr, redisPass, sqlType string) error {
func OpenDB(cfg Config) (*sql.DB, error) {
var db *sql.DB
var err error

switch sqlType {
switch cfg.SQLType {
case "mysql":
db, err = OpenMySQL(sqlUser, sqlPassword, sqlDatabase, sqlHost, sqlPort)
db, err = OpenMySQL(cfg.SQLUser, cfg.SQLPassword, cfg.SQLDatabase, cfg.SQLHost, cfg.SQLPort)
if err != nil {
return err
return nil, err
}
case "postgres":
db, err = OpenPostgres(sqlUser, sqlPassword, sqlDatabase, sqlHost, sqlPort)
db, err = OpenPostgres(cfg.SQLUser, cfg.SQLPassword, cfg.SQLDatabase, cfg.SQLHost, cfg.SQLPort)
if err != nil {
return err
return nil, err
}
default:
return errors.New("unsupported sql database type")
return nil, errors.New("unsupported sql database type")
}

rdb := OpenRedis(redisAddr, redisPass)

defer db.Close()
defer rdb.Close()

rows, err := db.Query(fmt.Sprintf(`SELECT * FROM %s`, sqlTable))
if err != nil {
return err
}

defer rows.Close()

columns, err := rows.Columns()
if err != nil {
return err
}

values := make([]sql.RawBytes, len(columns))
scanArgs := make([]interface{}, len(values))
for i := range values {
scanArgs[i] = &values[i]
}

index := 0
switch redisType {
case "string":
for rows.Next() {
if err = rows.Scan(scanArgs...); err != nil {
return err
}
for i, col := range values {
id := fmt.Sprintf("%s:%d:%s", sqlTable, index, columns[i])
err := rdb.Set(CTX, id, string(col), 0).Err()
if err != nil {
return err
}
}
index += 1
}
case "list":
for rows.Next() {
if err = rows.Scan(scanArgs...); err != nil {
return err
}
fields := []string{}
for _, col := range values {
fields = append(fields, string(col))
}
id := fmt.Sprintf("%s:%d", sqlTable, index)
err := rdb.RPush(CTX, id, fields).Err()
if err != nil {
return err
}
index += 1
}
case "hash":
for rows.Next() {
if err = rows.Scan(scanArgs...); err != nil {
return err
}
rowMap := make(map[string]string)
for i, col := range values {
rowMap[columns[i]] = string(col)
}
id := fmt.Sprintf("%s:%d", sqlTable, index)
err := rdb.HSet(CTX, id, rowMap).Err()
if err != nil {
return err
}
index += 1
}
if err = rows.Err(); err != nil {
return err
}
}
return nil
return db, nil
}