In [0]:
#References:
## https://pyodata.readthedocs.io/en/latest/usage/querying.html
## Documentation: https://learn.microsoft.com/en-us/rest/api/azure/devops/dashboard/dashboards/list?source=recommendations&view=azure-devops-rest-7.0&tabs=HTTP
## Documentation: https://learn.microsoft.com/en-us/azure/devops/boards/queries/wiql-syntax?view=azure-devops

##API

###Import Auth Header

In [0]:
#ref https://learn.microsoft.com/pt-br/rest/api/azure/devops/?view=azure-devops-rest-7.1 < Reference doc difficult to interpret
#ref https://stackoverflow.com/questions/60341728/is-there-a-way-to-call-azure-devops-via-python-using-requests < How to make auth token work. There is a trick to add ":" at the beginning of the token before the encode
#ref https://learn.microsoft.com/en-us/azure/devops/extend/develop/work-with-urls?view=azure-devops&tabs=http < Explains a little how to get the urls of the desired objects
#ref https://abhijitjana.net/2020/04/11/exploring-azure-devops-apis/ < test the strategies in this post
## https://stackoverflow.com/questions/63021168/get-all-work-items-from-a-project-azure-devops-rest-api < Using WiQl to query workitems in project

import pandas as pd ## Json response handling
import base64  ## Decode of token autenticator 
import requests  ## Requests a API 
from pyspark.sql.types import StructType,StructField, StringType, IntegerType #Creation of schema

pat= ''  ## token 
authorization = str(base64.b64encode(bytes(':'+pat, 'ascii')), 'ascii')
headers = {'Accept': 'application/json', 'Authorization':'Basic' +authorization}



#Creating Initial Table Version

## Get Teams List

In [0]:
##developing by dependency on Board
## https://learn.microsoft.com/en-us/rest/api/azure/devops/core/teams/get-teams?view=azure-devops-rest-7.0&tabs=HTTP
organization = ""
project = ""
projectId = ""
  
SERVICE_URL = f"https://dev.azure.com/{organization}/_apis/projects/{projectId}/teams?api-version=7.0"
print (SERVICE_URL)
session = requests.Session() 
 
params =""  
team_response = session.get(SERVICE_URL,headers=headers ,params=params)

print (team_response.status_code)
team_response= team_response.json()


In [0]:
team_data = pd.json_normalize(data = team_response['value'], # datadict or list of dicts
                            record_path = None, #record_path str or list of str, default None 
                            meta =[]) # metalist of paths (str or list of str), default None]

team_data = team_data.rename(columns={'id':'team_id'})
df_spark_team = spark.createDataFrame(team_data)
df_spark_team.write.format("delta").mode("overwrite").option("mergeSchema", "true").saveAsTable("")


In [0]:
##start on 2021-07-14 > pick from item 44 of the list onwards  full_date_tuple_list[45:192] [('2021-07-10', '2021-07-15') ...  ('2023-04-05', '2023-04-10')]
full_date_tuple_list = []
last_date = '2021-01-01'
for year in range(2021,2024):
  for month in range(1,13,1):
    for day in [1,5,10,15,20,25,28]:
      print(f'{year}-{str(month).zfill(2)}-{str(day).zfill(2)}')
      current_date = str(f'{year}-{str(month).zfill(2)}-{str(day).zfill(2)}')
      full_date_tuple_list.append((last_date , current_date) )    
      last_date = current_date

In [0]:
## loop to see if all teams start on the same day
## This code generates 2416 duplicates for each ID

## Strategy 2 > Update day
     ## Need to handle exception if there are more workitems than the API supports
     ## Need to have control of dates being executed recorded so as not to lose information

from itertools import chain

schema = StructType([ \
    StructField("id",StringType(),True), \
    StructField("url",StringType(),True)])

headers = {'Content-Type': 'application/json', 'Authorization':'Basic' +authorization}

list_reponses_teams_workItems = [] 

first_run = True

for team_id in team_data['id']:
  for data_inicio , data_fim in full_date_tuple_list[45:192]:
    data_wit_list =  { "query": f"Select [System.Id], [System.Title], [System.State] From WorkItems where [System.ChangedDate] >= '{data_inicio}' and [System.ChangedDate] < '{data_fim}' order by  [System.CreatedDate] asc   " }   
    response_wit_list = session.post(f'https://dev.azure.com/-/--organization--/{team_id}/_apis/wit/wiql?api-version=7.0',headers=headers ,json=data_wit_list)
    print (response_wit_list.status_code)
    response_wit_list = response_wit_list.json()
    list_reponses_teams_workItems.append( (team_id, team_data[team_data['id']==team_id]['name'], data_inicio , data_fim ))
    list_reponses_teams_workItems.append(response_wit_list)

  work_item_list = [] 
  for i in range(1,len(list_reponses_teams_workItems),2):
    work_item_list.append(list_reponses_teams_workItems[i]['workItems'])
  work_item_list_filtered = [x for x in work_item_list if x != []]
  work_item_list_flattened = list(chain(*work_item_list_filtered))
  # Create DF based on list 
  df_tabela =spark.createDataFrame(data=work_item_list_flattened ,schema =schema )
  ## save to table 
  if first_run:
    df_tabela.write.format("delta").mode("overwrite").option("mergeSchema", "true").saveAsTable("")
    first_run = not first_run
  else:
    df_tabela.write.format("delta").mode("append").option("mergeSchema", "true").saveAsTable("")
  

