In [21]:
import time
import numpy as np
import pandas as pd
import random
import copy, sys
from sklearn.neighbors import KNeighborsRegressor
from sklearn.neighbors import KNeighborsClassifier
from sklearn import linear_model
from sklearn import ensemble
from sklearn.metrics import roc_auc_score
from sklearn import preprocessing

from scipy import stats
from numpy import linalg as LA
import math

import re
import sys
from pyspark import SparkConf, SparkContext

conf = SparkConf()
sc = SparkContext(conf=conf)

import torch

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=pyspark-shell, master=local[*]) created by __init__ at <ipython-input-1-cae8f1f5cd76>:22 

In [22]:
#Function to load the data file to memeory.

#Input: File path to read.
#Output: A 2d numpy array with all loaded samples from the file to read in string.

def parseFile_raw(file):
    time_start = time.time()

    content = []
    count, count_incomplete,count_complete, count_part = 0, 0, 0, 0
    
    with open(file) as txtfile:
        for row in txtfile:
            
            row = row.split(',')
            row[-1] = row[-1].strip()
            #if count != 0:
            content.append([row[21]] + row[0:4] + [row[22]] + [row[32]] + row[24: 26] + [row[29]] + [row[6]] \
                           + [row[-5]] + [row[-4]] + [row[-3]] + [row[-2]] + [row[12].strip("'")])

            count += 1
            #if count == 1000:
                #break

    content_mat = np.array(content)

    time_end = time.time()
    print('Reading data is complete! Running time is ' + str(time_end - time_start) + 's!')

    return content_mat

In [23]:
#Function to load the data file to memeory, this is for the simulation hash data.

#Input: File path to read.
#Output: A 2d numpy array with all loaded samples from the file to read in string.

def parseFile_reference(file):
    time_start = time.time()

    content = []
    count, count_incomplete,count_complete, count_part = 0, 0, 0, 0
    
    with open(file) as txtfile:
        for row in txtfile:
            row = row.split(',')
            row[-1] = row[-1].strip().strip(']').strip('\n')
            row[0] = row[0][1:]
            row[0] = row[0].strip("'")
    
            content.append(row)

    reference_mat = np.array(content)

    time_end = time.time()
    print('Reading data is complete! Running time is ' + str(time_end - time_start) + 's!')

    return reference_mat

In [24]:
def parseFile_indi(file):

    with open(file, 'r') as csvfile:
        indi_list = []
        for line in csvfile:
            indi_list.append(line.strip().replace('-', ' ').split(','))

    indicator_array = np.array(indi_list)
    
    return indicator_array

def parseFile_hpi(file):

    with open(file, 'r') as csvfile:
        hpi_list = []
        for line in csvfile:
            hpi_list.append(line.strip().split(','))

    hpi_array = np.array(hpi_list)
    
    return hpi_array

In [25]:
#Function to filter the samples with no missing values. 
#Input: mat - 2d Numpy Array.
#Onput: mat - 2d Numpy Array with all samples that have no Missing values.

def filter_full_feature(mat):
    row_count = 0
    full_list = []
    for row in mat:
        if 'N/A' in row or 'NA' in row:
            pass
        else:
            full_list.append(row_count)

        row_count += 1
    print('There are a total of ' + str(len(full_list)) + ' samples fed into the model')
    mat = mat[full_list, :]
    return mat

#Function to split the fullset into training and test sets.
#Input: mat - 2d Numpy Array.
#Onput: train_mat: 2d Numpy Array, test_mat: 2d Numpy Array
def train_test_split(mat):
    train_list = []
    test_list = []
    num_sample, num_var = mat.shape

    for i in range(0, num_sample):
        if i == 0:
            train_list.append(i)
            test_list.append(i)
        else:
            rand = random.random()
            if rand >= 0.1:
                train_list.append(i)
            else:
                test_list.append(i)

    train_mat = mat[train_list, :]
    test_mat = mat[test_list, :]

    return train_mat, test_mat

In [26]:
#convert a probability into the coordinate of a zip code using population probability distritbuion
#prob: float between 0 and 1
#reference_array: a 2-d array contaning the coordinates of the reference zipcodes
#prob_dist: a 1-d array shows the accumulated population distribution as a percentage of the total population in the US.
def getzip(prob, reference_array, prob_dist):
    idx = np.where(prob_dist >= prob)
    idx = idx[0][0]
    coord = reference_array[idx, :]
    
    return coord, idx

