-
Notifications
You must be signed in to change notification settings - Fork 2
/
app.py
250 lines (204 loc) · 8.91 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
from __future__ import annotations
import logging
import os
import threading
from ftplib import FTP, FTP_TLS # nosec
from typing import IO, TYPE_CHECKING
from carbon.feed import ArticlesXmlFeed, PeopleXmlFeed
if TYPE_CHECKING:
from collections.abc import Callable
from socket import socket
from carbon.database import DatabaseEngine
logger = logging.getLogger(__name__)
class CarbonFtpsTls(FTP_TLS):
    """FTP_TLS variant that reuses the control connection's SSL session.

    The stdlib FTP_TLS opens a fresh SSL session for every data transfer
    command, which surfaces as a cryptic OpenSSL error when a server requires
    SSL session reuse. The ntransfercmd override below hands the control
    socket's session to wrap_socket (the session parameter was added in
    Python 3.6).

    The stdlib storbinary also tears down (unwraps) the SSL session once the
    file has been transferred. Because that session is shared with the
    command connection, OpenSSL would then fail on subsequent commands, so
    the storbinary override below leaves out the unwrap call. Calling quit on
    the ftp connection should still cleanly shut down the connection.

    Attributes:
        See ftplib.FTP_TLS for more details.
    """

    def ntransfercmd(self, cmd: str, rest: str | int | None = None) -> tuple[socket, int]:
        """Open the data connection for ``cmd``, wrapping it in TLS if required.

        Returns:
            The ``(connection, size)`` pair produced by FTP.ntransfercmd, with
            the connection wrapped by the SSL context when data protection
            is switched on.
        """
        data_conn, expected_size = FTP.ntransfercmd(self, cmd, rest)
        if not self._prot_p:  # type: ignore[attr-defined]
            return data_conn, expected_size
        # Pass the control socket's session so that servers insisting on SSL
        # session reuse accept the data channel.
        secured_conn = self.context.wrap_socket(
            data_conn,
            server_hostname=self.host,
            session=self.sock.session,  # type: ignore[union-attr]
        )
        return secured_conn, expected_size

    def storbinary(
        self,
        cmd: str,
        fp: IO,  # type: ignore[override]
        blocksize: int = 8192,
        callback: Callable | None = None,
        rest: str | None = None,  # type: ignore[override]
    ) -> str:
        """Upload the contents of ``fp`` in binary mode.

        Args:
            cmd: The FTP command to issue (e.g. a ``STOR`` command).
            fp: Stream that is read in chunks of ``blocksize`` until it
                returns an empty result.
            blocksize: Maximum size passed to each ``fp.read`` call.
            callback: Optional callable invoked with each chunk after it
                has been sent.
            rest: Passed through unchanged to ``transfercmd``.

        Returns:
            The server response returned by ``voidresp``.
        """
        self.voidcmd("TYPE I")
        with self.transfercmd(cmd, rest) as data_conn:
            # Deliberately no unwrap() on the data socket here: the SSL
            # session is shared with the command connection.
            while chunk := fp.read(blocksize):
                data_conn.sendall(chunk)
                if callback:
                    callback(chunk)
        return self.voidresp()
class FileWriter:
    """A writer that outputs normalized XML strings to a specified file.

    Use this class to generate either a 'people' or 'articles' feed that is written
    to a specified output file.

    Attributes:
        output_file: A file-like object (stream) into which normalized XML
            strings are written.
        engine: The DatabaseEngine handed to the feed classes.
    """

    def __init__(self, engine: DatabaseEngine, output_file: IO):
        self.output_file = output_file
        self.engine = engine

    def write(self, feed_type: str) -> None:
        """Write the specified feed type to the configured output.

        Args:
            feed_type: Either "people" or "articles".

        Raises:
            ValueError: If ``feed_type`` is not one of the supported values.
        """
        xml_feed: PeopleXmlFeed | ArticlesXmlFeed
        if feed_type == "people":
            xml_feed = PeopleXmlFeed(engine=self.engine, output_file=self.output_file)
            # Only the people feed is run with an explicit namespace mapping.
            xml_feed.run(nsmap=xml_feed.namespace_mapping)
        elif feed_type == "articles":
            xml_feed = ArticlesXmlFeed(engine=self.engine, output_file=self.output_file)
            xml_feed.run()
        else:
            # Fail with a clear error instead of an UnboundLocalError on
            # xml_feed when the log statement below is reached.
            message = f"Unsupported feed type: '{feed_type}'"
            raise ValueError(message)
        logger.info(
            "The feed has collected %s '%s' records", xml_feed.record_count, feed_type
        )
class ConcurrentFtpFileWriter(FileWriter):
    """A FileWriter that feeds a concurrently running FTP upload.

    This class is intended to provide a buffered read/write connector: the
    feed is written to ``output_file`` while ``ftp_output_file`` runs in a
    separate thread.

    Attributes:
        output_file: A file-like object (stream) into which normalized XML
            strings are written (passed to the constructor as ``input_file``).
        ftp_output_file: A callable run in a background thread. It is expected
            to read the data written to ``output_file`` and write it to an XML
            file on the Symplectic Elements FTP server.
    """

    def __init__(self, engine: DatabaseEngine, input_file: IO, ftp_output_file: Callable):
        super().__init__(engine, input_file)
        self.ftp_output_file = ftp_output_file

    def write(self, feed_type: str) -> None:
        """Concurrently read/write from the configured inputs and outputs.

        This method will block until both the reader and writer are finished,
        including when writing the feed raises: the output stream is still
        closed and the thread joined before the exception propagates.
        """
        thread = threading.Thread(target=self.ftp_output_file)
        thread.start()
        try:
            super().write(feed_type)
        finally:
            try:
                # Closing the write side is what lets the consumer thread see
                # end-of-stream (assuming it reads from the other end of this
                # stream, as DatabaseToFtpPipe wires it up).
                self.output_file.close()
            finally:
                thread.join()
