In [18]:
# Client setup: every API call in this notebook goes through one Submitter.
from sparksampling import Submitter
from sparksampling.var import FILE_TYPE_CSV, SIMPLE_RANDOM_SAMPLING_METHOD

# Shared parameters reused by the cells below.
dataset_uri = 'hdfs://localhost:9000/dataset/ten_million_top1k.csv'  # source CSV on HDFS
fraction = 0.1  # fraction handed to the sampling job
selected_features_list = ['X_20', 'X_80']  # feature columns used for training / evaluation
label_index = 'y'  # name of the label column

submitter = Submitter()

In [3]:
# Submit a simple random sampling job for the dataset and keep its id so that
# later cells can query the job and its output.
sampling_request = dict(
    method=SIMPLE_RANDOM_SAMPLING_METHOD,
    file_type=FILE_TYPE_CSV,
    fraction=fraction,
    with_header=True,
)
submit_response = submitter.submit_sampling_simplejob(dataset_uri, **sampling_request)
job_id = submit_response.job_id
submit_response.to_dict()

2021-05-29 17:59:51,754 - INFO - request: http://localhost:8000/v1/sampling/simplejob/ with data {'path': 'hdfs://localhost:9000/dataset/ten_million_top1k.csv', 'method': 'random', 'type': 'csv', 'with_header': True, 'conf': {'fraction': 0.1}}


{'code': 0, 'msg': '', 'data': {'job_id': 10024}}

In [6]:
# Look up the sampling job and remember where the sampled file was written.
# NOTE(review): nothing here checks that the job has finished before
# `sampled_path` is used further down - confirm the job status if re-running quickly.
sampling_job_details = submitter.get_sampling_job_details(job_id)
sampled_path = sampling_job_details.sampled_path
details_as_dict = sampling_job_details.to_dict()
print(details_as_dict)
sampled_path

2021-05-29 18:01:14,804 - INFO - request: http://localhost:8000/v1/query/sampling/job/ with data {'job_id': 10024}


{'code': 0, 'msg': '', 'data': {'job_id': 10024, 'job_status': 'Succeed', 'msg': 'succeed', 'method': 'Simple Random Sampling', 'start_time': '2021/05/29/ 17:59:52', 'end_time': '2021/05/29 17:59:59', 'simpled_file_path': 'hdfs://localhost:9000/dataset/ten_million_top1k.csv-sampled-1622282391.7654526', 'request_data': "{'path': 'hdfs://localhost:9000/dataset/ten_million_top1k.csv', 'method': 'random', 'type': 'csv', 'with_header': True, 'conf': {'fraction': 0.1, 'path': 'hdfs://localhost:9000/dataset/ten_million_top1k.csv', 'method': 'random', 'file_type': 'csv', 'with_header': True, 'seed': 58591, 'with_replacement': True, 'col_key': None}}"}}


'hdfs://localhost:9000/dataset/ten_million_top1k.csv-sampled-1622282391.7654526'

In [19]:
# Train a random forest on the sampled file, then evaluate it on the complete
# original dataset.
from pyspark.sql import SparkSession
from sparksampling.config import SPARK_CONF

conf = SPARK_CONF
spark = SparkSession.builder.config(conf=conf).getOrCreate()
# inferSchema=True makes Spark parse numeric columns as numbers. Without it every
# CSV column is loaded as a string, so the model would depend on implicit
# string-to-float coercion and would be trained on string class labels.
df = spark.read.csv(sampled_path, header=True, inferSchema=True).toPandas()
# Further data analysis can go here, or try the statistics / evaluation cells below.
from sklearn.linear_model import SGDClassifier
from sklearn.ensemble import RandomForestClassifier

# Fixed seed so that re-running the cell reproduces the same forest and report.
RANDOM_STATE = 42
model = RandomForestClassifier(random_state=RANDOM_STATE)

train_y = df[label_index]
train_X = df[selected_features_list]
model.fit(train_X, train_y)

# The evaluation set is the full original dataset.
tsdf = spark.read.csv(dataset_uri, header=True, inferSchema=True)
tdf = tsdf.toPandas()

test_y = tdf[label_index]
# Reuse the training column order so the prediction input matches the fitted model.
test_X = tdf[train_X.columns]
# NOTE(review): the sample was drawn from this same dataset, so the test rows
# overlap the training rows - read the report as "how well does the sample
# represent the full data", not as a held-out generalisation score.
pred_y = model.predict(test_X)

