Commit
Merge pull request #96 from Gemma-Analytics/staging
v0.5.15
soltanianalytics committed May 5, 2021
2 parents e212dd6 + 4da6510 commit de4ea8a
Showing 9 changed files with 148 additions and 15 deletions.
4 changes: 3 additions & 1 deletion Dockerfile
@@ -106,7 +106,9 @@ ENV AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS=False
ENV AIRFLOW__API__AUTH_BACKEND=airflow.api.auth.backend.basic_auth
ENV AIRFLOW__WEBSERVER__EXPOSE_CONFIG=False
ENV AIRFLOW__CORE__SECURE_MODE=True
ENV AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT=120
# Increase processor timeouts to avoid failures when processing a large dags.py
ENV AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT=240
ENV AIRFLOW__CORE__DAG_FILE_PROCESSOR_TIMEOUT=240

# Related to $AIRFLOW_HOME
ENV AIRFLOW__CORE__DAGS_FOLDER=/opt/airflow/dags
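
These AIRFLOW__SECTION__KEY environment variables override the corresponding airflow.cfg entries at runtime. A quick sanity check from inside the container might look like this (a sketch; it only assumes Airflow is importable in the image):

from airflow.configuration import conf

# Both values should resolve to 240 once the image is rebuilt with the new ENV lines.
print(conf.getint("core", "dagbag_import_timeout"))
print(conf.getint("core", "dag_file_processor_timeout"))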
1 change: 1 addition & 0 deletions README.md
@@ -31,6 +31,7 @@ EWAH currently supports the following operators:
- OracleSQL
- Pipedrive
- PostgreSQL / Redshift
- Recurly
- S3 (for CSV or JSON files stored in an S3 bucket, e.g. from Kinesis Firehose)
- Salesforce
- Shopify
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
0.5.14
0.5.15
24 changes: 24 additions & 0 deletions airflow/dags/dags.yml
@@ -300,6 +300,30 @@ el_dags:
product_prices:
sql_select_statement: !text_from_file secret_sql_query.sql

EL_Recurly_inc:
dag_strategy: incremental
el_operator: recurly
target_schema_name: raw_recurly_incr
start_date: 2021-04-01 00:00:00+00:00
operator_config:
general_config:
source_conn_id: recurly
tables:
coupons: {}
items: {}

EL_Recurly_fr:
dag_strategy: atomic
el_operator: recurly
target_schema_name: raw_recurly_fr
start_date: 2021-04-01 00:00:00+00:00
operator_config:
general_config:
source_conn_id: recurly
tables:
coupons: {}
items: {}

EL_S3: !yml_from_file secret_dag_s3.yml
EL_S3_2: !yml_from_file secret_dag_s3_2.yml
EL_S3_3: !yml_from_file secret_dag_s3_3.yml
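
The !text_from_file and !yml_from_file tags pull values out of external (here: secret) files while the YAML is parsed. The loader itself is not part of this diff; a minimal PyYAML sketch of how such a tag can be registered (the function name is illustrative):

import yaml

def _text_from_file(loader, node):
    # Read the referenced file and return its contents as the node's value.
    with open(loader.construct_scalar(node)) as f:
        return f.read()

yaml.SafeLoader.add_constructor("!text_from_file", _text_from_file)
config = yaml.safe_load("sql_select_statement: !text_from_file secret_sql_query.sql")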
6 changes: 0 additions & 6 deletions ewah/ewah_utils/dbt_operator.py
@@ -245,9 +245,3 @@ def execute(self, context):
[cmd.append("dbt {0}".format(dc)) for dc in self.dbt_commands]
cmd.append("deactivate")
assert run_cmd(cmd, env, self.log.info) == 0

# if applicable: close SSH tunnel
if hasattr(self, "ssh_tunnel_forwarder"):
self.log.info("Stopping!")
self.ssh_tunnel_forwarder.stop()
del self.ssh_tunnel_forwarder
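
The deleted lines stopped an sshtunnel forwarder by hand after the dbt run. For reference (not part of this commit), sshtunnel's SSHTunnelForwarder also works as a context manager, which makes this kind of manual teardown unnecessary; the hostnames below are placeholders:

from sshtunnel import SSHTunnelForwarder

with SSHTunnelForwarder(
    ("bastion.example.com", 22),
    ssh_username="ewah",
    remote_bind_address=("warehouse.internal", 5432),
) as tunnel:
    local_port = tunnel.local_bind_port  # connect to the database via localhost:local_port
# The tunnel is stopped automatically when the block exits.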
82 changes: 82 additions & 0 deletions ewah/hooks/recurly.py
@@ -0,0 +1,82 @@
from ewah.hooks.base import EWAHBaseHook
import recurly


class EWAHRecurlyHook(EWAHBaseHook):

_ATTR_RELABEL = {
"api_token": "password",
}

conn_name_attr = "recurly_conn_id"
default_conn_name = "recurly_default"
conn_type = "ewah_recurly"
hook_name = "EWAH Recurly Connection"

@staticmethod
def get_ui_field_behaviour():
return {
"hidden_fields": ["port", "schema", "extra", "host", "login"],
"relabeling": {"password": "Baisc Auth API Key"},
}

@property
def client(self):
if not hasattr(self, "_client"):
self._client = recurly.Client(self.conn.api_token)

return self._client

@staticmethod
def validate_resource(resource):
return hasattr(recurly.Client, "list_{0}".format(resource))

@classmethod
def recurly_to_dict(cls, recurly_object):
final_dict = {}
for key, value in vars(recurly_object).items():
if isinstance(value, recurly.Resource):
value = cls.recurly_to_dict(value)
if isinstance(value, list):
value = [
cls.recurly_to_dict(item)
if isinstance(item, recurly.Resource)
else item
for item in value
]
final_dict[key] = value
return final_dict

def get_data_in_batches(
self,
resource,
data_from=None,
data_until=None,
batch_size=10000,
):
assert isinstance(batch_size, int) and batch_size > 0
params = {
"limit": 200, # maximum limit per call (overwrite default of 20)
"sort": "updated_at",
"order": "asc", # see docs - don't use desc with updated_at sort!
}
if data_from:
params["begin_time"] = data_from.isoformat()
if data_until:
params["end_time"] = data_until.isoformat()

# Serve data in batches
data = []
i = 0
for item in getattr(self.client, "list_{0}".format(resource))(
params=params
).items():
i += 1
data.append(self.recurly_to_dict(item))
if i == batch_size:
yield data
data = []
i = 0

if data: # last batch
yield data
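
A standalone sketch of how this generator might be consumed (the connection id and batch size are illustrative, and passing the conn id as a keyword follows the usual Airflow hook convention):

from ewah.hooks.recurly import EWAHRecurlyHook

hook = EWAHRecurlyHook(recurly_conn_id="recurly_default")
for batch in hook.get_data_in_batches(resource="coupons", batch_size=500):
    # Each batch is a list of up to 500 dicts produced by recurly_to_dict.
    print(len(batch))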
32 changes: 32 additions & 0 deletions ewah/operators/recurly.py
@@ -0,0 +1,32 @@
from ewah.hooks.recurly import EWAHRecurlyHook

from ewah.operators.base import EWAHBaseOperator
from ewah.constants import EWAHConstants as EC


class EWAHRecurlyOperator(EWAHBaseOperator):

_NAMES = ["recurly"]

_ACCEPTED_EXTRACT_STRATEGIES = {
EC.ES_FULL_REFRESH: True,
EC.ES_INCREMENTAL: True,
}

_CONN_TYPE = EWAHRecurlyHook.conn_type

def __init__(self, resource=None, *args, **kwargs):
kwargs["primary_key_column_name"] = "id"
resource = resource or kwargs.get("target_table_name")
super().__init__(*args, **kwargs)

assert EWAHRecurlyHook.validate_resource(resource)
self.resource = resource

def ewah_execute(self, context):
for batch in self.source_hook.get_data_in_batches(
resource=self.resource,
data_from=self.data_from,
data_until=self.data_until,
):
self.upload_data(batch)
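
Because validate_resource only checks for a matching list_* method on recurly.Client, any resource with a list endpoint in the client library can be used as a target table name. A quick illustration (assumes the recurly package is installed):

from ewah.hooks.recurly import EWAHRecurlyHook

assert EWAHRecurlyHook.validate_resource("coupons")        # recurly.Client.list_coupons exists
assert EWAHRecurlyHook.validate_resource("items")          # recurly.Client.list_items exists
assert not EWAHRecurlyHook.validate_resource("not_a_resource")  # no such list_* method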
11 changes: 4 additions & 7 deletions ewah/uploaders/google_sheets.py
@@ -72,8 +72,10 @@ def _create_or_update_table(
pk_columns=None, # must accept arg, but it must also always be []
):
# Google Sheets only works with drop & replace in one go
assert load_strategy == EC.LS_INSERT_REPLACE
assert upload_call_count == 1
assert (
load_strategy == EC.LS_INSERT_REPLACE
), "Google Sheets DWHs can only be drop_and_replace!"
assert upload_call_count == 1, "Chunking is not possible for Google Sheets DWH!"

if pk_columns:
raise Exception("Arg pk_columns invalidly supplied!")
@@ -86,11 +88,6 @@ def colnum_string(n):
string = chr(65 + remainder) + string
return string

if not load_strategy == EC.LS_INSERT_REPLACE:
raise Exception("Google Sheets DWHs can only be drop_and_replace!")
self._upload_call_count += 1
if not self._upload_call_count == 1:
raise Exception("Chunking is not possible for Google Sheets DWH!")
self.log.info("Replacing data in Google Sheets!")

if not data:
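
The colnum_string helper truncated above is the standard 1-based column-number-to-letter conversion used for building A1-style ranges (1 -> A, 26 -> Z, 28 -> AB). A self-contained reconstruction consistent with the visible lines (treat as a sketch):

def colnum_string(n):
    # Convert a 1-based column number into a spreadsheet column label.
    string = ""
    while n > 0:
        n, remainder = divmod(n - 1, 26)
        string = chr(65 + remainder) + string
    return string

assert colnum_string(1) == "A" and colnum_string(28) == "AB"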
1 change: 1 addition & 0 deletions setup.py
@@ -37,6 +37,7 @@
"pymysql",
"pytz",
"pyyaml",
"recurly",
"simple-salesforce",
"snowflake-connector-python>=2.3.8", # 2.3.8 vendored urrlib3 and requests
"sshtunnel>=0.2.2",
