Skip to content
Cannot retrieve contributors at this time
import json
import random
import re
import string
from copy import deepcopy
from datetime import timedelta
from io import StringIO
from urllib.parse import parse_qsl
import requests
from django.conf import settings
from django.contrib import auth, messages
from django.contrib.auth.decorators import login_required, permission_required
from django.contrib.auth.models import User
from import dumpdata
from django.db import transaction
from django.db.models import Count, Q
from django.forms.models import model_to_dict
from django.http import HttpResponse, HttpResponseRedirect
from django.shortcuts import get_object_or_404, redirect, render
from django.urls import reverse
from django.utils import timezone
from my_oauth.models import Service, Token
from true_coders.models import Coder
def generate_state(size=20, chars=string.ascii_uppercase + string.digits):
return ''.join(random.choice(chars) for _ in range(size))
def query(request, name):
redirect_url = request.GET.get('next', None)
if redirect_url:
request.session['next'] = redirect_url
service = get_object_or_404(Service.active_objects, name=name)
args = model_to_dict(service)
args['redirect_uri'] = settings.HTTPS_HOST_ + reverse('auth:response', args=(name, ))
args['state'] = generate_state()
request.session['state'] = args['state']
url = re.sub('[\n\r]', '', service.code_uri % args)
return redirect(url)
def unlink(request, name):
coder = request.user.coder
if coder.token_set.count() < 2:
messages.error(request, 'Not enough services')
return HttpResponseRedirect(request.META.get('HTTP_REFERER'))
def process_data(request, service, access_token, data):
d = deepcopy(data)
for e in ('email', 'default_email', ):
email = d.get(e, None)
if email:
user_id = d.get(service.user_id_field, None)
if not email or not user_id:
raise Exception('Email or User ID not found.')
token, created = Token.objects.get_or_create(
token.access_token = access_token = data = email
request.session['token_id'] =
return redirect('auth:signup')
def process_access_token(request, service, response):
if response.status_code !=
raise Exception('Response status code not equal ok.')
access_token = json.loads(response.text)
except Exception:
access_token = dict(parse_qsl(response.text))
if service.data_header:
args = model_to_dict(service)
headers = json.loads(service.data_header % args)
headers = None
data = {}
for data_uri in service.data_uri.split():
response = requests.get(data_uri % access_token, headers=headers)
if response.status_code !=
raise Exception('Response status code not equal ok.')
response = json.loads(response.text)
while isinstance(response, list) or isinstance(response, dict) and len(response) == 1:
array = response if isinstance(response, list) else list(response.values())
response = array[0]
for d in array[1:]:
if not isinstance(response, dict):
if not isinstance(d, dict):
if d.get('primary') and not response.get('primary'):
response = d
return process_data(request, service, access_token, data)
def response(request, name):
service = get_object_or_404(Service.active_objects, name=name)
state = request.session.get(service.state_field, None)
if state is None or state != request.GET.get('state'):
raise KeyError('Not found state')
del request.session['state']
args = model_to_dict(service)
args['redirect_uri'] = settings.HTTPS_HOST_ + reverse('auth:response', args=(name, ))
if 'code' not in args:
raise ValueError('Not found code')
if service.token_post:
post = json.loads(service.token_post % args)
response =, data=post)
url = re.sub('[\n\r]', '', service.token_uri % args)
response = requests.get(url)
return process_access_token(request, service, response)
except Exception as e:
messages.error(request, "ERROR: {}".format(str(e).strip("'")))
return signup(request)
def login(request):
redirect_url = request.GET.get('next', 'clist:main')
if request.user.is_authenticated:
return redirect(redirect_url)
services = Service.active_objects.annotate(n_tokens=Count('token')).order_by('-n_tokens')
request.session['next'] = redirect_url
return render(
{'services': services},
def signup(request, action=None):
context = {}
token_id = request.session.pop('token_id', None)
if token_id:
token = Token.objects.get(id=token_id)
except Token.DoesNotExist:
return signup(request)
user = None
coder = token.coder
if coder:
user = coder.user
t = Token.objects.filter(, coder__isnull=False).filter(~Q(id=token_id)).first()
if t:
user = t.coder.user
token.coder = user.coder
if user and user.is_active:
user.backend = 'django.contrib.auth.backends.ModelBackend'
auth.login(request, user)
return signup(request)
if request.user.is_authenticated:
token.coder = request.user.coder
return signup(request)
request.session['token_id'] = token_id
q_token = Q(
if request.POST and 'signup' in request.POST:
username = request.POST.get('username', None)
if not username:
context['error'] = 'Username can not be empty.'
elif len(username) > 30:
context['error'] = '30 characters or fewer.'
elif not re.match(r'^[\-A-Za-z0-9_@\+\.]{1,30}$', username):
context['error'] = 'Username may contain alphanumeric, _, @, +, . and - characters.'
elif User.objects.filter(username__iexact=username).exists():
q_token = q_token | Q(coder__user__username__iexact=username)
context['error'] = 'User already exist.'
with transaction.atomic():
user = User.objects.create_user(username,
token.coder = Coder.objects.create(user=user)
return signup(request)
tokens = Token.objects.filter(q_token).filter(~Q(id=token_id))
context['tokens'] = tokens
if tokens.count():
if token.n_viewed_tokens >= settings.LIMIT_N_TOKENS_VIEW:
now =
if token.tokens_view_time is None:
token.tokens_view_time = now
if token.tokens_view_time + timedelta(hours=settings.LIMIT_TOKENS_VIEW_WAIT_IN_HOURS) < now:
token.n_viewed_tokens = 0
token.tokens_view_time = None
context['limit_tokens_view'] = True
token.n_viewed_tokens += 1
context['token'] = token
if request.user.is_authenticated:
return redirect(request.session.pop('next', 'clist:main'))
return redirect('auth:login')
return render(
def logout(request):
if request.user.is_authenticated:
return redirect("/")
def services_dumpdata(request):
out = StringIO()
'--format', 'json'
services = json.loads(out.getvalue())
for service in services:
service['fields']['secret'] = None
service['fields']['app_id'] = None
return HttpResponse(json.dumps(services), content_type="application/json")