# Create Populated Workitem Table

In [0]:
## get list of ids 
df_spark_min_max_id = spark.sql(f'SELECT Max(id) as max_id , min(id) as min_id  FROM ')
df_min_max_id = df_spark_min_max_id.toPandas()
#df_min_max_id

incremento = 100 ## Theoretically it would be 200 ids per request, but I chose half to not get the process stuck in the request
id_start = 104 # to test
id_end =  df_min_max_id['max_id'].values[0]
id_curr = id_start
id_next = int(id_start) + int(incremento) 

dict_colunas_workitem = {
'id':'workitem_id',
'rev':'rev',
'url':'url',
'fields.System.IterationPath':'IterationPath',
'fields.System.WorkItemType':'type',
'fields.System.State':'state',
'fields.System.Reason':'reason',
'fields.System.AssignedTo.id':'AssignedTo_id',
'fields.System.AssignedTo.uniqueName':'AssignedTo_uniqueName',
'fields.System.CreatedDate':'CreatedDate',
'fields.System.CreatedBy.id':'CreatedBy_id',
'fields.System.CreatedBy.uniqueName':'CreatedBy_uniqueName',
'fields.System.ChangedDate':'ChangedDate',
'fields.System.ChangedBy.id':'ChangedBy_id',
'fields.System.ChangedBy.uniqueName':'ChangedBy_uniqueName',
'fields.System.CommentCount':'CommentCount',
'fields.System.Title':'Title',
'fields.Microsoft.VSTS.Common.StateChangeDate':'StateChangeDate',
'fields.Microsoft.VSTS.Common.ActivatedDate':'ActivatedDate',
'fields.Microsoft.VSTS.Common.ActivatedBy.id':'ActivatedBy_id',
'fields.Microsoft.VSTS.Common.ActivatedBy.uniqueName':'ActivatedBy_uniqueName',
'fields.Microsoft.VSTS.Common.ClosedDate':'ClosedDate',
'fields.Microsoft.VSTS.Common.ClosedBy.id':'ClosedBy_id',
'fields.Microsoft.VSTS.Common.ClosedBy.uniqueName':'ClosedBy_uniqueName',
'fields.Microsoft.VSTS.Common.Priority':'Priority',
'fields.Microsoft.VSTS.TCM.AutomationStatus':'AutomationStatus',
'fields.Custom.Environment':'Environment',
'fields.Custom.TestType':'TestType',
'fields.Custom.AutomatedLayer':'AutomatedLayer'}

column_list_target_table = ['workitem_id', 'rev', 'url', 'IterationPath', 'type', 'state', 'reason', 'AssignedTo_id', 'AssignedTo_uniqueName', 'CreatedDate', 'CreatedBy_id', 'CreatedBy_uniqueName', 'ChangedDate', 'ChangedBy_id', 'ChangedBy_uniqueName', 'CommentCount', 'Title', 'StateChangeDate', 'ActivatedDate', 'ActivatedBy_id', 'ActivatedBy_uniqueName', 'ClosedDate', 'ClosedBy_id', 'ClosedBy_uniqueName','TestType', 'Environment', 'AutomationStatus', 'AutomatedLayer', 'Priority']


for work_item_id_list in range(int(id_start),int(id_end),int(incremento)):
  df_spark_lista_id = spark.sql(f'SELECT id  FROM database.tablehere where id between {id_curr} and {id_next} order by id ')
  df_lista_id = df_spark_lista_id.toPandas()
  id_list = df_lista_id['id']
  id_list = id_list.values.tolist()
  id_list_string =  ",".join([x  for x in id_list] )  
  #id_list_string
  params_wit = { } 
  session = requests.Session()
  response_wit_id = session.get(f'https://dev.azure.com///_apis/wit/workitems?ids={id_list_string}&api-version=7.0',headers=headers ,params=params_wit)
  print (response_wit_id.status_code)
  response_wit_id = response_wit_id.json()
  df_workitem_json_response = pd.DataFrame.from_dict(response_wit_id)
  workitem_data = pd.json_normalize(data = response_wit_id['value'], # datadict or list of dicts
                            record_path = None, #record_path str or list of str, default None 
                            meta =['System.TeamProject', 'System.WorkItemType']) # metalist of paths (str or list of str), default None
##Select desired columns and rename 
  df_workitem = workitem_data.copy()
#rename columns
  df_workitem = df_workitem.rename(columns=dict_colunas_workitem) #= df_workitem [['']]

##Validate which columns exist
  column_list_current = df_workitem.columns.values.tolist()
  column_list_insert = [x for x in column_list_current if x in column_list_target_table] ##put in column list and insert only existing columns in dataframe and destination table
  #select columns 
  df_workitem= df_workitem[column_list_insert]
## Transform to SparkDataframe 
  df_spark_workitem = spark.createDataFrame(df_workitem)
  df_spark_workitem.write.format("delta").mode("append").option("mergeSchema", "true").saveAsTable("")
  

In [0]:
df_spark_workitem.display()