In [None]:
%pip install -q -r requirements.txt

In [None]:

import importlib
import functions.core_functions as core_functions
import functions.pyarrow_functions as pyarrow_functions
import dask
import dask.dataframe as dd
from dask.dataframe.utils import assert_eq
import pandas as pd
import numpy as np
import pandas_gbq
import json
import gc
import cudf
import os
import yaml
from concurrent.futures import ProcessPoolExecutor
from functools import partial

# Tell dask.dataframe to use "cudf" as its dataframe backend.
dask.config.set({"dataframe.backend": "cudf"})

# Re-import the project helper modules so that edits made to them after the
# first import are picked up without restarting the kernel.
importlib.reload(core_functions)
importlib.reload(pyarrow_functions)

In [None]:
# Load the table schemas. A context manager closes the YAML file handle
# deterministically instead of leaving it open until garbage collection.
with open('table-schemas.yaml') as schema_file:
    table_schema = yaml.safe_load(schema_file)

# Each entry is read below through its 'name' and 'type' keys.
detections_python = table_schema['detections_python_schema']

# Column name -> declared type for the full (N90) detections layout.
n90_schema_dict = {item['name']: item['type'] for item in detections_python}
# The Veil layout is the same mapping without the 'geo_' columns.
veil_schema_dict = {item['name']: item['type'] for item in detections_python if 'geo_' not in item['name']}
# Ordered list of output columns, in schema order.
final_detections_cols = [item['name'] for item in detections_python]


In [None]:
# veil_schema_dict

In [None]:
# Optional helper (unlikely to be used). The original cell had the `def` line
# commented out while the body was still live code, which made the cell fail
# with an IndentationError; the function is restored so the cell is valid.
def convert_schema_to_bigquery(schema):
    """Translate a ``{column_name: dtype_string}`` mapping into BigQuery schema fields.

    Args:
        schema (dict): Column name -> pandas-style dtype string
            (e.g. ``'int64'``, ``'datetime64[ns, UTC]'``).

    Returns:
        list: One ``bigquery.SchemaField`` per entry, in mapping order. Every
        field is created with mode ``'NULLABLE'``; dtype strings that are not
        in the lookup table fall back to ``'STRING'``.
    """
    # Imported lazily so that merely defining the helper does not require the
    # BigQuery client library to be importable.
    from google.cloud import bigquery

    # Mapping from the schema's dtype strings to BigQuery column types.
    type_mapping = {
        'int64': 'INTEGER',
        'float64': 'FLOAT',
        'string': 'STRING',
        'bool': 'BOOLEAN',
        'datetime64[ns, UTC]': 'TIMESTAMP',
        'datetime64[ns]': 'TIMESTAMP',  # If any datetime fields lack timezone info
        'object': 'STRING',  # Pandas may use 'object' for string data
    }

    return [
        bigquery.SchemaField(
            name=field_name,
            field_type=type_mapping.get(field_type, 'STRING'),  # default to STRING
            mode='NULLABLE',  # all fields are treated as nullable
        )
        for field_name, field_type in schema.items()
    ]

In [None]:
# convert_schema_to_bigquery(n90_schema_dict)

In [None]:
# Initialise one client bundle per service account: the ad-hoc billing account
# supplies the config and most clients, the N90 core-apps account supplies a
# second BigQuery client.
resp = core_functions.initialize_clients(service_account_secret_name='SA_ADHOC_BILLING')
resp2 = core_functions.initialize_clients(service_account_secret_name='SA_N90_CORE_APPS')

# Configuration sections.
config = resp.get('config')
veil_billing = config.get('veil_billing')
veil_vars = veil_billing.get('vars')

# Service clients.
bigquery_client = resp.get('clients').get('bigquery_client')
storage_client = resp.get('clients').get('storage_client')
sf_client = resp.get('clients').get('sf_client')
n90_bigquery_client = resp2.get('clients').get('bigquery_client')

# Column lists and overrides read from the config.
sfdc_adv_account_cols = veil_vars.get('sfdc_adv_account_cols')
sfdc_rate_card_cols = veil_vars.get('sfdc_rate_card_cols')
unknown_dma_overrides = config.get('national_dma_overrides_to_us_national')

In [None]:
# Reference tables are read with the ad-hoc billing service account.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = config.get('SA_ADHOC_BILLING')

# --- Mongo mirror tables (dmas, shows) ---
mongo_tables = ['dmas', 'shows']
mongo_data = core_functions.fetch_table_data(
    project_id=veil_billing.get('avs_project_id'),
    dataset_id='mongo',
    table_names=mongo_tables,
    bigquery_client=bigquery_client,
)
dmas_df, shows_df = (core_functions.fix_df_dtypes(mongo_data[name]) for name in mongo_tables)

# --- Master channel list ---
master_channel_sql = """
    SELECT * from `adhoc-billing.avs_billing_process.master_channels_expanded`
"""
master_channel_df = core_functions.fix_df_dtypes(
    core_functions.fetch_gbq_data(master_channel_sql, bigquery_client)
)
# Missing broadcaster ids become the sentinel -10.
master_channel_df['broadcaster_id'] = master_channel_df['broadcaster_id'].fillna(-10).astype('Int64')

# --- Shows: fill missing values with sentinels and use nullable integers ---
shows_df['length'] = shows_df['length'].fillna(0.0).astype('Int64')
shows_df['show_id'] = shows_df['show_id'].fillna(-6).astype('Int64')
shows_df = shows_df.rename(columns={'id': 'show_record_id'})

# --- DMAs: missing or overridden neustar ids collapse to 808080 ---
dmas_df['neustar_id'] = dmas_df['neustar_id'].fillna(808080).astype('Int64')
mask = dmas_df['neustar_id'].isin(unknown_dma_overrides)
dmas_df.loc[mask, 'neustar_id'] = 808080
dmas_df['dma_rank'] = dmas_df['dma_rank'].fillna(0).astype('Int64')

# --- Broadcast calendar (one row per broadcast week) ---
broadcast_cal_sql = """
    SELECT id as bcw_id, bcw_index, bcm_index, bcw_start_date, bcw_end_date FROM `adhoc-billing.avs_billing_process.lu_broadcast_week`
"""
broadcast_cal_df = core_functions.fetch_gbq_data(query=broadcast_cal_sql, bigquery_client=bigquery_client)


