In [14]:
# Installing and Importing Modules

In [1]:
pip install snowflake

Collecting snowflake
  Downloading snowflake-0.7.0-py3-none-any.whl (1.5 kB)
Collecting snowflake-core==0.7.0
  Downloading snowflake_core-0.7.0-py3-none-any.whl (330 kB)
     -------------------------------------- 330.1/330.1 kB 5.1 MB/s eta 0:00:00
Collecting snowflake-legacy
  Downloading snowflake_legacy-0.7.0-py3-none-any.whl (3.1 kB)
Collecting atpublic>=4
  Downloading atpublic-4.1.0-py3-none-any.whl (5.0 kB)
Collecting pydantic>=1.10.7
  Downloading pydantic-2.6.4-py3-none-any.whl (394 kB)
     -------------------------------------- 394.9/394.9 kB 8.2 MB/s eta 0:00:00
Collecting snowflake-snowpark-python<2.0.0,>=1.5.0
  Downloading snowflake_snowpark_python-1.14.0-py3-none-any.whl (419 kB)
     -------------------------------------- 419.7/419.7 kB 6.5 MB/s eta 0:00:00
Collecting annotated-types>=0.4.0
  Downloading annotated_types-0.6.0-py3-none-any.whl (12 kB)
Collecting pydantic-core==2.16.3
  Downloading pydantic_core-2.16.3-cp311-none-win_amd64.whl (1.9 MB)
     -----------


[notice] A new release of pip available: 22.3 -> 24.0
[notice] To update, run: python.exe -m pip install --upgrade pip


In [1]:
# from dotenv import load_dotenv
import os
import pymongo
import snowflake.connector
import pandas as pd


In [2]:
# Load environment variables from .env file
# load_dotenv()

In [3]:
# MongoDB Atlas URI (mongodb+srv://user:password@host/...), read from the environment so
# no credentials live in the notebook. The credential previously hard-coded here has been
# exposed in this notebook and should be rotated.
mongo_connection_string = os.environ["MONGO_CONNECTION_STRING"]

In [4]:
# The database name is passed through the `Project` environment variable.
os.environ['Project'] = 'test'

mongo_client = None
try:
    # Connect to MongoDB Atlas. The constructor may already raise for a bad URI or a
    # failed mongodb+srv DNS lookup, but it does NOT contact the cluster itself:
    # MongoClient connects lazily, so a ConnectionFailure would otherwise only appear
    # at the first real query.
    mongo_client = pymongo.MongoClient(mongo_connection_string)

    # Force a round trip so network, DNS and authentication problems surface here.
    mongo_client.admin.command("ping")

    mongo_db = mongo_client[os.environ['Project']]
    print("Connected to MongoDB Atlas successfully!")
except pymongo.errors.PyMongoError as e:
    print(f"Failed to connect to MongoDB Atlas: {e}")
    if mongo_client is not None:
        mongo_client.close()
    # Stop Run-All here instead of letting later cells fail against a dead client.
    raise

Connected to MongoDB Atlas successfully!


In [5]:
# Connect to Snowflake using settings from the environment. The user and password are
# required (a missing variable raises KeyError naming it). The remaining settings default
# to the values this notebook was originally run against. The password previously
# hard-coded here has been exposed in this notebook and should be rotated.
# NOTE(review): ACCOUNTADMIN is far more privilege than a load job needs; prefer a
# dedicated role with CREATE TABLE / INSERT on TIMESHEET.FAKEDATA.
try:
    snowflake_conn = snowflake.connector.connect(
        user=os.environ["SNOWFLAKE_USER"],
        password=os.environ["SNOWFLAKE_PASSWORD"],
        account=os.environ.get("SNOWFLAKE_ACCOUNT", "lz05490.central-india.azure"),
        warehouse=os.environ.get("SNOWFLAKE_WAREHOUSE", "COMPUTE_WH"),
        database=os.environ.get("SNOWFLAKE_DATABASE", "TIMESHEET"),
        schema=os.environ.get("SNOWFLAKE_SCHEMA", "FAKEDATA"),
        role=os.environ.get("SNOWFLAKE_ROLE", "ACCOUNTADMIN"),
    )
    print("Connected to Snowflake successfully!")
except snowflake.connector.errors.DatabaseError as e:
    print(f"Failed to connect to Snowflake: {e}")
    # Stop Run-All here; the ingest cell cannot work without a connection.
    raise

Connected to Snowflake successfully!


## MongoDB Collections to Staging CSV Files

In [6]:
# Export every MongoDB collection to `staging_raw_data/<collection>.csv`,
# which the Snowflake ingest cell below reads back.
STAGING_DIR = "staging_raw_data"
os.makedirs(STAGING_DIR, exist_ok=True)

for collection_name in mongo_db.list_collection_names():
    # Holds the whole collection in memory before writing it out.
    documents = list(mongo_db[collection_name].find())
    csv_file_path = f"{STAGING_DIR}/{collection_name}.csv"

    if not documents:
        # An empty collection would be written as a CSV with no header, which
        # pd.read_csv rejects (EmptyDataError) in the ingest cell. Skip it, and
        # remove any CSV left by an earlier run so stale rows are not re-loaded.
        if os.path.exists(csv_file_path):
            os.remove(csv_file_path)
        print(f"Collection '{collection_name}' is empty; skipped.")
        continue

    collection_df = pd.DataFrame(documents)
    # NOTE(review): any nested dict/list fields would be written as their Python
    # repr rather than JSON - confirm that is acceptable downstream.
    collection_df.to_csv(csv_file_path, index=False)
    print(f"Data from collection '{collection_name}' written to '{csv_file_path}'")



