In [102]:
import os
import json
import requests
from pprint import pp
import time
import asyncio

from joblib import Parallel, delayed
import aiohttp
import pandas as pd
from ipywidgets import interact, interact_manual
import seaborn as sns
import matplotlib.pyplot as plt

In [86]:
import nest_asyncio
# Patch asyncio so coroutines can be driven from inside an already-running event
# loop (presumably needed for the async request cell further down — TODO confirm
# it is still required with top-level `await`).
nest_asyncio.apply()

## Load data

In [14]:
# Read the historical temperatures, order the rows by city and then by timestamp,
# and renumber the index so it is contiguous after the sort.
df = (
    pd.read_csv("temperature_data.csv")
    .sort_values(by=["city", "timestamp"])
    .reset_index(drop=True)
)

# EDA

### Rolling mean

In [15]:
# First variant: iterate through all the cities an compute rolling mean for each

%time
df["rolling_mean"] = None
for city in df["city"].unique():
    df.loc[df.city == city, "rolling_mean"] = df.loc[df.city == city, "temperature"].rolling(window=30).mean()

df.tail()

CPU times: user 2 µs, sys: 1 µs, total: 3 µs
Wall time: 7.15 µs


Unnamed: 0,city,timestamp,temperature,season,rolling_mean
54745,Tokyo,2019-12-25,16.445986,winter,9.080301
54746,Tokyo,2019-12-26,3.555002,winter,8.364669
54747,Tokyo,2019-12-27,7.232076,winter,8.090793
54748,Tokyo,2019-12-28,5.602815,winter,7.890499
54749,Tokyo,2019-12-29,10.444926,winter,7.5479


In [16]:
def add_rolling_mean(x, window=30):
    """Return a copy of ``x`` with a ``rolling_mean`` column added.

    Parameters
    ----------
    x : pandas.DataFrame
        Frame with a ``temperature`` column; rows are used in their given order.
    window : int, default 30
        Number of consecutive observations in each rolling window.

    Returns
    -------
    pandas.DataFrame
        New frame (``x`` is not modified) whose ``rolling_mean`` column holds the
        rolling mean of ``temperature``; the first ``window - 1`` rows are NaN.
    """
    return x.assign(rolling_mean=x["temperature"].rolling(window=window).mean())

In [17]:
# Second variant: perform computations for each city in a separate process
%time
out = Parallel(n_jobs=4)(delayed(add_rolling_mean)(df.loc[df.city == city]) for city in df["city"].unique())
df = pd.concat(out)
df.tail()

CPU times: user 4 µs, sys: 2 µs, total: 6 µs
Wall time: 11 µs


Unnamed: 0,city,timestamp,temperature,season,rolling_mean
54745,Tokyo,2019-12-25,16.445986,winter,9.080301
54746,Tokyo,2019-12-26,3.555002,winter,8.364669
54747,Tokyo,2019-12-27,7.232076,winter,8.090793
54748,Tokyo,2019-12-28,5.602815,winter,7.890499
54749,Tokyo,2019-12-29,10.444926,winter,7.5479


**Result:** Multiprocessing is slower.

This is possibly due to the high cost of spawning worker processes and copying data to them, combined with the low complexity of the task.

### Mean and std for each city/season

In [18]:
# Per city/season mean and standard deviation of the temperature. Named
# aggregation yields flat column names directly, so no MultiIndex flattening
# step is needed.
df_stats = df.groupby(by=["city", "season"], as_index=False).agg(
    temperature_mean=("temperature", "mean"),
    temperature_std=("temperature", "std"),
)
df_stats.head()

Unnamed: 0,city,season,temperature_mean,temperature_std
0,Beijing,autumn,16.017467,4.973704
1,Beijing,spring,13.159323,5.24232
2,Beijing,summer,27.075768,4.861994
3,Beijing,winter,-1.922736,5.163958
4,Berlin,autumn,11.185098,5.084606


