In [1]:
import pandas as pd
import numpy as np
import time
from sklearn.linear_model import LinearRegression

### Генерация данных

In [2]:
# Реальные средние температуры (примерные данные) для городов по сезонам
seasonal_temperatures = {
    "New York": {"winter": 0, "spring": 10, "summer": 25, "autumn": 15},
    "London": {"winter": 5, "spring": 11, "summer": 18, "autumn": 12},
    "Paris": {"winter": 4, "spring": 12, "summer": 20, "autumn": 13},
    "Tokyo": {"winter": 6, "spring": 15, "summer": 27, "autumn": 18},
    "Moscow": {"winter": -10, "spring": 5, "summer": 18, "autumn": 8},
    "Sydney": {"winter": 12, "spring": 18, "summer": 25, "autumn": 20},
    "Berlin": {"winter": 0, "spring": 10, "summer": 20, "autumn": 11},
    "Beijing": {"winter": -2, "spring": 13, "summer": 27, "autumn": 16},
    "Rio de Janeiro": {"winter": 20, "spring": 25, "summer": 30, "autumn": 25},
    "Dubai": {"winter": 20, "spring": 30, "summer": 40, "autumn": 30},
    "Los Angeles": {"winter": 15, "spring": 18, "summer": 25, "autumn": 20},
    "Singapore": {"winter": 27, "spring": 28, "summer": 28, "autumn": 27},
    "Mumbai": {"winter": 25, "spring": 30, "summer": 35, "autumn": 30},
    "Cairo": {"winter": 15, "spring": 25, "summer": 35, "autumn": 25},
    "Mexico City": {"winter": 12, "spring": 18, "summer": 20, "autumn": 15},
}

# Сопоставление месяцев с сезонами
month_to_season = {12: "winter", 1: "winter", 2: "winter",
                   3: "spring", 4: "spring", 5: "spring",
                   6: "summer", 7: "summer", 8: "summer",
                   9: "autumn", 10: "autumn", 11: "autumn"}

# Генерация данных о температуре
def generate_realistic_temperature_data(cities, num_years=10):
    dates = pd.date_range(start="2010-01-01", periods=365 * num_years, freq="D")
    data = []

    for city in cities:
        for date in dates:
            season = month_to_season[date.month]
            mean_temp = seasonal_temperatures[city][season]
            # Добавляем случайное отклонение
            temperature = np.random.normal(loc=mean_temp, scale=5)
            data.append({"city": city, "timestamp": date, "temperature": temperature})

    df = pd.DataFrame(data)
    df['season'] = df['timestamp'].dt.month.map(lambda x: month_to_season[x])
    return df

# Генерация данных
data = generate_realistic_temperature_data(list(seasonal_temperatures.keys()))
data.to_csv('temperature_data.csv', index=False)

In [3]:
df = pd.read_csv('temperature_data.csv')
df['timestamp'] = pd.to_datetime(df['timestamp'])
df.head()

Unnamed: 0,city,timestamp,temperature,season
0,New York,2010-01-01,2.887095,winter
1,New York,2010-01-02,3.402675,winter
2,New York,2010-01-03,-0.999237,winter
3,New York,2010-01-04,6.680488,winter
4,New York,2010-01-05,-5.56032,winter


### 1. Анализ исторических данных:

- Вычислить скользящее среднее температуры с окном в 30 дней для сглаживания краткосрочных колебаний.
- Рассчитать среднюю температуру и стандартное отклонение для каждого сезона в каждом городе.
- Выявить аномалии, где температура выходит за пределы  среднее±2𝜎 .
- Попробуйте распараллелить проведение этого анализа. Сравните скорость выполнения анализа с распараллеливанием и без него.

In [4]:
def city_statistics(df, window_days=30):
    # Определение аномалий по скользящим статистикам
    df["rolling_mean"] = df.temperature.rolling(window=window_days).mean()
    df["rolling_std"] = df.temperature.rolling(window=window_days).std()

    df["is_anomaly"] = np.where(
        df["rolling_mean"].notna() & df["rolling_std"].notna(),
        (df["temperature"] < df["rolling_mean"] - 2 * df["rolling_std"]) |
        (df["temperature"] > df["rolling_mean"] + 2 * df["rolling_std"]),
        np.nan
    )

    # Общий профиль города
    city_profile = df.groupby("city", as_index=False) \
        .agg(temp_mean=("temperature", "mean"),
             temp_min=("temperature", "min"),
             temp_max=("temperature", "max"),
             anomalies_count=("is_anomaly", "sum"),
             obs_count=("timestamp", "size")
             )

    city_profile["anomalies_share"] = city_profile.anomalies_count / city_profile.obs_count

    # + Тренд
    df_temp = df.copy()
    df_temp["timestamp_unix"] = df_temp["timestamp"].astype("int64") // 10 ** 9

    X = df_temp["timestamp_unix"].values.reshape(-1, 1)
    y = df_temp["temperature"].values

    model = LinearRegression()
    model.fit(X, y)

    df["trend"] = model.predict(X)
    slope = model.coef_[0]

    if abs(slope) < 1e-9:
        trend = "doesn`t exist"
    elif slope < 0:
        trend = "negative"
    else:
        trend = "positive"

    city_profile["trend"] = trend

    # Профиль сезона
    # Оставляем город в groupby, чтобы можно было объединить выводы функции в один df
    season_profile = df.groupby(["season", "city"], as_index=False).agg(
        temp_mean=("temperature", "mean"),
        temp_std=("temperature", "std")
    )

    return df, city_profile, season_profile

