In [1]:
!pip install requests pandas -q
!pip install --upgrade google-api-python-client google-auth-httplib2 google-auth-oauthlib -q
!python -m pip install "pymongo[srv]" -q



[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/13.2 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.7/13.2 MB[0m [31m85.3 MB/s[0m eta [36m0:00:01[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━[0m [32m7.0/13.2 MB[0m [31m103.3 MB/s[0m eta [36m0:00:01[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━[0m [32m10.9/13.2 MB[0m [31m107.3 MB/s[0m eta [36m0:00:01[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m13.2/13.2 MB[0m [31m112.7 MB/s[0m eta [36m0:00:01[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m13.2/13.2 MB[0m [31m112.7 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.2/13.2 MB[0m [31m65.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m313.6/313.6 kB[0m [31m15.7 MB/s[0m 

# Create directory structure




In [2]:
import os

# Base directory
base_dir = "ETL_Pipeline_Sabih_DS-59"

# Directory structure
structure = {
    "": ["etl_pipeline.py", "scheduler.py", "requirements.txt", "README.md", "load_to_db.py", "report.pdf"],
    "config": ["db_config.json"],
    "data": ["sample_data.csv", "weather_data.json", "google_sheet_sample.csv", 'weather_data.db'],
    "output": ["final_cleaned_data.csv"],
    ".github/workflows": ["ci_cd.yml"]
}

# Create directories and files
for folder, files in structure.items():
    dir_path = os.path.join(base_dir, folder)
    os.makedirs(dir_path, exist_ok=True)
    for file in files:
        file_path = os.path.join(dir_path, file)
        with open(file_path, "w") as f:
            pass  # Creates an empty file

print(f"Directory structure created under '{base_dir}'")


Directory structure created under 'ETL_Pipeline_Sabih_DS-59'


# Create Dummy data from openWeather

In [3]:
# !rm -rf /content/ETL_Pipeline_Sabih_DS-59/data

In [4]:
import requests
import pandas as pd
import sqlite3
from datetime import datetime
import os
import csv
import sqlite3
import json

# --- CONFIGURATION ---

API_KEY = 'cdc23585344f54d1d00caef6a3cffb60'  # Replace with your real API key
CITY = 'London'
CSV_FILE = '/content/ETL_Pipeline_Sabih_DS-59/data/sample_data.csv'
SQLITE_DB_FILE = '/content/ETL_Pipeline_Sabih_DS-59/data/weather_data.db'
JSON_File = '/content/ETL_Pipeline_Sabih_DS-59/data/weather_data.json'


def fetch_forecast(city):
    # Use the forecast endpoint (5 day / 3 hour forecast)
    url = f'http://api.openweathermap.org/data/2.5/forecast?q={city}&appid={API_KEY}&units=metric'
    response = requests.get(url)

    if response.status_code != 200:
        raise Exception(f"Error fetching weather data: {response.status_code}")

    data = response.json()

    # Check if the response contains an error code
    if data.get("cod") != "200":
        error_message = data.get("message", "Unknown error")
        raise Exception(f"Error fetching weather data: {error_message}")

    # Extract forecast entries with temperature, humidity, and wind speed
    forecast_entries = []
    for entry in data.get('list', []):
        try:
            # Convert Unix timestamp to human-readable format
            timestamp = datetime.utcfromtimestamp(entry['dt']).strftime('%Y-%m-%d %H:%M:%S')
            temperature = entry['main']['temp']
            humidity = entry['main']['humidity']
            wind_speed = entry['wind']['speed']

            forecast_entries.append({
                'timestamp': timestamp,
                'temperature': temperature,
                'humidity': humidity,
                'wind_speed': wind_speed,
                'city': city
            })
        except KeyError as e:
            print(f"Key error {e} in entry: {entry}")

    return forecast_entries


# --- STEP 2: Save to CSV ---

def create_weather_csv(data, filename='weather_data.csv'):
    # Ensure the directory exists
    os.makedirs(os.path.dirname(filename), exist_ok=True)

    # Open the file in write mode
    with open(filename, mode='w', newline='') as file:
        writer = csv.writer(file)

        # Write the header with an additional column for Fahrenheit
        writer.writerow(['timestamp', 'temperature_celsius', 'temperature_fahrenheit', 'humidity', 'wind_speed'])

        # Write the data with both Celsius and Fahrenheit
        for entry in data:
            temperature_fahrenheit = (entry['temperature'] * 9/5) + 32  # Convert to Fahrenheit
            writer.writerow([entry['timestamp'], entry['temperature'],
                             temperature_fahrenheit, entry['humidity'], entry['wind_speed'],])

    print(f"Data saved as {filename}.")


# --- STEP 3: Save to SQLite ---

def create_and_insert_weather_data(data):
    # Connect to SQLite database (it will create the file if it doesn't exist)
    conn = sqlite3.connect(SQLITE_DB_FILE)
    cursor = conn.cursor()

    # Create table with timestamp, temperature, and temperature_fahrenheit columns if it doesn't exist
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS weather_data (
        timestamp TEXT,
        temperature_celsius REAL,
        temperature_fahrenheit REAL,
        humidity REAL,
        wind_speed REAL)
    ''')

    # Insert data into the table
    for record in data:
        temperature_fahrenheit = (record['temperature'] * 9/5) + 32  # Convert to Fahrenheit
        cursor.execute('''
        INSERT INTO weather_data (timestamp, temperature_celsius, temperature_fahrenheit, humidity, wind_speed)
        VALUES (?, ?, ?, ?, ?)
        ''', (record['timestamp'], record['temperature'], temperature_fahrenheit,
              record['humidity'], record['wind_speed']))

    # Commit the changes and close the connection
    conn.commit()

    # Optional: Verify by fetching all rows from the table
    cursor.execute('SELECT * FROM weather_data')
    rows = cursor.fetchall()

    # Print the data (optional step for verification)
    for row in rows:
        print(row)

    # Close the connection
    conn.close()



def save_temperature_data(data, filename):
    # Filter data to include timestamp, temperature in Celsius, and temperature in Fahrenheit
    filtered_data = [{'timestamp': entry['timestamp'],
                      'temperature_celsius': entry['temperature'],
                      'temperature_fahrenheit': (entry['temperature'] * 9/5) + 32}
                     for entry in data]

    # Save to JSON file
    with open(filename, 'w') as json_file:
        json.dump(filtered_data, json_file, indent=4)

    print(f"Data saved as {filename}.")



# --- MAIN ---

if __name__ == '__main__':
    weather_data = fetch_forecast(CITY)
    print("Weather data:", weather_data)

    create_weather_csv(weather_data, CSV_FILE)
    create_and_insert_weather_data(weather_data)
    save_temperature_data(weather_data, JSON_File)




Weather data: [{'timestamp': '2025-04-10 12:00:00', 'temperature': 11.55, 'humidity': 64, 'wind_speed': 1.84, 'city': 'London'}, {'timestamp': '2025-04-10 15:00:00', 'temperature': 12.74, 'humidity': 59, 'wind_speed': 2.03, 'city': 'London'}, {'timestamp': '2025-04-10 18:00:00', 'temperature': 13.17, 'humidity': 56, 'wind_speed': 1.63, 'city': 'London'}, {'timestamp': '2025-04-10 21:00:00', 'temperature': 11.06, 'humidity': 70, 'wind_speed': 0.92, 'city': 'London'}, {'timestamp': '2025-04-11 00:00:00', 'temperature': 7.68, 'humidity': 79, 'wind_speed': 1.44, 'city': 'London'}, {'timestamp': '2025-04-11 03:00:00', 'temperature': 9.5, 'humidity': 83, 'wind_speed': 1.46, 'city': 'London'}, {'timestamp': '2025-04-11 06:00:00', 'temperature': 8.98, 'humidity': 82, 'wind_speed': 1.59, 'city': 'London'}, {'timestamp': '2025-04-11 09:00:00', 'temperature': 13.78, 'humidity': 47, 'wind_speed': 2.13, 'city': 'London'}, {'timestamp': '2025-04-11 12:00:00', 'temperature': 18.43, 'humidity': 35, 'w

# Get data from google sheets

In [5]:
# Replace with your actual spreadsheet ID and sheet ID (GID)
spreadsheet_id = "1DIXLTQfPB76206gklGInqhRzg5KW_uHbGXmwkBVg56k"
sheet_id = "1229579343"  # Sheet ID (GID)

# Construct the export URL for CSV format
# url = f"https://docs.google.com/spreadsheets/d/{spreadsheet_id}/export?format=csv&gid={sheet_id}"
url = "https://docs.google.com/spreadsheets/d/1W0n624gcU4A589YJWi_4YhyDNAFjeHqYdoUU-nLMbQk/export?format=csv&gid=1753712753#gid=1753712753"

# Send a GET request to the URL
response = requests.get(url)

# Check if the request was successful (status code 200)
if response.status_code == 200:
    # Save to a local file
    print(response.content)
    with open('/content/ETL_Pipeline_Sabih_DS-59/data/google_sheet_sample.csv', 'wb') as f:
        f.write(response.content)
else:
    print(f"Failed to download the CSV file. Status code: {response.status_code}")


b'timestamp,temperature_celsius,temperature_fahrenheit,humidity,wind_speed\r\n2025-04-09 18:00:00,13.6,56.48,57,3.78\r\n2025-04-09 21:00:00,11.8,53.24,60,2.96\r\n2025-04-10 0:00:00,7.81,12999999999,,2.75\r\n2025-04-10 3:00:00,3.32,37.976,82,2.1\r\n2025-04-10 6:00:00,2.88,37.184,84,1.55\r\n2025-04-10 9:00:00,10.34,50.612,64,2.42\r\n2025-04-10 12:00:00,15.36,59.648,47,2.29\r\n2025-04-10 15:00:00,17.32,63.176,42,2.24\r\n2025-04-10 18:00:00,15.75,60.35,50,1.79\r\n2025-04-10 21:00:00,12.43,,67,0.96\r\n2025-04-11 0:00:00,10.74,51.332,74,1.2\r\n2025-04-11 3:00:00,9.57,49.226,81,1.43\r\n2025-04-11 6:00:00,8.9,48.02,78,1.48\r\n2025-04-11 9:00:00,13.68,56.624,53,1.98\r\n2025-04-11 12:00:00,18.44,65.192,39,1.33\r\n2025-04-11 15:00:00,20.3,68.54,34,1.58\r\n2025-04-11 18:00:00,18.56,65.408,44,2.36\r\n2025-04-11 21:00:00,13.98,57.164,66,1.81\r\n2025-04-12 0:00:00,12.05,53.69,79,\r\n2025-04-12 3:00:00,-111111111111111,51.206,80,1.6\r\n2025-04-12 6:00:00,9.94,49.892,69,1.63\r\n2025-04-12 9:00:00,15.25

# Verify Data in MYSQL file

In [6]:
import sqlite3

# Replace with the path to your .db file
db_path = '/content/ETL_Pipeline_Sabih_DS-59/data/weather_data.db'

# Connect to the SQLite database
conn = sqlite3.connect(db_path)
cursor = conn.cursor()

# Query to select all data from the table
cursor.execute("SELECT * FROM weather_data")

# Fetch all rows
rows = cursor.fetchall()

# Print each row
for row in rows:
    print(row)

# Close the connection
conn.close()


('2025-04-10 12:00:00', 11.55, 52.79, 64.0, 1.84)
('2025-04-10 15:00:00', 12.74, 54.932, 59.0, 2.03)
('2025-04-10 18:00:00', 13.17, 55.706, 56.0, 1.63)
('2025-04-10 21:00:00', 11.06, 51.908, 70.0, 0.92)
('2025-04-11 00:00:00', 7.68, 45.824, 79.0, 1.44)
('2025-04-11 03:00:00', 9.5, 49.1, 83.0, 1.46)
('2025-04-11 06:00:00', 8.98, 48.164, 82.0, 1.59)
('2025-04-11 09:00:00', 13.78, 56.804, 47.0, 2.13)
('2025-04-11 12:00:00', 18.43, 65.174, 35.0, 1.67)
('2025-04-11 15:00:00', 20.41, 68.738, 29.0, 1.02)
('2025-04-11 18:00:00', 18.37, 65.066, 36.0, 2.42)
('2025-04-11 21:00:00', 13.75, 56.75, 68.0, 1.8)
('2025-04-12 00:00:00', 11.89, 53.402, 79.0, 1.28)
('2025-04-12 03:00:00', 10.5, 50.9, 76.0, 1.78)
('2025-04-12 06:00:00', 9.67, 49.406, 74.0, 2.25)
('2025-04-12 09:00:00', 14.54, 58.172, 55.0, 3.74)
('2025-04-12 12:00:00', 18.89, 66.002, 38.0, 4.34)
('2025-04-12 15:00:00', 18.61, 65.498, 45.0, 5.16)
('2025-04-12 18:00:00', 16.1, 60.980000000000004, 61.0, 2.92)
('2025-04-12 21:00:00', 12.89, 55

# Data Analysis

In [7]:
import pandas as pd
import sqlite3

# 1. Load Data from JSON
json_data = pd.read_json('/content/ETL_Pipeline_Sabih_DS-59/data/weather_data.json')

# 2. Load Data from CSVs
csv_data1 = pd.read_csv('/content/ETL_Pipeline_Sabih_DS-59/data/google_sheet_sample.csv')
csv_data2 = pd.read_csv('/content/ETL_Pipeline_Sabih_DS-59/data/sample_data.csv')

# 3. Load Data from SQLite Database
# Connect to SQLite database (replace with your actual DB path and table)
conn = sqlite3.connect('/content/ETL_Pipeline_Sabih_DS-59/data/weather_data.db')
sqlite_data = pd.read_sql('SELECT timestamp, temperature_celsius, temperature_fahrenheit FROM weather_data', conn)
conn.close()

# 4. Combine the Data
# Assuming each dataset has 'timestamp', 'temperature_celsius', 'temperature_fahrenheit'
combined_data = pd.concat([json_data, csv_data1, csv_data2, sqlite_data], ignore_index=True)

# 5. Optional: Convert timestamp column to datetime (if needed)
combined_data['timestamp'] = pd.to_datetime(combined_data['timestamp'])

# 6. Optional: Sort by timestamp
combined_data.sort_values(by='timestamp', inplace=True)
combined_data.reset_index(drop=True, inplace=True)

# 7. Print the final combined DataFrame
combined_data.head()


Unnamed: 0,timestamp,temperature_celsius,temperature_fahrenheit,humidity,wind_speed
0,2025-04-09 18:00:00,13.6,56.48,57.0,3.78
1,2025-04-09 21:00:00,11.8,53.24,60.0,2.96
2,2025-04-10 00:00:00,7.81,13000000000.0,,2.75
3,2025-04-10 03:00:00,3.32,37.976,82.0,2.1
4,2025-04-10 06:00:00,2.88,37.184,84.0,1.55


In [8]:
combined_data.head()

Unnamed: 0,timestamp,temperature_celsius,temperature_fahrenheit,humidity,wind_speed
0,2025-04-09 18:00:00,13.6,56.48,57.0,3.78
1,2025-04-09 21:00:00,11.8,53.24,60.0,2.96
2,2025-04-10 00:00:00,7.81,13000000000.0,,2.75
3,2025-04-10 03:00:00,3.32,37.976,82.0,2.1
4,2025-04-10 06:00:00,2.88,37.184,84.0,1.55


In [9]:
combined_data.describe()

Unnamed: 0,timestamp,temperature_celsius,temperature_fahrenheit,humidity,wind_speed
count,160,159.0,158.0,79.0,79.0
mean,2025-04-12 18:00:00,-698812000000.0,-6246835000.0,62.177215,-1263.635696
min,2025-04-09 18:00:00,-111111100000000.0,-1000000000000.0,-12.0,-99999.0
25%,2025-04-11 12:00:00,10.26,50.504,48.5,1.43
50%,2025-04-12 18:00:00,12.43,54.302,66.0,1.81
75%,2025-04-14 00:00:00,15.305,59.5985,77.0,2.815
max,2025-04-15 09:00:00,20.41,13000000000.0,95.0,5.55
std,,8811684000000.0,79569040000.0,19.271847,11251.014338


### ETL Process 🚀

1. **Refresh and Collect Data** 🔄📊  
   Collect the latest data to ensure freshness.

2. **Extract and Combine Data Frames** 🧑‍💻🔗  
   Extract individual datasets and combine them into a unified data frame.

3. **Cleaning Steps** 🧹  
   - Remove outliers 🏃‍♂️💨  
   - Replace null values with column mean 💡🔢  
   - Remove duplicate entries ⚠️  
   - Feature engineer weather impact score 🌦️➡️📊  
   - Round off values to 3 decimal places 🔢🔴

4. **Save Cleaned Data** 💾🗂️  
   Save the cleaned data as a `.json` file for easy storage.

5. **Load Cleaned Data to Database** 🚚📥  
   Call the load function to upload the cleaned data to MongoDB Atlas.

In [10]:
# Define the Python code as a string
python_code = """
import numpy as np
import pandas as pd
import requests
import csv
import sqlite3
from datetime import datetime
import os
import load_to_db


def fetch_forecast(city):
    # Use the forecast endpoint (5 day / 3 hour forecast)
    print("fetching Data from openWeather API")
    API_KEY = 'cdc23585344f54d1d00caef6a3cffb60'  # Replace with your real API key
    url = f'http://api.openweathermap.org/data/2.5/forecast?q={city}&appid={API_KEY}&units=metric'
    response = requests.get(url)

    if response.status_code != 200:
        raise Exception(f"Error fetching weather data: {response.status_code}")

    data = response.json()

    # Check if the response contains an error code
    if data.get("cod") != "200":
        error_message = data.get("message", "Unknown error")
        raise Exception(f"Error fetching weather data: {error_message}")

    # Extract forecast entries with temperature, humidity, and wind speed
    forecast_entries = []
    for entry in data.get('list', []):
        try:
            # Convert Unix timestamp to human-readable format
            timestamp = datetime.utcfromtimestamp(entry['dt']).strftime('%Y-%m-%d %H:%M:%S')
            temperature = entry['main']['temp']
            humidity = entry['main']['humidity']
            wind_speed = entry['wind']['speed']

            forecast_entries.append({
                'timestamp': timestamp,
                'temperature': temperature,
                'humidity': humidity,
                'wind_speed': wind_speed,
                'city': city
            })
        except KeyError as e:
            print(f"Key error {e} in entry: {entry}")

    return forecast_entries


def create_weather_csv(data, filename):
      # Ensure the directory exists
      os.makedirs(os.path.dirname(filename), exist_ok=True)

      # Open the file in write mode
      with open(filename, mode='w', newline='') as file:
          writer = csv.writer(file)

          # Write the header with an additional column for Fahrenheit
          writer.writerow(['timestamp', 'temperature_celsius', 'temperature_fahrenheit', 'humidity', 'wind_speed'])

          # Write the data with both Celsius and Fahrenheit
          for entry in data:
              temperature_fahrenheit = (entry['temperature'] * 9/5) + 32  # Convert to Fahrenheit
              writer.writerow([entry['timestamp'], entry['temperature'],
                               temperature_fahrenheit, entry['humidity'], entry['wind_speed'],])

      print(f"Data saved as {filename}.")

def getGoogleSheetsData():
    # Replace with your actual spreadsheet ID and sheet ID (GID)
    spreadsheet_id = "1DIXLTQfPB76206gklGInqhRzg5KW_uHbGXmwkBVg56k"
    sheet_id = "1229579343"  # Sheet ID (GID)

    # Construct the export URL for CSV format
    # url = f"https://docs.google.com/spreadsheets/d/{spreadsheet_id}/export?format=csv&gid={sheet_id}"
    url = "https://docs.google.com/spreadsheets/d/1W0n624gcU4A589YJWi_4YhyDNAFjeHqYdoUU-nLMbQk/export?format=csv&gid=1753712753#gid=1753712753"

    # Send a GET request to the URL
    response = requests.get(url)

    # Check if the request was successful (status code 200)
    if response.status_code == 200:
        # Save to a local file
        print(response.content)
        with open('/content/ETL_Pipeline_Sabih_DS-59/data/google_sheet_sample.csv', 'wb') as f:
            f.write(response.content)
    else:
        print(f"Failed to download the CSV file. Status code: {response.status_code}")


def refreshData():
    weather_data = fetch_forecast('London')
    create_weather_csv(weather_data, "/content/ETL_Pipeline_Sabih_DS-59/data/sample_data.csv")
    getGoogleSheetsData()



def extractData():
    # 1. Load Data from JSON
    json_data = pd.read_json('/content/ETL_Pipeline_Sabih_DS-59/data/weather_data.json')

    # 2. Load Data from CSVs
    csv_data1 = pd.read_csv('/content/ETL_Pipeline_Sabih_DS-59/data/google_sheet_sample.csv')
    csv_data2 = pd.read_csv('/content/ETL_Pipeline_Sabih_DS-59/data/sample_data.csv')

    # 3. Load Data from SQLite Database
    # Connect to SQLite database (replace with your actual DB path and table)
    conn = sqlite3.connect('/content/ETL_Pipeline_Sabih_DS-59/data/weather_data.db')
    sqlite_data = pd.read_sql('SELECT timestamp, temperature_celsius, temperature_fahrenheit FROM weather_data', conn)
    conn.close()

    # 4. Combine the Data
    # Assuming each dataset has 'timestamp', 'temperature_celsius', 'temperature_fahrenheit'
    combined_data = pd.concat([json_data, csv_data1, csv_data2, sqlite_data], ignore_index=True)

    # 5. Optional: Convert timestamp column to datetime (if needed)
    combined_data['timestamp'] = pd.to_datetime(combined_data['timestamp'])

    # 6. Optional: Sort by timestamp
    combined_data.sort_values(by='timestamp', inplace=True)
    combined_data.reset_index(drop=True, inplace=True)

    # 7. Print the final combined DataFrame
    return combined_data


def remove_outliers_and_impute_mean(df, col):

    # For a given DataFrame column, detects outliers using the IQR method,
    # replaces them with NaN, and then fills all NaN values with the column mean.

    # Calculate Q1, Q3 and IQR using non-missing values
    Q1 = df[col].quantile(0.25)
    Q3 = df[col].quantile(0.75)
    IQR = Q3 - Q1

    # Define acceptable range based on the IQR rule
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR

    # Debug print statements (optional)
    print(f"{col} - Q1: {Q1:.2f}, Q3: {Q3:.2f}, IQR: {IQR:.2f}")
    print(f"{col} - Lower bound: {lower_bound:.2f}, Upper bound: {upper_bound:.2f}")

    # Replace outliers (non-missing) with NaN. Missing values remain NaN.
    mask_outliers = ((df[col] < lower_bound) | (df[col] > upper_bound)) & (df[col].notna())
    df.loc[mask_outliers, col] = np.nan

    # After marking outliers as missing, compute the mean of the non-missing (non-outlier) values.
    mean_val = df[col].mean()
    print(f"{col} - Mean (computed from non-outliers): {mean_val:.2f}")

    # Fill missing values with the computed mean.
    df[col].fillna(mean_val, inplace=True)


def weatherImpactScore(df):
    df_norm = df.copy()
    df_norm['temperature_celsius'] = (df['temperature_celsius'] - df['temperature_celsius'].min()) / (df['temperature_celsius'].max() - df['temperature_celsius'].min())
    df_norm['humidity'] = (df['humidity'] - df['humidity'].min()) / (df['humidity'].max() - df['humidity'].min())
    df_norm['wind_speed'] = (df['wind_speed'] - df['wind_speed'].min()) / (df['wind_speed'].max() - df['wind_speed'].min())

    # Define weights (adjustable)
    temp_weight = 0.4
    humidity_weight = 0.3
    wind_weight = 0.3

    # Calculate weather impact score
    df['weather_impact'] = (
        temp_weight * df_norm['temperature_celsius'] +
        humidity_weight * df_norm['humidity'] +
        wind_weight * df_norm['wind_speed']) * 100  # scale to a 0–100 score


def cleanData(df):
    # Process each column separately.
    remove_outliers_and_impute_mean(df, 'temperature_celsius')
    remove_outliers_and_impute_mean(df, 'temperature_fahrenheit')
    remove_outliers_and_impute_mean(df, 'humidity')
    remove_outliers_and_impute_mean(df, 'wind_speed')


    weatherImpactScore(df)

    # Round all values in the DataFrame to 3 decimal places
    df = df.round(3)

    df.drop_duplicates(subset='timestamp', keep='last')
    df.to_json('/content/ETL_Pipeline_Sabih_DS-59/data/cleaned_data.json', orient='records')



def main():

    ### Extract
    refreshData()
    combined_data = extractData()

    ### Transform
    cleanData(combined_data) # data cleaning and saving as .json file

    ### Load
    load_to_db.main()

"""

# Open (or create) a Python file to write the code
with open('/content/ETL_Pipeline_Sabih_DS-59/etl_pipeline.py', 'w') as f:
    f.write(python_code)

print("Python code has been written to 'etl_pipeline.py'")


Python code has been written to 'etl_pipeline.py'


### Load Data to MongoDB 🌐

1. **Establish MongoDB Connection** 🔗  
   Using the `MongoClient`, we connect to MongoDB Atlas with the provided URI.

2. **Load Cleaned Data** 📂  
   - Open the cleaned `.json` file and load the data into a Python dictionary.
   
3. **Insert Data into MongoDB** 🗄️  
   - Insert the data into the `temperature_readings` collection of the `weather_data` database.  
   - If the database or collection doesn’t exist, they will be created automatically.

4. **Error Handling** ⚠️  
   - Catch and display errors for file not found, invalid JSON, or unexpected issues.

In [11]:
# Define the Python code as a string
python_code = """
import json


from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi

uri = "mongodb+srv://sabih562:MM7mY0pZ2nixoBM6@cluster0.1hrnt5z.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0"

# Create a new client and connect to the server
client = MongoClient(uri, server_api=ServerApi('1'))


def load_to_db():
    file_path = '/content/ETL_Pipeline_Sabih_DS-59/data/cleaned_data.json'
    try:
        # Open the file and load the data as a dictionary
        with open(file_path, 'r') as file:
            data = json.load(file)

        db = client['weather_data']  # 'weather_data' will be created if it doesn't exist

        # Access (or create) the collection within that database
        collection = db['temperature_readings']  # 'temperature_readings' will be created if it doesn't exist

        collection.insert_many(data)

    except FileNotFoundError:
        print(f"Error: The file at {file_path} was not found.")
    except json.JSONDecodeError:
        print(f"Error: The file at {file_path} is not a valid JSON.")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")


def main():
  load_to_db()

"""

# Open (or create) a Python file to write the code
with open('/content/ETL_Pipeline_Sabih_DS-59/load_to_db.py', 'w') as f:
    f.write(python_code)

print("Python code has been written to 'load_to_db.py'")


Python code has been written to 'load_to_db.py'


### Scheduler for Automated ETL Runs 🕒

1. **Scheduler Function** ⏲️  
   - The `scheduler()` function repeatedly calls the ETL pipeline (`etl_pipeline.main()`) at the specified time interval (in seconds).  
   - After each execution, it waits for the defined interval before running again.

2. **Start Scheduler in a New Thread** 🧵  
   - `start_scheduler()` starts the scheduler in a new thread, allowing the main program to continue running while the scheduler operates in the background.  
   - The thread is set to daemon, meaning it will terminate when the main program ends.

3. **Run Scheduler at Defined Intervals** 🔁  
   - The example sets the scheduler to run every 10 seconds (`start_scheduler(10)`). You can adjust this as needed (e.g., 300 seconds for 5 minutes).

4. **Keep Main Program Running** 🖥️  
   - The `while True` loop keeps the main program alive, so the scheduler continues to run indefinitely.


In [12]:
# Define the Python code as a string
python_code = """
import time
import threading
import etl_pipeline

# Scheduler to run the ETL function at intervals
def scheduler(interval_seconds):
    while True:
        etl_pipeline.main()  # Call the ETL function
        time.sleep(interval_seconds)  # Wait for the interval before calling again

# Function to start the scheduler in a new thread
def start_scheduler(interval_seconds):
    scheduler_thread = threading.Thread(target=scheduler, args=(interval_seconds,))
    scheduler_thread.daemon = True  # Daemon thread will exit when the main program exits
    scheduler_thread.start()

# Run the scheduler every 300 seconds (as an example)
if __name__ == "__main__":
    start_scheduler(10)  # Set the interval to 300 seconds(5 mins)

    # Keep the main program alive to let the scheduler run
    while True:
        time.sleep(1)
"""

# Open (or create) a Python file to write the code
with open('/content/ETL_Pipeline_Sabih_DS-59/scheduler.py', 'w') as f:
    f.write(python_code)

print("Python code has been written to 'scheduler.py'")


Python code has been written to 'scheduler.py'


# Start Pipeline

In [None]:
!python /content/ETL_Pipeline_Sabih_DS-59/scheduler.py

fetching Data from openWeather API
Data saved as /content/ETL_Pipeline_Sabih_DS-59/data/sample_data.csv.
b'timestamp,temperature_celsius,temperature_fahrenheit,humidity,wind_speed\r\n2025-04-09 18:00:00,13.6,56.48,57,3.78\r\n2025-04-09 21:00:00,11.8,53.24,60,2.96\r\n2025-04-10 0:00:00,7.81,12999999999,,2.75\r\n2025-04-10 3:00:00,3.32,37.976,82,2.1\r\n2025-04-10 6:00:00,2.88,37.184,84,1.55\r\n2025-04-10 9:00:00,10.34,50.612,64,2.42\r\n2025-04-10 12:00:00,15.36,59.648,47,2.29\r\n2025-04-10 15:00:00,17.32,63.176,42,2.24\r\n2025-04-10 18:00:00,15.75,60.35,50,1.79\r\n2025-04-10 21:00:00,12.43,,67,0.96\r\n2025-04-11 0:00:00,10.74,51.332,74,1.2\r\n2025-04-11 3:00:00,9.57,49.226,81,1.43\r\n2025-04-11 6:00:00,8.9,48.02,78,1.48\r\n2025-04-11 9:00:00,13.68,56.624,53,1.98\r\n2025-04-11 12:00:00,18.44,65.192,39,1.33\r\n2025-04-11 15:00:00,20.3,68.54,34,1.58\r\n2025-04-11 18:00:00,18.56,65.408,44,2.36\r\n2025-04-11 21:00:00,13.98,57.164,66,1.81\r\n2025-04-12 0:00:00,12.05,53.69,79,\r\n2025-04-12 3:0