class FtpFile:
    """A file writer for the Symplectic Elements FTP server.

    Calling an FtpFile instance reads data from the provided feed and writes
    the contents of the feed to a file on the Symplectic Elements FTP server.

    Attributes:
        content_feed: A file-like object (stream) that contains the records
            from the Data Warehouse.
        user: The username for accessing the Symplectic FTP server.
        password: The password for accessing the Symplectic FTP server.
        path: The full file path to the XML file (including the file name) that is
            uploaded to the Symplectic FTP server.
        host: The hostname of the Symplectic FTP server.
        port: The port of the Symplectic FTP server.
    """

    def __init__(
        self,
        content_feed: IO,
        user: str,
        password: str,
        path: str,
        host: str = "localhost",
        port: int = 21,
    ):
        self.content_feed = content_feed
        self.user = user
        self.password = password
        self.path = path
        self.host = host
        self.port = port

    def __call__(self) -> None:
        """Transfer a file using FTP over TLS.

        Connects, logs in, enables protected data transfers (PROT P), uploads
        ``content_feed`` to ``path`` and quits. Any exception raised along the
        way propagates to the caller after the connection has been closed.
        """
        ftps = CarbonFtpsTls(timeout=30)
        try:
            ftps.connect(host=self.host, port=self.port)
            ftps.login(user=self.user, passwd=self.password)
            ftps.prot_p()
            ftps.storbinary(cmd=f"STOR {self.path}", fp=self.content_feed)
            ftps.quit()
        finally:
            # Release the socket if any step above failed. After a successful
            # quit() the connection is already closed and this does nothing.
            ftps.close()
class DatabaseToFilePipe:
    """Pipe that sends Data Warehouse records into a local file.

    Attributes:
        config: A mapping holding at least the "FEED_TYPE" key.
        engine: The DatabaseEngine passed on to the FileWriter.
        output_file: A file-like object (stream) that receives the feed.
    """

    def __init__(self, config: dict, engine: DatabaseEngine, output_file: IO):
        self.config = config
        self.engine = engine
        self.output_file = output_file

    def run(self) -> None:
        """Write the configured feed type to the output file."""
        file_writer = FileWriter(engine=self.engine, output_file=self.output_file)
        file_writer.write(feed_type=self.config["FEED_TYPE"])
class DatabaseToFtpPipe:
    """A pipe feeding data from the Data Warehouse to the Symplectic Elements FTP server.

    The feed consists of a pipe that connects 'read' and 'write' file-like objects
    (streams) that allows for one-way passing of information to each other. The flow of
    data is as follows:

    1. The records from the Data Warehouse are transformed into normalized
       XML strings and are concurrently written to the 'write' file stream
       one record at a time.
    2. The connected 'read' file stream concurrently transfers data from the
       'write' file stream into an XML file on the Elements FTP server.

    Attributes:
        config: A dictionary of required environment variables for running the feed.
        engine: The DatabaseEngine passed on to the feed writer.
    """

    def __init__(self, config: dict, engine: DatabaseEngine):
        self.config = config
        self.engine = engine

    def run(self) -> None:
        """Generate the configured feed and upload it through an OS pipe."""
        read_file, write_file = os.pipe()
        with open(read_file, "rb") as buffered_reader, open(
            write_file, "wb"
        ) as buffered_writer:
            # The FTP side consumes the read end of the pipe ...
            ftp_file = FtpFile(
                content_feed=buffered_reader,
                user=self.config["SYMPLECTIC_FTP_USER"],
                password=self.config["SYMPLECTIC_FTP_PASS"],
                path=self.config["SYMPLECTIC_FTP_PATH"],
                host=self.config["SYMPLECTIC_FTP_HOST"],
                port=int(self.config["SYMPLECTIC_FTP_PORT"]),
            )
            # ... while the feed is written into the write end.
            ConcurrentFtpFileWriter(
                engine=self.engine, input_file=buffered_writer, ftp_output_file=ftp_file
            ).write(feed_type=self.config["FEED_TYPE"])

    def run_connection_test(self) -> None:
        """Test connection to the Symplectic Elements FTP server.

        Verify that the provided FTP credentials can be used
        to successfully connect to the Symplectic Elements FTP server.

        Raises:
            Exception: Whatever error occurred while connecting or logging in;
                it is logged and then re-raised unchanged.
        """
        logger.info("Testing connection to the Symplectic Elements FTP server")
        ftps: CarbonFtpsTls | None = None
        try:
            ftps = CarbonFtpsTls(timeout=30)
            ftps.connect(
                host=self.config["SYMPLECTIC_FTP_HOST"],
                port=int(self.config["SYMPLECTIC_FTP_PORT"]),
            )
            ftps.login(
                user=self.config["SYMPLECTIC_FTP_USER"],
                passwd=self.config["SYMPLECTIC_FTP_PASS"],
            )
        except Exception as error:
            error_message = (
                f"Failed to connect to the Symplectic Elements FTP server: {error}"
            )
            logger.exception(error_message)
            if ftps is not None:
                # connect() may have succeeded before login() failed, so
                # release the socket instead of leaking it.
                ftps.close()
            raise
        else:
            logger.info("Successfully connected to the Symplectic Elements FTP server")
            ftps.quit()