## ДЗ 1 (ОБЯЗАТЕЛЬНОЕ): Анализ температурных данных и мониторинг текущей температуры через OpenWeatherMap API

**Описание задания:**  
Вы аналитик в компании, занимающейся изучением климатических изменений и мониторингом температур в разных городах. Вам нужно провести анализ исторических данных о температуре для выявления сезонных закономерностей и аномалий. Также необходимо подключить API OpenWeatherMap для получения текущей температуры в выбранных городах и сравнить её с историческими данными.


### Цели задания:
1. Провести **анализ временных рядов**, включая:
   - Вычисление скользящего среднего и стандартного отклонения для сглаживания температурных колебаний.
   - Определение аномалий на основе отклонений температуры от $ \text{скользящее среднее} \pm 2\sigma $.
   - Построение долгосрочных трендов изменения температуры.
   - Любые дополнительные исследования будут вам в плюс.

2. Осуществить **мониторинг текущей температуры**:
   - Получить текущую температуру через OpenWeatherMap API.
   - Сравнить её с историческим нормальным диапазоном для текущего сезона.

3. Разработать **интерактивное приложение**:
   - Дать пользователю возможность выбрать город.
   - Отобразить результаты анализа температур, включая временные ряды, сезонные профили и аномалии.
   - Провести анализ текущей температуры в контексте исторических данных.


### Описание данных
Исторические данные о температуре содержатся в файле `temperature_data.csv`, включают:
  - `city`: Название города.
  - `timestamp`: Дата (с шагом в 1 день).
  - `temperature`: Среднесуточная температура (в °C).
  - `season`: Сезон года (зима, весна, лето, осень).

Код для генерации файла вы найдете ниже.

### Этапы выполнения

1. **Анализ исторических данных**:
   - Вычислить **скользящее среднее** температуры с окном в 30 дней для сглаживания краткосрочных колебаний.
   - Рассчитать среднюю температуру и стандартное отклонение для каждого сезона в каждом городе.
   - Выявить аномалии, где температура выходит за пределы $ \text{среднее} \pm 2\sigma $.
   - Попробуйте распараллелить проведение этого анализа. Сравните скорость выполнения анализа с распараллеливанием и без него.

2. **Мониторинг текущей температуры**:
   - Подключить OpenWeatherMap API для получения текущей температуры города. Для получения API Key (бесплатно) надо зарегистрироваться на сайте. Обратите внимание, что API Key может активироваться только через 2-3 часа, это нормально. Посему получите ключ заранее.
   - Получить текущую температуру для выбранного города через OpenWeatherMap API.
   - Определить, является ли текущая температура нормальной, исходя из исторических данных для текущего сезона.
   - Данные на самом деле не совсем реальные (сюрпрайз). Поэтому на момент эксперимента погода в Берлине, Каире и Дубае была в рамках нормы, а в Пекине и Москве аномальная. Протестируйте свое решение для разных городов.
   - Попробуйте для получения текущей температуры использовать синхронные и асинхронные методы. Что здесь лучше использовать?

3. **Создание приложения на Streamlit**:
   - Добавить интерфейс для загрузки файла с историческими данными.
   - Добавить интерфейс для выбора города (из выпадающего списка).
   - Добавить форму для ввода API-ключа OpenWeatherMap. Когда он не введен, данные для текущей погоды не показываются. Если ключ некорректный, выведите на экран ошибку (должно приходить `{"cod":401, "message": "Invalid API key. Please see https://openweathermap.org/faq#error401 for more info."}`).
   - Отобразить:
     - Описательную статистику по историческим данным для города, можно добавить визуализации.
     - Временной ряд температур с выделением аномалий (например, точками другого цвета).
     - Сезонные профили с указанием среднего и стандартного отклонения.
   - Вывести текущую температуру через API и указать, нормальна ли она для сезона.

### Критерии оценивания

- Корректное проведение анализа данных – 1 балл.
- Исследование распараллеливания анализа – 1 балл.
- Корректный поиск аномалий – 1 балл.
- Подключение к API и корректность выполнения запроса – 1 балл.
- Проведение эксперимента с синхронным и асинхронным способом запроса к API – 1 балл.
- Создание интерфейса приложения streamlit в соответствии с описанием – 3 балла.
- Корректное отображение графиков и статистик, а также сезонных профилей – 1 балл.
- Корректный вывод текущей температуры в выбранном городе и проведение проверки на ее аномальность – 1 балл.
- Любая дополнительная функциональность приветствуется и оценивается бонусными баллами (не более 2 в сумме) на усмотрение проверяющего.

