In [1]:
# import pandas as pd
# import numpy as np

# # Реальные средние температуры (примерные данные) для городов по сезонам
# seasonal_temperatures = {
#     "New York": {"winter": 0, "spring": 10, "summer": 25, "autumn": 15},
#     "London": {"winter": 5, "spring": 11, "summer": 18, "autumn": 12},
#     "Paris": {"winter": 4, "spring": 12, "summer": 20, "autumn": 13},
#     "Tokyo": {"winter": 6, "spring": 15, "summer": 27, "autumn": 18},
#     "Moscow": {"winter": -10, "spring": 5, "summer": 18, "autumn": 8},
#     "Sydney": {"winter": 12, "spring": 18, "summer": 25, "autumn": 20},
#     "Berlin": {"winter": 0, "spring": 10, "summer": 20, "autumn": 11},
#     "Beijing": {"winter": -2, "spring": 13, "summer": 27, "autumn": 16},
#     "Rio de Janeiro": {"winter": 20, "spring": 25, "summer": 30, "autumn": 25},
#     "Dubai": {"winter": 20, "spring": 30, "summer": 40, "autumn": 30},
#     "Los Angeles": {"winter": 15, "spring": 18, "summer": 25, "autumn": 20},
#     "Singapore": {"winter": 27, "spring": 28, "summer": 28, "autumn": 27},
#     "Mumbai": {"winter": 25, "spring": 30, "summer": 35, "autumn": 30},
#     "Cairo": {"winter": 15, "spring": 25, "summer": 35, "autumn": 25},
#     "Mexico City": {"winter": 12, "spring": 18, "summer": 20, "autumn": 15},
# }

# # Сопоставление месяцев с сезонами
# month_to_season = {12: "winter", 1: "winter", 2: "winter",
#                    3: "spring", 4: "spring", 5: "spring",
#                    6: "summer", 7: "summer", 8: "summer",
#                    9: "autumn", 10: "autumn", 11: "autumn"}

# # Генерация данных о температуре
# def generate_realistic_temperature_data(cities, num_years=10):
#     dates = pd.date_range(start="2010-01-01", periods=365 * num_years, freq="D")
#     data = []

#     for city in cities:
#         for date in dates:
#             season = month_to_season[date.month]
#             mean_temp = seasonal_temperatures[city][season]
#             # Добавляем случайное отклонение
#             temperature = np.random.normal(loc=mean_temp, scale=5)
#             data.append({"city": city, "timestamp": date, "temperature": temperature})

#     df = pd.DataFrame(data)
#     df['season'] = df['timestamp'].dt.month.map(lambda x: month_to_season[x])
#     return df

# # Генерация данных
# data = generate_realistic_temperature_data(list(seasonal_temperatures.keys()))
# data.to_csv('temperature_data.csv', index=False)

# Первое задание

In [2]:
%cd ~/Documents/Магистратура/1 курс/2 модуль/Прикладной Python/ДЗ-1

/home/boyarskikhae/Documents/Магистратура/1 курс/2 модуль/Прикладной Python/ДЗ-1


In [3]:
import multiprocessing
import concurrent.futures
import requests
import json
import aiohttp
import asyncio
import pandas as pd
import numpy as np

from time import time
from sklearn.linear_model import LinearRegression
from config.config import API_KEY

In [4]:
df = pd.read_csv('data/temperature_data.csv')
cities = df['city'].unique()
season = 'winter'

In [5]:
def get_weather_data(city: str, df: pd.DataFrame) -> pd.DataFrame:
    # Фильтруем город.
    df = df[df['city'] == city].copy()

    # Расчёт минимальной, максимальной и средней температуры.
    min_temperature = df['temperature'].min()
    max_temperature = df['temperature'].max()
    mean_temperature = df['temperature'].mean()

    # Рассчитываем аномальные значения.
    anomalies = df.copy()
    
    anomalies['moving_average'] = anomalies['temperature'].rolling(window=30, min_periods=1).mean()
    anomalies['moving_std'] = anomalies['temperature'].rolling(window=30, min_periods=1).std()
    anomalies['is_anomaly'] = anomalies.apply(
        lambda column: 
            1 if 
                (column['temperature'] >= column['moving_average'] + 2 * column['moving_std']) |\
                (column['temperature'] <= column['moving_average'] - 2 * column['moving_std'])
            else 0,
        axis=1
    )
    anomalies = anomalies[['timestamp', 'temperature', 'is_anomaly']]

    # Получаем профиль сезона.
    season_profile = df.copy()
    season_profile = df.groupby('season')['temperature'].agg(average='mean', std='std')
    
    # Вычисляем тренд.
    trend = df.copy()
    
    trend['timestamp_ordinal'] = pd.to_datetime(trend['timestamp'])
    trend['timestamp_ordinal'] = trend['timestamp_ordinal'].map(pd.Timestamp.toordinal)

    X = trend[['timestamp_ordinal']]
    y = trend[['temperature']]
    
    regressor = LinearRegression()
    regressor.fit(X=X, y=y)
    trend['trend'] = regressor.predict(X=X)
    
    trend = trend[['timestamp', 'trend']]

    return {
        city: [
            mean_temperature, 
            min_temperature, 
            max_temperature,
            season_profile,
            trend,
            anomalies
        ]
    }

