The code below is to be run FIRST.

It filters the OpenPayments yearly payment datasets using a list of US-registered physician authors from our included trials, and retrieves all the payments that were made to them (from all sources). These authors published spinal cord stimulation (SCS) clinical trial results in a paper that met our search criteria, are US-registered physicians, and are therefore included in the OpenPayments datasets. Any non-US-based physicians or non-physician authors were manually filtered out prior to running this code.

In [None]:
from cs103 import *
import csv
from typing import List
from typing import Optional
import time
import pandas as pd
from pandas import DataFrame


@typecheck
def main(open_payments_files: List[str], our_data_filename: str, new_file_names: List[str])-> None:
    """
    Filters several yearly OpenPayments data files down to the physicians of interest.

    The file at each position of open_payments_files is paired with the name at the same
    position of new_file_names, and pull_and_write is called once per pair with
    our_data_filename as the list of physicians of interest. Once every pair has been
    processed, the elapsed time is printed in minutes.

    open_payments_files: the yearly OpenPayments data files to filter
    our_data_filename:   the file listing the physicians of interest
    new_file_names:      the output file name for each yearly file, in the same order

    Raises ValueError if open_payments_files and new_file_names differ in length, because
    pairing them by position would otherwise silently skip the unpaired entries.
    """

    if len(open_payments_files) != len(new_file_names):
        raise ValueError(
            'Expected one output file name per OpenPayments file, but got '
            + str(len(open_payments_files)) + ' data file(s) and '
            + str(len(new_file_names)) + ' output name(s)'
        )

    # perf_counter is a wall-clock timer. process_time only counts CPU time, so the time
    # spent waiting on file reads and writes would be left out of the reported run time.
    start = time.perf_counter()

    for yearly_file, output_file in zip(open_payments_files, new_file_names):
        pull_and_write(yearly_file, our_data_filename, output_file)

    elapsed_time = time.perf_counter() - start

    print ('The time the program took to run was: '+ str(elapsed_time/60) + ' minutes')
    
    
@typecheck 
def pull_and_write(yearly_data_filename: str, our_data_filename: str, new_file_name: str) -> None:
    """
    Takes in a data file of yearly payment data, a list of physicians of interest, and a name for the
    target output file. Outputs the filtered list of payment data for the given year that corresponds to the
    physicians in the given list of interest.

    If there was an invalid physician ID in the list of physicians, a message is printed and the
    output file is not written.

    If no physician in the list has an OpenPayments ID, the output file is still written and
    contains only the header row of the yearly data file.
    """

    all_id_list = filter_to_phys_id(our_data_filename)
    our_id_list = filter_to_listed_phys(all_id_list)

    # filter_to_phys_id reports an invalid ID by ending its list with 'ERROR'. Check for that
    # before reading the yearly data file so a bad input list does not cost a full scan of it.
    # The emptiness guard keeps an ID list with no entries from raising IndexError on [-1].
    if our_id_list and our_id_list[-1] == 'ERROR':
        print('An error occured')
        return

    our_payments = pull_our_id_payments(yearly_data_filename, our_id_list)
    write_to_csv(our_payments, new_file_name, yearly_data_filename)


@typecheck
def check_if_not_empty(s: str) -> bool:
    """
    Reports whether the given string holds at least one character.

    Whitespace counts as a character: ' ' produces True, and only '' produces False.
    """

    return s != ""



@typecheck
def check_for_no_spaces(s: str) -> bool:
    """
    Reports whether the string is free of a space at both of its ends.

    Returns False when the first or the last character is a space, and True otherwise.
    Spaces in the middle of the string are not looked at.

    The string must not be empty: indexing its first character raises IndexError.
    """

    first_char = s[0]
    last_char = s[-1]

    if first_char == " ":
        return False

    return last_char != " "
     
    
    
@typecheck
def is_valid_phys_id(s:str) -> bool:
    """
    Decides whether a string is an acceptable physician ID.

    An ID is rejected when it is empty, or when it begins or ends with a space.
    Anything else, including '0', is accepted.
    """

    # The emptiness check must run first: check_for_no_spaces cannot take an empty string.
    if not check_if_not_empty(s):
        return False

    return check_for_no_spaces(s)



@typecheck
def filter_to_listed_phys(los: List[str]) -> List[str]:
    """
    Keeps only the entries that carry an OpenPayments physician ID.

    An entry of '0' marks an author without an ID and is dropped. Every other entry is
    kept, in its original order, in a new list.
    """

    return [phys_id for phys_id in los if phys_id != '0']
            
        

