In [1]:
import pandas as pd
import numpy as np


# Approximate mean temperatures (°C) for each city by season (illustrative data).
seasonal_temperatures = {
    "New York": {"winter": 0, "spring": 10, "summer": 25, "autumn": 15},
    "London": {"winter": 5, "spring": 11, "summer": 18, "autumn": 12},
    "Paris": {"winter": 4, "spring": 12, "summer": 20, "autumn": 13},
    "Tokyo": {"winter": 6, "spring": 15, "summer": 27, "autumn": 18},
    "Moscow": {"winter": -10, "spring": 5, "summer": 18, "autumn": 8},
    "Sydney": {"winter": 12, "spring": 18, "summer": 25, "autumn": 20},
    "Berlin": {"winter": 0, "spring": 10, "summer": 20, "autumn": 11},
    "Beijing": {"winter": -2, "spring": 13, "summer": 27, "autumn": 16},
    "Rio de Janeiro": {"winter": 20, "spring": 25, "summer": 30, "autumn": 25},
    "Dubai": {"winter": 20, "spring": 30, "summer": 40, "autumn": 30},
    "Los Angeles": {"winter": 15, "spring": 18, "summer": 25, "autumn": 20},
    "Singapore": {"winter": 27, "spring": 28, "summer": 28, "autumn": 27},
    "Mumbai": {"winter": 25, "spring": 30, "summer": 35, "autumn": 30},
    "Cairo": {"winter": 15, "spring": 25, "summer": 35, "autumn": 25},
    "Mexico City": {"winter": 12, "spring": 18, "summer": 20, "autumn": 15},
}

# Month number -> season name (December-February is "winter").
# NOTE(review): this calendar mapping is applied to every city, so for
# Southern-Hemisphere cities (Sydney, Rio de Janeiro) the "winter" means land in
# December-February — confirm that is intended for the synthetic data.
month_to_season = {12: "winter", 1: "winter", 2: "winter",
                   3: "spring", 4: "spring", 5: "spring",
                   6: "summer", 7: "summer", 8: "summer",
                   9: "autumn", 10: "autumn", 11: "autumn"}


def generate_realistic_temperature_data(cities, num_years=100, seed=None):
    """Build a synthetic daily temperature table for the given cities.

    Each (city, day) temperature is drawn from a normal distribution centred
    on that city's seasonal mean from ``seasonal_temperatures`` with a standard
    deviation of 5. All draws and lookups are vectorised rather than done in a
    Python loop over every (city, day) pair.

    Parameters
    ----------
    cities : iterable of str
        Keys of ``seasonal_temperatures``; an unknown city raises ``KeyError``.
    num_years : int, default 100
        The table covers ``365 * num_years`` consecutive days starting at
        2010-01-01. Leap days are not compensated for, so the range ends a
        little before ``num_years`` full calendar years.
    seed : int or None, default None
        When given, draws come from ``np.random.default_rng(seed)`` so the
        table is reproducible. When ``None``, the global ``np.random`` state
        is used.

    Returns
    -------
    pandas.DataFrame
        Columns ``city``, ``timestamp``, ``temperature``, ``season``. Rows are
        ordered by city (in the given order), then by date. An empty
        ``cities`` yields an empty frame with these columns.
    """
    cities = list(cities)
    dates = pd.date_range(start="2010-01-01", periods=365 * num_years, freq="D")
    n_days = len(dates)

    # Season of every day, computed once and shared by all cities.
    day_seasons = dates.month.map(month_to_season).to_numpy(dtype=object)
    season_codes, season_names = pd.factorize(day_seasons)

    # For each city, look up its mean once per distinct season, then fan the
    # means out to every day through the factorized season codes.
    per_city_means = []
    for city in cities:
        climate = seasonal_temperatures[city]  # KeyError for an unknown city
        season_means = np.array([climate[name] for name in season_names], dtype=float)
        per_city_means.append(season_means[season_codes])
    mean_temps = np.concatenate(per_city_means) if per_city_means else np.empty(0)

    if seed is None:
        temperatures = np.random.normal(loc=mean_temps, scale=5)
    else:
        temperatures = np.random.default_rng(seed).normal(loc=mean_temps, scale=5)

    return pd.DataFrame({
        "city": np.repeat(np.array(cities, dtype=object), n_days),
        "timestamp": np.tile(dates.to_numpy(), len(cities)),
        "temperature": temperatures,
        "season": np.tile(day_seasons, len(cities)),
    })

# Генерация данных
# Simulated daily history for every city in the climate table (rows: city by city, in date order).
data = generate_realistic_temperature_data(cities=list(seasonal_temperatures))

In [2]:
import os 
from dotenv import load_dotenv

import warnings

# Load variables from a local .env file into the process environment.
load_dotenv()

# OpenWeatherMap key sent as `appid` by the weather-request helpers; None when unset.
API_KEY = os.getenv('API_KEY')
if API_KEY is None:
    # Without a key the requests are presumably rejected and the helpers return
    # the error text, so the async-vs-sync timings would only measure failures.
    warnings.warn("API_KEY is not set; OpenWeatherMap requests will fail.", stacklevel=1)

Рассчитаем среднее и стандартное отклонение температуры с группировкой по городам и сезонам и сохраним их в отдельных переменных.

In [3]:
%%time
stds = data.groupby(['city', 'season'])['temperature'].std()
means = data.groupby(['city', 'season'])['temperature'].mean()

CPU times: user 50.5 ms, sys: 11 ms, total: 61.5 ms
Wall time: 61.2 ms


А теперь рассчитаем те же показатели и сохраним их как столбцы исходной таблицы.

In [4]:
%%time
data['std'] = data.groupby(
        ['city', 'season']
        )['temperature'].transform('std')

data['mean'] = data.groupby(
        ['city', 'season']
        )['temperature'].transform('mean')

CPU times: user 49.6 ms, sys: 14 ms, total: 63.6 ms
Wall time: 62.7 ms


Время вычислений отличается несильно, но во втором случае статистики дублируются в каждой строке таблицы, поэтому расходуется дополнительная память.
Рассчитаем скользящее среднее.

In [5]:
# Rolling mean over 30 consecutive rows, computed separately for each city.
# Rows are generated in date order within a city at daily frequency, so this is
# a 30-day window; the first 29 rows of each city are NaN (window not yet full).
data['moving_average'] = (
    data.groupby('city')['temperature']
        .transform(lambda temps: temps.rolling(30).mean())
)

Теперь, чтобы рассчитать выбросы, нам нужно присоединить статистики к исходной таблице, и дубликаты хранить всё равно придётся, поэтому будем использовать второй способ расчёта стандартного отклонения и среднего.

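Рассчитаем выбросы с помощью `apply`. Выбросом считаем температуру, которая отклоняется от среднего по городу и сезону более чем на два стандартных отклонения.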

In [6]:
def calc_is_outlier(row, n_std=2):
    """Flag a single row as an outlier.

    Parameters
    ----------
    row : mapping or pandas.Series
        Must provide ``temperature``, ``mean`` and ``std`` entries.
    n_std : float, default 2
        Half-width of the accepted band, in standard deviations.

    Returns
    -------
    int
        1 if ``temperature`` lies strictly outside ``mean ± n_std * std``,
        otherwise 0 (also 0 when ``std`` is NaN, since the comparisons are False).
    """
    temperature = row['temperature']
    upper = row['mean'] + n_std * row['std']
    lower = row['mean'] - n_std * row['std']
    # int() wraps the whole condition so the result is always a plain Python int.
    return int(temperature > upper or temperature < lower)

In [7]:
%%time
data.apply(calc_is_outlier, axis=1)

CPU times: user 2.98 s, sys: 66.9 ms, total: 3.05 s
Wall time: 3.05 s


0         0
1         1
2         0
3         0
4         0
         ..
547495    0
547496    0
547497    0
547498    1
547499    0
Length: 547500, dtype: int64

Расчёты занимают прилично времени. Посмотрим, сколько времени займут векторные вычисления.

In [8]:
%%time
data['is_outlier'] = (
    (data['temperature'] > data['mean'] + 2 * data['std']) |
    (data['temperature'] < data['mean'] - 2 * data['std'])
).astype(int)

CPU times: user 4.13 ms, sys: 40 μs, total: 4.17 ms
Wall time: 3.45 ms


Векторные вычисления дали заметное ускорение. Попробуем распараллелить вычисления с помощью библиотеки `multiprocessing`.

In [9]:
from multiprocessing import Pool

def process_chunk(chunk, n_std=2):
    """Return a copy of ``chunk`` with a 0/1 ``is_outlier`` column added.

    A row is an outlier when its ``temperature`` lies strictly outside
    ``mean ± n_std * std`` (using that row's ``mean`` and ``std`` columns).

    Parameters
    ----------
    chunk : pandas.DataFrame
        Must contain ``temperature``, ``mean`` and ``std`` columns.
    n_std : float, default 2
        Half-width of the accepted band, in standard deviations.

    Returns
    -------
    pandas.DataFrame
        A new frame; ``chunk`` itself is not modified.
    """
    upper = chunk['mean'] + n_std * chunk['std']
    lower = chunk['mean'] - n_std * chunk['std']
    is_outlier = (chunk['temperature'] > upper) | (chunk['temperature'] < lower)
    # assign() returns a new frame, so a slice of a larger DataFrame passed in
    # directly is never written to (no SettingWithCopyWarning).
    return chunk.assign(is_outlier=is_outlier.astype(int))

# Split a DataFrame into row chunks and process them in a process pool.
def parallel_apply(df, func, n_cores=4):
    """Apply ``func`` to ``n_cores`` contiguous row chunks of ``df`` in parallel.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame to split by rows.
    func : callable
        Picklable function taking a DataFrame chunk and returning a DataFrame.
    n_cores : int, default 4
        Number of chunks and of worker processes.

    Returns
    -------
    pandas.DataFrame
        The chunk results concatenated in the original row order.

    Raises
    ------
    ValueError
        If ``n_cores`` is less than 1.
    """
    if n_cores < 1:
        raise ValueError(f"n_cores must be >= 1, got {n_cores}")
    # array_split spreads the remainder over the first chunks, so no trailing
    # rows are lost when len(df) is not a multiple of n_cores.
    positions = np.array_split(np.arange(len(df)), n_cores)
    chunks = [df.iloc[pos] for pos in positions]
    with Pool(n_cores) as pool:
        # Pool.map keeps results in chunk order.
        results = pool.map(func, chunks)

    return pd.concat(results)

In [10]:
%%time
parallel_apply(data, process_chunk)

CPU times: user 51.8 ms, sys: 57 ms, total: 109 ms
Wall time: 117 ms


Unnamed: 0,city,timestamp,temperature,season,std,mean,moving_average,is_outlier
0,New York,2010-01-01,-3.058695,winter,4.988176,-0.070198,,0
1,New York,2010-01-02,13.916703,winter,4.988176,-0.070198,,1
2,New York,2010-01-03,-6.809581,winter,4.988176,-0.070198,,0
3,New York,2010-01-04,0.838385,winter,4.988176,-0.070198,,0
4,New York,2010-01-05,0.012133,winter,4.988176,-0.070198,,0
...,...,...,...,...,...,...,...,...
547495,Mexico City,2109-12-03,12.126426,winter,5.029781,12.082468,14.424527,0
547496,Mexico City,2109-12-04,18.281893,winter,5.029781,12.082468,14.332900,0
547497,Mexico City,2109-12-05,12.867828,winter,5.029781,12.082468,14.264653,0
547498,Mexico City,2109-12-06,22.260401,winter,5.029781,12.082468,14.354379,1


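Вычисления стали занимать намного больше времени, чем векторный вариант: по-видимому, накладные расходы на запуск процессов и передачу данных между ними перевешивают выигрыш. Попробуем библиотеку `modin` для распараллеливания.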

In [11]:
import modin.pandas as mpd

# Convert the pandas DataFrame into a modin DataFrame so the same vectorised
# expression can be timed under modin.
modin_data = mpd.DataFrame(data)



In [12]:
%%time
modin_data['is_outlier'] = (
        (modin_data['temperature'] > modin_data['mean'] + 2 * modin_data['std']) |
        (modin_data['temperature'] < modin_data['mean'] - 2 * modin_data['std'])
    ).astype(int)

CPU times: user 427 ms, sys: 25.3 ms, total: 452 ms
Wall time: 505 ms


Modin здесь заметно уступает векторным операциям pandas.

In [13]:
import polars as pl

# Convert the pandas DataFrame into a polars DataFrame. `data` already carries
# the `is_outlier` column written by the vectorised pandas cell.
polars_data = pl.from_pandas(data)

In [14]:
%%time
modin_data['is_outlier'] = (
    (polars_data['temperature'] > polars_data['mean'] + 2 * polars_data['std']) | 
    (polars_data['temperature'] < polars_data['mean'] - 2 * polars_data['std'])
    )

CPU times: user 375 ms, sys: 27 ms, total: 402 ms
Wall time: 391 ms


In [15]:
from joblib import Parallel, delayed

def parallel_apply_chunks(df, func, n_jobs=4):
    """Apply ``func`` to ``n_jobs`` contiguous row chunks of ``df`` with joblib.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame to split by rows.
    func : callable
        Function taking a DataFrame chunk and returning a DataFrame.
    n_jobs : int, default 4
        Number of chunks and of joblib workers (must be >= 1 here, because it
        is also used as the number of chunks).

    Returns
    -------
    pandas.DataFrame
        The chunk results concatenated in the original row order.
    """
    # Split row *positions* instead of the frame itself: np.array_split on a
    # DataFrame goes through the deprecated DataFrame.swapaxes.
    positions = np.array_split(np.arange(len(df)), n_jobs)
    chunks = [df.iloc[pos] for pos in positions]
    results = Parallel(n_jobs=n_jobs)(delayed(func)(chunk) for chunk in chunks)
    return pd.concat(results)

In [16]:
%%time
parallel_apply_chunks(data, process_chunk, n_jobs=3)

In addition, using fork() with Python in general is a recipe for mysterious
deadlocks and crashes.

The most likely reason you are seeing this error is because you are using the
multiprocessing module on Linux, which uses fork() by default. This will be
fixed in Python 3.14. Until then, you want to use the "spawn" context instead.

See https://docs.pola.rs/user-guide/misc/multiprocessing/ for details.

or by setting POLARS_ALLOW_FORKING_THREAD=1.



CPU times: user 137 ms, sys: 117 ms, total: 254 ms
Wall time: 504 ms


Unnamed: 0,city,timestamp,temperature,season,std,mean,moving_average,is_outlier
0,New York,2010-01-01,-3.058695,winter,4.988176,-0.070198,,0
1,New York,2010-01-02,13.916703,winter,4.988176,-0.070198,,1
2,New York,2010-01-03,-6.809581,winter,4.988176,-0.070198,,0
3,New York,2010-01-04,0.838385,winter,4.988176,-0.070198,,0
4,New York,2010-01-05,0.012133,winter,4.988176,-0.070198,,0
...,...,...,...,...,...,...,...,...
547495,Mexico City,2109-12-03,12.126426,winter,5.029781,12.082468,14.424527,0
547496,Mexico City,2109-12-04,18.281893,winter,5.029781,12.082468,14.332900,0
547497,Mexico City,2109-12-05,12.867828,winter,5.029781,12.082468,14.264653,0
547498,Mexico City,2109-12-06,22.260401,winter,5.029781,12.082468,14.354379,1


Распараллеливание вычислений не привело к сокращению времени обработки данных: векторные операции pandas оказались быстрее.

Тест асинхронности.

In [17]:
import nest_asyncio
import httpx
import asyncio

# Patch asyncio so that asyncio.run() (used below) can be called while the
# notebook kernel's own event loop is already running.
nest_asyncio.apply()

In [18]:
# OpenWeatherMap endpoint queried by get_async_weather and get_sync_weather;
# the API key itself comes from API_KEY, loaded from the environment.
base_url = "https://api.openweathermap.org/data/2.5/weather"

# Request weather for one city without blocking the event loop.
async def get_async_weather(city, client=None):
    """Asynchronously request weather data for ``city`` from OpenWeatherMap.

    Parameters
    ----------
    city : str
        City name sent as the ``q`` query parameter.
    client : httpx.AsyncClient, optional
        Client used to send the request. When omitted, a client is opened and
        closed just for this call (one connection per request). Pass a shared
        client to reuse pooled connections across many concurrent calls.

    Returns
    -------
    dict or str
        Parsed JSON body when the server answers HTTP 200, otherwise the raw
        response text (errors are returned, not raised).
    """
    if client is None:
        async with httpx.AsyncClient() as own_client:
            return await get_async_weather(city, client=own_client)

    params = {
        "q": city,
        "appid": API_KEY,
        "units": "metric",
        "lang": "en"
    }
    response = await client.get(base_url, params=params)
    if response.status_code == 200:
        return response.json()
    return response.text
    
# Request weather for one city with a blocking HTTP call.
def get_sync_weather(city, client=None):
    """Synchronously request weather data for ``city`` from OpenWeatherMap.

    Parameters
    ----------
    city : str
        City name sent as the ``q`` query parameter.
    client : httpx.Client, optional
        Client used to send the request. When omitted, a client is opened and
        closed just for this call (one connection per request). Pass a shared
        client to reuse pooled connections across calls.

    Returns
    -------
    dict or str
        Parsed JSON body when the server answers HTTP 200, otherwise the raw
        response text (errors are returned, not raised).
    """
    if client is None:
        with httpx.Client() as own_client:
            return get_sync_weather(city, client=own_client)

    params = {
        "q": city,
        "appid": API_KEY,
        "units": "metric",
        "lang": "en"
    }
    response = client.get(base_url, params=params)
    if response.status_code == 200:
        return response.json()
    return response.text
    

async def main():
    """Request weather for every city in ``data`` concurrently and await all requests.

    Results are discarded; only the elapsed time of the whole batch is of interest.
    """
    # Schedule every request up front so they run concurrently, then wait on each.
    pending = [
        asyncio.create_task(get_async_weather(name))
        for name in data['city'].unique()
    ]
    for request in pending:
        await request

Посмотрим, сколько займёт асинхронный вызов для всех городов, которые есть в наших данных.

In [19]:
%%time
# Run main() to completion: all city requests are in flight at the same time.
asyncio.run(main())

CPU times: user 356 ms, sys: 31.8 ms, total: 388 ms
Wall time: 641 ms


А теперь — синхронный вызов.

In [20]:
%%time
for city in data.city.unique():
    get_sync_weather(city)

CPU times: user 894 ms, sys: 106 ms, total: 1 s
Wall time: 2.66 s


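Применение асинхронности тут оправдано: запросы к API в основном ждут ответа сети, и асинхронный вариант выполняет их конкурентно (около 0.64 с против 2.66 с для синхронного цикла).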