In [4]:
import base64
import gzip
import io
import json
import logging
import os
import shutil
import time
from datetime import date, datetime, timedelta, timezone
from io import BytesIO

import pandas as pd
import pyodbc
import requests

# Credentials and API base URL are read from environment variables so that no
# secret is stored in the notebook. A missing variable raises KeyError here.
# The id/secret pair is used as HTTP basic auth when requesting an access token;
# the base URL prefixes every /dap/... request below.
CD2_client_id = os.environ['CD2_client_id']
CD2_client_secret = os.environ['CD2_client_secret']
CD2_base_url = os.environ['CD2_base_url']



KeyError: 'CD2_client_id'

In [2]:
def les_access_token(logger):
    """Request an access token with the client-credentials grant.

    The client id and secret (module-level ``CD2_client_id`` and
    ``CD2_client_secret``) are sent as HTTP basic auth.

    Args:
        logger: Logger that receives the success or failure message.

    Returns:
        The ``access_token`` string from the JSON response when the server
        answers 200, otherwise ``None``.
    """
    requesturl = "https://api-gateway.instructure.com/ids/auth/login"
    payload = {'grant_type': 'client_credentials'}
    r = requests.request(
        "POST",
        requesturl,
        data=payload,
        auth=(CD2_client_id, CD2_client_secret),
    )
    if r.status_code != 200:
        logger.error(f"Klarte ikkje å skaffe access_token, feil {r.status_code}")
        return None

    access_token = r.json()['access_token']
    # The token is a bearer credential: never write its value to log files or
    # to the notebook output, where it would be saved and shared with the file.
    logger.info("Henta access_token OK")
    return access_token

In [3]:
def lag_logger(log_namn):
    """Return the 'my_logger' logger, writing to ``log_namn`` and the console.

    The function is safe to call repeatedly (for example when a notebook cell
    is re-run): handlers installed by an earlier call are removed and closed
    first, so each record is emitted once and only to the most recent file.

    Args:
        log_namn: Path of the log file to write to.

    Returns:
        The configured ``logging.Logger`` with level DEBUG.
    """
    logger = logging.getLogger('my_logger')
    logger.setLevel(logging.DEBUG)

    # logging.getLogger returns the same object for the same name, so without
    # this cleanup every call would stack two more handlers on top of the old
    # ones and leave the previous log file open.
    for gammal_handler in list(logger.handlers):
        logger.removeHandler(gammal_handler)
        gammal_handler.close()

    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    # One handler for the log file and one for the console, configured alike.
    file_handler = logging.FileHandler(log_namn)
    console_handler = logging.StreamHandler()
    for handler in (file_handler, console_handler):
        handler.setLevel(logging.DEBUG)
        handler.setFormatter(formatter)
        logger.addHandler(handler)
    return logger

In [4]:
def hent_CD2_filar(innfil, token, svar, logger):
    """Download one result object of a finished query job and return it as text.

    The ``objects`` list of ``svar`` is posted to ``/dap/object/url`` to obtain
    download URLs; the URL registered under ``innfil`` is then fetched and the
    gzip-compressed body is unpacked.

    Args:
        innfil: Id of the object to download; used as key into the ``urls``
            mapping of the URL-lookup response.
        token: Access token sent in the ``x-instauth`` header.
        svar: Job response dict; its ``objects`` list is sent as request body.
        logger: Logger that receives progress messages.

    Returns:
        The unpacked content decoded as UTF-8 (undecodable bytes are dropped).

    Raises:
        requests.exceptions.HTTPError: If the URL lookup or the download
            answers with an error status.
    """
    requesturl = f"{CD2_base_url}/dap/object/url"
    # Serialise with json.dumps instead of str() + quote replacement: the
    # latter yields invalid JSON as soon as a value contains an apostrophe or
    # a Python-only literal such as True/None.
    payload = json.dumps(svar['objects'])
    headers = {'x-instauth': token, 'Content-Type': 'text/plain'}
    logger.info(f"Request: {requesturl} {payload}")
    respons = requests.request("POST", requesturl, headers=headers, data=payload)
    logger.info(f"Response: {respons.status_code} {respons.reason}")
    respons.raise_for_status()
    fil = respons.json()
    logger.info(f"Objects: {fil}")
    url = fil['urls'][innfil]['url']
    logger.info(f"URL: {url}")

    data = requests.request("GET", url)
    logger.info(f"Response: {data.status_code} {data.reason}")
    # Fail here on an error status; otherwise the error page would be handed to
    # gzip and surface as a misleading "not a gzipped file" error.
    data.raise_for_status()
    buffer = io.BytesIO(data.content)
    logger.info(f"Buffer: {buffer}")
    with gzip.GzipFile(fileobj=buffer, mode='rb') as utpakka_fil:
        utpakka_data = utpakka_fil.read().decode("utf-8", errors='ignore')
    return utpakka_data

