In [1]:
import pandas as pd
from concurrent.futures import ThreadPoolExecutor


data = pd.read_csv('/workspaces/AdvPython_HW1/temperature_data.csv')

In [9]:
def process_temperature_data(data):
    results = []
    cities = data['city'].unique()
    for city in cities:
        city_data = data[data['city'] == city]
        city_data['rolling_mean'] = city_data['temperature'].rolling(window=30).mean()
        city_data['rolling_std'] = city_data['temperature'].rolling(window=30).std()
        mean_temp = city_data['temperature'].mean()
        std_temp = city_data['temperature'].std()
        anomalies = city_data[(city_data['temperature'] - city_data['rolling_mean']).abs() > 2 * city_data['rolling_std']]
        for season in city_data.season.unique():
            city_data_season = city_data.loc[city_data.season == season]
        results.append({
            'city': city,
            'average': mean_temp,
            'min': city_data['temperature'].min(),
            'max': city_data['temperature'].max(),
            'season_avg': [city_data.loc[city_data.season == season]['temperature'].mean() for season in city_data.season.unique()],
            'season_std': [city_data.loc[city_data.season == season]['temperature'].std() for season in city_data.season.unique()],
            'trend': 'нормальный' if mean_temp > 0 else 'понижающийся',
            'anomalies': anomalies
        })
    return results

In [10]:
%%timeit
with ThreadPoolExecutor() as executor:
    analysis_results = list(executor.map(process_temperature_data, [data[data['city'] == city] for city in data['city'].unique().tolist()]))

234 ms ± 12.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [11]:
%%timeit
analysis_results = [process_temperature_data(data[data['city'] == city]) for city in data['city'].unique().tolist()]

182 ms ± 6.66 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [12]:
data_x100 = pd.concat([data]*100)

In [13]:
%%timeit
with ThreadPoolExecutor() as executor:
    analysis_results = list(executor.map(process_temperature_data, [data_x100[data_x100['city'] == city] for city in data_x100['city'].unique().tolist()]))

11 s ± 40 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [14]:
%%timeit
analysis_results = [process_temperature_data(data_x100[data_x100['city'] == city]) for city in data_x100['city'].unique().tolist()]

11.9 s ± 66 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [15]:
def process_temperature_data_v2(data):
    results = []
    cities = data['city'].unique()
    for city in cities:
        city_data = data[data['city'] == city]
        city_data['rolling_mean'] = city_data['temperature'].rolling(window=30).mean()
        city_data['rolling_std'] = city_data['temperature'].rolling(window=30).std()
        mean_temp = city_data['temperature'].mean()
        std_temp = city_data['temperature'].std()
        anomalies = city_data[(city_data['temperature'] - city_data['rolling_mean']).abs() > 2 * city_data['rolling_std']]
        for season in city_data.season.unique():
            city_data_season = city_data.loc[city_data.season == season]
        results.append({
            'city': city,
            'average': mean_temp,
            'min': city_data['temperature'].min(),
            'max': city_data['temperature'].max(),
            'season_avg': city_data.groupby('season')['temperature'].mean(),
            'season_std': city_data.groupby('season')['temperature'].std(),
            'trend': 'нормальный' if mean_temp > 0 else 'понижающийся',
            'anomalies': anomalies
        })
    return results

In [16]:
%%timeit
with ThreadPoolExecutor() as executor:
    analysis_results = list(executor.map(process_temperature_data_v2, [data[data['city'] == city] for city in data['city'].unique().tolist()]))

156 ms ± 4.47 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [17]:
%%timeit
analysis_results = [process_temperature_data_v2(data[data['city'] == city]) for city in data['city'].unique().tolist()]

140 ms ± 8.64 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [18]:
%%timeit
with ThreadPoolExecutor() as executor:
    analysis_results = list(executor.map(process_temperature_data_v2, [data_x100[data_x100['city'] == city] for city in data_x100['city'].unique().tolist()]))

8.42 s ± 243 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [19]:
%%timeit
analysis_results = [process_temperature_data_v2(data_x100[data_x100['city'] == city]) for city in data_x100['city'].unique().tolist()]

9.13 s ± 226 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


На меленьком сете распараллеливания замедляют, чем больше сет, тем более выгодно параллельно выполнять  
Лимитирующей стадией является рассчет статистик по сезоном (около 25% ускорения)
groupby в падас написан на C и работает очень быстро. Для умеренного размера датасетов лучше использовать pandas полностью. И потом конкатенировать результаты. 