This repository was archived by the owner on May 17, 2024. It is now read-only.
Merged
169 changes: 71 additions & 98 deletions dev/prepare_db.pql
@@ -4,41 +4,73 @@ func run_sql(code) {
force_eval( SQL( nulltype, code ))
}

func drop_table(t) {
run_sql("DROP TABLE IF EXISTS " + t)
}

func create_indices(tbl) {
tbl.add_index("id", true)
tbl.add_index("timestamp")
tbl.add_index(["id", "timestamp"])
Contributor:

Ideally these would be created as part of the table definition instead of built afterward. It's nicer if that's defined together.

Contributor Author:

Generally I agree, but I'm not sure what the best way to do it is, syntax-wise.

Especially on (id, timestamp).

Also, does it mean that indexes are copied when you copy a table?

This just seemed like the simplest, most robust solution.
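
For illustration, one possible shape for declaring the indexes together with the table, sketched as hypothetical Preql syntax (the inline index clauses below are not an existing feature of the language):

// Hypothetical sketch only -- this inline index syntax does not exist in Preql today.
table rating {
    id: int
    timestamp: int
    index(id, unique: true)    // would replace tbl.add_index("id", true)
    index(id, timestamp)       // the composite (id, timestamp) index, declared in place
}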

}

// Cleanup
func cleanup() {
run_sql("DROP TABLE IF EXISTS rating")
run_sql("DROP TABLE IF EXISTS tmp_rating")
run_sql("DROP TABLE IF EXISTS rating_del1")
run_sql("DROP TABLE IF EXISTS rating_update1")
run_sql("DROP TABLE IF EXISTS rating_update001p")
run_sql("DROP TABLE IF EXISTS rating_update1p")
run_sql("DROP TABLE IF EXISTS rating_del1p")
run_sql("DROP TABLE IF EXISTS rating_update50p")
drop_table("rating")
drop_table("tmp_rating")
drop_table("rating_del1")
drop_table("rating_update1")
drop_table("rating_update001p")
drop_table("rating_update1p")
drop_table("rating_del1p")
drop_table("rating_update50p")
Contributor:

Should .drop on the table definition be a preql construct?

Contributor Author (@erezsh, May 16, 2022):

There is already remove_table_if_exists(), which does the same thing. But it doesn't print out the SQL.
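
If a .drop() construct were added, as suggested above, the cleanup could shrink to method calls; a hypothetical sketch (this method does not exist in Preql today):

// Hypothetical -- .drop() is not an existing Preql construct:
// rating.drop()    // would print and run: DROP TABLE IF EXISTS rating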

commit()
}

cleanup()

// Import CSV
if (db_type == "snowflake") {
print "Uploading ratings CSV"

run_sql("RM @~/ratings.csv.gz")
run_sql("PUT file://dev/ratings.csv @~")
if (db_type == "snowflake" or db_type == "redshift") {
Contributor:

What could we do to remove the cloud databases as special cases?

Contributor Author:

I can do something like db.is_cloud(), but that would only solve this specific if.

I don't see how we can get rid of db-specific code.

What we can do is have a separate module for each db, implementing import_sample_csv(), and have the main module call it. So at least the db-specific stuff will be separated.
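
A minimal sketch of that separation, reusing only calls already present in this script (the file name and function split are illustrative assumptions, not part of this PR):

// Hypothetical dev/import_snowflake.pql -- one module per backend
func import_sample_csv() {
    run_sql("RM @~/ratings.csv.gz")
    run_sql("PUT file://dev/ratings.csv @~")
    run_sql("COPY INTO tmp_rating FROM '@~/ratings.csv.gz' file_format=(skip_header=1)")
}
// prepare_db.pql would then only dispatch on db_type and call the backend's import_sample_csv()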

if (db_type == "snowflake") {
print "Uploading ratings CSV"

run_sql("RM @~/ratings.csv.gz")
run_sql("PUT file://dev/ratings.csv @~")

print "Loading ratings CSV"

bare table tmp_rating {
userid: int
movieid: int
rating: float
timestamp: int
}

run_sql("COPY INTO tmp_rating FROM '@~/ratings.csv.gz' file_format=(skip_header=1)")

} else if (db_type == "redshift") {
// NOTE: Requires that the csv already exists on s3 in the given path
print "Loading ratings CSV (already uploaded)"

table tmp_rating {
userid: int
movieid: int
rating: float
timestamp: int
}

run_sql("""
COPY "public"."tmp_rating" (userid, movieid, rating, timestamp)
FROM 's3://dev-cf-redshift-datafold-data-diff/ml/ratings.csv'
IAM_ROLE 'arn:aws:iam::760878568205:role/dev-cf-redshift-data-diff'
DELIMITER ','
IGNOREHEADER 1;
""")

print "Loading ratings CSV"

table tmp_rating {
id: int
movieid: int
rating: float
timestamp: int
}

run_sql("COPY INTO tmp_rating FROM '@~/ratings.csv.gz' file_format=(skip_header=1)")

table rating {
id: int
id: int // explicit id, instead of identity type
userid: int
movieid: int
rating: float
@@ -47,40 +79,10 @@ if (db_type == "snowflake") {

run_sql("""
INSERT INTO rating(id, userid, movieid, rating, timestamp)
SELECT row_number() over (order by tmp_rating.id, movieid, timestamp) AS id, tmp_rating.id as userid, movieid, rating, timestamp FROM tmp_rating
SELECT row_number() over (order by userid, movieid, timestamp) AS id, userid, movieid, rating, timestamp
FROM tmp_rating
""")

} else if (db_type == "redshift") {
// NOTE: Requires that the csv already exists on s3 in the given path
print "Loading ratings CSV (already uploaded)"

table tmp_rating {
userid: int
movieid: int
rating: float
timestamp: int
}

run_sql("""
COPY "public"."tmp_rating" (userid, movieid, rating, timestamp)
FROM 's3://dev-cf-redshift-datafold-data-diff/ml/ratings.csv'
IAM_ROLE 'arn:aws:iam::760878568205:role/dev-cf-redshift-data-diff'
DELIMITER ','
IGNOREHEADER 1;
""")

table rating {
id: int // explicit id, to avoid identity type
userid: int
movieid: int
rating: float
timestamp: int
}

run_sql("""
INSERT INTO rating(id, userid, movieid, rating, timestamp)
SELECT row_number() over (order by userid, movieid, timestamp) AS id, userid, movieid, rating, timestamp FROM tmp_rating
""")
} else if (db_type == "mssql") {
run_sql("drop table if exists tmp_rating")
run_sql("create table tmp_rating(userid int, movieid int, rating float, timestamp int)")
@@ -100,12 +102,10 @@ if (db_type == "snowflake") {
timestamp: int
}
import_csv(rating, 'dev/ratings.csv', true)
rating.add_index("id", true)
rating.add_index("timestamp")
run_sql("CREATE INDEX index_rating_id_timestamp ON rating (id, timestamp)")
create_indices(rating)
}

run_sql("DROP TABLE IF EXISTS tmp_rating")
drop_table("tmp_rating")
commit()

middle = count(rating) /~ 2
@@ -122,50 +122,23 @@ const table rating_del1p = rating
const table rating_update50p = rating

print "Create indexes"
if (db_type != "redshift" or db_type != "snowflake") {
rating_del1.add_index("id", true)
rating_del1.add_index("timestamp")
run_sql("CREATE INDEX index_rating_del1_id_timestamp ON rating_del1 (id, timestamp)")
rating_update1.add_index("id", true)
rating_update1.add_index("timestamp")
run_sql("CREATE INDEX index_rating_update1_id_timestamp ON rating_update1 (id, timestamp)")
rating_update001p.add_index("id", true)
rating_update001p.add_index("timestamp")
run_sql("CREATE INDEX index_rating_update001p_id_timestamp ON rating_update001p (id, timestamp)")
rating_update1p.add_index("id", true)
rating_update1p.add_index("timestamp")
run_sql("CREATE INDEX index_rating_update1p_id_timestamp ON rating_update1p (id, timestamp)")
rating_del1p.add_index("id", true)
rating_del1p.add_index("timestamp")
run_sql("CREATE INDEX index_rating_del1p_id_timestamp ON rating_del1p (id, timestamp)")
rating_update50p.add_index("id", true)
rating_update50p.add_index("timestamp")
run_sql("CREATE INDEX index_rating_update50p_id_timestamp ON rating_update50p (id, timestamp)")
commit()
}

create_indices(rating_del1)
create_indices(rating_update1)
create_indices(rating_update001p)
create_indices(rating_update1p)
create_indices(rating_del1p)
create_indices(rating_update50p)
commit()

print "Alter tables"
rating_del1[middle..(middle+1)] delete [true]
assert count(rating) == count(rating_del1) + 1
rating_update1[middle..(middle+1)] update {timestamp: timestamp + 1}

if (db_type == "postgres" or db_type == "redshift") {
run_sql('UPDATE rating_update001p SET timestamp = (timestamp + 1) WHERE random() < 0.0001')
run_sql('UPDATE rating_update1p SET timestamp = (timestamp + 1) WHERE random() < 0.01')
run_sql('DELETE FROM rating_del1p WHERE random() < 0.01')
run_sql('UPDATE rating_update50p SET timestamp = (timestamp + 1) WHERE random() < 0.5')
} else if (db_type == "mysql" or db_type == "mssql") {
run_sql('UPDATE rating_update001p SET timestamp = (timestamp + 1) WHERE rand() < 0.0001')
run_sql('UPDATE rating_update1p SET timestamp = (timestamp + 1) WHERE rand() < 0.01')
run_sql('DELETE FROM rating_del1p WHERE rand() < 0.01')
run_sql('UPDATE rating_update50p SET timestamp = (timestamp + 1) WHERE rand() < 0.5')
} else if (db_type == "snowflake") {
run_sql('UPDATE rating_update001p SET timestamp = (timestamp + 1) WHERE uniform(0::float, 1, random()) < 0.0001')
run_sql('UPDATE rating_update1p SET timestamp = (timestamp + 1) WHERE uniform(0::float, 1, random()) < 0.01')
run_sql('DELETE FROM rating_del1p WHERE uniform(0::float, 1, random()) < 0.01')
run_sql('UPDATE rating_update50p SET timestamp = (timestamp + 1) WHERE uniform(0::float, 1, random()) < 0.5')
} else {
print "Unsupported database: " + db_type
}
rating_update001p[random() < 0.0001] update {timestamp: timestamp + 1}
rating_update1p[random() < 0.01] update {timestamp: timestamp + 1}
rating_update50p[random() < 0.5] update {timestamp: timestamp + 1}
rating_del1p[random() < 0.01] delete [true]

commit()
4 changes: 2 additions & 2 deletions pyproject.toml
@@ -28,14 +28,14 @@ runtype = "^0.2.4"
dsnparse = "*"
click = "^8.1"

preql = { version = "^0.2.12", optional = true }
preql = { version = "^0.2.13", optional = true }
psycopg2 = { version = "*", optional = true }
mysql-connector-python = { version = "*", optional = true}
snowflake-connector-python = { version = "*", optional = true }

[tool.poetry.dev-dependencies]
mysql-connector-python = "*"
preql = "^0.2.12"
preql = "^0.2.13"
snowflake-connector-python = "*"
psycopg2 = "*"