In [None]:
# The geo lookup lives in the N90 project, so switch credentials first.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = config.get('SA_N90_CORE_APPS')
geo_sql = """
    SELECT *
    from `next90-core-applications.next90_analytics.geos` WHERE geo_type = 'dma'
"""
geo_df = core_functions.fix_df_dtypes(
    core_functions.fetch_gbq_data(query=geo_sql, bigquery_client=n90_bigquery_client)
)

# Identifier columns: fill gaps with the sentinel -1 and store as nullable integers.
int_cols = ['geo_location', 'geo_neustar_id','geo_us_msa_id', 'geo_us_county_fips_id','geo_ca_cma_id']
geo_df = geo_df.assign(
    **{name: geo_df[name].fillna(-1).astype('Int64') for name in int_cols}
)



In [None]:
# from datetime import datetime
# from dateutil.relativedelta import relativedelta

# def generate_month_list(start_date, end_date):
#     """
#     Generate a list of months in 'YYYY-MM' format from start_date to end_date (inclusive).
    
#     Args:
#         start_date (str): The start date in 'YYYY-MM-DD' format.
#         end_date (str): The end date in 'YYYY-MM-DD' format.
    
#     Returns:
#         list: List of months in 'YYYY-MM' format.
#     """
#     start = datetime.strptime(start_date, "%Y-%m-%d")
#     end = datetime.strptime(end_date, "%Y-%m-%d")
    
#     if start > end:
#         raise ValueError("start_date must be before or equal to end_date")
    
#     month_list = []
#     current = start
#     while current <= end:
#         month_list.append(current.strftime("%Y-%m"))
#         current += relativedelta(months=1)
    
#     return month_list

In [None]:


# --- Storage options, read from the shared `config` mapping ---
veil_storage_options = config.get('VEIL_GCS_STORAGE_OPTIONS')
n90_storage_options = config.get('N90_GCS_STORAGE_OPTIONS')
s4_storage_options = config.get('S4_STORAGE_OPTIONS')

# --- Destination buckets ---
veil_billing_bucket = config.get('veil_billing').get('billing_gcs_bucket_id')
s4_bucket = 'n90-data-lake-stl'
n90_bucket, n90_bucket_2 = 'n90_veil_partner', 'n90-data-lake'

# --- Output prefixes inside those buckets ---
veil_output_prefix = 'detections_v2b'
s4_output_prefix = 'veil/detections_v2b'
n90_output_prefix = 'advocado-looker/avs_prod/detections_v2b'
n90_output_prefix_2 = 'avs_prod/detections_v2b'

# importlib.reload(core_functions)

## New Monthly Start
# read partitioned parquet files from s3



In [None]:
# encodings_input_prefix = 'veil/encodings_v2b'
# encodings_df = core_functions.fix_df_dtypes(pyarrow_functions.load_encodings_for_detections(s4_storage_options, s4_bucket, encodings_input_prefix))
# Encodings are read from the N90 data lake with the core-apps credentials.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = config.get('SA_N90_CORE_APPS')
encodings_sql = """
    SELECT encoding_id, isci, aeis_id, encoded_timestamp, format_id, format_name, customer_id, customer_name, sfdc_account_id,
    sfdc_account_name, sfdc_advertiser_id, attributes_cable_estimate, attributes_spot_estimate, encoder_group_id,
    encoder_id, encoder_group_name, length_in_seconds, billing_last_updated, billing_last_audit_id, clone_of
    FROM `next90-core-applications.n90_data_lake.avs_encodings`
"""
encodings_df = core_functions.fetch_gbq_data(encodings_sql, n90_bigquery_client)

# Derive the segment group of every encoding from its format id.
segment_groups = encodings_df['format_id'].apply(core_functions.assign_segment_group)
encodings_df['segments_format_id_group'] = segment_groups
encodings_df.head()
# encodings_df['length_in_seconds'] = encodings_df['length_in_seconds'].astype('Int64')

In [None]:
# TODO: Add bcw and bcm as segments

# Preview the first rows of the encodings frame.
encodings_df.head()

In [None]:
# #  TODO: look at whether to use this or not
# def process_detections(df, encodings_df):
#     df.dtypes

#     df['date_time'] = pd.to_datetime(df['date_time'], utc=True)
#     df['year'] = df['date_time'].dt.year
#     df['month'] = df['date_time'].dt.month
#     df['day'] = df['date_time'].dt.day
#     df['tv_show_id'] = df['tv_show_id'].fillna(-10) 
#     df['group_occurrence_id'] = df['group_occurrence_id'].fillna(-6)
#     # dmas_df['dma_rank'].replace('', 0).fillna(0, inplace=True)

#     df['occurrence_id'] = df['occurrence_id'].astype(float)
#     # # detections_df = detections_df.merge(master_channels_df, how='left', left_on='broadcaster_id', right_on='broadcaster_id')
#     # dmas_df.loc[mask, 'neustar_id'] = 808080
#     # dmas_df['neustar_id'] = dmas_df['neustar_id'].fillna(808080).astype(int)

#     df = df.merge(dmas_df, how='left', left_on='dma_id', right_on='dma_id')

#     shows_df['show_id'] = shows_df['show_id'].fillna(-5)
#     # mask = ((dmas_df['dma_id'].isin(unknown_dma_overrides)) )
#     df = df.merge(shows_df, how='left', left_on='tv_show_id', right_on='show_id')


#     geo_df['geo_location'] = geo_df['geo_location'].astype(int)
#     df = df.merge(geo_df, how='left', left_on='neustar_id', right_on='geo_location')
#     gc.collect()

#     needed_encodings = df['encoding_id'].unique().tolist()
    
    
#     encodings_df.sort_values(by=['encoding_id', 'billing_last_updated'], ascending=[True, False], inplace=True)
    
    
#     encodings_df = encodings_df.drop_duplicates(subset=['encoding_id'], keep='first')
#     needed_encodings_df = encodings_df.loc[encodings_df['encoding_id'].isin(needed_encodings)].copy()
#     df = df.merge(needed_encodings_df, how='left', left_on='encoding_id', right_on='encoding_id', suffixes=('', '_encoding'))

