# In [None]:
#!/usr/bin/env python3
import tasks.infrastructure.idc.common.idc_sdnb_tags as tags
from dataswarm.operators import( 
     DqOperator,
     DuplicateCheckDqOperator,
     WaitForHivePartitionsOperator,
     GlobalDefaults,)
from dataswarm.operators.data_annotations.dsl import input,output
from dataswarm.operators.data_annotations.features import (
     enable_data_annotations_features,
     get_pipeline_end_dummy_operator,)
from dataswarm_commons.operators import PrestoInsertOperatorWithSchema
from dataswarm_commons.operators.schema import Column, HiveAnon, Table
from task.infrastructure.idc.common.globaldefaults import SdnbGlobalDefaults as GD
from task.infrastructure.idc.common.globaldefaults import SdnGlobalDefaults 

# Pipeline owners, in "Name(email)" form.
__authors__ = ["Yang lei(yanglei@fb.com)"]

# Tags specific to this pipeline. They are concatenated onto the shared
# defaults' tags where the global defaults are updated.
pipeline_tags = [
    # The tags module is imported under the alias `tags`; the original
    # `tag.IDC_BIM.name` referenced an unbound name and raised NameError.
    tags.IDC_BIM.name,
    tags.IDC_DASHBOARD.name,
    tags.IDC_MEDIUM.name,
    tags.IDC_QA_QC.name,
    tags.IDC_DEC_GDCC.name,
]

# Seed the pipeline-wide defaults from the shared SDNB defaults.
# `SdnbGlobalDefaults` is only imported under the alias `GD`, so the
# unaliased name is not bound in this module and must not be used here.
GlobalDefaults.set(**GD().all)

# Pipeline-specific overrides layered on top of the shared defaults.
GlobalDefaults.update(
    user="yanglei",
    schedule="@daily",
    tags=GD().tags + pipeline_tags,
    task_tags=GD().tags + pipeline_tags,
    # Both are switched off for sub-partitioned tables; the pipeline declares
    # dedicated wait operators and an explicit dep_list instead.
    auto_wait_fors=False,
    auto_dep_list=False,
)
    
#-------------------------------------------#
# Dedicated waits for sub partitioned tables 
#-------------------------------------------#

# Each wait below targets both the US and the EU sub-partition for <DATEID>.
# Kept as a tuple and copied per operator so every operator still receives
# its own list, exactly as with the inline literals.
_CONTINENT_PARTITIONS = (
    'ds=<DATEID>/continent=US',
    'ds=<DATEID>/continent=EU',
)

wait_for_issue_custom_fields = WaitForHivePartitionsOperator(
    table='tm_idc_bim_main_issue_custom_fields',
    partitions_list=list(_CONTINENT_PARTITIONS),
)

wait_for_customfield_name = WaitForHivePartitionsOperator(
    table='tm_idc_bim_main_custom_fields',
    partitions_list=list(_CONTINENT_PARTITIONS),
)

wait_for_issue_dm = WaitForHivePartitionsOperator(
    table='tm_idc_bim_main_issue_datamart',
    partitions_list=list(_CONTINENT_PARTITIONS),
)

wait_for_eqp_dm = WaitForHivePartitionsOperator(
    table='tm_idc_bim_equipment_datamart',
    partitions_list=list(_CONTINENT_PARTITIONS),
)
    
# Waits for tables that are partitioned by `ds` only (no continent level).

# The original waited on `bim_nextgen_issues_datamart` here, which duplicated
# `wait_for_ng_issue_dm` and left the custom-attributes table unguarded.
# NOTE(review): table name taken from the `input_data` key of the load step;
# confirm it is the intended source.
wait_for_ng_custom_attr = WaitForHivePartitionsOperator(
    table='bim_nextgen_issues_custom_attributes',
    partitions_list=['ds=<DATEID>'],
)

wait_for_ng_eqp_dm = WaitForHivePartitionsOperator(
    table='bim_nextgen_equipment_datamart',
    partitions_list=['ds=<DATEID>'],
)

wait_for_ng_issue_dm = WaitForHivePartitionsOperator(
    table='bim_nextgen_issues_datamart',
    partitions_list=['ds=<DATEID>'],
)

