From 44567279e17ea61668809cc2b9ff2658f7d0465f Mon Sep 17 00:00:00 2001 From: Erez Shinan Date: Fri, 13 May 2022 09:33:57 +0200 Subject: [PATCH 1/4] Refactor and use Preql's update syntax --- dev/prepare_db.pql | 76 ++++++++++++++++++---------------------------- 1 file changed, 29 insertions(+), 47 deletions(-) diff --git a/dev/prepare_db.pql b/dev/prepare_db.pql index 4d4063c4..ba81ee7e 100644 --- a/dev/prepare_db.pql +++ b/dev/prepare_db.pql @@ -4,16 +4,20 @@ func run_sql(code) { force_eval( SQL( nulltype, code )) } +func drop_table(t) { + run_sql("DROP TABLE IF EXISTS " + t) +} + // Cleanup func cleanup() { - run_sql("DROP TABLE IF EXISTS rating") - run_sql("DROP TABLE IF EXISTS tmp_rating") - run_sql("DROP TABLE IF EXISTS rating_del1") - run_sql("DROP TABLE IF EXISTS rating_update1") - run_sql("DROP TABLE IF EXISTS rating_update001p") - run_sql("DROP TABLE IF EXISTS rating_update1p") - run_sql("DROP TABLE IF EXISTS rating_del1p") - run_sql("DROP TABLE IF EXISTS rating_update50p") + drop_table("rating") + drop_table("tmp_rating") + drop_table("rating_del1") + drop_table("rating_update1") + drop_table("rating_update001p") + drop_table("rating_update1p") + drop_table("rating_del1p") + drop_table("rating_update50p") commit() } @@ -102,7 +106,7 @@ if (db_type == "snowflake") { import_csv(rating, 'dev/ratings.csv', true) rating.add_index("id", true) rating.add_index("timestamp") - run_sql("CREATE INDEX index_rating_id_timestamp ON rating (id, timestamp)") + rating.add_index(["id", "timestamp"]) } run_sql("DROP TABLE IF EXISTS tmp_rating") @@ -122,50 +126,28 @@ const table rating_del1p = rating const table rating_update50p = rating print "Create indexes" -if (db_type != "redshift" or db_type != "snowflake") { - rating_del1.add_index("id", true) - rating_del1.add_index("timestamp") - run_sql("CREATE INDEX index_rating_del1_id_timestamp ON rating_del1 (id, timestamp)") - rating_update1.add_index("id", true) - rating_update1.add_index("timestamp") - run_sql("CREATE INDEX index_rating_update1_id_timestamp ON rating_update1 (id, timestamp)") - rating_update001p.add_index("id", true) - rating_update001p.add_index("timestamp") - run_sql("CREATE INDEX index_rating_update001p_id_timestamp ON rating_update001p (id, timestamp)") - rating_update1p.add_index("id", true) - rating_update1p.add_index("timestamp") - run_sql("CREATE INDEX index_rating_update1p_id_timestamp ON rating_update1p (id, timestamp)") - rating_del1p.add_index("id", true) - rating_del1p.add_index("timestamp") - run_sql("CREATE INDEX index_rating_del1p_id_timestamp ON rating_del1p (id, timestamp)") - rating_update50p.add_index("id", true) - rating_update50p.add_index("timestamp") - run_sql("CREATE INDEX index_rating_update50p_id_timestamp ON rating_update50p (id, timestamp)") - commit() +func create_indices(tbl) { + tbl.add_index("id", true) + tbl.add_index("timestamp") + tbl.add_index(["id", "timestamp"]) } +create_indices(rating_del1) +create_indices(rating_update1) +create_indices(rating_update001p) +create_indices(rating_update1p) +create_indices(rating_del1p) +create_indices(rating_update50p) +commit() + print "Alter tables" rating_del1[middle..(middle+1)] delete [true] assert count(rating) == count(rating_del1) + 1 rating_update1[middle..(middle+1)] update {timestamp: timestamp + 1} -if (db_type == "postgres" or db_type == "redshift") { - run_sql('UPDATE rating_update001p SET timestamp = (timestamp + 1) WHERE random() < 0.0001') - run_sql('UPDATE rating_update1p SET timestamp = (timestamp + 1) WHERE random() < 0.01') - run_sql('DELETE FROM rating_del1p WHERE random() < 0.01') - run_sql('UPDATE rating_update50p SET timestamp = (timestamp + 1) WHERE random() < 0.5') -} else if (db_type == "mysql" or db_type == "mssql") { - run_sql('UPDATE rating_update001p SET timestamp = (timestamp + 1) WHERE rand() < 0.0001') - run_sql('UPDATE rating_update1p SET timestamp = (timestamp + 1) WHERE rand() < 0.01') - run_sql('DELETE FROM rating_del1p WHERE rand() < 0.01') - run_sql('UPDATE rating_update50p SET timestamp = (timestamp + 1) WHERE rand() < 0.5') -} else if (db_type == "snowflake") { - run_sql('UPDATE rating_update001p SET timestamp = (timestamp + 1) WHERE uniform(0::float, 1, random()) < 0.0001') - run_sql('UPDATE rating_update1p SET timestamp = (timestamp + 1) WHERE uniform(0::float, 1, random()) < 0.01') - run_sql('DELETE FROM rating_del1p WHERE uniform(0::float, 1, random()) < 0.01') - run_sql('UPDATE rating_update50p SET timestamp = (timestamp + 1) WHERE uniform(0::float, 1, random()) < 0.5') -} else { - print "Unsupported database: " + db_type -} +rating_update001p[random() < 0.0001] update {timestamp: timestamp + 1} +rating_update1p[random() < 0.01] update {timestamp: timestamp + 1} +rating_update50p[random() < 0.5] update {timestamp: timestamp + 1} +rating_del1p[random() < 0.01] delete [true] commit() From 3c3668ad24d51f0660a0b7a1bef2cfa749ecc9f8 Mon Sep 17 00:00:00 2001 From: Erez Shinan Date: Fri, 13 May 2022 12:24:26 +0200 Subject: [PATCH 2/4] prepare_db: Use bare table and other small update --- dev/prepare_db.pql | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/dev/prepare_db.pql b/dev/prepare_db.pql index ba81ee7e..714573e2 100644 --- a/dev/prepare_db.pql +++ b/dev/prepare_db.pql @@ -32,8 +32,8 @@ if (db_type == "snowflake") { print "Loading ratings CSV" - table tmp_rating { - id: int + bare table tmp_rating { + userid: int movieid: int rating: float timestamp: int @@ -51,7 +51,8 @@ if (db_type == "snowflake") { run_sql(""" INSERT INTO rating(id, userid, movieid, rating, timestamp) - SELECT row_number() over (order by tmp_rating.id, movieid, timestamp) AS id, tmp_rating.id as userid, movieid, rating, timestamp FROM tmp_rating + SELECT row_number() over (order by userid, movieid, timestamp) AS id, userid, movieid, rating, timestamp + FROM tmp_rating """) } else if (db_type == "redshift") { @@ -109,7 +110,7 @@ if (db_type == "snowflake") { rating.add_index(["id", "timestamp"]) } -run_sql("DROP TABLE IF EXISTS tmp_rating") +remove_table_if_exists("tmp_rating") commit() middle = count(rating) /~ 2 From 49ee4e389f5a7d5c3a63501909b7488edee4e452 Mon Sep 17 00:00:00 2001 From: Erez Shinan Date: Fri, 13 May 2022 12:41:40 +0200 Subject: [PATCH 3/4] Refactored prepare_db --- dev/prepare_db.pql | 100 ++++++++++++++++++++------------------------- 1 file changed, 45 insertions(+), 55 deletions(-) diff --git a/dev/prepare_db.pql b/dev/prepare_db.pql index 714573e2..ee90bdf5 100644 --- a/dev/prepare_db.pql +++ b/dev/prepare_db.pql @@ -8,6 +8,12 @@ func drop_table(t) { run_sql("DROP TABLE IF EXISTS " + t) } +func create_indices(tbl) { + tbl.add_index("id", true) + tbl.add_index("timestamp") + tbl.add_index(["id", "timestamp"]) +} + // Cleanup func cleanup() { drop_table("rating") @@ -24,25 +30,47 @@ func cleanup() { cleanup() // Import CSV -if (db_type == "snowflake") { - print "Uploading ratings CSV" - - run_sql("RM @~/ratings.csv.gz") - run_sql("PUT file://dev/ratings.csv @~") +if (db_type == "snowflake" or db_type == "redshift") { + if (db_type == "snowflake") { + print "Uploading ratings CSV" + + run_sql("RM @~/ratings.csv.gz") + run_sql("PUT file://dev/ratings.csv @~") + + print "Loading ratings CSV" + + bare table tmp_rating { + userid: int + movieid: int + rating: float + timestamp: int + } + + run_sql("COPY INTO tmp_rating FROM '@~/ratings.csv.gz' file_format=(skip_header=1)") + + } else if (db_type == "redshift") { + // NOTE: Requires that the csv already exists on s3 in the given path + print "Loading ratings CSV (already uploaded)" + + table tmp_rating { + userid: int + movieid: int + rating: float + timestamp: int + } + + run_sql(""" + COPY "public"."tmp_rating" (userid, movieid, rating, timestamp) + FROM 's3://dev-cf-redshift-datafold-data-diff/ml/ratings.csv' + IAM_ROLE 'arn:aws:iam::760878568205:role/dev-cf-redshift-data-diff' + DELIMITER ',' + IGNOREHEADER 1; + """) - print "Loading ratings CSV" - - bare table tmp_rating { - userid: int - movieid: int - rating: float - timestamp: int } - run_sql("COPY INTO tmp_rating FROM '@~/ratings.csv.gz' file_format=(skip_header=1)") - table rating { - id: int + id: int // explicit id, instead of identity type userid: int movieid: int rating: float @@ -55,37 +83,6 @@ if (db_type == "snowflake") { FROM tmp_rating """) -} else if (db_type == "redshift") { - // NOTE: Requires that the csv already exists on s3 in the given path - print "Loading ratings CSV (already uploaded)" - - table tmp_rating { - userid: int - movieid: int - rating: float - timestamp: int - } - - run_sql(""" - COPY "public"."tmp_rating" (userid, movieid, rating, timestamp) - FROM 's3://dev-cf-redshift-datafold-data-diff/ml/ratings.csv' - IAM_ROLE 'arn:aws:iam::760878568205:role/dev-cf-redshift-data-diff' - DELIMITER ',' - IGNOREHEADER 1; - """) - - table rating { - id: int // explicit id, to avoid identity type - userid: int - movieid: int - rating: float - timestamp: int - } - - run_sql(""" - INSERT INTO rating(id, userid, movieid, rating, timestamp) - SELECT row_number() over (order by userid, movieid, timestamp) AS id, userid, movieid, rating, timestamp FROM tmp_rating - """) } else if (db_type == "mssql") { run_sql("drop table if exists tmp_rating") run_sql("create table tmp_rating(userid int, movieid int, rating float, timestamp int)") @@ -105,12 +102,10 @@ if (db_type == "snowflake") { timestamp: int } import_csv(rating, 'dev/ratings.csv', true) - rating.add_index("id", true) - rating.add_index("timestamp") - rating.add_index(["id", "timestamp"]) + create_indices(rating) } -remove_table_if_exists("tmp_rating") +drop_table("tmp_rating") commit() middle = count(rating) /~ 2 @@ -127,11 +122,6 @@ const table rating_del1p = rating const table rating_update50p = rating print "Create indexes" -func create_indices(tbl) { - tbl.add_index("id", true) - tbl.add_index("timestamp") - tbl.add_index(["id", "timestamp"]) -} create_indices(rating_del1) create_indices(rating_update1) From 6b6e47176ee8ee893449e288d56e17de29d77754 Mon Sep 17 00:00:00 2001 From: Erez Shinan Date: Fri, 13 May 2022 12:51:03 +0200 Subject: [PATCH 4/4] Require newer preql --- pyproject.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 9430daa3..13efa882 100755 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,14 +28,14 @@ runtype = "^0.2.4" dsnparse = "*" click = "^8.1" -preql = { version = "^0.2.12", optional = true } +preql = { version = "^0.2.13", optional = true } psycopg2 = { version = "*", optional = true } mysql-connector-python = { version = "*", optional = true} snowflake-connector-python = { version = "*", optional = true } [tool.poetry.dev-dependencies] mysql-connector-python = "*" -preql = "^0.2.12" +preql = "^0.2.13" snowflake-connector-python = "*" psycopg2 = "*"