From 416e97fae1415a3bdd60720e647aba0810103c38 Mon Sep 17 00:00:00 2001 From: Nathan Nowack Date: Thu, 1 Sep 2022 20:50:51 -0500 Subject: [PATCH 1/8] fix typo in example --- prefect_airbyte/configuration.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/prefect_airbyte/configuration.py b/prefect_airbyte/configuration.py index 59b3e5a..453b160 100644 --- a/prefect_airbyte/configuration.py +++ b/prefect_airbyte/configuration.py @@ -39,8 +39,8 @@ async def export_configuration( @task def zip_and_write_somewhere( - airbyte_config: bytearray - somwhere: str = 'my_destination.gz',' + airbyte_configuration: bytearray + somewhere: str = 'my_destination.gz',' ): with gzip.open('my_destination.gz', 'wb') as f: f.write(airbyte_configuration) From 97b219750e734b731c58bb68cc3a41ed035b5870 Mon Sep 17 00:00:00 2001 From: Nathan Nowack Date: Thu, 1 Sep 2022 21:08:29 -0500 Subject: [PATCH 2/8] fix typo in readme and add detail --- README.md | 2 +- .../conftest.cpython-38-pytest-7.1.1.pyc | Bin 871 -> 6959 bytes 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index f76d7f6..448e2c9 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ ## Welcome! -`prefect-airbyte` is a collections of prebuilt Prefect tasks that can be used to quickly construct Prefect flows. +`prefect-airbyte` is a collection of prebuilt Prefect tasks that can be used to quickly construct Prefect flows trigger Airbyte syncs or export your connector configurations. ## Getting Started diff --git a/tests/__pycache__/conftest.cpython-38-pytest-7.1.1.pyc b/tests/__pycache__/conftest.cpython-38-pytest-7.1.1.pyc index 20e9b599f04beb6b47251ff5ad0e5012cbc5d6cf..b5052a225cde3f1eec74f08cfe3e34c99650c5d1 100644 GIT binary patch literal 6959 zcmds5OKcm*8QvF{4^grnekrjNIaX{lDNWkZGjY?nvfQd^=)|(oV$p(F?hL7wNiI9P zv@KORC6^q0D9~J5_0Xd(J@?Q{Z@u+cU=O{tK;d2kG=P94{r=e{xuT@l4T>JR#13cv z`Df;T|Nrd$(b1fOCwu9$+9xGN`5O(AkAcBEczs1v6s9n>saUG~*IK%!s7lMQQff3~ zTBe*yTWS1j%}gt6Wih8)Ic{!_@H{hiRhDAr$HqshHJX5>Sw_Of60j`GN!W=5Y=q?{ zY&-!QWn&UHK`?fLjeo3Q_sImt1e=r?Qwi7tfd7XVLo|%zcUXVH|va6EISB9`ltQ=!sWa_$7xb~E6qf$`A z%w6txytdD`XZf|-_449Uy|}b=-7e16=gY%-T?tLw3EYSLX%RT?FaG$`n)_Qc1wGVU7G@p6 z?SQkJL744!Sk%Q_-tpRPj)h)(jfJKk*g@A1^-Zs~r-q{|cPf?Dm3wP zPV5U)$g?l>&aTKR@fBY3_HnA?V9ZLuGUaNiNem{J-eHSvZgEBr*FGW zK6B&knf12!sNS?6@R?h@<%J^*w&KF(V6NL!vp-+IbN}wjYPE9nz18*iZ?3F<7%JPd z3+4H_TKSDy(O#~x;#_$dzP4a57RyWJ#f5r#VPSc3etXuLw^@1q4PKmIoV!+>TU?wg z+6!f;SYHMbU!Hg77z@*W0JmxR))?=&jJI5;N*>~erdQj9Wp7{J2!hU=rP8MF!TMn- zE&Hk`8YL#|dQdDcmLk~2&L}yufbu=0<6b+{YCyOv<6)j`xrY496ne{@N4Kcz_=R`YQ^$AnVyq z&qfr7xsHHKi@@c+m9K+bct9P_xKT)he7gloiRpF8vo+~=I~`Ae=yf>ldmi*;O-0e* z7O%T)&Q`j@_k=acc-`(cgJ^i$02Y7R{)pN61>UdW3q z{=~~@tf@XBk{g%o&pOcu-u6Vx4nmXSp8FWo$uPJEtt1G-Fi#Sd#=FLRYlN6sv7uA1 zfi(~$>o&QdKxdtVq~#`e`Omtp;4R(`e32)D24O??WbByj#f}CK$SC57-Va(;Qq_5^ z_J52fil7ulNwy_#1y$~o;8U`ACCOrM)lt0O!!bT-P%$#9 zxPt#faWV~{B6TR1QhzT zsHa)dm$5Hi2jqC>$=|`UhD4#`r=ZHAG?CiXpc}IyQkV}7GE%qsms%~Bgq4e&GqkcA;?uxcgP6a_4~HFfsK61w2E=S^fKD# z;wqrW8ypn~k-<%a#n|YKik0=dj>9?QOj0>8y**X#n9#&w?~;|l5oA8XVUgKsc~z9Z z31MLxH55aB7$QrQu-hjuRBV7A0D9V`ZuD>tfO=~B&{RKx|Adk)`u6n+_BhIU<^exM6a};uT0j9tmit;|`&?WE zG(E>|Zg_t1=2E#_F4-No^zhp8|83F)pR)AXJu-oi5RHk$p(12`1GO8dDIj38b> zLX$4GXM{So2#uEKJP;)L(C9#a1vAXiz2k9JPLM8Stsf;O3M)N@~$=5T*Z!o?4O{r{wQu>^+1DjH;! zHR>NFV+sqyGQJ>X90!(xDMWWgauELySQW`PC*^A-WSW$)uV z@NB;#d#1Fx3~ccxH8-gFHZ|Wtg9`gL^}k0A*%|-?*6BVs%f6o{w{h?e~I83 z={%KSoP;Y2mB_<}Lijl;0=f|=bnRk9n*KYMMwLN!Skt0{Ifqh6@1cHQ7~d#TriO%} zBgXZG(T}SCd8E{jh?Ju{=3s9r=&|P(w4h`lZc#(l5fu~fQEwGZA)OFDl~qwFlR-`D zg7V(5C7zdRaNCH%5Pl+^NqXQfsgyX|l)43aJ~NH?e9bv{K`B|q#W2U1TW z97gasG`^3Oj=WCM9(btVSCoapvlYcmDl@16k4ujXN(71}Ss7h&N0lDZ=tsWB=r7~w zsB}LVXCf;%hJ`9^dnme`jm)rVl8j#Jbz)MxP1#Me1E5C@u}xvFY&|`95Z!t&zSaD9#VcL};m3 z`bqg`j+g)TREhCU)7zp?6Qzjw5(?Hd7zhA5Mh!%KLL#zmx+v9fux^43sy&HNtp5=S zfW9E~G$K>peT)mUVW!q~oA@s1hxvCpTh;z(y4t~~W_(?aifG)~uL>?dBH?qetg8VE z(>+)YiwV7I3w+Cw-7qEVz%WH6{$rW8#V8I9)A-PXPxTyM?OZ32-=d=gCKI_dwkyDk zGTT+^(UnjPd|8HBS@p<*fGRveWr$4UGJOe(k%IhBT3nc-@8-Uc5+ueGVw8p@Wtq9r z^5`?t+hmV^2TewqR5BUah(0W*v{CxAxAb1O2Vi>zryeP)_Y7b0u5 zSc<}CMWYnHvZ>9%cgCe&Y%s?PV+}^sOQ+4+P5#S7LN?~R$Qr4^e_M@fzf{PcPF^}=VRBn&jj~2hGnzB>k Vwd2v8poqzojA;zxAcQ0d;V+V#-g5u| From 4d534317a7bbacdc247a81b6d5bb64f04d2297a9 Mon Sep 17 00:00:00 2001 From: Nathan Nowack Date: Thu, 1 Sep 2022 21:09:50 -0500 Subject: [PATCH 3/8] word --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 448e2c9..406cfe1 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ ## Welcome! -`prefect-airbyte` is a collection of prebuilt Prefect tasks that can be used to quickly construct Prefect flows trigger Airbyte syncs or export your connector configurations. +`prefect-airbyte` is a collection of prebuilt Prefect tasks that can be used to quickly construct Prefect flows to trigger Airbyte syncs or export your connector configurations. ## Getting Started From 7957d8402447c17da56cfbd4487ecf2956c2dc50 Mon Sep 17 00:00:00 2001 From: Nathan Nowack Date: Thu, 1 Sep 2022 21:10:45 -0500 Subject: [PATCH 4/8] delete *.pyc --- .../conftest.cpython-38-pytest-7.1.1.pyc | Bin 6959 -> 0 bytes .../test_messages.cpython-38-pytest-7.1.1.pyc | Bin 1928 -> 0 bytes 2 files changed, 0 insertions(+), 0 deletions(-) delete mode 100644 tests/__pycache__/conftest.cpython-38-pytest-7.1.1.pyc delete mode 100644 tests/__pycache__/test_messages.cpython-38-pytest-7.1.1.pyc diff --git a/tests/__pycache__/conftest.cpython-38-pytest-7.1.1.pyc b/tests/__pycache__/conftest.cpython-38-pytest-7.1.1.pyc deleted file mode 100644 index b5052a225cde3f1eec74f08cfe3e34c99650c5d1..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 6959 zcmds5OKcm*8QvF{4^grnekrjNIaX{lDNWkZGjY?nvfQd^=)|(oV$p(F?hL7wNiI9P zv@KORC6^q0D9~J5_0Xd(J@?Q{Z@u+cU=O{tK;d2kG=P94{r=e{xuT@l4T>JR#13cv z`Df;T|Nrd$(b1fOCwu9$+9xGN`5O(AkAcBEczs1v6s9n>saUG~*IK%!s7lMQQff3~ zTBe*yTWS1j%}gt6Wih8)Ic{!_@H{hiRhDAr$HqshHJX5>Sw_Of60j`GN!W=5Y=q?{ zY&-!QWn&UHK`?fLjeo3Q_sImt1e=r?Qwi7tfd7XVLo|%zcUXVH|va6EISB9`ltQ=!sWa_$7xb~E6qf$`A z%w6txytdD`XZf|-_449Uy|}b=-7e16=gY%-T?tLw3EYSLX%RT?FaG$`n)_Qc1wGVU7G@p6 z?SQkJL744!Sk%Q_-tpRPj)h)(jfJKk*g@A1^-Zs~r-q{|cPf?Dm3wP zPV5U)$g?l>&aTKR@fBY3_HnA?V9ZLuGUaNiNem{J-eHSvZgEBr*FGW zK6B&knf12!sNS?6@R?h@<%J^*w&KF(V6NL!vp-+IbN}wjYPE9nz18*iZ?3F<7%JPd z3+4H_TKSDy(O#~x;#_$dzP4a57RyWJ#f5r#VPSc3etXuLw^@1q4PKmIoV!+>TU?wg z+6!f;SYHMbU!Hg77z@*W0JmxR))?=&jJI5;N*>~erdQj9Wp7{J2!hU=rP8MF!TMn- zE&Hk`8YL#|dQdDcmLk~2&L}yufbu=0<6b+{YCyOv<6)j`xrY496ne{@N4Kcz_=R`YQ^$AnVyq z&qfr7xsHHKi@@c+m9K+bct9P_xKT)he7gloiRpF8vo+~=I~`Ae=yf>ldmi*;O-0e* z7O%T)&Q`j@_k=acc-`(cgJ^i$02Y7R{)pN61>UdW3q z{=~~@tf@XBk{g%o&pOcu-u6Vx4nmXSp8FWo$uPJEtt1G-Fi#Sd#=FLRYlN6sv7uA1 zfi(~$>o&QdKxdtVq~#`e`Omtp;4R(`e32)D24O??WbByj#f}CK$SC57-Va(;Qq_5^ z_J52fil7ulNwy_#1y$~o;8U`ACCOrM)lt0O!!bT-P%$#9 zxPt#faWV~{B6TR1QhzT zsHa)dm$5Hi2jqC>$=|`UhD4#`r=ZHAG?CiXpc}IyQkV}7GE%qsms%~Bgq4e&GqkcA;?uxcgP6a_4~HFfsK61w2E=S^fKD# z;wqrW8ypn~k-<%a#n|YKik0=dj>9?QOj0>8y**X#n9#&w?~;|l5oA8XVUgKsc~z9Z z31MLxH55aB7$QrQu-hjuRBV7A0D9V`ZuD>tfO=~B&{RKx|Adk)`u6n+_BhIU<^exM6a};uT0j9tmit;|`&?WE zG(E>|Zg_t1=2E#_F4-No^zhp8|83F)pR)AXJu-oi5RHk$p(12`1GO8dDIj38b> zLX$4GXM{So2#uEKJP;)L(C9#a1vAXiz2k9JPLM8Stsf;O3M)N@~$=5T*Z!o?4O{r{wQu>^+1DjH;! zHR>NFV+sqyGQJ>X90!(xDMWWgauELySQW`PC*^A-WSW$)uV z@NB;#d#1Fx3~ccxH8-gFHZ|Wtg9`gL^}k0A*%|-?*6BVs%f6o{w{h?e~I83 z={%KSoP;Y2mB_<}Lijl;0=f|=bnRk9n*KYMMwLN!Skt0{Ifqh6@1cHQ7~d#TriO%} zBgXZG(T}SCd8E{jh?Ju{=3s9r=&|P(w4h`lZc#(l5fu~fQEwGZA)OFDl~qwFlR-`D zg7V(5C7zdRaNCH%5Pl+^NqXQfsgyX|l)43aJ~NH?e9bv{K`B|q#W2U1TW z97gasG`^3Oj=WCM9(btVSCoapvlYcmDl@16k4ujXN(71}Ss7h&N0lDZ=tsWB=r7~w zsB}LVXCf;%hJ`9^dnme`jm)rVl8j#Jbz)MxP1#Me1E5C@u}xvFY&|`95Z!t&zSaD9#VcL};m3 z`bqg`j+g)TREhCU)7zp?6Qzjw5(?Hd7zhA5Mh!%KLL#zmx+v9fux^43sy&HNtp5=S zfW9E~G$K>peT)mUVW!q~oA@s1hxvCpTh;z(y4t~~W_(?aifG)~uL>?dBH?qetg8VE z(>+)YiwV7I3w+Cw-7qEVz%WH6{$rW8#V8I9)A-PXPxTyM?OZ32-=d=gCKI_dwkyDk zGTT+^(UnjPd|8HBS@p<*fGRveWr$4UGJOe(k%IhBT3nc-@8-Uc5+ueGVw8p@Wtq9r z^5`?t+hmV^2TewqR5BUah(0ylHiIV4B_ogR7ZDSx4lxI?ZUF6y9^IZFFxOA?D9j1uaVuwEf?Ik7$4jB)IE4(P17l2pB_(b2e;)V+Gr z@ER~LvGRa;OUz~te3uz!6;@?6R)1v;Fld^fX|N@>%$l!=?nb-9ozvCVYixzBvNg6o zpxow5e3>`-$`yS>*aq8tfv&K(KD8^XEU+qUEU@{2Z5f*lhL4fE{TA+px-M4sNM!6h z%wb4R;_SjDYDMyt`Qb3g{e(*y47hq=R8bmcNt6!!3*H}Q+1YG)Y(Er1%CcnaJRI

pX(Hlmx1%E->3i3P%hY3&fw-~}0x9(m0T((YvIPM4GnQJMU z^Ph7?BBq>~R@st=d6cDIIT2^-AoGJJKvf-ZMKvOT&~6|MI*y*`6u zq>c;`g6F?K$F8yX5AoVNkU6;4;4dtQ*oU}Nz58uYRb;!ckL8h^P! z?S1daec85(6E^f&I;3fq#U_ju)r1kcPTbO+aOR=kN#94>fl1$|{~yR#U@UcQ=v1yd z^Hko72cUB|mD4|u;yg;>vDh75`t#YgKY}uldAnd4yiIMvFGQ4cRhg@lJ=0ZTvxz1U zssctL&oZcC0*9z_v7{epQ+I~k-qv*HPb;oHeW9f(ag!*|*+1Otb`Oq@7404T)bnb4 zz24sb;nRa|@7Sv!AMNcw@xM9xe*Xy=B!M_nwNW5~gy$j}b3;{g&4Tv&T_{EdJZC3? ze-|--tRHTviR++MR3{FulO{-q5X+SH-}IKeqcuHq-WB6AR7Cw9q1ClAc5oF&{{W%y B;fDYK From 4fc3689f0045d96f0b9244b23a22781bcfc65efb Mon Sep 17 00:00:00 2001 From: Nathan Nowack Date: Fri, 2 Sep 2022 02:56:44 -0500 Subject: [PATCH 5/8] add httpx client timeout arg to `trigger_sync` and `export_configuration` --- prefect_airbyte/client.py | 11 ++++++----- prefect_airbyte/configuration.py | 4 +++- prefect_airbyte/connections.py | 24 ++++++++++++++---------- 3 files changed, 23 insertions(+), 16 deletions(-) diff --git a/prefect_airbyte/client.py b/prefect_airbyte/client.py index 7a7c702..f68e339 100644 --- a/prefect_airbyte/client.py +++ b/prefect_airbyte/client.py @@ -30,30 +30,31 @@ def __init__( self, logger: logging.Logger, airbyte_base_url: str = "http://localhost:8000/api/v1", + timeout: int = 5, ) -> None: """ `AirbyteClient` constructor Args: logger: for client use, e.g. `prefect.logging.loggers.get_logger` - airbyte_base_url: Full Airbyte API endpoint. + airbyte_base_url: Full Airbyte API endpoint + timeout: seconds for httpx client timeout Returns: AirbyteClient: an instance of the `AirbyteClient` class """ self.airbyte_base_url = airbyte_base_url self.logger = logger + self.timeout = timeout async def _establish_session(self) -> httpx.AsyncClient: """ AirbyteClient method to `check_health_status` and establish a `client` session - Args: - Returns: client: `httpx.AsyncClient` used to communicate with the Airbyte API """ - client = httpx.AsyncClient() + client = httpx.AsyncClient(timeout=self.timeout) if await self.check_health_status(client): return client else: @@ -88,7 +89,7 @@ async def create_client(self) -> httpx.AsyncClient: Convenience method for establishing a healthy `httpx` Airbyte client Args: - + timeout: `int` seconds for request timeout with this client Returns: httpx.AsyncClient: client for interacting with Airbyte instance """ diff --git a/prefect_airbyte/configuration.py b/prefect_airbyte/configuration.py index 453b160..3d820ed 100644 --- a/prefect_airbyte/configuration.py +++ b/prefect_airbyte/configuration.py @@ -10,6 +10,7 @@ async def export_configuration( airbyte_server_host: str = "localhost", airbyte_server_port: int = "8000", airbyte_api_version: str = "v1", + timeout: int = 5, ) -> bytearray: """ @@ -23,6 +24,7 @@ async def export_configuration( airbyte_api_version (str, optional): Version of Airbyte API to use to trigger connection sync, will overwrite the value provided at init if provided. + timeout (int): timeout in seconds on the httpx AirbyteClient Returns: bytearray: `bytearray` containing Airbyte configuration @@ -69,7 +71,7 @@ def example_export_configuration_flow(): f"{airbyte_server_port}/api/{airbyte_api_version}" ) - airbyte = AirbyteClient(logger, airbyte_base_url) + airbyte = AirbyteClient(logger, airbyte_base_url, timeout=timeout) logger.info("Initiating export of Airbyte configuration") airbyte_config = await airbyte.export_configuration() diff --git a/prefect_airbyte/connections.py b/prefect_airbyte/connections.py index 60d253f..07913cc 100644 --- a/prefect_airbyte/connections.py +++ b/prefect_airbyte/connections.py @@ -27,6 +27,7 @@ async def trigger_sync( connection_id: str = None, poll_interval_s: int = 15, status_updates: bool = False, + timeout: int = 5, ) -> dict: """ Task run method for triggering an Airbyte Connection. @@ -45,17 +46,17 @@ async def trigger_sync( Args: str airbyte_server_host : Hostname of Airbyte server where connection is - configured. Will overwrite the value provided at init if provided. + configured. str airbyte_server_port: Port that the Airbyte server is listening on. - Will overwrite the value provided at init if provided. + str airbyte_api_version: Version of Airbyte API to use to trigger connection - sync. Will overwrite the value provided at init if provided. - str connection_id: if provided, - will overwrite the value provided at init. - int poll_interval_s: this task polls the - Airbyte API for status, if provided this value will + sync. + str connection_id: the Airbyte connection ID + int poll_interval_s: how often to poll the + Airbyte API for sync status, if provided this will override the default polling time of 15 seconds. bool status_updates: whether to log status as the task polls jobs + str timeout: The request `timeout` for the `httpx.AsyncClient` Returns: dict: connection_id (str) and succeeded_at (timestamp str) @@ -105,7 +106,7 @@ def example_trigger_sync_flow(): f"{airbyte_server_port}/api/{airbyte_api_version}" ) - airbyte = AirbyteClient(logger, airbyte_base_url) + airbyte = AirbyteClient(logger, airbyte_base_url, timeout=timeout) logger.info( f"Getting Airbyte Connection {connection_id}, poll interval " @@ -147,9 +148,12 @@ def example_trigger_sync_flow(): "job_updated_at": job_updated_at, } elif connection_status == CONNECTION_STATUS_INACTIVE: - logger.error(f"Please enable the Connection {connection_id} in Airbyte Server.") + logger.error( + f"Connection: {connection_id} is inactive" + " - you'll need to enable it in your Airbyte instance" + ) raise err.AirbyteConnectionInactiveException( - f"Please enable the Connection {connection_id} in Airbyte Server." + f"Please enable the Connection {connection_id} in Airbyte instance." ) elif connection_status == CONNECTION_STATUS_DEPRECATED: logger.error(f"Connection {connection_id} is deprecated.") From 6c925cf4d60c48a0a7bce1bf48b78252e1046ea9 Mon Sep 17 00:00:00 2001 From: Nathan Nowack Date: Fri, 2 Sep 2022 03:16:24 -0500 Subject: [PATCH 6/8] update changelog --- CHANGELOG.md | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e3a77fa..1f460c4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,13 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased -### Added - -### Changed +- -### Deprecated +### Added -### Removed +- a `timeout` parameter to `trigger_sync` and `export_configuration` passed to `httpx.AsyncClient` ### Fixed From f008131902c0048273b3db8eaed9c5510ae09398 Mon Sep 17 00:00:00 2001 From: Nathan Nowack Date: Fri, 2 Sep 2022 12:13:53 -0500 Subject: [PATCH 7/8] update changelog --- CHANGELOG.md | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1f460c4..1242dd9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased - +### Added + +### Changed + +### Deprecated + +### Removed + +### Fixed + +## 0.1.1 + +Released on September 2, 2022. ### Added From 58ec73bd06ada034d40e79bc10b1a37d4a0bac7e Mon Sep 17 00:00:00 2001 From: Nathan Nowack Date: Fri, 2 Sep 2022 12:18:05 -0500 Subject: [PATCH 8/8] stray line --- CHANGELOG.md | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1242dd9..aff7ef7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,7 +7,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased -- ### Added ### Changed