In [5]:
results = []

start = time.time()

for city in df.city.unique():
    df_temp, city_profile_temp, season_profile_temp = city_statistics(df[df['city'] == city].copy().reset_index(drop=True))
    results.append((df_temp, city_profile_temp, season_profile_temp))

df_wo_parallel = pd.concat([result[0] for result in results], ignore_index=True)
city_profile_wo_parallel = pd.concat([result[1] for result in results], ignore_index=True)
season_profile_wo_parallel = pd.concat([result[2] for result in results], ignore_index=True)

end = time.time()
print(f'Выполнение без распараллеливания: {end - start :.2f} секунд')

Выполнение без распараллеливания: 0.32 секунд


In [6]:
# Для наблюдений первого месяца лежит Nan, т.к. для них нет накопленных данных для скользящего окна
# Вызываем tail
df_wo_parallel.tail()

Unnamed: 0,city,timestamp,temperature,season,rolling_mean,rolling_std,is_anomaly,trend
54745,Mexico City,2019-12-25,7.742859,winter,12.61341,4.524898,0.0,16.426887
54746,Mexico City,2019-12-26,6.608162,winter,12.350975,4.639689,0.0,16.426983
54747,Mexico City,2019-12-27,9.616245,winter,12.177358,4.641378,0.0,16.42708
54748,Mexico City,2019-12-28,8.529817,winter,12.053396,4.688827,0.0,16.427176
54749,Mexico City,2019-12-29,11.310376,winter,11.891022,4.624847,0.0,16.427272


In [7]:
city_profile_wo_parallel

Unnamed: 0,city,temp_mean,temp_min,temp_max,anomalies_count,obs_count,anomalies_share,trend
0,New York,12.485658,-14.400154,40.302351,211.0,3650,0.057808,positive
1,London,11.585265,-11.893173,36.386715,178.0,3650,0.048767,doesn`t exist
2,Paris,12.255139,-11.099069,33.721655,165.0,3650,0.045205,positive
3,Tokyo,16.515341,-9.421874,41.469879,193.0,3650,0.052877,positive
4,Moscow,5.262371,-25.365678,33.012882,223.0,3650,0.061096,positive
5,Sydney,18.760409,-4.292153,42.795422,168.0,3650,0.046027,positive
6,Berlin,10.34599,-16.063702,37.822353,198.0,3650,0.054247,positive
7,Beijing,13.598092,-19.780023,42.314473,227.0,3650,0.062192,positive
8,Rio de Janeiro,25.036387,-0.718259,44.535694,168.0,3650,0.046027,positive
9,Dubai,30.007129,7.351793,57.631327,224.0,3650,0.06137,positive


In [8]:
season_profile_wo_parallel.head()

Unnamed: 0,season,city,temp_mean,temp_std
0,autumn,New York,15.048181,5.029293
1,spring,New York,9.866943,5.057879
2,summer,New York,25.009779,5.019377
3,winter,New York,-0.230865,4.774404
4,autumn,London,11.915265,4.812078


In [9]:
from multiprocess import Pool

def parallel_anomaly_detection(df, num_processes=8):
    groups_by_city = [df[df['city'] == city].copy().reset_index(drop=True) for city in df['city'].unique()]

    with Pool(processes=num_processes) as pool:
        results = pool.map(city_statistics, groups_by_city)
        
    df_result = pd.concat([result[0] for result in results], ignore_index=True)
    city_profile_result = pd.concat([result[1] for result in results], ignore_index=True)
    season_profile_result = pd.concat([result[2] for result in results], ignore_index=True)

    return df_result, city_profile_result, season_profile_result

In [10]:
start = time.time()
df_parallel, city_profile_parallel, season_profile_parallel = parallel_anomaly_detection(df, num_processes=8)
end = time.time()
print(f'Выполнение с распараллеливанием: {end - start:.2f} секунд')

Выполнение с распараллеливанием: 0.36 секунд


In [11]:
df_parallel.tail()

Unnamed: 0,city,timestamp,temperature,season,rolling_mean,rolling_std,is_anomaly,trend
54745,Mexico City,2019-12-25,7.742859,winter,12.61341,4.524898,0.0,16.426887
54746,Mexico City,2019-12-26,6.608162,winter,12.350975,4.639689,0.0,16.426983
54747,Mexico City,2019-12-27,9.616245,winter,12.177358,4.641378,0.0,16.42708
54748,Mexico City,2019-12-28,8.529817,winter,12.053396,4.688827,0.0,16.427176
54749,Mexico City,2019-12-29,11.310376,winter,11.891022,4.624847,0.0,16.427272


In [12]:
city_profile_parallel

Unnamed: 0,city,temp_mean,temp_min,temp_max,anomalies_count,obs_count,anomalies_share,trend
0,New York,12.485658,-14.400154,40.302351,211.0,3650,0.057808,positive
1,London,11.585265,-11.893173,36.386715,178.0,3650,0.048767,doesn`t exist
2,Paris,12.255139,-11.099069,33.721655,165.0,3650,0.045205,positive
3,Tokyo,16.515341,-9.421874,41.469879,193.0,3650,0.052877,positive
4,Moscow,5.262371,-25.365678,33.012882,223.0,3650,0.061096,positive
5,Sydney,18.760409,-4.292153,42.795422,168.0,3650,0.046027,positive
6,Berlin,10.34599,-16.063702,37.822353,198.0,3650,0.054247,positive
7,Beijing,13.598092,-19.780023,42.314473,227.0,3650,0.062192,positive
8,Rio de Janeiro,25.036387,-0.718259,44.535694,168.0,3650,0.046027,positive
9,Dubai,30.007129,7.351793,57.631327,224.0,3650,0.06137,positive


