In [165]:
import pandas as pd
import numpy as np
from scipy.stats import linregress
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor
import plotly.graph_objects as go
from dotenv import load_dotenv
import os
import requests
import time
import aiohttp
import asyncio
from datetime import datetime

import nest_asyncio
nest_asyncio.apply()
import asyncio


# Read settings from a local .env file (override=True is passed so that .env
# values presumably take precedence over already-set variables — confirm).
load_dotenv(override=True)

# Prefer the key from the environment; fall back to an interactive prompt.
api_key = os.getenv("OPENWEATHER_API_KEY")
if not api_key:
    # getpass() does not echo the typed secret, so the key does not end up in
    # the saved notebook output the way it does with input().
    from getpass import getpass
    api_key = getpass("Введите свой OpenWeatherMap API Key: ").strip()

In [2]:
# Load the historical temperature data
dir_data = 'data'
file_name = 'temperature_data.csv'
path_data = Path(dir_data) / file_name

df = pd.read_csv(path_data)
# DataFrame.info() prints its report itself and returns None, so wrapping it in
# print() only appended a stray "None" line to the cell output.
df.info()
df.head()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 54750 entries, 0 to 54749
Data columns (total 4 columns):
 #   Column       Non-Null Count  Dtype  
---  ------       --------------  -----  
 0   city         54750 non-null  object 
 1   timestamp    54750 non-null  object 
 2   temperature  54750 non-null  float64
 3   season       54750 non-null  object 
dtypes: float64(1), object(3)
memory usage: 1.7+ MB
None


Unnamed: 0,city,timestamp,temperature,season
0,New York,2010-01-01,7.162237,winter
1,New York,2010-01-02,8.048603,winter
2,New York,2010-01-03,-2.443189,winter
3,New York,2010-01-04,3.400037,winter
4,New York,2010-01-05,8.778664,winter


In [22]:
def analysis_city(df_city, window=30, n_sigma=2.0, alpha=0.05):
  """Summarise the temperature history of a single city.

  Args:
    df_city: DataFrame with the columns 'city', 'timestamp', 'temperature'
      and 'season'. Only the first 'city' value is used as the city name.
      The frame is copied and never modified.
    window: Number of rows in the centred rolling window used for smoothing.
    n_sigma: A reading is flagged as anomalous when it lies more than
      n_sigma * temp_std_smooth away from temp_mean_smooth of its season.
    alpha: A regression slope whose p-value exceeds alpha is reported as
      'neutral'.

  Returns:
    dict with the keys 'city', 'avg_temp', 'min_temp', 'max_temp',
    'season_profile' (per-season mean/std of the raw temperature), 'trend'
    ('positive' | 'negative' | 'neutral') and 'temp_anomaly' (flagged rows
    with the columns timestamp, temperature, temp_smooth, temp_std_smooth).

  Raises:
    ValueError: if df_city has no rows.
  """
  if df_city.empty:
    raise ValueError("analysis_city() needs at least one row, got an empty DataFrame")

  city = df_city['city'].iloc[0]
  df = df_city.copy()
  df['timestamp'] = pd.to_datetime(df['timestamp'])
  df = df.sort_values('timestamp').reset_index(drop=True)

  # 1.1 Rolling mean inside each season group, then anomaly detection.
  # The grouping is by season only, so a window can span rows of the same
  # season from different years.
  df['temp_smooth'] = df.groupby(['season'])['temperature'].transform(
    lambda x: x.rolling(window=window, center=True).mean()
  )

  smooth_season_profile = df.groupby(['season'])['temp_smooth'].agg(
    temp_mean_smooth = 'mean',
    temp_std_smooth = 'std'
  ).reset_index()

  df = df.merge(smooth_season_profile, on=['season'], how='left')
  # NOTE(review): the threshold is built from the std of the *smoothed* series,
  # not of the raw readings, while raw readings are compared against it. That
  # std is presumably much narrower than the raw one — confirm this is intended.
  df['temp_anomaly'] = np.abs(df['temperature'] - df['temp_mean_smooth']) > n_sigma*df['temp_std_smooth']
  temp_anomaly = df.loc[
    df['temp_anomaly'],
    ['timestamp', 'temperature', 'temp_smooth', 'temp_std_smooth']
  ].copy()

  # 1.2 Seasonal profile: mean and std per season (no rolling window)
  season_profile = df.groupby(['season'])['temperature'].agg(
    temp_mean = 'mean',
    temp_std = 'std'
  ).reset_index()

  # 1.3 Trend: linear regression of temperature on the row position
  df['day_index'] = np.arange(len(df))
  regression = linregress(df['day_index'], df['temperature'])
  slope = regression.slope
  p_value = regression.pvalue

  # A practically flat or statistically insignificant slope counts as no trend.
  if abs(slope) < 1e-8 or p_value > alpha:
    trend = 'neutral'
  elif slope > 0:
    trend = 'positive'
  else:
    trend = 'negative'

  # 1.4 Overall statistics
  avg_temp = df['temperature'].mean()
  min_temp = df['temperature'].min()
  max_temp = df['temperature'].max()

  return {
      'city': city,
      'avg_temp': avg_temp,
      'min_temp': min_temp,
      'max_temp': max_temp,
      'season_profile': season_profile,
      'trend': trend,
      'temp_anomaly': temp_anomaly
  }