#convert the index and probability from getzip() and get the gender of the simulation from a gender reference
#idx: the index returned from getzip()
#gender_ref: a 2-d Array that contains the gender distribution of each zip code.
# 1-male, 0-female.
def getgender(idx, gender_ref):
    prob = random.random()
    
    if prob >= gender_ref[idx]:
        gender = 0
    else:
        gender = 1
    
    return gender

#convert the index, gender and a probability from getzip() and get the age of the simulation from an age reference
def getage(idx, age_ref, gender):
    age_ref_male = age_ref[:, :18]
    age_ref_female = age_ref[:, 18:]
    prob = random.random()
    
    if gender == 1:
        idx_age = np.where(age_ref_male[idx] >= prob)
        if idx_age[0].size != 0:
            idx_age = idx_age[0][0]
            delta = random.randint(0, 4)
        
            age = idx_age * 5 + delta
        else:
            age = 90
    else:
        idx_age = np.where(age_ref_female[idx] >= prob)
        if idx_age[0].size != 0:
            idx_age = idx_age[0][0]
            delta = random.randint(0, 4)
        
            age = idx_age * 5 + delta
        else:
            age = 90

    return age

#convert the index and a probability from getzip() and get the race of the simulation from a race reference
def getrace(idx, race_ref):
    prob = random.random()
    idx_race = np.where(race_ref[idx] >= prob)
    idx_race = idx_race[0][0]
    
    return idx_race + 1


In [27]:
#convert a zip code to its coresponding coordinate.
#zip_array: a 1-d array that is a list of zip_code
#reference_array: a 2-d array contaning the coordinates of the reference zipcodes
def zip_to_coordinate(zip_array, reference_array):
    count = 0
    coordinate_list = []
    full_list = []
    zip_ref = reference_array[:, 0].astype(np.int)
    for zip_c in zip_array:
        idx = np.argwhere(zip_ref == int(zip_c))
        if idx.size != 0:
            coordinate_pair = reference_array[idx[0][0], 1:3]
            full_list.append(count)
        else: #there are some zipcodes were P.O box addresses and not in our reference. So we look for the nearby zipcodes
            zip_c_back = int(zip_c) - 1
            zip_c_forward = int(zip_c) + 1
            idx_back = np.argwhere(zip_ref == zip_c_back)
            idx_forward = np.argwhere(zip_ref == zip_c_forward)
            if idx_back.size != 0:
                coordinate_pair = reference_array[idx_back[0][0], 1:3]
                full_list.append(count)
            elif idx_forward.size != 0:
                coordinate_pair = reference_array[idx_forward[0][0], 1:3]
                full_list.append(count)
            else:
                coordinate_pair = ['N/A', 'N/A']
                
        count += 1
        coordinate_list.append(coordinate_pair)
    return np.array(coordinate_list), full_list

In [28]:
def random_shuffle(array, upper_array, lower_array):
    element_counter = 0
    for element in array[0]:
        prob = random.random()
        if prob <= 0.15:
            array[0][element_counter] = np.random.choice(np.arange(lower_array[element_counter], upper_array[element_counter]))  
        element_counter += 1
        
    return array

def convert_dummy(array):
    
    num_sample, num_feature = array.shape
    dummy_list = []
    
    combined_df = np.array(pd.get_dummies(array[:, 0]))
    for i in range(1, num_feature):
        dummy_df = pd.get_dummies(array[:, i])
        combined_df = np.concatenate((combined_df, np.array(dummy_df)), axis=1)
    
    return combined_df

In [35]:
# def cf(mat, sim_user, feat, k, mat_norm):
#     #print("start cf")
#     mat += 0.0001
#     #print("mat")
#     #print(mat[:2,:])
#     sim_user += 0.0001
#     sim_user = sim_user.reshape(-1,8) #(n,8)
#     n = sim_user.shape[0]
#     #print("sim_user before concat: ", sim_user)

#     sim_user = np.concatenate((np.zeros((n,feat)), sim_user), axis=1)
#     #print("sim_user after concat: ", sim_user)
# #     cos_sim = sim_user.dot(mat.T) / LA.norm(sim_user) / LA.norm(mat,axis = 1)
    
#     sim_user = torch.tensor(sim_user,dtype=torch.float64).cuda()
#     mat = torch.tensor(mat,dtype=torch.float64).cuda()
#     mat_norm = torch.tensor(mat_norm,dtype=torch.float64).cuda()
#     other = torch.tensor(0,dtype=torch.float64).cuda()
#     sim_norm = torch.dist(sim_user, other)
# #     nume = sim_user.dot(mat.T)
# #     simu_norm = LA.norm(sim_user)
# #     deno =  simu_norm * mat_norm
#     print("shape")
#     print(sim_user.shape)
#     print(mat.shape)
#     print(sim_norm.shape)
#     print(mat_norm.shape)
#     cos_sim = torch.mm(sim_user, torch.t(mat)) / sim_norm / mat_norm
# #     cos_sim = sim_user.dot(mat.T) / LA.norm(sim_user) / mat_norm
#     print(cos_sim.shape)

