In [None]:
# pip install requests

In [None]:
# pip install sqlalchemy

In [None]:
# pip install mysql.connector

In [None]:
# pip install --upgrade mysql-connector-python

In [None]:
# pip install pandas

In [None]:
import os
import sys

"""Adding absolute path for to be able to import config"""
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(parent_dir)

In [1]:
import concurrent.futures
import time
import random
import requests
import pandas as pd
from datetime import datetime

from setup.database_loader import DatabaseLoader
from setup.weather_api import get_weather_data

from sqlalchemy.orm import sessionmaker
from sqlalchemy import text



from src.top_20_europe_cities import cities_20 # Importing a list of cities
from config.settings import API_KEY # API key for weather data API calls

In [2]:
from setup.weather_api import get_weather_data

# Database Setup and Design

In [None]:
# EXPORTED TO create_database.py

# """Create database connection and database."""
# db_loader = DatabaseLoader()
# db_loader.create_engine()

# """Create database and update engine."""
# db_loader.delete_database()
# db_loader.create_database()

# """Create all tables in MySQL database."""
# db_loader.create_tables()

# """Add cities to the database."""
# db_loader.add_cities(cities_20)

# """Adding.removing additional cities to/from the database."""
# db_loader.add_cities(["Vilnius", "Kaunas"])
# db_loader.remove_cities(["Kaunas"])

In [3]:
"""Instantiating Database connection."""
db_loader = DatabaseLoader()
db_loader.create_engine_with_db()

In [4]:
"""Retrieving up-to-date cities from MySQL database for querying."""
Session = sessionmaker(bind=db_loader.engine)

with Session() as session:
    result = session.execute(text("SELECT city_name FROM cities ORDER BY city_id ASC"))
    # Extract city names into a list
    lookup_cities = [row[0] for row in result]

print(lookup_cities)

['Istanbul', 'Moscow', 'London', 'Saint Petersburg', 'Berlin', 'Madrid', 'Kyiv', 'Rome', 'Bucharest', 'Paris', 'Minsk', 'Vienna', 'Warsaw', 'Hamburg', 'Budapest', 'Belgrade', 'Barcelona', 'Munich', 'Kharkiv', 'Milan', 'Vilnius']


In [5]:
lookup_cities[:3]

['Istanbul', 'Moscow', 'London']

In [None]:
# """Turn off foregin key constaint checks."""
# db_loader.turn_off_fk_check()

# """Upload all the data from the dataframes to MySQL database tables."""
# for df, db_table in zip(dfs_list, db_tables):
#     print(f"Inserting into table: {db_table}")
#     db_loader.send_data(df, db_table)
# print("Upload complete.")    

# """Turn on foreign key checks."""
# db_loader.turn_on_fk_check()

# API

In [12]:
"""Variables:"""
OUTPUT_TABLE='weather_data'
NUM_THREADS = 3

"""Initialize an empty DataFrame to store all results"""
all_weather_data = pd.DataFrame()

"""Retrieving up-to-date cities from MySQL database for querying."""
Session = sessionmaker(bind=db_loader.engine)

with Session() as session:
    result = session.execute(text("SELECT city_name FROM cities ORDER BY city_id ASC"))
    # Extract city names into a list
    lookup_cities = [row[0] for row in result]

print(lookup_cities)
# cities = ["London", "Berlin"]  # Example list of cities

['Istanbul', 'Moscow', 'London', 'Saint Petersburg', 'Berlin', 'Madrid', 'Kyiv', 'Rome', 'Bucharest', 'Paris', 'Minsk', 'Vienna', 'Warsaw', 'Hamburg', 'Budapest', 'Belgrade', 'Barcelona', 'Munich', 'Kharkiv', 'Milan', 'Vilnius']


In [None]:
"""Linear"""
all_weather_data = pd.DataFrame()

# Loop through each city and append the data
for city in lookup_cities:
    # Fetch weather data for the city
    # city_weather_data = get_weather_data(city, API_KEY)
    city_weather_data = get_weather_data(city)
    
    # Check if DataFrame is not empty (i.e., successful data fetch).
    if not city_weather_data.empty:
        # Send the DataFrame to the 'cities' table in the database.
        db_loader.send_data(df=city_weather_data, db_table=OUTPUT_TABLE)
        print(f"Weather data for {city} sent to database.")
    else:
        print(f"Failed to fetch or send data for {city}.")
    
    # Add all to the DataFrame.
    all_weather_data = pd.concat([all_weather_data, city_weather_data], ignore_index=True)

