In [2]:
import pandas as pd
import numpy as np
from scipy.stats import linregress
from multiprocessing import Pool
import time
from datetime import datetime
import streamlit as st
from statsmodels.tsa.statespace.sarimax import SARIMAX
from pmdarima import auto_arima
from joblib import Parallel, delayed
import aiohttp
import asyncio
import requests
import warnings
warnings.filterwarnings("ignore", category=FutureWarning, module="sklearn")

In [3]:
# Чтение данных
data = pd.read_csv("temperature_data.csv", parse_dates=["timestamp"])
data = data.sort_values(by=["city", "timestamp"])

In [7]:
def analyze_city(city_data):
    
    rolling_mean = city_data['temperature'].rolling(window=30, min_periods=1).mean()
    rolling_std = city_data['temperature'].rolling(window=30, min_periods=1).std()

    seasonal_stats = city_data.groupby('season')['temperature'].agg(['mean', 'std']).reset_index()
    seasonal_profile = {
        row['season']: {"mean": row['mean'], "std": row['std']}
        for _, row in seasonal_stats.iterrows()
    }

    anomalies = city_data[np.abs(city_data['temperature'] - rolling_mean) > 2 * rolling_std]

    # Автоподбор параметров SARIMA
    stepwise_fit = auto_arima(city_data['temperature'], seasonal=True, m=12, trace=True, 
                              suppress_warnings=True, stepwise=True)
    order = stepwise_fit.order
    seasonal_order = stepwise_fit.seasonal_order
    model = SARIMAX(city_data['temperature'], order=order, seasonal_order=seasonal_order)
    results = model.fit(disp=0) 

    return rolling_mean, rolling_std, anomalies, seasonal_profile, order, seasonal_order

In [8]:
def parallel_analyze(cities_data):
    start_time = time.time()
    results = Parallel(n_jobs=-1)(delayed(analyze_city)(data) for data in cities_data)
    end_time = time.time()
    print(f"Время параллельного выполнения: {end_time - start_time:.2f} секунд")
    return results

def sequential_analyze(cities_data):
    start_time = time.time()
    results = [analyze_city(data) for data in cities_data]
    end_time = time.time()
    print(f"Время последовательного выполнения {end_time - start_time:.2f} секунд")
    return results

In [9]:
cities = data["city"].unique()

cities_data = [
    data[data["city"] == city].sort_values(by="timestamp").reset_index(drop=True)
    for city in cities
]
sequential_results = sequential_analyze(cities_data)


