Skip to content

Commit

Permalink
fix add partition and return value
Browse files Browse the repository at this point in the history
  • Loading branch information
andreax79 committed May 24, 2022
1 parent ddfc3b8 commit 7331576
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 36 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ jobs:
run: |
python -m pip install --upgrade pip
python -m pip install --upgrade setuptools wheel
python -m pip install pytest
python -m pip install pytest moto
python -m pip install -r requirements.txt
- name: Run tests
Expand Down
77 changes: 42 additions & 35 deletions gluettalax.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import sys
import botocore.credentials
import botocore.session
from botocore.exceptions import BotoCoreError
import boto3
import time
import fnmatch
Expand All @@ -35,7 +36,7 @@
from urllib.parse import urlparse

__author__ = 'Andrea Bonomi <andrea.bonomi@gmail.com>'
__version__ = '1.1.0'
__version__ = '1.1.1'
__all__ = [
'CrawlerTimeout',
'GluettalaxException',
Expand Down Expand Up @@ -353,6 +354,20 @@ def print_job_runs(name=None, include_succeeded=True, lines=None, header=True):
pass


def get_partition_values(kargs, partition_keys):
"Check and convert command line arguments to partitions dict"
if len(kargs) != len(partition_keys):
raise InvalidOption(
'{} partitions required ({})'.format(
len(partition_keys), ' '.join(['--{}=XXX'.format(x['Name']) for x in partition_keys])
)
)
try:
return [kargs[x['Name']] for x in partition_keys]
except KeyError as ex:
raise InvalidOption('missing --{} argument'.format(ex.args[0]))


Partitions = namedtuple('Partitions', ['partition_keys', 'max_lengths', 'data'])


Expand All @@ -361,10 +376,10 @@ def list_partitions(db, table, header=True):
# Get table metadata
glue = get_glue()
try:
response = glue.get_table(DatabaseName=db, Name=table)
glue_table = glue.get_table(DatabaseName=db, Name=table)
except glue.exceptions.EntityNotFoundException:
raise TableNotFound('Table {} not found'.format(table))
partition_keys = [x['Name'] for x in response['Table']['PartitionKeys']]
partition_keys = [x['Name'] for x in glue_table['Table']['PartitionKeys']]
# Get partitions
data = []
lengths = [len(x) for x in partition_keys] # calculate the labels lengths
Expand All @@ -389,11 +404,11 @@ def add_partitions_by_location(db, table, location, kargs):
bucket_files = [x.key for x in bucket.objects.filter(Prefix=url.path[1:]).all()]
bucket_dirs = sorted(list(set([os.path.dirname(x) for x in bucket_files])))
# Parsing table info required to create partitions from table
response = glue.get_table(DatabaseName=db, Name=table)
input_format = response['Table']['StorageDescriptor']['InputFormat']
output_format = response['Table']['StorageDescriptor']['OutputFormat']
serde_info = response['Table']['StorageDescriptor']['SerdeInfo']
partition_keys = response['Table']['PartitionKeys']
glue_table = glue.get_table(DatabaseName=db, Name=table)
input_format = glue_table['Table']['StorageDescriptor']['InputFormat']
output_format = glue_table['Table']['StorageDescriptor']['OutputFormat']
serde_info = glue_table['Table']['StorageDescriptor']['SerdeInfo']
partition_keys = glue_table['Table']['PartitionKeys']
# Iterate over dirs
for path in bucket_dirs:
partition_url = 's3://{}/{}/'.format(url.netloc, path)
Expand Down Expand Up @@ -433,22 +448,15 @@ def add_partition(db, table, kargs):
if 'location' in kargs:
del kargs['location']
# Get glue table
response = glue.get_table(DatabaseName=db, Name=table)
glue_table = glue.get_table(DatabaseName=db, Name=table)
# Check partition keys
partition_keys = response['Table']['PartitionKeys']
if len(kargs) != len(partition_keys):
raise InvalidOption(
'{} partitions required ({})'.format(
len(partition_keys), ' '.join(['--{}=XXX'.format(x['Name']) for x in partition_keys])
)
)
partition_values = [kargs[x['Name']] for x in partition_keys]
partition_keys = glue_table['Table']['PartitionKeys']
partition_values = get_partition_values(kargs, partition_keys)
# Parsing table info required to create partitions from table
input_format = response['Table']['StorageDescriptor']['InputFormat']
output_format = response['Table']['StorageDescriptor']['OutputFormat']
table_location = response['Table']['StorageDescriptor']['Location']
serde_info = response['Table']['StorageDescriptor']['SerdeInfo']
partition_keys = response['Table']['PartitionKeys']
input_format = glue_table['Table']['StorageDescriptor']['InputFormat']
output_format = glue_table['Table']['StorageDescriptor']['OutputFormat']
table_location = glue_table['Table']['StorageDescriptor']['Location']
serde_info = glue_table['Table']['StorageDescriptor']['SerdeInfo']
if not location:
if not table_location.endswith('/'):
table_location = table_location + '/'
Expand Down Expand Up @@ -477,15 +485,12 @@ def add_partition(db, table, kargs):
def delete_partition(db, table, kargs):
"Deletes a Glue partition"
glue = get_glue()
response = glue.get_table(DatabaseName=db, Name=table)
partition_keys = response['Table']['PartitionKeys']
if len(kargs) != len(partition_keys):
raise InvalidOption(
'{} partitions required ({})'.format(
len(partition_keys), ' '.join(['--{}=XXX'.format(x['Name']) for x in partition_keys])
)
)
partition_values = [kargs[x['Name']] for x in partition_keys]
# Get glue table
glue_table = glue.get_table(DatabaseName=db, Name=table)
# Check partition keys
partition_keys = glue_table['Table']['PartitionKeys']
partition_values = get_partition_values(kargs, partition_keys)
# Delete partition
try:
return glue.delete_partition(DatabaseName=db, TableName=table, PartitionValues=partition_values)
except glue.exceptions.EntityNotFoundException:
Expand Down Expand Up @@ -831,21 +836,23 @@ def lookup_cmd(cmd):
raise GluettalaxCommandNotFound('Invalid command "{}"; use "help" for a list.'.format(cmd))


def main(argv=None):
if argv is None:
argv = sys.argv
def main(argv):
if len(argv) < 2:
cmd_help(argv[1:])
return 2
try:
f = lookup_cmd(argv[1])
return f(argv[1:])
f(argv[1:])
return 0
except GluettalaxWarning as ex:
print(ex)
return 0
except GluettalaxException as ex:
print(ex)
return 1
except BotoCoreError as ex:
print(ex)
return 1


def gluettalax(*argv):
Expand Down
1 change: 1 addition & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ mypy
sphinx
pytest
black
moto
twine<3.4

0 comments on commit 7331576

Please sign in to comment.