#     cos_sim = cos_sim.to(torch.device("cpu")).numpy()
#     mat = mat.to(torch.device("cpu")).numpy()
#     sim_user = sim_user.to(torch.device("cpu")).numpy()
#     idx = np.flip( np.argsort(cos_sim) , 1)[:,:k]
#     tmp = np.flip( np.sort(cos_sim), 1)[:,:k]
    
# #     print(mat[idx,:])
    
#     for i in range(feat):
#         #print("mat[idx,i]: ", mat[idx,i])
# #         sim_user[:,i] = np.sum(mat[idx,i] * tmp, axis = 1)
# #         sim_user[:,i] = np.sum(mat[idx,i], axis = 1) / k # avg
        
#         sim_user[:,i] = stats.mode(mat[idx,i], axis = 1)[0].T[0] # find most common value for each feature
    
#     #print("sim_user - 0.0001: ", sim_user - 0.0001)
    
# #     sim_user = round(sim_user - 0.0001)
#     sim_user[:,:8] = (sim_user - 0.0001)[:,:8].astype(int)
# #     sim_user = (sim_user - 0.001).astype(int)
    
#     return sim_user

def cf(sim_user, mat, feat, k, mat_norm):
# shapes (8,) and (16,856551) not aligned: 8 (dim 0) != 16 (dim 0)
    sim_user = sim_user.reshape(1,16)
    cos_sim = sim_user.dot(mat.T) / LA.norm(sim_user) / mat_norm
    idx = np.flip( np.argsort(cos_sim).reshape(1,-1), 1)[:,:k]
    tmp = np.flip( np.sort(cos_sim).reshape(1,-1), 1)[:,:k]
    
    sim_user[:8] = stats.mode(mat[idx,:][0], axis = 0)[0][0][:8]
    # could not broadcast input array from shape (8) into shape (1,16)
    return sim_user


In [36]:
def simulation(neigh_model, mat, zip_array ,coordinate_array, gender_array, age_array, race_array, prob_dist, daily_indicator, year, mat_norm):
    
    
    # generate a random probability prop to population distri. (use zip for now)
    prob = random.random() # 0.0~1.0
    
    #longi, lati = getcoord(zip)
    coordinate, idx = getzip(prob, coordinate_array, prob_dist)
    
    #zip_c = int(zip_array[idx])
    
    gender = getgender(idx, gender_array)
    
    x_knn = np.append(coordinate, gender)
    
    age = getage(idx, age_array, gender)
    
    x_knn = np.append(x_knn, age)
    
    race = getrace(idx, race_array)
    
    x_knn = np.append(x_knn, race)
    
    x_knn = np.append(x_knn, daily_indicator)
    
    
    '''
    hpi_array = parseFile_hpi("CleanedData/hpi_cleaned.csv")
    hpi_locator = hpi_array[1:, 0:2].astype(np.int)
    idx_hpi = np.argwhere(np.logical_and(hpi_locator[:,0] == zip_c, hpi_locator[:,1] == year))
    
    if idx_hpi.size != 0:
        x_knn = np.append(x_knn, float(hpi_array[idx_hpi[0][0], 2])).reshape(1, 9)
    else:
        x_knn = np.append(x_knn, 0).reshape(1, 9)
    
    x_knn_for_tran = copy.deepcopy(x_knn)
    #scaled_x_knn = norm_model.transform([x_knn_for_tran])
    '''
    # ------------------------------------------------
    #print("start simulation")
    #print("x_knn: ", x_knn)
    
    # use cf to generate sentiment features
    # 1. get mat
    #print("get mat: ", mat.shape)
    # 2. sim_user
#     sim_user = np.array(x_knn).astype(float)
#     feat, k = 8, 10
    #print("sim_user before cf: ", sim_user)
    # 3. apply cf
#     sim_user = cf(mat, sim_user, feat, k, mat_norm)
    #print("sim_user after cf: ", sim_user)
#     senti_feature = sim_user[:,:8]
    #print("senti_feature: ", senti_feature)
#     #knn
#     print("senti_feature: ", senti_feature)
    
