Skip to content

Commit

Permalink
Add some MSSQL examples for using bcp
Browse files Browse the repository at this point in the history
  • Loading branch information
gtoonstra committed Mar 20, 2018
1 parent 0571f4e commit 737f402
Show file tree
Hide file tree
Showing 14 changed files with 482 additions and 0 deletions.
39 changes: 39 additions & 0 deletions docker-compose-mssql.yml
@@ -0,0 +1,39 @@
# Example stack: SQL Server + Postgres + an Airflow webserver that mounts
# the mssql-example DAGs and SQL templates.
version: '2'
services:
  mssql:
    image: microsoft/mssql-server-linux:latest
    container_name: docker-mssql
    environment:
      - ACCEPT_EULA=Y
      # Example-only credential; do not reuse outside local experiments.
      - SA_PASSWORD=Th1sS3cret!
      # NOTE: the stray POSTGRES_DB variable that used to be listed here was
      # removed; it is a Postgres image setting, not a SQL Server one.
    ports:
      # Port mappings are quoted so YAML always reads them as strings.
      - "1433:1433"
  postgres:
    image: postgres:9.6
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    ports:
      - "5432:5432"
    volumes:
      # SQL scripts mounted into the image's init directory.
      - ./examples/do-this-first/database_user.sql:/docker-entrypoint-initdb.d/database_user.sql
      - ./examples/do-this-first/dwh_tables.sql:/docker-entrypoint-initdb.d/dwh_tables.sql
      - ./examples/do-this-first/populate_tables.sql:/docker-entrypoint-initdb.d/populate_tables.sql
  webserver:
    image: test/airflow:1.0
    restart: always
    depends_on:
      - postgres
      - mssql
    environment:
      - LOAD_EX=n
      - EXECUTOR=Local
    volumes:
      # DAG definitions and the SQL templates they reference.
      - ./examples/mssql-example/dags:/usr/local/airflow/dags
      - ./examples/mssql-example/sql:/usr/local/airflow/sql
    ports:
      - "8080:8080"
    command: webserver

Empty file.
Empty file.
65 changes: 65 additions & 0 deletions examples/mssql-example/dags/acme/hooks/bcp_hook.py
@@ -0,0 +1,65 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from airflow.hooks.dbapi_hook import DbApiHook
import subprocess


class BcpHook(DbApiHook):
    """
    Interact with Microsoft SQL Server through the ``bcp`` command-line tool.

    Server, login, password and database are read from the Airflow connection
    named by ``mssql_conn_id`` and appended to every ``bcp`` invocation.

    :param mssql_conn_id: id of the Airflow connection to use
    :param schema: optional database name; when given it takes precedence
        over the schema stored on the connection
    """

    conn_name_attr = 'mssql_conn_id'
    default_conn_name = 'mssql_default'
    supports_autocommit = True

    def __init__(self, *args, **kwargs):
        super(BcpHook, self).__init__(*args, **kwargs)
        self.schema = kwargs.pop("schema", None)

    def get_conn(self):
        """
        Returns a mssql connection details object
        """
        return self.get_connection(self.mssql_conn_id)

    @staticmethod
    def _mask_password(cmd):
        """
        Return a copy of ``cmd`` in which the value following ``-P`` is
        replaced by ``***`` so the command can be logged safely.
        """
        masked = list(cmd)
        for idx in range(len(masked) - 1):
            if masked[idx] == '-P':
                masked[idx + 1] = '***'
        return masked

    def run_bcp(self, cmd):
        """
        Run ``cmd`` (an argv list) as a subprocess and log its output.

        :param cmd: the full ``bcp`` command as a list of arguments
        :raises Exception: when the process exits with a non-zero status
        """
        # Never log the raw command: it contains the password after '-P'.
        self.log.info("Running command: {0}".format(self._mask_password(cmd)))
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        outs, errs = proc.communicate()
        # Route process output through the hook logger rather than print().
        self.log.info("Output:\n%s", outs.decode('utf-8', 'replace'))
        self.log.info("Stderr:\n%s", errs.decode('utf-8', 'replace'))
        if proc.returncode != 0:
            raise Exception("Process failed: {0}".format(proc.returncode))

    def add_conn_details(self, cmd, conn):
        """
        Append server/login/password (and database, if known) to ``cmd``.

        ``cmd`` is extended in place; nothing is returned.
        """
        conn_params = ['-S', conn.host, '-U', conn.login, '-P', conn.password]
        # An explicit schema passed to the hook overrides the connection's.
        database = self.schema or conn.schema
        if database:
            # Omit -d entirely when no database is configured; a None entry
            # in the argv list would make subprocess raise TypeError.
            conn_params.extend(['-d', database])
        cmd.extend(conn_params)

    def generate_format_file(self, table_name, format_file):
        """
        Ask bcp to write a character-mode, comma-terminated format file for
        ``table_name``.

        :param table_name: table to describe
        :param format_file: file-like object whose ``name`` is the output path
        """
        conn = self.get_conn()
        cmd = ['bcp', table_name, 'format', 'nul', '-c', '-f', format_file.name, '-t,']
        self.add_conn_details(cmd, conn)
        self.run_bcp(cmd)

    def import_data(self, format_file, data_file, table_name):
        """
        Bulk-load ``data_file`` into ``table_name`` using ``format_file``.

        :param format_file: path of a previously generated bcp format file
        :param data_file: path of the data file to import
        :param table_name: destination table
        """
        conn = self.get_conn()
        cmd = ['bcp', table_name, 'in', data_file, '-f', format_file]
        self.add_conn_details(cmd, conn)
        self.run_bcp(cmd)