### Формат сдачи домашнего задания

Решение нужно развернуть в Streamlit Cloud (бесплатно)

*   Создаем новый репозиторий на GitHub.  
*   Загружаем проект.
*   Создаем аккаунт в [Streamlit Cloud](https://streamlit.io/cloud).
*   Авторизуемся в Streamlit Cloud.
*   Создаем новое приложение в Streamlit Cloud и подключаем GitHub-репозиторий.
*   Deploy!

Сдать в форму необходимо:
1. Ссылку на развернутое в Streamlit Cloud приложение.
2. Ссылку на код. Все выводы про, например, использование параллельности/асинхронности опишите в комментариях.

Не забудьте удалить ключ API и иную чувствительную информацию.

### Полезные ссылки
*   [Оформление задачи Титаник на Streamlit](https://github.com/evgpat/streamlit_demo)
*   [Документация Streamlit](https://docs.streamlit.io/)
*   [Блог о Streamlit](https://blog.streamlit.io/)

In [5]:
import pandas as pd
import numpy as np

# Реальные средние температуры (примерные данные) для городов по сезонам
seasonal_temperatures = {
    "New York": {"winter": 0, "spring": 10, "summer": 25, "autumn": 15},
    "London": {"winter": 5, "spring": 11, "summer": 18, "autumn": 12},
    "Paris": {"winter": 4, "spring": 12, "summer": 20, "autumn": 13},
    "Tokyo": {"winter": 6, "spring": 15, "summer": 27, "autumn": 18},
    "Moscow": {"winter": -10, "spring": 5, "summer": 18, "autumn": 8},
    "Sydney": {"winter": 12, "spring": 18, "summer": 25, "autumn": 20},
    "Berlin": {"winter": 0, "spring": 10, "summer": 20, "autumn": 11},
    "Beijing": {"winter": -2, "spring": 13, "summer": 27, "autumn": 16},
    "Rio de Janeiro": {"winter": 20, "spring": 25, "summer": 30, "autumn": 25},
    "Dubai": {"winter": 20, "spring": 30, "summer": 40, "autumn": 30},
    "Los Angeles": {"winter": 15, "spring": 18, "summer": 25, "autumn": 20},
    "Singapore": {"winter": 27, "spring": 28, "summer": 28, "autumn": 27},
    "Mumbai": {"winter": 25, "spring": 30, "summer": 35, "autumn": 30},
    "Cairo": {"winter": 15, "spring": 25, "summer": 35, "autumn": 25},
    "Mexico City": {"winter": 12, "spring": 18, "summer": 20, "autumn": 15},
}

# Сопоставление месяцев с сезонами
month_to_season = {
    12: "winter",
    1: "winter",
    2: "winter",
    3: "spring",
    4: "spring",
    5: "spring",
    6: "summer",
    7: "summer",
    8: "summer",
    9: "autumn",
    10: "autumn",
    11: "autumn",
}


# Генерация данных о температуре
def generate_realistic_temperature_data(cities, num_years=10):
    dates = pd.date_range(start="2010-01-01", periods=365 * num_years, freq="D")
    data = []

    for city in cities:
        for date in dates:
            season = month_to_season[date.month]
            mean_temp = seasonal_temperatures[city][season]
            # Добавляем случайное отклонение
            temperature = np.random.normal(loc=mean_temp, scale=5)
            data.append({"city": city, "timestamp": date, "temperature": temperature})

    df = pd.DataFrame(data)
    df["season"] = df["timestamp"].dt.month.map(lambda x: month_to_season[x])
    return df


# Генерация данных
data = generate_realistic_temperature_data(list(seasonal_temperatures.keys()))
data.to_csv("data/temperature_data.csv", index=False)

In [7]:
df = pd.read_csv("data/temperature_data.csv")

In [8]:
df.head()

Unnamed: 0,city,timestamp,temperature,season
0,New York,2010-01-01,-1.177655,winter
1,New York,2010-01-02,-4.692483,winter
2,New York,2010-01-03,4.375673,winter
3,New York,2010-01-04,-2.249822,winter
4,New York,2010-01-05,-0.193305,winter


In [9]:
from sklearn.linear_model import LinearRegression
import time


def heavy_func(city, df=df):
    new_df = df[df["city"] == city].copy()
    new_df["timestamp"] = pd.to_datetime(new_df["timestamp"])

    new_df.set_index("timestamp", inplace=True)
    new_df["MA"] = new_df["temperature"].rolling(window="30D", min_periods=1).mean()
    new_df["std"] = new_df["temperature"].rolling(window="30D", min_periods=1).std()
    new_df["anomaly"] = np.where(
        np.abs(new_df["temperature"])
        > np.abs(new_df["MA"]) + np.abs(2 * new_df["std"]),
        1,
        0,
    )

    season_df = df[df["city"] == city].copy()
    season_profile = (
        season_df.groupby("season")["temperature"]
        .agg(average="mean", std="std")
        .reset_index()
    )

    trend_df = df[df["city"] == city].copy()
    trend_df["timestamp"] = pd.to_datetime(trend_df["timestamp"])
    timestamps_numeric = (
        (trend_df["timestamp"] - trend_df["timestamp"].min()).dt.total_seconds()
        / 60
        / 60
        / 24
    ).values.reshape(-1, 1)
    temperatures = trend_df["temperature"].values.reshape(-1, 1)

    model = LinearRegression()
    model.fit(timestamps_numeric, temperatures)

    if model.coef_[0, 0] > 0:
        trend = "Positive"
    else:
        trend = "Negative"

    avg_temp = new_df["temperature"].mean()
    min_temp = new_df["temperature"].min()
    max_temp = new_df["temperature"].max()
    anomaly_df = df[df["city"] == city].copy()

    anomaly_df = anomaly_df.merge(season_profile, on="season", how="left")
    anomaly_df["is_anomaly"] = (
        anomaly_df["temperature"] > anomaly_df["average"] + 2 * anomaly_df["std"]
    ) | (anomaly_df["temperature"] < anomaly_df["average"] - 2 * anomaly_df["std"])

    anomaly_points = anomaly_df[anomaly_df["is_anomaly"] == 1]
    return (
        city,
        avg_temp,
        min_temp,
        max_temp,
        new_df["MA"].values.reshape(-1, 1),
        new_df["std"].values.reshape(-1, 1),
        season_profile,
        trend,
        anomaly_points,
    )

In [10]:
start_time = time.time()
for city in seasonal_temperatures.keys():
    heavy_func(city, df)

end_time = time.time()
print(f"Время выполнения: {end_time - start_time:.2f} секунд")

Время выполнения: 0.61 секунд


In [11]:
from concurrent.futures import ThreadPoolExecutor
import itertools

start_time = time.time()
with ThreadPoolExecutor() as executor:
    res = list(
        executor.map(heavy_func, seasonal_temperatures.keys(), itertools.repeat(df))
    )
end_time = time.time()
print(f"Время выполнения: {end_time - start_time:.2f} секунд")

Время выполнения: 1.02 секунд


In [12]:
def heavy_func_mproc(city_df):
    import pandas as pd
    import numpy as np
    from sklearn.linear_model import LinearRegression

    city, df = city_df
    new_df = df[df["city"] == city].copy()
    new_df["timestamp"] = pd.to_datetime(new_df["timestamp"])

    new_df.set_index("timestamp", inplace=True)
    new_df["MA"] = new_df["temperature"].rolling(window="30D", min_periods=1).mean()
    new_df["std"] = new_df["temperature"].rolling(window="30D", min_periods=1).std()
    new_df["anomaly"] = np.where(
        np.abs(new_df["temperature"])
        > np.abs(new_df["MA"]) + np.abs(2 * new_df["std"]),
        1,
        0,
    )

    season_df = df[df["city"] == city].copy()
    season_profile = (
        season_df.groupby("season")["temperature"]
        .agg(average="mean", std="std")
        .reset_index()
    )

    trend_df = df[df["city"] == city].copy()
    trend_df["timestamp"] = pd.to_datetime(trend_df["timestamp"])
    timestamps_numeric = (
        (trend_df["timestamp"] - trend_df["timestamp"].min()).dt.total_seconds()
        / 60
        / 60
        / 24
    ).values.reshape(-1, 1)
    temperatures = trend_df["temperature"].values.reshape(-1, 1)

    model = LinearRegression()
    model.fit(timestamps_numeric, temperatures)

    if model.coef_[0, 0] > 0:
        trend = "Positive"
    else:
        trend = "Negative"

    avg_temp = new_df["temperature"].mean()
    min_temp = new_df["temperature"].min()
    max_temp = new_df["temperature"].max()
    anomaly_df = df[df["city"] == city].copy()

    anomaly_df = anomaly_df.merge(season_profile, on="season", how="left")
    anomaly_df["is_anomaly"] = (
        anomaly_df["temperature"] > anomaly_df["average"] + 2 * anomaly_df["std"]
    ) | (anomaly_df["temperature"] < anomaly_df["average"] - 2 * anomaly_df["std"])

    anomaly_points = anomaly_df[anomaly_df["is_anomaly"] == 1]
    return (
        city,
        avg_temp,
        min_temp,
        max_temp,
        new_df["MA"].values.reshape(-1, 1),
        new_df["std"].values.reshape(-1, 1),
        season_profile,
        trend,
        anomaly_points,
    )

In [13]:
from multiprocess import Pool, cpu_count

if __name__ == "__main__":
    start_time = time.time()
    num_workers = cpu_count()

    with Pool(processes=8) as pool:
        results = pool.map(
            heavy_func_mproc, [(city, df) for city in seasonal_temperatures.keys()]
        )
    end_time = time.time()
    print(f"Время выполнения: {end_time - start_time:.2f} секунд")

Время выполнения: 22.78 секунд


При исследовании возможности распараллеливания работы основной "тяжелой функции", было выявлено, что быстрее всего работает просто при последовательных операциях, так как создание процессов или потоков выходит дороже
Для мультипроцессорности была выбрана библиотека multiprocess, она подстроена над библиотекой multiprocessing, но позволяет обрабатывать более сложные типы данных, например DataFrame при помощи dill

In [14]:
api = "***"

In [13]:
import requests
from datetime import datetime

In [14]:
def is_normal(city, api_key):
    response = requests.get(
        f"https://api.openweathermap.org/data/2.5/weather?q={city}&appid={api_key}"
    )
    data = response.json()
    if response.status_code == 401:
        return None, data
    t = data["main"]["temp"] - 273.15
    _, _, _, _, _, _, season_profile, _, _ = heavy_func(city)
    season = month_to_season[datetime.now().month]
    avg = season_profile[season_profile["season"] == season]["average"].values
    std = season_profile[season_profile["season"] == season]["std"].values

    if (t > avg + 2 * std) | (t < avg - 2 * std):
        return t, "Anomaly temp"
    else:
        return t, "Norm temp"

In [15]:
start_time = time.time()
for city in seasonal_temperatures.keys():
    is_normal(city, api)
end_time = time.time()
print(f"Время выполнения: {end_time - start_time:.2f} секунд")

Время выполнения: 6.89 секунд


In [16]:
import aiohttp


async def is_normal_async(city, api_key):
    async with aiohttp.ClientSession() as session:
        url = (
            f"https://api.openweathermap.org/data/2.5/weather?q={city}&appid={api_key}"
        )
        async with session.get(url) as response:
            data = await response.json()
            if response.status == 401:
                return None, data

            t = data["main"]["temp"] - 273.15

            _, _, _, _, _, _, season_profile, _, _ = heavy_func(city)
            season = month_to_season[datetime.now().month]
            avg = season_profile[season_profile["season"] == season]["average"].values[
                0
            ]
            std = season_profile[season_profile["season"] == season]["std"].values[0]

            if (t > avg + 2 * std) | (t < avg - 2 * std):
                return t, "Anomaly temp"
            else:
                return t, "Normal temp"

In [3]:
import nest_asyncio

nest_asyncio.apply()

In [19]:
import time
import asyncio

start_time = time.time()
loop = asyncio.get_event_loop()
tasks = [is_normal_async(city, api) for city in seasonal_temperatures.keys()]
results = loop.run_until_complete(asyncio.gather(*tasks))
end_time = time.time()
print(f"Время выполнения: {end_time - start_time:.2f} секунд")

Время выполнения: 1.29 секунд


При исследовании эффективности асинхронных запросов к WeatherAPI, можно заметить, что асинхронности быстрее в 6 раз