wait_for_prj_cont_lkp = WaitForHivePartitionsOperator(
    table='tm_idc_bim_project_configuration_lookup',
    partitions_list=['ds=<DATEID>'],
)

# `wait_for_com_lkp` is listed in the load step's dep_list but was never
# defined, which raises NameError when the module is loaded.
# NOTE(review): table and partition are assumed from the
# `tm_idc_bim_companies_to_show_lookup` input and the sibling lookup wait;
# confirm both before relying on this.
wait_for_com_lkp = WaitForHivePartitionsOperator(
    table='tm_idc_bim_companies_to_show_lookup',
    partitions_list=['ds=<DATEID>'],
)

#-----------------------#
# 1. Insert bic_counts
#-----------------------#
    
# Insert step for the integrity-count snapshot table.
#
# In the original, the operator call was closed right after `dep_list`, so
# `input_data`, `output_data` and `create` (all written in keyword-argument
# form with trailing commas) became module-level one-element tuples and never
# reached the operator. They are passed as keyword arguments here.
# NOTE(review): assumes PrestoInsertOperatorWithSchema accepts these keyword
# names, as the original layout implies - confirm against the operator.
load_bic_counts = PrestoInsertOperatorWithSchema(
    dep_list=[
        wait_for_issue_custom_fields,
        wait_for_customfield_name,
        wait_for_issue_dm,
        wait_for_eqp_dm,
        wait_for_prj_cont_lkp,
        # NOTE(review): `wait_for_com_lkp` must be defined together with the
        # other wait operators; confirm it exists before running.
        wait_for_com_lkp,
        wait_for_ng_custom_attr,
        wait_for_ng_issue_dm,
        wait_for_ng_eqp_dm,
    ],
    input_data={
        "tm_idc_bim_main_issue_custom_fields": (
            input.table("tm_idc_bim_main_issue_custom_fields").col_ds_eq_dateid()
        ),
        "tm_idc_bim_customfields": (
            input.table("tm_idc_bim_customfields").col_ds_eq_dateid()
        ),
        # Table name corrected from "tm_idc_bin_issue_datamart" so that it
        # matches its own key; every other entry uses key == table name.
        "tm_idc_bim_issue_datamart": (
            input.table("tm_idc_bim_issue_datamart").col_ds_eq_dateid()
        ),
        "tm_idc_bim_equipment_datamart": (
            input.table("tm_idc_bim_equipment_datamart").col_ds_eq_latest_ds("<dateid-7>")
        ),
        "tm_idc_bim_companies_to_show_lookup": (
            input.table("tm_idc_bim_companies_to_show_lookup").col_ds_eq_latest_ds("<dateid-7>")
        ),
        "bim_nextgen_issues_custom_attributes": (
            input.table("bim_nextgen_issues_custom_attributes").col_ds_eq_latest_ds()
        ),
        "bim_nextgen_equipment_datamart": (
            input.table("bim_nextgen_equipment_datamart").col_ds_eq_dateid()
        ),
    },
    output_data={
        "idc_integrity_counts_snapshot": (
            output.table("<TABLE:idc_integrity_counts_snapshot>").col_ds_eq_dateid()
        ),
    },
    create=Table(
        cols=[
            Column("Project_name", "VARCHAR", "Project Name", policy=HiveAnon.NONE),
            Column(
                "project_id_err_count",
                "DOUBLE",
                "project id err count",
                policy=HiveAnon.NONE,
            ),
            Column(
                "issue_type_other_err_count",
                "DOUBLE",
                "issue type other error count",
                policy=HiveAnon.NONE,
            ),
            # NOTE(review): the original ended this list with an empty
            # `Column()` placeholder (and a missing comma before it). It is
            # dropped here; add the remaining columns explicitly.
        ],
        partitions=[
            Column("ds", "VARCHAR", "Load date"),
        ],
        comment="Daily Snapshot data integrity error counts",
        reorder_columns=True,
        # Was misspelled `retntion`.
        retention=1100,
    ),
    # NOTE(review): no `select` query is present in the visible source;
    # supply it here if the operator requires one.
)
    
