In [None]:
from typing import Dict, Union
import sys
import os 
from pathlib import Path
import requests
import socket
import pandas as pd
import webbrowser
from requests_oauthlib import OAuth2Session
import gi
import tempfile
try:
    gi.require_version('NM', '1.0')
    from gi.repository import GLib, NM
except ValueError:
    print("NM not available")
    NM = None

In [None]:
# Put the parent of the current working directory on the import path,
# presumably so that the `eduvpn` imports in the next cell resolve.
sys.path.append(str(Path.cwd().parent))

In [None]:
from eduvpn.oauth2 import one_request, get_open_port
from eduvpn.crypto import gen_code_challenge, gen_code_verifier, common_name_from_cert

# Settings

In [None]:
# client settings
# Discovery service: both lists live directly under the same base URI.
DISCO_URI = 'https://disco.eduvpn.org/v2/'
ORGANISATION_URI = f"{DISCO_URI}organization_list.json"
SERVER_URI = f"{DISCO_URI}server_list.json"

# OAuth parameters used when building the session and authorization URL.
client_id = "org.eduvpn.app.linux"
scope = ["config"]
code_challenge_method = "S256"

In [None]:
# our configuration
# Display names that are looked up in the downloaded lists.
organisation = "SURFnet bv"
server = "Demo"
institute = "Demo"

# Translation preferences: COUNTRY is tried before LANGUAGE when a
# translated value is flattened.
COUNTRY = "nl-NL"
LANGUAGE = "nl"

# Utils

In [None]:
def extract_translation(d: Union[str, Dict[str, str]], languages=None):
    """Pick one display string out of a possibly translated value.

    Parameters
    ----------
    d:
        Either a value that is not a mapping (returned unchanged) or a
        mapping from language tag to translated text.
    languages:
        Language tags to try, most preferred first. When omitted, the
        notebook configuration is used: ``[COUNTRY, LANGUAGE, 'en-US', 'en']``.

    Returns
    -------
    The entry for the first tag of ``languages`` that is present in ``d``.
    If none is present, the first value stored in the mapping. If the
    mapping is empty, an empty string.
    """
    # isinstance (instead of an exact type comparison) also accepts dict
    # subclasses such as OrderedDict.
    if not isinstance(d, dict):
        return d
    if languages is None:
        languages = [COUNTRY, LANGUAGE, 'en-US', 'en']
    for tag in languages:
        if tag in d:
            return d[tag]
    # No preferred language available: fall back to the first translation,
    # or to '' when the mapping is empty, rather than raising IndexError.
    return next(iter(d.values()), '')

# server list

In [None]:
# Download the server list from the discovery service.
# The timeout keeps the cell from hanging indefinitely on a stalled
# connection, and raise_for_status() turns an HTTP error into an immediate
# exception instead of a JSON/KeyError failure on the following line.
servers_response = requests.get(SERVER_URI, timeout=10)
servers_response.raise_for_status()
server_list = pd.DataFrame(servers_response.json()['server_list'])
# Flatten display_name to a single string per row.
server_list['display_name'] = server_list['display_name'].apply(extract_translation)
server_list

# institute list

In [None]:
# Keep the rows typed 'institute_access'; the type column is dropped because
# it is constant after filtering.
is_institute_access = server_list['server_type'] == 'institute_access'
institute_list = server_list[is_institute_access].drop(columns=['server_type'])
institute_list

# organisation list

In [None]:
# Download the organisation list from the discovery service.
# The timeout keeps the cell from hanging indefinitely on a stalled
# connection, and raise_for_status() turns an HTTP error into an immediate
# exception instead of a JSON/KeyError failure on the following line.
organisation_response = requests.get(ORGANISATION_URI, timeout=10)
organisation_response.raise_for_status()
organization_list = pd.DataFrame(organisation_response.json()['organization_list'])
# Flatten both translatable columns to a single value per row.
for translated_column in ('display_name', 'keyword_list'):
    organization_list[translated_column] = organization_list[translated_column].apply(extract_translation)
organization_list

# Secure internet

In [None]:
# Keep the rows typed 'secure_internet'; the type column is dropped because
# it is constant after filtering.
is_secure_internet = server_list['server_type'] == 'secure_internet'
secure_internet_list = server_list[is_secure_internet].drop(columns=['server_type'])
secure_internet_list

