In [122]:
import requests
import pandas as pd
from datetime import datetime
import logging

In [165]:
#configurations

api_key = "30533b3299acce78a42d535b81f7e4d9"
base_url = "https://api.openweathermap.org/data/2.5/weather"

columns = ['city', 'temperature', 'feels_like', 'humidity', 'wind_speed', 'weather', 'timestamp']
weather_df = pd.DataFrame(columns=columns)

server = 'localhost'
database = 'Weather'
driver = 'ODBC Driver 17 for SQL Server'

In [124]:
logging.basicConfig(
    filename="weather_etl.log",   # Log file name
    level=logging.DEBUG,          # Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
    format="%(asctime)s - %(levelname)s - %(message)s"  # Log format (timestamp, log level, message)
)

In [125]:
def extract_weather(cities):
    
    logging.info("Starting data extraction...")
    
    
    try:
        
        weather_city = {} #dictionary to contain extracted data
        count = 0
        
        for city in cities: #loop through cities list to get data for each city
            
            count += 1 #counter for how many times loop runs
            url = f"{base_url}?q={city}&appid={api_key}&units=metric" #url
            response = requests.get(url) #get request response

            if response.status_code == 200: #check response status code
                weather_city[city] = response.json() #get data for current city and update the dictionary

            else:
                print(f"Failed to fetch data: {response.status_code}")
                logging.error(f"Failed to fetch data: {response.status_code}") #log error message
                return None
            
        logging.info(f"Successfully extracted {count} records.") #log number of records extracted
        
        return weather_city #return extracted data
        
    except Exception as e: #error handling
    
        logging.error(f"Error occurred during data extraction: {e}")
        raise

In [131]:
#convert from meter/sec to km/hr
def convert_speed(speed):

    new_speed = speed * 3.6

    return new_speed

#convert time from UTC to human readable format
def convert_time(timestamp):

    new_time = datetime.utcfromtimestamp(timestamp)

    return new_time.strftime('%Y-%m-%d %H:%M:%S')

#data transformation
def transform_weather(raw_data):
    
    logging.info("Starting data transformation...")
    
    if not raw_data: #if extracted data is none
        logging.warning("No raw data received for transformation.")
        return None
    
    try:
        
        transformed_data_dict = {} #initilise empty dictionary to store transformed data
        
        for city in raw_data: #loop through to get data per city
            
            #extract useful fields
            transformed_data = {
                "city": raw_data[city].get("name", "Unknown City"), #unknown if key does not exist
                "temperature": raw_data[city].get("main", {}).get("temp", None), #"None" if key does not exist
                "feels_like": raw_data[city].get("main", {}).get("feels_like", None), #"None" if key does not exist
                "humidity": raw_data[city].get("main", {}).get("humidity", None), #"None" if key does not exist
                "wind_speed": raw_data[city].get("wind", {}).get("speed", 0),  # Default speed as 0
                "weather": raw_data[city].get("weather", [{}])[0].get("description", "No Description"), # "No description" if key does not exist
                "timestamp": raw_data[city].get("dt", None) #"None" if key does not exist
            }


            if transformed_data["wind_speed"] is not None: #if wind speed is not none
                transformed_data["wind_speed"] = convert_speed(transformed_data["wind_speed"]) #convert from meter/sec to km/hr
                
            if transformed_data["timestamp"] is not None:
                transformed_data["timestamp"] = convert_time(transformed_data["timestamp"]) #convert from UTC to human readable
            
            transformed_data_dict[city] = transformed_data #update dictionary with new transformed data for current city

        logging.info(f"Successfully transformed {len(transformed_data_dict)} records.")
        
    #error handling
    except Exception as e:
        
        logging.error(f"Error occurred during data transformation: {e}")
        raise
        return None
    
    return transformed_data_dict

In [183]:
import pyodbc
import os

#Code to connect to the database

