Skip to content
This repository has been archived by the owner on May 1, 2024. It is now read-only.

Commit

Permalink
Add the capability for analysts to define their own tasks, or SQL
Browse files Browse the repository at this point in the history
scripts, which can be run, sequentially, against a target HP Vertica
database, using a constrained set of credentials and schema.

We've created two new tasks: RunVerticaSqlScriptTask and
RunVerticaSqlScriptsTask.

The former allows a user to run a single script against an HP Vertica
database, which will run in a configured schema.  The latter will read a
configuration file that specifies many scripts to be run, scheduling
them such that they run sequentially, one after the other, and only
continue running so long as the previous script did not encounter an
error.

This allows us to have a separate repository where analysts can commit
SQL scripts to run against the data warehouse to generate their own
summary tables, etc., without needing to write Luigi scripts and worry
about scheduling.
  • Loading branch information
Toby Lawrence committed Dec 15, 2016
1 parent cb8c6ad commit dddcefd
Show file tree
Hide file tree
Showing 18 changed files with 514 additions and 8 deletions.
2 changes: 1 addition & 1 deletion AUTHORS
Expand Up @@ -12,4 +12,4 @@ Dmitry Viskov <dmitry.viskov@webenterprise.ru>
Sanford Student <sstudent@edx.org>
Jillian Vogel <jill@opencraft.com>
Dennis Jen <djen@edx.org>

Toby Lawrence <tlawrence@edx.org>
2 changes: 1 addition & 1 deletion Makefile
Expand Up @@ -47,7 +47,7 @@ test-requirements: requirements
test-local:
# TODO: when we have better coverage, modify this to actually fail when coverage is too low.
rm -rf .coverage
LUIGI_CONFIG_PATH='config/test.cfg' python -m coverage run --rcfile=./.coveragerc -m nose --with-xunit --xunit-file=unittests.xml -A 'not acceptance'
LUIGI_CONFIG_PATH='config/test.cfg' python -m coverage run --rcfile=./.coveragerc -m nose --with-xunit --xunit-file=unittests.xml -A 'not acceptance' -s

test: test-requirements develop test-local

Expand Down
4 changes: 4 additions & 0 deletions config/devstack.cfg
Expand Up @@ -87,3 +87,7 @@ number_of_shards = 5

[ccx]
enabled = false

[run-vertica-sql-script]
schema = testing
read_timeout = 5
4 changes: 4 additions & 0 deletions config/test.cfg
Expand Up @@ -134,3 +134,7 @@ api_root_url = http://example.com/api/v1/

[ccx]
enabled = false

[run-vertica-sql-script]
schema = testing
read_timeout = 5
3 changes: 2 additions & 1 deletion edx/analytics/tasks/load_internal_reporting_user_activity.py
Expand Up @@ -10,10 +10,11 @@
from edx.analytics.tasks.pathutil import PathSetTask
from edx.analytics.tasks.url import ExternalURL, url_path_join
from edx.analytics.tasks.user_activity import UserActivityTableTask
from edx.analytics.tasks.vertica_load import VerticaCopyTask, VerticaCopyTaskMixin, CredentialFileVerticaTarget
from edx.analytics.tasks.vertica_load import VerticaCopyTask, VerticaCopyTaskMixin
from edx.analytics.tasks.database_imports import ImportAuthUserTask
from edx.analytics.tasks.util.hive import HiveTableFromQueryTask, WarehouseMixin, HivePartition
from edx.analytics.tasks.util.weekly_interval import WeeklyIntervalMixin
from edx.analytics.tasks.util.vertica_target import CredentialFileVerticaTarget
from edx.analytics.tasks.user_activity import CourseActivityWeeklyTask

log = logging.getLogger(__name__)
Expand Down
3 changes: 2 additions & 1 deletion edx/analytics/tasks/load_warehouse.py
Expand Up @@ -12,8 +12,9 @@
from edx.analytics.tasks.load_internal_reporting_user_course import LoadUserCourseSummary
from edx.analytics.tasks.load_internal_reporting_user import LoadInternalReportingUserToWarehouse
from edx.analytics.tasks.course_catalog import DailyLoadSubjectsToVerticaTask
from edx.analytics.tasks.vertica_load import VerticaCopyTaskMixin, CredentialFileVerticaTarget
from edx.analytics.tasks.vertica_load import VerticaCopyTaskMixin

from edx.analytics.tasks.util.vertica_target import CredentialFileVerticaTarget
from edx.analytics.tasks.util.hive import WarehouseMixin
from edx.analytics.tasks.url import ExternalURL

Expand Down
149 changes: 149 additions & 0 deletions edx/analytics/tasks/run_vertica_sql_script.py
@@ -0,0 +1,149 @@
"""
Support for running a SQL script against an HP Vertica database.
"""
import datetime
import logging

import luigi
import luigi.configuration
from edx.analytics.tasks.url import ExternalURL
from edx.analytics.tasks.util.vertica_target import VerticaTarget, CredentialFileVerticaTarget

log = logging.getLogger(__name__)

try:
    import vertica_python
    vertica_client_available = True  # pylint: disable-msg=C0103
