Revert "feature: finish adding tag_set to the database" #1514

Merged: 6 commits, May 23, 2024

8 changes: 7 additions & 1 deletion convenience.sh
@@ -29,7 +29,13 @@ reset_script_redis() {

start_local_services() {
echo "Starting local services..."
-docker compose up -d db redis qdrant-database s3 s3-client keycloak keycloak-db
+docker compose up -d db
+docker compose up -d redis
+docker compose up -d qdrant-database
+docker compose up -d s3
+docker compose up -d s3-client
+docker compose up -d keycloak
+docker compose up -d keycloak-db
}

# Main script logic
62 changes: 62 additions & 0 deletions server/fix_tag_set.py
@@ -0,0 +1,62 @@
import os
import uuid

import dotenv
import psycopg2

dotenv.load_dotenv()

# Connect to the PostgreSQL database using the connection string from the environment
conn = psycopg2.connect(os.getenv("DATABASE_URL"))

# Create a cursor object to interact with the database
cur = conn.cursor()

# Backfill tag_set_array from the comma-separated tag_set column in each table
tables = ["chunk_metadata", "chunk_group", "files"]
for table in tables:
    # Keyset pagination: walk the table in id order, 1000 rows at a time
    last_id = uuid.UUID(int=0)

    while True:
        cur.execute(
            f"SELECT * FROM {table} WHERE id > (%s)::uuid ORDER BY id LIMIT 1000",
            (str(last_id),),
        )
        rows = cur.fetchall()

        if not rows:
            break

        # Convert the tag_set field from a comma-separated string to an array
        for row in rows:
            row_id = row[0]
            tag_set = row[5]  # tag_set is the sixth column in each of these tables
            if tag_set:
                # psycopg2 adapts a Python list to a PostgreSQL array, and
                # keying the UPDATE on id avoids escaping the tag_set value
                cur.execute(
                    f"UPDATE {table} SET tag_set_array = %s WHERE id = %s",
                    (tag_set.split(","), row_id),
                )

        # Advance the keyset cursor past the last row in this batch
        last_id = rows[-1][0]

# Commit the changes to the database
conn.commit()

# Close the cursor and connection
cur.close()
conn.close()
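
For reference, the batched loop in fix_tag_set.py could be collapsed into one set-based statement per table with PostgreSQL's string_to_array. A minimal sketch, assuming tag_set_array is an existing text[] column and that one long-running UPDATE per table is acceptable:

import os

import dotenv
import psycopg2

dotenv.load_dotenv()
conn = psycopg2.connect(os.getenv("DATABASE_URL"))

# One statement per table: split the comma-separated tag_set into a text[]
with conn, conn.cursor() as cur:
    for table in ["chunk_metadata", "chunk_group", "files"]:
        cur.execute(
            f"UPDATE {table} "
            "SET tag_set_array = string_to_array(tag_set, ',') "
            "WHERE tag_set IS NOT NULL"
        )
conn.close()

The PR's batched version trades this simplicity for bounded memory use: each SELECT pulls at most 1000 rows, which matters if the tables are large.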
4 changes: 0 additions & 4 deletions server/migrations/2024-05-21-192317_drop_tag_set/down.sql

This file was deleted.

11 changes: 0 additions & 11 deletions server/migrations/2024-05-21-192317_drop_tag_set/up.sql

This file was deleted.

20 changes: 15 additions & 5 deletions server/src/bin/ingestion-worker.rs
@@ -1,7 +1,7 @@
use chrono::NaiveDateTime;
use dateparser::DateTimeUtc;
use diesel_async::pooled_connection::{AsyncDieselConnectionManager, ManagerConfig};
-use itertools::{izip, Itertools};
+use itertools::izip;
use qdrant_client::qdrant::{PointStruct, Vector};
use sentry::{Hub, SentryFutureExt};
use signal_hook::consts::SIGTERM;
@@ -413,7 +413,11 @@ pub async fn bulk_upload_chunks(
.qdrant_point_id
.unwrap_or(uuid::Uuid::new_v4());

-let chunk_tag_set = message.chunk.tag_set.clone();
+let chunk_tag_set = message
+    .chunk
+    .tag_set
+    .clone()
+    .map(|tag_set| tag_set.join(","));

let timestamp = {
message
@@ -439,7 +443,7 @@
qdrant_point_id: Some(qdrant_point_id),
created_at: chrono::Utc::now().naive_local(),
updated_at: chrono::Utc::now().naive_local(),
-tag_set: chunk_tag_set.map(|tag| tag.into_iter().map(Some).collect_vec()),
+tag_set: chunk_tag_set,
chunk_html: message.chunk.chunk_html.clone(),
metadata: message.chunk.metadata.clone(),
tracking_id: chunk_tracking_id,
@@ -452,6 +456,7 @@
.image_urls
.clone()
.map(|urls| urls.into_iter().map(Some).collect()),
+tag_set_array: None,
};

(
@@ -629,7 +634,11 @@ async fn upload_chunk(
false => payload.chunk.chunk_html.clone().unwrap_or_default(),
};

-let chunk_tag_set = payload.chunk.tag_set.clone();
+let chunk_tag_set = payload
+    .chunk
+    .tag_set
+    .clone()
+    .map(|tag_set| tag_set.join(","));

let chunk_tracking_id = payload
.chunk
@@ -659,7 +668,7 @@
qdrant_point_id: Some(qdrant_point_id),
created_at: chrono::Utc::now().naive_local(),
updated_at: chrono::Utc::now().naive_local(),
-tag_set: chunk_tag_set.map(|tag| tag.into_iter().map(Some).collect_vec()),
+tag_set: chunk_tag_set,
chunk_html: payload.chunk.chunk_html.clone(),
metadata: payload.chunk.metadata.clone(),
tracking_id: chunk_tracking_id,
@@ -671,6 +680,7 @@
.chunk
.image_urls
.map(|urls| urls.into_iter().map(Some).collect()),
+tag_set_array: None,
};

let embedding_vector = if let Some(embedding_vector) = payload.chunk.chunk_vector.clone() {
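
Taken together, the revert settles on a comma-join convention: the ingestion worker serializes tags with tag_set.join(",") before writing the tag_set column, and fix_tag_set.py splits that string back out when backfilling tag_set_array. An illustrative round trip (Python, mirroring the Rust join above; the comma caveat is an observation about the encoding, not something this diff addresses):

# What the worker's tag_set.join(",") produces for a list of tags
tags = ["ai", "rust", "search"]
serialized = ",".join(tags)
assert serialized == "ai,rust,search"

# What fix_tag_set.py's tag_set.split(",") recovers
assert serialized.split(",") == tags

# Caveat: a tag containing a comma cannot survive the round trip
assert ",".join(["a,b"]).split(",") == ["a", "b"]  # not ["a,b"]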