In [1]:
pip install pymongo snowflake-connector-python

Collecting snowflake-connector-python
  Using cached snowflake_connector_python-3.14.0-cp312-cp312-win_amd64.whl.metadata (69 kB)
Collecting asn1crypto<2.0.0,>0.24.0 (from snowflake-connector-python)
  Using cached asn1crypto-1.5.1-py2.py3-none-any.whl.metadata (13 kB)
Collecting cryptography>=3.1.0 (from snowflake-connector-python)
  Using cached cryptography-44.0.2-cp39-abi3-win_amd64.whl.metadata (5.7 kB)
Collecting pyOpenSSL<26.0.0,>=22.0.0 (from snowflake-connector-python)
  Using cached pyOpenSSL-25.0.0-py3-none-any.whl.metadata (16 kB)
Collecting pyjwt<3.0.0 (from snowflake-connector-python)
  Using cached PyJWT-2.10.1-py3-none-any.whl.metadata (4.0 kB)
Collecting filelock<4,>=3.5 (from snowflake-connector-python)
  Downloading filelock-3.18.0-py3-none-any.whl.metadata (2.9 kB)
Collecting sortedcontainers>=2.4.0 (from snowflake-connector-python)
  Using cached sortedcontainers-2.4.0-py2.py3-none-any.whl.metadata (10 kB)
Collecting tomlkit (from snowflake-connector-python)
  Usin

ERROR: Could not install packages due to an OSError: [WinError 2] The system cannot find the file specified: 'c:\\Python312\\Scripts\\snowflake-dump-certs.exe' -> 'c:\\Python312\\Scripts\\snowflake-dump-certs.exe.deleteme'


[notice] A new release of pip is available: 24.2 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [None]:
import os
from datetime import datetime
from pprint import pprint

import pymongo
import snowflake.connector

# MongoDB connection string.
# Read from the MONGO_CONNECTION_STRING environment variable so the real URI
# does not have to be saved inside the notebook; falls back to the "replace"
# placeholder when the variable is not set.
mongo_connection_string = os.environ.get("MONGO_CONNECTION_STRING", "replace")

def get_reviews():
    """Fetch every document from the ``reviews`` collection of the ``alpha`` database.

    Connects using the module-level ``mongo_connection_string``. Failures are
    reported with ``print`` instead of being raised, and the client is always
    closed before returning.

    Returns:
        list | None: All review documents as a list (possibly empty) on
        success, or ``None`` when connecting or querying failed.
    """
    client = None  # lets ``finally`` know whether a client was ever created
    try:
        client = pymongo.MongoClient(mongo_connection_string)
        reviews_collection = client.alpha.reviews

        # Materialise the cursor into a list while the client is still open.
        all_reviews = list(reviews_collection.find())
        print(f"Found {len(all_reviews)} reviews")
        return all_reviews
    except pymongo.errors.ConnectionFailure as e:
        print(f"Could not connect to MongoDB: {e}")
    except pymongo.errors.OperationFailure as e:
        # OperationFailure is not limited to authentication problems, so the
        # message names the general failure and lists auth as one possibility.
        print(f"MongoDB operation failed (e.g. authentication or permissions): {e}")
    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        if client is not None:
            client.close()
            print("MongoDB connection closed")
    # Reached only after a handled error.
    return None

# Column order here must match the tuple built by _review_to_row().
_INSERT_SQL = """
        INSERT INTO test11_data (_id, age, civilState, createdAt, familyMembers, fuelType, gender,
                                 image, job, monthlyIncome, sector, updatedAt, vehicleBrand, vehicleType)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """


def _connect_snowflake():
    """Open a Snowflake connection to DBFRONT.TEST11 on the COMPUTE_WH warehouse.

    User, password and account come from the SNOWFLAKE_USER, SNOWFLAKE_PASSWORD
    and SNOWFLAKE_ACCOUNT environment variables. The fallbacks keep the notebook
    running without any environment setup; set the variables so real
    credentials do not have to live in the notebook.
    """
    return snowflake.connector.connect(
        user=os.environ.get('SNOWFLAKE_USER', 'RAVINDU001'),
        password=os.environ.get('SNOWFLAKE_PASSWORD', 'Password'),
        account=os.environ.get('SNOWFLAKE_ACCOUNT', 'ABIBFXG-NG98667'),
        warehouse='COMPUTE_WH',
        database='DBFRONT',
        schema='TEST11'
    )