In [13]:
season_profile_parallel.head()

Unnamed: 0,season,city,temp_mean,temp_std
0,autumn,New York,15.048181,5.029293
1,spring,New York,9.866943,5.057879
2,summer,New York,25.009779,5.019377
3,winter,New York,-0.230865,4.774404
4,autumn,London,11.915265,4.812078


Видим, что внутри Jupyter Notebook распараллеливание не дает преимуществ по скорости обработки. Наоброт, запуск с мультипроцессингом выполняется даже медленнее обычного цикла.

Это может быть связано с GIL, ограничивающим многопоточность и используемым Jupyter. При этом в исполняемых файлах фукнции могут повести себя иначе. 

Для реализации приложения на Streamlit в дальнейшем будет использовать распараллеленный запуск.

### 2. Мониторинг текущей температуры:

- Подключить OpenWeatherMap API для получения текущей температуры города. Для получения API Key (бесплатно) надо зарегистрироваться на сайте. Обратите внимание, что API Key может активироваться только через 2-3 часа, это нормально. Посему получите ключ заранее.
- Получить текущую температуру для выбранного города через OpenWeatherMap API.
- Определить, является ли текущая температура нормальной, исходя из исторических данных для текущего сезона.
- Данные на самом деле не совсем реальные (сюрпрайз). Поэтому на момент эксперимента погода в Берлине, Каире и Дубае была в рамках нормы, а в Пекине и Москве аномальная. Протестируйте свое решение для разных городов.
- Попробуйте для получения текущей температуры использовать синхронные и асинхронные методы. Что здесь лучше использовать?

In [14]:
import requests
from datetime import datetime, timezone, timedelta
import aiohttp
import asyncio

import nest_asyncio
nest_asyncio.apply()

In [15]:
def read_api_key(file_path):
    with open(file_path, 'r') as file:
        return file.read().strip()

In [16]:
key = read_api_key('openweather_api_key.txt')

In [18]:
# Сезон локальной даты, для проверки на аномальность
def get_season(unix_timestamp, timezone_offset):
    local_timezone = timezone(timedelta(seconds=timezone_offset))
    local_time = datetime.fromtimestamp(unix_timestamp, tz=local_timezone)
    month = local_time.month
    season = month_to_season[month]
    
    return season


def get_response(city, key):
    url = f'https://api.openweathermap.org/data/2.5/weather?q={city}&appid={key}&units=metric'
    response = requests.get(url)
    response.raise_for_status() 
    data = response.json()
    
    temperature = data['main']['temp']
    unix_timestamp = data['dt']
    timezone = data['timezone']
    
    season = get_season(unix_timestamp, timezone)
    
    return temperature, season