47 changes: 47 additions & 0 deletions examples/mssql-example/dags/acme/hooks/mssql_hook.py
@@ -0,0 +1,47 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pyodbc

from airflow.hooks.dbapi_hook import DbApiHook


class MsSqlHook(DbApiHook):
    """
    Interact with Microsoft SQL Server through pyodbc.

    :param mssql_conn_id: id of the Airflow connection to use
    :param schema: optional database name; when given it takes precedence
        over the schema stored on the connection
    """

    conn_name_attr = 'mssql_conn_id'
    default_conn_name = 'mssql_default'
    supports_autocommit = True

    def __init__(self, *args, **kwargs):
        super(MsSqlHook, self).__init__(*args, **kwargs)
        self.schema = kwargs.pop("schema", None)
        self.conn = None

    def get_conn(self):
        """
        Returns a mssql connection object

        The connection is created on first use and cached on the hook;
        later calls return the same object.
        """
        if self.conn:
            return self.conn

        conn = self.get_connection(self.mssql_conn_id)

        # The Microsoft ODBC driver has no PORT keyword; a non-default port
        # is given as part of SERVER in the form "host,port".
        server = conn.host
        if conn.port:
            server = "{0},{1}".format(conn.host, conn.port)

        parts = [
            "DRIVER={ODBC Driver 17 for SQL Server}",
            "SERVER={0}".format(server),
        ]
        # Honour the schema override passed to the hook; fall back to the
        # connection's own schema and skip the keyword if neither is set.
        database = self.schema or conn.schema
        if database:
            parts.append("DATABASE={0}".format(database))
        parts.append("UID={0}".format(conn.login))
        parts.append("PWD={0}".format(conn.password))

        self.conn = pyodbc.connect(";".join(parts))
        return self.conn

    def set_autocommit(self, conn, autocommit):
        """
        Set the autocommit flag on the given pyodbc connection.
        """
        conn.autocommit = autocommit
Empty file.
@@ -0,0 +1,67 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from acme.hooks.bcp_hook import BcpHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
import tempfile
import csv


