In [2]:
import pandas as pd
import os
from dotenv import load_dotenv
from pymongo import MongoClient, UpdateOne
from datetime import datetime

# 1. Setup Connection
load_dotenv()


True

In [3]:
username = os.getenv("MONGODB_USERNAME")
password = os.getenv("MONGODB_PASSWORD")
cluster_url = os.getenv("MONGODB_CLUSTER")
CONNECTION_STRING = f"mongodb+srv://{username}:{password}@{cluster_url}/"

client = MongoClient(CONNECTION_STRING)
db = client["aqi_prediction"]
fs = db["feature_store"]

In [11]:
cursor = fs.find({}, {"_id": 0})

# Convert to DataFrame
df = pd.DataFrame(list(cursor))

print(df.head())
print(df.shape)

      timestamp    year  month   day  hour  epa_aqi  pm2_5   pm10      co  \
0  1.760418e+09  2025.0   10.0  14.0  10.0      3.0  27.55  66.19  128.87   
1  1.760540e+09  2025.0   10.0  15.0  20.0      3.0  35.21  86.83  130.97   
2  1.760562e+09  2025.0   10.0  16.0   2.0      3.0  33.95  81.22  128.22   
3  1.760594e+09  2025.0   10.0  16.0  11.0      3.0  32.95  84.19  130.34   
4  1.760638e+09  2025.0   10.0  16.0  23.0      3.0  33.51  86.48  130.98   

    no2  ...   no2    so2     o3       temp_c   humidity_pct   pressure_hpa  \
0  0.16  ...    NaN     NaN      NaN      NaN            NaN            NaN   
1  0.20  ...    NaN     NaN      NaN      NaN            NaN            NaN   
2  0.12  ...    NaN     NaN      NaN      NaN            NaN            NaN   
3  0.20  ...    NaN     NaN      NaN      NaN            NaN            NaN   
4  0.18  ...    NaN     NaN      NaN      NaN            NaN            NaN   

    wind_speed_kmh   wind_dir_deg   rain_mm  solar_rad_wm2  
0

## Push CSV Data to MongoDB Feature Store

In [6]:
# Load CSV file
csv_file = '../backup/datav3.csv'  # Change to your CSV filename
data_df = pd.read_csv(csv_file)

print(f"Loaded {len(data_df)} records from {csv_file}")
print(f"Columns: {list(data_df.columns)}")
data_df.head()

Loaded 216 records from ../backup/datav3.csv
Columns: ['timestamp ', ' year', ' month', ' day', ' hour', ' epa_aqi', ' pm2_5 ', ' pm10  ', ' co    ', ' no2 ', ' so2  ', ' o3    ', ' temp_c', ' humidity_pct', ' pressure_hpa', ' wind_speed_kmh', ' wind_dir_deg', ' rain_mm', ' solar_rad_wm2']


Unnamed: 0,timestamp,year,month,day,hour,epa_aqi,pm2_5,pm10,co,no2,so2,o3,temp_c,humidity_pct,pressure_hpa,wind_speed_kmh,wind_dir_deg,rain_mm,solar_rad_wm2
0,1769367600,2026,1,26,0,3,31.22,74.37,302.3,1.58,5.43,127.28,14.7,46,1019.1,12.8,55,0.0,0.0
1,1769371200,2026,1,26,1,3,32.22,78.05,306.29,1.53,5.3,125.67,14.4,46,1018.4,13.7,55,0.0,0.0
2,1769374800,2026,1,26,2,3,33.06,80.82,307.62,1.44,5.09,125.51,14.2,46,1018.0,13.4,54,0.0,0.0
3,1769378400,2026,1,26,3,3,33.94,83.33,307.18,1.35,4.97,125.83,14.1,46,1017.6,13.3,55,0.0,0.0
4,1769382000,2026,1,26,4,3,35.59,87.68,310.05,1.29,5.06,126.75,13.9,46,1017.3,13.4,61,0.0,0.0


In [7]:
# Convert DataFrame to list of dictionaries (MongoDB documents)
records = data_df.to_dict('records')

# Optional: Add metadata or timestamps
for record in records:
    record['inserted_at'] = datetime.utcnow()
    # Handle NaN values (MongoDB doesn't like NaN)
    for key, value in list(record.items()):
        if pd.isna(value):
            record[key] = None

print(f"Prepared {len(records)} records for insertion")
print(f"\nSample record:")
print(records[0])

Prepared 216 records for insertion

Sample record:
{'timestamp ': 1769367600, ' year': 2026, ' month': 1, ' day': 26, ' hour': 0, ' epa_aqi': 3, ' pm2_5 ': 31.22, ' pm10  ': 74.37, ' co    ': 302.3, ' no2 ': 1.58, ' so2  ': 5.43, ' o3    ': 127.28, ' temp_c': 14.7, ' humidity_pct': 46, ' pressure_hpa': 1019.1, ' wind_speed_kmh': 12.8, ' wind_dir_deg': 55, ' rain_mm': 0.0, ' solar_rad_wm2': 0.0, 'inserted_at': datetime.datetime(2026, 2, 5, 14, 28, 37, 185645)}


  record['inserted_at'] = datetime.utcnow()


In [9]:
# Method 1: Insert all records (use if collection is empty or you want duplicates)
try:
    result = fs.insert_many(records)
    print(f"✓ Successfully inserted {len(result.inserted_ids)} records")
except Exception as e:
    print(f"✗ Error: {e}")

✓ Successfully inserted 216 records


In [8]:
# Method 2: Upsert (update if exists, insert if not) - prevents duplicates
# Use timestamp as unique key to avoid duplicate records
operations = []
for record in records:
    operations.append(
        UpdateOne(
            {'timestamp': record['timestamp']},  # Match by timestamp
            {'$set': record},
            upsert=True
        )
    )

try:
    result = fs.bulk_write(operations)
    print(f"✓ Upsert completed:")
    print(f"  - Inserted: {result.upserted_count}")
    print(f"  - Modified: {result.modified_count}")
    print(f"  - Matched: {result.matched_count}")
except Exception as e:
    print(f"✗ Error: {e}")

KeyError: 'timestamp'

In [10]:
# Verify the data was inserted
count = fs.count_documents({})
print(f"\nTotal documents in feature_store: {count}")

# Show a sample
sample = fs.find_one({}, {"_id": 0})
print(f"\nSample document:")
print(sample)


Total documents in feature_store: 2669

Sample document:
{'timestamp': 1760418000, 'year': 2025, 'month': 10, 'day': 14, 'hour': 10, 'epa_aqi': 3, 'pm2_5': 27.55, 'pm10': 66.19, 'co': 128.87, 'no2': 0.16, 'so2': 1.39, 'o3': 112.52, 'temp_c': 30.0, 'humidity_pct': 49, 'pressure_hpa': 1012.8, 'wind_speed_kmh': 8.4, 'wind_dir_deg': 315, 'rain_mm': 0.0, 'solar_rad_wm2': 418.0, 'inserted_at': datetime.datetime(2026, 1, 25, 7, 30, 56, 738000)}


## Optional: Create Index for Better Query Performance

In [None]:
# Create indexes for faster queries
# Index on timestamp (for time-based queries)
fs.create_index("timestamp", unique=True)
print("✓ Created index on 'timestamp'")

# Index on datetime if available (for date range queries)
if 'year' in data_df.columns and 'month' in data_df.columns:
    fs.create_index([("year", 1), ("month", 1), ("day", 1)])
    print("✓ Created compound index on year/month/day")

# List all indexes
print("\nCurrent indexes:")
for index in fs.list_indexes():
    print(f"  - {index}")