Data from collection 'feedbacks' written to 'staging_raw_data/feedbacks.csv'
Data from collection 'users' written to 'staging_raw_data/users.csv'
Data from collection 'timesheets' written to 'staging_raw_data/timesheets.csv'
Data from collection 'projects' written to 'staging_raw_data/projects.csv'
Data from collection 'allocateprojects' written to 'staging_raw_data/allocateprojects.csv'
Data from collection 'projectallocation' written to 'staging_raw_data/projectallocation.csv'


In [7]:
# Close MongoDB connection
# Close the MongoDB client. Later cells only read the CSV files staged in
# `staging_raw_data`, so the client is no longer needed.
mongo_client.close()

## Ingest Into Snowflake

In [8]:
def sanitize_name(name):
    """Turn `name` into a valid unquoted Snowflake identifier.

    Every character that is not an ASCII letter or digit is replaced with "_".
    `str.isalnum()` alone is Unicode-aware and would keep characters such as "é",
    which unquoted identifiers do not allow. Identifiers also may not start with
    a digit, so a leading "_" is added in that case and for an empty name.

    Args:
        name: Table or column label; non-strings are converted with str().

    Returns:
        A non-empty string of [A-Za-z0-9_] that does not start with a digit.
    """
    cleaned = ''.join(c if c.isascii() and c.isalnum() else '_' for c in str(name))
    if not cleaned or cleaned[0].isdigit():
        cleaned = '_' + cleaned
    return cleaned

STAGING_DIR = "staging_raw_data"


def _quote_identifier(name):
    """Return `name` as an upper-cased, double-quoted Snowflake identifier.

    Snowflake resolves unquoted identifiers in upper case, so `"FEEDBACKS"` names the
    same object as unquoted `feedbacks`. Quoting also allows reserved words such as
    `order` or `group` as column names. Assumes `name` has already been passed
    through `sanitize_name`, so it contains no double quotes.
    """
    return '"' + name.upper() + '"'


def _unique_column_names(columns):
    """Sanitize each column label and de-duplicate the results.

    Distinct labels can collapse to the same identifier: "first name" and
    "first-name" both become "first_name", and "Name" and "name" clash because
    identifiers are compared upper-cased. Either case makes CREATE TABLE fail.
    Later duplicates get a numeric suffix: a, a_1, a_2, ...
    """
    seen = set()
    unique = []
    for label in columns:
        base = sanitize_name(label)
        candidate, suffix = base, 1
        while candidate.upper() in seen:
            candidate = f"{base}_{suffix}"
            suffix += 1
        seen.add(candidate.upper())
        unique.append(candidate)
    return unique


# Load every staged CSV into a Snowflake table of the same (sanitized) name,
# replacing the table if it already exists. All columns are VARCHAR.
try:
    if not os.path.exists(STAGING_DIR):
        # Don't call exit() here: inside a Jupyter kernel it shuts the kernel down
        # instead of just ending this cell.
        print("No data to process. Exiting.")
    else:
        # sorted() makes the load order deterministic across platforms.
        for filename in sorted(os.listdir(STAGING_DIR)):
            if not filename.endswith(".csv"):
                continue

            table_name = sanitize_name(os.path.splitext(filename)[0])

            # dtype=str + keep_default_na=False keep every cell exactly as written in
            # the CSV. Without them, pandas type inference turns "123" into "123.0" in
            # columns with blanks, drops leading zeros, and turns literal "NA"/"null"
            # strings into blanks.
            try:
                table_df = pd.read_csv(
                    os.path.join(STAGING_DIR, filename),
                    dtype=str,
                    keep_default_na=False,
                )
            except pd.errors.EmptyDataError:
                print(f"Skipping '{filename}': no columns to load.")
                continue

            column_names = _unique_column_names(table_df.columns)
            column_defs = ", ".join(f"{_quote_identifier(c)} VARCHAR" for c in column_names)
            placeholders = ", ".join(["%s"] * len(column_names))
            table_ident = _quote_identifier(table_name)
            # name=None yields plain tuples, one parameter row per CSV row.
            rows = list(table_df.itertuples(index=False, name=None))

            cursor = snowflake_conn.cursor()
            try:
                # CREATE OR REPLACE is a single, atomic DROP IF EXISTS + CREATE.
                cursor.execute(f"CREATE OR REPLACE TABLE {table_ident} ({column_defs})")
                if rows:  # header-only CSV: keep the empty table, nothing to insert
                    cursor.executemany(
                        f"INSERT INTO {table_ident} VALUES ({placeholders})",
                        rows,
                    )
            finally:
                cursor.close()

            print(f"Data from '{filename}' inserted into '{table_name}' table in Snowflake.")

        # Commit the transaction
        snowflake_conn.commit()
finally:
    # Always release the Snowflake connection, even if a load fails part-way.
    snowflake_conn.close()

Data from 'allocateprojects.csv' inserted into 'allocateprojects' table in Snowflake.
Data from 'feedbacks.csv' inserted into 'feedbacks' table in Snowflake.
Data from 'projectallocation.csv' inserted into 'projectallocation' table in Snowflake.
Data from 'projects.csv' inserted into 'projects' table in Snowflake.
Data from 'timesheets.csv' inserted into 'timesheets' table in Snowflake.
Data from 'users.csv' inserted into 'users' table in Snowflake.