except ImportError:
    # `Logger.warn` is a deprecated alias; use the canonical `warning` method.
    log.warning('Unable to import Vertica client libraries')
    # On hadoop slave nodes we don't have Vertica client libraries installed so it is pointless to ship this package to
    # them, instead just fail noisily if we attempt to use these libraries.
    vertica_client_available = False  # pylint: disable-msg=C0103


class BaseVerticaSqlScriptTaskMixin(object):
    """
    Parameters for running a SQL script against an HP Vertica database.

    Shared by both the single-script and multi-script task mixins; all values
    (except `date` and `marker_schema`) fall back to the `run-vertica-sql-script`
    configuration section when not supplied on the command line.
    """
    # NOTE(review): this default is evaluated once, at import time -- a long-running
    # process would keep using the date the module was first loaded. Confirm that is
    # acceptable for how this task is scheduled.
    date = luigi.DateParameter(
        default=datetime.datetime.utcnow().date(),
        description='Default is today, UTC.',
    )
    # Schema the script runs in; RunVerticaSqlScriptTask also sets it as the
    # connection's SEARCH_PATH so scripts can use unqualified table names.
    schema = luigi.Parameter(
        config_path={'section': 'run-vertica-sql-script', 'name': 'schema'},
        description='Name of the schema to which to write.',
    )
    credentials = luigi.Parameter(
        config_path={'section': 'run-vertica-sql-script', 'name': 'credentials'},
        description='Path to the external access credentials file.',
    )
    read_timeout = luigi.IntParameter(
        config_path={'section': 'run-vertica-sql-script', 'name': 'read_timeout'},
        description='Timeout in seconds for reading from a Vertica database connection.'
    )
    marker_schema = luigi.Parameter(
        default=None,
        description='Name of the schema to which to write the marker table. marker_schema would '
        'default to the schema value if the value here is None.'
    )

    def update_id(self):
        """
        Unique string identifying this task run, based on the input parameters.
        """
        # luigi's Task.__str__ includes the task name and significant parameters,
        # so this is stable for identical parameter sets and distinct otherwise.
        return str(self)


class RunVerticaSqlScriptTaskMixin(BaseVerticaSqlScriptTaskMixin):
    """
    Parameters required to run a single SQL script against an HP Vertica database.
    """
    # Path (URL) to the SQL file to execute; resolved via ExternalURL in requires().
    source_script = luigi.Parameter(
        description='Path to the source script to execute.'
    )
    # Used as the marker "table" name in the VerticaTarget, so it must be unique
    # per logical script for idempotency tracking to work.
    script_name = luigi.Parameter(
        description='Unique identifier for the purposes of tracking whether or not this '
        'script ran successfully i.e. the table created by this script, or the ticket related to it.'
    )
)


class RunVerticaSqlScriptTask(RunVerticaSqlScriptTaskMixin, luigi.Task):
    """
    A task for running a SQL script against an HP Vertica database.

    The script executes inside a transaction with SEARCH_PATH set to the
    configured schema; on success a marker row is written so the task is
    idempotent for a given set of parameters.
    """
    # Lazily-built caches: the dependency dict, the Vertica marker target, and
    # an optional single extra upstream task.
    required_tasks = None
    output_target = None
    depends_on = None

    def add_dependency(self, dependency):
        """
        Adds a custom dependency/requirement for this task.

        Note: this currently *sets* a single, custom dependency.  You cannot add
        multiple dependencies to this task.  The last dependency to be added is
        the only one that will stick.  It will, however, not be the only
        dependency, as this task has a "base" set of dependencies.
        """
        self.depends_on = dependency

    def requires(self):
        """Build (once) and return the dependency dict for this task."""
        if self.required_tasks is None:
            self.required_tasks = dict(
                credentials=ExternalURL(url=self.credentials),
                source_script=ExternalURL(url=self.source_script),
            )

        # Checked on every call (not only on first build) so a dependency added
        # after the first requires() invocation is still picked up.
        if self.depends_on is not None:
            self.required_tasks['depends_on'] = self.depends_on

        return self.required_tasks

    def output(self):
        """
        Returns a VerticaTarget representing the inserted dataset.
        """
        if self.output_target is not None:
            return self.output_target

        self.output_target = CredentialFileVerticaTarget(
            credentials_target=self.input()['credentials'],
            table=self.script_name,
            schema=self.schema,
            update_id=self.update_id(),
            read_timeout=self.read_timeout,
            marker_schema=self.marker_schema,
        )
        return self.output_target

    def run(self):
        """
        Runs the given SQL script against the Vertica target.
        """
        # Fail fast if this machine cannot talk to Vertica at all.
        self.check_vertica_availability()
        conn = self.output().connect()

        try:
            # Point the connection at the configured schema so that scripts can
            # use unqualified table references without knowing where they run.
            conn.cursor().execute('SET SEARCH_PATH = {schema};'.format(schema=self.schema))

            # Load the script body and execute it in one shot.
            with self.input()['source_script'].open('r') as script_file:
                conn.cursor().execute(script_file.read())

            # Nothing raised, so record success before committing.
            self.output().touch(conn)

            conn.commit()
            log.debug("Committed transaction.")
        except Exception as exc:
            log.exception("Rolled back the transaction; exception raised: %s", str(exc))
            conn.rollback()
            raise
        finally:
            conn.close()

    def check_vertica_availability(self):
        """Call to ensure fast failure if this machine doesn't have the Vertica client library available."""
        if not vertica_client_available:
            raise ImportError('Vertica client library not available')
