Add logic to inherit schemaname and tablename from the source if not specified for the target, to avoid failing with error `WARNING:root:'DBResource' object has no attribute 'copy_data'`.
pvbouwel committed Jun 14, 2018
1 parent a31a14a commit 6c39220
Showing 4 changed files with 161 additions and 7 deletions.
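The warning quoted in the commit message is an AttributeError surfacing from the copy step: when copyTarget names no table, the destination stays a generic DBResource, which (per the warning) has no copy_data method. The snippet below is a minimal, hypothetical reproduction of that failure mode using stand-in classes, not the utility's real ones; it only illustrates why the fix turns the destination into a table-level resource before the migration is scheduled.

```python
class DBResource:
    """Stand-in for a target built from a copyTarget without schemaName/tableName."""


class TableResource(DBResource):
    """Stand-in: only the table-level resource knows how to copy data from staging."""
    def copy_data(self):
        print('COPY <schema>.<table> FROM s3://staging/...')


destination = DBResource()
try:
    # Roughly what the downstream copy task ended up attempting before this commit.
    destination.copy_data()
except AttributeError as err:
    print(err)  # 'DBResource' object has no attribute 'copy_data'
```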
24 changes: 17 additions & 7 deletions src/UnloadCopyUtility/redshift_unload_copy.py
@@ -24,7 +24,7 @@
 import logging
 from global_config import GlobalConfigParametersReader, config_parameters
 from util.s3_utils import S3Helper, S3Details
-from util.resources import ResourceFactory, TableResource
+from util.resources import ResourceFactory, TableResource, DBResource
 from util.tasks import TaskManager, FailIfResourceDoesNotExistsTask, CreateIfTargetDoesNotExistTask, \
     FailIfResourceClusterDoesNotExistsTask, UnloadDataToS3Task, CopyDataFromS3Task, CleanupS3StagingAreaTask, \
     NoOperationTask
@@ -79,19 +79,29 @@ def __init__(self,

         destination = ResourceFactory.get_target_resource_from_config_helper(self.config_helper, self.region)

-        if global_config_values['tableName'] and global_config_values['tableName'] != 'None':
-            source = TableResource(source.get_cluster(), source.get_schema(), global_config_values['tableName'])
-            destination = TableResource(destination.get_cluster(), destination.get_schema(), global_config_values['tableName'])

         self.task_manager = TaskManager()
         self.barrier_after_all_cluster_pre_tests = NoOperationTask()
         self.task_manager.add_task(self.barrier_after_all_cluster_pre_tests)
         self.barrier_after_all_resource_pre_tests = NoOperationTask()
         self.task_manager.add_task(self.barrier_after_all_resource_pre_tests)

-        # TODO: Check whether both resources are of type table if that is not the case then perform other scenario's
-        # For example if both resources are of type schema then create target schema and migrate all tables
-        self.add_table_migration(source, destination, global_config_values)
+        if isinstance(source, TableResource):
+            if isinstance(destination, DBResource):
+                if not isinstance(destination, TableResource):
+                    destination = ResourceFactory.get_table_resource_from_merging_2_resources(destination, source)
+                if global_config_values['tableName'] and global_config_values['tableName'] != 'None':
+                    destination.set_table(global_config_values['tableName'])
+                self.add_table_migration(source, destination, global_config_values)
+            else:
+                logging.fatal('Destination should be a database resource')
+                raise NotImplementedError
+            pass
+        else:
+            # TODO: add additional scenarios
+            # For example if both resources are of type schema then create target schema and migrate all tables
+            logging.fatal('Source is not a Table, this type of unload-copy is currently not supported.')
+            raise NotImplementedError

         self.task_manager.run()

@@ -0,0 +1,64 @@
#!/usr/bin/env bash

source ${HOME}/variables.sh

SCENARIO=scenario010
SOURCE_SCHEMA="ssb"
SOURCE_TABLE="customer"
TARGET_SCHEMA="public"
TARGET_TABLE="${SOURCE_TABLE}"
PYTHON="python3"

DESCRIPTION="Perform Unload Copy with password encrypted using KMS. "
DESCRIPTION="${DESCRIPTION}Target table exists in target cluster in different schema. "
DESCRIPTION="${DESCRIPTION}Force re-creation. "
DESCRIPTION="${DESCRIPTION}Use ${PYTHON}."
DESCRIPTION="${DESCRIPTION}Do not specify table in copyTarget."

start_scenario "${DESCRIPTION}"

start_step "Create configuration JSON to copy ${SOURCE_SCHEMA}.${SOURCE_TABLE} of source cluster to ${TARGET_SCHEMA}.${TARGET_TABLE} on target cluster"

cat >${HOME}/${SCENARIO}.json <<EOF
{
  "unloadSource": {
    "clusterEndpoint": "${SourceClusterEndpointAddress}",
    "clusterPort": ${SourceClusterEndpointPort},
    "connectPwd": "${KMSEncryptedPassword}",
    "connectUser": "${SourceClusterMasterUsername}",
    "db": "${SourceClusterDBName}",
    "schemaName": "${SOURCE_SCHEMA}",
    "tableName": "${SOURCE_TABLE}"
  },
  "s3Staging": {
    "aws_iam_role": "${S3CopyRole}",
    "path": "s3://${CopyUnloadBucket}/${SCENARIO}/",
    "deleteOnSuccess": "True",
    "region": "eu-west-1",
    "kmsGeneratedKey": "True"
  },
  "copyTarget": {
    "clusterEndpoint": "${TargetClusterEndpointAddress}",
    "clusterPort": ${TargetClusterEndpointPort},
    "connectPwd": "${KMSEncryptedPassword}",
    "connectUser": "${SourceClusterMasterUsername}",
    "db": "${SourceClusterDBName}",
    "schemaName": "${TARGET_SCHEMA}"
  }
}
EOF

cat ${HOME}/${SCENARIO}.json >>${STDOUTPUT} 2>>${STDERROR}
r=$? && stop_step $r

start_step "Run Unload Copy Utility"
source ${VIRTUAL_ENV_PY36_DIR}/bin/activate >>${STDOUTPUT} 2>>${STDERROR}
cd ${HOME}/amazon-redshift-utils/src/UnloadCopyUtility && ${PYTHON} redshift_unload_copy.py --log-level debug --destination-table-auto-create --destination-table-force-drop-create ${HOME}/${SCENARIO}.json eu-west-1 >>${STDOUTPUT} 2>>${STDERROR}
psql -h ${TargetClusterEndpointAddress} -p ${TargetClusterEndpointPort} -U ${TargetClusterMasterUsername} ${TargetClusterDBName} -c "select * from stl_ddltext where text ilike 'drop table %${TARGET_SCHEMA}.${TARGET_TABLE}%'" 2>>${STDERROR} | grep -i "drop table" 2>>${STDERROR} | grep -i "${TARGET_SCHEMA}.${TARGET_TABLE}" >>${STDOUTPUT} 2>>${STDERROR}
RESULT="$?"
EXPECTED_COUNT=`psql -h ${SourceClusterEndpointAddress} -p ${SourceClusterEndpointPort} -U ${SourceClusterMasterUsername} ${SourceClusterDBName} -c "select 'count='||count(*) from ${SOURCE_SCHEMA}.${SOURCE_TABLE};" | grep "count=[0-9]*"|awk -F= '{ print $2}'` >>${STDOUTPUT} 2>>${STDERROR}
psql -h ${TargetClusterEndpointAddress} -p ${TargetClusterEndpointPort} -U ${TargetClusterMasterUsername} ${TargetClusterDBName} -c "select 'count='||count(*) from ${TARGET_SCHEMA}.${TARGET_TABLE};" | grep "count=${EXPECTED_COUNT}" >>${STDOUTPUT} 2>>${STDERROR}
r=$(( $? + ${RESULT} )) && stop_step $r
deactivate

stop_scenario
@@ -0,0 +1,63 @@
#!/usr/bin/env bash

source ${HOME}/variables.sh

SCENARIO=scenario011
SOURCE_SCHEMA="ssb"
SOURCE_TABLE="customer"
TARGET_SCHEMA="public"
TARGET_TABLE="${SOURCE_TABLE}"
PYTHON="python3"

DESCRIPTION="Perform Unload Copy with password encrypted using KMS. "
DESCRIPTION="${DESCRIPTION}Target table exists in target cluster in different schema. "
DESCRIPTION="${DESCRIPTION}Force re-creation. "
DESCRIPTION="${DESCRIPTION}Use ${PYTHON}."
DESCRIPTION="${DESCRIPTION}Do not specify schema and table in copyTarget."

start_scenario "${DESCRIPTION}"

start_step "Create configuration JSON to copy ${SOURCE_SCHEMA}.${SOURCE_TABLE} of source cluster to ${TARGET_SCHEMA}.${TARGET_TABLE} on target cluster"

cat >${HOME}/${SCENARIO}.json <<EOF
{
  "unloadSource": {
    "clusterEndpoint": "${SourceClusterEndpointAddress}",
    "clusterPort": ${SourceClusterEndpointPort},
    "connectPwd": "${KMSEncryptedPassword}",
    "connectUser": "${SourceClusterMasterUsername}",
    "db": "${SourceClusterDBName}",
    "schemaName": "${SOURCE_SCHEMA}",
    "tableName": "${SOURCE_TABLE}"
  },
  "s3Staging": {
    "aws_iam_role": "${S3CopyRole}",
    "path": "s3://${CopyUnloadBucket}/${SCENARIO}/",
    "deleteOnSuccess": "True",
    "region": "eu-west-1",
    "kmsGeneratedKey": "True"
  },
  "copyTarget": {
    "clusterEndpoint": "${TargetClusterEndpointAddress}",
    "clusterPort": ${TargetClusterEndpointPort},
    "connectPwd": "${KMSEncryptedPassword}",
    "connectUser": "${SourceClusterMasterUsername}",
    "db": "${SourceClusterDBName}"
  }
}
EOF

cat ${HOME}/${SCENARIO}.json >>${STDOUTPUT} 2>>${STDERROR}
r=$? && stop_step $r

start_step "Run Unload Copy Utility"
source ${VIRTUAL_ENV_PY36_DIR}/bin/activate >>${STDOUTPUT} 2>>${STDERROR}
cd ${HOME}/amazon-redshift-utils/src/UnloadCopyUtility && ${PYTHON} redshift_unload_copy.py --log-level debug --destination-table-auto-create --destination-table-force-drop-create ${HOME}/${SCENARIO}.json eu-west-1 >>${STDOUTPUT} 2>>${STDERROR}
psql -h ${TargetClusterEndpointAddress} -p ${TargetClusterEndpointPort} -U ${TargetClusterMasterUsername} ${TargetClusterDBName} -c "select * from stl_ddltext where text ilike 'drop table %${TARGET_SCHEMA}.${TARGET_TABLE}%'" 2>>${STDERROR} | grep -i "drop table" 2>>${STDERROR} | grep -i "${TARGET_SCHEMA}.${TARGET_TABLE}" >>${STDOUTPUT} 2>>${STDERROR}
RESULT="$?"
EXPECTED_COUNT=`psql -h ${SourceClusterEndpointAddress} -p ${SourceClusterEndpointPort} -U ${SourceClusterMasterUsername} ${SourceClusterDBName} -c "select 'count='||count(*) from ${SOURCE_SCHEMA}.${SOURCE_TABLE};" | grep "count=[0-9]*"|awk -F= '{ print $2}'` >>${STDOUTPUT} 2>>${STDERROR}
psql -h ${TargetClusterEndpointAddress} -p ${TargetClusterEndpointPort} -U ${TargetClusterMasterUsername} ${TargetClusterDBName} -c "select 'count='||count(*) from ${TARGET_SCHEMA}.${TARGET_TABLE};" | grep "count=${EXPECTED_COUNT}" >>${STDOUTPUT} 2>>${STDERROR}
r=$(( $? + ${RESULT} )) && stop_step $r
deactivate

stop_scenario
17 changes: 17 additions & 0 deletions src/UnloadCopyUtility/util/resources.py
@@ -359,6 +359,23 @@ def get_source_resource_from_config_helper(config_helper, kms_region=None):
         cluster_dict = config_helper.config['unloadSource']
         return ResourceFactory.get_resource_from_dict(cluster_dict, kms_region)

+    @staticmethod
+    def get_table_resource_from_merging_2_resources(resource1, resource2):
+        cluster = resource1.get_cluster()
+        try:
+            schema = resource1.get_schema()
+        except AttributeError:
+            logging.info('Destination did not have a schema declared, fetching from resource2.')
+            schema = resource2.get_schema()
+            logging.info('Using resource2 schema {s}'.format(s=schema))
+        try:
+            table = resource1.get_table()
+        except AttributeError:
+            logging.info('Destination did not have a table declared, fetching from resource2.')
+            table = resource2.get_table()
+            logging.info('Using resource2 table {t}'.format(t=table))
+        return TableResource(cluster, schema, table)
+
     @staticmethod
     def get_target_resource_from_config_helper(config_helper, kms_region=None):
         cluster_dict = config_helper.config['copyTarget']
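Taken together with the __init__ dispatch above, the effect on the new test scenarios is: scenario010 omits only tableName in copyTarget (the table is inherited from unloadSource), while scenario011 omits both schemaName and tableName (both are inherited). The sketch below restates that resolution at the level of the raw config dictionaries purely for illustration; the utility itself performs it on resource objects via get_table_resource_from_merging_2_resources, and the global_table_name override here mirrors the destination.set_table() call in redshift_unload_copy.py.

```python
def resolve_effective_target(unload_source, copy_target, global_table_name=None):
    """Illustrative only: compute the schema/table the copy will target.

    schemaName/tableName missing from copyTarget are inherited from unloadSource;
    an explicit tableName from the global configuration still overrides the result.
    """
    schema = copy_target.get('schemaName', unload_source['schemaName'])
    table = copy_target.get('tableName', unload_source['tableName'])
    if global_table_name and global_table_name != 'None':
        table = global_table_name
    return schema, table


source = {'schemaName': 'ssb', 'tableName': 'customer'}
print(resolve_effective_target(source, {'schemaName': 'public'}))  # scenario010 -> ('public', 'customer')
print(resolve_effective_target(source, {}))                        # scenario011 -> ('ssb', 'customer')
```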
