# Tasks

> How to search for and update tasks in Copper.

In [None]:
#| default_exp tasks

In [None]:
#| hide

from nbdev.showdoc import *

In [None]:
#| export
from cu_api import core, config
from cu_api.query import Query as _Query
from cu_api.core import set_headers as _set_headers
from cu_api.search import _search_loop, get_owners, search_over_field
import pandas as pd
import pytz
from tqdm.autonotebook import tqdm

### Searching Copper Tasks

A `Query` is an object that holds all of your search parameters and tells cu_api what you want from Copper. Once you create a query, pass it to cu_api via the `tasks.search(Query)` function to run the search and get the matching tasks back as a pandas DataFrame.

In [None]:
#| exporti
class Query(_Query):
    """Query object for task searches; adds nothing to the base `cu_api.query.Query`."""

def create_query(*args, **kwargs):
    """
    Create and return a new task `Query` object.

    All positional and keyword arguments are forwarded unchanged to the
    `Query` constructor.
    """
    # Build this module's own `Query` subclass (still an instance of the base
    # query class) so the factory and the class exposed here agree.
    return Query(*args, **kwargs)

#### Query info here
blah Blah Blah

In [None]:
#| exporti
def _clean_row(row_data:dict, #Returned task record as a dictionary
               cf_fields:list = None, # list of custom fields (names or ids) you would like to include
):
    """Flatten one raw task record into a single-level dictionary.

    The output always contains `related_id`, `related_type`, `due_date`,
    `reminder_date`, `completed_date`, `id`, `name`, `assignee_id` and `tags`
    (missing values become None). Custom fields are added under their
    configured name, but only those whose name or definition id appears in
    `cf_fields`.

    Integer dates that look like Unix timestamps (6-10 characters long) are
    converted to timezone-aware US/Central timestamps; any other value is
    passed through unchanged.
    """
    # Avoid a shared mutable default; None means "no custom fields requested".
    cf_fields = [] if cf_fields is None else cf_fields

    # NOTE(review): presumably populates config.CUSTOM_FIELDS and
    # config.CUSTOM_FIELDS_DICT before they are read below - confirm in core.
    core.prc_get_cf_fields()

    custom_fields = getattr(config,'CUSTOM_FIELDS')
    custom_fields_dict = getattr(config,'CUSTOM_FIELDS_DICT')

    # Loop-invariant: build the target timezone once per row, not once per date.
    ct_timezone = pytz.timezone('US/Central')

    def clean_date(date):
        """Convert a Unix-timestamp integer to a US/Central timestamp (best effort)."""
        if date is None:
            return None  # Handle None values (empty rows)

        if isinstance(date, int) and 6 <= len(str(date)) <= 10:
            try:
                return pd.to_datetime(date, unit='s', utc=True).tz_convert(ct_timezone)
            except Exception:
                # Deliberate best effort: an unconvertible value is returned as-is.
                return date
        return date

    native_items = ['id', 'name', 'assignee_id','tags']

    # A task may have no related resource; fall back to an empty mapping
    # instead of calling .get on None.
    related_resource = row_data.get('related_resource') or {}

    output_dict = {
        'related_id': related_resource.get('id'),
        'related_type': related_resource.get('type'),
        'due_date': clean_date(row_data.get('due_date')),
        'reminder_date': clean_date(row_data.get('reminder_date')),
        'completed_date': clean_date(row_data.get('completed_date')),
    }

    for item in native_items:
        output_dict[item] = row_data.get(item)

    for dict_item in row_data.get('custom_fields') or []:
        item_id = dict_item['custom_field_definition_id']
        item_name = custom_fields_dict.get(item_id)

        # Skip unknown fields and fields the caller did not ask for (by name or id).
        if item_name is None or (item_name not in cf_fields and item_id not in cf_fields):
            continue

        # Only look up the definition after the skip check, so an unknown or
        # unrequested field id can no longer raise KeyError here.
        field_definition = custom_fields[item_id]
        item_value = dict_item['value']

        if 'options' in field_definition:
            # Option-type fields are translated via core.cf_option_name.
            output_dict[item_name] = core.cf_option_name(item_id, item_value)
        elif field_definition.get('data_type') == 'Date':
            output_dict[item_name] = clean_date(item_value)
        else:
            output_dict[item_name] = item_value

    return output_dict

In [None]:
#| exporti

def _clean_dataframe(df):
    """Replace the `assignee_id` column with an 'Owned By' column.

    The unique non-null assignee ids are passed to `get_owners`, and each
    row's `assignee_id` is looked up in the mapping it returns (rows with no
    match get None). The 'Owned By' column is added to `df` in place; the
    returned frame is a copy without `assignee_id`.
    """
    list_assign_ids = list(df['assignee_id'].dropna().unique().astype(int))
    # NOTE(review): presumably maps assignee id -> owner name; confirm in cu_api.search.
    assignee_dict = get_owners(list_assign_ids)

    df['Owned By'] = df['assignee_id'].apply(lambda value: assignee_dict.get(value))
    # DataFrame.drop returns a new frame; the original discarded that result,
    # so the column was never actually removed.
    return df.drop(columns=['assignee_id'])


In [None]:
#| exporti

def set_headers(AccessToken:str, #Access Token (API Key) provided by Copper
                 UserEmail:str): #Email associated with your API key
    """Forward the API key and user email to `cu_api.core.set_headers`."""
    _set_headers(AccessToken, UserEmail)
    

