
Ce/auto batches #19054

Merged: 109 commits, merged Jun 6, 2018

Conversation

calellowitz (Contributor)

This changes batches to have an auto-incrementing ID and to start at the date of the last successful batch. It also prints the ID, to help Airflow.

@gcapalbo @sravfeyn @czue cc: @mkangia

Includes commits from #19052 to avoid merge conflicts.

return string_to_utc_datetime(date_str)
except ValueError:
raise CommandError('Not a valid date string: {}'.format(date_str))
print(new_batch.id)


E1601 print statement used
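For reference, the lint flag above is about Python 2's print statement. A minimal sketch of the management-command idiom that avoids it; the `Batch` import path and the `create()` call are assumptions, not the PR's final code:

```python
from __future__ import absolute_import, print_function

from django.core.management.base import BaseCommand

from corehq.warehouse.models import Batch  # import path assumed


class Command(BaseCommand):

    def add_arguments(self, parser):
        parser.add_argument('dag_slug')

    def handle(self, dag_slug, **options):
        # Batch creation shown schematically; the real command computes
        # start/end datetimes as discussed elsewhere in this PR.
        new_batch = Batch.objects.create(dag_slug=dag_slug)
        # self.stdout.write passes E1601 and, unlike a bare print,
        # is captured by call_command() in tests.
        self.stdout.write(str(new_batch.id))
```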

@calellowitz calellowitz added the product/invisible Change has no end-user visible impact label Jan 8, 2018
@czue (Member) left a comment

I don't really feel I have enough context to approve this, but I threw in a few questions. Seems logical/fine generally.

def handle(self, **options):
last_batch = Batch.objects.filter(completed_on__isnull=False).order_by('-end_datetime').first()
start = last_batch.end_datetime
end = date.today()
Member

Not knowing much about how this works - I assume it's fine if this ends up being quite a large window? We'd never want to do anything explicitly?


We should allow windows to be any size - from a couple of hours to a couple of weeks - but we should be setting this to datetime.utcnow().

In order for this range method to work, the following needs to be true (see the sketch after this list):

  • we need to store UTC timestamps in the batch table
  • we need to compare against last-modified UTC timestamps in the source data when we pull, which are set by server processes (not any timestamps that come from the phones)
  • we need to compare last_modified with > start_datetime and <= end_datetime
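A minimal sketch of the pull described above; `FormStaging` and its `last_modified` field are hypothetical stand-ins for a real source table:

```python
from django.db import models


class FormStaging(models.Model):  # hypothetical source table
    # Server-set modification time, stored in UTC (never phone time).
    last_modified = models.DateTimeField(db_index=True)


def rows_for_batch(batch):
    """Rows whose server-side timestamp falls in the half-open
    window (batch.start_datetime, batch.end_datetime]."""
    return FormStaging.objects.filter(
        last_modified__gt=batch.start_datetime,
        last_modified__lte=batch.end_datetime,
    )
```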

def add_arguments(self, parser):
parser.add_argument('batch_id')
def handle(self, **options):
last_batch = Batch.objects.filter(completed_on__isnull=False).order_by('-end_datetime').first()
Member

This makes sense in steady state, but how does the first batch get into the DB? Does this need to check for that scenario?


Yeah, if it doesn't find one we'll need to set the start_datetime to a date that's far enough in the past to capture all source data.
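A minimal sketch of that fallback, assuming a sentinel date far enough back to cover all source data (later commits in this PR use '2000-01-01'):

```python
from datetime import datetime

# Early enough to capture all source data on the very first run.
FIRST_BATCH_START = datetime(2000, 1, 1)


def batch_window(last_batch):
    """Start where the previous completed batch ended, or at the
    sentinel when no completed batch exists yet."""
    start = last_batch.end_datetime if last_batch else FIRST_BATCH_START
    end = datetime.utcnow()
    return start, end
```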

@@ -3,8 +3,6 @@


class Batch(models.Model):
- batch_id = models.UUIDField(unique=True, db_index=True, primary_key=True)


We should probably make all batch-ID foreign key columns ints now too. I'm not sure we'll be able to use an actual foreign key if any facts get sharded, but if we ever have the capability to replicate certain tables across all shards, the batch table would be one to do that with.

Contributor Author

Do you know how to do this in Django? All of the column specs in the migrations just say foreign key (it must determine the column type automatically). Perhaps adding a new migration with the same specification will force it to redetermine the type.


Hmm, when you run the migration in this PR, does it update the foreign key columns too? If not, a manual migration might be needed.

Contributor Author

Looks like I never responded to this comment, but I have added migrations to do that in future commits.
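For illustration, a hedged sketch of such a migration; the migration dependency and the model/field names are placeholders, not the PR's actual migration:

```python
from __future__ import unicode_literals

from django.db import migrations, models
import django.db.models.deletion


class Migration(migrations.Migration):

    dependencies = [
        ('warehouse', '0010_auto_batch_id'),  # hypothetical predecessor
    ]

    operations = [
        # Re-declaring the FK makes Django regenerate the column type
        # from the target table's (now integer) primary key.
        migrations.AlterField(
            model_name='formfact',
            name='batch',
            field=models.ForeignKey(
                on_delete=django.db.models.deletion.PROTECT,
                to='warehouse.Batch',
            ),
        ),
    ]
```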

@gcapalbo commented Jan 8, 2018

@calellowitz looks good. I think we're going to need to track batches on a per-dim and per-fact basis though, so we'll need to add the dim/fact slug to the batch table, and the process will be something like:

start batch for app dim >> load app staging >> load app dim >> complete batch for app dim

And for facts, or any dims that have dependencies, we'll need to set the end_datetime of the batch to the oldest end_datetime of all its dependencies' batches (a sketch follows this list).

There are a couple of reasons we're going to need it this way:

  • when we add new dims, we'll need to load the entire history in the first batch. If we just use one shared batch, it would only load an incremental on its first run.

  • by splitting the batch per dim and fact, failures in lower-priority dims/facts that nothing else depends on won't hold up loading other dims and facts at their regularly scheduled times.
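A minimal sketch of that dependency rule; the helper and the way dependencies are passed in are assumptions, and the `Batch` import path is assumed from this PR's app layout:

```python
from corehq.warehouse.models import Batch  # import path assumed


def capped_end_datetime(dependency_slugs, default_end):
    """Cap a new batch's end_datetime at the oldest end_datetime among
    its dependencies' latest completed batches, so fact rows never
    reference dim rows that haven't been loaded yet."""
    ends = []
    for slug in dependency_slugs:
        dep = (Batch.objects
               .filter(dag_slug=slug, completed_on__isnull=False)
               .order_by('-end_datetime')
               .first())
        if dep is not None:
            ends.append(dep.end_datetime)
    return min(ends) if ends else default_end
```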

start = options.get('start')
end = options.get('end')
def handle(self, dag_slug, **options):
last_batch = Batch.objects.filter(dag_slug=dag_slug, completed_on__isnull=False).order_by('-end_datetime').first()


E501 line too long (122 > 115 characters)

)
print(new_batch.id)


E1601 print statement used

def handle(self, dag_slug, **options):
last_batch = Batch.objects.filter(dag_slug=dag_slug, completed_on__isnull=False).order_by('-end_datetime').first()
start = last_batch.end_datetime if last_batch else '2000-01-01'
end = datetime.utcnow()

As I mentioned in my comment, we're going to need to inspect whether this dim or fact has dependencies and, if so, use the oldest end_datetime from all dependencies as the end time.

Each dim and fact class should have its dependencies listed in the code.

Contributor Author

I think we should try to push as much of the dependency logic to Airflow as possible. If we make the batches self-contained (one per dim, for example), we can have Airflow only run each batch once its dependencies have been fulfilled. Then we would not need to track dependencies in both Airflow and our own code.


I think the only requirement here is to set the end datetime based on the oldest of the dependencies' batch end times: if you're loading a fact which links to data from a few different dims, you need to make sure all your pulled fact data will have corresponding dim entries, so that the foreign key references won't be blank. Whether we do that here or within Airflow doesn't really matter, but I don't think we can avoid doing it.

@@ -8,6 +8,7 @@ class Batch(models.Model):

created_on = models.DateTimeField(auto_now_add=True)
completed_on = models.DateTimeField(null=True)
+ dag_slug = models.CharField(max_length=100)

They might be the same, but this should be the slug defined on the warehouse model class.


Contributor Author

Oh sorry, I missed this comment when it was first made. This slug does not match the model class because it defines a DAG that can include many models. For example, the batch that loads app_staging and app_dim has a single slug called app. Similarly, the one that goes through all the staging tables for app_status_fact has the app_status slug. None match an exact warehouse model, since they often include many tables, spanning multiple dims and facts.

@gcapalbo my understanding was that that's what we agreed on.

@calellowitz (Contributor Author)

Based on all the conversation on this PR and the linked Airflow one, I am going to flesh out this proof of concept more, since I think it is hard to have these conversations in the abstract. Over the next few days I will continue to push to these PRs; any and all feedback while I am working is appreciated, but do not feel obligated to continue reviewing until I ping to say I think it's done.

if append:
batches = cls.objects.distinct('batch').values_list('batch', flat=True)
if batches:
oldest = Batch.objects.filter(pk__in=batches).order_by('start_datetime').first()


F821 undefined name 'Batch'

@calellowitz added the Open for review: do not merge (A work in progress) label on Jan 15, 2018
@calellowitz (Contributor Author)

@gcapalbo
I think this is ready for a real review now. Beyond finalizing the batch methods, the main thing I changed was to make loading a staging table idempotent. If a later dim needs the same staging table as an earlier one, the load method now determines whether the later batch needs earlier data and, if so, adds that data; otherwise it leaves the table as it was. This does mean we could write to the staging tables more than once per run of the DAG, but that seemed like an OK tradeoff, and it would only happen if the multiple dependencies were not run at the same time in the past (likely only on the first run of a new dim or fact that depends on that staging table).
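A minimal sketch of that idempotent load, reconstructed from the snippets quoted below in this thread; the mixin class and `_do_load` hook are hypothetical scaffolding around the quoted logic, and the `Batch` import path is assumed:

```python
from corehq.warehouse.models import Batch  # import path assumed


class StagingLoadMixin(object):  # hypothetical host class

    @classmethod
    def load(cls, batch, append=True):
        if append:
            batch_ids = cls.objects.distinct('batch').values_list('batch', flat=True)
            if batch_ids:
                oldest = (Batch.objects.filter(pk__in=batch_ids)
                          .order_by('start_datetime').first())
                if batch.start_datetime < oldest.start_datetime:
                    # Only the window before the already-loaded data is missing.
                    batch.end_datetime = oldest.start_datetime
                else:
                    return  # staging already covers this window: no-op
        cls._do_load(batch)  # hypothetical: performs the actual insert
```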

if batch.start_datetime < oldest.start_datetime:
batch.end_datetime = oldest.start_datetime
else:
return


I'm not completely following what append is used for and what happens when it's True - what's this for?

__in queries also don't perform well at high scale.

if batch.start_datetime < oldest.start_datetime:
batch.end_datetime = oldest.start_datetime
else:
return


Same comment about append.

@gcapalbo commented Jan 17, 2018 via email

@calellowitz (Contributor Author)

> what are the scenarios in which we'd need to do this? i think having multiple processes update the same staging tables within a single run of the warehouse is something we should try to avoid

The idea was that when there are multiple dims that depend on the same staging table, each dim does its own load operation by checking whether the batches already in the staging table cover the timeframe needed by the current batch. If not, it loads just the needed extra data; otherwise the load does nothing. That is the append logic above.

@gcapalbo

I don't think we have any dims or facts where we need this today though, right? I'll defer to your judgement on whether we need it, but it might introduce some complexity that we don't require.

@calellowitz (Contributor Author)

> I don't think we have any dims or facts where we need this today though, right? I'll defer to your judgement on whether we need it, but it might introduce some complexity that we don't require.

I was picturing dims like UserGroupDim or UserLocationDim, both of which depend on staging tables that are also used by other dims. For example, the GroupStaging table is used by both UserGroupDim and GroupDim. The work I did makes it so that if those two dims require different dates' worth of data, that can happen naturally, as opposed to precomputing the oldest date. If a dim that updates later needs data from an earlier date, it will load those dates into the staging table. If you think precomputing dates is simpler, we can make that change; it just requires changing how we track dependencies (likely in the HQ models themselves).

@gcapalbo

I still have a feeling that having multiple processes updating the same staging tables might get us into trouble. I think the preferred way to do this is to design the Airflow process so that the dims with these dependencies always run together and always use the same dates and staging data.

If we ever add a new dim that has the same staging dependencies as an existing one, we'd just do a one-time historical update and then put it on the same schedule as the others, so it can share the same staging data going forward.

I think that might simplify things a bit - or do you think we would still need the append functionality?

options={
'abstract': False,
},
bases=(models.Model, corehq.warehouse.models.shared.WarehouseTable, corehq.warehouse.etl.CustomSQLETLMixin),


E501 line too long (120 > 115 characters)

options={
'abstract': False,
},
bases=(models.Model, corehq.warehouse.models.shared.WarehouseTable, corehq.warehouse.etl.CustomSQLETLMixin),


E501 line too long (120 > 115 characters)

options={
'abstract': False,
},
bases=(models.Model, corehq.warehouse.models.shared.WarehouseTable, corehq.warehouse.etl.CustomSQLETLMixin),


E501 line too long (120 > 115 characters)

from collections import namedtuple
from datetime import date, datetime, timedelta

from couchdbkit import ResourceNotFound
from django.conf import settings


F401 'django.conf.settings' imported but unused

options={
'abstract': False,
},
bases=(models.Model, corehq.warehouse.models.shared.WarehouseTable, corehq.warehouse.etl.CustomSQLETLMixin),


E501 line too long (120 > 115 characters)

migrations.AlterField(
model_name='formfact',
name='user_dim',
field=models.ForeignKey(null=True, on_delete=django.db.models.deletion.PROTECT, to='warehouse.UserDim'),


E501 line too long (116 > 115 characters)

migrations.AlterField(
model_name='applicationstatusfact',
name='app_dim',
field=models.ForeignKey(null=True, on_delete=django.db.models.deletion.PROTECT, to='warehouse.ApplicationDim'),


E501 line too long (123 > 115 characters)

migrations.AlterField(
model_name='appstatusformstaging',
name='app_dim',
field=models.ForeignKey(null=True, on_delete=django.db.models.deletion.PROTECT, to='warehouse.ApplicationDim'),


E501 line too long (123 > 115 characters)


from corehq.sql_db.routers import db_for_read_write
from casexml.apps.phone.models import SyncLog


F401 'casexml.apps.phone.models.SyncLog' imported but unused

@nickpell (Contributor)

Staging deploy fails with:

[hqdjango0-staging.internal-va.commcarehq.org] out: CommandError: Conflicting migrations detected; multiple leaf nodes in the migration graph: (0013_adding_indices_for_warehouse, 0013_rm_mptt_fields in locations).
[hqdjango0-staging.internal-va.commcarehq.org] out: To fix them run 'python manage.py makemigrations --merge'

@calellowitz Can you fix?
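For context, `makemigrations --merge` generates an empty migration that simply declares both leaf nodes as its dependencies, roughly like this (using the two leaf names from the error above):

```python
from __future__ import unicode_literals

from django.db import migrations


class Migration(migrations.Migration):

    dependencies = [
        ('locations', '0013_adding_indices_for_warehouse'),
        ('locations', '0013_rm_mptt_fields'),
    ]

    operations = []
```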

options={
'abstract': False,
},
bases=(models.Model, corehq.warehouse.models.shared.WarehouseTable, corehq.warehouse.etl.CustomSQLETLMixin),


E501 line too long (120 > 115 characters)

# Generated by Django 1.11.13 on 2018-06-01 06:08
from __future__ import unicode_literals

from django.db import migrations


W1618 import missing from __future__ import absolute_import

options={
'abstract': False,
},
bases=(models.Model, corehq.warehouse.models.shared.WarehouseTable, corehq.warehouse.etl.CustomSQLETLMixin),


E501 line too long (120 > 115 characters)

options={
'abstract': False,
},
bases=(models.Model, corehq.warehouse.models.shared.WarehouseTable, corehq.warehouse.etl.CustomSQLETLMixin),


E501 line too long (120 > 115 characters)

@calellowitz removed the Open for review: do not merge (A work in progress) label on Jun 5, 2018
@calellowitz (Contributor Author)

Tests are now passing. I think it is finally time to merge the warehouse master PR.

@snopoke @gcapalbo

@snopoke (Contributor) left a comment

Phew! Big PR! I haven't been following very closely and only noticed some things now. I added a few comments, but nothing blocking.

@@ -379,12 +379,12 @@ class SQLLocation(AdjListModel):
domain = models.CharField(max_length=255, db_index=True)
name = models.CharField(max_length=255, null=True)
location_id = models.CharField(max_length=100, db_index=True, unique=True)
- location_type = models.ForeignKey(LocationType, on_delete=models.CASCADE)
+ location_type = models.ForeignKey(LocationType, on_delete=models.CASCADE, db_index=True)
Contributor

I don't see a migration for this, but I also think that foreign keys are automatically indexed, so this shouldn't be necessary.

Contributor Author

Hm, OK. I will just delete that addition.


user_dim = models.ForeignKey(UserDim, on_delete=models.PROTECT)
# not all synclogs have domains, added in 11/2016
domain_dim = models.ForeignKey(DomainDim, on_delete=models.PROTECT, null=True)
Contributor

Can we not infer this from the user and populate it anyway?

Contributor Author

I do not think we can for web users using Web Apps, but all new synclogs should have this anyway, so we would not expect it to be an issue.


@calellowitz (Contributor Author)

> Phew! Big PR!

Haha, yeah. The original PR was approved a while ago; the rest is from merging in other PRs that were separately approved, so nothing should be too new, but it's always good to have another set of eyes.

options={
'abstract': False,
},
bases=(models.Model, corehq.warehouse.models.shared.WarehouseTable, corehq.warehouse.etl.CustomSQLETLMixin),


E501 line too long (120 > 115 characters)

@snopoke merged commit bff2f5b into master on Jun 6, 2018
@snopoke deleted the ce/auto-batches branch on June 6, 2018 13:18