#     len(needed_encodings_df)
#     df = df.sort_values(by=['occurrence_id', 'year', 'month', 'day'])
#     billing_last_updated = pd.Timestamp.utcnow()
#     df['billing_det_last_updated'] = billing_last_updated
#     df['billing_det_last_updated'] = pd.to_datetime(df['billing_det_last_updated'], utc=True)
#     billing_last_audit_id = core_functions.generate_uuid()
#     df['billing_det_last_updated'] = billing_last_audit_id
#     df.head()

#     df_clean = core_functions.enforce_schema(df, n90_schema)

#     # os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = config.get('SA_ADHOC_BILLING')
#     # core_functions.write_hive_partitioned_parquet(df_1, veil_billing_bucket, veil_output_prefix, partition_cols, veil_storage_options)
#     # print(f"Finished writing to {veil_billing_bucket}/{veil_output_prefix}")
#     os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = config.get('SA_N90_CORE_APPS')
#     core_functions.write_hive_partitioned_parquet(df_clean, n90_bucket, n90_output_prefix, partition_cols, n90_storage_options)
#     print(f"Finished writing to {n90_bucket}/{n90_output_prefix}")

#     # new cell
#     partition_cols
#     # print(encodings_bvs_df_to_write.dtypes)
#     # new data starts 4/9/2024 - check if mid-day or midnight 	
#     #  2024-04-08 13:10:23 UTC is the first detection in the new data

#     # change to veil format
#     veil_keys = list(veil_schema.keys())

#     os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = config.get('SA_ADHOC_BILLING')
#     core_functions.write_hive_partitioned_parquet(df_clean[veil_keys], veil_billing_bucket, veil_output_prefix, partition_cols, veil_storage_options)
#     print(f"Finished writing to {veil_billing_bucket}/{veil_output_prefix}")
#     # os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = config.get('SA_N90_CORE_APPS')
#     # core_functions.write_hive_partitioned_parquet(df_2, n90_bucket, n90_output_prefix, partition_cols, n90_storage_options)
#     # print(f"Finished writing to {n90_bucket}/{n90_output_prefix}")

#     # new cell
#     partition_cols
#     # print(encodings_bvs_df_to_write.dtypes)

#     del df_clean
#     del needed_encodings_df
#     gc.collect()
    



In [None]:
# Determine the [min_date, max_date) window of detections to process.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = config.get('SA_ADHOC_BILLING')
reload_detections = True   # True: rebuild from the master table; False: only append new days
test_mode = True           # True: restrict a reload to the window in `test_filter`
test_filter = "WHERE date_time >= '2024-01-01' AND date_time < '2024-11-01'"
detections_table = None
if reload_detections:
    detections_table = 'adhoc-billing.avs_billing_process.avs_detections_master'
    # The restriction is applied only in test mode; previously `test_mode` was
    # never read and the filter was applied unconditionally.
    date_filter = test_filter if test_mode else ''
    dates_sql = f"""
    SELECT  
        CAST(DATE(MIN(date_time)) AS STRING) AS min_date, 
        CAST(LEAST(DATE_ADD(DATE(MAX(date_time)), INTERVAL 1 DAY), CURRENT_DATE()) AS STRING) AS max_date
    FROM `{detections_table}` {date_filter}
    """
else:
    detections_table = 'adhoc-billing.avs_billing_process.detections'
    # Incremental mode: start the day after the latest stored detection.
    dates_sql = f"""
        SELECT  CAST(DATE_ADD(DATE(MAX(date_time)), INTERVAL 1 DAY) AS STRING) as min_date , cast(current_date() as string) as max_date
        FROM `{detections_table}`
        """
dates_df = core_functions.fetch_gbq_data(query=dates_sql, bigquery_client=bigquery_client)
# Positional access: the query returns a single summary row.
min_date = dates_df['min_date'].iloc[0]
max_date = dates_df['max_date'].iloc[0]

print(f"min_date: {min_date}, max_date: {max_date}")
detections_df = None
    

In [None]:

# if detections_df:
#     del detections_df
#     gc.collect()
# Manual processing window. NOTE: these literals override the min_date /
# max_date values computed from BigQuery in the previous cell.
min_date = '2023-09-01'
max_date = '2024-01-01'

# List the YYYY-MM groups that have detections inside [min_date, max_date).
# The upper bound matches the window used when the detections themselves are
# fetched, so months past max_date are no longer listed (they could only ever
# yield empty result sets).
process_month_group_sql = f"""
    select distinct CONCAT(EXTRACT(YEAR FROM date_time),'-',FORMAT('%02d', EXTRACT(MONTH FROM date_time))) as process_month_group
    FROM `{detections_table}` 
    WHERE date_time >= '{min_date}' AND date_time < '{max_date}'
    ORDER BY process_month_group
"""

process_month_groups = core_functions.fetch_gbq_data(query=process_month_group_sql, bigquery_client=bigquery_client)
process_month_groups = process_month_groups['process_month_group'].tolist()
process_month_groups

In [None]:
# Manual override of the month list: September through December 2023.
process_month_groups = [f'2023-{month_number:02d}' for month_number in range(9, 13)]

In [None]:

# process_month_group = '2024-08'
completed_months = []

# Loop-invariant write settings, defined once (and therefore available even
# when no month ends up being processed).
partition_cols = ['_YEAR', '_MONTH', '_WEEK_WITHIN_MONTH']
veil_keys = list(veil_schema_dict.keys()) + partition_cols

# Loop-invariant normalisation of the lookup frames used in the merges below.
shows_df['show_id'] = shows_df['show_id'].fillna(-5)
geo_df['geo_location'] = geo_df['geo_location'].astype('Int64')


def _to_naive_utc(values):
    """Return `values` as timezone-naive ``datetime64[ns]``; tz-aware input is converted to UTC first."""
    stamps = pd.to_datetime(values)
    if stamps.dt.tz is not None:
        stamps = stamps.dt.tz_convert('UTC').dt.tz_localize(None)
    return stamps.astype('datetime64[ns]')