class MsSqlImportOperator(BaseOperator):
    """
    Imports synthetic data into a table on MSSQL

    The operator asks bcp for a format file describing ``table_name``, reads
    the column names from it, passes them to ``generate_synth_data`` and
    bulk-loads the returned rows through bcp.

    :param table_name: destination table
    :type table_name: string
    :param generate_synth_data: callable that receives the list of column
        names and returns an iterable of rows to write as CSV
    :type generate_synth_data: callable
    :param mssql_conn_id: reference to a specific mssql database
    :type mssql_conn_id: string
    """
    ui_color = '#ededed'

    @apply_defaults
    def __init__(
            self,
            table_name,
            generate_synth_data,
            mssql_conn_id='mssql_default',
            *args, **kwargs):
        super(MsSqlImportOperator, self).__init__(*args, **kwargs)
        self.mssql_conn_id = mssql_conn_id
        self.table_name = table_name
        self.generate_synth_data = generate_synth_data

    def get_column_list(self, format_file_name):
        """
        Return the column names listed in a bcp format file.

        The first line (format version) is skipped, the second line holds the
        number of columns, and each following line describes one column with
        the column name as its seventh whitespace-separated field.

        :param format_file_name: path of the format file to read
        :return: list of column names, in file order
        """
        col_list = []
        with open(format_file_name, 'r') as format_file:
            format_file.readline()  # version line, not needed
            num_cols = int(format_file.readline())
            for _ in range(num_cols):
                # split() without arguments collapses runs of any whitespace
                # and drops the trailing newline, so no empty fields remain.
                fields = format_file.readline().split()
                col_list.append(fields[6])
        return col_list

    def execute(self, context):
        """
        Generate the format file, build the synthetic rows, write them to a
        temporary CSV file and import that file with bcp.

        Both temporary files are removed afterwards, whether or not the
        import succeeded.
        """
        import os
        import sys

        # csv.writer needs a text-mode file on Python 3 (opened with
        # newline='' so the writer controls line endings) and a binary-mode
        # file on Python 2.
        if sys.version_info[0] >= 3:
            data_open_kwargs = {'mode': 'w', 'newline': ''}
        else:
            data_open_kwargs = {'mode': 'wb'}

        hook = BcpHook(mssql_conn_id=self.mssql_conn_id)
        temp_paths = []
        try:
            with tempfile.NamedTemporaryFile(prefix='format', delete=False) as format_file:
                temp_paths.append(format_file.name)
                self.log.info('Generating format file')
                hook.generate_format_file(self.table_name, format_file)
                self.log.info('Retrieving column list')
                col_list = self.get_column_list(format_file.name)
                self.log.info('Generating synthetic data using column list: {0}'.format(col_list))
                data = self.generate_synth_data(col_list)
                with tempfile.NamedTemporaryFile(
                        prefix='data', delete=False, **data_open_kwargs) as data_file:
                    temp_paths.append(data_file.name)
                    self.log.info('Writing data to temp file')
                    csv_writer = csv.writer(data_file, delimiter=',')
                    csv_writer.writerows(data)
                    # bcp reads the file by name while it is still open here,
                    # so buffered rows must reach the disk first.
                    data_file.flush()
                    self.log.info('Importing data to SQL server')
                    hook.import_data(format_file.name, data_file.name, self.table_name)
        finally:
            # delete=False leaves the files behind; clean them up explicitly.
            for path in temp_paths:
                try:
                    os.remove(path)
                except OSError:
                    self.log.warning('Could not remove temp file %s', path)
51 changes: 51 additions & 0 deletions examples/mssql-example/dags/acme/operators/mssql_operator.py
@@ -0,0 +1,51 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from acme.hooks.mssql_hook import MsSqlHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class MsSqlOperator(BaseOperator):
    """
    Run SQL against a Microsoft SQL Server database.

    :param sql: the statement to run; templated, and may instead be the
        name of a template file ending in ``.sql``
    :type sql: string
    :param mssql_conn_id: name of the Airflow connection to use
    :type mssql_conn_id: string
    :param parameters: values handed to the hook together with the statement
    :param autocommit: autocommit setting handed to the hook
    :type autocommit: bool
    :param database: database name that overrides the one defined on the
        connection; passed to the hook as ``schema``
    :type database: string
    """

    ui_color = '#ededed'
    template_ext = ('.sql',)
    template_fields = ('sql',)

    @apply_defaults
    def __init__(
            self,
            sql,
            mssql_conn_id='mssql_default',
            parameters=None,
            autocommit=False,
            database=None,
            *args, **kwargs):
        super(MsSqlOperator, self).__init__(*args, **kwargs)
        self.sql = sql
        self.database = database
        self.parameters = parameters
        self.autocommit = autocommit
        self.mssql_conn_id = mssql_conn_id

    def execute(self, context):
        """
        Log the statement, then run it through a freshly built MsSqlHook.
        """
        self.log.info('Executing: %s', self.sql)
        mssql = MsSqlHook(schema=self.database,
                          mssql_conn_id=self.mssql_conn_id)
        run_options = {
            'autocommit': self.autocommit,
            'parameters': self.parameters,
        }
        mssql.run(self.sql, **run_options)
50 changes: 50 additions & 0 deletions examples/mssql-example/dags/create_mssql.py
@@ -0,0 +1,50 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function
import airflow
from datetime import datetime, timedelta
from acme.operators.mssql_operator import MsSqlOperator
from airflow import models
from airflow.settings import Session
from airflow.models import Variable
import logging


# Default arguments shared by every task of the DAG.
args = dict(
    owner='airflow',
    start_date=airflow.utils.dates.days_ago(7),
    provide_context=True,
)

# Location of the .sql templates, taken from the Airflow Variable "sql_path".
tmpl_search_path = Variable.get("sql_path")

# One-shot DAG: first create the schema, then the table.
dag = airflow.DAG(
    dag_id='create_mssql',
    default_args=args,
    schedule_interval="@once",
    max_active_runs=1,
    template_searchpath=tmpl_search_path)

t1 = MsSqlOperator(
    dag=dag,
    task_id='create_schema',
    mssql_conn_id='mssql',
    sql='create_schema.sql')

t2 = MsSqlOperator(
    dag=dag,
    task_id='create_table',
    mssql_conn_id='mssql',
    sql='create_table.sql')

# create_schema must finish before create_table starts.
t1.set_downstream(t2)

0 comments on commit 737f402

Please sign in to comment.