In [None]:
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from xgboost import XGBClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, auc
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.metrics import f1_score
import pytest

In [None]:
# Notebook-wide setting: path of the Excel workbook that read_file() loads.
# NOTE(review): the leading "<file_path>" looks like a placeholder for the local data directory - confirm and replace before running.
file_path = "<file_path>/word_cloud/2010 Federal STEM Education Inventory Data Set.xls" 

In [None]:
def read_file(file_path):
    """Load the Excel workbook at ``file_path`` into a DataFrame.

    ``header=1`` makes pandas take the column labels from the second row
    of the sheet.
    """
    return pd.read_excel(file_path, header=1)


def calculate_growth_category(df):
    """Add the funding growth percentage and a binary growth label to ``df``.

    Two columns are written to ``df`` in place:

    * ``Growth`` - percentage change from ``C1) Funding FY2008`` to
      ``C2) Funding FY2009``.
    * ``Target_var`` - the string ``'0'`` where growth is negative and ``'1'``
      otherwise. Rows whose growth is not a number (e.g. missing funding)
      also receive ``'1'`` because ``NaN < 0`` is false.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame containing the two funding columns.

    Returns
    -------
    pandas.DataFrame
        The same ``df`` object, with the two columns added or overwritten.
    """
    funding_2008 = df['C1) Funding FY2008']
    funding_2009 = df['C2) Funding FY2009']
    df['Growth'] = (funding_2009 - funding_2008) / funding_2008 * 100

    # Derive the label from the Growth column itself so it keeps df's index.
    # Wrapping a plain list in a new pd.Series gives it a 0..n-1 index, which
    # misaligns on assignment (labels become NaN) whenever df has been
    # filtered or re-indexed.
    is_negative = df['Growth'] < 0
    df['Target_var'] = is_negative.map({True: '0', False: '1'})
    return df


# Test case:
# after labelling, Target_var holds exactly the two string labels '0' and '1'
def test_calculate_growth_category():
    """Check that the labelled data set uses both labels and nothing else."""
    labelled = calculate_growth_category(read_file(file_path))
    observed_labels = set(labelled['Target_var'])
    assert {'0', '1'} == observed_labels
    
    
def form_training_data(df):
    """Build the text inputs and integer labels used for training.

    The growth label is (re)computed with ``calculate_growth_category`` and
    the four descriptive text columns are concatenated, without a separator,
    into a new ``concatented_data`` column.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame with the funding columns and the text columns
        ``Investment Name``, ``Agency``, ``Subagency`` and
        ``A) Brief Description``.

    Returns
    -------
    tuple
        ``(df, X, y)`` where ``df`` is the input frame with the added columns,
        ``X`` is the list of concatenated strings (one per row) and ``y`` is
        the list of integer labels taken from ``Target_var``.
    """
    text_columns = ['Investment Name', 'Agency', 'Subagency', 'A) Brief Description']

    df = calculate_growth_category(df)

    # Blank out missing values in the text columns only. Filling the whole
    # frame would also write '' into the numeric funding columns, so calling
    # this function again on the same frame could fail in the growth
    # arithmetic.
    df[text_columns] = df[text_columns].fillna('')

    # Cast to str so a stray non-string cell cannot make the join raise.
    # NOTE(review): fields are fused with no separator (e.g.
    # "...EducationNational..."); test_form_training_data depends on that exact
    # text, so it is preserved here.
    text = df[text_columns].astype(str)
    X = [''.join(parts) for parts in text.itertuples(index=False, name=None)]
    df['concatented_data'] = X

    # Labels are stored as the strings '0'/'1'; the estimator gets integers.
    y = [int(label) for label in df['Target_var']]
    return df, X, y


# Test case:
# the concatenated text column contains a known name/agency/subagency string
def test_form_training_data():
    """Check that one known record shows up in the concatenated text column."""
    expected_fragment = "Polar Research and EducationNational Science FoundationOffice of Polar Programs"
    frame, _, _ = form_training_data(read_file(file_path))
    found = frame["concatented_data"].str.contains(expected_fragment).any()
    assert found


# Test case:
# the label list y holds exactly the integers 0 and 1
def test_form_training_data_two():
    """Check that both integer labels, and only those, occur in ``y``."""
    _, _, labels = form_training_data(read_file(file_path))
    assert set(labels) == {0, 1}
    
def feature_extraction(X):
    '''
    Turn raw text documents into TF-IDF feature vectors.

    TfidfVectorizer - Term Frequency (Tf)-Inverse Document Frequency (idf).
    Transforms text to feature vectors that can be used as input to estimator.
    The vectoriser is created with stop_words='english'.

    Returns the fitted vectoriser and the transformed documents, in that order.
    '''
    # fit() hands back the vectoriser itself, so one name serves both purposes
    tfidf = TfidfVectorizer(stop_words='english').fit(X)
    return tfidf, tfidf.transform(X)