# #     x_knn:  [ 3.4203289e+01 -9.7115520e+01  1.0000000e+00  2.8000000e+01
# #               1.0000000e+00  2.4090000e+01  9.9640000e+01  1.4471600e+03]
# #     senti_feature:  [[2 1 5 8 1 1 0 0]]
    # ------------------------------------------------
    
    
    # generate sentiment features (use knn for now)
#     senti_feature = neigh_model.predict(x_knn.reshape(1,8))
    

    
#     upper_limit = [5, 2, 11, 11, 2, 4, 2, 2]
#     lower_limit = [1, -1, 0, 0, 0, 1, 0, 0]
    
#     senti_feature = random_shuffle(senti_feature, upper_limit, lower_limit)
    
    
#     return senti_feature, x_knn

    return x_knn

In [37]:
def model_train(mat, label_location):
    #model = linear_model.LogisticRegression()
    poly = preprocessing.PolynomialFeatures(2)
    num_sam, num_var = mat.shape
    model = ensemble.RandomForestClassifier(n_estimators = 15,min_samples_split= 32, min_samples_leaf = 20)
    feature_mat = np.delete(mat, label_location, axis=1).astype(np.float)
    feature_mat = poly.fit_transform(feature_mat)
    #feature_mat = np.concatenate((feature_mat, (feature_mat[:, 41] * feature_mat[:, 41]).reshape((num_sam, 1))), axis=1)
    labels = mat[:, label_location].astype(np.int)
    print('Model training - Started!')
    time_start = time.time()
    model.fit(feature_mat, labels)
    time_end = time.time()
    print('Model training - Completed! Training time: ' + str(time_end - time_start) + 's')

    predicted_lab = model.predict(feature_mat)
    corrected_pred = np.sum(labels == predicted_lab)

    training_error = 1 - corrected_pred/labels.size

    return model, training_error


def model_test(model, mat, label_location):
    num_sam, num_var = mat.shape
    poly = preprocessing.PolynomialFeatures(2)
    feature_mat = np.delete(mat, label_location, axis=1).astype(np.float)
    feature_mat = poly.fit_transform(feature_mat)
    #feature_mat = np.concatenate((feature_mat, (feature_mat[:, 41] * feature_mat[:, 41]).reshape((num_sam, 1))), axis=1)
    labels = mat[:, label_location].astype(np.int)

    predicted_lab = model.predict(feature_mat)
    corrected_pred = np.sum(labels == predicted_lab)
    
    label_score = model.predict_proba(feature_mat)
    
    print('The current model stands an AUC of ' + str(roc_auc_score(labels, label_score[:, 1])))
    
    np.savetxt('predicted_lab_RF.txt', predicted_lab.astype(np.int))
    np.savetxt('label_test_RF.txt', labels.astype(np.int))

    test_error = 1 - corrected_pred / labels.size
    return test_error

def model_sim(model, mat):
    poly = preprocessing.PolynomialFeatures(2)
    feature_mat = mat.astype(np.float)
    feature_mat = poly.fit_transform(feature_mat)
    num_sim, num_var = feature_mat.shape
    #feature_mat = np.concatenate((feature_mat, (feature_mat[:, 41] * feature_mat[:, 41]).reshape((num_sim, 1))), axis=1)
    predicted_lab = model.predict(feature_mat).reshape(num_sim, 1)
    
    full_mat = np.concatenate((feature_mat, predicted_lab), axis=1)
    
    return full_mat

In [38]:
def main():
    file = "../CleanedData/gallup_clean_NA_determinant.txt"
    file_age = "../CleanedData/ppl_by_zip.txt"
    file_race = "../CleanedData/race_by_zip.txt"
    
    file_indi = "../CleanedData/daily_ind_1.csv"
    file_indi_oil = "../CleanedData/daily_ind_oil.csv"
    file_indi_S = "../CleanedData/daily_ind_SP500.csv"
    
