diff --git a/dev/prepare_db.pql b/dev/prepare_db.pql
index 4d4063c4..ee90bdf5 100644
--- a/dev/prepare_db.pql
+++ b/dev/prepare_db.pql
@@ -4,41 +4,73 @@ func run_sql(code) {
     force_eval( SQL( nulltype, code ))
 }
 
+func drop_table(t) {
+    run_sql("DROP TABLE IF EXISTS " + t)
+}
+
+func create_indices(tbl) {
+    tbl.add_index("id", true)
+    tbl.add_index("timestamp")
+    tbl.add_index(["id", "timestamp"])
+}
+
 // Cleanup
 func cleanup() {
-    run_sql("DROP TABLE IF EXISTS rating")
-    run_sql("DROP TABLE IF EXISTS tmp_rating")
-    run_sql("DROP TABLE IF EXISTS rating_del1")
-    run_sql("DROP TABLE IF EXISTS rating_update1")
-    run_sql("DROP TABLE IF EXISTS rating_update001p")
-    run_sql("DROP TABLE IF EXISTS rating_update1p")
-    run_sql("DROP TABLE IF EXISTS rating_del1p")
-    run_sql("DROP TABLE IF EXISTS rating_update50p")
+    drop_table("rating")
+    drop_table("tmp_rating")
+    drop_table("rating_del1")
+    drop_table("rating_update1")
+    drop_table("rating_update001p")
+    drop_table("rating_update1p")
+    drop_table("rating_del1p")
+    drop_table("rating_update50p")
     commit()
 }
 
 cleanup()
 
 // Import CSV
