Here is the code (I removed some unimportant parts):
```python
import dask.dataframe as dd
import pandas as pd
from dask import delayed
from dask.distributed import Client

# The methods below belong to a class; the rest of the class was omitted.

def load_data(self):
    # Split the input CSV into ~5 MiB partitions, one Delayed object per partition
    data = dd.read_csv(self.input_path, blocksize="5Mib")
    return data.to_delayed()

def batch_predict(self, batch_data):
    app_model = self.load_model()
    res = app_model.predict(batch_data)  # type(res) is ndarray
    return res

def save_result(self, data):
    df = pd.DataFrame(data)
    # Every task appends to the same output path
    res = df.to_csv(self.output_path, mode='a', header=['predict'])
    if res is None:
        return 1
    else:
        return 0

def static_succ_num(self, *flag):
    partitions = len(*flag)
    actual = sum(*flag)
    return actual == partitions

# Driver code (runs inside a method of the same class)
client = Client('scheduler:32666')
client.restart()
batches = delayed(self.load_data)()
batches = batches.compute()
n_partitions = len(batches)
print('partitions: ', n_partitions)  # prints 1730
batches = client.persist(batches)
print(type(batches[0]))  # prints Delayed
res = [delayed(self.batch_predict)(batch) for batch in batches]
print(type(res[0]))  # prints Delayed
res = [delayed(self.save_result)(batch_res) for batch_res in res]
print(type(res[0]))
res = delayed(self.static_succ_num)(res)
res = client.compute(res)
res = client.gather(res)
print(res)  # prints True
```
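A detail worth checking in `save_result`: when `DataFrame.to_csv` is given a path it writes to disk and returns `None` (it only returns a string when no path is passed). So `save_result` returns 1 for every task that does not raise, and `True` from `static_succ_num` only says that each task wrote to `self.output_path` on whatever machine it ran on, not that the rows reached the file you are reading. The small pandas sketch below, with made-up prediction values standing in for the model output, shows this. It also shows that appending with `mode='a'` and a `header` repeats the header row once per call, i.e. once per task.

```python
import os
import tempfile

import numpy as np
import pandas as pd

# Made-up predictions standing in for app_model.predict(...) output
preds = np.array([0.1, 0.7, 0.4])
df = pd.DataFrame(preds)

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "out.csv")

    # With a path, to_csv writes to disk and returns None, so a check like
    # `if res is None: return 1` is true for every call that does not raise.
    ret_with_path = df.to_csv(path, mode="a", header=["predict"])

    # A second append (as a second task would do) writes the header row again.
    df.to_csv(path, mode="a", header=["predict"])

    with open(path) as f:
        lines = f.read().splitlines()

# Without a path, to_csv returns the CSV text instead.
ret_without_path = df.to_csv(header=["predict"])

print(ret_with_path)                        # None
print(type(ret_without_path).__name__)      # str
print(len(lines), lines.count(",predict"))  # 8 2
```

A more informative return value from `save_result` would be something like `len(df)`, so the driver can compare the total number of rows written against the input.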
When I run the code above, the output file contains results for only some of the 1730 partitions, so I think some tasks did not write their data to the CSV. I would like to know the reason. Thanks in advance!
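One possible explanation, assuming the Dask workers run in separate containers or hosts (the `scheduler:32666` address suggests a containerized cluster, but the post does not say so): each worker appends to `self.output_path` on its own local filesystem, so any single copy of the file only holds the partitions that worker processed. Even on a shared filesystem, 1730 tasks appending to one file concurrently can interleave their writes. A common workaround is to give each partition its own output file and combine the files afterwards. The sketch below shows the idea with `concurrent.futures.ThreadPoolExecutor` standing in for the Dask workers; `save_partition`, the `part-{i:05d}.csv` naming, and the fake predictions are hypothetical.

```python
import glob
import os
import tempfile
from concurrent.futures import ThreadPoolExecutor

import numpy as np
import pandas as pd


def save_partition(out_dir, i, preds):
    """Hypothetical helper: write one partition's predictions to its own file."""
    path = os.path.join(out_dir, f"part-{i:05d}.csv")
    # Default mode="w": each task owns its file, so no file is shared between tasks
    pd.DataFrame({"predict": preds}).to_csv(path, index=False)
    return len(preds)  # report rows written instead of a constant 1


with tempfile.TemporaryDirectory() as out_dir:
    # Fake predictions for 20 partitions of 5 rows each (stand-in for model output)
    partitions = [np.full(5, i, dtype=float) for i in range(20)]

    # Threads stand in for Dask workers here; they all share one filesystem.
    with ThreadPoolExecutor(max_workers=4) as pool:
        rows_written = list(
            pool.map(lambda args: save_partition(out_dir, *args), enumerate(partitions))
        )

    # Combine the per-partition files in partition order (zero-padded names sort correctly).
    files = sorted(glob.glob(os.path.join(out_dir, "part-*.csv")))
    combined = pd.concat((pd.read_csv(f) for f in files), ignore_index=True)
    n_files = len(files)

print(n_files, sum(rows_written), len(combined))  # 20 100 100
```

With Dask itself, `dask.dataframe.DataFrame.to_csv` follows the same pattern when the path contains `*`, writing one file per partition. In either case the output directory has to live on storage that every worker can reach, such as a shared volume or an object store.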