Skip to content

Commit

Permalink
Support arbitrary partition schemas
Browse files Browse the repository at this point in the history
  • Loading branch information
sleepdeprecation committed Oct 29, 2019
1 parent f3f27d4 commit 731d752
Show file tree
Hide file tree
Showing 3 changed files with 237 additions and 42 deletions.
20 changes: 15 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,22 @@ Ideally these will be removed in the future to enable use with more diverse sets

1. Your partition keys are `[year, month, day, hour]`.

1. Your partitions are stored in one of these two path schemas, assuming your table is located at `s3://bucket/table/`:
1. Your partitions are stored in one of two ways (examples assume your table's location is `s3://bucket/table/`):

1. Partitions are stored in `key=value` form or pathed similarly (examples below):

```
s3://bucket/table/YYYY/MM/DD/HH/
s3://bucket/table/year=YYYY/month=MM/day=DD/hour=HH/
```

2. You have a single partition key, which is your path with slashes changed into dashes (examples below):

```
s3://bucket/table/YYYY/MM/DD/HH/ => partition value of YYYY-MM-DD-HH
```


```
s3://bucket/table/YYYY/MM/DD/HH/
s3://bucket/table/year=YYYY/month=MM/day=DD/hour=HH/
```

## IAM Permissions

Expand Down
130 changes: 93 additions & 37 deletions glutil/partitioner.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,6 @@ def __eq__(self, other):
return self.values == other.values and self.location == other.location

def _cmp(self, other):
if self.values == other.values and self.location == other.location:
return 0

if self.values > other.values:
return 1
elif self.values < other.values:
Expand Down Expand Up @@ -146,6 +143,7 @@ def __init__(self, database, table, aws_profile=None, aws_region=None):
source=e)

self.storage_descriptor = self.table_definition["Table"]["StorageDescriptor"]
self.partition_keys = self.table_definition["Table"]["PartitionKeys"]

self.bucket, self.prefix = self._get_bucket()

Expand All @@ -164,18 +162,18 @@ def _get_bucket(self):
def partitions_on_disk(self, limit_days=0):
"""Find partitions in S3.
This function will crawl S3 for any partitions matching the following
path formats:
- year=yyyy/month=mm/day=dd/hour=hh/
- yyyy/mm/dd/hh/
This function will crawl S3 for any partitions it can find.
And will return a :obj:`list` of :obj:`Partition` matching those found
paths.
Args:
limit_days (`int`): Providing a value other than 0 will limit the
search to only partitions created in the past N days.
NOTE: limit_days only works if your first three partition keys are
[year, month, day]. Any other first three partitions will raise
an exception if limit_days is non-zero.
"""

if not isinstance(limit_days, int) or limit_days < 0:
Expand All @@ -184,48 +182,98 @@ def partitions_on_disk(self, limit_days=0):
if limit_days == 0:
return self._all_partitions_on_disk()

# only year/month/keys, these are the partitions
partition_keys = [k["Name"].lower() for k in self.partition_keys[:3]]
if partition_keys != ["year", "month", "day"]:
raise TypeError("limit_days only works on tables partitioned by year, month, and day")

# determine all possible path prefixes for days
partition_prefixes = []
today = datetime.datetime.now()
for i in range(0, limit_days + 1):
date_delta = datetime.timedelta(days=i)
partition_date = today - date_delta
hive_format = partition_date.strftime("year=%Y/month=%m/day=%d/")
flat_format = partition_date.strftime("%Y/%m/%d/")
partition_prefixes.append(hive_format)
partition_prefixes.append(flat_format)
values = [
str(partition_date.year),
str(partition_date.month),
str(partition_date.day),
]

hive_format = partition_date.strftime(f"{self.prefix}year=%Y/month=%m/day=%d/")
flat_format = partition_date.strftime(f"{self.prefix}%Y/%m/%d/")
partition_prefixes.append({"prefix": hive_format, "values": values})
partition_prefixes.append({"prefix": flat_format, "values": values})

partitions = []
for prefix in partition_prefixes:
for hour_match in self._prefix_match(f"{self.prefix}{prefix}", "hour", r"\d{2}"):
partitions.append(self._partition_from_path(hour_match))
if len(self.partition_keys) == 3:
for prefix in partition_prefixes:
partition = self._confirm_partition(**prefix)
if partition:
partitions.append(partition)
else:
for prefix in partition_prefixes:
partitions.extend(self._partition_finder(prefix["prefix"], idx=3, values=prefix["values"]))

return partitions

