/
migrate.go
148 lines (115 loc) · 3.49 KB
/
migrate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package main
import (
"context"
"fmt"
"time"
"github.com/frain-dev/migrate-to-postgres/convoy082/pkg/log"
"github.com/frain-dev/migrate-to-postgres/convoy082/config"
datastore082 "github.com/frain-dev/migrate-to-postgres/convoy082/datastore"
cm "github.com/frain-dev/migrate-to-postgres/convoy082/datastore/mongo"
"github.com/jmoiron/sqlx"
"github.com/spf13/cobra"
)
func addMigrateCommand() *cobra.Command {
var mongoDsn string
var postgresDsn string
cmd := &cobra.Command{
Use: "migrate",
Short: "Convoy migrations",
Run: func(cmd *cobra.Command, args []string) {
err := migrate(mongoDsn, postgresDsn)
if err != nil {
log.Fatal(err)
}
fmt.Println("Successfully migrated mongodb data to postgres")
},
}
cmd.PersistentFlags().StringVar(&mongoDsn, "mongo-dsn", "", "Mongo database dsn")
cmd.PersistentFlags().StringVar(&postgresDsn, "postgres-dsn", "", "Postgres database dsn")
return cmd
}
var oldIDToNewID = map[string]string{}
func (p *PG) GetDB() *sqlx.DB {
return p.db
}
type PG struct {
db *sqlx.DB
}
const batchSize int64 = 5000
func migrate(mongoDsn, postgresDsn string) error {
vv := config.Configuration{
Database: config.DatabaseConfiguration{Dsn: mongoDsn},
}
mc, err := cm.New(vv)
if err != nil {
return err
}
store := datastore082.New(mc.Database())
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
db, err := sqlx.ConnectContext(ctx, "postgres", postgresDsn)
if err != nil {
return fmt.Errorf("failed to open database: %v", err)
}
if err != nil {
return err
}
err = migrateUserCollection(store, db)
if err != nil {
return fmt.Errorf("failed to migrate user collection: %v", err)
}
err = migrateOrganisationsCollection(store, db)
if err != nil {
return fmt.Errorf("failed to migrate organisations collection: %v", err)
}
err = migrateProjectsCollection(store, db)
if err != nil {
return fmt.Errorf("failed to migrate projects collection: %v", err)
}
err = migrateEndpointsCollection(store, db)
if err != nil {
return fmt.Errorf("failed to migrate endpoints collection: %v", err)
}
err = migrateOrgMemberCollection(store, db)
if err != nil {
return fmt.Errorf("failed to migrate org member collection: %v", err)
}
err = migrateOrgInvitesCollection(store, db)
if err != nil {
return fmt.Errorf("failed to migrate org invites collection: %v", err)
}
err = migratePortalLinksCollection(store, db)
if err != nil {
return fmt.Errorf("failed to migrate portal links collection: %v", err)
}
err = migrateDevicesCollection(store, db)
if err != nil {
return fmt.Errorf("failed to migrate devices collection: %v", err)
}
err = migrateConfigurationsCollection(store, db)
if err != nil {
return fmt.Errorf("failed to migrate configurations collection: %v", err)
}
err = migrateSourcesCollection(store, db)
if err != nil {
return fmt.Errorf("failed to migrate source collection: %v", err)
}
err = migrateSubscriptionsCollection(store, db)
if err != nil {
return fmt.Errorf("failed to migrate subscriptions collection: %v", err)
}
err = migrateAPIKeysCollection(store, db)
if err != nil {
return fmt.Errorf("failed to migrate api keys collection: %v", err)
}
err = migrateEventsCollection(store, db)
if err != nil {
return fmt.Errorf("failed to migrate events collection: %v", err)
}
time.Sleep(time.Second * 10)
err = migrateEventDeliveriesCollection(store, db)
if err != nil {
return fmt.Errorf("failed to migrate event deliveries collection: %v", err)
}
return nil
}