def is_anomaly(city, season, current_temp, season_profile):
    city_season_profile = season_profile[
        (season_profile['city'] == city) & (season_profile['season'] == season)] \
        .copy().reset_index(drop=True)
    
    if city_season_profile.empty:
        raise ValueError(f'Данные {city}-{season} отсутствуют')
    
    mean = city_season_profile['temp_mean'].values[0]
    std = city_season_profile['temp_std'].values[0]
    
    return bool(current_temp < mean - 2 * std or  current_temp > mean + 2 * std)


def check_temperature(city, key, season_profile):
    temperature, season = get_response(city, key)
    anomaly_status = is_anomaly(city, season, temperature, season_profile)
    
    return temperature, season, anomaly_status

In [19]:
results_sync = []

start = time.time()

for city in df.city.unique():
    try:
        temperature, season, anomaly_status = check_temperature(city, key, season_profile_parallel)
        status = 'anomaly' if anomaly_status else 'normal'
        results_sync.append((city, temperature, season, status))
    except ValueError as e:
        print(e)
        
end = time.time()

for city, temperature, season, status in results_sync:
    print(f'For {city} current temperature {temperature}C is {status}')
    
print(f'Синхронное выполнение: {end - start:.2f} секунд')

For New York current temperature 8.34C is normal
For London current temperature 4.17C is normal
For Paris current temperature 4.07C is normal
For Tokyo current temperature 5.99C is normal
For Moscow current temperature 0.96C is anomaly
For Sydney current temperature 21.45C is normal
For Berlin current temperature 1.71C is normal
For Beijing current temperature -5.06C is normal
For Rio de Janeiro current temperature 25.67C is normal
For Dubai current temperature 25.96C is normal
For Los Angeles current temperature 10.52C is normal
For Singapore current temperature 26.03C is normal
For Mumbai current temperature 28.99C is normal
For Cairo current temperature 19.42C is normal
For Mexico City current temperature 10.06C is normal
Синхронное выполнение: 5.25 секунд


In [20]:
async def get_response_async(city, key):
    url = f'https://api.openweathermap.org/data/2.5/weather?q={city}&appid={key}&units=metric'
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            response.raise_for_status()
            data = await response.json()
            
            temperature = data['main']['temp']
            unix_timestamp = data['dt']
            timezone = data['timezone']
            
            season = get_season(unix_timestamp, timezone)
            
            return city, temperature, season


async def check_temperatures_async(cities, key, season_profile):
    tasks = [get_response_async(city, key) for city in cities]
    responses = await asyncio.gather(*tasks)
    results = []
    
    for city, temperature, season in responses:
        anomaly_status = is_anomaly(city, season, temperature, season_profile)
        status = 'anomaly' if anomaly_status else 'normal'
        results.append((city, temperature, season, status))

    return results

In [23]:
start = time.time()
results_async = await check_temperatures_async(df.city.unique(), key, season_profile_parallel)
end = time.time()

for city, temperature, season, status in results_async:
    print(f'For {city} current temperature {temperature}C is {status}')

print(f'Асинхронное выполнение: {end - start:.2f} секунд')

For New York current temperature 8.14C is normal
For London current temperature 4.21C is normal
For Paris current temperature 4.07C is normal
For Tokyo current temperature 5.99C is normal
For Moscow current temperature 0.96C is anomaly
For Sydney current temperature 21.4C is normal
For Berlin current temperature 1.71C is normal
For Beijing current temperature -5.06C is normal
For Rio de Janeiro current temperature 25.67C is normal
For Dubai current temperature 25.96C is normal
For Los Angeles current temperature 10.32C is normal
For Singapore current temperature 26.03C is normal
For Mumbai current temperature 28.99C is normal
For Cairo current temperature 19.42C is normal
For Mexico City current temperature 10.06C is normal
Асинхронное выполнение: 0.60 секунд


А вот уже асинхронное выполнение запроса по API сработало гораздо быстрее синхронного цикла (не блокируемся на ожидание ответа от сервера, передаем управление). 

Но т.к. в Streamlit нам нужно будет подгружать по API только текущую температуру для одного выбранного города — применяем в нем синхронную функцию.

### 3. Создание приложения на Streamlit:

- Добавить интерфейс для загрузки файла с историческими данными.
- Добавить интерфейс для выбора города (из выпадающего списка).
- Добавить форму для ввода API-ключа OpenWeatherMap. Когда он не введен, данные для текущей погоды не показываются. Если ключ некорректный, выведите на экран ошибку (должно приходить {"cod":401, "message": "Invalid API key. Please see https://openweathermap.org/faq#error401 for more info."}).

Отобразить:
- Описательную статистику по историческим данным для города, можно добавить визуализации.
- Временной ряд температур с выделением аномалий (например, точками другого цвета).
- Сезонные профили с указанием среднего и стандартного отклонения.
- Вывести текущую температуру через API и указать, нормальна ли она для сезона.

Реализовано в папке streamlit_app