def _all_partitions_on_disk(self):
def _partition_finder(self, prefix, idx=0, values=None):
    """Recursively walk S3 "directories" under *prefix*, matching one
    partition key per path segment.

    Args:
        prefix (str): S3 key prefix to search under (ends with "/").
        idx (int): Index into ``self.partition_keys`` of the key being
            matched at this depth.
        values (list of str, optional): Partition values accumulated
            from the path segments already matched above this depth.

    Returns:
        list of Partition: one entry for every fully-matched key path.
    """
    # BUG FIX: the default was the mutable literal `values=[]`, which is
    # shared across calls. The old body never mutated it in place, so no
    # observable defect yet, but the shared-default trap is removed here.
    if values is None:
        values = []

    # With exactly one partition key, only `key=value`-style prefixes
    # are accepted (strict); otherwise bare values match as well.
    strict = len(self.partition_keys) == 1

    this_key = self.partition_keys[idx]
    # int-typed keys must look like digits; any other type matches freely
    value_regex = r"\d+" if this_key["Type"] == "int" else r".*"
    key_name = this_key["Name"]

    last_key = idx == len(self.partition_keys) - 1
    partitions = []
    for match in self._prefix_match(prefix, key_name, value_regex, strict=strict):
        these_values = values + [match["value"]]
        if last_key:
            # Deepest key reached: this prefix names a complete partition.
            matched_prefix = match["prefix"]
            partitions.append(Partition(
                these_values,
                f"s3://{self.bucket}/{matched_prefix}",
            ))
        else:
            # Recurse one level deeper for the next partition key.
            partitions.extend(self._partition_finder(
                match["prefix"], idx=idx + 1, values=these_values))

    return partitions

def _partition_from_path(self, path):
    """Build a Partition from an S3 key *path* matching PARTITION_MATCH.

    Assumes the path contains year/month/day/hour components (named
    groups in ``self.PARTITION_MATCH``).
    """
    parsed = re.search(self.PARTITION_MATCH, path)
    values = [parsed.group(part) for part in ("year", "month", "day", "hour")]
    return Partition(values, f"s3://{self.bucket}/{path}")

def _prefix_match(self, prefix, partition_name, partition_value_regex):
regex = "({}=|){}/".format(partition_name, partition_value_regex)
def _all_partitions_on_disk(self):
    """Find every partition stored under the table's S3 prefix."""
    found = self._partition_finder(self.prefix)

    # Fallback for single-key tables whose partitions aren't pathed as
    # key=value: treat each object's directory path as one flat value.
    if not found and len(self.partition_keys) == 1:
        found += self._flat_partitions_on_disk()

    return found

def _flat_partitions_on_disk(self):
    """Find partitions for a single-key table whose partition value is
    the object's directory path with slashes turned into dashes
    (e.g. ``YYYY/MM/DD/HH/`` -> value ``YYYY-MM-DD-HH``).

    Returns:
        list of Partition: one per distinct directory containing objects.
    """
    # BUG FIX: a single list_objects_v2 call both raised KeyError on
    # "Contents" when nothing exists under the prefix and silently
    # truncated results at 1000 keys; paginate and tolerate empty pages.
    paginator = self.s3.get_paginator("list_objects_v2")

    items = set()
    prefix_len = len(self.prefix)
    for page in paginator.paginate(Bucket=self.bucket, Prefix=self.prefix):
        # "Contents" is absent entirely when a page has no matching keys.
        for obj in page.get("Contents", []):
            name = obj["Key"][prefix_len:]
            # Drop the object's filename; the remaining segments are the
            # partition's directory path.
            segments = name.split("/")[:-1]
            location_suffix = "/".join(segments)
            partition_key = "-".join(segments)

            partition = Partition(
                [partition_key],
                f"s3://{self.bucket}/{self.prefix}{location_suffix}/")
            items.add(partition)

    return list(items)

def _prefix_match(self, prefix, partition_name, partition_value_regex, strict=False):
base_regex = "({}=|)({})/"
if strict:
base_regex = "({}=)({})/"
regex = base_regex.format(partition_name, partition_value_regex)

resp = self.s3.list_objects_v2(
Bucket=self.bucket, Delimiter="/", Prefix=prefix)
Expand All @@ -237,11 +285,19 @@ def _prefix_match(self, prefix, partition_name, partition_value_regex):
prefix_len = len(prefix)
for obj in resp["CommonPrefixes"]:
name = obj["Prefix"][prefix_len:]
if re.match(regex, name):
items.append(obj["Prefix"])
match = re.search(regex, name)
if match:
items.append({"prefix": obj["Prefix"], "value": match.group(2)})

return items

def _confirm_partition(self, prefix, values):
    """Return a Partition for *prefix* if any object exists under it,
    otherwise None."""
    listing = self.s3.list_objects_v2(
        Bucket=self.bucket, Delimiter="/", Prefix=prefix)
    if listing["KeyCount"] > 0:
        return Partition(values, f"s3://{self.bucket}/{prefix}")
    return None

