In [None]:
# Load the BigQuery IPython extension, which provides the `%%bigquery` cell
# magic used by the query cells at the bottom of this notebook.
%load_ext google.cloud.bigquery

# Load autoreload and switch it to mode 2 so imported modules are reloaded
# before each cell runs; edits to local source files then take effect without
# restarting the kernel.
%load_ext autoreload
%autoreload 2

In [None]:
import os
import sys
import pandas as pd
# Find the `pulse-data` directory among the components of the current working
# directory and make sure it is the first entry on sys.path.
# Raises ValueError if the notebook is not run from inside `pulse-data`.
cwd_normalized = os.path.normpath(os.getcwd())
parts = cwd_normalized.split(os.path.sep)
repo_root_depth = parts.index('pulse-data') + 1
relative_path_parts = parts[:repo_root_depth]
abs_path = os.path.join('/', *relative_path_parts)

# Skip the insert when the path is already in front, so re-running this cell
# does not stack duplicate entries.
if abs_path != sys.path[0]:
    sys.path.insert(0, abs_path)

from datetime import datetime
from google.cloud import bigquery
from typing import Optional

from recidiviz.big_query.big_query_client import BigQueryClientImpl
from recidiviz.big_query.view_update_manager import TEMP_DATASET_DEFAULT_TABLE_EXPIRATION_MS
from recidiviz.ingest.direct.controllers.direct_ingest_view_collector import DirectIngestPreProcessedIngestViewCollector
from recidiviz.ingest.direct.views.direct_ingest_big_query_view_types import DirectIngestPreProcessedIngestView, RawTableViewType
from recidiviz.utils import regions
from recidiviz.utils.environment import GCP_PROJECT_STAGING
from recidiviz.utils.metadata import local_project_id_override, project_id
from google.cloud.bigquery import magics
# Clear the progress-bar type on the shared BigQuery magics context.
# NOTE(review): presumably this turns off the progress bar for `%%bigquery`
# cells — confirm against the google-cloud-bigquery magics documentation.
magics.context.progress_bar_type = None

# ---------------------------------------------------------------------------
# Notebook settings. The queries below are built from these values, so a
# change here is picked up by both the materialization code and the SQL.
# ---------------------------------------------------------------------------
# Region whose ingest views are collected.
region_code = 'US_TN'
# File tag of the ingest view being validated.
view_tag = 'AssignedStaffSupervisionPeriod'
# Prefix of the sandbox dataset the materialized tables are written to.
dataset_prefix = 'caroletouma'
# Dataset that holds the external validation table.
# NOTE: a later cell rebinds `validation_dataset` to a dataset reference; the
# string value is therefore captured into the derived names right here.
validation_dataset = 'us_tn_validation_scratch'

# Fully-qualified BigQuery names derived from the settings above. The sandbox
# dataset id follows the `{dataset_prefix}_{view_tag}_validation` pattern and
# the ingest view table follows the `{view_tag}_latest` pattern used when the
# tables are materialized further down in this notebook.
sandbox_dataset_fqn = f'recidiviz-staging.{dataset_prefix}_{view_tag}_validation'
latest_ingest_view_table = f'{sandbox_dataset_fqn}.{view_tag}_latest'
validation_population_table = (
    f'recidiviz-staging.{validation_dataset}.supervision_population_validation_10_13_2021'
)


# One row per (OffenderID, day, site): every day in the date range on which a
# period from the materialized ingest view was open. Leading zeros are
# stripped from OffenderID.
ingest_view_individual_IDs_by_day_by_site = f"""
    SELECT 
        DISTINCT LTRIM(OffenderID, '0') as OffenderID,
        PeriodDate as day,
        Site as site,
    FROM `{latest_ingest_view_table}` raw_periods,
    UNNEST(GENERATE_DATE_ARRAY(Date(2021, 10, 1), Date(2021, 10, 18), INTERVAL 1 DAY)) PeriodDate
    WHERE 
        (
            STRING(PeriodDate) BETWEEN raw_periods.StartDate AND raw_periods.EndDate
            OR (
                STRING(PeriodDate) >= raw_periods.StartDate AND raw_periods.EndDate is NULL
            )
        )
    GROUP BY day, site, OffenderID
    ORDER BY day
"""

# One row per (day, site) with the number of distinct OffenderIDs that have an
# open period in the materialized ingest view on that day.
ingest_view_population_by_day_by_site = f"""
    SELECT 
        PeriodDate as day,
        Site as site,
        COUNT(DISTINCT OffenderID) AS ingest_view_population
    FROM `{latest_ingest_view_table}` raw_periods,
    
    UNNEST(GENERATE_DATE_ARRAY(Date(2021, 10, 1), Date(2021, 10, 18), INTERVAL 1 DAY)) PeriodDate
    WHERE 
        (
            STRING(PeriodDate) BETWEEN raw_periods.StartDate AND raw_periods.EndDate
            OR (
                STRING(PeriodDate) >= raw_periods.StartDate AND raw_periods.EndDate is NULL
            )
        )
    GROUP BY day, site
    ORDER BY day
"""