# make selection

In [None]:
# Rows of the institute list whose display name equals the configured institute.
matches_institute = institute_list['display_name'] == institute
institute_info = institute_list.loc[matches_institute]
institute_info

## Or, in case you select an organisation

In [None]:
# Rows of the organisation list whose display name equals the configured organisation.
matches_organisation = organization_list['display_name'] == organisation
organisation_info = organization_list.loc[matches_organisation]
organisation_info

In [None]:
# Take the base URL of the first matching institute. When nothing matched,
# raise the same exception type as before (IndexError) but with a message
# that names the missing institute instead of an out-of-bounds indexer error.
if institute_info.empty:
    raise IndexError(f"no institute access server with display name {institute!r} was found")
info_base_url = institute_info['base_url'].iloc[0]

In [None]:
# Fetch the server's info.json and select the API v2 description.
# rstrip('/') + '/' makes the join correct whether or not the base URL ends
# with a slash.
info_url = info_base_url.rstrip('/') + '/info.json'
# Bound the wait and fail on HTTP errors before trying to decode JSON.
info_response = requests.get(info_url, timeout=10)
info_response.raise_for_status()
info = info_response.json()['api']['http://eduvpn.org/api#2']

In [None]:
# Unpack the three endpoint URLs used by the rest of the notebook.
api_base_uri, token_endpoint, authorization_endpoint = (
    info[key] for key in ('api_base_uri', 'token_endpoint', 'authorization_endpoint')
)

In [None]:
# Obtain a port number from the project helper (presumably a currently
# unused local TCP port — confirm in eduvpn.oauth2) and build the loopback
# redirect URI from it.
port = get_open_port()
redirect_uri = f'http://127.0.0.1:{port}/callback'

In [None]:
# OAuth2 session for this client; the token endpoint doubles as the
# auto-refresh URL.
oauth = OAuth2Session(
    client_id,
    scope=scope,
    redirect_uri=redirect_uri,
    auto_refresh_url=token_endpoint,
)

In [None]:
# Generate a code verifier and derive the matching code challenge, then build
# the authorization URL carrying the challenge and its method.
# `state` is returned alongside the URL and is compared with the value that
# comes back in the callback.
code_verifier = gen_code_verifier()
code_challenge = gen_code_challenge(code_verifier)
authorization_url, state = oauth.authorization_url(url=authorization_endpoint,
                                                   code_challenge_method=code_challenge_method,
                                                   code_challenge=code_challenge)

In [None]:
# Open the authorization page in the browser, then collect the callback via
# the project helper (presumably blocks until one request arrives on `port` —
# confirm in eduvpn.oauth2). The result is indexed like a mapping of
# parameter name -> list of values in the next cell.
webbrowser.open(authorization_url)
response = one_request(port, lets_connect=False)

In [None]:
# Extract the authorization code and verify that the callback carries the
# state generated for this authorization request.
# An explicit check is used because `assert` statements are removed when
# Python runs with -O, which would silently skip this validation.
code = response['code'][0]
returned_state = response['state'][0]
if returned_state != state:
    raise ValueError("OAuth state mismatch: callback does not belong to this authorization request")

In [None]:
# Exchange the authorization code for a token at the token endpoint.
# The code verifier generated earlier is sent along, and include_client_id
# asks the library to put the client id into the request.
token = oauth.fetch_token(token_url=token_endpoint,
                          code=code,
                          code_verifier=code_verifier,
                          client_id=oauth.client_id,
                          include_client_id=True,
                          )

# profile list

In [None]:
# Request the profile list from the API.
profile_list_response = oauth.get(api_base_uri + '/profile_list')
# Surface HTTP errors here rather than as a JSON decoding error or KeyError
# when the body is inspected in the next cell.
profile_list_response.raise_for_status()

In [None]:
# Show the profile entries returned by the API.
profile_list_response.json()['profile_list']['data']

In [None]:
# Ask the API to create a key pair for this client.
response = oauth.post(api_base_uri + '/create_keypair')
# Fail on HTTP errors before indexing into the JSON body.
response.raise_for_status()
keypair = response.json()['create_keypair']['data']
private_key = keypair['private_key']
certificate = keypair['certificate']
# The common name is used below to ask the server about the certificate.
common_name = common_name_from_cert(certificate.encode('ascii'))

