# Вычисление метрик

In [60]:
import aiohttp
import asyncio
import json
import multiprocessing
import requests
import pandas as pd
import numpy as np

from time import time
from sklearn.linear_model import LinearRegression
from google.colab import userdata

In [13]:
df = pd.read_csv("/content/temperature_data.csv")
cities = df["city"].unique()
season = "winter"

In [51]:
def aggregate_data(city, df_init):
    df = df_init[df_init["city"] == city].copy()

    min_temperature = df["temperature"].min()
    max_temperature = df["temperature"].max()
    mean_temperature = df["temperature"].mean()

    anomalies = df.copy()

    anomalies["moving_average"] = anomalies["temperature"].rolling(window=30, min_periods=1).mean()
    anomalies["moving_std"] = anomalies["temperature"].rolling(window=30, min_periods=2).std()
    anomalies["is_anomaly"] = anomalies.apply(
        lambda column:
                (column["temperature"] > column["moving_average"] + 2 * column["moving_std"]) |\
                (column["temperature"] < column["moving_average"] - 2 * column["moving_std"]),
        axis=1
    )
    anomalies = anomalies[["timestamp", "temperature", "is_anomaly"]]

    season_profile = df.copy()
    season_profile = df.groupby("season")["temperature"].agg(average="mean", std="std")

    trend = df.copy()

    trend["timestamp_ordinal"] = pd.to_datetime(trend["timestamp"])
    trend["timestamp_ordinal"] = trend["timestamp_ordinal"].map(pd.Timestamp.toordinal)

    X = trend[["timestamp_ordinal"]]
    y = trend[["temperature"]]

    linreg = LinearRegression()
    linreg.fit(X, y)
    trend["trend"] = linreg.predict(X)

    trend = trend[["timestamp", "trend"]]

    return {
        city: [
            mean_temperature,
            min_temperature,
            max_temperature,
            season_profile,
            trend,
            anomalies
        ]
    }

def weather_data(cities, df):
    temp_weather_data = []
    weather_data = {}

    for city in cities:
        temp_weather_data.append(aggregate_data(city, df))

    for temp_weather_object in temp_weather_data:
        weather_data.update(temp_weather_object)

    return weather_data

def weather_data_mult(cities, df):
    temp_weather_data = []
    weather_data = {}

    with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
        args = [(city, df) for city in cities]
        temp_weather_data = pool.starmap(aggregate_data, args)

    for temp_weather_object in temp_weather_data:
        weather_data.update(temp_weather_object)

    return weather_data


In [46]:
%%time

wd = weather_data(cities, df)

CPU times: user 1.38 s, sys: 2.41 ms, total: 1.38 s
Wall time: 1.38 s


In [47]:
%%time

wdm = weather_data_mult(cities, df)

CPU times: user 154 ms, sys: 41.8 ms, total: 195 ms
Wall time: 1.92 s


Не смотря на то Wall time у методов одинаковый, время цпу у метода с мультипроцессингом меньше. Если мы будем увеличивать количество задач для ЦПУ не увеличивая задач связанных с вводом выводом, то думаю, что наш выйгрыш от мультипроцессов будет увеличиваться и разница walltime будет сильнее.

In [93]:
def get_temperatures(cities, api_key):
    temperatures = {}

    for city in cities:
        base_url = "https://api.openweathermap.org/data/2.5/weather?"
        params = {
            'q': city,
            "appid": api_key,
            "units": "metric"
        }

        response = requests.get(url=base_url, params=params)
        if response.status_code == 200:
            temperature = response.json()["main"]["temp"]
            temperatures.update({city: temperature})
        else:
            temperatures.update({city: None})

    return temperatures

def validate_temperature(cities, df, season):
    validations = {}

    weather_data = collect_weather_data(cities, df)
    temperatures = get_temperatures(cities, api_key=userdata.get('API_WEATHER'))

    for city in cities:
        season_profile = weather_data[city][3]
        season_top = season_profile.loc[season, "average"] + season_profile.loc[season, "std"]
        season_bottom = season_profile.loc[season, "average"] - season_profile.loc[season, "std"]


        if temperatures[city] is None:
            validations.update({city: "Нет данных"})
        elif temperatures[city] > season_top:
            validations.update({city: "рекорд жары"})
        elif temperatures[city] < season_bottom:
             validations.update({city: "Рекорд холода"})
        else:
             validations.update({city: "Ок"})

    return validations

In [98]:
%%time

validations = validate_temperature(cities, df, "winter")

CPU times: user 2.4 s, sys: 12.7 ms, total: 2.41 s
Wall time: 4.39 s


## Оптимизированный вариант

In [95]:
async def async_get_temperatures(city, api_key):
    base_url = "https://api.openweathermap.org/data/2.5/weather?"
    params = {
        'q': city,
        "appid": api_key,
        "units": "metric"
    }

    async with aiohttp.ClientSession() as session:
        async with session.get(url=base_url, params=params) as response:
            temperature = await response.text()
            temperature = json.loads(temperature)["main"]["temp"]

    return {city: temperature}

async def collect_temperatures(cities: list, api_key: str) -> dict:
    temperature = {}
    tasks = [async_get_temperatures(city=city, api_key=api_key) for city in cities]
    ts = await asyncio.gather(*tasks)

    for t in ts:
        temperature.update(t)

    return temperature

async def async_validate_temperature(cities, df, api_key, season):
    validations = {}

    weather_data = collect_weather_data_multiprocess(cities, df)
    temperatures = await collect_temperatures(cities, api_key=userdata.get('API_WEATHER'))

    for city in cities:
        season_profile = weather_data[city][3]

        season_top = season_profile.loc[season, "average"] + season_profile.loc[season, "std"]
        season_bottom = season_profile.loc[season, "average"] - season_profile.loc[season, "std"]

        if temperatures[city] is None:
            validations.update({city: "Нет данных"})
        elif temperatures[city] > season_top:
            validations.update({city: "рекорд жары"})
        elif temperatures[city] < season_bottom:
             validations.update({city: "Рекорд холода"})
        else:
             validations.update({city: "Ок"})

    return validations

In [96]:
start = time()
validations = await async_validate_temperature(cities, df, userdata.get('API_WEATHER'), season)
print(f'Потрачено: {time() - start} ')

Потрачено: 3.284205675125122 


Работает быстрее, интересно, если делать это не в коллабе, будет ли больше разница. На самом деле сильно зависит от запуска, у меня были запуски без асинка, которые выполнялись за 2 секунды, и есть за 6. Но вроде что-то вышло