In [5]:
def les_CD2_tabell(token, tabell, logger, sist_oppdatert=None, vent_sekund=5):
    """Run a CSV query for one table, wait for the job and load the result.

    Args:
        token: Access token sent in the ``x-instauth`` header.
        tabell: Table name inserted into the query URL.
        logger: Logger for progress messages; also passed to ``hent_CD2_filar``.
        sist_oppdatert: Optional timestamp string. When given it is sent as
            ``since`` in the query; when ``None`` the query carries only the
            format, as before.
        vent_sekund: Seconds to sleep between two job-status polls.

    Returns:
        Tuple ``(alledata, sist_oppdatert, til)``: the concatenated non-empty
        result frames (an empty frame when there is no data), the
        ``sist_oppdatert`` argument unchanged, and the job's ``until`` value
        (falling back to ``at``; ``None`` if neither key is present).

    Raises:
        requests.exceptions.HTTPError: If the query or a poll answers with an
            error status.
        RuntimeError: If the job reports status ``failed``.
    """
    headers = {'x-instauth': token, 'Content-Type': 'text/plain'}
    sporjing = {"format": "csv"}
    if sist_oppdatert is not None:
        sporjing["since"] = sist_oppdatert
    payload = json.dumps(sporjing)
    requesturl = f"{CD2_base_url}/dap/query/canvas/table/{tabell}/data"
    logger.info(f"Sender søk til {requesturl}")
    r = requests.request("POST", requesturl, headers=headers, data=payload)
    r.raise_for_status()
    jobb_id = r.json()['id']

    # Single slash: the earlier "/dap//job/" form was a typo.
    requesturl2 = f"{CD2_base_url}/dap/job/{jobb_id}"
    while True:
        r2 = requests.request("GET", requesturl2, headers=headers)
        r2.raise_for_status()
        respons2 = r2.json()
        logger.info(f"Jobbstatus: {respons2}")
        status = respons2['status']
        if status == "complete":
            break
        if status == "failed":
            # Without this check a failed job would be polled forever.
            raise RuntimeError(f"CD2-jobb {jobb_id} feila: {respons2}")
        time.sleep(vent_sekund)

    dr_liste = []
    for fil in respons2['objects']:
        data = io.StringIO(hent_CD2_filar(fil['id'], token, respons2, logger))
        dr_liste.append(pd.read_csv(data, sep=","))

    # pd.concat raises on an empty sequence, so handle "no rows" explicitly and
    # keep the column headers when at least one (empty) file was returned.
    ikkje_tomme = [df for df in dr_liste if not df.empty]
    if ikkje_tomme:
        alledata = pd.concat(ikkje_tomme)
    elif dr_liste:
        alledata = dr_liste[0]
    else:
        alledata = pd.DataFrame()

    # NOTE(review): queries without "since" are expected to report "at" rather
    # than "until" in the finished job - confirm against the DAP API reference.
    til = respons2.get('until', respons2.get('at'))
    return alledata, sist_oppdatert, til

In [6]:

def akv_finn_sist_oppdatert(tabell):
    """Return the timestamp to use as ``since`` for the given table.

    The value is looked up in ``[dbo].[akv_sist_oppdatert]`` using the
    connection string in the ``Connection_SQL`` environment variable. The
    fallback "24 hours ago (UTC)" is returned when the table is ``web_logs``,
    when no row (or a NULL value) is stored, or when the database call fails.

    Args:
        tabell: Table name to look up.

    Returns:
        A timestamp string ending in ``Z``. The fallback has the form
        ``YYYY-MM-DDTHH:MM:SSZ``; a stored value is rendered with
        ``isoformat()`` after conversion to naive UTC if it carries a timezone.

    Raises:
        KeyError: If ``Connection_SQL`` is not set.
    """
    conn_str = os.environ["Connection_SQL"]

    # One fallback for every branch. It is computed in UTC because the string
    # is suffixed with "Z", and always carries a time part (a bare date plus
    # "Z" is not a valid timestamp).
    i_gaar = datetime.now(timezone.utc) - timedelta(days=1)
    reserve = i_gaar.strftime("%Y-%m-%dT%H:%M:%SZ")

    # web_logs always uses the fallback, so there is nothing to look up.
    if tabell == "web_logs":
        return reserve

    query = """
    SELECT [sist_oppdatert] FROM [dbo].[akv_sist_oppdatert]
    WHERE [tabell] = ?
    """
    try:
        with pyodbc.connect(conn_str) as connection:
            cursor = connection.cursor()
            cursor.execute(query, (tabell,))
            row = cursor.fetchone()
    except pyodbc.Error:
        print("Har ikkje henta frå Azure")
        return reserve

    if not row or row[0] is None:
        print("Har ikkje henta frå Azure")
        return reserve

    print("Har henta frå Azure")
    verdi = row[0]
    # A timezone-aware value would render as "...+00:00Z"; normalise to naive
    # UTC first so the appended "Z" stays correct.
    if getattr(verdi, "tzinfo", None) is not None:
        verdi = verdi.astimezone(timezone.utc).replace(tzinfo=None)
    return verdi.isoformat() + "Z"

