## Вычислить скользящее среднее температуры с окном в 30 дней для сглаживания краткосрочных колебаний.

In [3]:
import pandas as pd
import numpy as np
df = pd.read_csv("temperature_data.csv", parse_dates=["timestamp"])
df = df.sort_values(["city", "timestamp"])

In [4]:
df["rolling_mean_30"] = (
    df.groupby("city")["temperature"]
      .transform(lambda x: x.rolling(window=30, min_periods=1).mean())
)

## Рассчитать среднюю температуру и стандартное отклонение для каждого сезона в каждом городе.

In [5]:
season_stats = (
    df.groupby(["city", "season"])
      .agg(
          mean_temp=("temperature", "mean"),
          std_temp=("temperature", "std")
      )
      .reset_index()
)

season_stats.head()

Unnamed: 0,city,season,mean_temp,std_temp
0,Beijing,autumn,15.98187,5.023288
1,Beijing,spring,12.985039,4.997763
2,Beijing,summer,26.888973,4.928311
3,Beijing,winter,-1.92629,5.160746
4,Berlin,autumn,11.020817,4.906201


## Выявить аномалии, где температура выходит за пределы  среднее±2σ .

In [6]:
# merge main + stats

df = df.merge(
    season_stats,
    on=["city", "season"],
    how="left"
)

# is anomaly flag
df["is_anomaly"] = (
    (df["temperature"] > df["mean_temp"] + 2 * df["std_temp"]) |
    (df["temperature"] < df["mean_temp"] - 2 * df["std_temp"])
)

df["is_anomaly"].value_counts(normalize=True)

is_anomaly
False    0.954995
True     0.045005
Name: proportion, dtype: float64

## Попробуйте распараллелить проведение этого анализа. Сравните скорость выполнения анализа с распараллеливанием и без него.

In [14]:
from joblib import Parallel, delayed
import time

df = pd.read_csv("temperature_data.csv", parse_dates=["timestamp"])
df = df.sort_values(["city", "timestamp"])

def analyze_city(city_df):
    city_df = city_df.sort_values("timestamp").copy()
    
    cols_to_drop = ["mean_temp", "std_temp", "mean_temp_x", "std_temp_x", "mean_temp_y", "std_temp_y"]
    city_df = city_df.drop(columns=cols_to_drop, errors="ignore")

    city_df["rolling_mean_30"] = city_df["temperature"].rolling(window=30, min_periods=1).mean()

    stats = city_df.groupby("season", observed=True)["temperature"].agg(
        mean_temp="mean",
        std_temp="std"
    ).reset_index()

    city_df = city_df.merge(stats, on="season", how="left")

    city_df["is_anomaly"] = (
        (city_df["temperature"] > city_df["mean_temp"] + 2 * city_df["std_temp"]) |
        (city_df["temperature"] < city_df["mean_temp"] - 2 * city_df["std_temp"])
    )

    return city_df

# Последовательно
start = time.time()
df_seq = pd.concat(analyze_city(g) for _, g in df.groupby("city"))
time_seq = time.time() - start

# Параллельно
groups = [g for _, g in df.groupby("city")]

start = time.time()
df_par = pd.concat(Parallel(n_jobs=-1)(delayed(analyze_city)(g) for g in groups))
time_par = time.time() - start

print(f"Последовательно:{time_seq:.3f}сек")
print(f"Параллельно:{time_par:.3f}сек")



Последовательно:0.029сек
Параллельно:0.017сек


In [15]:
diff = time_seq/time_par
print(diff)

1.7146030615877537


- Параллельное выполнение оказалось быстрее почти в 2 раза, хотя и объем данных незначительный, но при больших датасетах такой прирост окажет значительное влияние.