Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HAWQ-1061. Fix data loss when file locations include directories, check policy and bucketnum in all mode. #911

Closed
wants to merge 2 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
30 changes: 23 additions & 7 deletions tools/bin/hawqregister
Expand Up @@ -370,14 +370,24 @@ class HawqRegister(object):
set_yml_dataa('AO', files, sizes, params['AO_Schema'], params['Distribution_Policy'], params['AO_FileLocations'], params['Bucketnum'], partitionby, partitions_constraint,\
partitions_name, partitions_compression_level, partitions_compression_type, partitions_checksum, partitions_filepaths, partitions_filesizes, encoding)

def check_file_not_folder():
    """Verify every path in self.files is a regular HDFS file, not a directory.

    Registering a directory path would cause data loss during hawq register,
    so each path is probed with `hdfs dfs -test -f`, which exits 0 only for
    a regular file. On the first path that fails the probe, log the offending
    path and abort the whole run with exit status 1.
    """
    for fn in self.files:
        hdfscmd = 'hdfs dfs -test -f %s' % fn
        # local_ssh returns a non-zero status when the hdfs test fails,
        # i.e. fn does not exist or is a directory.
        if local_ssh(hdfscmd, logger):
            # Fatal condition: log at error level (was logger.info, which
            # was inconsistent with the other fatal paths in this method
            # that use logger.error before sys.exit).
            logger.error('%s is not a file in hdfs, please check the yaml configuration file.' % fn)
            sys.exit(1)

if self.yml:
option_parser_yml(options.yml_config)
self.filepath = self.files[0][:self.files[0].rfind('/')] if self.files else ''
check_distribution_policy()
check_file_not_folder()
check_database_encoding()
if self.mode != 'force' and self.mode != 'repair':
if not create_table():
self.mode = 'second_exist'
check_bucket_number()
check_distribution_policy()
check_policy_consistency()
else:
self.file_format = 'Parquet'
check_hash_type() # Usage1 only support randomly distributed table
Expand All @@ -395,8 +405,6 @@ class HawqRegister(object):
if self.tabledir.strip('/') != self.filepath.strip('/'):
logger.error("In repair mode, file path from yaml file should be the same with table's path.")
sys.exit(1)
check_policy_consistency()
check_bucket_number()
existed_files, existed_sizes = self._get_files_in_hdfs(self.filepath)
existed_info = {}
for k, fn in enumerate(existed_files):
Expand Down Expand Up @@ -553,8 +561,12 @@ class HawqRegister(object):
query = "set allow_system_table_mods='dml';"
query += "begin transaction;"
segno_lst = [f.split('/')[-1] for f in self.files_update]
for i, eof in enumerate(eofs):
query += "update pg_aoseg.%s set eof = '%s' where segno = '%s';" % (self.seg_name, eof, segno_lst[i])
if self.file_format == 'Parquet':
for i, eof in enumerate(eofs):
query += "update pg_aoseg.%s set eof = '%s', tupcount = '%s', eofuncompressed = '%s' where segno = '%s';" % (self.seg_name, eof, -1, -1, segno_lst[i])
else:
for i, eof in enumerate(eofs):
query += "update pg_aoseg.%s set eof = '%s', tupcount = '%s', varblockcount = '%s', eofuncompressed = '%s' where segno = '%s';" % (self.seg_name, eof, -1, -1, -1, segno_lst[i])
query += "end transaction;"
else: # update_and_insert
eofs = self.sizes
Expand All @@ -571,8 +583,12 @@ class HawqRegister(object):
query += ';'

segno_lst = [f.split('/')[-1] for f in self.files_update]
for i, eof in enumerate(self.sizes_update):
query += "update pg_aoseg.%s set eof = '%s' where segno = '%s';" % (self.seg_name, eof, segno_lst[i])
if self.file_format == 'Parquet':
for i, eof in enumerate(self.sizes_update):
query += "update pg_aoseg.%s set eof = '%s', tupcount = '%s', eofuncompressed = '%s' where segno = '%s';" % (self.seg_name, eof, -1, -1, segno_lst[i])
else:
for i, eof in enumerate(self.sizes_update):
query += "update pg_aoseg.%s set eof = '%s', tupcount = '%s', varblockcount = '%s', eofuncompressed = '%s' where segno = '%s';" % (self.seg_name, eof, -1, -1, -1, segno_lst[i])
query += "end transaction;"
return self.utility_accessor.update_catalog(query)

Expand Down