def split_by_city(df):
    """Yield one independent DataFrame per city.

    The frame is partitioned in a single groupby pass. sort=False keeps the
    cities in order of first appearance, and every yielded frame is a copy
    that keeps the original row index. Rows whose 'city' is missing are not
    yielded at all (groupby drops NaN keys by default).

    Args:
        df: DataFrame with a 'city' column.

    Yields:
        A copy of the rows belonging to one city.
    """
    for _, city_rows in df.groupby('city', sort=False):
        yield city_rows.copy()

Эксперименты с параллелизмом

In [74]:
%%time
# Sequential baseline: analyse the cities one after another in this process.
results1 = list(map(analysis_city, split_by_city(df)))

CPU times: user 253 ms, sys: 1.66 ms, total: 255 ms
Wall time: 253 ms


In [90]:
%%time
# Same workload, one task per city, on a thread pool with the default size.
with ThreadPoolExecutor() as executor:
    results2 = [*executor.map(analysis_city, split_by_city(df))]

CPU times: user 326 ms, sys: 30 ms, total: 356 ms
Wall time: 308 ms


In [91]:
%%time
# Same workload, one task per city, on a pool of up to 12 worker processes.
with ProcessPoolExecutor(max_workers=12) as executor:
    results3 = [*executor.map(analysis_city, split_by_city(df))]

CPU times: user 117 ms, sys: 189 ms, total: 305 ms
Wall time: 3.02 s


Эксперимент с распараллеливанием показал, что ускорить вычисления с помощью многопоточности или многопроцессности для данного df не получается.

Многопоточность даже теоретически не должна была дать прироста: GIL не даёт выполняться более чем одному потоку одновременно, а I/O здесь как такового нет. Как следствие, время, затраченное на выполнение расчётов, не уменьшилось, а выросло примерно в 1,2–1,4 раза (Wall time: 253 → 308 мс, CPU total: 255 → 356 мс).

Многопроцессность кратно увеличила время работы (Wall time: 3,02 с против 253 мс), так как создание процессов и передача данных между ними требуют большого количества ресурсов.

In [127]:
def plot_season_profile(season_profile, city):
    """Build a figure of the seasonal mean temperature with ±1 std error bars.

    Args:
        season_profile: DataFrame with the columns 'season', 'temp_mean' and
            'temp_std'. It is not modified.
        city: City name, used only in the figure title.

    Returns:
        A plotly Figure; the caller decides when to show it.
    """
    # Draw the seasons in calendar order so the connecting line reads
    # winter -> spring -> summer -> autumn instead of following whatever order
    # the rows arrive in. Labels outside this mapping are placed at the end.
    calendar_rank = {'winter': 0, 'spring': 1, 'summer': 2, 'autumn': 3}
    ordered = season_profile.sort_values(
        'season',
        key=lambda names: names.map(calendar_rank).fillna(len(calendar_rank)),
        kind='stable',
    )

    fig = go.Figure()
    fig.add_trace(go.Scatter(
        x=ordered['season'],
        y=ordered['temp_mean'],
        error_y=dict(type='data', array=ordered['temp_std']),
        mode='lines+markers',
        name='Seasonal Mean ± STD'
    ))
    fig.update_layout(
        title=f"Сезонный профиль города — {city}",
        xaxis_title="Сезон",
        yaxis_title="Температура, °C",
    )
    return fig

def plot_time_series(city_df, temp_anomaly, city):
    """Build a line chart of a city's temperature with anomalies highlighted.

    Args:
        city_df: DataFrame with 'timestamp' and 'temperature' columns.
        temp_anomaly: DataFrame of flagged rows with the same two columns;
            when it is empty no marker trace is added.
        city: City name, used only in the figure title.

    Returns:
        A plotly Figure; the caller decides when to show it.
    """
    traces = [
        go.Scatter(
            x=city_df['timestamp'], y=city_df['temperature'],
            mode='lines', name='Temperature'
        )
    ]

    has_anomalies = not temp_anomaly.empty
    if has_anomalies:
        # Anomalies are drawn as red markers on top of the main series.
        traces.append(go.Scatter(
            x=temp_anomaly['timestamp'], y=temp_anomaly['temperature'],
            mode='markers', name='Anomalies', marker=dict(color='red', size=8)
        ))

    figure = go.Figure(data=traces)
    figure.update_layout(title=f"Временной ряд температуры города — {city}")
    return figure

