In [None]:
import json
import requests
import pandas as pd

In [None]:
token = ''

In [None]:
class GITEA():
    
    def __init__(self, url, token):
        self.headers = {"Content-type": "application/json", "Authorization": "token " + token}
        self.url = url
        self.requests = requests.Session()
        self.username = self.user_get_authenticated_user().get('login')

    def get_url(self, endpoint):
        url = self.url + endpoint
        return url

    def parse_result(self, result):
        if (result.text and len(result.text) > 3):
            return json.loads(result.text)
        return {}
    
    # The API allows admin users to sudo API requests as another user.
    # Sudo: request header with the username of the user to sudo.
    def requests_get(self, endpoint, sudo=None):
        params = dict()
        if sudo:
            params["sudo"] = sudo
        request = self.requests.get(self.get_url(endpoint), headers=self.headers, params=params)
        if request.status_code == 204:
            return None
        if request.status_code not in [200, 201]:
            raise Exception(f'Received staus code: {request.status_code}, {request.text}, {endpoint}')
        return self.parse_result(request)
    
    def requests_delete(self, endpoint):
        request = self.requests.delete(self.get_url(endpoint), headers=self.headers)
        if request.status_code not in [204]:
            raise Exception(f'Received staus code: {request.status_code}, {request.text}, {endpoint}')

    def requests_post(self, endpoint, data, sudo=None):
        params = dict()
        if sudo:
            params["sudo"] = sudo
        request =  self.requests.post(self.get_url(endpoint), headers=self.headers, json=data, params=params)
        if request.status_code not in [200, 201, 202]:
            raise Exception(f'Received staus code: {request.status_code}, {request.text}, {endpoint}')
        return self.parse_result(request)
    
    # Admin:
    
    # 
    def admin_get_allusers(self):
        path = '/admin/users'
        return self.requests_get(path)
    
    #
    def admin_post_create_user(self, username, email, password):
        path = '/admin/users'
        request_data = {
            "source_id": 0,
            "login_name": username,
            "full_name": username,
            "username": username,
            "email": email,
            "password": password,
            "send_notify": True,
            "must_change_password": False}
        return self.requests_post(path, data=request_data)
    
    #
    def admin_delete_user(self, username):
        path = f'/admin/users/{username}'
        return self.requests_delete(path)
    
    # User
    
    # Create a repository
    def user_post_create_repo(self, reponame, sudo=None):
        path = f'/user/repos'
        request_data = {
            "name": reponame,
            "description": "",
            "private": False,
            "auto_init":True,
            "gitignores":None,
            "license":None,
            "issue_labels":None,
            "readme":"Default",
            "default_branch": "master"}
        return self.requests_post(path, data=request_data, sudo=sudo)
    
    #
    def user_get_authenticated_user(self):
        path = '/user'
        return self.requests_get(path)
    
    #
    def user_get_user_exist(self, username):
        path = f'/users/{username}'
        request = self.requests.get(self.get_url(path), headers=self.headers)
        
        if request.status_code != 200:
            return False
        else:
            return True
    
    #
    def user_post_token(self, username, password, token_name):
        headers = {"Authorization": "token " + token}
        url = f'http://{username}:{password}@localhost:3000/api/v1/users/{username}/tokens'
        return requests.post(url, headers=headers, data={"name": token_name}).text
    
    
    # Repo
    
    # Check if repo exist
    def repo_get_repo_exist(self, owner, reponame):
        path = f'/repos/{owner}/{reponame}'
        request = self.requests.get(self.get_url(path), headers=self.headers)
        
        if request.status_code != 200:
            return False
        else:
            return True
        
    # Create a pull request  ##TODO: add arguments title, body, etc... SUDO head
    def repo_post_create_pullrequest(self, owner, reponame, head_branch, sudo=None):
        path = f'/repos/{owner}/{reponame}/pulls'
        request_data = {
              "assignee": None,
              "assignees": [None],
              "base": "master",
              "body": None,
              "due_date": None,
              "head": f"{sudo}:{head_branch}",
              "labels": [0],
              "milestone": 0,
              "title": "Testing"}
        return self.requests_post(path, data=request_data, sudo=sudo)
    
    # 
    def repo_get_pullrequests(self, owner, reponame):
        path = f'/repos/{owner}/{reponame}/pulls'
        return self.requests_get(path)
    
    # List forks of a repo
    def repo_get_forks(self, owner, reponame):
        path = f'/repos/{owner}/{reponame}/forks'
        return self.requests_get(path)
    
    # Fork a repository
    def repo_post_fork(self, owner, reponame, sudo=None):
        path = f'/repos/{owner}/{reponame}/forks'
        request_data = {
            "organization" : None}
        return self.requests_post(path, data=request_data, sudo=sudo)
    
    # Create a branch ##TODO: when created, author is Admin.
    def repo_post_branch(self, owner, reponame, branch_name, sudo=None):
        path = f'/repos/{owner}/{reponame}/branches'
        request_data = {
            "new_branch_name" : branch_name,
            "old_branch_name" : "master"}
        return self.requests_post(path, data=request_data, sudo=sudo)
    
    # List a repository;s branches
    def repo_get_branches(self, owner, reponame):
        path = f'/repos/{owner}/{reponame}/branches'
        return self.requests_get(path)    

In [None]:
gt = GITEA('http://localhost:3000/api/v1',token)

In [None]:
def input_data(gitea, user, repo):
    # only for pull request events.
    repo_owner, repo_name = repo.split('/')
    
    def delete_substring(string, substring):
        return string.replace(substring, '')

    repo_owner = delete_substring(repo_owner, '[bot]')
    repo_name = delete_substring(repo_name, '[bot]')
    user = delete_substring(user, '[bot]')
    
    # Check if user exists. Else, create user.
    if not gitea.user_get_user_exist(user):
        gitea.admin_post_create_user(user, user+"@mail.com", 'password')
    
    # Check if repo_owener exists. Else, create user.
    if not gitea.user_get_user_exist(repo_owner):
        gitea.admin_post_create_user(repo_owner, repo_owner+"@mail.com", 'password')
    
    # Check if repo_name exists. Else, create repo.
    if not gitea.repo_get_repo_exist(repo_owner, repo_name):
        gitea.user_post_create_repo(repo_name, sudo=repo_owner)
        
    # Check if user has forked the repo.
    forks = gitea.repo_get_forks(repo_owner, repo_name)
    forks = [x.get('owner').get('login') for x in forks]
    branch_name = 'wip_'+str(len(forks))
    
    if user not in forks:
        gitea.repo_post_fork(repo_owner, repo_name, sudo=user)
        try:
            gt.repo_post_branch(user, repo_name, branch_name, sudo=user)
        except:
            branch_name = 'wip_'+str(len(gt.repo_get_branches(user, repo_name))+1)
            gt.repo_post_branch(user, repo_name, branch_name, sudo=user)
    
    try:
        gitea.repo_post_create_pullrequest(repo_owner, repo_name, branch_name, sudo=user)
    except:
        branch_name = 'wip_'+str(len(gt.repo_get_branches(user, repo_name))+2)
        gitea.repo_post_branch(user, repo_name, branch_name, sudo=user)
        gitea.repo_post_create_pullrequest(repo_owner, repo_name, branch_name, sudo=user)