all_weather_data

In [13]:
"""Threaded model, with 3 threads."""
"""With 3 threads."""
# Initialize an empty DataFrame to store all weather data
all_weather_data = pd.DataFrame()

# Use ThreadPoolExecutor to fetch weather data concurrently
with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
    # Create a future to city mapping
    # future_to_city = {executor.submit(get_weather_data, city, API_KEY): city for city in cities}
    future_to_city = {executor.submit(get_weather_data, city): city for city in lookup_cities}

    # loop iterates over the Future objects as they are completed. as_completed yields futures as they complete (either successfully or with an exception).
    # A Future object is a representation of an eventual result of an asynchronous operation. When submit is called, it immediately returns a Future, even though the associated function might not have completed yet.
    # as_completed yields futures as they complete, regardless of the order in which they were submitted.
    for future in concurrent.futures.as_completed(future_to_city):
        city = future_to_city[future]
        # future.result() retrieves the result of the completed task. It's wrapped in a try block to handle potential exceptions.
        try:
            city_weather_data = future.result()
            # Check and Process the Result
            # If the result (data for a city) is not empty, it is sent to a database using db_loader
            if not city_weather_data.empty:
                # Send the DataFrame to the 'cities' table in the database
                db_loader.send_data(df=city_weather_data, db_table=OUTPUT_TABLE)
                print(f"Weather data for {city} sent to database.")
            else:
                print(f"Failed to fetch or send data for {city}.")
        except Exception as exc:
            print(f"{city} generated an exception: {exc}")

        # Add all to the DataFrame
        all_weather_data = pd.concat([all_weather_data, city_weather_data], ignore_index=True)

all_weather_data

Weather data for Moscow sent to database.
Weather data for London sent to database.
Weather data for Istanbul sent to database.
Exception occurred while fetching data for Madrid: HTTPSConnectionPool(host='api.openweathermap.org', port=443): Max retries exceeded with url: /data/2.5/weather?q=Madrid&appid=2a8323acc7761522252eae8f517d4868&units=metric (Caused by NameResolutionError("<urllib3.connection.HTTPSConnection object at 0x1447ca550>: Failed to resolve 'api.openweathermap.org' ([Errno 8] nodename nor servname provided, or not known)"))
Madrid generated an exception: 'NoneType' object has no attribute 'empty'
Weather data for Berlin sent to database.
Weather data for Kyiv sent to database.
Weather data for Saint Petersburg sent to database.
Weather data for Rome sent to database.
Weather data for Bucharest sent to database.
Weather data for Paris sent to database.
Weather data for Vienna sent to database.
Weather data for Minsk sent to database.
Weather data for Budapest sent to dat

Unnamed: 0,city_name,temperature,condition,conditions_description,timestamp
0,Moscow,1.94,Clouds,overcast clouds,2023-11-15 22:37:35.331422
1,London,11.58,Clouds,scattered clouds,2023-11-15 22:37:35.332592
2,Istanbul,20.64,Clouds,few clouds,2023-11-15 22:37:35.336336
3,Berlin,8.7,Clouds,broken clouds,2023-11-15 22:37:35.653996
4,Kyiv,9.96,Clouds,overcast clouds,2023-11-15 22:37:35.807937
5,Saint Petersburg,1.12,Clouds,overcast clouds,2023-11-15 22:37:35.947717
6,Rome,14.53,Clouds,few clouds,2023-11-15 22:37:36.006538
7,Bucharest,14.98,Clouds,few clouds,2023-11-15 22:37:36.130933
8,Paris,13.22,Clouds,broken clouds,2023-11-15 22:37:36.233939
9,Vienna,12.77,Clear,clear sky,2023-11-15 22:37:36.980300


In [4]:
# API_KEY = "2a8323acc7761522252eae8f517d4868"

In [6]:
# EXPORTED to weather_api.py