# profile config

In [None]:
# Download the configuration text for one profile.
# NOTE(review): the profile id is hard-coded — presumably it must match one
# of the ids in the profile list fetched above; confirm.
profile_id = 'internet'
# Pass the id via `params` so it is URL-encoded by the HTTP library.
response = oauth.get(api_base_uri + '/profile_config', params={'profile_id': profile_id})
# Without this check the body of an error response would be stored as the
# VPN configuration.
response.raise_for_status()
config = response.text

# check_certificate

In [None]:
# Ask the server whether the certificate with this common name is valid.
# `params` URL-encodes the common name instead of pasting it into the query
# string verbatim.
response = oauth.get(api_base_uri + '/check_certificate', params={'common_name': common_name})
# Fail on HTTP errors before indexing into the JSON body.
response.raise_for_status()
assert response.json()['check_certificate']['data']['is_valid'], \
    f"server reports certificate {common_name!r} as not valid"

# system_messages

In [None]:
# Request the system messages from the API.
response = oauth.get(api_base_uri + '/system_messages')
# Surface HTTP errors here rather than as a JSON decoding error or KeyError
# when the body is inspected in the next cell.
response.raise_for_status()

In [None]:
# Show the system messages returned by the API.
response.json()['system_messages']['data']

# Write NetworkManager config

In [None]:
# Write the downloaded configuration to a temporary file so that a VPN
# plugin can import it by file name.
# - suffix='.ovpn': importers may decide by file extension whether they can
#   handle a file (the OpenVPN plugin is known to reject unknown extensions;
#   TODO confirm against the installed plugin version).
# - write() instead of writelines(): `config` is a single string, and
#   writelines() would iterate over it one character at a time.
# - flush() pushes the buffered text to disk before the file is read by name.
# Keep `tmp` referenced: the file is deleted as soon as it is closed.
# NOTE(review): `private_key` and `certificate` obtained earlier are not
# merged into this file — confirm whether the import needs them.
tmp = tempfile.NamedTemporaryFile(mode='w+t', suffix='.ovpn')
tmp.write(config)
tmp.flush()
tmp.seek(0)
filename = tmp.name

In [None]:
# The import cell sets NM to None when the bindings are unavailable; stop
# here with a clear error instead of an AttributeError on None below.
if NM is None:
    raise RuntimeError("NetworkManager (NM 1.0) bindings are not available")

# Try every installed VPN plugin until one manages to import the file.
# `vpn_info` is left pointing at the plugin that succeeded; it is reported
# in the next cell.
connection = None
for vpn_info in NM.VpnPluginInfo.list_load():
    print("TRY:  plugin %s" % (vpn_info.get_filename()))
    try:
        vpn_plugin = vpn_info.load_editor_plugin()
    except Exception as e:
        print("SKIP: cannot load plugin: %s" % (e))
        continue
    try:
        connection = vpn_plugin.import_(filename)
    except Exception as e:
        print("SKIP: failure to import %s" % (e))
        continue
    break

# Raise instead of calling sys.exit(): inside a notebook kernel an ordinary
# exception carries the message and halts "Run All" without trying to
# terminate the interpreter.
if connection is None:
    raise RuntimeError("None of the VPN plugins was able to import \"%s\"" % (filename))

In [None]:
def added_cb(nm_client, result, user_data):
    """Completion callback: report whether the connection was added, then stop the main loop."""
    try:
        nm_client.add_connection_finish(result)
        print("The connection profile has been successfully added to NetworkManager.")
    except Exception as error:
        print("ERROR: failed to add connection: %s\n" % error)
    main_loop.quit()


connection.normalize()

# Report which file was imported, by which plugin, and the resulting id/uuid.
import_summary = (
    filename,
    vpn_info.get_filename(),
    connection.get_id(),
    connection.get_uuid(),
)
print("connection imported from \"%s\" using plugin \"%s\" (\"%s\", %s)" % import_summary)

client = NM.Client.new(None)
main_loop = GLib.MainLoop()

# Start the asynchronous add and run the loop until added_cb quits it.
client.add_connection_async(connection, True, None, added_cb, None)
main_loop.run()