Skip to content

Commit

Permalink
polars chk fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
jangorecki committed Apr 30, 2021
1 parent 63a00e5 commit 3fc6e9f
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 25 deletions.
3 changes: 2 additions & 1 deletion _report/report.R
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ get_excluded_batch = function() {
c(
1552478772L, 1552482879L # testing different data as 1e9_1e2_0_0 to test logical compression of measures
, 1552454531L, 1555929111L, 1555754148L # dl11 testing
, 1619552039L, 1619596289L ## polars migration
)
}

Expand Down Expand Up @@ -72,7 +73,7 @@ clean_time = function(d) {
][task=="groupby" & solution=="spark" & batch<1548084547, "chk_time_sec" := NA_real_ # spark chk calculation speed up, NA to make validation work on bigger threshold
][task=="groupby" & question%in%old_advanced_groupby_questions & batch<1573882448, c("out_rows","out_cols","chk") := list(NA_integer_, NA_integer_, NA_character_)
][task=="groupby" & solution=="dask" & batch>=1609583373 & batch<Inf & question=="regression v1 v2 by id2 id4", c("out_rows","chk") := .(NA_integer_, NA_character_) ## change Inf to batch after upgrading to dask#7024
][solution=="polars" & batch<1614590447, "chk" := NA_character_
][solution=="polars" & batch<=1619596689, "chk" := NA_character_
][, `:=`(nodename=ft(nodename), in_rows=ft(in_rows), question=ft(question), solution=ft(solution), fun=ft(fun), version=ft(version), git=ft(git), task=ft(task),
data=fctr(data, levels=unlist(get_data_levels())))
][]
Expand Down
28 changes: 14 additions & 14 deletions polars/groupby-polars.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = ans.lazy().select([pl.col("v1_sum").cast(pl.Int64).sum(), pl.col("v3_mean").cast(pl.Int64).sum()]).collect().to_numpy()[0]
chk = ans.lazy().select([pl.col("v1_sum").cast(pl.Int64).sum(), pl.col("v3_mean").sum()]).collect().to_numpy()[0]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
del ans
Expand All @@ -106,7 +106,7 @@
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = ans.lazy().select([pl.col("v1_sum").cast(pl.Int64).sum(), pl.col("v3_mean").cast(pl.Int64).sum()]).collect().to_numpy()[0]
chk = ans.lazy().select([pl.col("v1_sum").cast(pl.Int64).sum(), pl.col("v3_mean").sum()]).collect().to_numpy()[0]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
print(ans.head(3), flush=True)
Expand All @@ -121,7 +121,7 @@
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = ans.lazy().select([pl.col("v1_mean").cast(pl.Int64).sum(), pl.col("v2_mean").cast(pl.Int64).sum(), pl.col("v3_mean").cast(pl.Int64).sum()]).collect().to_numpy()[0]
chk = ans.lazy().select([pl.col("v1_mean").sum(), pl.col("v2_mean").sum(), pl.col("v3_mean").sum()]).collect().to_numpy()[0]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
del ans
Expand All @@ -132,7 +132,7 @@
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = ans.lazy().select([pl.col("v1_mean").cast(pl.Int64).sum(), pl.col("v2_mean").cast(pl.Int64).sum(), pl.col("v3_mean").cast(pl.Int64).sum()]).collect().to_numpy()[0]
chk = ans.lazy().select([pl.col("v1_mean").sum(), pl.col("v2_mean").sum(), pl.col("v3_mean").sum()]).collect().to_numpy()[0]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
print(ans.head(3), flush=True)
Expand All @@ -147,7 +147,7 @@
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = ans.lazy().select([pl.col("v1_sum").cast(pl.Int64).sum(), pl.col("v3_sum").cast(pl.Int64).sum(), pl.col("v3_sum").cast(pl.Int64).sum()]).collect().to_numpy()[0]
chk = ans.lazy().select([pl.col("v1_sum").cast(pl.Int64).sum(), pl.col("v2_sum").cast(pl.Int64).sum(), pl.col("v3_sum").sum()]).collect().to_numpy()[0]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
del ans
Expand All @@ -158,7 +158,7 @@
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = ans.lazy().select([pl.col("v1_sum").cast(pl.Int64).sum(), pl.col("v3_sum").cast(pl.Int64).sum(), pl.col("v3_sum").cast(pl.Int64).sum()]).collect().to_numpy()[0]
chk = ans.lazy().select([pl.col("v1_sum").cast(pl.Int64).sum(), pl.col("v2_sum").cast(pl.Int64).sum(), pl.col("v3_sum").sum()]).collect().to_numpy()[0]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
print(ans.head(3), flush=True)
Expand All @@ -173,7 +173,7 @@
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = ans.lazy().select([pl.col("v3_median").cast(pl.Int64).sum(), pl.col("v3_std").cast(pl.Int64).sum()]).collect().to_numpy()[0]
chk = ans.lazy().select([pl.col("v3_median").sum(), pl.col("v3_std").sum()]).collect().to_numpy()[0]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
del ans
Expand All @@ -184,7 +184,7 @@
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = ans.lazy().select([pl.col("v3_median").cast(pl.Int64).sum(), pl.col("v3_std").cast(pl.Int64).sum()]).collect().to_numpy()[0]
chk = ans.lazy().select([pl.col("v3_median").sum(), pl.col("v3_std").sum()]).collect().to_numpy()[0]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
print(ans.head(3), flush=True)
Expand Down Expand Up @@ -225,7 +225,7 @@
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = [ans["largest2_v3"].cast(pl.Int64).sum()]
chk = [ans["largest2_v3"].sum()]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
del ans
Expand All @@ -236,7 +236,7 @@
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = [ans["largest2_v3"].cast(pl.Int64).sum()]
chk = [ans["largest2_v3"].sum()]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
print(ans.head(3), flush=True)
Expand All @@ -251,7 +251,7 @@
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = [ans["r2"].cast(pl.Int64).sum()]
chk = [ans["r2"].sum()]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
del ans
Expand All @@ -262,7 +262,7 @@
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = [ans["r2"].cast(pl.Int64).sum()]
chk = [ans["r2"].sum()]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
print(ans.head(3), flush=True)
Expand All @@ -277,7 +277,7 @@
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = ans.lazy().select([pl.col("v3").cast(pl.Int64).sum(), pl.col("count").cast(pl.Int64).sum()]).collect().to_numpy()[0]
chk = ans.lazy().select([pl.col("v3").sum(), pl.col("count").cast(pl.Int64).sum()]).collect().to_numpy()[0]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
del ans
Expand All @@ -288,7 +288,7 @@
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = ans.lazy().select([pl.col("v3").cast(pl.Int64).sum(), pl.col("count").cast(pl.Int64).sum()]).collect().to_numpy()[0]
chk = ans.lazy().select([pl.col("v3").sum(), pl.col("count").cast(pl.Int64).sum()]).collect().to_numpy()[0]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
print(ans.head(3), flush=True)
Expand Down
20 changes: 10 additions & 10 deletions polars/join-polars.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = [ans["v1"].cast(pl.Int64).sum(), ans["v2"].cast(pl.Int64).sum()]
chk = [ans["v1"].sum(), ans["v2"].sum()]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
del ans
Expand All @@ -68,7 +68,7 @@
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = [ans["v1"].cast(pl.Int64).sum(), ans["v2"].cast(pl.Int64).sum()]
chk = [ans["v1"].sum(), ans["v2"].sum()]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
print(ans.head(3), flush=True)
Expand All @@ -83,7 +83,7 @@
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = [ans["v1"].cast(pl.Int64).sum(), ans["v2"].cast(pl.Int64).sum()]
chk = [ans["v1"].sum(), ans["v2"].sum()]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
del ans
Expand All @@ -94,7 +94,7 @@
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = [ans["v1"].cast(pl.Int64).sum(), ans["v2"].cast(pl.Int64).sum()]
chk = [ans["v1"].sum(), ans["v2"].sum()]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
print(ans.head(3), flush=True)
Expand All @@ -109,7 +109,7 @@
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = [ans["v1"].cast(pl.Int64).sum(), ans["v2"].cast(pl.Int64).sum()]
chk = [ans["v1"].sum(), ans["v2"].sum()]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
del ans
Expand All @@ -120,7 +120,7 @@
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = [ans["v1"].cast(pl.Int64).sum(), ans["v2"].cast(pl.Int64).sum()]
chk = [ans["v1"].sum(), ans["v2"].sum()]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
print(ans.head(3), flush=True)
Expand All @@ -135,7 +135,7 @@
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = [ans["v1"].cast(pl.Int64).sum(), ans["v2"].cast(pl.Int64).sum()]
chk = [ans["v1"].sum(), ans["v2"].sum()]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
del ans
Expand All @@ -146,7 +146,7 @@
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = [ans["v1"].cast(pl.Int64).sum(), ans["v2"].cast(pl.Int64).sum()]
chk = [ans["v1"].sum(), ans["v2"].sum()]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
print(ans.head(3), flush=True)
Expand All @@ -161,7 +161,7 @@
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = [ans["v1"].cast(pl.Int64).sum(), ans["v2"].cast(pl.Int64).sum()]
chk = [ans["v1"].sum(), ans["v2"].sum()]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
del ans
Expand All @@ -172,7 +172,7 @@
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = [ans["v1"].cast(pl.Int64).sum(), ans["v2"].cast(pl.Int64).sum()]
chk = [ans["v1"].sum(), ans["v2"].sum()]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
print(ans.head(3), flush=True)
Expand Down

0 comments on commit 3fc6e9f

Please sign in to comment.