#     simu_iter = 10000        #327500000 is current US population
    simu_iter = 10
    
    raw_data = parseFile_raw(file) #raw_data from Gallup daily survey
    header = raw_data[0,:]
    cleaned_data_input = filter_full_feature(raw_data)[1:,:]  #cleaned_data input from Gallup
    
    label = cleaned_data_input[:, :1] #employmed label
    cleaned_data = cleaned_data_input[:, 1:]
    
    age_data = parseFile_reference(file_age)
    age_data = filter_full_feature(age_data)[1:,:]
    coordinate = age_data[:,2:4].astype(np.float) # (longi,lati)
    index = age_data[:,-1].astype(np.float) # prob
    age_dist = age_data[:, 5:-2].astype(np.float)
    
    race_data = parseFile_reference(file_race)
    race_data = filter_full_feature(race_data)[1:,:]
    race_dist = race_data[:, 2:].astype(np.float)
    
    zipcode = age_data[:,:1]
    gender_distribution = age_data[:, 4:5].astype(np.float)
    
    combined_zip_ref = np.concatenate((zipcode, coordinate), axis=1)
    
    zip_code_ind = cleaned_data[:, -1] 
    ind_coordinate, full_list = zip_to_coordinate(zip_code_ind, combined_zip_ref) # coreponding coordinate of the samples.
    
    content_mat = np.concatenate((cleaned_data,ind_coordinate),axis=1)
    content_mat = content_mat[full_list, :]
    label = label[full_list, :]
    
    num_sam, num_var = content_mat.shape
    
    X_knn = content_mat[:, -2:num_var + 1].astype(np.float)# zip vec
    gender = content_mat[:, 8].astype(np.int)
    age = content_mat[:, 9].astype(np.int)
    race = content_mat[:, 10].astype(np.int)
    daily_ind =  content_mat[:, 11:14].astype(np.float)
    #hpi = content_mat[:, -4].astype(np.float)
    #X_knn = np.concatenate((X_knn, gender.reshape((num_sam, 1)), age.reshape((num_sam, 1)), race.reshape((num_sam, 1)),daily_ind.reshape((num_sam, 3)), hpi.reshape(num_sam, 1) ),axis=1)
    X_knn = np.concatenate((X_knn, gender.reshape((num_sam, 1)), age.reshape((num_sam, 1)), race.reshape((num_sam, 1)),daily_ind.reshape((num_sam, 3))),axis=1)
    y_knn = content_mat[:,:8].astype(np.int) # senti mat
    
#     print("get X_knn and y_knn: ")
#     print(X_knn.shape) # (856551, 8)
#     print(y_knn.shape) # (856551, 8)
    mat = np.concatenate((y_knn,X_knn), axis=1)
#     print("get mat: ", mat.shape)
    '''
    X_for_transform = copy.deepcopy(X_knn)
    
    time_s = time.time()
    print('normalization starts!')
    scaler = preprocessing.MinMaxScaler()
    scaler.fit(X_knn)
    scaled_X_knn = scaler.transform(X_for_transform)
    time_s_end = time.time()
    print('normalization ends after ' + str(time_s_end - time_s))
    
    scaled_X_knn = np.around(scaled_X_knn, decimals = 10)
    '''
    sim_prob_ref = age_data[:, -1].astype(np.float)
    
    neigh = None
#     print('KNN starts!')
#     neigh = KNeighborsClassifier(n_neighbors=10, weights= 'distance')
#     neigh.fit(X_knn, y_knn)
#     print('KNN ends!')
    
    
    
#     dummy_y = convert_dummy(y_knn)
    
#     content_mat = np.concatenate((label, dummy_y, X_knn), axis=1)
#     train_mat, test_mat = train_test_split(content_mat)
    
#     model, train_error = model_train(train_mat, 0)
#     test_error = model_test(model, test_mat, 0)
    
#     print('The training error for this trail is: ' + str(train_error))
#     print('The testing error for this trail is: ' + str(test_error))
    
    indi_array = parseFile_indi(file_indi)
    year_list = indi_array[:, 0]
    indi_list = indi_array[:, 1].astype(np.float)
    oil_array = parseFile_indi(file_indi_oil)
    ind_oil_list = oil_array[:,1].astype(np.float)
    S_array = parseFile_indi(file_indi_S)
    ind_S_list = S_array[:,1].astype(np.float)
    
    num_sim_r = indi_list.shape
    
    employment_rate_16 = []
    employment_rate_25_54 = []
    
    #employment_rate_18.append('>18')
    #employment_rate_25_54.append('25 - 54')
    count = 0
    mat += 0.0001
    mat_norm = LA.norm(mat,axis = 1)
#     for index in range(0, num_sim_r[0]):
    for index in range(10):
        time_start = time.time()
        print('Started simulation cycle ' + str(count))
        
        ind_list = [indi_list[index], ind_oil_list[index], ind_S_list[index]]
        year = int(year_list[index][-2:])
        X_classify = []
        coord_list = []
        sim_user = []
        for i in range(simu_iter):
            #senti_fea, coord = simulation(neigh, scaler, coordinate, gender_distribution, age_dist, race_dist, sim_prob_ref, ind_list)
