Benyuel Merge pull request #6 from groupon/dev
salesforce bulk api
Latest commit 98bd810 Mar 16, 2017


A boilerplate implementation of Luigi at Groupon


  • Luigi is a Python package that helps you build complex pipelines of batch jobs. It handles dependency resolution, workflow management, visualization, failure handling, command-line integration, and much more.

  • Luigi-Warehouse adds

  • example workflows (e.g. replicating postgresql tables to redshift)

  • more data sources

  • variable data sources that do not rely on default luigi behavior/configs (e.g. VariableS3Client)

Install / Setup

  • Install python3 - This repo has been tested against python 3.4+

python install

Developers - if you want to modify or use the workflows with your own custom logic
  • Clone this repo
  • pip3 install -r requirements.txt if you want full functionality of all data sources
  • mkdir your-path-to/data
  • Put your credentials and settings in luigi.cfg. luigi.cfg-example shows some possible options. You can also $ export LUIGI_CONFIG_PATH=/path/to/your/luigi.cfg && python...
  • You're ready to replicate or move data around...
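
As a rough illustration of the credentials step above, the sketch below locates a config file through the LUIGI_CONFIG_PATH environment variable and reads a single setting with Python's standard configparser. The helper name load_setting and the section/key used in the usage note are hypothetical; luigi.cfg-example shows the options the workflows actually expect, and inside a task you would normally go through Luigi's own configuration object instead.

```python
import configparser
import os


def load_setting(section, key, default_path="luigi.cfg"):
    """Read one value from the file named by LUIGI_CONFIG_PATH.

    Falls back to default_path in the working directory when the
    environment variable is not set.
    """
    path = os.environ.get("LUIGI_CONFIG_PATH", default_path)
    parser = configparser.ConfigParser()
    # read() returns the list of files it managed to parse
    if not parser.read(path):
        raise FileNotFoundError("no config file found at " + path)
    return parser.get(section, key)
```

For example, with a file containing a hypothetical `[redshift]` section and a `host` key, `load_setting("redshift", "host")` returns that host string.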

Getting Started

  • Some example workflows are included. Assumptions, Args & Comments are in the File
Description | Main Class(es)
replicates all data from a google sheet to a redshift table (full copy/replace) | Run
replicates all data from a google sheet to a hadoop hive table via spark (full copy/replace) | main
replicates postgres tables to redshift (incrementally or full copy/replace) | Run - PerformIncrementalImport, PerformFullImport
spark app that replicates postgres tables to hadoop (hive) (incrementally or copy/replace) | Run - RunIncremental, RunFromScratch
replicates a salesforce report or SOQL to a redshift table (full copy/replace) | SOQLtoRedshift, ReporttoRedshift
replicates given teradata SQL to redshift table (incrementally or full copy/replace) | Run
replicates all data from typeform responses to a redshift table (full copy/replace) | Run
extracts users, orgs, tickets, ticket_events from zendesk to redshift (partially incremental) | Run
generic class to extract from zendesk API and load to hadoop hive via spark (incrementally or full copy/replace) | ZendeskSpark
  • Example to start the luigi scheduler daemon
$ ./start_luigi_server.bash
  • Example to run a workflow with multiple workers in parallel
$ export LUIGI_CONFIG_PATH=/path/to/your/luigi.cfg && python3 luigi_warehouse/ Run --params here --workers 50

Data Sources

Dependent python packages required & API reference

Luigi - Spotify/Luigi
Postgres / Redshift - psycopg2
MySQL - pymysql
Adwords - googleads : API Reference
Googlesheets - gspread : API Reference
Slack - slackclient : API Reference
Five9 - suds : API Reference
Twilio - twilio : API Reference
Livechat - API Reference
Zendesk - zdesk : API Reference
Shiftplanning - API Reference
Kochava - API Reference
Teradata - teradata
  • requires some configuring to install. We typically have to do
$ mv ~/.odbc.ini ~/.odbc.ini.orig 
$ cp /opt/teradata/client/15.10/odbc_64/odbcinst.ini ~/.odbcinst.ini 
$ cp /opt/teradata/client/15.10/odbc_64/odbc.ini ~/.odbc.ini
OnboardIQ - API Reference
AppBoy - API Reference
Salesforce - simple-salesforce : API Reference
  • Props to cghall for the capability to query salesforce reports directly using the analytics API

  • Also available are SalesforceBulk and SalesforceBulkJob classes which use the Salesforce bulk API

Braintree - braintree : API Reference
Typeform - API Reference
Checkr - API Reference
AWS - boto : boto3


  • We currently use slack or email for job status notifications which can easily be added

  • luigi-slack

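import luigi
from luigi_slack import SlackBot, notify

slack_channel = 'luigi-status-messages'

if __name__ == '__main__':
  # the Slack API token is read from the [slackbots] section of luigi.cfg
  slacker = SlackBot(token=luigi.configuration.get_config().get('slackbots', 'BOWSER_SLACK_API_KEY'),
                     channels=[slack_channel])
  # status messages are posted for whatever runs inside this block
  with notify(slacker):
    luigi.run(main_task_cls=Run)  # Run = your workflow's main task class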
  • email via AWS SES (boto3)

import boto3

class Email:
  def __init__(self, region, aws_access_key_id, aws_secret_access_key):
    # SES client authenticated with the given AWS credentials
    self.client = boto3.client('ses', region_name=region, aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)

  def send(self, from_, to_list, subject, body):
    # plain-text body; the HTML part is left as a single space
    return self.client.send_email(Source=from_,
                                  Destination={'ToAddresses': to_list},
                                  Message={'Subject': {'Data': subject},
                                           'Body': {'Text': {'Data': body},
                                                    'Html': {'Data': ' '}}})

Data Validation

  • Targeted towards ensuring successful replication of data to Redshift (see modules/
  • if the same number of columns in the csv are in the target table
  • if the columns have the same datatypes in the same order (VARCHAR is acceptable for any python datatype)
    • uses python_redshift_dtypes to convert
  • Checks for load errors on the provided target:schema:table since the provided load_start timestamp
  • Use the wrapper class RunAnywayTarget if you want to make it easier as we make each validation scheme better

  • pass in the taskobj with the following attributes

    • type = ['LoadError', 'Structure']
    • target = Redshift
    • table =
    • schema =
    • local_file = local csv file path
    • load_start = when you started to copy the records from S3
  • doing RunAnywayTarget(self).done() will not do validation

  • doing RunAnywayTarget(self).validation() will do the validation and, if it succeeds, also mark the task as done
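
The structure check described above (same number of columns, same datatypes in the same order, VARCHAR accepted for anything) can be sketched roughly as follows. This is not the repo's implementation: the PY_TO_REDSHIFT mapping is a hypothetical stand-in for python_redshift_dtypes, the function names are made up for illustration, and only the first CSV row is inspected.

```python
import csv

# Hypothetical mapping from Python types to Redshift column types;
# the repo's python_redshift_dtypes may differ.
PY_TO_REDSHIFT = {int: "BIGINT", float: "FLOAT", str: "VARCHAR"}


def infer_type(value):
    """Guess the narrowest Python type that can hold a CSV field."""
    for cast in (int, float):
        try:
            cast(value)
            return cast
        except ValueError:
            pass
    return str


def structure_matches(csv_path, table_dtypes):
    """Compare the first CSV row against the target table's column types.

    table_dtypes is the ordered list of the table's column types.
    Returns True when the column counts agree and every field's inferred
    type maps to the column type (VARCHAR accepts any value).
    """
    with open(csv_path, newline="") as fh:
        row = next(csv.reader(fh))
    if len(row) != len(table_dtypes):
        return False
    for value, col_type in zip(row, table_dtypes):
        if col_type.upper() == "VARCHAR":
            continue
        if PY_TO_REDSHIFT[infer_type(value)] != col_type.upper():
            return False
    return True
```

A real check would sample more than one row and read the column types from the target table itself rather than taking them as an argument.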

  • Takes the following args
  1. target_cols : a list of columns ordered for how you want your dataframe to be structured
  2. df : your dataframe you want restructured
  • example: I want my dataframe to have columns in this order ['one','two','three','four','five','six']
>>> from validation import OrderedDF
>>> import pandas as pd
>>> test = [[None,'',1,7,8],[None,'',2,5,6]]
>>> test = pd.DataFrame(test,columns=['one','two','four','five','three'])
>>> test
    one two  four  five  three
0  None         1     7      8
1  None         2     5      6
>>> result = OrderedDF(['one','two','three','four','five','six'],test)
>>> result.df
    one two  three  four  five   six
0  None          8     1     7  None
1  None          6     2     5  None
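
To make the reordering rule in the session above concrete, here is the same idea on plain Python rows, without pandas. It is only a sketch of the behaviour shown and not how OrderedDF is implemented; reorder_rows is a hypothetical helper.

```python
def reorder_rows(target_cols, columns, rows):
    """Rearrange each row to follow target_cols.

    columns names the positions in the input rows; any target column
    that is absent from the input is filled with None.
    """
    index = {name: i for i, name in enumerate(columns)}
    return [
        [row[index[col]] if col in index else None for col in target_cols]
        for row in rows
    ]
```

Calling it with the columns and rows from the example yields the values in the target order, with a trailing None for the missing 'six' column.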
  • This class will fix tables for you
  1. Check for copy errors
  2. Handle the copy errors
    • Add column(s) if needed
    • Change dtype(s) if needed
  3. Get orig table's schema
  4. Craft new table's schema with changes from errors
  5. Make the change and retry the copy and remove duplicate * records
  6. While there are copy errors
    • handle the errors
    • attempt to fix
    • retry copy
    • remove duplicate * records
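
The retry loop outlined above can be sketched as below. The four callables are hypothetical stand-ins for the real work (running the COPY, reading the load errors, altering the table, de-duplicating), and the max_attempts cap is an addition for safety that the description above does not mention.

```python
def copy_with_fixes(run_copy, get_errors, fix_table, dedupe, max_attempts=5):
    """Retry a copy until it loads without errors.

    Hypothetical callables:
      run_copy()        executes the COPY command
      get_errors()      returns the load errors from the latest copy
      fix_table(errors) adds columns / changes dtypes based on the errors
      dedupe()          removes duplicate records after a retried copy
    Returns the number of fix attempts that were needed.
    """
    run_copy()
    attempts = 0
    errors = get_errors()
    while errors:
        if attempts >= max_attempts:
            raise RuntimeError("copy still failing after %d fixes" % attempts)
        fix_table(errors)   # handle the errors and attempt to fix
        run_copy()          # retry copy
        dedupe()            # remove duplicate records
        attempts += 1
        errors = get_errors()
    return attempts
```

Passing the steps in as functions keeps the loop independent of any database driver, so it can be exercised with simple fakes before being pointed at Redshift.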

  • To run use

StructureDynamic(target_schema=  ,# redshift schema your table is in
                 target_table=    # your table
                 ).run(
                      add_cols=  ,# True or False for if you want columns added in attempting to fix
                      change_dtypes=  ,# True or False if you want column data types changed in attempting to fix
                      copy=           ,# copy command you attempted
                      load_start=      # when you started the copy command, '%Y-%m-%d %H:%M:%S'
                 )
  • Example usage:
    • sql prep: create the table
CREATE TABLE public.test(id INT, col VARCHAR);
INSERT INTO test VALUES (1,'2');
INSERT INTO test VALUES (2, 'two');
  • test.csv: create the csv you want to attempt to copy
  • we attempt to copy normally but we get load errors because one of the columns isn't right
COPY public.test FROM 's3://luigi-godata/test.csv' 
CREDENTIALS 'aws_access_key_id=XXXX;aws_secret_access_key=XXXX'
  • we run StructureDynamic
from validation import StructureDynamic
copy = '''COPY public.test FROM 's3://luigi-godata/test.csv' 
          CREDENTIALS 'aws_access_key_id=XXXX;aws_secret_access_key=XXXX'
       '''
StructureDynamic(target_schema='public',target_table='test').run(add_cols=True,change_dtypes=True,copy=copy,load_start='2016-10-6 10:15:00')
  • our table is fixed and called public.test
  • our original table is kept as public.test_orig_backup
  • stdout lists the stl_load_errors
  • the changes made to the table's ddl are printed to stdout
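
For reference, load errors like the ones listed on stdout can be looked up in Redshift's stl_load_errors system table. The sketch below only builds a parameterised query. Filtering by file name is an assumption made here for simplicity (the validation described earlier works per schema and table), and load_errors_query is a hypothetical helper, not part of this repo.

```python
def load_errors_query(load_start, filename_prefix):
    """Build a query for Redshift load errors recorded since load_start.

    Returns (sql, params), suitable for cursor.execute(sql, params) with
    a DB-API driver that uses %s placeholders, such as psycopg2.
    """
    sql = (
        "SELECT starttime, filename, line_number, colname, type, "
        "raw_field_value, err_reason "
        "FROM stl_load_errors "
        "WHERE starttime >= %s AND filename LIKE %s "
        "ORDER BY starttime"
    )
    # trailing % turns the prefix into a LIKE pattern
    return sql, (load_start, filename_prefix + "%")
```

Keeping the values in params rather than formatting them into the SQL string leaves quoting to the database driver.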