from sklearn.metrics import classification_report
print(classification_report(y_true=test_y, y_pred=pred_y))

              precision    recall  f1-score   support

           0       0.92      0.85      0.89       517
           1       0.86      0.92      0.89       483

    accuracy                           0.89      1000
   macro avg       0.89      0.89      0.89      1000
weighted avg       0.89      0.89      0.89      1000



In [8]:
# Summary statistics of the original dataset, shown as a pandas table.
submitter.get_statistics(
    path=dataset_uri,
    file_type=FILE_TYPE_CSV,
    with_header=True,
).to_pandas()

2021-05-29 18:02:17,964 - INFO - request: http://localhost:8000/v1/evaluation/statistics/ with data {'path': 'hdfs://localhost:9000/dataset/ten_million_top1k.csv', 'type': 'csv', 'method': 'basic', 'with_header': True, 'from_sampling': False}


Unnamed: 0,summary,# id,X_0,X_1,X_2,X_3,X_4,X_5,X_6,X_7,...,X_91,X_92,X_93,X_94,X_95,X_96,X_97,X_98,X_99,y
0,count,1000.0,1000.0,1000.0,1000.0,1000.0,1000.0,1000.0,1000.0,1000.0,...,1000.0,1000.0,1000.0,1000.0,1000.0,1000.0,1000.0,1000.0,1000.0,1000.0
1,mean,499.5,2.049,3.051,2.815,3.118,4.211,3.992,3.351,2.658,...,0.0294856781127999,-0.015578821739792,-0.0343826495595,0.0341069684961999,0.0042519669353,0.0310062506216999,-0.0087983829974,0.00900452388771,-0.0504560444434999,0.483
2,stddev,288.8194360957494,1.8889592815005751,2.394186945735675,2.0559674126799785,1.863208938242715,1.6208350027396687,1.5317579058984108,1.7728344372619638,1.933591160192552,...,1.310014162316104,0.992814923304702,1.3489092513130678,1.0175306226012728,0.9937576399485476,1.2158726897277283,1.4720581081411457,0.8531823148864874,1.0285753546436174,0.4999609594367951
3,min,0.0,0.0,0.0,0.0,0.0,1.0,2.0,1.0,0.0,...,-0.0021861139,-0.00099003483,-0.0072156244999999,-0.0087255505,-0.0049381842,-0.0030899136,-0.0012198847,-0.0017179298999999,-0.00151381,0.0
4,max,999.0,6.0,6.0,6.0,6.0,6.0,6.0,6.0,6.0,...,4.247343099999999,3.266212,3.7208654,3.1033252,3.1775258,3.5107522000000007,5.6920133,3.0476226000000004,3.1554324,1.0


In [9]:
# Summary statistics of the sampled dataset, looked up through the sampling job id.
sampled_stats_request = dict(
    job_id=job_id,
    from_sampling=True,
    file_type=FILE_TYPE_CSV,
    with_header=True,
)
data = submitter.get_statistics(**sampled_stats_request).to_pandas()
data

2021-05-29 18:02:23,231 - INFO - request: http://localhost:8000/v1/evaluation/statistics/ with data {'job_id': 10024, 'type': 'csv', 'method': 'basic', 'with_header': True, 'from_sampling': True}


Unnamed: 0,summary,# id,X_0,X_1,X_2,X_3,X_4,X_5,X_6,X_7,...,X_91,X_92,X_93,X_94,X_95,X_96,X_97,X_98,X_99,y
0,count,108.0,108.0,108.0,108.0,108.0,108.0,108.0,108.0,108.0,...,108.0,108.0,108.0,108.0,108.0,108.0,108.0,108.0,108.0,108.0
1,mean,514.8981481481482,2.25,3.0277777777777777,2.9166666666666665,2.8981481481481484,4.398148148148148,4.185185185185185,3.425925925925926,3.0,...,0.1250439184546296,-0.0377501460185185,-0.1064211436203704,0.0813165940416666,0.1106930416296296,0.0958203896435185,-0.1807713333055556,0.1035317262527777,-0.0005714228148147963,0.4907407407407407
2,stddev,283.16568050058794,1.860182887200013,2.3539917711340834,2.005250118409965,1.839035783599934,1.5940298359923883,1.4670725389512356,1.8148077507085545,1.844415668681684,...,1.2498019518135066,0.8767414243685469,1.3092069478023307,0.9532536404715464,0.9255972992810526,1.1801704496779792,1.3180501912308955,0.7724878924299564,1.077355924914047,0.5022448740055658
3,min,114.0,0.0,0.0,0.0,0.0,1.0,2.0,1.0,0.0,...,-0.022213316,-0.04997012,-0.0076247096,-0.0087255505,-0.027446876,-0.0030899136,-0.024749287,-0.0020981927,-0.0340015,0.0
4,max,997.0,6.0,6.0,6.0,6.0,6.0,6.0,6.0,6.0,...,2.5984216,2.1624603,3.7208654,3.1033252,2.438946,2.3365866000000004,2.6220742,1.6186575,2.0121164,1.0