#             senti_fea, coord = simulation(neigh, mat, zipcode, coordinate, gender_distribution, age_dist, race_dist, sim_prob_ref, ind_list, year, mat_norm)

            single_sim_user = simulation(neigh, mat, zipcode, coordinate, gender_distribution, age_dist, race_dist, sim_prob_ref, ind_list, year, mat_norm)
            sim_user.append(single_sim_user)
#         coord_list.append(coord)
#         X_classify.append(senti_fea[0])
        
        sim_user = np.array(sim_user).astype(float)
        feat, k = 8, 10
        sim_user = np.concatenate((np.zeros((simu_iter,8)), sim_user), axis=1)
        sim_user += 0.0001
        print("before cf: ", sim_user.shape) # (10000, 8)
        data = sc.parallelize(sim_user)
        print("data.collect: ", data.take(1))
        data = data.map(lambda user: cf(user, mat, feat, k, mat_norm) )
        print("data.collect: ", data.take(1))
        print("data.collect: ", data.collect())
        print("data.count: ", data.count())
        
#         mat -= 0.0001
        sim_user = np.array(data.collect())
        sim_user -= 0.0001
        
#         sim_user = cf(mat, sim_user, feat, k, mat_norm)
        print("after cf: ", sim_user.shape)
        
        X_classify = sim_user[:,8:]
        coord_list = sim_user[:,:7]
        
        print ("throw simulated data into the model")
#     # throw simulated data into the model, predict their unemplotment rate
        X_classify = np.array(X_classify)
        coord_list = np.array(coord_list)
        
        #print("get all X_classify and coord_list")
        #print(X_classify.shape) # (2, 8)
        #print(coord_list.shape) # (2, 8)
        
#         dummy_classified = convert_dummy(X_classify)
        
#         output_array = np.concatenate((dummy_classified, coord_list), axis=1)
#         print("get output_array: ", output_array.shape) # (2, 17)
        
#         sim_result = model_sim(model, output_array)
        
#         age_array = output_array[:, 41].reshape((simu_iter, 1))
#         print("age_array: ", age_array)
#         employment_array = sim_result[:, -1].reshape((simu_iter, 1))
        
#         over_16_list = np.argwhere(age_array >= 16)
#         age_25_54_list = np.argwhere((age_array >= 25) & (age_array <= 54))
#         #over_18_population = over_18_list.shape
#         num_over_16, dim = over_16_list.shape
#         num_25_54, dim = age_25_54_list.shape
        
#         employment_array_16 = employment_array[over_16_list[:,0]]
#         employment_array_25_54 = employment_array[age_25_54_list[:,0]]
        
#         employment_rate_16.append(np.sum(employment_array_16)/num_over_16)
#         employment_rate_25_54.append(np.sum(employment_array_25_54)/num_25_54)
        
        time_end = time.time()
        print('Simulation Cycle ' + str(count) + ' finished in ' + str(time_end - time_start) + 's!')
        count += 1
        
#         #if count == 10:
#             #break
        
#     age_16 = np.array(employment_rate_16).reshape((count, 1))
#     age_25_54 = np.array(employment_rate_25_54).reshape((count, 1))
    
#     #ind_list_out = ind_list.reshape(num_sim, 1)
    
#     output_array = np.concatenate((age_16, age_25_54), axis=1)
#     np.savetxt('sim_out_daily_RF_CF_optim'+ str(simu_iter) + '.txt', output_array, delimiter=',', fmt='%1.4f,%1.4f')
    
    print("all simulation done !")
    
if __name__ == "__main__":    
    main()

Reading data is complete! Running time is 26.253950357437134s!
There are a total of 857696 samples fed into the model
Reading data is complete! Running time is 0.3672142028808594s!
There are a total of 32800 samples fed into the model
Reading data is complete! Running time is 0.07768750190734863s!
There are a total of 32800 samples fed into the model
Started simulation cycle 0
before cf:  (10, 16)
data.collect:  [array([ 1.0000000e-04,  1.0000000e-04,  1.0000000e-04,  1.0000000e-04,
        1.0000000e-04,  1.0000000e-04,  1.0000000e-04,  1.0000000e-04,
        3.0562114e+01, -9.6273800e+01,  1.0000000e-04,  2.1000100e+01,
        1.0001000e+00,  2.4090100e+01,  9.9640100e+01,  1.4471601e+03])]


Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 9.0 failed 1 times, most recent failure: Lost task 0.0 in stage 9.0 (TID 51, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/shared/anaconda3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/home/shared/anaconda3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/shared/anaconda3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/shared/anaconda3/lib/python3.6/site-packages/pyspark/rdd.py", line 1354, in takeUpToNumLeft
    yield next(iterator)
  File "<ipython-input-38-aba5151af937>", line 137, in <lambda>
  File "<ipython-input-35-184363b04ac4>", line 62, in cf
ValueError: could not broadcast input array from shape (8) into shape (1,16)

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$$anonfun$1.apply(PythonRDD.scala:141)
	at org.apache.spark.api.python.PythonRDD$$anonfun$1.apply(PythonRDD.scala:141)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:141)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/shared/anaconda3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/home/shared/anaconda3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/shared/anaconda3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/shared/anaconda3/lib/python3.6/site-packages/pyspark/rdd.py", line 1354, in takeUpToNumLeft
    yield next(iterator)
  File "<ipython-input-38-aba5151af937>", line 137, in <lambda>
  File "<ipython-input-35-184363b04ac4>", line 62, in cf
