# ALCOHOL AND GMS/PMS PUBLICATION JOB

This job outputs four quarterly CSVs covering the entire financial year, after suppression has been applied.

### CONTENTS

<li>Getting data from the database</li>
<li>Testing outputs</li>
<li>Suppression</li>
<li>Quarterly Split</li>
<li>Spot checks on a few practices</li>
<li>Addendum: Suppression rule 3</li>

In [None]:
import pyspark.sql.functions as F
from pyspark.sql.functions import substring_index,substring,count,countDistinct,col , when
import pandas as pd
import datetime
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, FloatType, LongType
from pyspark.sql import DataFrame

In [None]:
%run ./parameters

In [None]:
import time

# In-memory run log for this notebook: one row per message, with the columns
# Time / Log_Type / Message. The log_* helpers below append to logging_df in place.
logging_data = {"Time" : [], "Log_Type" : [], "Message" : []}
logging_df = pd.DataFrame(logging_data)

# Layout of the timestamp stored in the "Time" column (day/month/year, minute resolution).
_LOG_TIME_FORMAT = "%d/%m/%Y %H:%M"


def _append_log(log_type: str, report_msg: str) -> str:
    """Append one row to ``logging_df`` and return the timestamp used.

    Parameters:
        log_type - Value stored in the "Log_Type" column (e.g. "Info", "Test").
        report_msg - Text stored in the "Message" column.
    Returns:
        now - The current local time formatted as "dd/mm/YYYY HH:MM".
    """
    now = time.strftime(_LOG_TIME_FORMAT)
    # The next row label is the current row count; this relies on logging_df
    # keeping its default 0..n-1 index, which holds as long as rows are only
    # ever added through this helper.
    logging_df.loc[len(logging_df)] = [now, log_type, report_msg]
    return now


def log_info(report_msg: str) -> str:
    """Record an informational message in ``logging_df``.

    Parameters:
        report_msg - A string describing what is being reported.
    Returns:
        now - The timestamp (as a string) written alongside the message.
    """
    return _append_log("Info", report_msg)


def log_test(report_msg: str) -> str:
    """Record a test-related message in ``logging_df``.

    Parameters:
        report_msg - A string describing the test message being reported.
    Returns:
        now - The timestamp (as a string) written alongside the message.
    """
    return _append_log("Test", report_msg)

# Record the run parameters as the first entries of the log.
# report_start, report_end and service_year are not defined in this notebook;
# presumably they come from the ./parameters notebook run above - confirm.
# The "+" concatenation requires each of them to be a string.
log_info("report start = " + report_start)
log_info("report end = " + report_end)
log_info("service year = " + service_year)

In [None]:
# Remove every existing widget from the notebook before new ones are created.
dbutils.widgets.removeAll()

In [None]:
# Create text widgets for the reporting period start/end dates and the service
# year, pre-filled with the current values of report_start, report_end and
# service_year.
# NOTE(review): nothing visible in this notebook reads the widget values back
# (no dbutils.widgets.get call); the settings cell uses the variables directly.
# Confirm whether the %run notebooks read the widgets or they are display-only.
dbutils.widgets.text("report_start", defaultValue= report_start , label="Reporting Period Start Date")
dbutils.widgets.text("report_end", defaultValue= report_end, label="Reporting Period End Date")
dbutils.widgets.text("service_year", defaultValue= service_year, label="Service Year")

log_info("Successfully set the widget time texts")

In [None]:
# Settings derived from the run parameters.
# Convert the reporting period boundaries to pandas datetime values.
report_period_start = pd.to_datetime(report_start)
report_period_end = pd.to_datetime(report_end)
# Quality service name for the given service year. It is not used again in this
# notebook; presumably the %run notebooks below consume it - confirm.
quality_service_name = f"Core GP Contract {service_year}"

### Input Checks

In [None]:
%run ./tests/input_checks

In [None]:
# Print the log table so the messages recorded so far can be reviewed.
print(logging_df)

### Run all functions needed


In [None]:
%run ./functions/run_functions

### Data Ingestion

In [None]:
# Build the input dataset. data_ingestion is not defined in this notebook
# (presumably it comes from ./functions/run_functions - confirm);
# print_tables=False presumably switches off printing of intermediate tables.
final_df = data_ingestion(print_tables=False)

### Unit Tests for Testing and Suppression

In [None]:
%run ./tests/function_test_suite

In [None]:
%run ./tests/testing_data

In [None]:
# Print the log table again to review any messages added since the last print.
print(logging_df)

In [None]:
%run ./tests/test_suppression

In [None]:
# Print the log table once more before suppression is applied to the real data.
print(logging_df)

### Suppression

In [None]:
# Apply suppression to the ingested data. apply_suppression (defined outside
# this notebook) returns two objects; judging by how they are passed to
# spot_check later, main_table is presumably the table before suppression and
# main_supp_out the suppressed output - confirm against the function definition.
main_table, main_supp_out = apply_suppression(final_df)

### Quarterly Split

In [None]:
# Split the suppressed output by quarter. The return value is not used here, so
# the function presumably writes the quarterly outputs itself - confirm.
# NOTE: the function name is spelled "quartely_split" where it is defined
# (outside this notebook), so the call must keep that spelling.
quartely_split(main_supp_out, split_quarters = True)

### QA TESTS

In [None]:
%run ./tests/QA_checks

In [None]:
# Show the full log table with the notebook's display() helper.
display(logging_df)

### Spot Checks

In [None]:
# Compare spot-checked Spark DataFrames before and after suppression.
# Per the original author's note, these tables are generated by picking
# 4 random suppressed practice_codes. display_table=True presumably makes
# spot_check show the tables as well as return them - confirm.
before_suppression_test_df, after_suppresion_test_df = spot_check(main_table, main_supp_out, display_table = True)