-
Notifications
You must be signed in to change notification settings - Fork 115
/
postgresql.py
140 lines (114 loc) · 5.42 KB
/
postgresql.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#
"""Postgresql source module is responsible to fetch documents from PostgreSQL."""
import ssl
from urllib.parse import quote
from sqlalchemy.ext.asyncio import create_async_engine
from connectors.sources.generic_database import GenericBaseDataSource, Queries
# Schemas that belong to PostgreSQL itself; tables inside them are not indexed.
SYSTEM_SCHEMA = [
    "pg_toast",
    "pg_catalog",
    "information_schema",
]

# Defaults for the SSL-related configuration fields: SSL is disabled and no
# CA certificate is supplied.
DEFAULT_SSL_ENABLED = False
DEFAULT_SSL_CA = ""
class PostgreSQLQueries(Queries):
    """Class contains methods which return query

    Every method returns a SQL string; none of them executes anything.
    Values are interpolated into the SQL text, so string literals and
    identifiers are escaped with the private helpers below before use.
    """

    @staticmethod
    def _quote_literal(value):
        """Escape ``value`` for use between single quotes in a SQL string.

        Args:
            value: Value to embed; it is converted with ``str()``.

        Returns:
            string: The text with every single quote doubled (surrounding
            quotes are NOT added).
        """
        return str(value).replace("'", "''")

    @staticmethod
    def _quote_identifier(name):
        """Return ``name`` as a double-quoted SQL identifier.

        Quoting keeps mixed-case names, names with special characters and
        reserved words intact; embedded double quotes are doubled.

        Args:
            name: Identifier to quote; it is converted with ``str()``.

        Returns:
            string: The quoted identifier, including the surrounding quotes.
        """
        return '"' + str(name).replace('"', '""') + '"'

    def _qualified_table(self, **kwargs):
        """Return the quoted ``"schema"."table"`` reference.

        Args:
            **kwargs: Must contain ``schema`` and ``table``.

        Returns:
            string: Schema-qualified, quoted table reference.
        """
        schema = self._quote_identifier(kwargs["schema"])
        table = self._quote_identifier(kwargs["table"])
        return f"{schema}.{table}"

    def ping(self):
        """Query to ping source"""
        return "SELECT 1+1"

    def all_tables(self, **kwargs):
        """Query to get all tables

        Args:
            **kwargs: Must contain ``database`` and ``schema``.

        Returns:
            string: Query selecting ``table_name`` for the given catalog and schema.
        """
        database = self._quote_literal(kwargs["database"])
        schema = self._quote_literal(kwargs["schema"])
        return (
            "SELECT table_name FROM information_schema.tables "
            f"WHERE table_catalog = '{database}' and table_schema = '{schema}'"
        )

    def table_primary_key(self, **kwargs):
        """Query to get the primary key

        Args:
            **kwargs: Must contain ``table`` and ``schema``.

        Returns:
            string: Query selecting the primary-key column names of the table.
        """
        table = self._quote_literal(kwargs["table"])
        schema = self._quote_literal(kwargs["schema"])
        return (
            "SELECT c.column_name FROM information_schema.table_constraints tc "
            "JOIN information_schema.constraint_column_usage AS ccu "
            "USING (constraint_schema, constraint_name) "
            "JOIN information_schema.columns AS c "
            "ON c.table_schema = tc.constraint_schema "
            "AND tc.table_name = c.table_name "
            "AND ccu.column_name = c.column_name "
            "WHERE constraint_type = 'PRIMARY KEY' "
            f"and tc.table_name = '{table}' "
            f"and tc.constraint_schema = '{schema}'"
        )

    def table_data(self, **kwargs):
        """Query to get the table data

        Args:
            **kwargs: Must contain ``schema`` and ``table``.

        Returns:
            string: Query selecting every column of every row of the table.
        """
        return f"SELECT * FROM {self._qualified_table(**kwargs)}"

    def table_last_update_time(self, **kwargs):
        """Query to get the last update time of the table

        Args:
            **kwargs: Must contain ``schema`` and ``table``.

        Returns:
            string: Query selecting the newest commit timestamp over the
            table's rows (``pg_xact_commit_timestamp`` applied to ``xmin``).
        """
        return (
            "SELECT MAX(pg_xact_commit_timestamp(xmin)) "
            f"FROM {self._qualified_table(**kwargs)}"
        )

    def table_data_count(self, **kwargs):
        """Query to get the number of rows in the table

        Args:
            **kwargs: Must contain ``schema`` and ``table``.

        Returns:
            string: Query counting the rows of the table.
        """
        return f"SELECT COUNT(*) FROM {self._qualified_table(**kwargs)}"

    def all_schemas(self):
        """Query to get all schemas of database"""
        return "SELECT schema_name FROM information_schema.schemata"
