In [46]:
import asyncio
import json
import os
import warnings
from pprint import pp
from time import perf_counter, time

import aiohttp
import nest_asyncio
import pandas as pd
import requests
from joblib import Parallel, delayed

nest_asyncio.apply()

warnings.filterwarnings("ignore")

### Анализ и поиск аномалий

In [2]:
# Load the raw readings and order them by city, then by timestamp.
df = (
    pd.read_csv("data/temperature_data.csv")
    .sort_values(by=["city", "timestamp"])
    .reset_index(drop=True)
)

In [3]:
def analyze_city(city_data, window=30, n_sigma=2):
    """Add rolling-mean, seasonal statistics and anomaly flags for one city.

    Args:
        city_data: DataFrame with at least "temperature" and "season" columns.
            It is not modified; the function works on a copy.
        window: Number of consecutive rows in the rolling-mean window.
        n_sigma: Half-width of the "normal" band, in seasonal standard deviations.

    Returns:
        A new DataFrame with a fresh integer index and these extra columns:
        "rolling_mean", "mean" and "std" (per-season statistics),
        "lower_bound", "upper_bound", and "is_anomaly" (1 when the temperature
        lies outside [lower_bound, upper_bound], otherwise 0).
    """
    # Callers pass slices of the full frame; copying keeps the column
    # assignment below from writing into (or warning about) the caller's data.
    city_data = city_data.copy()

    city_data["rolling_mean"] = (
        city_data["temperature"].rolling(window=window, min_periods=1).mean()
    )

    seasonal_stats = (
        city_data.groupby("season")["temperature"].agg(["mean", "std"]).reset_index()
    )
    city_data = city_data.merge(seasonal_stats, on="season", how="left")

    spread = n_sigma * city_data["std"]
    city_data["lower_bound"] = city_data["mean"] - spread
    city_data["upper_bound"] = city_data["mean"] + spread

    # Explicit comparisons (rather than `between`) so that NaN bounds, e.g. a
    # season with a single reading, yield 0 instead of flagging an anomaly.
    too_cold = city_data["temperature"] < city_data["lower_bound"]
    too_hot = city_data["temperature"] > city_data["upper_bound"]
    city_data["is_anomaly"] = (too_cold | too_hot).astype(int)

    return city_data

In [4]:
def analyze_data_full(df):
    """Run `analyze_city` sequentially for every city and stack the results.

    Cities are processed in the order they first appear in the "city" column.
    """
    per_city = [
        analyze_city(df[df["city"] == name])
        for name in df["city"].unique()
    ]
    return pd.concat(per_city)

In [5]:
def analyze_data_parallel(df, n_jobs=-1):
    """Run `analyze_city` for every city through joblib and stack the results.

    Args:
        df: DataFrame with a "city" column plus whatever `analyze_city` needs.
        n_jobs: Passed straight to `joblib.Parallel`.
    """
    run_city = delayed(analyze_city)
    tasks = (run_city(df[df["city"] == name]) for name in df["city"].unique())
    per_city = Parallel(n_jobs=n_jobs)(tasks)
    return pd.concat(per_city)

#### Теперь сравним производительность этих двух подходов

In [6]:
%%timeit
# Последовательно 
res_serial = analyze_data_full(df)

78.5 ms ± 613 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [35]:
%%timeit
# Параллелим
res_parallel = analyze_data_parallel(df)

70.9 ms ± 951 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)


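Итого получилось, что с распараллеливанием оказалось лишь чуть-чуть быстрее: 70.9 мс против 78.5 мс. Данных немного, поэтому выигрыш, вероятно, почти целиком съедают накладные расходы на запуск воркеров)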

### OpenWeather API

In [8]:
from dotenv import load_dotenv
load_dotenv()

True

In [11]:
# API key taken from the OPENWEATHERKEY environment variable; os.getenv
# returns None when the variable is not set.
API_KEY = os.getenv("OPENWEATHERKEY")
# Current-weather endpoint. {city} and {API_KEY} are filled in with
# str.format; the query asks for metric units.
URL_TEMPLATE = "https://api.openweathermap.org/data/2.5/weather?q={city}&units=metric&appid={API_KEY}"

In [42]:
# @interact
def get_current_weather(city="Moscow", timeout=10):
    """Fetch the current-weather payload for `city` from the OpenWeather API.

    Args:
        city: City name substituted into URL_TEMPLATE.
        timeout: Seconds to wait for the server before requests gives up.
            Without it a stalled connection would block the notebook forever.

    Returns:
        The decoded JSON body. HTTP error statuses are not raised here; the
        body is returned as-is, so callers should inspect it (e.g. its "cod"
        field) before relying on weather keys.

    Raises:
        requests.exceptions.RequestException: on connection problems or timeout.
    """
    url = URL_TEMPLATE.format(city=city, API_KEY=API_KEY)
    resp = requests.get(url, timeout=timeout)
    return resp.json()

In [43]:
# Sanity check: pretty-print the raw payload returned for a known city.
pp(get_current_weather("London"))

{'coord': {'lon': -0.1257, 'lat': 51.5085},
 'weather': [{'id': 804,
              'main': 'Clouds',
              'description': 'overcast clouds',
              'icon': '04n'}],
 'base': 'stations',
 'main': {'temp': 9.68,
          'feels_like': 6.62,
          'temp_min': 9.06,
          'temp_max': 10.08,
          'pressure': 1008,
          'humidity': 74,
          'sea_level': 1008,
          'grnd_level': 1003},
 'visibility': 10000,
 'wind': {'speed': 6.69, 'deg': 240},
 'clouds': {'all': 100},
 'dt': 1734810606,
 'sys': {'type': 2,
         'id': 2075535,
         'country': 'GB',
         'sunrise': 1734768241,
         'sunset': 1734796406},
 'timezone': 0,
 'id': 2643743,
 'name': 'London',
 'cod': 200}


In [44]:
# Unknown city: the failure is reported inside the returned payload
# ("cod"/"message") instead of as an exception.
get_current_weather("Jopa")

{'cod': '404', 'message': 'city not found'}

И да, действительно, такого города нет. Так что всё работает корректно)

In [38]:
def is_current_weather_anomaly(city="Moscow", df=None, season="winter"):
    """Tell whether the current temperature in `city` is outside the seasonal norm.

    The norm is mean ± 2 * std, read from the "mean" and "std" columns of the
    first `df` row matching the city and season.

    Args:
        city: City name; must be known to the API and present in df["city"].
        df: DataFrame with "city", "season", "mean" and "std" columns.
        season: Value of df["season"] to compare against. Defaults to "winter",
            which was previously hard-coded; pass the actual season when
            running at another time of year.

    Returns:
        True if the current temperature is above the upper or below the lower
        bound, otherwise False.

    Raises:
        ValueError: if `df` is not provided.
        KeyError: if the API response carries no "main" section (e.g. the
            city was not found).
        IndexError: if `df` has no row for the given city and season.
    """
    # Fail before touching the network when the statistics are missing.
    if df is None:
        raise ValueError("df with 'city', 'season', 'mean' and 'std' columns is required")

    payload = get_current_weather(city)
    if "main" not in payload:
        raise KeyError(
            f"No current temperature for {city!r}: {payload.get('message', payload)}"
        )
    curr_temp = payload["main"]["temp"]

    season_rows = df.loc[
        (df["city"] == city) & (df["season"] == season), ["mean", "std"]
    ]
    if season_rows.empty:
        raise IndexError(f"No {season!r} statistics for city {city!r}")
    # The statistics are identical on every row of a city/season; take the first.
    normal_mean, normal_std = season_rows.iloc[0]

    lower_bound = normal_mean - 2 * normal_std
    upper_bound = normal_mean + 2 * normal_std

    # bool() turns the numpy comparison result into a plain Python bool.
    return bool(curr_temp > upper_bound or curr_temp < lower_bound)

In [36]:
# Frame enriched by analyze_city (seasonal "mean"/"std", bounds, anomaly flag);
# reused below as the reference statistics for the live check.
df_stats = analyze_data_parallel(df)

In [40]:
# Check the live temperature of every city against its seasonal norm.
for city in df_stats["city"].unique():
    anomaly = is_current_weather_anomaly(city=city, df=df_stats)
    # Build the whole verdict at once so the positive case does not print
    # a double space ("is  an anomaly").
    verdict = "an anomaly" if anomaly else "not an anomaly"
    print(f"Current weather in {city} is {verdict}")

Current weather in Beijing is not an anomaly
Current weather in Berlin is not an anomaly
Current weather in Cairo is not an anomaly
Current weather in Dubai is not an anomaly
Current weather in London is not an anomaly
Current weather in Los Angeles is not an anomaly
Current weather in Mexico City is not an anomaly
Current weather in Moscow is not an anomaly
Current weather in Mumbai is not an anomaly
Current weather in New York is not an anomaly
Current weather in Paris is not an anomaly
Current weather in Rio de Janeiro is not an anomaly
Current weather in Singapore is not an anomaly
Current weather in Sydney is not an anomaly
Current weather in Tokyo is not an anomaly


Хм, кажется, что всё нормально, и погода во всех городах не аномальная!! Кайф)

In [50]:
# Time one blocking request per city, issued one after another.
# perf_counter is a monotonic high-resolution clock, so the measured interval
# is not affected by system clock adjustments the way time() is.
start = perf_counter()
for city in df_stats["city"].unique():
    get_current_weather(city)
print(f"На получение инфы последовательно ушло {perf_counter() - start:.4f}s")

На получение последовательно инфы ушло 5.4948s


In [51]:
async def get_weather_async(city, session=None):
    """Asynchronously fetch the current-weather payload for `city`.

    Args:
        city: City name substituted into URL_TEMPLATE.
        session: Optional aiohttp.ClientSession to reuse. When several cities
            are fetched together, passing one shared session avoids opening a
            new connection pool per request. If omitted, a temporary session
            is created and closed for this single call.

    Returns:
        The decoded JSON body of the response.
    """
    if session is None:
        # No session supplied: own one for the duration of this call only.
        async with aiohttp.ClientSession() as own_session:
            return await get_weather_async(city, session=own_session)

    url = URL_TEMPLATE.format(city=city, API_KEY=API_KEY)
    async with session.get(url) as response:
        content = await response.text()
        return json.loads(content)

In [52]:
start = time()
await asyncio.gather(*[get_weather_async(city) for city in df.city.unique()])
print(f"На получение инфы параллельно ушло: {time() - start:.4f}s")

На получение инфы параллельно ушло: 0.3783s


Асинхронная версия оказалась blazingly fast: 0.38 с против 5.49 с при последовательных запросах.