In [1]:
import os
import csv
import numpy as np
import pandas as pd

import mindspore as ms
from mindspore import context
from mindspore import nn
from mindspore.ops import operations as ops
from mindspore.ops import functional as F

# Select the CPU as MindSpore's execution device for every cell below.
context.set_context(device_target="CPU")

In [2]:
# Load the training, validation and test data from CSV files.
def _read_csv_rows(path):
    """Return every non-blank row of the CSV file at `path` as a list of strings.

    The header row (if any) is kept as the first element so callers can skip it
    with a `[1:]` slice.
    """
    # newline='' lets the csv module do its own line-ending handling, as its
    # documentation requires for files passed to csv.reader.
    with open(path, newline='') as csv_file:
        # csv.reader yields [] for blank lines; drop them so a trailing empty
        # line does not break the column indexing below.
        return [row for row in csv.reader(csv_file) if row]


def _to_features(rows, n_features=None):
    """Convert the first `n_features` columns of each row to a float32 matrix.

    `n_features=None` uses every column of the row.
    """
    return np.array([[float(value) for value in row[:n_features]] for row in rows],
                    np.float32)


def _to_labels(rows, column):
    """Convert column `column` of each row to an int32 label vector."""
    return np.array([row[column] for row in rows], np.int32)


# Training set: four feature columns followed by the label column; row 0 is the header.
data = _read_csv_rows('data/train.csv')
X_train = _to_features(data[1:], 4)
Y_train = _to_labels(data[1:], 4)

# Validation set: same layout as the training set.
data_val = _read_csv_rows('data/val.csv')
X_val = _to_features(data_val[1:], 4)
Y_val = _to_labels(data_val[1:], 4)

# Test set: every column is read as a feature (no label column is parsed).
data_test = _read_csv_rows('data/test_data.csv')
X_test = _to_features(data_test[1:])

In [3]:
# Uses MindSpore's Tile, square, ReduceSum, sqrt and TopK operators to compute, with
# matrix operations, the distance between an input sample x and every labelled sample
# in X_train at once, and then selects the k nearest neighbours.
class KnnNet(nn.Cell):
    """Network that returns the indices of the k training samples closest to x.

    Args:
        k: number of nearest neighbours to return.
    """

    def __init__(self, k):
        super(KnnNet, self).__init__()
        self.tile = ops.Tile()      # repeats the input tensor
        self.sum = ops.ReduceSum()  # sums over the given axis
        self.topk = ops.TopK()      # returns the k largest values and their indices
        self.k = k

    def construct(self, x, X_train):
        """Return the indices (into X_train) of the k rows nearest to x.

        Args:
            x: feature tensor of the query sample.
            X_train: tensor of training samples, one sample per row.

        Returns:
            Tensor holding the k neighbour indices, nearest first.
        """
        # Repeat x once per training row. The row count is taken from X_train
        # itself instead of a hard-coded 97, so any training-set size works.
        x_tile = self.tile(x, (X_train.shape[0], 1))
        square_diff = F.square(x_tile - X_train)
        # Sum over the feature axis to get one squared distance per training row.
        square_dist = self.sum(square_diff, 1)
        dist = F.sqrt(square_dist)
        # TopK picks the largest values, so negate the distances to obtain the
        # k smallest distances (i.e. the nearest neighbours).
        values, indices = self.topk(-dist, self.k)
        return indices

def knn(knn_net, x, X_train, Y_train):
    """Classify one sample by majority vote among its nearest training samples.

    Args:
        knn_net: callable (e.g. a KnnNet instance) that takes the tensors
            (x, X_train) and returns a tensor of neighbour indices into X_train.
        x: features of the sample to classify.
        X_train: training features, one sample per row.
        Y_train: non-negative integer class labels aligned with the rows of X_train.

    Returns:
        The class label that received the most votes; on a tie the smallest
        label wins.
    """
    x, X_train = ms.Tensor(x), ms.Tensor(X_train)
    neighbor_idx = knn_net(x, X_train).asnumpy().ravel()
    # Count the votes per class label. The table is sized by the labels that
    # actually occur, not by k, so a label >= k no longer raises IndexError.
    votes = np.bincount(np.asarray(Y_train)[neighbor_idx])
    return np.argmax(votes)

In [4]:
# Predict every validation sample and report the resulting accuracy.
knn_net = KnnNet(5)
acc = 0
times = 0
for times, (x, y) in enumerate(zip(X_val, Y_val), start=1):
    pred = knn(knn_net, x, X_train, Y_train)
    # Count the sample as a hit when the predicted class equals the true label.
    if pred == y:
        acc += 1

    print('test '+str(times)+' label: %d, prediction: %s' % (y, pred))
print('Validation accuracy is %f' % (acc/len(Y_val)))

test 1 label: 1, prediction: 1
test 2 label: 2, prediction: 2
test 3 label: 2, prediction: 2
test 4 label: 0, prediction: 0
test 5 label: 1, prediction: 1
test 6 label: 1, prediction: 1
test 7 label: 1, prediction: 1
test 8 label: 1, prediction: 1
test 9 label: 0, prediction: 0
test 10 label: 0, prediction: 0
test 11 label: 0, prediction: 0
test 12 label: 2, prediction: 2
test 13 label: 1, prediction: 1
test 14 label: 2, prediction: 2
test 15 label: 0, prediction: 0
Validation accuracy is 1.000000


In [9]:
# Predict labels for the test set and write them to CSV.
result = []
test_data = pd.read_csv('data/test_data.csv', header=0)  # test-set features, used for the visualisation file
knn_net = KnnNet(5)
# Iterate over the samples directly; zip(X_test) would wrap each one in a 1-tuple.
for x in X_test:
    pred = knn(knn_net, x, X_train, Y_train)
    result.append(pred)
    print('prediction: %d' % (pred))
test_mindspore = pd.DataFrame(result, columns=['label'])

print("mindspore done")
# Make sure both output directories exist; to_csv does not create them itself.
os.makedirs('mypredicsion_csv/for visual', exist_ok=True)
test_mindspore.to_csv('mypredicsion_csv/task3_test_prediction.csv', index=False)
# For visualisation: the test features side by side with the predicted label.
test_euclidean_for_visual = pd.DataFrame(
    np.column_stack((test_data, result)),
    columns=['Sepal Length', 'Sepal Width', 'Petal Length', 'Petal Width', 'My prediction'])

test_euclidean_for_visual.to_csv('mypredicsion_csv/for visual/task3_test_prediction.csv', index=False)

prediction: 2
prediction: 1
prediction: 0
prediction: 2
prediction: 0
prediction: 2
prediction: 0
prediction: 1
prediction: 1
prediction: 1
prediction: 2
prediction: 1
prediction: 1
prediction: 1
prediction: 1
prediction: 0
prediction: 1
prediction: 1
prediction: 0
prediction: 0
prediction: 2
prediction: 1
prediction: 0
prediction: 0
prediction: 2
prediction: 0
prediction: 0
prediction: 1
prediction: 1
prediction: 0
prediction: 2
prediction: 1
prediction: 0
prediction: 2
prediction: 2
prediction: 1
prediction: 0
prediction: 2
mindspore done