class PostgreSQLDataSource(GenericBaseDataSource):
    """PostgreSQL"""

    # Identifiers for this data source.
    name = "PostgreSQL"
    service_type = "postgresql"

    def __init__(self, configuration):
        """Setup connection to the PostgreSQL database-server configured by user

        Args:
            configuration (DataSourceConfiguration): Instance of DataSourceConfiguration class.
        """
        super().__init__(configuration=configuration)
        self.ssl_enabled = self.configuration["ssl_enabled"]
        self.ssl_ca = self.configuration["ssl_ca"]

        # NOTE(review): user/password/host/port/database are presumably set by
        # the base-class __init__ -- confirm against GenericBaseDataSource.
        # Percent-encode BOTH credentials so characters such as '@', ':' or '/'
        # cannot be mistaken for URL delimiters.
        user = quote(self.user, safe="")
        password = quote(self.password, safe="")
        self.connection_string = (
            f"postgresql+asyncpg://{user}:{password}"
            f"@{self.host}:{self.port}/{self.database}"
        )
        self.queries = PostgreSQLQueries()
        self.is_async = True
        self.dialect = "Postgresql"

    @classmethod
    def get_default_configuration(cls):
        """Get the default configuration for database-server configured by user

        Extends a copy of the parent's default configuration with the two
        SSL-related fields ``ssl_enabled`` and ``ssl_ca``.

        Returns:
            dictionary: Default configuration
        """
        postgresql_configuration = super().get_default_configuration().copy()
        postgresql_configuration.update(
            {
                "ssl_enabled": {
                    "display": "toggle",
                    "label": "Enable SSL verification",
                    "order": 9,
                    "type": "bool",
                    "value": DEFAULT_SSL_ENABLED,
                },
                "ssl_ca": {
                    "depends_on": [{"field": "ssl_enabled", "value": True}],
                    "label": "SSL certificate",
                    "order": 10,
                    "type": "str",
                    "value": DEFAULT_SSL_CA,
                },
            }
        )
        return postgresql_configuration

    def _create_engine(self):
        """Create async engine for postgresql

        The SSL connect arguments are only built when SSL is enabled;
        otherwise an empty ``connect_args`` dictionary is passed.
        """
        self.engine = create_async_engine(
            self.connection_string,
            connect_args=self.get_connect_args() if self.ssl_enabled else {},
        )

    def get_pem_format(self):
        """Convert ca data into PEM format

        The configured certificate is treated as space-separated text: every
        space becomes a newline, then the first and the last newline are turned
        back into spaces so the ``-----BEGIN CERTIFICATE-----`` and
        ``-----END CERTIFICATE-----`` markers stay on one line each.

        Side effect: ``self.ssl_ca`` is overwritten with the stripped,
        newline-separated text.

        Returns:
            string: PEM format
        """
        # Strip first: a trailing newline/space would otherwise become the
        # "last newline" below and break the END marker.
        self.ssl_ca = self.ssl_ca.strip().replace(" ", "\n")
        pem_format = " ".join(self.ssl_ca.split("\n", 1))
        pem_format = " ".join(pem_format.rsplit("\n", 1))
        return pem_format

    def get_connect_args(self):
        """Convert string to pem format and create a SSL context

        Returns:
            dictionary: Connection arguments
        """
        pem_format = self.get_pem_format()
        ctx = ssl.create_default_context()
        # Load the configured CA certificate into the context for verification.
        ctx.load_verify_locations(cadata=pem_format)
        connect_args = {"ssl": ctx}
        return connect_args

    async def get_docs(self, filtering=None):
        """Executes the logic to fetch databases, tables and rows in async manner.

        Args:
            filtering: Accepted but not used by this method.

        Yields:
            tuple: ``(row, None)`` for each row returned by ``fetch_rows``
            for every schema that is not listed in ``SYSTEM_SCHEMA``.
        """
        # Only the first item produced by the schema query is consumed; each
        # entry in it is unpacked as a one-element row holding a schema name.
        schema_list = await anext(self.execute_query(query=self.queries.all_schemas()))
        for [schema] in schema_list:
            if schema not in SYSTEM_SCHEMA:
                async for row in self.fetch_rows(schema=schema):
                    yield row, None