@typecheck
def is_not_in_list(s: str, los: List[str]) -> bool:
    """
    Reports whether the string (s) is absent from the list of strings (los).

    Returns True when no entry of los equals s (always the case for an empty list),
    and False as soon as one entry matches.
    """

    return all(entry != s for entry in los)



def filter_to_phys_id(filename: str) -> List[str]:
    """
    Takes in a file of physicians, and returns a list of strings that contains the unique physician ID's for
    all of the authors in the list. Does not duplicate if in list more than once.

    The first row of the file is skipped as a header, and the ID is read from the second
    column of every following row. ID's are returned in the order they first appear.

    If a Phys ID in the file is not valid, reading stops at that row and the list collected
    so far is returned with 'ERROR' appended as its last element. A row that has no second
    column (for example a blank line) is treated the same way as an empty ID.
    """

    phys_id_list = []
    # Mirrors phys_id_list so the duplicate check does not rescan the list for every row.
    seen_ids = set()

    with open(filename, newline='') as f:
        reader = csv.reader(f)
        # Skip the header row. The default stops a completely empty file from raising StopIteration.
        next(reader, None)

        for row in reader:
            phys_id = row[1] if len(row) > 1 else ''

            if not is_valid_phys_id(phys_id):
                phys_id_list.append('ERROR')
                return phys_id_list

            if phys_id not in seen_ids:
                seen_ids.add(phys_id)
                phys_id_list.append(phys_id)

    return phys_id_list



def pull_our_id_payments(data_set: str, los: List[str], id_column: int = 5) -> List[List[str]]:
    """
    Takes in a list of phys ID's and a yearly data set, and returns a list of all of the rows
    from the data set that correspond to those physician ID's.

    data_set:  name of the yearly data CSV file
    los:       the physician ID's to keep
    id_column: zero-based index of the column holding the physician ID (defaults to 5)

    Each returned row is the list of that row's field values, in file order. Rows that are
    too short to have the ID column (such as blank lines) are skipped.
    """

    # A set makes each membership test constant-time; the data file is scanned row by row,
    # so testing against the list would rescan it for every row.
    wanted_ids = set(los)
    our_filtered_list = []

    with open(data_set, newline='') as infh:
        for row in csv.reader(infh):
            if len(row) > id_column and row[id_column] in wanted_ids:
                our_filtered_list.append(row)

    return our_filtered_list



def get_headers(file_name: str) -> List[str]:
    """
    Takes in a csv file, and produces the header of that file as a list of strings.

    The header is the first row of the file. A file with no rows at all produces an
    empty list.
    """

    # newline='' lets the csv module handle line endings itself, as the csv documentation advises.
    with open(file_name, newline='') as csvFile:
        # The default covers a completely empty file, where next() would raise StopIteration.
        return next(csv.reader(csvFile), [])


    
def write_to_csv(filtered_list: List[str], new_file_name: str, data_file_name: str) -> None:
    """
    Creates (or overwrites) the CSV file new_file_name.

    The first row written is the header of data_file_name, as returned by get_headers.
    Every element of filtered_list is then written as one row beneath it.
    """

    header_row = get_headers(data_file_name)

    with open(new_file_name, 'w', newline='') as out_file:
        csv_out = csv.writer(out_file)
        csv_out.writerow(header_row)
        csv_out.writerows(filtered_list)



@typecheck
def merge_files(list_of_filenames: List[str]) -> List[List[str]]:
    """
    Takes in a list of CSV filenames, and merges their rows into one list.

    The files are read in the order given and their rows are appended in file order.
    The first row of the first non-empty file is kept as the header; when a later file
    starts with an identical row, that repeated header is left out so that it appears
    only once. Returns the merged rows, each one a list of field values. Nothing is
    written to disk.
    """

    #data_list contains the data from the files seen so far.
    data_list = []
    header = None

    for filename in list_of_filenames:
        with open(filename, newline='') as f:
            for index, row in enumerate(csv.reader(f)):
                if index == 0:
                    if header is None:
                        header = row
                    elif row == header:
                        # Same header as the one already kept: skip the duplicate.
                        continue
                data_list.append(row)

    return data_list