97 changes: 97 additions & 0 deletions edx/analytics/tasks/run_vertica_sql_scripts.py
@@ -0,0 +1,97 @@
"""
Support for running multiple SQL scripts against an HP Vertica database in a deterministic fashion.
"""
import yaml
import datetime
import logging
from os import path

import luigi
import luigi.configuration
from edx.analytics.tasks.url import ExternalURL
from edx.analytics.tasks.run_vertica_sql_script import BaseVerticaSqlScriptTaskMixin, RunVerticaSqlScriptTask


log = logging.getLogger(__name__)


class RunVerticaSqlScriptsTaskMixin(BaseVerticaSqlScriptTaskMixin):
    """
    Parameters for running multiple SQL scripts against an HP Vertica database in a deterministic fashion.
    """
    # YAML file with a top-level `scripts` list; each entry needs `name` and `location` keys.
    script_configuration = luigi.Parameter(
        description='Path to the configuration file that specifies which scripts to run.'
    )
    # Joined (os.path.join) with each script's `location` to produce the full script path.
    script_root = luigi.Parameter(
        default='',
        description='Root directory from which the script locations in the configuration '
        'are referenced from.'
    )


class RunVerticaSqlScriptsTask(RunVerticaSqlScriptsTaskMixin, luigi.WrapperTask):
    """
    A wrapper task for running multiple SQL scripts against an HP Vertica database in a deterministic fashion.

    We use a YAML file that defines a list of scripts to run.  We run the scripts in the order they are defined.
    By using RunVerticaSqlScriptTask, each script stores its own marker, thus allowing us to idempotently run this
    task until all required tasks (aka our scripts) have successfully run for a given date.
    """
    # Cached final task of the dependency chain (the last script in the file).
    downstream_task = None

    def requires(self):
        if self.downstream_task is None:
            self.downstream_task = self.get_downstream_task()

        # May still be None if the configuration was empty or entirely invalid.
        if self.downstream_task is not None:
            yield self.downstream_task

    def validate_script_entry(self, script):
        """
        Returns True if `script` is a dictionary containing both a 'name' and a 'location' key.
        """
        # It has to be a dictionary.
        if not isinstance(script, dict):
            return False

        # It needs to have a name and a script location.
        for attr in ['name', 'location']:
            if attr not in script:
                return False

        return True

    def get_downstream_task(self):
        """
        Loads the script configuration, builds one RunVerticaSqlScriptTask per valid
        entry chained via add_dependency() so they run sequentially, and returns the
        final task in the chain (or None if no valid scripts were configured).
        """
        # BUG FIX: this method previously ended with a `yield`, which turned it into a
        # generator function -- requires() would then cache and yield a generator
        # object instead of a task.  It now returns the last task in the chain.
        if self.downstream_task is not None:
            return self.downstream_task

        script_conf_target = ExternalURL(url=self.script_configuration).output()
        with script_conf_target.open('r') as script_conf_file:
            config = yaml.safe_load(script_conf_file)
            if config is not None and isinstance(config, dict):
                previous_task = None

                scripts = config.get('scripts', [])

                # Walk the scripts in the order they appear in the configuration file,
                # linking each task to the previous one via add_dependency() so that the
                # scripts run sequentially, from the top of the file downwards.
                for script in scripts:
                    if not self.validate_script_entry(script):
                        # `Logger.warn` is deprecated; use `warning`.
                        log.warning("encountered invalid script entry!")
                        continue

                    new_task = RunVerticaSqlScriptTask(
                        credentials=self.credentials, schema=self.schema, marker_schema=self.marker_schema,
                        date=self.date, read_timeout=self.read_timeout,
                        source_script=path.join(self.script_root, script['location']),
                        script_name=script.get('name'))

                    # If we previously configured a task, set it as a dependency of this one, so it runs prior to.
                    if previous_task is not None:
                        new_task.add_dependency(previous_task)

                    # Mark this as the previously-created task.
                    previous_task = new_task

                self.downstream_task = previous_task

        return self.downstream_task
Empty file.
@@ -0,0 +1,9 @@
scripts:
- name: script one
location: script_one.sql
- name: script two
location: script_two.sql
- name: script three
location: script_three.sql
- name: script four
location: script_four.sql
@@ -0,0 +1 @@
asdasdaikubdnaiksudb
@@ -0,0 +1,4 @@
scripts:
- name: Simple Testing Script
script_id: my_lil_test_script
location: foobar.sql
@@ -0,0 +1,5 @@
scripts:
- name: script one
location: script_one.sql
- name: script two
location: script_two.sql

0 comments on commit dddcefd

Please sign in to comment.