In [9]:
from pydantic import BaseModel, Field, HttpUrl
from typing import List
import configparser

# Define Pydantic models for each config section
class YouTrackConfig(BaseModel):
    Token: str
    BaseUrl: str

class VsemRabotaConfig(BaseModel):
    ProjectId: str
    WBTeam: List[str] = Field(default_factory=list)
    Name: str

# Main configuration model that includes both sections
class AppConfig(BaseModel):
    YouTrack: YouTrackConfig
    VsemRabota: VsemRabotaConfig

# Parse the INI file
def parse_config_to_model(config_file: str) -> AppConfig:
    config = configparser.ConfigParser()
    config.read(config_file)
    
    # YouTrack section
    youtrack_token = config.get('YouTrack', 'Token').strip("'")
    youtrack_base_url = config.get('YouTrack', 'BaseUrl')
    
    # VsemRabota Config section
    project_id = config.get('VsemRabota Config', 'ProjectId')
    name = config.get('VsemRabota Config', 'Name')
    wb_team = config.get('VsemRabota Config', 'WBTeam').split(', ')
    
    # Create instances of the models
    youtrack_config = YouTrackConfig(Token=youtrack_token, BaseUrl=youtrack_base_url)
    vsem_rabota_config = VsemRabotaConfig(ProjectId=project_id, WBTeam=wb_team, Name=name)
    
    # Aggregate into the main config model
    app_config = AppConfig(YouTrack=youtrack_config, VsemRabota=vsem_rabota_config)
    
    return app_config

In [10]:
config_file = 'yt.ini'  # Update this path
config_model = parse_config_to_model(config_file)
print(config_model)

YouTrack=YouTrackConfig(Token='perm:VmFsZW50aW5fTWFyY2h1aw==.NjktMzE1.IgVAwvLdgn8H1dpNCX2FOS9T0Yhdpn', BaseUrl='https://youtrack.wildberries.ru') VsemRabota=VsemRabotaConfig(ProjectId='77-167', WBTeam=['Valentin_Marchuk', 'Liventsev_sergei'], Name='VsemRabota')


In [None]:
import requests

headers = {
    'Authorization': f'Bearer {config_model.YouTrack.Token}',
    'Content-Type': 'application/json',
}

search_query = 'Business Line:HR   State: Resolved sort by: {Threat Score} desc'

response = requests.get(f'{config_model.YouTrack.Token}/api/issues?query={search_query}', headers=headers)

In [70]:
import requests
from urllib.parse import urlencode

class YouTrackAPIError(Exception):
    """Exception raised for errors in the YouTrack API."""
    def __init__(self, status_code, message):
        super().__init__(f"HTTP {status_code}: {message}")
        self.status_code = status_code
        self.message = message

class YouTrackClient:
    def __init__(self, config):
        self.config = config
        self.headers = {
            'Authorization': f'Bearer {self.config.Token}',
            'Content-Type': 'application/json',
        }

    #     headers = {
    #     'Authorization': f'Bearer {api_token}',
    #     'Content-Type': 'application/json',
    # }

    def get(self, endpoint: str, params: dict = None) -> dict:
        """Sends a GET request to a specified endpoint with optional query parameters."""
        url = f"{self.config.BaseUrl}{endpoint}"
        if params:
            url += f"?{urlencode(params)}"
        try:
            response = requests.get(url, headers=self.headers)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.HTTPError as e:
            raise YouTrackAPIError(response.status_code, response.text) from e

    def post(self, endpoint: str, data: dict = None) -> dict:
        """Sends a POST request to a specified endpoint with optional JSON data."""
        url = f"{self.config.BaseUrl}{endpoint}"
        try:
            response = requests.post(url, json=data, headers=self.headers)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.HTTPError as e:
            raise YouTrackAPIError(response.status_code, response.text) from e

    def get_issues(self, search_query: str, additional_params: dict = None) -> dict:
        """
        Retrieves issues from YouTrack based on a search query and additional parameters.

        :param search_query: A string representing the search query for filtering issues.
        :param additional_params: Optional dictionary of additional query parameters.
        :return: A dictionary containing the API response with issues.
        """
        params = {'query': search_query}
        if additional_params:
            params.update(additional_params)
        
        return self.get('/api/issues', params=params)

    def get_issue(self, issue_id, fields=None):
        issue_fields = [
            'id', 'idReadable', 'summary', 'description', 'reporter(id,login,fullName)',
            'customFields($type,id,projectCustomField($type,id,field($type,id,name)),value($type,avatarUrl,buildLink,color(id),fullName,id,isResolved,localizedName,login,minutes,name,presentation,text))',
            'linkType(name,sourceToTarget,targetToSource)',
            'issues(id,idReadable,summary)'
        ]
        
        if fields is None:
            fields = 'id,idReadable,summary,description,customFields(id,projectCustomField(field(name))),linkType(name,sourceToTarget,targetToSource),issues(id,idReadable,summary)'
            fields = ','.join(issue_fields)

        params = {
            'fields': fields
        }

        return self.get(f'/api/issues/{issue_id}', params=params)

    def get_projects(self, additional_params: dict = None):
        leader_fields = ['login', 'name', 'id']
        createdby_fields = ['login', 'name', 'id']
        
        params = {'fields': 'id,name,shortName,createdBy(login,name,id),leader(login,name,id),key'}
        if additional_params:
            params.update(additional_params)
        
        return self.get('/api/admin/projects', params=params)
    