@typecheck
def pull_columns_of_interest(data_filename: str, new_filename: str) -> DataFrame:
    """
    Takes in a data filename, pulls the columns of interest as a Panda's DataFrame for that
    filename, writes those to a new csv file named new_filename, and returns the DataFrame.

    The columns of interest are defined as : 'Physician_Profile_ID', 'Physician_First_Name',
    'Physician_Middle_Name', 'Physician_Last_Name',
    'Submitting_Applicable_Manufacturer_or_Applicable_GPO_Name',
    'Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_ID','Total_Amount_of_Payment_USDollars',
    'Date_of_Payment', 'Number_of_Payments_Included_in_Total_Amount', and 'Program_Year'.

    'Date_of_Payment' is parsed as a date. Empty fields are kept as empty strings rather than
    being turned into NaN. The output file is written without the DataFrame index.
    """

    # One mapping drives both the column selection and the dtypes, so the two cannot drift apart.
    column_types = {
        'Physician_Profile_ID': str,
        'Physician_First_Name': str,
        'Physician_Middle_Name': str,
        'Physician_Last_Name': str,
        'Submitting_Applicable_Manufacturer_or_Applicable_GPO_Name': str,
        'Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_ID': str,
        'Total_Amount_of_Payment_USDollars': float,
        'Date_of_Payment': str,
        'Number_of_Payments_Included_in_Total_Amount': int,
        'Program_Year': int,
    }

    df = pd.read_csv(data_filename,
                     usecols=list(column_types),
                     dtype=column_types,
                     keep_default_na=False,
                     parse_dates=['Date_of_Payment'])

    # The new_filename parameter was previously ignored; write the selected columns as documented.
    df.to_csv(new_filename, index=False)

    return df



# Test script for the helper functions above. start_testing, expect and summary are not
# defined in this file; they come from the cs103 import.
# Each expect call below pairs a function call with the value it should produce.
start_testing()



# check_for_no_spaces: a space at the start, at both ends, or at the end gives False.
expect(check_for_no_spaces(' true'), False)
expect(check_for_no_spaces(' tree '), False)
expect(check_for_no_spaces('test '), False)
expect(check_for_no_spaces('apple'), True)
expect(check_for_no_spaces('test'), True)


# is_valid_phys_id: empty strings and ID's with a leading/trailing space are invalid.
# '0' is still a valid ID here; it is filter_to_listed_phys that removes it.
expect(is_valid_phys_id('1'), True)
expect(is_valid_phys_id('99999'), True)
expect(is_valid_phys_id('13 '), False)
expect(is_valid_phys_id(' 2221'), False)
expect(is_valid_phys_id(' '), False)
expect(is_valid_phys_id(''), False)  
expect(is_valid_phys_id('0'), True) 


# filter_to_listed_phys: '0' entries are dropped, everything else is kept in order.
expect(filter_to_listed_phys([]), [])
expect(filter_to_listed_phys(['12']), ['12'])
expect(filter_to_listed_phys(['0']), [])
expect(filter_to_listed_phys(['13', '12', '81729']), ['13', '12', '81729'])
expect(filter_to_listed_phys(['13', '12', '81729', '0']), ['13', '12', '81729'])


# check_if_not_empty: only '' is empty; strings made of or containing spaces are not.
expect(check_if_not_empty(''), False)
expect(check_if_not_empty('hello'), True)
expect(check_if_not_empty(' '), True)
expect(check_if_not_empty(' test'), True)
expect(check_if_not_empty('cat '), True)


# get_headers: these cases read fixture files from the Tests/ directory, which must be
# present relative to the working directory.
# NOTE(review): empty_header_test.csv presumably starts with a blank line rather than
# being a zero-byte file -- confirm against the fixture.
expect(get_headers('Tests/COI_data_APR_13_2021_PHYSID_Filled-early_space.csv'), ['Author_name','Physician_Profile_ID','USA_MD','Article_title','Study_PMID','Publication_year','Country','University_Hospital_Affiliation'])
expect(get_headers('Tests/empty_header_test.csv'), [])
expect(get_headers('Tests/one_header_test.csv'), ['one'])


# is_not_in_list: True only when no element of the list equals the string.
expect(is_not_in_list('1', []), True)
expect(is_not_in_list('1', ['1']), False)
expect(is_not_in_list('1', ['2']), True)
expect(is_not_in_list('2', ['2']), False)
expect(is_not_in_list('2', ['80']), True)
expect(is_not_in_list('1', ['1','2','3']), False)
expect(is_not_in_list('50', ['50','2','50','50']), False)
expect(is_not_in_list('1', ['2','3']), True)
expect(is_not_in_list('901', ['77','1','2','30','40']), True)



# Closes the test script started by start_testing() above.
summary()