ValueError: could not broadcast input array from shape (8) into shape (1,16)

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$$anonfun$1.apply(PythonRDD.scala:141)
	at org.apache.spark.api.python.PythonRDD$$anonfun$1.apply(PythonRDD.scala:141)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [None]:
sim_user = np.array([[1,2,3],[4,5,6]]) # (2,3)
mat = np.array([[1,2,3],[4,5,6],[7,8,9],[10,11,12]]) # (4,3)
mat_norm = LA.norm(mat,axis = 1)

sim_user = torch.tensor(sim_user,dtype=torch.float64).cuda()
mat = torch.tensor(mat,dtype=torch.float64).cuda()
mat_norm = torch.tensor(mat_norm,dtype=torch.float64).cuda()
other = torch.tensor(0,dtype=torch.float64).cuda()

sim_norm = torch.dist(sim_user, other)

print("sim_user: ", sim_user)
mat_t = torch.t(mat)
print("mat_t: ",mat_t)
cos_sim = torch.mm(sim_user, mat_t) 
print("cos_sim: ",cos_sim)
print("sim_norm: ", sim_norm)
cos_sim /= sim_norm
print("cos_sim: ",cos_sim)
print("mat_norm: ", mat_norm)
cos_sim /= mat_norm
print("cos_sim: ",cos_sim)

print(cos_sim)
cos_sim = cos_sim.to(torch.device("cpu")).numpy()
print(cos_sim)
# torch.dot(torch.tensor([2, 3]), torch.tensor([2, 1]))

In [None]:
8.0718 / 8.7750


In [12]:
import re
import sys
from pyspark import SparkConf, SparkContext

conf = SparkConf()

In [13]:
sc = SparkContext(conf=conf)
sim_user = np.ones((10000, 8))
print(sim_user[0,:])
# print(sim_user.shape)

data = sc.parallelize(sim_user)
data.take(1)


# # Load the data
# data = sc.textFile('/home/shared/CF/pg100.txt')

# # Parse the data into words
# words = data.flatMap(lambda line: re.split(r'[^\w]+', line))

# # Deal with case
# caps = words.map(lambda word: word.upper())

# # Remove empty words
# final = caps.filter(lambda word: word != '')

# # Convert each word into a tuple of its first letter and 1
# pairs = final.map(lambda word: (word[0], 1))

# # Sum the counts for each character and save them to disk
# pairs.reduceByKey(lambda c1, c2: c1 + c2).sortByKey().saveAsTextFile('/home/shared/CF/output.txt')

sc.stop()

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=pyspark-shell, master=local[*]) created by __init__ at <ipython-input-1-cae8f1f5cd76>:22 

In [14]:
def cf(sim_user, mat, feat, k, mat_norm):
    
    cos_sim = sim_user.dot(mat.T) / LA.norm(sim_user) / mat_norm
    idx = np.flip( np.argsort(cos_sim) , 1)[:,:k]
    tmp = np.flip( np.sort(cos_sim), 1)[:,:k]
    
    for i in range(feat):
        sim_user[:,i] = stats.mode(mat[idx,i], axis = 1)[0].T[0]
    
    sim_user[:,:8] = sim_user[:,:8].astype(int)
    
    return sim_user

def cf1(mat, sim_user, feat, k, mat_norm):
    mat += 0.0001
    sim_user += 0.0001
    sim_user = sim_user.reshape(-1,8) #(n,8)

    sim_user = np.concatenate((np.zeros((n,feat)), sim_user), axis=1)
    cos_sim = sim_user.dot(mat.T) / LA.norm(sim_user) / LA.norm(mat)

    idx = np.flip( np.argsort(cos_sim) , 1)[:,:k]
    tmp = np.flip( np.sort(cos_sim), 1)[:,:k]
    
    for i in range(feat):