In [71]:
# youtrack_config = YouTrackConfig(Token='your_token_here', BaseUrl='https://your-instance.myjetbrains.com/youtrack')
youtrack_client = YouTrackClient(config=config_model.YouTrack)
        

In [None]:
resp = youtrack_client.get_issue('VR-2934')

# resp['customFields']
resp

In [88]:
def get_customfield_value(customfield, issue):
    return next(r['value'] for r in issue['customFields'] if r['projectCustomField']['field']['name'] == customfield)

def get_issue_authority(issue):
    assignee = get_customfield_value('Assignee', issue)
    reviewer = get_customfield_value('Reviewer [HR]', issue)
    product = get_customfield_value('Product', issue)
    reporter = resp['reporter']['fullName']

    return {
        'assignee': assignee['fullName'] if assignee else None,
        'reviewer': reviewer['fullName'] if reviewer else None,
        'products': [p['name'] for p in product],
        'reporter': reporter
    }

In [None]:
import pandas as pd

file_path = r'C:\Users\MGroup\Downloads\VULN tokens.csv'
df = pd.read_csv(file_path)

issues = list(dict.fromkeys(df['Ссылка на youtrack'].values.tolist()))
issues = [ i.replace('https://youtrack.wildberries.ru/issue/', '') for i in issues]
issues

In [None]:
df

In [None]:
issues_authority = []
for id in issues:
    issue = youtrack_client.get_issue(id)
    if issue is None:
        print(f'Issue {id} was not found.')
        continue
    issue_authority = get_issue_authority(issue)
    issues_authority.append({
         'issue_id': id,
        **issue_authority
    })


issues_authority

In [95]:
df['issue_id'] = df['Ссылка на youtrack'].str.extract(r'issue/(VR-\d+)')

json_df = pd.DataFrame(issues_authority)
merged_df = pd.merge(df, json_df, on="issue_id", how="left")

merged_df.to_csv('vulns.csv')

In [104]:
merged_df['products'].apply(assign_team)

0        WB Team
1        WB Team
2        WB Team
3        WB Team
4       ATS Team
         ...    
72      ATS Team
73    Empty Team
74    Empty Team
75    Empty Team
76    Empty Team
Name: products, Length: 77, dtype: object

In [None]:

# Define a function to determine the team based on the product
def assign_team(products):
    if any(product in ['admin', 'vsemrabota', 'wbteam'] for product in products):
        return 'WB Team'
    elif 'ats' in products:
        return 'ATS Team'
    else:
        return 'Empty Team'

# Apply the function to create a new 'Team' column
merged_df['Team'] = merged_df['products'].apply(assign_team)

# Group by 'Team' and then by 'issue_id', and collect tokens
grouped_data = merged_df.groupby(['Team', 'issue_id'])['Секрет'].apply(list)

output_format = {}
for (team, issue_id), tokens in grouped_data.items():
    if team not in output_format:
        output_format[team] = []
    output_format[team].append((issue_id, tokens))

# Print in the desired format
for team, issues in output_format.items():
    print(f"{team}:")
    for issue_id, tokens in issues:
        print(f"{issue_id}")
        for token in tokens:
            print(f"- {token}")
    print("\n")

In [108]:
merged_df.to_csv('vulns.csv')

In [20]:
issues = youtrack_client.get('/api/issues', params={'query': 'Business Line:HR   State: Resolved sort by: {Threat Score} desc'})
print(issues)

[{'id': '92-715620', '$type': 'Issue'}, {'id': '92-730734', '$type': 'Issue'}, {'id': '92-887120', '$type': 'Issue'}, {'id': '92-732032', '$type': 'Issue'}, {'id': '92-779165', '$type': 'Issue'}, {'id': '92-860643', '$type': 'Issue'}, {'id': '92-778840', '$type': 'Issue'}, {'id': '92-690675', '$type': 'Issue'}, {'id': '92-690639', '$type': 'Issue'}, {'id': '92-690615', '$type': 'Issue'}, {'id': '92-721263', '$type': 'Issue'}, {'id': '92-708541', '$type': 'Issue'}, {'id': '92-722135', '$type': 'Issue'}, {'id': '92-584660', '$type': 'Issue'}, {'id': '92-803271', '$type': 'Issue'}, {'id': '92-803320', '$type': 'Issue'}, {'id': '92-672794', '$type': 'Issue'}, {'id': '92-686096', '$type': 'Issue'}, {'id': '92-685816', '$type': 'Issue'}, {'id': '92-729859', '$type': 'Issue'}, {'id': '92-732665', '$type': 'Issue'}, {'id': '92-730768', '$type': 'Issue'}, {'id': '92-693085', '$type': 'Issue'}, {'id': '92-730682', '$type': 'Issue'}, {'id': '92-777825', '$type': 'Issue'}, {'id': '92-1029406', '$t

In [None]:
youtrack_client.get_issues('Business Line:HR   State: Resolved sort by: {Threat Score} desc')

In [None]:
projects = youtrack_client.get_projects(
    {'$top': '100', '$skip': '300'}
)
projects