In [10]:
# Submit an evaluation job that refers to the sampling job, and keep the new job id.
cmp_evaluation_job = submitter.submit_evaluation_job(
    compare_job_id=job_id,
    file_type=FILE_TYPE_CSV,
)
cmp_job_as_dict = cmp_evaluation_job.to_dict()
print(cmp_job_as_dict)
cmp_evaluation_job_id = cmp_evaluation_job.job_id
cmp_evaluation_job_id

2021-05-29 18:02:29,415 - INFO - request: http://localhost:8000/v1/evaluation/job/ with data {'method': 'compare', 'type': 'csv', 'compare_job_id': 10024}


{'code': 0, 'msg': '', 'data': {'job_id': 50004}}


50004

In [11]:
# Fetch the evaluation job result and show the per-column scores as a table.
cmp_evaluation_job_data = submitter.get_evaluation_job_details(
    job_id=cmp_evaluation_job_id,
)
cmp_df = cmp_evaluation_job_data.to_pandas()
cmp_df

2021-05-29 18:02:37,726 - INFO - request: http://localhost:8000/v1/query/evaluation/job/ with data {'job_id': 50004}


Unnamed: 0,# id,X_0,X_1,X_10,X_11,X_12,X_13,X_14,X_15,X_16,...,X_91,X_92,X_93,X_94,X_95,X_96,X_97,X_98,X_99,y
count,108.0,108.0,108.0,108.0,108.0,108.0,108.0,108.0,108.0,108.0,...,108.0,108.0,108.0,108.0,108.0,108.0,108.0,108.0,108.0,108.0
mean,514.8981481481482,2.25,3.0277777777777777,2.2777777777777777,3.083333333333333,3.3796296296296298,2.083333333333333,2.314814814814815,0.2676804822861111,-0.1807713333055556,...,0.1250439184546296,-0.0377501460185185,-0.1064211436203704,0.0813165940416666,0.1106930416296296,0.0958203896435185,-0.1807713333055556,0.1035317262527777,-0.0005714228148147963,0.4907407407407407
stddev,283.16568050058794,1.860182887200013,2.3539917711340834,1.828299518056562,1.8144024015543951,2.0265416081544365,1.938903245237504,1.4892019323424508,0.9582508085570516,1.3180501912308955,...,1.2498019518135066,0.8767414243685469,1.3092069478023307,0.9532536404715464,0.9255972992810526,1.1801704496779792,1.3180501912308955,0.7724878924299564,1.077355924914047,0.5022448740055658
min,114.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,-0.0026964582,-0.024749287,...,-0.022213316,-0.04997012,-0.0076247096,-0.0087255505,-0.027446876,-0.0030899136,-0.024749287,-0.0020981927,-0.0340015,0.0
max,997.0,6.0,6.0,5.0,6.0,6.0,6.0,5.0,2.3059315,2.6220742,...,2.5984216,2.1624603,3.7208654,3.1033252,2.438946,2.3365866000000004,2.6220742,1.6186575,2.0121164,1.0
mean_bias,0.0308271,0.0980966,0.00761135,0.0979098,0.0123852,0.0962146,0.0590184,0.0334802,1.0,1.0,...,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,0.988675,0.0
stddev_bias,0.0195754,0.015234,0.0167887,0.00740913,0.032143,0.0236702,0.00936722,0.058047,0.00838547,0.104621,...,0.045963,0.116914,0.0294329,0.0631696,0.0685885,0.0293635,0.104621,0.0945805,0.0474254,0.00456819
score,98.7399,97.1667,99.39,97.367,98.8868,97.0029,98.2904,97.7118,74.7904,72.3845,...,73.8509,72.0772,74.2642,73.4208,73.2853,74.2659,72.3845,72.6355,74.0975,99.8858