def _attach_broadcast_week(detections, calendar):
    """Attach to every detection the broadcast-calendar week that contains its `date_time`.

    The previous implementation cross-joined detections with the calendar and
    then kept one arbitrary row per occurrence, so the attached week did not
    depend on the detection's timestamp. Here each detection is matched to the
    latest week whose `bcw_start_date` is on or before the detection.

    Args:
        detections: frame with a `date_time` column (no nulls).
        calendar: frame with `bcw_start_date` / `bcw_end_date` plus the week
            attributes to attach. It is not modified.

    Returns:
        A new frame: `detections` plus the calendar columns.

    Raises:
        ValueError: if some detections precede the first calendar week.
    """
    # 'key' may be left over from earlier runs of the old cross-join code.
    weeks = calendar.drop(columns=['key'], errors='ignore').copy()
    weeks['bcw_start_date'] = _to_naive_utc(weeks['bcw_start_date'])
    weeks['bcw_end_date'] = _to_naive_utc(weeks['bcw_end_date'])
    weeks = weeks.dropna(subset=['bcw_start_date']).sort_values('bcw_start_date')

    keyed = detections.copy()
    keyed['airing_date_time'] = _to_naive_utc(keyed['date_time'])
    # merge_asof requires both sides to be sorted on their join keys.
    keyed = keyed.sort_values('airing_date_time')

    matched = pd.merge_asof(
        keyed,
        weeks,
        left_on='airing_date_time',
        right_on='bcw_start_date',
        direction='backward',
    )

    unmatched = int(matched['bcw_start_date'].isna().sum())
    if unmatched:
        raise ValueError(f"{unmatched} detections fall before the first broadcast calendar week")

    # A detection after the last calendar week is still matched to that week;
    # report it rather than failing, since bcw_end_date semantics
    # (inclusive vs. exclusive) are not visible here.
    past_end = matched['airing_date_time'].dt.normalize() > matched['bcw_end_date'].dt.normalize()
    if past_end.any():
        print(f"WARNING: {int(past_end.sum())} detections are dated after the end of their matched broadcast week")

    return matched.drop(columns=['airing_date_time'])