# Joins the per-day/site ingest view counts to the validation counts and
# reports the absolute and percentage differences, largest percentage first.
# Quotes and spaces are stripped from the validation SiteID before comparing.
population_comparison_query = f"""
WITH
  validation_query AS (
  SELECT
    # The validation view totals for the first of the month are as of the last day of the previous month.
    ReportingDate as day,
    REPLACE(REGEXP_REPLACE(SiteID, r'([\\'\\"])', ''), ' ', '') AS site,
    COUNT(DISTINCT OffenderID) as validation_population
  FROM
    `{validation_population_table}`
    GROUP BY day, site
)
SELECT
  ingest_view_query.day AS day,
  validation_query.site AS site,
  ingest_view_query.ingest_view_population,
  validation_query.validation_population,
  ingest_view_query.ingest_view_population - validation_query.validation_population AS total_diff,
  ABS(ROUND(100 * SAFE_DIVIDE(ingest_view_query.ingest_view_population - validation_query.validation_population,
      validation_query.validation_population), 2)) AS percentage_off
FROM
  validation_query
INNER JOIN
  `{sandbox_dataset_fqn}.ingest_view_population_by_day_by_site_materialized` ingest_view_query
ON
  ingest_view_query.day = validation_query.day
WHERE
  ingest_view_query.site = validation_query.site
ORDER BY
  percentage_off DESC
"""

# Lists the (OffenderID, day, site) rows present on only one side: rows in the
# validation table but not the ingest view, and vice versa. The `type` column
# says which side each row is missing from.
individual_id_comparison = f"""
WITH validation_query AS (
  SELECT
    CAST(OffenderID AS STRING) as OffenderID,
    ReportingDate as day,
    REPLACE(REGEXP_REPLACE(SiteID, r'([\\'\\"])', ''), ' ', '') AS site,
  FROM
    `{validation_population_table}`
),
ingest_view_query as (
    SELECT
        OffenderID,
        day,
        site
    FROM `{sandbox_dataset_fqn}.ingest_view_individual_IDs_by_day_by_site_materialized`
    -- Hard coded for now. Update later.
    where day = "2021-10-13"
),
validation_ids_missing_from_ingest_view AS (
    with missing as (
        SELECT 
        * 
        FROM validation_query
        EXCEPT DISTINCT
            SELECT * 
            from ingest_view_query
    )
    SELECT 
        *,
        'validation_ids_missing_from_ingest_view' as type
    from missing
),
ingest_view_ids_missing_from_validation AS (
    with missing as (
        SELECT 
        * 
        FROM ingest_view_query
        EXCEPT DISTINCT
            SELECT * 
            from validation_query
    )
    SELECT 
        *,
        'ingest_view_ids_missing_from_validation' as type
    from missing
)

SELECT * FROM validation_ids_missing_from_ingest_view

UNION ALL

SELECT * FROM ingest_view_ids_missing_from_validation
"""

# Let pandas display up to 999 rows before truncating a frame.
pd.set_option('display.max_rows', 999)

In [None]:
# Build every ingest view for the configured region, index them by file tag,
# and keep the one selected by `view_tag` as `view`.
with local_project_id_override(GCP_PROJECT_STAGING):
    region = regions.get_region(region_code, is_direct_ingest=True)
    collector = DirectIngestPreProcessedIngestViewCollector(region, [])

    views_by_tag = {
        view_builder.file_tag: view_builder.build()
        for view_builder in collector.collect_view_builders()
    }

    # Raises KeyError if no collected view has the configured tag.
    view = views_by_tag[view_tag]

In [None]:
# Make sure the sandbox dataset that receives the materialized tables exists.
validation_sandbox_dataset_id = f'{dataset_prefix}_{view_tag}_validation'

with local_project_id_override(GCP_PROJECT_STAGING):
    bq_client = BigQueryClientImpl()

    # From here on `validation_dataset` is the sandbox dataset reference
    # (it is no longer the dataset-name string assigned in the setup cell).
    validation_dataset = bq_client.dataset_ref_for_id(validation_sandbox_dataset_id)
    print(f'Validation dataset: {validation_dataset}')

    dataset_label = f'[{validation_dataset.project}.{validation_dataset.dataset_id}]'
    print(f'Creating dataset {dataset_label} ...')
    # Tables in this dataset get the temp-dataset default expiration.
    bq_client.create_dataset_if_necessary(
        validation_dataset,
        default_table_expiration_ms=TEMP_DATASET_DEFAULT_TABLE_EXPIRATION_MS,
    )
    print(f'Done creating dataset {dataset_label} ...')