#Connect to database
def connect_db(server, database, driver):
    print("In CONNECT DB") #just to check whats going on
    
    try:
        #Initial connection to the server
        connection_string = f'DRIVER={driver};SERVER={server};DATABASE={database};Trusted_Connection=yes;'
        connection = pyodbc.connect(connection_string)
        cursor = connection.cursor()

        # Check if the database exists
        cursor.execute(f"SELECT name FROM sys.databases WHERE name = ?", database)
        db_exists = cursor.fetchone()

        if not db_exists:
            #If database does not exist, create it
            print(f"Database '{database}' does not exist. Creating it...")
            logging.warning(f"Database '{database}' does nto exist. Creating it...")
            
            cursor.execute(f"CREATE DATABASE {database}") #execute create query

            print(f"Database '{database}' created successfully.")
            logging.info(f"Database '{database}' created successfully.")
            
        else:
            print(f"Database '{database}' already exists.")
            
        return connection #return the database connection
        
    except Exception as e:
        #If there is an error
        print(f"Unexpected error occured {e}")
        logging.error(f"Error occurred during database connection: {e}")
        return None

In [188]:
#load data into database
def load_weather(data, connection):
    
    try:
        logging.info("Starting data load.")
        
        dataframe = pd.DataFrame()
        
        for city in data: #loop through cities in the data
            
            city_data = pd.DataFrame([data[city]]) #convert to dataframe
            dataframe = pd.concat([dataframe, city_data], ignore_index=True)
        
        #object to interract with database
        cursor = connection.cursor()
        
        #insert data into the database
        for index, row in dataframe.iterrows(): #loop through rows in the dataframe
            
            #insert query
            insert_query = """
                INSERT INTO WeatherData (city, temperature, feels_like, humidity, wind_speed, weather, timestamp)
                VALUES (?, ?, ?, ?, ?, ?, ?)
                """
            #execute the query
            cursor.execute(insert_query, row['city'], row['temperature'], row['feels_like'], 
                           row['humidity'], row['wind_speed'], row['weather'], row['timestamp'])
    
        #commit the transaction
        connection.commit()
        
        print("Records inserted successfully.")
        logging.info(f"Successfully loaded {len(data)} records into the database.")

    except Exception as e:
        logging.error(f"Error occurred during data load: {e}")
        raise
        
    return dataframe #return the dataframe

In [192]:
cities = ["London", "New York", "Paris", "Tokyo", "Lagos"] #cities to extract data for

#ETL execution function
def run_etl(cities, weather_df, server, database, driver):

    logging.info("ETL process started...")

    try:
        
        raw_data = extract_weather(cities) #call extraction function
        
        transformed_data = transform_weather(raw_data) #call transformation function
        
        connection = connect_db(server, database, driver) #call database connection function
        
        if transformed_data: #if data was transformed and is not null
            weather_df = load_weather(transformed_data, connection) #call load function
        logging.info("ETL process completed successfully.")
        
    except Exception as e:
        logging.error(f"ETL process failed: {e}")
        raise
        
    return weather_df #return the final data in a dataframe
        
weather_df = run_etl(cities, weather_df, server, database, driver)

In CONNECT DB
Database 'Weather' already exists.
Records inserted successfully.


In [134]:
weather_df

Unnamed: 0,city,temperature,feels_like,humidity,wind_speed,weather,timestamp
0,London,9.98,7.71,83,16.668,broken clouds,2024-12-16 20:52:56
1,New York,8.83,5.7,94,22.212,mist,2024-12-16 20:50:55
2,Paris,8.4,7.28,95,7.416,light intensity drizzle,2024-12-16 20:49:06
3,Tokyo,5.34,3.7,63,7.416,clear sky,2024-12-16 20:50:30
4,Lagos,25.84,26.73,86,8.676,broken clouds,2024-12-16 20:55:15
5,London,9.96,7.9,84,14.832,broken clouds,2024-12-16 21:03:06
6,New York,8.72,5.74,94,20.376,mist,2024-12-16 21:03:50
7,Paris,8.34,7.21,95,7.416,light intensity drizzle,2024-12-16 21:00:30
8,Tokyo,5.34,3.7,63,7.416,clear sky,2024-12-16 21:02:10
9,Lagos,25.84,26.73,86,8.676,broken clouds,2024-12-16 20:55:15


In [None]:
"""def load_weather(data, dataframe):
    
    try:
        logging.info("Starting data load.")
        
        for city in data:
            
            city_data = pd.DataFrame([data[city]])
            dataframe = pd.concat([dataframe, city_data], ignore_index=True)
            #print(city_data)

        logging.info(f"Successfully loaded {len(data)} records into the database.")

    except Exception as e:
        logging.error(f"Error occurred during data load: {e}")
        raise
        
    return dataframe

#weather_df = load_weather(transformed_data, weather_df)
#weather_df"""