for process_month_group in process_month_groups:

    # ------------------------------------------------------------------
    # 1. Fetch the month's detections (ad-hoc billing project)
    # ------------------------------------------------------------------
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = config.get('SA_ADHOC_BILLING')
    # The window is half-open, [min_date, max_date): `>=` keeps rows stamped
    # exactly at min_date, consistent with the month-group listing query.
    # DIV() is integer division; CAST(x / 7 + 1 AS INT64) rounded to the
    # nearest integer and therefore mis-numbered the weeks.
    detections_sql = f"""
        SELECT CAST(occurrence_id AS FLOAT64) AS occurrence_id, encoding_id, date_time, broadcaster_id, cost, tv_show_id, origin, group_occurrence_id, 
        EXTRACT(YEAR FROM date_time) AS year, EXTRACT(MONTH FROM date_time) as month, EXTRACT(DAY FROM date_time) as day,
        CONCAT(EXTRACT(YEAR FROM date_time),'-',FORMAT('%02d', EXTRACT(MONTH FROM date_time))) as process_month_group,
        -- Calculate week_within_month: days 1-7 -> 1, 8-14 -> 2, ...
        DIV(EXTRACT(DAY FROM date_time) - 1, 7) + 1 AS week_within_month,
        last_updated, last_audit_id 

        FROM `{detections_table}` 

        WHERE date_time >= '{min_date}' AND date_time < '{max_date}'
        AND CONCAT(EXTRACT(YEAR FROM date_time),'-',FORMAT('%02d', EXTRACT(MONTH FROM date_time))) = '{process_month_group}'
        """
    detections_df = core_functions.fetch_gbq_data(query=detections_sql, bigquery_client=bigquery_client)
    print(f"Loaded {len(detections_df)} detections for {process_month_group}")
    if len(detections_df) == 0:
        print(f"No detections found for {process_month_group}")
        continue

    # ------------------------------------------------------------------
    # 2. Duplicate detections for cloned encodings
    # ------------------------------------------------------------------
    # A detection of encoding X is copied once for every encoding whose
    # `clone_of` is X; the copy carries the clone's encoding_id.
    encoding_ids = detections_df['encoding_id'].unique().tolist()
    cloned_encodings = encodings_df.loc[encodings_df['clone_of'].isin(encoding_ids)].copy()
    cloned_encoding_ids = cloned_encodings['clone_of'].unique().tolist()
    cloned_encoding_clone_df = (
        cloned_encodings[['encoding_id', 'clone_of']]
        .sort_values(by=['clone_of', 'encoding_id'])
        .rename(columns={'encoding_id': 'clone_to_encoding_id', 'clone_of': 'encoding_id'})
    )
    cloned_detections_df = detections_df.loc[detections_df['encoding_id'].isin(cloned_encoding_ids)].copy()
    # Cloned rows get occurrence_id + 0.1 to distinguish them from the original.
    # NOTE(review): when one encoding has several clones, all of their copies
    # receive the same occurrence_id and only one survives the de-duplication
    # on occurrence_id further down — confirm whether that is intended.
    cloned_detections_df['occurrence_id'] = (cloned_detections_df['occurrence_id'] + .1).astype('Float64')
    cloned_detections_df = cloned_detections_df.merge(cloned_encoding_clone_df, how='left', on='encoding_id')
    cloned_detections_df['encoding_id'] = cloned_detections_df['clone_to_encoding_id']
    cloned_detections_df = cloned_detections_df.drop(columns=['clone_to_encoding_id'])

    # Explicit empty check instead of a try/except that printed the same
    # message for any failure of the concat.
    if cloned_detections_df.empty:
        print('No cloned detections found')
    else:
        detections_df = pd.concat([detections_df, cloned_detections_df], ignore_index=True)
    detections_df = (
        detections_df
        .sort_values(by=['occurrence_id', 'date_time'], ascending=[True, True])
        .reset_index(drop=True)
    )

    detections_df['occurrence_id'] = detections_df['occurrence_id'].astype('Float64')
    detections_df['cost'] = detections_df['cost'].astype('Float64')
    print(len(detections_df))

    # ------------------------------------------------------------------
    # 3. Enrich with channel, DMA, show, geo and encoding attributes
    # ------------------------------------------------------------------
    detections_df = detections_df.merge(master_channel_df, how='left', on='broadcaster_id')
    gc.collect()

    print('starting the append of encodings')
    # prod is good from april 2024 on the 8th

    # Select encodings by the ids present AFTER the clone rows were appended.
    # Using the pre-clone `encoding_ids` left every cloned detection without
    # its encoding attributes in the merge below.
    needed_encoding_ids = detections_df['encoding_id'].unique()
    needed_encodings_df = encodings_df.loc[encodings_df['encoding_id'].isin(needed_encoding_ids)].copy()

    # One row per occurrence_id, keeping the latest date_time.
    df = (
        detections_df
        .sort_values(by=['occurrence_id', 'date_time'], ascending=[True, False])
        .drop_duplicates(subset=['occurrence_id'], keep='first')
        .reset_index(drop=True)
    )
    df['tv_show_id'] = df['tv_show_id'].fillna(-10).astype('Int64')
    df['group_occurrence_id'] = df['group_occurrence_id'].fillna(-6)

    print('starting the merge of dmas')
    df = df.merge(dmas_df, how='left', on='dma_id')
    df = df.merge(shows_df, how='left', left_on='tv_show_id', right_on='show_id')
    df = df.merge(geo_df, how='left', left_on='neustar_id', right_on='geo_location')
    gc.collect()

    df = df.merge(needed_encodings_df, how='left', on='encoding_id', suffixes=('', '_encoding'))

    # ------------------------------------------------------------------
    # 4. Broadcast calendar
    # ------------------------------------------------------------------
    print('adding broadcast cal details')
    merged_df = _attach_broadcast_week(df, broadcast_cal_df)
    # The lookup merges above can fan rows out; keep one row per occurrence.
    merged_df = (
        merged_df
        .sort_values(by=['occurrence_id'])
        .drop_duplicates(subset=['occurrence_id'], keep='first')
    )
    print(f'number of merged_df rows: {len(merged_df)}')
    # bc_year_index is the first four characters of bcm_index.
    merged_df['bc_year_index'] = merged_df['bcm_index'].astype(str).str[:4].astype('Int64')
    merged_df['bcm_index'] = merged_df['bcm_index'].astype('Float64')
    merged_df['bcw_index'] = merged_df['bcw_index'].astype('Float64')
    merged_df['segments_format_id_group'] = merged_df['segments_format_id_group'].astype('Int64')
    merged_df['dma_rank'] = merged_df['dma_rank'].astype('Int64')
    merged_df['neustar_id'] = merged_df['neustar_id'].astype('Int64')
    merged_df['show_id'] = merged_df['show_id'].astype('Int64')

    merged_df['last_updated'] = merged_df['last_updated'].apply(core_functions.time_to_seconds)

    # ------------------------------------------------------------------
    # 5. Segment processing and final column selection
    # ------------------------------------------------------------------
    print('Starting paralell processing')
    df2 = core_functions.process_detections_in_parallel(merged_df)

    df = df2[final_detections_cols].sort_values(by=['year', 'month', 'week_within_month', 'day', 'occurrence_id']).copy().reset_index(drop=True)
    # Separate partition columns so year/month/week stay in the data as well.
    df[['_YEAR', '_MONTH', '_WEEK_WITHIN_MONTH']] = df[['year', 'month', 'week_within_month']]

    # Audit stamps for this run. Previously the audit id was written into
    # 'billing_det_last_updated', overwriting the timestamp stored one line
    # earlier.
    # NOTE(review): 'billing_det_last_audit_id' is the presumed target column
    # (by analogy with billing_last_updated / billing_last_audit_id on the
    # encodings) — confirm against table-schemas.yaml.
    billing_last_updated = pd.Timestamp.now(tz='UTC')
    df['billing_det_last_updated'] = billing_last_updated
    billing_last_audit_id = core_functions.generate_uuid()
    df['billing_det_last_audit_id'] = billing_last_audit_id

    # ------------------------------------------------------------------
    # 6. Write hive-partitioned parquet to every destination
    # ------------------------------------------------------------------
    print('beginning to write')

    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = config.get('SA_N90_CORE_APPS')
    core_functions.write_hive_partitioned_parquet(df, n90_bucket, n90_output_prefix, partition_cols, n90_storage_options)
    print(f"Finished writing to {n90_bucket}/{n90_output_prefix}")
    core_functions.write_hive_partitioned_parquet(df, n90_bucket_2, n90_output_prefix_2, partition_cols, n90_storage_options)
    print(f"Finished writing to {n90_bucket_2}/{n90_output_prefix_2}")
    # new data starts 4/9/2024 - check if mid-day or midnight
    #  2024-04-08 13:10:23 UTC is the first detection in the new data

    # The S4 copy is best-effort: a failure is reported but does not stop the run.
    try:
        core_functions.write_hive_partitioned_parquet_s4(df, s4_bucket, s4_output_prefix, partition_cols, s4_storage_options)
        print(f"Finished writing to {s4_bucket}/{s4_output_prefix}")
    except Exception as e:
        print(f"Error writing to {s4_bucket}/{s4_output_prefix}")
        print(e)

    # Veil format: the schema without geo_ columns, plus the partition columns.
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = config.get('SA_ADHOC_BILLING')
    core_functions.write_hive_partitioned_parquet(df[veil_keys], veil_billing_bucket, veil_output_prefix, partition_cols, veil_storage_options)
    print(f"Finished writing to {veil_billing_bucket}/{veil_output_prefix}")

    completed_months.append(process_month_group)
    print(f"Completed {process_month_group} with {len(df)} rows")
    print('------------------------------------------')
    print('------------------------------------------')
    # Release the month's large frames before the next iteration.
    del df
    del needed_encodings_df
    gc.collect()


print('All months completed: ', completed_months)
    



In [None]:
# veil_keys = list(veil_schema_dict.keys()) + ['_YEAR', '_MONTH', '_WEEK_WITHIN_MONTH']
# Manual re-run of the Veil-bucket write for whatever frame is currently in `df`.
# NOTE(review): the month loop ends every completed iteration with `del df`,
# so after a full run this cell raises NameError; presumably it was used
# after an interrupted run — confirm, or remove the cell.
veil_keys
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = config.get('SA_ADHOC_BILLING')
core_functions.write_hive_partitioned_parquet(df[veil_keys], veil_billing_bucket, veil_output_prefix, partition_cols, veil_storage_options)
print(f"Finished writing to {veil_billing_bucket}/{veil_output_prefix}")

