-
Notifications
You must be signed in to change notification settings - Fork 0
/
addressApi.py
73 lines (61 loc) · 2.36 KB
/
addressApi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import base64
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_v1_5 as PKCS1_cipher
import requests
import json
from log import log
import os
dir_path = os.path.dirname(os.path.realpath(__file__))
priKeyFile=dir_path+"/myPrivateKey.pem"
apiUrl="http://localhost:8088/internal/"
pathPrefix=dir_path+"/files/"
def getKey(key_file):
with open(key_file) as f:
data = f.read()
key = RSA.importKey(data)
return key
def encryptData(pubKeyFile,msg):
public_key = getKey(pubKeyFile)
cipher = PKCS1_cipher.new(public_key)
encrypt_text = base64.b64encode(cipher.encrypt(bytes(msg.encode("utf8"))))
return encrypt_text.decode('utf-8')
def decryptData(encrypt_msg):
private_key = getKey(priKeyFile)
cipher = PKCS1_cipher.new(private_key)
back_text = cipher.decrypt(base64.b64decode(encrypt_msg), 0)
return back_text.decode('utf-8')
def getAddressPri(address):
res = requests.post(url=apiUrl+"queryAddress", data=json.dumps({'address': address}),
headers={'Content-Type': 'application/json'})
print('getAddressPri res is :',res.text)
log('getAddressPri res is :' + res.text)
resDic = eval(res.text)
if resDic["code"]!=200:
priKey = ''
print('getAddressPri priKey err,err msg is :', resDic["msg"])
log('getAddressPri priKey err,err msg is :' + resDic["msg"])
return priKey
priMsg=resDic["addressInfo"]
priKey=decryptData(priMsg)
print('getAddressPri priKey is :',priKey)
log('getAddressPri priKey is :'+priKey)
return priKey
def getDatas(jobId):
res = requests.post(url=apiUrl+"queryJobInputs", data=json.dumps({'jobID': jobId}),
headers={'Content-Type': 'application/json'})
print('jobId is ',jobId,',getDatas res is :',res.text)
log('jobId is '+str(jobId)+',getDatas res is :' + res.text)
resDic = json.loads(res.text)
if resDic["code"]!=200:
data = ''
print('getDatas data err,err msg is :', resDic["msg"])
log('getDatas data err,err msg is :' + resDic["msg"])
return data
dataDic=resDic["Inputs"][0]
featurePath=pathPrefix+dataDic["featurePath"]
targetPath = pathPrefix+dataDic["targetPath"]
if dataDic["testingPath"]=="":
testingPath = ""
else:
testingPath = pathPrefix+dataDic["testingPath"]
return featurePath,targetPath,testingPath