### Find anomalies

In [19]:
# merge stats to the data
# Drop any stats columns left over from a previous run of this cell first;
# otherwise re-running it would create suffixed duplicates
# (temperature_mean_x / temperature_mean_y) and break the cells below.
df = (
    df.drop(columns=["temperature_mean", "temperature_std"], errors="ignore")
    .merge(df_stats, on=["city", "season"], how="left")
)

In [20]:
# Flag observations lying more than two standard deviations away from the
# city/season mean (0 = normal, 1 = anomaly).
lower_bound = df.temperature_mean - 2 * df.temperature_std
upper_bound = df.temperature_mean + 2 * df.temperature_std

condition = (df["temperature"] < lower_bound) | (df["temperature"] > upper_bound)

# Boolean mask -> 0/1 integer column.
df["anomaly"] = condition.astype("int64")

In [21]:
# Display the frame with the merged city/season statistics and the anomaly flag.
df

Unnamed: 0,city,timestamp,temperature,season,rolling_mean,temperature_mean,temperature_std,anomaly
0,Beijing,2010-01-01,-0.468425,winter,,-1.922736,5.163958,0
1,Beijing,2010-01-02,11.562106,winter,,-1.922736,5.163958,1
2,Beijing,2010-01-03,-4.999808,winter,,-1.922736,5.163958,0
3,Beijing,2010-01-04,-1.695891,winter,,-1.922736,5.163958,0
4,Beijing,2010-01-05,-9.744884,winter,,-1.922736,5.163958,0
...,...,...,...,...,...,...,...,...
54745,Tokyo,2019-12-25,16.445986,winter,9.080301,6.297711,5.035026,1
54746,Tokyo,2019-12-26,3.555002,winter,8.364669,6.297711,5.035026,0
54747,Tokyo,2019-12-27,7.232076,winter,8.090793,6.297711,5.035026,0
54748,Tokyo,2019-12-28,5.602815,winter,7.890499,6.297711,5.035026,0


# OpenWeatherMap API

In [22]:
from dotenv import load_dotenv
# Load variables from a local .env file into the environment so the API key
# can be read with os.getenv below instead of being hard-coded in the notebook.
load_dotenv()

True

In [54]:
# The API key is read from the environment (None when the variable is unset).
API_KEY = os.environ.get("OpenWeatherMapAPIKey")
# Current-weather endpoint; `city` and `API_KEY` are filled in with str.format.
# `units=metric` makes the API report temperatures in degrees Celsius.
URL_TEMPLATE = (
    "https://api.openweathermap.org/data/2.5/weather"
    "?q={city}&units=metric&appid={API_KEY}"
)

### View current temperature

In [55]:
@interact
def show_current_weather(city=df["city"].unique()):
    """Fetch and pretty-print the raw OpenWeatherMap response for ``city``.

    Parameters
    ----------
    city : str
        City name; ``interact`` turns the default array of unique cities into a
        dropdown and passes the selected value.

    Raises
    ------
    requests.exceptions.Timeout
        If the API does not answer within the timeout.
    """
    resp = requests.get(
        URL_TEMPLATE.format(city=city, API_KEY=API_KEY),
        # requests has no default timeout; without one a stalled connection
        # would block the kernel indefinitely.
        timeout=10,
    )
    # The payload is printed as-is (including API error payloads) for inspection.
    pp(resp.json())