def existing_partitions(self):
args = {
"DatabaseName": self.database,
Expand Down
129 changes: 129 additions & 0 deletions tests/partitioner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def setUp(self):

@mock_glue
def test_init(self):
"""Confirm Partitioner.__init__ grabs table data from AWS"""
self.helper.make_database_and_table()

partitioner = Partitioner(self.database, self.table, aws_region=self.region)
Expand All @@ -36,6 +37,7 @@ def test_init(self):

@mock_glue
def test_init_no_table(self):
"""Partitioner.__init__ should raise an error when no table exists"""
with self.assertRaises(GlutilError) as context:
Partitioner(self.database, self.table, aws_region=self.region)

Expand Down Expand Up @@ -566,6 +568,133 @@ def test_update_partition_locations_with_mix_of_good_and_bad(self):
errors.should.have.length_of(1)
errors[0]["Partition"].should.equal(bad_partition.values)

@mock_glue
@mock_s3
def test_find_partitions_simple_schema(self):
    """Partitioner.partitions_on_disk should work with AWSLogs-like tables"""
    self.s3.create_bucket(Bucket=self.bucket)
    self.glue.create_database(**self.helper.create_database_input())

    table_input = self.helper.create_table_input()
    table_input["TableInput"]["PartitionKeys"] = [
        {"Name": "dt", "Type": "string"},
    ]
    self.glue.create_table(**table_input)

    # Write one object under a flat (non key=value) date path.
    table_location = table_input["TableInput"]["StorageDescriptor"]["Location"]
    partition_location = f"{table_location}/2019/01/02/"
    object_key = f"{partition_location}object.json"
    bucket_name, object_path = object_key[len("s3://"):].split("/", 1)

    self.s3.put_object(
        Body='{"foo": "bar"}',
        Bucket=bucket_name,
        Key=object_path,
    )

    expected = [Partition(["2019-01-02"], partition_location)]

    partitioner = Partitioner(self.database, self.table, aws_region=self.region)
    found = partitioner.partitions_on_disk()

    set(found).should.equal(set(expected))

@mock_glue
@mock_s3
def test_find_partitions_single_key(self):
    """Partitioner.partitions_on_disk should work with single-key tables, in hive-format"""
    self.s3.create_bucket(Bucket=self.bucket)
    self.glue.create_database(**self.helper.create_database_input())

    table_input = self.helper.create_table_input()
    table_input["TableInput"]["PartitionKeys"] = [
        {"Name": "dt", "Type": "string"},
    ]
    self.glue.create_table(**table_input)

    # Write one object under a hive-style key=value path.
    table_location = table_input["TableInput"]["StorageDescriptor"]["Location"]
    partition_location = f"{table_location}/dt=2019-01-02/"
    object_key = f"{partition_location}object.json"
    bucket_name, object_path = object_key[len("s3://"):].split("/", 1)

    self.s3.put_object(
        Body='{"foo": "bar"}',
        Bucket=bucket_name,
        Key=object_path,
    )

    expected = [Partition(["2019-01-02"], partition_location)]

    partitioner = Partitioner(self.database, self.table, aws_region=self.region)
    found = partitioner.partitions_on_disk()

    set(found).should.equal(set(expected))

@mock_glue
@mock_s3
def test_find_partitions_with_limit_no_hour_partition(self):
    """Partitioner.partitions_on_disk, limit_days set,
    on a table partitioned by day, should work"""
    self.s3.create_bucket(Bucket=self.bucket)
    self.glue.create_database(**self.helper.create_database_input())

    table_input = self.helper.create_table_input(
        location=f"s3://{self.bucket}/{self.table}/")
    table_input["TableInput"]["PartitionKeys"] = [
        {"Name": "year", "Type": "string"},
        {"Name": "month", "Type": "string"},
        {"Name": "day", "Type": "string"},
    ]
    self.glue.create_table(**table_input)

    # Write ten daily partitions, newest first (yesterday backwards).
    today = pendulum.now()
    partitions = []
    for days_back in range(1, 11):
        partition_date = today.subtract(days=days_back)
        year = partition_date.strftime("%Y")
        month = partition_date.strftime("%m")
        day = partition_date.strftime("%d")

        partition = Partition(
            [year, month, day],
            f"s3://{self.bucket}/{self.table}/{year}/{month}/{day}/")
        self.helper.write_partition_to_s3(partition)
        partitions.append(partition)

    partitioner = Partitioner(self.database, self.table, aws_region=self.region)
    found = partitioner.partitions_on_disk(limit_days=4)

    found.should.have.length_of(4)
    set(found).should.equal(set(partitions[:4]))

@mock_glue
@mock_s3
def test_find_partitions_with_limit_bad_partition_keys(self):
    """Partitioner.partitions_on_disk, limit_days set,
    on a single-partition table raises an error"""
    self.s3.create_bucket(Bucket=self.bucket)
    self.glue.create_database(**self.helper.create_database_input())

    table_input = self.helper.create_table_input(
        location=f"s3://{self.bucket}/{self.table}/")
    table_input["TableInput"]["PartitionKeys"] = [
        {"Name": "dt", "Type": "string"},
    ]
    self.glue.create_table(**table_input)

    # limit_days requires [year, month, day] leading keys; "dt" must fail.
    partitioner = Partitioner(self.database, self.table, aws_region=self.region)
    partitioner.partitions_on_disk.when.called_with(
        limit_days=4).should.have.raised(TypeError)


class PartitionTest(TestCase):
def test_partition_comparisons(self):
Expand Down

0 comments on commit 731d752

Please sign in to comment.