Skip to content

Commit

Permalink
Update podhelper.go
Browse files Browse the repository at this point in the history
  • Loading branch information
nnn-gif committed Oct 10, 2023
1 parent 0bc2f86 commit 1856aab
Showing 1 changed file with 89 additions and 58 deletions.
147 changes: 89 additions & 58 deletions cmd/oraclebuildertools/utils/podhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,20 @@ import (

var log = logrus.New()

// PodHelper is a utility struct for working with Kubernetes Pods.
type PodHelper struct {
k8sclient *kubernetes.Clientset
Image string
NameSpace string
k8sclient *kubernetes.Clientset
Image string
NameSpace string
Affinity string
SignerURL string
DiaRestURL string
DiaGraphqlURL string
PostgresHost string
}

func NewPodHelper(image, namespace string) *PodHelper {
// NewPodHelper creates a new instance of PodHelper.
func NewPodHelper(image, namespace, affinity, signerURL, diaRestURL, diaGraphqlURL, postgresHost string) *PodHelper {
config, err := rest.InClusterConfig()
if err != nil {
// try using kube config
Expand All @@ -51,58 +58,34 @@ func NewPodHelper(image, namespace string) *PodHelper {
if err != nil {
log.Fatal(err)
}
return &PodHelper{k8sclient: client, Image: image, NameSpace: namespace}
return &PodHelper{k8sclient: client, Image: image, NameSpace: namespace, Affinity: affinity, SignerURL: signerURL, DiaRestURL: diaRestURL, DiaGraphqlURL: diaGraphqlURL, PostgresHost: postgresHost}
}

func (kh *PodHelper) CreateOracleFeeder(ctx context.Context, feederID string, owner string, oracle string, chainID string, symbols, feedSelection, blockchainnode string, frequency, sleepSeconds, deviationPermille, mandatoryFrequency string) error {
// CreateOracleFeeder creates a new Oracle Feeder Pod in Kubernetes.
func (kh *PodHelper) CreateOracleFeeder(ctx context.Context, feederID, creator, feederAddress, oracle string, chainID string, symbols, feedSelection, blockchainnode string, frequency, sleepSeconds, deviationPermille, mandatoryFrequency string) error {

// -- oracle config
publickeyenv := corev1.EnvVar{Name: "ORACLE_PUBLICKEY", ValueFrom: &corev1.EnvVarSource{SecretKeyRef: &corev1.SecretKeySelector{Key: ".public", LocalObjectReference: corev1.LocalObjectReference{Name: feederID}}}}
deployedcontractenv := corev1.EnvVar{Name: "DEPLOYED_CONTRACT", Value: oracle}
chainidenv := corev1.EnvVar{Name: "ORACLE_CHAINID", Value: chainID}
signerservice := corev1.EnvVar{Name: "ORACLE_SIGNER", Value: "signer.dia-oracle-feeder:50052"}
sleepsecondenv := corev1.EnvVar{Name: "ORACLE_SLEEPSECONDS", Value: sleepSeconds}
deviationenv := corev1.EnvVar{Name: "ORACLE_DEVIATIONPERMILLE", Value: deviationPermille}
frequencyseconds := corev1.EnvVar{Name: "ORACLE_FREQUENCYSECONDS", Value: frequency}
oracletype := corev1.EnvVar{Name: "ORACLE_ORACLETYPE", Value: "3"}
oraclesymbols := corev1.EnvVar{Name: "ORACLE_SYMBOLS", Value: symbols}
oraclefeederid := corev1.EnvVar{Name: "ORACLE_FEEDERID", Value: feederID}
blockchainnodeenv := corev1.EnvVar{Name: "ORACLE_BLOCKCHAINNODE", Value: blockchainnode}
mandatoryfrequencyenv := corev1.EnvVar{Name: "ORACLE_MANDATORYFREQUENCY", Value: mandatoryFrequency}
feedSelectionenv := corev1.EnvVar{Name: "ORACLE_FEEDSELECTION", Value: feedSelection}

// -- oracle config ends here

// ---

postgreshost := corev1.EnvVar{Name: "POSTGRES_HOST", Value: "dia-postgresql.dia-db"}
postgresuser := corev1.EnvVar{Name: "POSTGRES_USER", ValueFrom: &corev1.EnvVarSource{SecretKeyRef: &corev1.SecretKeySelector{Key: "user", LocalObjectReference: corev1.LocalObjectReference{Name: "user.graphqlserver"}}}}
postgrespassword := corev1.EnvVar{Name: "POSTGRES_PASSWORD", ValueFrom: &corev1.EnvVarSource{SecretKeyRef: &corev1.SecretKeySelector{Key: "password", LocalObjectReference: corev1.LocalObjectReference{Name: "user.graphqlserver"}}}}
postgresdb := corev1.EnvVar{Name: "POSTGRES_DB", Value: "postgres"}
updateconfigseconds := corev1.EnvVar{Name: "ORACLE_UPDATECONFIGSECONDS", Value: "120"}
useenv := corev1.EnvVar{Name: "USE_ENV", Value: "true"}
//---
envvars := kh.PodEnvironmentVariables(feederID, creator, oracle, chainID, symbols, feedSelection, blockchainnode, frequency, sleepSeconds, deviationPermille, mandatoryFrequency)

// Define an image pull request
imagepullrequest := corev1.LocalObjectReference{Name: "all-icr-io"}

// Create a Pod configuration
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: feederID,
Labels: map[string]string{
"oracle": oracle,
"chainID": chainID,
"owner": owner,
"oracle": oracle,
"chainID": chainID,
"owner": creator,
"feederAddress": feederAddress,
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: feederID,
Image: kh.Image,
Env: []corev1.EnvVar{publickeyenv, deployedcontractenv, chainidenv,
sleepsecondenv, deviationenv, frequencyseconds, oracletype,
oraclesymbols, oraclefeederid, postgreshost, postgresuser, signerservice,
postgrespassword, postgresdb, updateconfigseconds, useenv, blockchainnodeenv, mandatoryfrequencyenv, feedSelectionenv},
Env: envvars,
},
},
ImagePullSecrets: []corev1.LocalObjectReference{imagepullrequest},
Expand All @@ -116,7 +99,7 @@ func (kh *PodHelper) CreateOracleFeeder(ctx context.Context, feederID string, ow
Key: "ibm-cloud.kubernetes.io/worker-pool-name",
Operator: corev1.NodeSelectorOpIn,
Values: []string{
"default",
kh.Affinity,
},
},
},
Expand All @@ -128,6 +111,8 @@ func (kh *PodHelper) CreateOracleFeeder(ctx context.Context, feederID string, ow
},
}

// Create the Pod

result, err := kh.k8sclient.CoreV1().Pods(kh.NameSpace).Create(ctx, pod, metav1.CreateOptions{})
if err != nil {
log.Errorf("Failed to start pod %s: %v", feederID, err)
Expand All @@ -138,17 +123,14 @@ func (kh *PodHelper) CreateOracleFeeder(ctx context.Context, feederID string, ow

}

func (kh *PodHelper) UpdateOracleFeeder(ctx context.Context, feederID string, owner string, oracle string, chainID string, symbols, feedSelection, blockchainnode string, frequency, sleepSeconds, deviationPermille, mandatoryFrequency string) error {
fields := make(map[string]string)
fields["oracle"] = oracle
fields["chainID"] = chainID
fields["owner"] = owner
// PodEnvironmentVariables generates environment variables for the Oracle Feeder Pod.
func (kh *PodHelper) PodEnvironmentVariables(feederID string, owner string, oracle string, chainID string, symbols, feedSelection, blockchainnode string, frequency, sleepSeconds, deviationPermille, mandatoryFrequency string) (vars []corev1.EnvVar) {

// -- oracle config
// Oracle configuration
publickeyenv := corev1.EnvVar{Name: "ORACLE_PUBLICKEY", ValueFrom: &corev1.EnvVarSource{SecretKeyRef: &corev1.SecretKeySelector{Key: ".public", LocalObjectReference: corev1.LocalObjectReference{Name: feederID}}}}
deployedcontractenv := corev1.EnvVar{Name: "DEPLOYED_CONTRACT", Value: oracle}
chainidenv := corev1.EnvVar{Name: "ORACLE_CHAINID", Value: chainID}
signerservice := corev1.EnvVar{Name: "ORACLE_SIGNER", Value: "signer.dia-oracle-feeder:50052"}
signerservice := corev1.EnvVar{Name: "ORACLE_SIGNER", Value: kh.SignerURL}
sleepsecondenv := corev1.EnvVar{Name: "ORACLE_SLEEPSECONDS", Value: sleepSeconds}
deviationenv := corev1.EnvVar{Name: "ORACLE_DEVIATIONPERMILLE", Value: deviationPermille}
frequencyseconds := corev1.EnvVar{Name: "ORACLE_FREQUENCYSECONDS", Value: frequency}
Expand All @@ -159,36 +141,78 @@ func (kh *PodHelper) UpdateOracleFeeder(ctx context.Context, feederID string, ow
mandatoryfrequencyenv := corev1.EnvVar{Name: "ORACLE_MANDATORYFREQUENCY", Value: mandatoryFrequency}
feedSelectionenv := corev1.EnvVar{Name: "ORACLE_FEEDSELECTION", Value: feedSelection}

// -- oracle config ends here
diaRestAPIenv := corev1.EnvVar{Name: "DIA_REST_URL", Value: kh.DiaRestURL}

diaGraphqlenv := corev1.EnvVar{Name: "DIA_GRAPHQL_URL", Value: kh.DiaGraphqlURL}

// Append all corev1.EnvVar instances to vars
vars = append(vars, diaRestAPIenv, diaGraphqlenv, publickeyenv, deployedcontractenv, chainidenv, signerservice, sleepsecondenv, deviationenv, frequencyseconds, oracletype, oraclesymbols, oraclefeederid, blockchainnodeenv, mandatoryfrequencyenv, feedSelectionenv)

// ---
postgreshost := corev1.EnvVar{Name: "POSTGRES_HOST", Value: "dia-postgresql.dia-db"}
postgreshost := corev1.EnvVar{Name: "POSTGRES_HOST", Value: kh.PostgresHost}
postgresuser := corev1.EnvVar{Name: "POSTGRES_USER", ValueFrom: &corev1.EnvVarSource{SecretKeyRef: &corev1.SecretKeySelector{Key: "user", LocalObjectReference: corev1.LocalObjectReference{Name: "user.graphqlserver"}}}}
postgrespassword := corev1.EnvVar{Name: "POSTGRES_PASSWORD", ValueFrom: &corev1.EnvVarSource{SecretKeyRef: &corev1.SecretKeySelector{Key: "password", LocalObjectReference: corev1.LocalObjectReference{Name: "user.graphqlserver"}}}}
postgresdb := corev1.EnvVar{Name: "POSTGRES_DB", Value: "postgres"}
updateconfigseconds := corev1.EnvVar{Name: "ORACLE_UPDATECONFIGSECONDS", Value: "120"}
useenv := corev1.EnvVar{Name: "USE_ENV", Value: "true"}

vars = append(vars, postgreshost, postgresuser, postgrespassword, postgresdb, updateconfigseconds, useenv)
//---
// -- oracle config ends here

return
}

// UpdateOracleFeeder updates an existing Oracle Feeder Pod in Kubernetes.
func (kh *PodHelper) UpdateOracleFeeder(ctx context.Context, feederID string, owner string, oracle string, chainID string, symbols, feedSelection, blockchainnode string, frequency, sleepSeconds, deviationPermille, mandatoryFrequency string) error {
fields := make(map[string]string)
fields["oracle"] = oracle
fields["chainID"] = chainID
fields["owner"] = owner

// Generate environment variables for the updated Pod
envvars := kh.PodEnvironmentVariables(feederID, owner, oracle, chainID, symbols, feedSelection, blockchainnode, frequency, sleepSeconds, deviationPermille, mandatoryFrequency)

// Define an image pull request
imagepullrequest := corev1.LocalObjectReference{Name: "all-icr-io"}

pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: feederID,
Labels: fields,
Name: feederID,
Labels: map[string]string{
"oracle": oracle,
"chainID": chainID,
"owner": owner,
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: feederID,
Image: kh.Image,
Env: []corev1.EnvVar{publickeyenv, deployedcontractenv, chainidenv,
sleepsecondenv, deviationenv, frequencyseconds, oracletype,
oraclesymbols, oraclefeederid, postgreshost, postgresuser, signerservice,
postgrespassword, postgresdb, updateconfigseconds, useenv, blockchainnodeenv, mandatoryfrequencyenv, feedSelectionenv},
Env: envvars,
},
},
ImagePullSecrets: []corev1.LocalObjectReference{imagepullrequest},
Affinity: &corev1.Affinity{
NodeAffinity: &corev1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
NodeSelectorTerms: []corev1.NodeSelectorTerm{
{
MatchExpressions: []corev1.NodeSelectorRequirement{
{
Key: "ibm-cloud.kubernetes.io/worker-pool-name",
Operator: corev1.NodeSelectorOpIn,
Values: []string{
kh.Affinity,
},
},
},
},
},
},
},
},
},
}

Expand All @@ -201,6 +225,7 @@ func (kh *PodHelper) UpdateOracleFeeder(ctx context.Context, feederID string, ow

}

// DeleteOracleFeeder deletes an Oracle Feeder Pod in Kubernetes.
func (kh *PodHelper) DeleteOracleFeeder(ctx context.Context, feederID string) error {
_, err := kh.k8sclient.CoreV1().Pods(kh.NameSpace).Get(ctx, feederID, metav1.GetOptions{})
if err != nil {
Expand All @@ -225,6 +250,7 @@ func (kh *PodHelper) DeleteOracleFeeder(ctx context.Context, feederID string) er
return nil
}

// RestartOracleFeeder restarts an Oracle Feeder Pod in Kubernetes.
func (kh *PodHelper) RestartOracleFeeder(ctx context.Context, feederID string, oracleconfig dia.OracleConfig) (err error) {

_, err = kh.k8sclient.CoreV1().Pods(kh.NameSpace).Get(ctx, feederID, metav1.GetOptions{})
Expand All @@ -234,7 +260,7 @@ func (kh *PodHelper) RestartOracleFeeder(ctx context.Context, feederID string, o
} else {
return err
}
err = kh.CreateOracleFeeder(ctx, feederID, oracleconfig.Owner, oracleconfig.Address, oracleconfig.ChainID, strings.Join(oracleconfig.Symbols[:], ","), oracleconfig.FeederSelection, oracleconfig.BlockchainNode, oracleconfig.Frequency, oracleconfig.SleepSeconds, oracleconfig.DeviationPermille, oracleconfig.MandatoryFrequency)
err = kh.CreateOracleFeeder(ctx, feederID, oracleconfig.Owner, oracleconfig.FeederAddress, oracleconfig.Address, oracleconfig.ChainID, strings.Join(oracleconfig.Symbols[:], ","), oracleconfig.FeederSelection, oracleconfig.BlockchainNode, oracleconfig.Frequency, oracleconfig.SleepSeconds, oracleconfig.DeviationPermille, oracleconfig.MandatoryFrequency)
if err != nil {
log.Errorf("Pod %s start err\n", err)
return
Expand All @@ -245,21 +271,25 @@ func (kh *PodHelper) RestartOracleFeeder(ctx context.Context, feederID string, o
//if err != nil {
// return err
//}
kh.waitPodDeleted(ctx, oracleconfig.Address, func() {
deleteerr := kh.waitPodDeleted(ctx, oracleconfig.Address, func() {
time.Sleep(1000 * time.Millisecond)
err = kh.CreateOracleFeeder(ctx, feederID, oracleconfig.Owner, oracleconfig.Address, oracleconfig.ChainID, strings.Join(oracleconfig.Symbols[:], ","), oracleconfig.FeederSelection, oracleconfig.BlockchainNode, oracleconfig.Frequency, oracleconfig.SleepSeconds, oracleconfig.DeviationPermille, oracleconfig.MandatoryFrequency)
err = kh.CreateOracleFeeder(ctx, feederID, oracleconfig.Owner, oracleconfig.FeederAddress, oracleconfig.Address, oracleconfig.ChainID, strings.Join(oracleconfig.Symbols[:], ","), oracleconfig.FeederSelection, oracleconfig.BlockchainNode, oracleconfig.Frequency, oracleconfig.SleepSeconds, oracleconfig.DeviationPermille, oracleconfig.MandatoryFrequency)
if err != nil {
log.Errorf("Pod %s start err\n", err)
return
}
log.Infof("Pod %s started\n", feederID)
})
if deleteerr != nil {
log.Infof("Pod %s delete err\n", deleteerr.Error())
}
log.Infof("Pod %s deleted\n", feederID)
}

return err
}

// podWatcher creates a Pod watcher for a given Oracle address.
func (kh *PodHelper) podWatcher(ctx context.Context, oracleaddress string) (watch.Interface, error) {
labelSelector := fmt.Sprintf("oracle=%s", oracleaddress)
log.Infof("Creating watcher for POD with label: %s", labelSelector)
Expand All @@ -273,6 +303,7 @@ func (kh *PodHelper) podWatcher(ctx context.Context, oracleaddress string) (watc
return kh.k8sclient.CoreV1().Pods(kh.NameSpace).Watch(ctx, opts)
}

// waitPodDeleted waits for a Pod to be deleted.
func (kh *PodHelper) waitPodDeleted(ctx context.Context, resName string, run func()) error {
watcher, err := kh.podWatcher(ctx, resName)
if err != nil {
Expand Down

0 comments on commit 1856aab

Please sign in to comment.