In [None]:
# Display the encodings frame for inspection.
encodings_df

In [None]:
# Scratch: rebuild the clone lookup for the last month's `encoding_ids`.
encoding_ids
is_clone_of_detected = encodings_df['clone_of'].isin(encoding_ids)
cloned_encodings = encodings_df[is_clone_of_detected].copy()
cloned_encoding_ids = cloned_encodings['clone_of'].unique().tolist()
cloned_encoding_clone_df = cloned_encodings.loc[:, ['encoding_id', 'clone_of']].copy()

In [None]:
# Turn the clone lookup into a mapping keyed by the original encoding:
# encoding_id (original) -> clone_to_encoding_id (the clone).
cloned_encoding_clone_df = (
    cloned_encoding_clone_df
    .sort_values(by=['clone_of', 'encoding_id'])
    .rename(columns={'encoding_id': 'clone_to_encoding_id', 'clone_of': 'encoding_id'})
)


In [None]:
# Display the original -> clone encoding mapping.
cloned_encoding_clone_df

In [None]:
# Scratch: copy the detections whose encoding has clones and re-point each
# copy at the clone's encoding id (occurrence_id is shifted by 0.1).
cloned_encoding_ids
has_clone = detections_df['encoding_id'].isin(cloned_encoding_ids)
cloned_detections_df = detections_df[has_clone].copy()
shifted_ids = cloned_detections_df['occurrence_id'] + .1
cloned_detections_df['occurrence_id'] = shifted_ids.astype('Float64')
cloned_detections_df = cloned_detections_df.merge(cloned_encoding_clone_df, how='left', on='encoding_id')
cloned_detections_df['encoding_id'] = cloned_detections_df['clone_to_encoding_id']

In [None]:
# Display the cloned detections (still carrying clone_to_encoding_id here).
cloned_detections_df

In [None]:

# cloned_detections_df.drop(columns=['clone_to_encoding_id'], inplace=True)
# Display the cloned detections again; the in-place drop above is disabled.
cloned_detections_df

In [None]:
# The helper column is no longer needed once encoding_id has been re-pointed.
cloned_detections_df = cloned_detections_df.drop('clone_to_encoding_id', axis='columns')
cloned_detections_df

In [None]:
# Scratch: derive week columns on a copy of the cloned detections.
ddf = cloned_detections_df.copy()

# 'week': start date of the weekly period containing date_time, as a string.
naive_times = pd.to_datetime(ddf['date_time']).dt.tz_localize(None)
ddf['week'] = naive_times.dt.to_period('W').dt.start_time.dt.date.astype('string')

# 'week_within_month': days 1-7 -> 1, 8-14 -> 2, and so on.
day_of_month = ddf['date_time'].dt.day
ddf['week_within_month'] = (day_of_month - 1) // 7 + 1
ddf

In [None]:
# Number of distinct encoding ids in the last fetched month of detections.
len(encoding_ids)

In [None]:
# Number of distinct encodings selected for the merge.
# NOTE(review): the month loop deletes `needed_encodings_df` at the end of
# every completed iteration, so this fails after a full run.
len(needed_encodings_df['encoding_id'].unique())

In [None]:
# Column dtypes of the broadcast calendar frame.
broadcast_cal_df.dtypes

In [None]:
# d2_df = detections_df.merge(needed_encodings_df, how='left', left_on='encoding_id', right_on='encoding_id', suffixes=('', '_encoding'))


In [None]:
# # d2_df.loc[d2_df['sfdc_account_id'].isnull()]
# encodings_df = needed_encodings_df.copy()

In [None]:
# Pick up edits made to core_functions without restarting the kernel.
importlib.reload(core_functions)

    #



In [None]:
# List the columns of the current `df`.
# NOTE(review): `df` is deleted at the end of each completed month-loop iteration.
print(df.columns)

In [None]:
# Pick up edits made to core_functions without restarting the kernel.
importlib.reload(core_functions)




# # Filter the rows where the airing datetime falls within the week range
# result_df = merged_df[
#     (merged_df['date_time'] >= merged_df['bcw_start_date']) &
#     (merged_df['date_time'] < merged_df['bcw_end_date'])
# ]
# # .drop(columns=['week_start', 'week_end'])
# len(result_df)


In [None]:
# Print each column's name, dtype and the value stored under index label 0.
for col in df.columns:
    column = df[col]
    print(f"'{col}': '{column.dtype}', {column[0]}")

In [None]:
# Project helper; presumably prints a parquet schema for `df` labelled 'detections' — confirm in core_functions.
core_functions.print_dataframe_parquet_schema(df, 'detections')

In [None]:






# df.loc[df['is_barter'].isnull(), 'is_barter'] = False
# df['length_in_seconds'] = df['length_in_seconds'].fillna(0).astype(str)

# df_clean = core_functions.enforce_schema(df, n90_schema_dict)


In [None]:
# Project helper; presumably prints a Python-dtype schema for `df` labelled 'detections' — confirm in core_functions.
core_functions.print_dataframe_python_schema(df, 'detections')

In [None]:
# Preview the first rows of the current `df`.
df.head()


In [None]:
# Legacy write path: enforce the N90 schema on `df`, write it to the N90
# bucket, then write the Veil subset of columns to the Veil billing bucket.
# The schema mappings are named n90_schema_dict / veil_schema_dict in this
# notebook; the previous names (n90_schema / veil_schema) were never defined.
# NOTE(review): this cell needs `df` and `partition_cols` from the month loop,
# and that loop deletes `df` after each completed month — confirm whether this
# cell is still needed.
df_clean = core_functions.enforce_schema(df, n90_schema_dict)

os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = config.get('SA_N90_CORE_APPS')
core_functions.write_hive_partitioned_parquet(df_clean, n90_bucket, n90_output_prefix, partition_cols, n90_storage_options)
print(f"Finished writing to {n90_bucket}/{n90_output_prefix}")

# new data starts 4/9/2024 - check if mid-day or midnight
#  2024-04-08 13:10:23 UTC is the first detection in the new data