In [None]:
# Helper for the cells below: stores the result of a query as a table so later
# queries can read from it.
def materialize_query_with_name(
    dataset_ref: bigquery.DatasetReference,
    view_tag: str,
    query_name: str,
    query: str,
    query_dt: Optional[datetime] = None
):
    """Runs `query` and writes the result to table `{view_tag}_{query_name}`.

    The table is created in the dataset named by `dataset_ref.dataset_id`,
    overwriting any existing table of the same name. Uses the notebook-level
    `bq_client`, so the dataset-creation cell must have been run first.

    Args:
        dataset_ref: Destination dataset; its project and dataset id are also
            used in the progress messages.
        view_tag: First half of the destination table id.
        query_name: Second half of the destination table id; also used in the
            progress messages.
        query: SQL text to execute.
        query_dt: When truthy, passed to the query as the DATETIME parameter
            `StartDate`; otherwise no query parameters are sent.
    """
    destination_table_id = f'{view_tag}_{query_name}'
    destination = f'{dataset_ref.project}.{dataset_ref.dataset_id}.{destination_table_id}'
    print(f'Writing {query_name} query to [{destination}]...')

    if query_dt:
        query_parameters = [bigquery.ScalarQueryParameter('StartDate', 'DATETIME', query_dt)]
    else:
        query_parameters = None

    # Start the job, then call result() on it before reporting completion.
    bq_client.create_table_from_query_async(
        dataset_id=dataset_ref.dataset_id,
        table_id=destination_table_id,
        query=query,
        query_parameters=query_parameters,
        overwrite=True,
    ).result()
    print(f'Finished writing {query_name} query.')

In [None]:
# Materialize the ingest view once, then materialize the derived tables that
# compare it against the validation data.
with local_project_id_override(GCP_PROJECT_STAGING):
    latest_query = view.expanded_view_query(
        config=DirectIngestPreProcessedIngestView.QueryStructureConfig(
            raw_table_view_type=RawTableViewType.LATEST,
        )
    )

    # Written to `{view_tag}_latest`, which the derived queries read from.
    materialize_query_with_name(
        dataset_ref=validation_dataset,
        view_tag=view_tag,
        query_name='latest',
        query=latest_query,
    )

    # Order matters: the two comparison queries read the `*_materialized`
    # tables produced by the first two entries.
    derived_tables = [
        # Site/Date: individual IDs from the materialized ingest view
        ('ingest_view_individual_IDs_by_day_by_site', ingest_view_individual_IDs_by_day_by_site),
        # Site/Date: population count from the materialized ingest view
        ('ingest_view_population_by_day_by_site', ingest_view_population_by_day_by_site),
        # Population counts: validation vs. ingest view
        ('comparison_population_counts_validation_vs_generated', population_comparison_query),
        # Individual IDs present on only one side
        ('individual_id_comparison', individual_id_comparison),
    ]
    for derived_view_tag, derived_query in derived_tables:
        materialize_query_with_name(
            dataset_ref=validation_dataset,
            view_tag=derived_view_tag,
            query_name='materialized',
            query=derived_query,
        )

    print('Load complete')

In [None]:
%%bigquery --params {"OffenderID": ""}
# Insert the specific `OffenderID` in place of the empty string in `--params` above.
# No wildcards are added around the parameter here, so pass the ID exactly as it is
# stored (or include your own `%` wildcards in the value).

# QUERY DESCRIPTION:
# This query pulls in the death date and death type associated with the OffenderID
# from the latest view of the raw `OffenderAttributes` table.
SELECT 
    OffenderID,
    DeathDate,
    DeathType
FROM `recidiviz-staging.us_tn_raw_data_up_to_date_views.OffenderAttributes_latest`
WHERE OffenderID LIKE @OffenderID

In [None]:
%%bigquery --params {"OffenderID": ""}
# Insert the specific `OffenderID` in place of the empty string in `--params` above.
# The value is wrapped in `%` wildcards, so any OffenderID containing it matches
# (leaving it empty matches every non-NULL OffenderID).

# QUERY DESCRIPTION:
# This query lists the rows associated with that OffenderID in the raw `AssignedStaff` table,
# ordered by StartDate descending.
SELECT *
FROM `recidiviz-staging.us_tn_raw_data_up_to_date_views.AssignedStaff_latest`
WHERE OffenderID LIKE CONCAT("%", CONCAT(@OffenderID, "%"))
ORDER BY StartDate DESC

In [None]:
%%bigquery --params {"OffenderID": 000000}
# Replace 000000 with the specific `OffenderID`, passed as a number
# (note: remove the extra "00" at the front of the ID, if applicable)

# QUERY DESCRIPTION:
# This query lists every row for the OffenderID in the **RAW** validation table
# `supervision_population_validation_10_13_2021`, ordered by ReportingDate ascending.
# NOTE(review): this description previously said "facilities" and "incarceration population
# counts"; the table name indicates supervision population data — confirm.
SELECT *
FROM `recidiviz-staging.us_tn_validation_scratch.supervision_population_validation_10_13_2021`
WHERE OffenderID = @OffenderID
ORDER BY ReportingDate ASC