def collect_weather_data(cities: str, df: pd.DataFrame):
    temp_weather_data = []
    weather_data = {}
    
    for city in cities:
        temp_weather_data.append(get_weather_data(city=city, df=df))
    
    for temp_weather_object in temp_weather_data:
        weather_data.update(temp_weather_object)

    return weather_data

def collect_weather_data_multiprocess(cities: str, df: pd.DataFrame):
    temp_weather_data = []
    weather_data = {}

    with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
        args = [(city, df) for city in cities]
        temp_weather_data = pool.starmap(get_weather_data, args)
    
    for temp_weather_object in temp_weather_data:
        weather_data.update(temp_weather_object)

    return weather_data

def collect_weather_data_concurrent(cities: str, df: pd.DataFrame):
    temp_weather_data = []
    weather_data = {}
    
    with concurrent.futures.ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor:
        args = [(city, df) for city in cities]
        temp_weather_object = list(executor.map(get_weather_data, *zip(*args)))
    
    for temp_weather_object in temp_weather_data:
        weather_data.update(temp_weather_object)

    return weather_data

## Не оптимизированный вариант

In [6]:
%%time

wd = collect_weather_data(cities=cities, df=df)

CPU times: user 1.22 s, sys: 1.41 ms, total: 1.22 s
Wall time: 1.24 s


## Оптимизированный вариант

### multiprocessing

In [7]:
%%time

wdm = collect_weather_data_multiprocess(cities=cities, df=df)

CPU times: user 164 ms, sys: 101 ms, total: 264 ms
Wall time: 562 ms


### concurrent.futures

In [8]:
%%time

wdc = collect_weather_data_concurrent(cities=cities, df=df)

CPU times: user 158 ms, sys: 86 ms, total: 244 ms
Wall time: 480 ms


`Мультипроцессорные подходы работают в несколько раз быстрее не оптимизированого подхода. Для создания процессов были использованы две библиотеки: multiprocessin и concurrent.futures, которые сработали практически одинаково.`

## Второе задание

## Не оптимизированный вариант

In [9]:
def get_temperatures(cities: list, api_key: str) -> float:
    temperatures = {}
    
    for city in cities:
        base_url = 'https://api.openweathermap.org/data/2.5/weather?'
        params = {
            'q': city,
            'appid': api_key,
            'units': 'metric'
        }
        
        response = requests.get(url=base_url, params=params)
        temperature = response.json()['main']['temp']
        temperatures.update({city: temperature})

    return temperatures

def validate_temperature(cities: list, df: pd.DataFrame, season: dict='winter') -> str:
    validations = {}
    
    weather_data = collect_weather_data(cities=cities, df=df) 
    temperatures = get_temperatures(cities=cities, api_key=API_KEY)
    
    for city in cities:
        season_profile = weather_data[city][3]
        
        season_average = season_profile.loc[season, 'average']
        season_std = season_profile.loc[season, 'std']
        season_top = season_average + season_std
        season_bottom = season_average - season_std
        
        if temperatures[city] > season_top: validations.update({city: 'Слишком жарко!'})
        elif temperatures[city] < season_bottom: validations.update({city: 'Слишком холодно!'})
        else: validations.update({city: 'В норме!'})

    return validations

In [10]:
%%time

validations = validate_temperature(df=df, cities=cities, season='winter')

CPU times: user 1.15 s, sys: 20.3 ms, total: 1.17 s
Wall time: 13.1 s


## Оптимизированный вариант

In [11]:
async def async_get_temperatures(city: str, api_key: str) -> float:
    base_url = 'https://api.openweathermap.org/data/2.5/weather?'
    params = {
        'q': city,
        'appid': api_key,
        'units': 'metric'
    }
    
    async with aiohttp.ClientSession() as session:
        async with session.get(url=base_url, params=params) as response:
            temperature = await response.text()
            temperature = json.loads(temperature)['main']['temp']

    return {city: temperature}

async def collect_temperatures(cities: list, api_key: str) -> dict:
    temperature = {}
    tasks = [async_get_temperatures(city=city, api_key=api_key) for city in cities]
    ts = await asyncio.gather(*tasks)
    
    for t in ts:
        temperature.update(t)

    return temperature

async def async_validate_temperature(cities: list, df: pd.DataFrame, api_key: str, season: dict='winter') -> str:
    validations = {}
    
    weather_data = collect_weather_data_multiprocess(cities=cities, df=df) 
    temperatures = await collect_temperatures(cities=cities, api_key=api_key)
    
    for city in cities:
        season_profile = weather_data[city][3]
        
        season_average = season_profile.loc[season, 'average']
        season_std = season_profile.loc[season, 'std']
        season_top = season_average + season_std
        season_bottom = season_average - season_std
        
        if temperatures[city] > season_top: validations.update({city: 'Слишком жарко!'})
        elif temperatures[city] < season_bottom: validations.update({city: 'Слишком холодно!'})
        else: validations.update({city: 'В норме!'})

    return validations

In [12]:
start = time()
validations = await async_validate_temperature(cities=cities, df=df, api_key=API_KEY, season=season)
print(f'Выполнение заняло {time() - start} секунд')

Выполнение заняло 1.3895983695983887 секунд


`Код с асинхронностью и дополнительной оптимизацией с помощью многопроцессност сработал ожидаемо быстрее. Почти в 14 раз! Это достаточно быстро, основной вклад внесло асинхронное обращение к API.`