In [1]:
%run oeai_py

In [2]:
# CHANGE VALUES FOR YOUR KEY VAULT
keyvault = "kv-oea-oeai"
keyvault_linked_service = "LS_KeyVault"

# Synapse OEA environment paths
bronze_path = oeai.get_secret(spark, "wonde-bronze", keyvault_linked_service, keyvault)
school_ids_secret = oeai.get_secret(spark, "school-ids", keyvault_linked_service, keyvault)
# The secret is a comma-separated list of Wonde school IDs. Strip whitespace and drop
# empty entries so "id1, id2," yields ["id1", "id2"] rather than ["id1", " id2", ""],
# which would otherwise produce invalid secret names ("wonde- id2", "wonde-") and URLs.
school_ids = [sid.strip() for sid in school_ids_secret.split(",") if sid.strip()]

# Set up date parameters (a one-year window ending today).
# NOTE(review): DateFrom/DateTo are not referenced in the visible cells.
today = datetime.today()
last_year = today - timedelta(days=365)
DateFrom = last_year.strftime('%Y-%m-%d')
DateTo = today.strftime('%Y-%m-%d')

In [3]:
# Load the existing audit log from the bronze layer so this run's entries are added to it.
audit_log = oeai.load_audit_log(spark, "".join([bronze_path, "audit_log.json"]))
# NOTE(review): `audit_logs` is not referenced in the visible cells; load_bronze appends to `audit_log`.
audit_logs = list()
# Path passed to oeai.log_error whenever a request or write fails.
error_log_path = "".join([bronze_path, "error_log.txt"])

In [4]:
def get_school_data(token: str, school_id: str, endpoint: str, query: str, pagination_type: str = "cursor", timeout: float = 60) -> list:
    """
    Fetches every page of data for a specific school from the Wonde API.

    Args:
        token (str): The authentication token for the Wonde API (sent as a Bearer header).
        school_id (str): The unique identifier of the school.
        endpoint (str): The endpoint path below the school, e.g. "/students" ("" for the school itself).
        query (str): Additional query parameters, e.g. "&include=students". Leading "&"/"?"
            are stripped. None is treated as "".
        pagination_type (str, optional): Type of pagination to use. Defaults to 'cursor'.
            'cursor' follows meta.pagination.next; 'offset' appends &page=N and stops when a
            page returns fewer than 50 records.
        timeout (float, optional): Seconds to wait for each HTTP request. Defaults to 60.

    Returns:
        list: The records from every page. A page whose "data" is a single object contributes
        one item; a page whose "data" is a list contributes each element. If a request fails
        part-way through, the records gathered so far are returned and the failure is written
        to the error log.

    Raises:
        ValueError: If pagination_type is not 'cursor' or 'offset'.
    """
    if pagination_type not in ("cursor", "offset"):
        # Any other value would never advance the URL and re-request page one forever.
        raise ValueError(f"Unsupported pagination_type: {pagination_type!r} (expected 'cursor' or 'offset')")

    # Normalise the endpoint to end with exactly one "/" and build the base URL.
    endpoint = endpoint.rstrip('/') + '/'
    base_url = f"https://api.wonde.com/v1.0/schools/{school_id}{endpoint}"

    # The query string always starts with "?" so "&page=N" can be appended safely.
    url = base_url + f"?{(query or '').lstrip('&?')}"
    headers = {
        "Authorization": f"Bearer {token}"
    }

    all_data = []
    page = 1
    per_page_limit = 50  # A page with fewer items than this is the last one (offset mode)
    next_url = f"{url}&page={page}" if pagination_type == "offset" else url

    while next_url:
        response = requests.get(next_url, headers=headers, timeout=timeout)

        # Handle unsuccessful requests. There is no active exception here, so log the
        # response body rather than traceback.format_exc() (which would be "NoneType: None").
        if response.status_code != 200:
            error_message = f"Error fetching data from {next_url}: {response.status_code} {str(response.text)[:500]}"
            oeai.log_error(spark, error_message, error_log_path)
            break

        try:
            response_data = response.json()
        except ValueError:
            error_message = f"Invalid JSON received from {next_url}: {traceback.format_exc()}"
            oeai.log_error(spark, error_message, error_log_path)
            break

        # Extract data from response; treat an explicit null the same as a missing key.
        data_from_response = response_data.get("data")
        if data_from_response is None:
            data_from_response = []

        # Append data based on its type (single object vs list of records).
        if isinstance(data_from_response, dict):
            all_data.append(data_from_response)
        else:
            all_data.extend(data_from_response)

        # Handle pagination logic.
        if pagination_type == "cursor":
            pagination = (response_data.get("meta") or {}).get("pagination") or {}
            next_url = pagination.get("next")
        else:
            # A single object is never paged; otherwise a short page means we are done.
            if isinstance(data_from_response, dict) or len(data_from_response) < per_page_limit:
                break
            page += 1
            next_url = f"{url}&page={page}"

    return all_data


