# API lab
This is a notebook to experiment before making new additions and improvements to the packages and modules in this project

In [15]:
import json
import logging
import pdb
from datetime import datetime

import requests

from typing import List, Optional
from pydantic import BaseModel, HttpUrl, ValidationError, validator

from app.schemas.zoho_app import ZohoSelfClientApp as ZohoSelfClient
from app.api import deps

In [11]:
logger = logging.getLogger('dev')
logger.setLevel(logging.INFO)

In [None]:
"""
Zoho Ticket properties
    accountId
    approvalCount: int
    assignee: str # assignee id
    attachmentCount
    category
    cf
    channel
    channelCode
    channelRelatedInfo
    classification
    closedTime
    commentCount
    contactId
    createdTime
    customFields
    customerResponseTime
    departmentId
    description
    dueDate
    email
    entitySkills
    id
    isArchived
    isDeleted
    isResponseOverdue
    isSpam
    isTrashed
    language
    layoutDetails
    layoutId
    modifiedBy
    modifiedTime
    onholdTime
    phone
    priority
    productId
    resolution
    responseDueDate
    secondaryContacts
    sharedDepartments
    slaId
    source
    status
    statusType
    subCategory
    subject
    taskCount
    teamId
    threadCount
    ticketNumber
    timeEntryCount
    webUrl
"""

In [16]:
zoho_urls = deps.get_zoho_desk_api().urls

In [17]:
def get_zoho_oauth_tokens(zoho_self_client: ZohoSelfClient, oauth_url: str = zoho_urls["oauth"])-> dict: 
    payload = {
        "code": zoho_self_client.code,
        "grant_type": zoho_self_client.grant_types["authorization_code"],
        "client_id": zoho_self_client.client_id,
        "client_secret": zoho_self_client.client_secret,
        "redirect_uri": zoho_self_client.redirect_uri,
    }
    response = requests.post(oauth_url, params=payload)
    try:
        if response.status_code == 200 and not response.json().get("error"):
            logging.info("Successfully retrieved oauth tokens")
            tokens = response.json()
            zoho_self_client.access_token = tokens["access_token"]
            zoho_self_client.refresh_token = tokens["refresh_token"]
            tokens["status"] = True
        else:
            logging.error("Failed to retrieve oauth tokens")
            tokens = dict()
            tokens["status"] = False
    except (ValueError, KeyError) as e:
        pass # there's no token
    return tokens

In [18]:
def refresh_zoho_oauth_tokens(zoho_self_client: ZohoSelfClient, oauth_url: str = zoho_urls["oauth"])-> dict: 
    if not self_client.refresh_token:
        tokens = get_zoho_oauth_tokens(zoho_self_client)
        if not tokens["status"]:
            return tokens
    else:
        payload = {
            "refresh_token": zoho_self_client.refresh_token,
            "scope": zoho_self_client.scope,
            "grant_type": zoho_self_client.grant_types["refresh_token"],
            "client_id": zoho_self_client.client_id,
            "client_secret": zoho_self_client.client_secret,
            "redirect_uri": zoho_self_client.redirect_uri,
        }
        response = requests.post(oauth_url, params=payload)
        logging.warning(response.url)
        try:
            if response.status_code == 200 and not response.json().get("error"):
                tokens = response.json()
                zoho_self_client.access_token = tokens["access_token"]
                tokens["status"] = True
            else:
                tokens = dict()
                tokens["status"] = False
        except ValueError:
            pass # there's no token
        return tokens

In [None]:
tokens = refresh_zoho_oauth_tokens(zoho_self_client)
zoho_self_client.dict()

## Get departments

In [19]:
def get_departments(
        zoho_self_client: ZohoSelfClient, 
        params: dict = {}, 
        base_url: str = zoho_urls["base_url"],
    ) -> dict:
    headers = {"Authorization": f"Zoho-oauthtoken {zoho_self_client.access_token}"}
#     params = {**params, 'isEnabled': True, "chatStatus": "AVAILABLE"}
    response = requests.get(f"{base_url}/departments", headers=headers, params=params)
    if response.status_code == 200:
        data = response.json()
    else:
        data = {"error": True}
    return data

In [20]:
def get_department(
        zoho_self_client: ZohoSelfClient, 
        department_id: int,
        params: dict = {}, 
        base_url: str = zoho_urls["base_url"],
    ) -> dict:
    headers = {"Authorization": f"Zoho-oauthtoken {zoho_self_client.access_token}"}
#     params = {**params, 'isEnabled': True, "chatStatus": "AVAILABLE"}
    response = requests.get(f"{base_url}/departments/{department_id}", headers=headers, params=params)
    if response.status_code == 200:
        data = response.json()
    else:
        data = {"error": True}
    return data

In [None]:
departments = get_department(zoho_self_client, 286477000000006907)
departments

In [None]:
!curl -X GET "https://desk.zoho.com/api/v1/departments"\
  -H "Authorization:Zoho-oauthtoken 1000.047275292bd89ec13144ad7454eb201b.26600ff92420957e56ff23c0d7a765de"

## Get all tickets