In [128]:
# Pick one city's analysis result by its position in results1.
# NOTE(review): index 3 is a positional magic number; the printed output shows
# it resolves to Tokyo for this dataset — selecting by city name would not
# depend on row order.
city_stats = results1[3]
# Human-readable summary of the overall statistics for the selected city.
summary = f"""Климатические показатели города — {city_stats['city']}:
  — Средняя температура за время наблюдений — {city_stats['avg_temp']:.1f}
  — Минимальная температура за время наблюдений — {city_stats['min_temp']:.1f}
  — Максимальная температура за время наблюдений — {city_stats['max_temp']:.1f}
  — Общий температурный тренд за время наблюдений — {city_stats['trend']}
"""
print(summary)

# Seasonal mean/std chart for the selected city.
fig_season_plofile = plot_season_profile(city_stats['season_profile'], city_stats['city'])
fig_season_plofile.show()

# Full time series of the same city (rows taken from the raw df) with the
# anomalies found by analysis_city overlaid.
fig_time_series = plot_time_series(df[df['city'] == city_stats['city']].copy(), city_stats['temp_anomaly'], city_stats['city'])
fig_time_series.show()

Климатические показатели города — Tokyo:
  — Средняя температура за время наблюдений — 16.7
  — Минимальная температура за время наблюдений — -12.9
  — Максимальная температура за время наблюдений — 43.4
  — Общий температурный тренд за время наблюдений — positive



In [151]:
# Re-read the .env file so a key added after the kernel started is picked up.
load_dotenv(override=True)

# Prefer the key from the environment; fall back to an interactive prompt.
api_key = os.getenv("OPENWEATHER_API_KEY")
if not api_key:
    # getpass() does not echo the typed secret, so the key does not end up in
    # the saved notebook output the way it does with input().
    from getpass import getpass
    api_key = getpass("Введите свой OpenWeatherMap API Key: ").strip()

In [163]:
def get_temp_sync(city, api_key, timeout=10):
    """Fetch the current temperature of a city from OpenWeatherMap (blocking).

    Args:
        city: City name, sent as the `q` query parameter.
        api_key: OpenWeatherMap API key, sent as `appid`.
        timeout: Seconds to wait for the server before giving up, so a stalled
            connection cannot hang the notebook indefinitely.

    Returns:
        The value stored under ['main']['temp'] in the JSON response
        (`units=metric` is requested).

    Raises:
        requests.HTTPError: if the API answers with an error status (for
            example an invalid key or an unknown city).
        requests.RequestException: on connection problems or a timeout.
    """
    # Passing params lets requests encode the values (city names may contain
    # spaces) instead of splicing them into the URL by hand.
    response = requests.get(
        'https://api.openweathermap.org/data/2.5/weather',
        params={'q': city, 'appid': api_key, 'units': 'metric'},
        timeout=timeout,
    )
    # Fail with the HTTP error itself rather than a KeyError on a payload that
    # has no 'main' section.
    response.raise_for_status()
    data = response.json()
    return data['main']['temp']

temps_sync = {}
# perf_counter() is the clock intended for measuring elapsed intervals; unlike
# time.time() it is not affected by system clock adjustments.
start = time.perf_counter()
# One blocking request per city, strictly one after another.
for city in df['city'].unique():
    temps_sync[city] = get_temp_sync(city, api_key)

end = time.perf_counter()

print("Synchronous results:", temps_sync)
print("Time:", end - start)

Synchronous results: {'New York': 19.97, 'London': 16.35, 'Paris': 18.42, 'Tokyo': 20.57, 'Moscow': 20.31, 'Sydney': 12.23, 'Berlin': 24.01, 'Beijing': 20.94, 'Rio de Janeiro': 37, 'Dubai': 35.96, 'Los Angeles': 19.45, 'Singapore': 29.11, 'Mumbai': 26.99, 'Cairo': 30.42, 'Mexico City': 17.75}
Time: 19.477548837661743


