In [None]:
import os
import boto3
import numpy as np
import pandas as pd
import yaml
import time
from tqdm import tqdm

from vectorgeo.transfer import download_file
from vectorgeo import constants as c
from qdrant_client import QdrantClient
from qdrant_client.http.models import PointStruct

# Seconds to wait between successive scans of the S3 bucket.
CHECK_DELAY = 60

# Load secrets (adjust the path as necessary).
# The file is opened in a context manager so the handle is closed promptly,
# and parsed with safe_load because it only needs plain scalars/mappings;
# there is no reason to allow YAML tags that construct Python objects.
with open('secrets.yml', encoding='utf-8') as secrets_file:
    secrets = yaml.safe_load(secrets_file)

# Initialize S3 client with the credentials read from the secrets file
s3 = boto3.client(
    's3',
    aws_access_key_id=secrets['aws_access_key_id'],
    aws_secret_access_key=secrets['aws_secret_access_key'],
)

# Initialize Qdrant client
qdrant_client = QdrantClient(
    url=secrets['qdrant_url'],
    api_key=secrets['qdrant_api_key'],
)

# Specify your bucket name and prefix
bucket_name = c.S3_BUCKET
prefix = 'vectors/'

# Keys of files that have already been uploaded during this process lifetime.
# This lives only in memory, so a restart re-processes every file.
checked_keys = set()

# Upper bound on the number of upsert requests a single file is split into.
NUM_BATCHES = 100

# Poll the bucket forever: on every pass, upload each not-yet-seen Parquet
# file under `prefix` to Qdrant, then sleep CHECK_DELAY seconds.
while True:

    # List all objects in the S3 bucket with the specified prefix.
    # A paginator is used because a single list_objects_v2 call returns only
    # one page of results, and a page has no 'Contents' key at all when
    # nothing matches the prefix.
    listed_keys = []
    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
        listed_keys.extend(obj['Key'] for obj in page.get('Contents', []))
    print(f"Found {len(listed_keys)} files in S3")

    # Keep only Parquet files that have not been processed yet. The suffix
    # test avoids picking up keys that merely contain '.parquet' somewhere.
    pending_keys = [
        key for key in listed_keys
        if key not in checked_keys and key.endswith('.parquet')
    ]
    print(f"Found {len(pending_keys)} files to run")

    for key in pending_keys:
        print(f"...Downloading {key} from S3")
        local_path = os.path.join(c.TMP_DIR, os.path.basename(key))
        download_file(key, local_path)

        # Load the data into a Pandas DataFrame
        df = pd.read_parquet(local_path)

        print(f"...Uploading {key} to Qdrant")

        # Ceiling division: rows per request such that the file is sent in at
        # most NUM_BATCHES requests. Slicing by position (instead of
        # np.array_split on the DataFrame) never produces an empty batch, so
        # small or empty files do not trigger empty upsert calls.
        batch_size = max(1, -(-len(df) // NUM_BATCHES))

        for start in tqdm(range(0, len(df), batch_size)):
            df_piece = df.iloc[start:start + batch_size]

            # One point per row; the payload carries the coordinates under
            # the "location" key as {"lon", "lat"}.
            points = [
                PointStruct(
                    id=row['id'],
                    vector=row['vector'],
                    payload={"location": {"lon": row['lng'], "lat": row['lat']}}
                )
                for _, row in df_piece.iterrows()
            ]

            # wait=True blocks until Qdrant has applied this batch
            qdrant_client.upsert(
                collection_name=c.QDRANT_COLLECTION_NAME,
                wait=True,
                points=points
            )

        # Mark the file as done only after every batch was upserted, so an
        # exception part-way through leaves it eligible for a retry.
        checked_keys.add(key)
    time.sleep(CHECK_DELAY)

Found 41 files in S3
Found 40 files to run
...Downloading vectors/vector-upload-1694210549.parquet from S3
File /home/ubuntu/vectorgeo/tmp/vector-upload-1694210549.parquet already exists; skipping download
...Uploading vectors/vector-upload-1694210549.parquet to Qdrant


100%|██████████| 100/100 [00:17<00:00,  5.64it/s]


...Downloading vectors/vector-upload-1694210589.parquet from S3
File /home/ubuntu/vectorgeo/tmp/vector-upload-1694210589.parquet already exists; skipping download
...Uploading vectors/vector-upload-1694210589.parquet to Qdrant


100%|██████████| 100/100 [00:20<00:00,  4.90it/s]


...Downloading vectors/vector-upload-1694210632.parquet from S3
File /home/ubuntu/vectorgeo/tmp/vector-upload-1694210632.parquet already exists; skipping download
...Uploading vectors/vector-upload-1694210632.parquet to Qdrant


 13%|█▎        | 13/100 [00:02<00:19,  4.50it/s]