In [None]:
!curl -X GET https://desk.zoho.com/api/v1/tickets?include=contacts,assignee,departments,team,isRead\
  -H "Authorization:Zoho-oauthtoken 1000.62364743ad3344f15fe51a8988eaef42.abd218be429b372f12bad73af6e5b8ac"

In [21]:
def get_all_tickets(zoho_self_client: ZohoSelfClient, params: dict = {}, base_url: str = zoho_urls["base_url"],) -> dict:    
    headers = {"Authorization": f"Zoho-oauthtoken {zoho_self_client.access_token}"}
    response = requests.get(f"{base_url}/tickets", headers=headers, params=params)
    if response.status_code == 200:
        data = response.json()
    else:
        data = {"error": True}
    return data

In [None]:
def create_ticket(zoho_self_client: ZohoSelfClient, payload:dict, base_url: str = zoho_urls["base_url"],) -> dict:    
    headers = {"Authorization": f"Zoho-oauthtoken {zoho_self_client.access_token}"}
    response = requests.post(f"{base_url}/tickets", headers=headers, data=json.dumps(payload))
    if response.status_code == 201:
        data = response.json()
    else:
        data = {"error": True}
    return data, response

## Datetime validation experiment (Work in progress)

In [8]:
# 

"""explanation
\b                # Assert position at a word boundary
(                 # Match the regular expression below and capture its match into backreference number 1
   0              # Match the character “0” literally
      *           # Between zero and unlimited times, as many times as possible, giving back as needed (greedy)
   (?:            # Match the regular expression below
                  # Match either the regular expression below (attempting the next alternative only if this one fails)
         [1-9]    # Match a single character in the range between “1” and “9”
         [0-9]    # Match a single character in the range between “0” and “9”
            ?     # Between zero and one times, as many times as possible, giving back as needed (greedy)
      |           # Or match regular expression number 2 below (the entire group fails if this one fails to match)
         100      # Match the characters “100” literally
   )
)
\b                # Assert position at a word boundary
"""
import re
# year_regex = r"\b(0*(?:[1-9][0-9]?|100))\b"
year_regex = r"\b(0*(?:[2][0-9][0-9][0-9]-?))"
month_regex = r"(0*(?:[0][0-9]?|[1][1-2]-?))\b"
# month_regex = r"(0*(?:[0][0-9]|[1][1-2])?))"
regex = re.compile(f"{year_regex}{month_regex}")
# regex = r"\b"
sample_date = "2021-01-12T22:29:12.338Z"
re.search(regex, "2025-13")
# regex.match("2025-06")

In [None]:
from datetime import datetime, timezone, timedelta
print(datetime.now(timezone.utc))
resolution_sla_duration = timedelta(hours=1)
print(datetime.now(timezone.utc) + resolution_sla_duration)

# datetime.date.today()

# API Sandbox Testing

In [1]:
import json

from app.api.deps import get_zoho_desk_api
from app.schemas import Ticket, ZohoTicket
zoho_desk = get_zoho_desk_api()

## Test get Zoho tickets

In [None]:
zoho_desk.refresh_oauth_tokens()
tickets = zoho_desk.get_tickets()
[ZohoTicket(**ticket) for ticket in tickets] 

## Test create Zoho ticket

In [9]:
null = None # simulate JS null value
sample_ticket_data = {
  "subject": "Test automated ticket generation # 5",
  "departmentId": 286477000000006907, # default
  "contact": {
      "email": "cdare@andrew.cmu.edu"
  },
  "email": "cdare@andrew.cmu.edu",
  "phone": "+250786745117",
  "description": "automated ticket generated",
  "priority": "1",
  "language": "English", # default
  "channel": "Phone"
}
sample_ticket = Ticket(**sample_ticket_data)
sample_ticket.dict()

{'subject': 'Test automated ticket generation # 5',
 'departmentId': 286477000000006907,
 'contact': {'email': 'cdare@andrew.cmu.edu'},
 'email': 'cdare@andrew.cmu.edu',
 'phone': '+250786745117',
 'description': 'automated ticket generated',
 'status': None,
 'assigneeId': None,
 'category': None,
 'subCategory': None,
 'resolution': None,
 'priority': '1',
 'language': 'English',
 'channel': 'Phone',
 'classification': None,
 'cf': None,
 'webUrl': None,
 'teamId': None,
 'secondaryContacts': None}

In [7]:
zoho_desk.create_ticket(sample_ticket)



ZohoTicket(subject='Test automated ticket generation # 5', departmentId=286477000000006907, contact=None, email='cdare@andrew.cmu.edu', phone='+250786745117', description='automated ticket generated', status='Open', assigneeId=None, category=None, subCategory=None, resolution=None, priority='1', language='English', channel='Phone', classification=None, cf={}, webUrl=HttpUrl('https://desk.zoho.com/support/serenityhealth/ShowHomePage.do#Cases/dv/286477000001067265', scheme='https', host='desk.zoho.com', tld='com', host_type='domain', path='/support/serenityhealth/ShowHomePage.do#Cases/dv/286477000001067265'), teamId=None, secondaryContacts=[], id=286477000001067265, contactId='286477000001060001', productId=None, uploads=None, dueDate=None, responseDueDate=None)