In [9]:
# Table to query; also used in the name of the log file.
tabell = "courses"
logger = lag_logger(f'loggfil-{tabell}.log')
token = les_access_token(logger)
# data = les_CD2_tabell(token, tabell, logger)
# Request headers reused by the query and job-poll requests in the next cell.
headers = {'x-instauth': token, 'Content-Type': 'text/plain'}
sist_oppdatert = akv_finn_sist_oppdatert(tabell)
# NOTE(review): the value looked up on the previous line is replaced right away
# by this fixed timestamp - presumably a manual override; confirm it is intended.
sist_oppdatert = "2025-01-01T01:00:00Z"


2025-06-18 10:37:48,883 - my_logger - INFO - Henta access_token OK: <redacted>

Har ikkje henta frå Azure


In [8]:
# Start an incremental CSV query for `tabell`, then poll the job until it is done.
# The body is serialised with json.dumps so the timestamp is always quoted and
# escaped correctly.
payload = json.dumps({"format": "csv", "since": sist_oppdatert})
requesturl = f"{CD2_base_url}/dap/query/canvas/table/{tabell}/data"
print(f"Sender søk til {requesturl}")
r = requests.request("POST", requesturl, headers=headers, data=payload)
r.raise_for_status()
respons = r.json()
jobb_id = respons['id']  # not named `id`, which would shadow the builtin

# Single slash: the earlier "/dap//job/" form was a typo.
requesturl2 = f"{CD2_base_url}/dap/job/{jobb_id}"
while True:
    r2 = requests.request("GET", requesturl2, headers=headers)
    r2.raise_for_status()
    respons2 = r2.json()
    print(respons2)
    if respons2['status'] == "complete":
        break
    if respons2['status'] == "failed":
        # Without this check a failed job would be polled forever.
        raise RuntimeError(f"CD2-jobb {jobb_id} feila: {respons2}")
    # Sleep only when another poll is actually needed.
    time.sleep(5)

# Result objects of the finished job; downloaded in the next cell.
filar = respons2['objects']
dr_liste = []
print(filar)


Sender søk til https://api-gateway.instructure.com/dap/query/canvas/table/courses/data


HTTPError: 400 Client Error: Bad Request for url: https://api-gateway.instructure.com/dap/query/canvas/table/courses/data

In [None]:
# Download every result object of the job and load each one as a DataFrame.
# The list is (re)initialised here so that re-running this cell does not append
# the same files a second time.
dr_liste = []
for fil in filar:
    logger.info(f"Henter fil {fil['id']}")
    data = io.StringIO(hent_CD2_filar(fil['id'], token, respons2, logger))
    df = pd.read_csv(data, sep=",")
    dr_liste.append(df)

# pd.concat raises on an empty sequence, so handle "no rows" explicitly and keep
# the column headers when at least one (empty) file was returned.
ikkje_tomme = [df for df in dr_liste if not df.empty]
if ikkje_tomme:
    alledata = pd.concat(ikkje_tomme)
elif dr_liste:
    alledata = dr_liste[0]
else:
    alledata = pd.DataFrame()

In [None]:
# Print a summary of the combined frame (columns, dtypes, non-null counts).
alledata.info()

#  Lagre data til fil
Den følgjande koden vil bli endra frå gong til gong; den bruker eg for å ta ut dei data som er interessante i kvart tilfelle.

In [24]:
# Write only the columns of interest to courses.csv, without the index column.
alledata.to_csv(
    "courses.csv",
    columns=['key.id', 'value.enrollment_term_id'],
    index=False,
)