-if (db_type == "snowflake") {
-    print "Uploading ratings CSV"
-
-    run_sql("RM @~/ratings.csv.gz")
-    run_sql("PUT file://dev/ratings.csv @~")
+if (db_type == "snowflake" or db_type == "redshift") {
+    if (db_type == "snowflake") {
+        print "Uploading ratings CSV"
+
+        run_sql("RM @~/ratings.csv.gz")
+        run_sql("PUT file://dev/ratings.csv @~")
+
+        print "Loading ratings CSV"
+
+        bare table tmp_rating {
+            userid: int
+            movieid: int
+            rating: float
+            timestamp: int
+        }
+
+        run_sql("COPY INTO tmp_rating FROM '@~/ratings.csv.gz' file_format=(skip_header=1)")
+
+    } else if (db_type == "redshift") {
+        // NOTE: Requires that the csv already exists on s3 in the given path
+        print "Loading ratings CSV (already uploaded)"
+
+        table tmp_rating {
+            userid: int
+            movieid: int
+            rating: float
+            timestamp: int
+        }
+
+        run_sql("""
+            COPY "public"."tmp_rating" (userid, movieid, rating, timestamp)
+            FROM 's3://dev-cf-redshift-datafold-data-diff/ml/ratings.csv'
+            IAM_ROLE 'arn:aws:iam::760878568205:role/dev-cf-redshift-data-diff'
+            DELIMITER ','
+            IGNOREHEADER 1;
+        """)
 
-    print "Loading ratings CSV"
-
-    table tmp_rating {
-        id: int
-        movieid: int
-        rating: float
-        timestamp: int
     }
 
-    run_sql("COPY INTO tmp_rating FROM '@~/ratings.csv.gz' file_format=(skip_header=1)")
-
     table rating {
-        id: int
+        id: int     // explicit id, instead of identity type
         userid: int
         movieid: int
         rating: float
@@ -47,40 +79,10 @@ if (db_type == "snowflake") {
 
     run_sql("""
         INSERT INTO rating(id, userid, movieid, rating, timestamp)
-        SELECT row_number() over (order by tmp_rating.id, movieid, timestamp) AS id, tmp_rating.id as userid, movieid, rating, timestamp FROM tmp_rating
+        SELECT row_number() over (order by userid, movieid, timestamp) AS id, userid, movieid, rating, timestamp
+        FROM tmp_rating
     """)
 
-} else if (db_type == "redshift") {
-    // NOTE: Requires that the csv already exists on s3 in the given path
-    print "Loading ratings CSV (already uploaded)"
-
-    table tmp_rating {
-        userid: int
-        movieid: int
-        rating: float
-        timestamp: int
-    }
-
-    run_sql("""
-        COPY "public"."tmp_rating" (userid, movieid, rating, timestamp)
-        FROM 's3://dev-cf-redshift-datafold-data-diff/ml/ratings.csv'
-        IAM_ROLE 'arn:aws:iam::760878568205:role/dev-cf-redshift-data-diff'
-        DELIMITER ','
-        IGNOREHEADER 1;
-    """)
-
-    table rating {
-        id: int     // explicit id, to avoid identity type
-        userid: int
-        movieid: int
-        rating: float
-        timestamp: int
-    }
-
-    run_sql("""
-        INSERT INTO rating(id, userid, movieid, rating, timestamp)
-        SELECT row_number() over (order by userid, movieid, timestamp) AS id, userid, movieid, rating, timestamp FROM tmp_rating
-    """)
 } else if (db_type == "mssql") {
     run_sql("drop table if exists tmp_rating")
     run_sql("create table tmp_rating(userid int, movieid int, rating float, timestamp int)")
@@ -100,12 +102,10 @@ if (db_type == "snowflake") {
         timestamp: int
     }
     import_csv(rating, 'dev/ratings.csv', true)
-    rating.add_index("id", true)
-    rating.add_index("timestamp")
-    run_sql("CREATE INDEX index_rating_id_timestamp ON rating (id, timestamp)")
+    create_indices(rating)
 }
 
-run_sql("DROP TABLE IF EXISTS tmp_rating")
+drop_table("tmp_rating")
 commit()
 
 middle = count(rating) /~ 2
@@ -122,50 +122,23 @@ const table rating_del1p = rating
 const table rating_update50p = rating
 
 print "Create indexes"
-if (db_type != "redshift" or db_type != "snowflake") {
-    rating_del1.add_index("id", true)
-    rating_del1.add_index("timestamp")
-    run_sql("CREATE INDEX index_rating_del1_id_timestamp ON rating_del1 (id, timestamp)")
-    rating_update1.add_index("id", true)
-    rating_update1.add_index("timestamp")
-    run_sql("CREATE INDEX index_rating_update1_id_timestamp ON rating_update1 (id, timestamp)")
-    rating_update001p.add_index("id", true)
-    rating_update001p.add_index("timestamp")
-    run_sql("CREATE INDEX index_rating_update001p_id_timestamp ON rating_update001p (id, timestamp)")
-    rating_update1p.add_index("id", true)
-    rating_update1p.add_index("timestamp")
-    run_sql("CREATE INDEX index_rating_update1p_id_timestamp ON rating_update1p (id, timestamp)")
-    rating_del1p.add_index("id", true)
-    rating_del1p.add_index("timestamp")
-    run_sql("CREATE INDEX index_rating_del1p_id_timestamp ON rating_del1p (id, timestamp)")
-    rating_update50p.add_index("id", true)
-    rating_update50p.add_index("timestamp")
-    run_sql("CREATE INDEX index_rating_update50p_id_timestamp ON rating_update50p (id, timestamp)")
-    commit()
-}
+
+create_indices(rating_del1)
+create_indices(rating_update1)
+create_indices(rating_update001p)
+create_indices(rating_update1p)
+create_indices(rating_del1p)
+create_indices(rating_update50p)
+commit()
 
 print "Alter tables"
 rating_del1[middle..(middle+1)] delete [true]
 assert count(rating) == count(rating_del1) + 1
 
 rating_update1[middle..(middle+1)] update {timestamp: timestamp + 1}
 
-if (db_type == "postgres" or db_type == "redshift") {
-    run_sql('UPDATE rating_update001p SET timestamp = (timestamp + 1) WHERE random() < 0.0001')
-    run_sql('UPDATE rating_update1p SET timestamp = (timestamp + 1) WHERE random() < 0.01')
-    run_sql('DELETE FROM rating_del1p WHERE random() < 0.01')
-    run_sql('UPDATE rating_update50p SET timestamp = (timestamp + 1) WHERE random() < 0.5')
-} else if (db_type == "mysql" or db_type == "mssql") {
-    run_sql('UPDATE rating_update001p SET timestamp = (timestamp + 1) WHERE rand() < 0.0001')
-    run_sql('UPDATE rating_update1p SET timestamp = (timestamp + 1) WHERE rand() < 0.01')
-    run_sql('DELETE FROM rating_del1p WHERE rand() < 0.01')
-    run_sql('UPDATE rating_update50p SET timestamp = (timestamp + 1) WHERE rand() < 0.5')
-} else if (db_type == "snowflake") {
-    run_sql('UPDATE rating_update001p SET timestamp = (timestamp + 1) WHERE uniform(0::float, 1, random()) < 0.0001')
-    run_sql('UPDATE rating_update1p SET timestamp = (timestamp + 1) WHERE uniform(0::float, 1, random()) < 0.01')
-    run_sql('DELETE FROM rating_del1p WHERE uniform(0::float, 1, random()) < 0.01')
-    run_sql('UPDATE rating_update50p SET timestamp = (timestamp + 1) WHERE uniform(0::float, 1, random()) < 0.5')
-} else {
-    print "Unsupported database: " + db_type
-}
+rating_update001p[random() < 0.0001] update {timestamp: timestamp + 1}
+rating_update1p[random() < 0.01] update {timestamp: timestamp + 1}
+rating_update50p[random() < 0.5] update {timestamp: timestamp + 1}
+rating_del1p[random() < 0.01] delete [true]
 
 commit()
diff --git a/pyproject.toml b/pyproject.toml
index 9430daa3..13efa882 100755
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -28,14 +28,14 @@ runtype = "^0.2.4"
 dsnparse = "*"
 click = "^8.1"
 
-preql = { version = "^0.2.12", optional = true }
+preql = { version = "^0.2.13", optional = true }
 psycopg2 = { version = "*", optional = true }
 mysql-connector-python = { version = "*", optional = true}
 snowflake-connector-python = { version = "*", optional = true }
 
 [tool.poetry.dev-dependencies]
 mysql-connector-python = "*"
-preql = "^0.2.12"
+preql = "^0.2.13"
 snowflake-connector-python = "*"
 psycopg2 = "*"