In [None]:
async def get_temp_async(session, city, api_key):
    """Fetch the current temperature of a city from OpenWeatherMap (async).

    Args:
        session: An open aiohttp client session used to send the request.
        city: City name, sent as the `q` query parameter.
        api_key: OpenWeatherMap API key, sent as `appid`.

    Returns:
        A (city, temperature) tuple, where temperature is the value stored
        under ['main']['temp'] in the JSON response (`units=metric` is
        requested).

    Raises:
        aiohttp.ClientResponseError: if the API answers with an error status.
    """
    # Passing params lets aiohttp encode the values (city names may contain
    # spaces) instead of splicing them into the URL by hand.
    async with session.get(
        'https://api.openweathermap.org/data/2.5/weather',
        params={'q': city, 'appid': api_key, 'units': 'metric'},
    ) as response:
        # Fail with the HTTP error itself rather than a KeyError on a payload
        # that has no 'main' section.
        response.raise_for_status()
        data = await response.json()
        return city, data['main']['temp']
    
async def main(cities, api_key):
    """Request the temperature of every city concurrently over one session.

    Args:
        cities: Iterable of city names.
        api_key: OpenWeatherMap API key forwarded to get_temp_async.

    Returns:
        dict mapping city -> temperature. Entries are inserted in the order
        the requests finish, not in the order of `cities`.
    """
    async with aiohttp.ClientSession() as session:
        pending = [get_temp_async(session, name, api_key) for name in cities]
        # as_completed hands results back as soon as each request is done.
        finished = [await done for done in asyncio.as_completed(pending)]
    return dict(finished)

# perf_counter() is the clock intended for measuring elapsed intervals; unlike
# time.time() it is not affected by system clock adjustments.
start = time.perf_counter()
# asyncio.run() inside a notebook relies on nest_asyncio.apply() having been
# called in the imports cell.
temps_async = asyncio.run(main(df['city'].unique(), api_key))
end = time.perf_counter()

print("Asynchronous results:", temps_async)
print("Time:", end - start)

Asynchronous results: {'Cairo': 30.42, 'Tokyo': 20.57, 'London': 16.41, 'Berlin': 24.01, 'Mumbai': 26.99, 'Mexico City': 17.75, 'Paris': 18.42, 'Moscow': 20.31, 'Sydney': 12.23, 'Los Angeles': 19.45, 'Singapore': 29.11, 'Rio de Janeiro': 32.98, 'Dubai': 35.96, 'Beijing': 19.94, 'New York': 20.05}
Time: 3.1653714179992676


Использование асинхронного подхода позволило ускорить получение данных по API примерно в 6 раз (19,5 с против 3,2 с).

In [174]:
def get_currnet_season(now=None):
    """Return the season name for a date, based on its calendar month.

    Args:
        now: Any object with a `.month` attribute (date or datetime). When
            omitted, the current local date from datetime.now() is used.

    Returns:
        'winter' for December–February, 'spring' for March–May, 'summer' for
        June–August and 'autumn' for September–November.
    """
    # NOTE(review): the same month -> season mapping is applied to every city,
    # including southern-hemisphere ones; presumably this mirrors how the
    # 'season' column of the dataset was built — confirm.
    month = (now if now is not None else datetime.now()).month
    # month % 12 moves December to position 0, so each run of three
    # consecutive months shares one index: 12,1,2 -> 0; 3,4,5 -> 1; 6,7,8 -> 2;
    # 9,10,11 -> 3.
    return ('winter', 'spring', 'summer', 'autumn')[month % 12 // 3]
    
def check_temp_animaly(current_temp, season_profile, season):
    """Classify a temperature reading against the historical seasonal profile.

    Args:
        current_temp: The temperature to check.
        season_profile: DataFrame with the columns 'season', 'temp_mean' and
            'temp_std'; the first row matching `season` is used.
        season: Season label to look up in season_profile.

    Returns:
        'Аномальная температура' when the reading is more than two standard
        deviations away from the seasonal mean, otherwise
        'Нормальная температура'.

    Raises:
        ValueError: if season_profile has no row for `season`.
    """
    row = season_profile[season_profile['season'] == season]
    if row.empty:
        raise ValueError(f"season {season!r} is not present in season_profile")

    mean = row['temp_mean'].iloc[0]
    std = row['temp_std'].iloc[0]
    # A NaN std makes the comparison False, so such a reading is reported as
    # normal.
    if abs(current_temp - mean) > 2*std:
        return 'Аномальная температура'
    return 'Нормальная температура'

In [177]:
# Compare the live temperature of the selected city with its historical
# profile for the current season.
season = get_currnet_season()
# The reading comes from the asynchronous API results, keyed by city name.
current_temp = temps_async[city_stats['city']]
season_profile = city_stats['season_profile']

status = check_temp_animaly(current_temp, season_profile, season)
print(f"Текущая температура в городе {city_stats['city']}: {current_temp}°C ({status})")

Текущая температура в городе Tokyo: 20.57°C (Нормальная температура)