#         sim_user[:,i] = np.sum(mat[idx,i] * tmp, axis = 1)
#         sim_user[:,i] = np.sum(mat[idx,i], axis = 1) / k # avg
        sim_user[:,i] = stats.mode(mat[idx,i], axis = 1)[0].T[0] # find most common value for each feature

    sim_user[:,:8] = (sim_user - 0.0001)[:,:8].astype(int)
    
    return sim_user


In [20]:
def cf(sim_user, mat, feat, k, mat_norm):
    
    cos_sim = sim_user.dot(mat.T) / LA.norm(sim_user) / mat_norm
    idx = np.flip( np.argsort(cos_sim) , 1)[:,:k]
    tmp = np.flip( np.sort(cos_sim), 1)[:,:k]
    
    for i in range(feat):
        sim_user[:,i] = stats.mode(mat[idx,i], axis = 1)[0].T[0]
    
    sim_user[:,:8] = sim_user[:,:8].astype(int)
    
    return sim_user

# sc = SparkContext(conf=conf)
mat = np.ones((856551, 16)) * 2
sim_user = np.ones((100, 8))
sim_user = np.concatenate((np.zeros((100,8)), sim_user), axis=1)
print(sim_user.shape)
data = sc.parallelize(sim_user)
# data.take(2)
# data.collect()

feat, k = 8, 10
mat += 0.0001
sim_user += 0.0001
mat_norm = LA.norm(mat)
data = data.map(lambda user: cf(user, mat, feat, k, mat_norm) )
# sim_user -= 0.0001
mat -= 0.0001

print(data.count())

# np.all(rdd.first() == mat[0])

(100, 16)
test


Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 4.0 failed 1 times, most recent failure: Lost task 2.0 in stage 4.0 (TID 27, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/shared/anaconda3/lib/python3.6/site-packages/numpy/lib/function_base.py", line 206, in flip
    indexer[axis] = slice(None, None, -1)
IndexError: list assignment index out of range

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/shared/anaconda3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/home/shared/anaconda3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/shared/anaconda3/lib/python3.6/site-packages/pyspark/rdd.py", line 2438, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/home/shared/anaconda3/lib/python3.6/site-packages/pyspark/rdd.py", line 2438, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/home/shared/anaconda3/lib/python3.6/site-packages/pyspark/rdd.py", line 2438, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/home/shared/anaconda3/lib/python3.6/site-packages/pyspark/rdd.py", line 362, in func
    return f(iterator)
  File "/home/shared/anaconda3/lib/python3.6/site-packages/pyspark/rdd.py", line 1056, in <lambda>
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/home/shared/anaconda3/lib/python3.6/site-packages/pyspark/rdd.py", line 1056, in <genexpr>
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "<ipython-input-20-fa642c147da8>", line 28, in <lambda>
  File "<ipython-input-20-fa642c147da8>", line 4, in cf
  File "/home/shared/anaconda3/lib/python3.6/site-packages/numpy/lib/function_base.py", line 209, in flip
    % (axis, m.ndim))
ValueError: axis=1 is invalid for the 1-dimensional input array

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2092)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:939)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:938)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:153)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/shared/anaconda3/lib/python3.6/site-packages/numpy/lib/function_base.py", line 206, in flip
    indexer[axis] = slice(None, None, -1)
IndexError: list assignment index out of range

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/shared/anaconda3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/home/shared/anaconda3/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/shared/anaconda3/lib/python3.6/site-packages/pyspark/rdd.py", line 2438, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/home/shared/anaconda3/lib/python3.6/site-packages/pyspark/rdd.py", line 2438, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/home/shared/anaconda3/lib/python3.6/site-packages/pyspark/rdd.py", line 2438, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/home/shared/anaconda3/lib/python3.6/site-packages/pyspark/rdd.py", line 362, in func
    return f(iterator)
  File "/home/shared/anaconda3/lib/python3.6/site-packages/pyspark/rdd.py", line 1056, in <lambda>
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/home/shared/anaconda3/lib/python3.6/site-packages/pyspark/rdd.py", line 1056, in <genexpr>
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "<ipython-input-20-fa642c147da8>", line 28, in <lambda>
  File "<ipython-input-20-fa642c147da8>", line 4, in cf
  File "/home/shared/anaconda3/lib/python3.6/site-packages/numpy/lib/function_base.py", line 209, in flip
    % (axis, m.ndim))
ValueError: axis=1 is invalid for the 1-dimensional input array

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