In [None]:
#| export
def search(search_query,            # Instance of Query object
            clean_data:bool = True, # Whether to clean results or not
            drop:list = None,       # Columns to drop from final dataframe
            **kwargs
            )->pd.DataFrame:
    """Search for task records in Copper!

    This function allows you to systematically search Copper for tasks
    that match the parameters specified in your `Query` and returns the
    data as a pandas dataframe.

    Pass `cf_fields=[...]` as a keyword argument to choose which custom
    fields are kept on each row; otherwise the second value returned by the
    search helper is used.

    When `clean_data` is False, the list of per-row dictionaries is returned
    instead of a dataframe. When the search matches nothing, an empty
    dataframe is returned.

    Supported standard fields for search: name
    """
    url = 'https://api.copper.com/developer_api/v1/tasks/search'

    if 'name' in search_query._native_fields:
        combined_results, Outputs = search_over_field('name', url, search_query)
    else:
        combined_results, Outputs = _search_loop(search_query=search_query, url=url)

    # Custom fields: an explicit `cf_fields` keyword overrides the search output.
    cf_fields = kwargs.get('cf_fields', Outputs)

    # Processing rows
    cleaned_rows = [
        _clean_row(result, cf_fields)
        for result in tqdm(combined_results,'Cleaning Data',total=len(combined_results))
    ]

    # To Clean, or not to Clean
    if not clean_data:
        print('Returning raw results')
        return cleaned_rows

    cleaned_rows_df = pd.DataFrame(cleaned_rows)
    if not cleaned_rows:
        # No rows means no 'assignee_id' column, which _clean_dataframe
        # requires; hand back the empty frame instead of raising KeyError.
        return cleaned_rows_df

    cleaned_data_df = _clean_dataframe(cleaned_rows_df)

    if isinstance(drop, list):
        # drop(..., inplace=True) returns None, so the original returned None
        # whenever `drop` was supplied; return the new frame instead.
        return cleaned_data_df.drop(columns=drop)
    return cleaned_data_df

### Updating Tasks

In [None]:
# | export

def update(df: pd.DataFrame, columns: list, cf_ids: list = None):
    """
    Bulk-update custom-field values in Copper from a pandas DataFrame.

    `df` must contain an 'id' column identifying the records, plus every
    column named in `columns`. Each column is sent as the value of the custom
    field whose definition id is at the same position in `cf_ids`.

    Either pass in a list of custom field ids (one per entry in `columns`), or
    a list will be created using matching column names; columns with no
    matching custom field are then skipped.

    Records are sent in batches of 10. On the first non-200 response the
    response text is printed and the update stops.

    Raises `ValueError` if `df` has no 'id' column or if `cf_ids` and
    `columns` differ in length.

    NOTE(review): `reverse_cf_lookup`, `get_session` and `headers` are not
    defined or imported in the visible part of this module - confirm where
    they are meant to come from.
    NOTE(review): this function lives in the tasks module but posts to the
    `companies/bulk_update` endpoint under a "companies" key - confirm this is
    the intended endpoint.
    """

    if 'id' not in df.columns:
        # A bare `raise` with no active exception only yields an unhelpful
        # RuntimeError; say what is actually wrong.
        raise ValueError("`df` must contain an 'id' column identifying the records to update.")

    if cf_ids is None:
        columns = [col for col in columns if col in reverse_cf_lookup]
        cf_ids = [reverse_cf_lookup.get(col) for col in columns]
    elif len(cf_ids) != len(columns):
        # zip() below would otherwise silently drop the unmatched entries.
        raise ValueError(
            f"`cf_ids` has {len(cf_ids)} entries but `columns` has {len(columns)}; "
            "they must pair up one-to-one."
        )

    df = df[columns + ['id']]
    prepared_data = df.set_index('id').to_dict(orient='index')

    total_companies = len(df['id'])
    companies_updated = 0
    max_size = 10  # records per bulk request
    Sess = get_session(headers)

    # Initialize the tqdm progress bar
    progress_bar = tqdm(total=total_companies)

    try:
        while companies_updated < total_companies:
            current_batch_size = min(max_size, total_companies - companies_updated)
            batch_ids = df['id'].iloc[companies_updated:companies_updated + current_batch_size]

            # Build the payload for this batch
            payload = []
            for raw_id in batch_ids:
                comp_id = int(raw_id)
                cf_data_list = [{'custom_field_definition_id': cf_id, 'value': prepared_data[comp_id][col]}
                                for cf_id, col in zip(cf_ids, columns)]
                payload.append({"id": comp_id, 'custom_fields': cf_data_list})

            json_data = {"companies": payload}
            request_sent = Sess.post('https://api.copper.com/developer_api/v1/companies/bulk_update', json=json_data)

            if request_sent.status_code != 200:
                print(request_sent.text)
                break

            companies_updated += current_batch_size
            progress_bar.update(current_batch_size)  # Update the progress bar
    finally:
        progress_bar.close()  # Close the progress bar even if a request raises

    # Only claim success when every batch went through.
    if companies_updated == total_companies:
        print('All companies updated.')
    else:
        print(f'Update stopped after {companies_updated} of {total_companies} companies.')

In [None]:
#| hide
import nbdev; nbdev.nbdev_export()