# change to veil format
# NOTE(review): unlike the month loop, the partition columns are not appended
# to veil_keys here — confirm that enforce_schema keeps them available.
veil_keys = list(veil_schema_dict.keys())

os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = config.get('SA_ADHOC_BILLING')
core_functions.write_hive_partitioned_parquet(df_clean[veil_keys], veil_billing_bucket, veil_output_prefix, partition_cols, veil_storage_options)
print(f"Finished writing to {veil_billing_bucket}/{veil_output_prefix}")

del df_clean
# The month loop may already have deleted this frame; remove it only if present.
globals().pop('needed_encodings_df', None)
gc.collect()
    


In [None]:

# Consecutive one-month [start, end) windows from 2022-01-01 up to 2024-03-01.
# `month_offset` counts months since January 2022.
date_list = [
    [
        f'{2022 + month_offset // 12}-{month_offset % 12 + 1:02d}-01',
        f'{2022 + (month_offset + 1) // 12}-{(month_offset + 1) % 12 + 1:02d}-01',
    ]
    for month_offset in range(26)
]


# Backfill from the billing archive, one [new_min, new_max) window at a time.
# NOTE(review): `process_detections` is only defined inside a commented-out
# cell of this notebook, so the call at the bottom raises NameError unless the
# function is restored.
for dates in date_list:
    new_min = dates[0]
    new_max = dates[1]
    
    # Archive rows for the window, excluding rows that are clones.
    archive_sql = f"""
    select occurrence_id , detection_encoding_id as encoding_id, broadcaster_id, detection_timestamp as date_time, cost, tv_show_id, origin, group_occurrence_id, last_updated, last_audit_id, 

    affiliate, callsign,  dma_id
    from `adhoc-billing.avs_billing_process.billing_records_archive`
    where detection_timestamp >= '{new_min}' and detection_timestamp < '{new_max}'
    and clone_of is null
    """

    archive_df = core_functions.fetch_gbq_data(query=archive_sql, bigquery_client=bigquery_client)

    # archive_df['occurrence_id'] = archive_df['occurrence_id'].astype(int)
    # NOTE(review): this overwrites the selected group_occurrence_id with the
    # integer-cast encoding_id — looks like a copy/paste slip; confirm the
    # intended source column.
    archive_df['group_occurrence_id'] = archive_df['encoding_id'].astype(int)
    archive_df.dtypes
    detections_df = archive_df.copy()
    # Free the raw result before the heavier processing step.
    del archive_df
    gc.collect()
    process_detections(detections_df, encodings_df)


In [None]:
# NOTE(review): this cell repeats the archive backfill loop of the cell that
# defines `date_list` and relies on that variable; running both processes
# every window twice. It also calls `process_detections`, which is only
# defined inside a commented-out cell of this notebook.
for dates in date_list:
    new_min = dates[0]
    new_max = dates[1]
    
    # Archive rows for the window, excluding rows that are clones.
    archive_sql = f"""
    select occurrence_id , detection_encoding_id as encoding_id, broadcaster_id, detection_timestamp as date_time, cost, tv_show_id, origin, group_occurrence_id, last_updated, last_audit_id, 

    affiliate, callsign,  dma_id
    from `adhoc-billing.avs_billing_process.billing_records_archive`
    where detection_timestamp >= '{new_min}' and detection_timestamp < '{new_max}'
    and clone_of is null
    """

    archive_df = core_functions.fetch_gbq_data(query=archive_sql, bigquery_client=bigquery_client)

    # archive_df['occurrence_id'] = archive_df['occurrence_id'].astype(int)
    # NOTE(review): group_occurrence_id is overwritten with the integer-cast
    # encoding_id — confirm the intended source column.
    archive_df['group_occurrence_id'] = archive_df['encoding_id'].astype(int)
    archive_df.dtypes
    detections_df = archive_df.copy()
    # Free the raw result before the heavier processing step.
    del archive_df
    gc.collect()
    process_detections(detections_df, encodings_df)


In [None]:
from datetime import datetime
from dateutil.relativedelta import relativedelta

def generate_overlapping_date_sets(start_date, num_sets):
    """
    Generate consecutive one-month [start, end] date windows.

    Despite the name, the windows only share boundaries: each window's end
    date is the next window's start date.

    Every boundary is computed as ``start_date + i months`` from the original
    start date instead of repeatedly adding one month to the previous
    boundary. Month arithmetic clamps to the last valid day of a short month,
    so chaining would permanently shift all later windows
    (2022-01-31 -> 2022-02-28 -> 2022-03-28 ...); anchoring to the original
    start yields 2022-01-31 -> 2022-02-28 -> 2022-03-31 -> 2022-04-30.

    Args:
        start_date (str): Start date in 'YYYY-MM-DD' format.
        num_sets (int): Number of windows to generate. Zero or a negative
            number returns an empty list.

    Returns:
        list: List of date sets in the format [['YYYY-MM-DD', 'YYYY-MM-DD'], ...].

    Raises:
        ValueError: If start_date does not match the 'YYYY-MM-DD' format.
    """
    anchor = datetime.strptime(start_date, "%Y-%m-%d")

    # num_sets windows need num_sets + 1 boundaries, each offset from the anchor.
    boundaries = [
        (anchor + relativedelta(months=offset)).strftime("%Y-%m-%d")
        for offset in range(num_sets + 1)
    ]

    return [[window_start, window_end] for window_start, window_end in zip(boundaries, boundaries[1:])]

# Build and print thirty consecutive one-month windows starting 2022-01-01.
start_date = "2022-01-01"
num_sets = 30  # how many monthly windows to build
date_sets = generate_overlapping_date_sets(start_date=start_date, num_sets=num_sets)

print(date_sets)

In [None]:
# detections_df2 = detections_df.merge(master_channel_df, how='left', left_on='broadcaster_id', right_on='broadcaster_id')
# # detections_df2['affiliate'].isna()
# detections_df2


In [None]:

# Enrich the detections held in `detections_df` with DMA, show, geo and
# encoding attributes, then stamp every row with this run's audit fields.
# NOTE: the cell rebinds detections_df to the merged result, so it is not
# idempotent -- re-running it without reloading detections_df merges the
# lookup tables a second time.