def _fetch_existing_ids(cursor):
    """Return the set of _id values in test11_data; lets query errors propagate."""
    cursor.execute("SELECT _id FROM test11_data")
    return {row[0] for row in cursor.fetchall()}


def _review_to_row(review):
    """Map one review document (a dict) to the 14-value tuple bound to _INSERT_SQL."""
    now = datetime.now()  # fallback timestamp for documents missing createdAt/updatedAt
    return (
        str(review.get('_id', 'NULL')),  # Convert ObjectId to string; 'NULL' text if absent
        review.get('age'),
        review.get('civilState'),
        review.get('createdAt', now),
        review.get('familyMembers'),
        review.get('fuelType'),
        review.get('gender'),
        review.get('image'),  # Assuming it's a URL or base64
        review.get('job'),
        review.get('monthlyIncome'),
        review.get('sector'),
        review.get('updatedAt', now),
        review.get('vehicleBrand'),
        review.get('vehicleType'),
    )


def get_existing_ids_from_snowflake():
    """Fetch the _id values already stored in the Snowflake table test11_data.

    Errors are printed, not raised.

    Returns:
        set: The existing _id values, or an empty set when the connection or
        query failed. Callers cannot distinguish "table is empty" from
        "lookup failed" through this function.
    """
    conn = None
    cursor = None
    try:
        conn = _connect_snowflake()
        cursor = conn.cursor()

        existing_ids = _fetch_existing_ids(cursor)
        print(f"Found {len(existing_ids)} existing records in Snowflake")
        return existing_ids
    except snowflake.connector.errors.DatabaseError as e:
        print(f"Database error while fetching existing IDs: {e}")
        return set()
    except Exception as e:
        print(f"An error occurred while fetching existing IDs: {e}")
        return set()
    finally:
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()
            print("Snowflake connection closed after fetching IDs")


def insert_into_snowflake(reviews):
    """Insert MongoDB reviews into Snowflake, skipping ones whose _id already exists.

    The existing-ID lookup and the inserts share one connection. If the lookup
    fails, nothing is inserted: treating a failed lookup as "no existing rows"
    would re-insert every review as a duplicate.

    Args:
        reviews: Iterable of review documents (dicts) as returned by MongoDB.

    Returns:
        None. Progress and errors are reported with ``print``.
    """
    if not reviews:
        print("No new records to insert")
        return

    conn = None
    cursor = None
    try:
        conn = _connect_snowflake()
        cursor = conn.cursor()

        # Any error here jumps to the handlers below, before a single insert runs.
        existing_ids = _fetch_existing_ids(cursor)
        print(f"Found {len(existing_ids)} existing records in Snowflake")

        # Filter out reviews that already exist in Snowflake
        new_reviews = [
            review for review in reviews
            if str(review.get('_id', 'NULL')) not in existing_ids
        ]
        print(f"Filtered {len(reviews) - len(new_reviews)} duplicate records")

        if not new_reviews:
            print("No new records to insert")
            return

        for review in new_reviews:
            cursor.execute(_INSERT_SQL, _review_to_row(review))

        conn.commit()
        print(f"Inserted {len(new_reviews)} new records into Snowflake.")
    except snowflake.connector.errors.DatabaseError as e:
        print(f"Database error: {e}")
    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()
            print("Snowflake connection closed.")

# Entry point: pull the reviews from MongoDB, then load them into Snowflake.
if __name__ == "__main__":
    reviews_data = get_reviews()
    # Skip the Snowflake step when nothing was fetched (falsy result: None or an empty list).
    if reviews_data:
        insert_into_snowflake(reviews_data)