# Function to make an API call to OpenWeatherMap and get weather data for a specific city
def get_weather_data(city, API_KEY):
    # Simulating a network request delay to prevent a block from server.
    time.sleep(random.uniform(0.1, 0.2))
    
    try:
        # Constructing the API request URL
        url = f"https://api.openweathermap.org/data/2.5/weather?q={city}&appid={API_KEY}&units=metric"
        
        # Making the API request
        response = requests.get(url)
        
        # Checking if the response is successful
        if response.status_code == 200:
            data = response.json()
            # Extracting relevant data from the response
            weather_data = {
                'city_name': city,
                # 'country': data['sys']['country'],
                'temperature': data['main']['temp'],
                'condition': data['weather'][0]['main'],
                'conditions_description': data['weather'][0]['description'],
                'timestamp': datetime.now()  # Add current timestamp
            }
            # Convert the dictionary to a DataFrame
            return pd.DataFrame([weather_data])
        else:
            # Handling unsuccessful responses without adding to DataFrame
            print(f"Failed to fetch data for {city}, status code: {response.status_code}")
            # return pd.DataFrame()
    except Exception as e:
        # Handling any exceptions during the API call without adding to DataFrame
        print(f"Exception occurred while fetching data for {city}: {str(e)}")
        # return pd.DataFrame()


# Running the function for a single city as an example
example_city = "London"
weather_data_df = get_weather_data(example_city, API_KEY)
weather_data_df


Unnamed: 0,city_name,temperature,condition,conditions_description,timestamp
0,London,11.6,Clouds,broken clouds,2023-11-15 21:54:53.135786


In [3]:
example_city = "London"
weather_data_df = get_weather_data(example_city)
weather_data_df


Unnamed: 0,city_name,temperature,condition,conditions_description,timestamp
0,London,11.67,Clouds,scattered clouds,2023-11-15 22:16:06.618421


In [None]:
# In case of non-existing cities are added.
cities = ["London", "Berlin", "Not_real_city"]  # Example list of cities

# Initialize an empty DataFrame to store all results
all_weather_data = pd.DataFrame()

error_log = []  # List to store error messages

# Loop through each city and append the data
for city in cities:
    try:
    # Fetch weather data for the city
    # city_weather_data = get_weather_data(city, API_KEY)
        city_weather_data = get_weather_data(city)
        
        # Check if DataFrame is not empty (i.e., successful data fetch).
        if not city_weather_data.empty:
            # Send the DataFrame to the 'cities' table in the database.
            db_loader.send_data(df=city_weather_data, db_table='weather_data')
            print(f"Weather data for {city} sent to database.")
        else:
                print(f"Failed to fetch or send data for {city}.")
    except Exception as e:
        # Store the error message along with the city name
        error_log.append(f"Error for {city}: {str(e)}")
        continue  # Continue with the next iteration of the loop
    
    # Add all to the DataFrame.
    all_weather_data = pd.concat([all_weather_data, city_weather_data], ignore_index=True)

# After the loop, print all the errors
if error_log:
    print("Errors encountered:")
    for error in error_log:
        print(error)

all_weather_data

In [10]:
cities = ["London", "Berlin"]  # Example list of cities

# Initialize an empty DataFrame to store all results
all_weather_data = pd.DataFrame()

# Loop through each city and append the data
for city in cities:
    # Fetch weather data for the city
    # city_weather_data = get_weather_data(city, API_KEY)
    city_weather_data = get_weather_data(city)
    
    # Check if DataFrame is not empty (i.e., successful data fetch).
    if not city_weather_data.empty:
        # Send the DataFrame to the 'cities' table in the database.
        db_loader.send_data(df=city_weather_data, db_table='weather_data')
        print(f"Weather data for {city} sent to database.")
    else:
        print(f"Failed to fetch or send data for {city}.")
    
    # Add all to the DataFrame.
    all_weather_data = pd.concat([all_weather_data, city_weather_data], ignore_index=True)

all_weather_data

Weather data for London sent to database.
Weather data for Berlin sent to database.
Failed to fetch data for Not_real_city, status code: 404
Errors encountered:
Error for Not_real_city: 'NoneType' object has no attribute 'empty'


Unnamed: 0,city_name,temperature,condition,conditions_description,timestamp
0,London,11.58,Clouds,scattered clouds,2023-11-15 22:25:44.564192
1,Berlin,8.83,Clouds,broken clouds,2023-11-15 22:25:44.922419


