
WithStreamingInputTable() #8

Merged: 6 commits merged into master from evan/withStreamingInputTable on Dec 28, 2019

Conversation

@EvanBoyle (Owner) commented Dec 28, 2019

Large refactor. I've created a fluent API for adding tables that I think makes more sense. The DW itself only creates the necessary buckets and top level glue database upon construction. Subsequent fluent withTable or withStreamingInputTable calls actually create glue tables and store them within the component. This is congruent with the idea that you'd like to create a single database with multiple tables.

Some more detail on what's included in this review:

  1. Fluent .withTable() API. This is the escape hatch that allows a user to create a table of any definition and then populate the underlying S3 data in some custom manner. We still need to update the API at some point to support formats other than parquet (json, csv, tsv), but I've added a TODO for now.
  2. Fluent .withStreamingInputTable() API. This creates the glue table, kinesis stream, parquet firehose stream, and partition registrar, and just works (see the sketch after this list). This is the core use case I was targeting when I thought about building this library. Calling this API gives you an endpoint that you can send records to, and the records automatically end up queryable via athena, partitioned by inserted_at.
  3. Support for multiple tables, exercised in the example. There were lots of places in the code where things were hardcoded, either to the logs table name or to pulumi resource names (which must be unique if you want to create multiple instances). Now the example creates impressions and clicks tables that share the same schema.
  4. I added a TODO and method stub for withBatchInputTable(). I think we should have a batch counterpart to the streaming input. I can see this being useful for cases where you aggregate streaming events on an hourly basis into higher-level statistics tables. I'll create an issue with more details on what this might look like.
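
Here is a minimal sketch of how the fluent API reads end to end. The ServerlessDataWarehouse class name, the import path, and the columns shape are placeholders assumed for illustration; only withTable and withStreamingInputTable come from this change.

import { ServerlessDataWarehouse } from "./datawarehouse"; // hypothetical class name and path

const columns = [
    { name: "event_type", type: "string" },
    { name: "session_id", type: "string" },
];

// Construction only provisions the shared buckets and the top-level glue database.
const dw = new ServerlessDataWarehouse("analytics_dw")
    // Escape hatch: define the table, then populate its S3 data in some custom manner.
    .withTable("impressions", { columns })
    // Full streaming path: glue table + kinesis stream + parquet firehose stream
    // + hourly partition registrar, partitioned by inserted_at.
    .withStreamingInputTable("clicks", { columns });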

One thing that became painfully obvious to me during this process is that we need an integration test. My current process for making changes is tearing down my stack (which requires manually deleting all of the data from the S3 buckets first), recreating it, and then issuing an athena query to make sure I get data back from both tables. We could definitely write a script that automates this; a rough sketch follows.
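
A rough sketch of what that automated check could look like, assuming the aws-sdk v2 Athena client; the database name, table names, and results bucket below are placeholders.

import * as AWS from "aws-sdk";

const athena = new AWS.Athena({ region: "us-west-2" });

async function assertTableHasRows(database: string, table: string, resultsLocation: string): Promise<void> {
    const start = await athena.startQueryExecution({
        QueryString: `SELECT count(*) AS c FROM ${table}`,
        QueryExecutionContext: { Database: database },
        ResultConfiguration: { OutputLocation: resultsLocation },
    }).promise();
    const id = start.QueryExecutionId!;

    // Poll until the query reaches a terminal state.
    let state = "QUEUED";
    while (state === "QUEUED" || state === "RUNNING") {
        await new Promise((resolve) => setTimeout(resolve, 2000));
        const exec = await athena.getQueryExecution({ QueryExecutionId: id }).promise();
        state = exec.QueryExecution!.Status!.State!;
    }
    if (state !== "SUCCEEDED") {
        throw new Error(`query against ${table} ended in state ${state}`);
    }

    const results = await athena.getQueryResults({ QueryExecutionId: id }).promise();
    // Row 0 is the header row; row 1 holds the count.
    const count = Number(results.ResultSet!.Rows![1].Data![0].VarCharValue);
    if (count < 1) {
        throw new Error(`expected rows in ${table}, found ${count}`);
    }
}

// e.g. after `pulumi up` on the example stack:
// await assertTableHasRows("analytics_dw", "impressions", "s3://<athena-results-bucket>/");
// await assertTableHasRows("analytics_dw", "clicks", "s3://<athena-results-bucket>/");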

@EvanBoyle changed the title from Evan/with streaming input table to WithStreamingInputTable() on Dec 28, 2019
@jmaysrowland (Collaborator) left a comment

What are your thoughts about adding testing for the features?

It's something I've been thinking about for a while, and I honestly don't know how we would go about it.
We could break the code up into smaller components and test the ones that are easier to test. But the ones that actually call AWS, we wouldn't really be able to test. Right?

Refactor looks good.

import { getS3Location } from "../../utils";
import { createPartitionDDLStatement } from "./partitionHelper";

export class HourlyPartitionRegistrar extends pulumi.ComponentResource {
@jmaysrowland (Collaborator) commented:

Do we see a case for not having hourly buckets? Maybe daily or every minute?

@EvanBoyle (Owner, Author) replied:

Yeah, I can definitely see some customization here for other use cases. For this one we specifically care about hourly, as that is all that kinesis firehose supports for its output folders. But it might make sense to refactor or create something new for registering the output for batch tables.
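
For reference, a hedged sketch of what an hourly partition registration statement could look like. The real createPartitionDDLStatement imported above isn't shown in this diff, so the function name, arguments, and folder layout here are assumptions for illustration only.

// Sketch: assumes firehose writes parquet into hourly YYYY/MM/DD/HH folders
// and that the glue table is partitioned by inserted_at.
function hourlyAddPartitionDDL(table: string, s3Location: string, date: Date): string {
    const yyyy = date.getUTCFullYear();
    const mm = String(date.getUTCMonth() + 1).padStart(2, "0");
    const dd = String(date.getUTCDate()).padStart(2, "0");
    const hh = String(date.getUTCHours()).padStart(2, "0");
    return `ALTER TABLE ${table} ADD IF NOT EXISTS
        PARTITION (inserted_at = '${yyyy}/${mm}/${dd}/${hh}')
        LOCATION '${s3Location}/${yyyy}/${mm}/${dd}/${hh}/';`;
}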

athenaResultsBucket: aws.s3.Bucket;
database: aws.glue.CatalogDatabase;
region: string;
scheduleExpression?: string; // TODO: we should remove this. It's useful in active development, but users would probably never bother.
@jmaysrowland (Collaborator) commented:

Do we see a case for not having hourly buckets? Maybe daily or every minute?

If not, I can definitely see the benefit of taking this out.

@EvanBoyle (Owner, Author) replied:

This was more about being able to set a 1-minute interval during development when we're manually testing things, just so we don't have to wait an hour to see results between changes.
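
A minimal usage sketch of that development override, assuming the registrar takes a standard Pulumi (name, args) constructor. Only the argument names come from the excerpt above; "rate(1 minute)" is a standard CloudWatch Events schedule expression.

// Sketch: shorten the registrar's schedule while iterating locally.
const registrar = new HourlyPartitionRegistrar("clicks-partitions", {
    athenaResultsBucket,
    database,
    region: "us-west-2",
    scheduleExpression: "rate(1 minute)", // the default would run hourly
});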

@jmaysrowland jmaysrowland merged commit 1ec10b5 into master Dec 28, 2019
@jmaysrowland jmaysrowland deleted the evan/withStreamingInputTable branch December 28, 2019 07:34