Performing stepwise search to minimize aic
 ARIMA(2,0,2)(1,0,1)[12] intercept   : AIC=23279.264, Time=7.57 sec
 ARIMA(0,0,0)(0,0,0)[12] intercept   : AIC=28121.186, Time=0.04 sec
 ARIMA(1,0,0)(1,0,0)[12] intercept   : AIC=24452.827, Time=1.78 sec
 ARIMA(0,0,1)(0,0,1)[12] intercept   : AIC=25729.595, Time=1.72 sec
 ARIMA(0,0,0)(0,0,0)[12]             : AIC=31374.708, Time=0.02 sec
 ARIMA(2,0,2)(0,0,1)[12] intercept   : AIC=23279.174, Time=5.18 sec
 ARIMA(2,0,2)(0,0,0)[12] intercept   : AIC=23277.231, Time=0.85 sec
 ARIMA(2,0,2)(1,0,0)[12] intercept   : AIC=23279.208, Time=3.89 sec
 ARIMA(1,0,2)(0,0,0)[12] intercept   : AIC=23275.120, Time=1.42 sec
 ARIMA(1,0,2)(1,0,0)[12] intercept   : AIC=23277.116, Time=4.88 sec
 ARIMA(1,0,2)(0,0,1)[12] intercept   : AIC=23277.116, Time=3.43 sec
 ARIMA(1,0,2)(1,0,1)[12] intercept   : AIC=inf, Time=7.43 sec
 ARIMA(0,0,2)(0,0,0)[12] intercept   : AIC=25582.675, Time=0.99 sec
 ARIMA(1,0,1)(0,0,0)[12] intercept   : AIC=23273.422, Time=1.09 sec
 ARIMA(1,0,

  warn('Non-stationary starting autoregressive parameters'


Performing stepwise search to minimize aic
 ARIMA(2,0,2)(1,0,1)[12] intercept   : AIC=inf, Time=9.06 sec
 ARIMA(0,0,0)(0,0,0)[12] intercept   : AIC=23564.251, Time=0.05 sec
 ARIMA(1,0,0)(1,0,0)[12] intercept   : AIC=22993.428, Time=2.82 sec
 ARIMA(0,0,1)(0,0,1)[12] intercept   : AIC=23108.677, Time=1.44 sec
 ARIMA(0,0,0)(0,0,0)[12]             : AIC=32367.251, Time=0.02 sec
 ARIMA(1,0,0)(0,0,0)[12] intercept   : AIC=23118.991, Time=0.24 sec
 ARIMA(1,0,0)(2,0,0)[12] intercept   : AIC=22925.589, Time=7.03 sec
 ARIMA(1,0,0)(2,0,1)[12] intercept   : AIC=22901.556, Time=13.89 sec
 ARIMA(1,0,0)(1,0,1)[12] intercept   : AIC=22902.256, Time=5.32 sec
 ARIMA(1,0,0)(2,0,2)[12] intercept   : AIC=inf, Time=16.90 sec
 ARIMA(1,0,0)(1,0,2)[12] intercept   : AIC=22901.364, Time=12.29 sec
 ARIMA(1,0,0)(0,0,2)[12] intercept   : AIC=22962.913, Time=4.29 sec
 ARIMA(1,0,0)(0,0,1)[12] intercept   : AIC=23025.824, Time=1.72 sec
 ARIMA(0,0,0)(1,0,2)[12] intercept   : AIC=23017.766, Time=11.88 sec
 ARIMA(2,0,0)

In [25]:
parallel_results = parallel_analyze(cities_data)

Время параллельного выполнения: 1809.15 секунд


Как видно удалось ускорить обработку почти в 2 раза при помощи распаралеливания

In [35]:
API_KEY = ''

In [37]:
async def fetch_temperature_async(city):
    url = f"https://api.openweathermap.org/data/2.5/weather?q={city}&units=metric&appid={API_KEY}"
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            data = await response.json()
            return data['main']['temp']

def fetch_temperature_sync(city):
    url = f"https://api.openweathermap.org/data/2.5/weather?q={city}&units=metric&appid={API_KEY}"
    response = requests.get(url)
    data = response.json()
    return data['main']['temp']

In [38]:
# Определение текущего сезона для определения аномальности
def get_current_season():
    current_date = datetime.now()
    if current_date.month in [12, 1, 2]:
        return 'winter'
    elif current_date.month in [3, 4, 5]:
        return 'spring'
    elif current_date.month in [6, 7, 8]:
        return 'summer'
    else:
        return 'autumn'

def is_temperature_normal(city, current_temp, data):
    city_data = data[data['city'] == city]
    current_season = get_current_season() 
    seasonal_stats = city_data.groupby('season')['temperature'].agg(['mean', 'std'])

    mean = seasonal_stats.loc[current_season, 'mean']
    std = seasonal_stats.loc[current_season, 'std']
    
    return abs(current_temp - mean) <= 2 * std

In [42]:
async def fetch_all_temperatures_async(cities):
    tasks = [fetch_temperature_async(city) for city in cities]
    temperatures = await asyncio.gather(*tasks)
    return dict(zip(cities, temperatures))
    
def fetch_all_temperatures_sync(cities):
    temperatures = {city: fetch_temperature_sync(city) for city in cities}
    return temperatures
    
# Сравним синхронные и асинхронные запросы, сделав вызовы по всем городам.
start_time = time.time()
async_temperatures = await fetch_all_temperatures_async(cities)
async_time = time.time() - start_time
print(f"Время параллельного выполнения:  {async_time:.2f} секунд")


start_time = time.time()
sync_temperatures = fetch_all_temperatures_sync(cities)
sync_time = time.time() - start_time
print(f"Время последовательного выполнения: {sync_time:.2f} секунд")

Время параллельного выполнения:  0.33 секунд
Время последовательного выполнения: 4.09 секунд


Как видно параллельное выполнение нескольких запросов также увеличивает скорость выполнения. Однако так как в streamlit приложении используются запросы по одному по одному городу, то можно ограничиться последовательными. Sarimax доло обучается поэтому в приложении streamlit используем более простую модель - Холта-Винтерса.