# Timestamps become tz-aware UTC; year/month/day are derived from them.
detections_df['date_time'] = pd.to_datetime(detections_df['date_time'], utc=True)
detections_df['year'] = detections_df['date_time'].dt.year
detections_df['month'] = detections_df['date_time'].dt.month
detections_df['day'] = detections_df['date_time'].dt.day

# Replace missing ids with sentinel values. The shows side uses a different
# sentinel (-5) than the detections side (-10), so a detection without a show
# never pairs with a show row that lacks an id (presumably the intent --
# pandas would otherwise join null keys to null keys).
detections_df['tv_show_id'] = detections_df['tv_show_id'].fillna(-10)
detections_df['group_occurrence_id'] = detections_df['group_occurrence_id'].fillna(-6)
shows_df['show_id'] = shows_df['show_id'].fillna(-5)

# NOTE(review): float64 represents integers exactly only up to 2**53; confirm
# occurrence ids stay below that before relying on this cast.
detections_df['occurrence_id'] = detections_df['occurrence_id'].astype(float)

detections_df = detections_df.merge(dmas_df, how='left', left_on='dma_id', right_on='dma_id')
detections_df = detections_df.merge(shows_df, how='left', left_on='tv_show_id', right_on='show_id')

# neustar_id is not selected by the archive query, so it presumably arrives
# via the dmas_df merge above -- TODO confirm.
geo_df['geo_location'] = geo_df['geo_location'].astype(int)
detections_df = detections_df.merge(geo_df, how='left', left_on='neustar_id', right_on='geo_location')
gc.collect()

# Keep one row per encoding_id -- the one with the latest billing_last_updated --
# and join only the encodings that these detections actually reference.
needed_encodings = detections_df['encoding_id'].unique().tolist()
encodings_df = (
    encodings_df
    .sort_values(by=['encoding_id', 'billing_last_updated'], ascending=[True, False])
    .drop_duplicates(subset=['encoding_id'], keep='first')
)
needed_encodings_df = encodings_df[encodings_df['encoding_id'].isin(needed_encodings)].copy()
detections_df = detections_df.merge(needed_encodings_df, how='left', left_on='encoding_id', right_on='encoding_id', suffixes=('', '_encoding'))

detections_df = detections_df.sort_values(by=['occurrence_id', 'year', 'month', 'day'])

# Audit stamp for this run. Timestamp.now(tz='UTC') is the non-deprecated
# spelling of Timestamp.utcnow() and already yields a tz-aware UTC value.
billing_last_updated = pd.Timestamp.now(tz='UTC')
detections_df['billing_det_last_updated'] = billing_last_updated

# Fix: the audit id used to be written into 'billing_det_last_updated', which
# overwrote the timestamp assigned just above with a UUID.
# NOTE(review): 'billing_det_last_audit_id' is inferred from the
# last_updated / last_audit_id naming used by the source table -- confirm the
# exact column name against the detections schema.
billing_last_audit_id = core_functions.generate_uuid()
detections_df['billing_det_last_audit_id'] = billing_last_audit_id

# Coerce the enriched frame with core_functions.enforce_schema, then write it
# twice: the full frame to the N90 bucket and a column subset to the Veil
# billing bucket. GOOGLE_APPLICATION_CREDENTIALS is switched before each write,
# so the statement order below matters.
# NOTE(review): the schema cell at the top of the notebook defines
# n90_schema_dict / veil_schema_dict; confirm that n90_schema and veil_schema
# (used here) are defined elsewhere in the notebook.
detections_df_clean = core_functions.enforce_schema(detections_df, n90_schema)

# --- N90 write: uses the credentials configured under SA_N90_CORE_APPS ---
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = config.get('SA_N90_CORE_APPS')
core_functions.write_hive_partitioned_parquet(detections_df_clean, n90_bucket, n90_output_prefix, partition_cols, n90_storage_options)
print(f"Finished writing to {n90_bucket}/{n90_output_prefix}")

# new cell
# Bare expression: evaluated and discarded here because it is not the last
# statement of the cell.
partition_cols
# Data note: the new data starts around 2024-04-09; its first detection is at
# 2024-04-08 13:10:23 UTC. TODO: check whether the cutover is mid-day or at
# midnight.

# change to veil format: keep only the columns named in veil_schema
veil_keys = list(veil_schema.keys())

# --- Veil write: uses the credentials configured under SA_ADHOC_BILLING ---
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = config.get('SA_ADHOC_BILLING')
core_functions.write_hive_partitioned_parquet(detections_df_clean[veil_keys], veil_billing_bucket, veil_output_prefix, partition_cols, veil_storage_options)
print(f"Finished writing to {veil_billing_bucket}/{veil_output_prefix}")

# new cell
# Bare expression: evaluated and discarded (see above).
partition_cols

# Drop the large intermediates and collect garbage. detections_df itself stays
# bound. GOOGLE_APPLICATION_CREDENTIALS is left set to the SA_ADHOC_BILLING
# value for whatever runs next.
del detections_df_clean
del needed_encodings_df
gc.collect()


In [None]:
# month_list = generate_month_list(min_date, max_date)
# month_list

# start_date = f'{month_list[0]}-01'
# end_date = f'{month_list[1]}-01'
# detections_sql = f"""
#     with cleanMasterChannels AS (
#     SELECT channel_id, affiliate, callsign, monitored_channels, dma_id, safe_cast(broadcaster_id as int64) as broadcaster_id 
#     FROM `{veil_billing.get('avs_project_id')}.{veil_billing.get('mongo_dataset_id')}.master_channels`

#     )
#     select * from 
#     `{veil_billing.get('avs_project_id')}.{veil_billing.get('avs_dataset_id')}.detections` d
#     left join 
#     cleanMasterChannels mc
#     using(broadcaster_id)
#     WHERE date_time >= '{start_date}' AND date_time < '{end_date}'
#     """
# detections_df = core_functions.fetch_gbq_data(query=detections_sql, bigquery_client=bigquery_client)






In [None]:

# Diagnostic: show the current value of GOOGLE_APPLICATION_CREDENTIALS. Uses
# .get() so an unset variable is reported instead of raising KeyError.
print("GOOGLE_APPLICATION_CREDENTIALS:", os.environ.get('GOOGLE_APPLICATION_CREDENTIALS', '<not set>'))