This repository has been archived by the owner on Feb 1, 2024. It is now read-only.

Clarify documentation to note that ingesting s3 access logs from a sub-prefix inside prefix does not work #33

Open
bbuechler opened this issue Jul 18, 2023 · 5 comments


@bbuechler

I ran into an issue where we aggregate S3 distribution logs from a variety of sources into one account, and the logs are broken down by sub-prefix:

s3://<log-bucket>/s3_distribution_logs/<deployment-name-sub-prefix>/<log-file-name>

I was trying to run a single Glue job for s3://<log-bucket>/s3_distribution_logs/ to populate all <deployment-name-sub-prefix> logs into the same CONVERTED_TABLE_NAME. In this case, the RAW_TABLE_NAME Athena table was getting populated, but the job would initially fail with the error below, and on subsequent runs would finish "successfully". Unfortunately, I wouldn't get any logs into my CONVERTED_TABLE_NAME Athena table.

With continuous logging enabled, and a little tinkering, I tracked the issue down to _get_first_key_in_prefix():

line 128, in _get_first_key_in_prefix
    first_object = response.get('Contents')[0].get('Key')
    TypeError: 'NoneType' object is not subscriptable

The values going into self.s3_client.list_objects_v2(**query_params) were:

{'Bucket': 'reformated-log-bucket', 'Prefix': 's3_access/', 'MaxKeys': 10}

from the glue_jobs.json:

"S3_CONVERTED_TARGET":"s3://reformated-log-bucket/s3_access/"

It's entirely unclear to me why, since I'm VERY new to both this project and Glue in general, but if I supply

"S3_SOURCE_LOCATION":"s3://<log-bucket>/s3_distribution_logs/<deployment-name-sub-prefix>/"

instead of:

"S3_SOURCE_LOCATION":"s3://<log-bucket>/s3_distribution_logs/"

...it just works, albeit with a smaller subset of data than I wanted. This may also be related to #30.

@bbuechler
Author

bbuechler commented Jul 27, 2023

A little bit of follow-up.

I've been working with this a bunch. I was getting the 'NoneType' object is not subscriptable error because the DynamicFrame wasn't getting any data. This was because, by default, DynamicFrames somehow only look at the top level and don't recurse into sub-prefixes/directories.

So, even though the Athena table was built around s3://bucket/path/, it wasn't picking up objects at s3://bucket/path/sub-path/log_file.ext. When that happened, data_frame had no payload, and the partition write-out was skipped. Since this was the initial run, nothing had ever been written to the output bucket.

The next step in the s3_access_job.py workflow's call to job.convert_and_partition() is add_new_optimized_partitions(). This calls self.optimized_catalog.get_and_create_partitions(), followed by self.partitioner.build_partitions_from_s3(), and finally the call to s3_reader.get_first_hivecompatible_date_in_prefix(). That is the failing call from my original post: it can't find any partitions AT ALL, because the initial population was aborted.

Based on this Re:Post Question, I added the additional_options = {"recurse": True} value to my DynamicFrame and it JustWorked™. I updated my workflow to add a --recursive parameter that I pass down into the DataConverter class and parse into True/False, since sometimes I don't want recursion.
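
Roughly, the flag handling looks like this. It's a sketch only: the --recursive parameter name and how it gets wired into DataConverter are my own additions, not part of the upstream project.

import sys
from awsglue.utils import getResolvedOptions

# Sketch only: parse an optional --recursive job parameter into a boolean.
# The flag name and the DataConverter wiring are my own additions.
recursive = False
if '--recursive' in sys.argv:
    args = getResolvedOptions(sys.argv, ['recursive'])
    recursive = args['recursive'].lower() in ('true', '1', 'yes')

# ...then pass `recursive` into DataConverter so it can set
# additional_options = {"recurse": True} on create_dynamic_frame.from_catalog()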

I also changed DataConverter.run() and job.trigger_conversion() to return True/None depending on whether any data was written, and to skip add_new_optimized_partitions() if we didn't add any new data.
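
In pseudocode, the resulting flow change is roughly this (the method names are the ones mentioned above; everything else, including the exact signatures, is assumed):

# Sketch of the flow change; method names follow those mentioned above,
# the rest is assumed.
def convert_and_partition(self):
    wrote_data = self.trigger_conversion()  # now returns True or None
    if wrote_data:
        self.add_new_optimized_partitions()
    else:
        print("No new data converted; skipping partition creation")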

Ultimately, though, I may be totally SOL on this project. When the data gets optimized into parquet files, there is no backwards traceability to the original source log document. I tried a bunch of methods to include the raw Athena table's "$PATH" in the converted/optimized Athena table and I couldn't find a way. I tried using spark.sql() to re-write the dynamic frame to include "$PATH", and I tried creating an Athena view, but DynamicFrames can't read views. I spent days trying to coax out a solution, but, much like this StackOverflow Post, I came up empty.

@dacort
Contributor

dacort commented Jul 28, 2023

Hi @bbuechler - Thanks for opening this issue and providing so much information. Apologies I haven't been able to respond until now.

For S3 Access Logs, the tool does assume that they're in their original location as written to by the S3 service. In other words, not organized under a sub-folder or partition. I'm glad you were able to get it to work, but yes there's no sort of lineage tracking. I presume you could potentially add input_file_name to the Glue code to extract the filename (the Athena "$PATH" wouldn't work since the conversion code is Glue).

Unfortunately I haven't been able to maintain this project since publishing - it'd be nice to update for newer versions of Glue or make the library more generic so it could run anywhere, or even just make it Athena CTAS/INSERT INTO. But alas, not able to at the moment.

@bbuechler
Author

@dacort No worries, I understand the loss of traction. This was an awesome project to help me learn a bunch of the mechanics of Glue and PySpark, something I previously had no experience with. Hopefully the {"recurse": True} tip helps someone else get their workflow moving.

I had considered input_file_name(), but I interpreted "file name of the current Spark task" as the Python executable name. 🤦 I moved right past it instead of trying it. I just tried it. It gave me exactly what I needed. 🫠

Anyway, I appreciate your work, and hopefully my input above can help someone else in the future, even if this project does not continue to evolve.

FWIW, I've been running my ingests as Glue 3 to take advantage of autoscaling. Seems to be working without a hitch. Glue 4 gave me a deprecation warning, but seemed to execute just fine.

@dacort
Contributor

dacort commented Jul 28, 2023

Awesome, glad to hear you got it working! S3 Access Logs, given their legacy, can be quite persnickety. :)

Super glad to hear it's been helpful to you and thanks for the kudos. 🙏

@bbuechler
Author

One final note for anyone following in my footsteps. The input_file_name() function does not work if file grouping is used for processing efficiency, and file grouping is automatically enabled when processing 50,000+ objects. This was a frustrating and confusing behavior that I discovered during larger-scale log ingest testing. With file grouping enabled, input_file_name() returns an empty value. If traceability is more important than throughput and cost optimization, grouping can be disabled in the additional_options block. This block is from the run() function of the DataConverter class:

# Retrieve the source data from the Glue catalog
source_data = self.glue_context.create_dynamic_frame.from_catalog(
    database=self.data_catalog.get_database_name(),
    table_name=self.data_catalog.get_table_name(),
    transformation_ctx="source_data",
    additional_options={
        "recurse": True,      # RECURSIVE! Pick up objects in sub-prefixes
        "groupFiles": "none"  # Performance hit, but ensures input_file_name() works
    }
)

To inject the log file name, I just add it after the DynamicFrame is converted to a DataFrame, also within converter.py:

from pyspark.sql.functions import input_file_name
...
# input_file_name() already returns a Column, so no lit() wrapper is needed
data_frame = data_frame.withColumn("log_object", input_file_name())