In [5]:
def load_bronze(spark, endpoint: str, subkey: str, school_id: str, token: str, pagination_type, limit=None, query=None, use_date_chunk=False, has_students_array:bool=False, audit_log_file="audit_log.json", override_date=None):
    """
    Loads data for one school/endpoint from the Wonde API into the Bronze layer, handling
    date chunking, the LastUpdated watermark and audit logging.

    Records are written to ``<bronze_path>/<school_id>/<subkey>.json``. After the output has
    been written successfully, the module-level ``LastUpdated[school_id][subkey]`` watermark is
    set to the time this call started and the whole ``LastUpdated`` mapping is persisted to
    ``<bronze_path>last_run``. One entry is appended to the module-level ``audit_log`` list.

    Args:
        spark (SparkSession): Active SparkSession for DataFrame operations.
        endpoint (str): API endpoint path, e.g. "/students" ("" for the school itself).
        subkey (str): Name of this dataset; used for the output file and the watermark key.
        school_id (str): Unique identifier for the school.
        token (str): Authentication token for API access.
        pagination_type (str): Type of pagination used by the API ('cursor' or 'offset').
        limit (int, optional): Currently unused; accepted for backward compatibility.
        query (str, optional): Additional query parameters (e.g. "&include=students").
            None is treated as "".
        use_date_chunk (bool, optional): When True and the start date is more than 14 days
            ago, fetch in two-week windows produced by oeai.generate_date_chunks.
        has_students_array (bool, optional): Flatten with oeai.flatten_nested_json (records
            carrying a nested students array) instead of oeai.flatten_json per record.
        audit_log_file (str, optional): Currently unused; accepted for backward compatibility.
        override_date (str, optional): Date string in 'YYYY-MM-DD HH:MM:SS' format used instead
            of the stored LastUpdated value as the start date: it becomes the
            ``updated_after`` filter, or the first chunk's start when chunking.

    Returns:
        pandas.DataFrame: An empty pandas DataFrame (the data is written to storage, not returned).
    """
    global audit_log
    date_format = "%Y-%m-%d %H:%M:%S"
    query = query or ""
    data_list = []

    # Captured before any API call. Used for the audit duration and as the new watermark,
    # so records changed while this run is in flight are picked up by the next run.
    start_time = datetime.now()

    # Resolve the start date: explicit override, else the stored watermark.
    if override_date:
        updated_after = override_date
        last_updated_time = datetime.strptime(override_date, date_format)
    else:
        stored = oeai.safe_get_or_create(LastUpdated, "2018-09-01 00:00:00", school_id, subkey)
        if isinstance(stored, datetime):
            updated_after = stored.strftime(date_format)
            last_updated_time = stored
        elif isinstance(stored, str):
            updated_after = stored
            last_updated_time = datetime.strptime(stored, date_format)
        else:
            # No usable watermark: fetch unfiltered; chunking (if enabled) covers two weeks.
            updated_after = None
            last_updated_time = start_time - timedelta(weeks=2)

    # If the start date is more than two weeks ago, chunk the requests.
    if use_date_chunk and (start_time - last_updated_time).days > 14:
        for chunk_start, chunk_end in oeai.generate_date_chunks(last_updated_time, start_time, chunk_size=timedelta(weeks=2)):
            chunk_query = oeai.update_query_with_chunks(query, chunk_start, chunk_end)
            records = get_school_data(token, school_id, endpoint, chunk_query, pagination_type)

            if records:
                if isinstance(records, dict) and 'data' in records:
                    data_list.append(records['data'])
                elif isinstance(records, list):
                    data_list.extend(records)
            else:
                # No exception is active here, so report the chunk rather than a traceback.
                error_message = f"Empty response, not adding to data_list: {endpoint} ({subkey}) for school {school_id} between {chunk_start} and {chunk_end}"
                oeai.log_error(spark, error_message, error_log_path)
    else:
        if updated_after is not None:
            query += "&updated_after=" + updated_after

        records = get_school_data(token, school_id, endpoint, query, pagination_type)

        # Ensure the data is always a list
        if isinstance(records, dict) and 'data' in records:
            data_list = [records['data']]
        elif isinstance(records, list):
            data_list = records

    # Construct the output location, creating the school folder if needed.
    school_folder = os.path.join(bronze_path, school_id)
    if not os.path.exists(school_folder):
        os.makedirs(school_folder)
    output_path = school_folder + "/" + subkey + ".json"

    saved = False
    if not data_list:
        oeai.save_empty_json(spark, output_path)
        saved = True
    else:
        try:
            # Flatten each item in data_list
            if has_students_array:
                flattened_data_list = oeai.flatten_nested_json(json.dumps(data_list))
            else:
                flattened_data_list = [oeai.flatten_json(item) for item in data_list]

            # pandas first, then Spark, so inferred column types come from pandas.
            r_df = spark.createDataFrame(pd.DataFrame(flattened_data_list))

            # Add school_id and unique_key to the DataFrame
            r_df = r_df.withColumn("school_id", lit(school_id))
            if "student_data_id" in r_df.columns:
                r_df = r_df.withColumn("unique_key", concat(lit(school_id), r_df["student_data_id"].cast("string"), r_df["id"].cast("string")))
            else:
                r_df = r_df.withColumn("unique_key", concat(lit(school_id), r_df["id"].cast("string")))

            r_df.write.mode("overwrite").json(output_path)
            saved = True
        except Exception:
            # Best-effort per dataset: log and continue with the audit entry.
            error_message = f"Error: {traceback.format_exc()}"
            oeai.log_error(spark, error_message, error_log_path)

    # Only advance the watermark once the data is written; otherwise the next run would
    # skip the records that failed to save. setdefault avoids a KeyError for a school that
    # has no entry yet (safe_get_or_create is not called when override_date is given).
    if saved:
        LastUpdated.setdefault(school_id, {})[subkey] = start_time.strftime(date_format)
        last_updated_df = spark.createDataFrame([LastUpdated])
        last_updated_df.repartition(1).write.mode("overwrite").json(bronze_path + 'last_run')

    # Update the audit log
    end_time = datetime.now()
    duration = (end_time - start_time).total_seconds()
    audit_data = {
        "school_id": school_id,
        "endpoint": endpoint,
        "query": query,
        "start_time": start_time.strftime('%Y-%m-%d %H:%M:%S'),
        "end_time": end_time.strftime('%Y-%m-%d %H:%M:%S'),
        "duration": str(duration),
        "records_returned": str(len(data_list)),
    }
    audit_log.append(audit_data)

    return pd.DataFrame()