In [7]:
"""With 3 threads."""
NUM_THREADS = 3
# Your get_weather_data function definition remains the same

# Assuming `cities` is a list of city names and `API_KEY` is defined
all_weather_data = pd.DataFrame()  # Initialize an empty DataFrame to store all weather data

# Use ThreadPoolExecutor to fetch weather data concurrently
with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
    # Create a future to city mapping
    # future_to_city = {executor.submit(get_weather_data, city, API_KEY): city for city in cities}
    future_to_city = {executor.submit(get_weather_data, city): city for city in cities}

    # loop iterates over the Future objects as they are completed. as_completed yields futures as they complete (either successfully or with an exception).
    # A Future object is a representation of an eventual result of an asynchronous operation. When submit is called, it immediately returns a Future, even though the associated function might not have completed yet.
    # as_completed yields futures as they complete, regardless of the order in which they were submitted.
    for future in concurrent.futures.as_completed(future_to_city):
        city = future_to_city[future]
        # future.result() retrieves the result of the completed task. It's wrapped in a try block to handle potential exceptions.
        try:
            city_weather_data = future.result()
            # Check and Process the Result
            # If the result (data for a city) is not empty, it is sent to a database using db_loader
            if not city_weather_data.empty:
                # Send the DataFrame to the 'cities' table in the database
                db_loader.send_data(df=city_weather_data, db_table='weather_data')
                print(f"Weather data for {city} sent to database.")
            else:
                print(f"Failed to fetch or send data for {city}.")
        except Exception as exc:
            print(f"{city} generated an exception: {exc}")

        # Add all to the DataFrame
        all_weather_data = pd.concat([all_weather_data, city_weather_data], ignore_index=True)

all_weather_data

Weather data for London sent to database.
Weather data for Berlin sent to database.


Unnamed: 0,city_name,temperature,condition,conditions_description,timestamp
0,London,11.67,Clouds,scattered clouds,2023-11-15 22:17:08.997295
1,Berlin,8.73,Clouds,broken clouds,2023-11-15 22:17:09.035810


In [7]:
timestamp = 15
elapsed_time = 200
print("="*30,"\n" f"Timestamp: {timestamp}", "\n" f"Total elapsed time: {elapsed_time} seconds")
# print(f"Timestamp: {timestamp}")
# print(f"Total elapsed time: {elapsed_time} seconds")


Timestamp: 15 
Total elapsed time: 200 seconds


In [19]:
import os
print(f"Current working directory: {os.getcwd()}")

Current working directory: /Users/wxo508/scripts_testing/crontab_trial


In [None]:
# Setting MySQL path
PATH=/usr/local/bin:/usr/bin:/bin:/usr/local/mysql-8.1.0-macos13-arm64/bin

# Weather API call to OpenWeatherMap and storage to MySQL database, Hourly
0 * * * * /Users/wxo508/scripts_testing/crontab_trial/weather_call.py >> /Users/wxo508/scripts_testing/crontab_trial/logs/weather_call.log 2>&1

# Report and Analytical views refresh based on newly ingested data, Hourly
0 * * * * /Users/wxo508/scripts_testing/crontab_trial/shell_scripts/extreme_temperatures.sh >> /Users/wxo508/scripts_testing/crontab_trial/logs/extreme_temperatures.log 2>&1
0 * * * * /Users/wxo508/scripts_testing/crontab_trial/shell_scripts/raininess.sh >> /Users/wxo508/scripts_testing/crontab_trial/logs/raininess.log 2>&1
0 * * * * /Users/wxo508/scripts_testing/crontab_trial/shell_scripts/statistics_per_city.sh >> /Users/wxo508/scripts_testing/crontab_trial/logs/statistics_per_city.log 2>&1

# Weather data database backup and 24h+ age backup deletion, Hourly
0 * * * * /Users/wxo508/scripts_testing/crontab_trial/shell_scripts/database_backup.sh >> /Users/wxo508/scripts_testing/crontab_trial/logs/database_backup.log 2>&1

# Database reindexation for performance, Daily at 1pm
0 1 * * * /Users/wxo508/scripts_testing/crontab_trial/shell_scripts/index_weather_data.sh >> /Users/wxo508/scripts_testing/crontab_trial/logs/index_weather_data.log 2>&1