/
fivetran_demo_pipe.py
164 lines (160 loc) · 6.41 KB
/
fivetran_demo_pipe.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
import requests
from requests.auth import HTTPBasicAuth
import json
import colorama
from colorama import Fore, Back, Style
import time
#configuration file
r = '/config.json'
with open(r, "r") as i:
l = i.read()
y = json.loads(l)
api_key = y['API_KEY']
api_secret = y['API_SECRET']
a = HTTPBasicAuth(api_key, api_secret)
#Create a new group, destination, webhook, connectors, and execute a transformation.
def atlas(method, endpoint, payload):
base_url = 'https://api.fivetran.com/v1'
url = f'{base_url}/{endpoint}'
try:
if method == 'GET':
response = requests.get(url,auth=a)
elif method == 'POST':
response = requests.post(url, json=payload, auth=a)
elif method == 'PATCH':
response = requests.patch(url,json=payload, auth=a)
elif method == 'DELETE':
response = requests.delete(url, auth=a)
else:
raise ValueError('Invalid request method.')
response.raise_for_status() # Raise exception for 4xx or 5xx responses
return response.json()
except requests.exceptions.RequestException as e:
print(f'Request failed: {e}')
return None
#Group and destination params
method = 'POST'
endpoint = 'destinations/'
gendpoint = 'groups/'
gpayload = {
"name": "im_group"
}
#Submit group
gresp = atlas(method, gendpoint, gpayload)
if gresp is not None:
print(Fore.CYAN + 'Call: ' + method + ' ' + gendpoint + ' ' + str(gpayload))
print(Fore.GREEN + 'Response: ' + gresp['code'])
print(Fore.MAGENTA + str(gresp))
payload = {
"group_id": gresp['data']['id'],
"service": "big_query",
"region": "US",
"time_zone_offset": "-5",
"config" :
{
"project_id": "",
"data_set_location": "US"
}
}
#Submit destination
dresponse = atlas(method, endpoint, payload)
if dresponse is not None:
print(Fore.CYAN + 'Call: ' + method + ' ' + endpoint + ' ' + str(payload))
print(Fore.GREEN + 'Response: ' + dresponse['code'])
print(Fore.MAGENTA + str(dresponse))
#New Webhook from response data
wgid = gresp['data']['id']
wmethod = 'POST'
wendpoint = 'webhooks/group/' + wgid
wpayload = {
"url": "https.ngrok-free.app",
"events": [ "connection_successful",
"connection_failure",
"create_connector",
"pause_connector",
"resume_connector",
"edit_connector",
"delete_connector",
"force_update_connector",
"resync_connector",
"resync_table"
]
}
#Submit Webhook
print(Fore.CYAN + "Submitting Webhook")
wresponse = atlas(wmethod, wendpoint, wpayload)
#Create Connectors
p = y['T'] #source auth
new_schema = ["t_400", "t_401","t_402"] #connector names
smethod = 'POST'
sendpoint = 'connectors/'
for new_schema in new_schema:
spayload = {
"service": "sql_server_rds",
"group_id": wgid,
"trust_certificates": "true",
"run_setup_tests": "true",
"paused": "false",
"pause_after_trial": "true",
"config": { "schema_prefix": new_schema,
"host": "",
"port": 1433,
"database": "sqlserver",
"user": "fivetran",
"password": p
}}
#Submit Connectors
print(Fore.CYAN + "Submitting Connector")
cresponse = atlas(smethod, sendpoint, spayload)
#Review Connector Response
if cresponse is not None:
print(Fore.MAGENTA + "Connector: " + cresponse['data']['id'] + " successfully created in " + str(wgid))
#Pause for 30 seconds. Then, Pause the connector. Then, edit schema.
time.sleep(30)
#Pause the new connector
u_2 = 'https://api.fivetran.com/v1' + '/connectors/' + cresponse['data']['id']
pc = requests.patch(u_2,auth=a,json={"paused": True})
print(Fore.GREEN + "Connector Paused")
#Load the schema of the new connector
u_3 = 'https://api.fivetran.com/v1' + cresponse['data']['id'] + "/schemas/reload"
o = requests.post(u_3,auth=a)
print(Fore.GREEN + "Connector Schema Loaded")
#Configure the Schemas
#PATCH https://api.fivetran.com/v1/connectors/{connector_id}/schemas/{schema}
sgroup_id = wgid
ssmethod = 'PATCH'
ssendpoint = 'connectors/' + cresponse['data']['id'] + '/schemas/hr'
sspayload = {
"enabled": True,
"tables": {
"employees": {
"enabled": True
},
"events": {
"enabled": False
}
}
}
sresponse = atlas(ssmethod, ssendpoint, sspayload)
#Sync the connectors
if sresponse is not None:
print(Fore.MAGENTA + "Connector: " + cresponse['data']['id'] + " successfully configured in " + str(wgid))
#Access to the destination must be granted first.
u_5 = 'https://api.fivetran.com/v1' + cresponse['data']['id'] + "/sync"
j = {"force": True} #initiate the sync
s = requests.post(u_5,auth=a,json=j)
print(Fore.GREEN + "Connector Sync Started")
#Execute a transformation
transformation_id = ''
tmethod = 'POST'
tendpoint = 'dbt/transformations/' + transformation_id + '/run'
tpayload = ''
#Submit Transfromation
tresponse = atlas(tmethod, tendpoint, tpayload)
#Review
if tresponse is not None:
print(Fore.CYAN + 'Call: ' + tmethod + ' ' + tendpoint + ' ' + str(tpayload))
print(Fore.GREEN + 'Response: ' + tresponse['code'])
print(Fore.MAGENTA + str(tresponse))
#Script Complete
print(Fore.BLUE + 'Script Complete. Check resources')