In [107]:
# ---------------------------------------------------------------------------
# BRONZE PROCESS
# ---------------------------------------------------------------------------

# Introduce a limit for testing or leave as None for live runs.
# NOTE(review): load_bronze accepts `limit` but does not currently apply it.
Limit = None

# Start date passed to every load_bronze call in place of the stored LastUpdated
# watermark. Set to None to resume from each school/subkey's own watermark.
OVERRIDE_DATE = "2024-01-01 00:00:00"

# One tuple per dataset:
# (endpoint, subkey, pagination_type, limit, query, use_date_chunk, has_students_array)
daily_jobs = [
    ("", "schools", "cursor", Limit, "", False, False),
    ("/students", "students", "cursor", Limit, "", False, False),
    ("/students", "students_education", "cursor", Limit, "&include=education_details", False, False),
    ("/students", "students_extended", "cursor", Limit, "&include=extended_details", False, False),
    ("/attendance-summaries", "attendance-summaries", "cursor", Limit, "", False, False),
    # Disabled jobs:
    #("/behaviours", "behaviours_students", "cursor", Limit, "&include=students", False, True),
    #("/achievements", "achievements_students", "offset", Limit, "&include=students", False, True),
]

# Populate the schools list with each Wonde school_id and its API token.
schools_list = []
for school_id in school_ids:
    secret_name = f"wonde-{school_id}"
    try:
        token = oeai.get_secret(spark, secret_name, keyvault_linked_service, keyvault)
    except Exception:
        oeai.log_error(spark, f"Error: {traceback.format_exc()}", error_log_path)
        continue
    schools_list.append({"school_id": school_id, "token": token})

# Load the watermarks written by the previous run. On the first run the folder does not
# exist yet and spark.read.json raises, so fall back to an empty mapping.
try:
    last_run_rows = spark.read.json(bronze_path + 'last_run').collect()
except Exception:
    oeai.log_error(spark, f"Could not read last_run, starting with empty LastUpdated: {traceback.format_exc()}", error_log_path)
    last_run_rows = []

LastUpdated = last_run_rows[0].asDict() if last_run_rows else {}

# Nested per-school entries come back as Row objects; convert them to plain dicts so
# load_bronze can read and update them.
LastUpdated = {
    key: oeai.row_to_dict(value) if isinstance(value, Row) else value
    for key, value in LastUpdated.items()
}

for school in schools_list:
    school_id = school["school_id"]
    token = school["token"]

    for endpoint, subkey, pagination_type, limit, query, use_date_chunk, has_students_array in daily_jobs:
        try:
            load_bronze(spark, endpoint, subkey, school_id, token, pagination_type, limit, query,
                        use_date_chunk, has_students_array, override_date=OVERRIDE_DATE)
        except Exception:
            # Keep going with the remaining jobs and schools; the failure goes to the error log.
            oeai.log_error(spark, f"Error loading {subkey} for school {school_id}: {traceback.format_exc()}", error_log_path)

    # Save the audit log after each school so completed work is recorded even if a later school fails.
    oeai.save_audit_log(spark, audit_log, bronze_path + "audit_log.json")