In [12]:
import pandas as pd
import numpy as np

# Mean evaluation score over the selected feature columns only.
# `cmp_df` is narrowed to those columns, exactly as before.
cmp_df = cmp_df[selected_features_list]

# Filter out -1 entries in a single pass instead of repeatedly scanning the list
# with `in` / `remove`.
# NOTE(review): -1 presumably marks columns without a score - confirm with the API.
score_list = [score for score in cmp_df.loc['score'].to_list() if score != -1]

# An empty list would make np.mean emit a RuntimeWarning; return NaN explicitly.
np.mean(score_list) if score_list else float('nan')

73.15674482498201

In [13]:
# Request parameters for a k-means evaluation of the sampling job, restricted to
# the selected feature columns.
km_conf = dict(
    compare_job_id=job_id,
    type="csv",
    method="kmeans",
    key="y",
    selected_features_list=selected_features_list,
)

# Submit the evaluation job and keep its id for the result lookup.
km_evaluation_job = submitter.submit_evaluation_job(**km_conf)
km_job_as_dict = km_evaluation_job.to_dict()
print(km_job_as_dict)
km_evaluation_job_id = km_evaluation_job.job_id
km_evaluation_job_id

2021-05-29 18:02:45,644 - INFO - request: http://localhost:8000/v1/evaluation/job/ with data {'method': 'kmeans', 'type': 'csv', 'compare_job_id': 10024, 'key': 'y', 'selected_features_list': ['X_20', 'X_80']}


{'code': 0, 'msg': '', 'data': {'job_id': 50005}}


50005

In [16]:
# Fetch the k-means evaluation job and show its result.
km_evaluation_job_data = submitter.get_evaluation_job_details(
    job_id=km_evaluation_job_id,
)
km_score = km_evaluation_job_data.result
km_score

2021-05-29 18:03:08,961 - INFO - request: http://localhost:8000/v1/query/evaluation/job/ with data {'job_id': 50005}


{'score': 99.0, 'accuracy': 0.9741379310344828, 'centers_result': 100}

In [20]:
# Baseline: train a random forest on the complete original dataset (not the
# sampled file) and evaluate it on that same dataset.
from pyspark.sql import SparkSession
from sparksampling.config import SPARK_CONF

conf = SPARK_CONF
spark = SparkSession.builder.config(conf=conf).getOrCreate()
# Read the original dataset once; the training and test frames both come from it.
# inferSchema=True makes Spark parse numeric columns as numbers. Without it every
# CSV column is loaded as a string, so the model would depend on implicit
# string-to-float coercion and would be trained on string class labels.
tsdf = spark.read.csv(dataset_uri, header=True, inferSchema=True)
tdf = tsdf.toPandas()
# Independent copy so that changes to the training frame never leak into `tdf`.
df = tdf.copy()
# Further data analysis can go here, or try the statistics / evaluation cells above.
from sklearn.linear_model import SGDClassifier
from sklearn.ensemble import RandomForestClassifier

# Fixed seed so that re-running the cell reproduces the same forest and report.
RANDOM_STATE = 42
model = RandomForestClassifier(random_state=RANDOM_STATE)

train_y = df[label_index]
train_X = df[selected_features_list]
model.fit(train_X, train_y)

test_y = tdf[label_index]
# Reuse the training column order so the prediction input matches the fitted model.
test_X = tdf[train_X.columns]
# NOTE(review): the training and test sets are the same rows here, so scores at
# or near 1.00 are expected and only serve as an upper-bound reference for the
# model trained on the sample.
pred_y = model.predict(test_X)

from sklearn.metrics import classification_report
print(classification_report(y_true=test_y, y_pred=pred_y))

              precision    recall  f1-score   support

           0       1.00      1.00      1.00       517
           1       1.00      1.00      1.00       483

    accuracy                           1.00      1000
   macro avg       1.00      1.00      1.00      1000
weighted avg       1.00      1.00      1.00      1000

