-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
56 lines (42 loc) · 1.3 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import os
import connection
import sqlparse
import pandas as pd
if __name__ == '__main__':
# connection data source
conf = connection.config('marketplace_prod')
conn, engine = connection.get_conn(conf, 'DataSource')
cursor = conn.cursor()
# connection dwh
conf_dwh = connection.config('dwh')
conn_dwh, engine_dwh = connection.get_conn(conf, 'DataWarehouse')
cursor_dwh = conn_dwh.cursor()
#get query string
path_query = os.getcwd()+'/query/'
query = sqlparse.format(
open(path_query+'query.sql','r' ).read(), strip_comments=True
).strip()
dwh_design = sqlparse.format(
open(path_query+'dwh_design.sql','r' ).read(), strip_comments=True
).strip()
print(query)
print(dwh_design)
try:
#get data
print('[INFO] service etl is running..')
df = pd.read_sql(query, engine)
#create schema dwh
cursor_dwh.execute(dwh_design)
conn_dwh.commit()
# ingest data to dwh
df.to_sql(
'dim_orders',
engine_dwh,
schema='felix_dwh',
if_exists='append',
index=False
)
print('[INFO] service etl is succes..')
except Exception as e:
print('[INFO] service etl is failes')
print(str(e))