# Covered by test_split_data and test_split_data_two, which check that the
# train and test parts together account for every input sample.
def split_data(X, y, test_size=0.3, random_state=None):
    """Split features and labels into stratified train and test sets.

    Parameters
    ----------
    X : sequence or matrix
        Samples to split.
    y : sequence
        Labels for ``X``; also used for stratification so both parts keep
        the label proportions.
    test_size : float, default 0.3
        Passed to ``train_test_split`` as ``test_size``.
    random_state : int or None, default None
        Passed to ``train_test_split``. Supply an integer to get the same
        split on every run; ``None`` keeps the previous behaviour of a
        different shuffle each time.

    Returns
    -------
    tuple
        ``(X_train, X_test, y_train, y_test)``.
    """
    X_train, X_test, y_train, y_test = train_test_split(
        X,
        y,
        test_size=test_size,
        stratify=y,
        random_state=random_state,
    )
    return X_train, X_test, y_train, y_test


# Test case:
# the train and test feature splits together contain every input sample
def test_split_data():
    """Check that no sample of ``X`` is lost or duplicated by the split."""
    _, X, y = form_training_data(read_file(file_path))
    X_train, X_test, _, _ = split_data(X, y, test_size=0.3)
    assert len(X_train) + len(X_test) == len(X)
    
    
# Test case:
# the train and test label splits together contain every input label
def test_split_data_two():
    """Check that no label of ``y`` is lost or duplicated by the split."""
    _, X, y = form_training_data(read_file(file_path))
    _, _, y_train, y_test = split_data(X, y, test_size=0.3)
    assert len(y_train) + len(y_test) == len(y)
    
    
def train_model(X_train, y_train):
    """Fit an ``XGBClassifier`` created with default arguments.

    Returns whatever ``fit`` returns for the training data given.
    """
    return XGBClassifier().fit(X_train, y_train)


def predict(model, X_test):
    """Return the result of ``model.predict(X_test)`` unchanged."""
    return model.predict(X_test)


# Test case:
# Test len(y_test) == len(y_pred)
def test_predict():
    """Check that ``predict`` returns one prediction per test sample.

    The data is built inside the test through the pipeline functions, so the
    test does not depend on X_train / X_test / y_train / y_test having been
    created by earlier notebook cells.
    """
    df = read_file(file_path)
    df, X, y = form_training_data(df)
    _, X = feature_extraction(X)
    X_train, X_test, y_train, y_test = split_data(X, y)

    trained_model = train_model(X_train, y_train)
    y_pred = predict(trained_model, X_test)
    assert len(y_test) == len(y_pred)
    
    
def evaluate(y_test, y_pred):
    """Print accuracy, precision, recall and F1 as percentages.

    All four scores are computed before anything is printed. Nothing is
    returned.
    """
    report = (
        ("Accuracy: %.2f%%", accuracy_score(y_test, y_pred)),
        ("Precision: %.2f%%", precision_score(y_test, y_pred)),
        ("Recall: %.2f%%", recall_score(y_test, y_pred)),
        ("F1_score: %.2f%%", f1_score(y_test, y_pred)),
    )
    for template, score in report:
        print(template % (score * 100.0))
    
    
# Test ideas:
# - positive_label should belong to set(y_test)
# - len(y_test) == len(y_pred)
def visualisation(y_test, y_pred, positive_label=1):
    """Draw the ROC curve for ``y_pred`` against ``y_test`` and show it.

    ``positive_label`` is forwarded to ``roc_curve`` as ``pos_label``. The
    area under the curve is shown in the legend.
    """
    false_pos_rate, true_pos_rate, _ = roc_curve(y_test, y_pred, pos_label=positive_label)
    area = auc(false_pos_rate, true_pos_rate)

    # Curve, then legend, then the unlabelled red dashed diagonal
    plt.plot(false_pos_rate, true_pos_rate, 'b', label='AUC = %0.2f' % area)
    plt.legend(loc='lower right')
    plt.plot([0, 1], [0, 1], 'r--')

    plt.title('Receiver Operating Characteristic')
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.xlim([0, 1])
    plt.ylim([0, 1])
    plt.show()

In [None]:
# Load the workbook and attach the Growth and Target_var columns
df = calculate_growth_category(read_file(file_path))

In [None]:
# Form the training and testing data sets.
# X starts as the list of concatenated text strings and is then rebound to the
# TF-IDF output of feature_extraction(); the fitted vectoriser is kept in vectoriser_model.
# split_data() is called with its default test_size.
df, X, y = form_training_data(df)
vectoriser_model, X = feature_extraction(X)
X_train, X_test, y_train, y_test = split_data(X, y)

In [None]:
# Train the model on the training split and get predictions for the test split.
# XGB_model and y_pred stay in the notebook namespace for the evaluation cell.
XGB_model = train_model(X_train, y_train)
y_pred = predict(XGB_model, X_test)

In [None]:
# Print accuracy / precision / recall / F1 for the test split, then draw the ROC curve.
# NOTE(review): y_pred comes from model.predict(); if that yields hard class labels rather
# than scores, the ROC curve has a single interior point - consider passing probabilities. TODO confirm.
evaluate(y_test, y_pred)
visualisation(y_test, y_pred)