interactive(children=(Dropdown(description='city', options=('Beijing', 'Berlin', 'Cairo', 'Dubai', 'London', '…

### Detect anomalies

In [73]:
@interact
def show_current_weather_with_anomalies(city=df["city"].unique()):
    """Compare the current temperature in ``city`` with its historical seasonal norm.

    The current temperature is fetched from OpenWeatherMap and checked against
    ``mean ± 2 * std`` of the historical temperatures for the same city and the
    season that the historical data assigns to the current calendar month.

    Parameters
    ----------
    city : str
        City name; ``interact`` turns the default array of unique cities into a
        dropdown and passes the selected value.

    Raises
    ------
    requests.exceptions.HTTPError
        If the API answers with an error status (e.g. invalid key, unknown city).
    requests.exceptions.Timeout
        If the API does not answer within the timeout.
    ValueError
        If the historical data has no rows for ``city`` in the current month.
    """
    resp = requests.get(
        URL_TEMPLATE.format(city=city, API_KEY=API_KEY),
        # requests has no default timeout; avoid blocking the kernel forever.
        timeout=10,
    )
    # Surface API failures as an HTTP error; an error payload would otherwise
    # presumably show up as an opaque KeyError on "main" below.
    resp.raise_for_status()
    curr_temp = resp.json()["main"]["temp"]

    # Determine the season from the data itself: take the season most often
    # recorded for this city in the current calendar month. This replaces a
    # hard-coded "winter", which made the check wrong for the rest of the year.
    current_month = pd.Timestamp.now().month
    city_rows = df.loc[df["city"] == city]
    in_current_month = pd.to_datetime(city_rows["timestamp"]).dt.month == current_month
    seasons = city_rows.loc[in_current_month, "season"]
    if seasons.empty:
        raise ValueError(f"No historical data for {city} in month {current_month}")
    season = seasons.mode().iloc[0]

    normal_mean, normal_std = (df_stats
                   .loc[(df_stats.city == city) & (df_stats.season == season),
                        ["temperature_mean", "temperature_std"]]
                   .values[0])
    lower_bound = normal_mean - 2 * normal_std
    upper_bound = normal_mean + 2 * normal_std

    is_anomaly_detected = "not detected"
    if (curr_temp > upper_bound) or (curr_temp < lower_bound):
        is_anomaly_detected = "detected"

    print(f"City: {city}")
    print(f"Season: {season}")
    print(f"Normal temperture bounds: ({lower_bound} 'C, {upper_bound} 'C)")
    print(f"Current temperature: {curr_temp} 'C, Anomaly {is_anomaly_detected}")

interactive(children=(Dropdown(description='city', options=('Beijing', 'Berlin', 'Cairo', 'Dubai', 'London', '…

After testing all the cities, no anomaly was detected in any of them.

### Testing synchronous and asynchronous requests

Let's try to retrieve the weather for all the cities at once.

In [91]:
def get_weather_synch(city):
    """Fetch the current weather for ``city`` with a blocking HTTP request.

    Parameters
    ----------
    city : str
        City name substituted into ``URL_TEMPLATE``.

    Returns
    -------
    dict
        Decoded JSON payload returned by the API.

    Raises
    ------
    requests.exceptions.Timeout
        If the API does not answer within the timeout.
    """
    resp = requests.get(
        URL_TEMPLATE.format(city=city, API_KEY=API_KEY),
        # requests has no default timeout; without one a stalled connection
        # would block the loop (and the kernel) indefinitely.
        timeout=10,
    )
    return resp.json()

# Time one blocking request per city, issued one after another.
# perf_counter() is monotonic and high-resolution, so it is the appropriate
# clock for measuring elapsed time (time.time() can jump with clock adjustments).
start = time.perf_counter()
for city in df.city.unique():
    get_weather_synch(city)
end = time.perf_counter()
print(f"Synchronious time: {end - start:.2f} sec.")

Synchronious time: 2.83 sec.


In [92]:
async def get_weather_asynch(city):
    url = URL_TEMPLATE.format(
        city=city,
        API_KEY=API_KEY
    )
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
                content = await response.text()
                return json.loads(content)
            
start = time.time()
await asyncio.gather(*[get_weather_asynch(city) for city in df.city.unique()])
end = time.time()
print(f"Asynchronious time: {end - start:.2f} sec.")

Asynchronious time: 0.22 sec.


The **asynchronous** version is **much** faster, so it is the better choice for fetching many cities at once.