# Météo Toulouse

In [1]:
import influxdb_client
from influxdb_client.client.write_api import SYNCHRONOUS
from influxdb_client import InfluxDBClient, Point, WriteOptions
import rx
from rx import operators as ops
from collections import OrderedDict
from csv import DictReader
from datetime import datetime
import pandas as pd
import numpy as np
import re

In [2]:
from IPython.display import display, HTML
def print_df(df):
    display(HTML(df.to_html()))

In [3]:
bucket = "meteo_toulouse"
org = "obd_influxdb"
token = "DkqpJF-KB6oCD25k7E8woeHxK0deggIpNyDxFeoszhLogSRrYLKdYMdcZjVGXyutwFSFBfLD5-Jkbah6wt9ujA=="
url="http://localhost:8086"

client = influxdb_client.InfluxDBClient(
        url=url,
        token=token,
        org=org,
        timeout=60_000
    )

### Import dataset

In [4]:
station_list = [
    '36-station-meteo-toulouse-purpan',
    '12-station-meteo-toulouse-montaudran',
    '53-station-meteo-toulouse-ponsan',
    '04-station-meteo-toulouse-ile-empalot',
    '15-station-meteo-l-union-ecole',
    '01-station-meteo-toulouse-meteopole',
    '00-station-meteo-toulouse-valade'
]

In [22]:
def parse_row(row, station_name):
    if row['humidite'] == '':
        print(row)
    return influxdb_client.Point("meteo") \
        .tag('station', station_name) \
        .field('humidite', float(row['humidite'])) \
        .field('pluie', float(row['pluie'])) \
        .field('pluie_intensite_max', float(row['pluie_intensite_max'])) \
        .field('force_moyenne_du_vecteur_vent', float(row['force_moyenne_du_vecteur_vent'])) \
        .field('direction_du_vecteur_vent_moyen', float(row['direction_du_vecteur_vent_moyen'])) \
        .field('direction_du_vecteur_de_vent_max', float(row['direction_du_vecteur_de_vent_max'])) \
        .field('force_rafale_max', float(row['force_rafale_max'])) \
        .field('direction_du_vecteur_de_rafale_de_vent_max', float(row['direction_du_vecteur_de_rafale_de_vent_max'])) \
        .field('temperature', float(row['temperature'])) \
        .field('pression', float(row['pression'])) \
        .time(row['heure_de_paris'])

def gen_rows(df):
    for row in df.itertuples(index=False):
        yield row._asdict()
        

relevant_index = ['humidite', 
               'pluie', 'pluie_intensite_max', 
               'force_moyenne_du_vecteur_vent', 'direction_du_vecteur_vent_moyen', 'direction_du_vecteur_de_vent_max',
               'force_rafale_max', 'direction_du_vecteur_de_rafale_de_vent_max', 'temperature', 'pression', 'heure_de_paris']

# To use in PostgreSQL
def export_clean_csv(df_station, station):
    df_clean = pd.DataFrame()
    df_clean[relevant_index] = df_station[relevant_index]
    df_clean['station'] = [station] * len(df_clean)
    df_clean.to_csv(f'psql_queries/{station}.csv', index=False)

    
def import_one_station(csv_file, station_name):
    
    df_station = pd.read_csv(csv_file, sep=";")
    df_station.replace('', np.nan, inplace=True)
    df_station.dropna(inplace=True)
    
    export_clean_csv(df_station, station)
    
    data = rx.from_iterable(gen_rows(df_station)).pipe(ops.map(lambda row: parse_row(row, station_name)))

    write_api = client.write_api(write_options=WriteOptions(batch_size=50_000, flush_interval=10_000))
    write_api.write(bucket=bucket, org=org, record=data)
    write_api.close()
    print(f"{station_name} done !")

#### Test one file

In [23]:
datafile = f"data/{station_list[1]}.csv"
regex_items = re.search(r'data/(\d\d)-station-meteo-(.*)\.csv', datafile)
station = f"{regex_items.group(1)}-{regex_items.group(2)}"
print(station)

df_station = pd.read_csv(datafile, sep=";")
df_station.replace('', np.nan, inplace=True)
df_station.dropna(inplace=True)
df_station['temperature']

# import_one_station(datafile, station)

12-toulouse-montaudran


0        12.0
1        11.5
2        11.4
3        10.6
4        15.8
         ... 
73938    14.3
73939    15.5
73940    21.8
73941    21.3
73942     5.3
Name: temperature, Length: 73943, dtype: float64

#### Import all files

In [24]:
import time

for station in station_list:
    print(station)
    start_time = time.time()
    datafile = f'data/{station}.csv'
    regex_items = re.search(r'data/(\d\d)-station-meteo-(.*)\.csv', datafile)
    station = f"{regex_items.group(1)}-{regex_items.group(2)}"

    import_one_station(datafile, station)
    print("Elapsed time : ", time.time() - start_time)

36-station-meteo-toulouse-purpan
Elapsed time :  1.7003614902496338
12-station-meteo-toulouse-montaudran
Elapsed time :  1.2425305843353271
53-station-meteo-toulouse-ponsan
Elapsed time :  1.7457196712493896
04-station-meteo-toulouse-ile-empalot
Elapsed time :  1.6238138675689697
15-station-meteo-l-union-ecole
Elapsed time :  2.297161102294922
01-station-meteo-toulouse-meteopole
Elapsed time :  1.8756422996520996
00-station-meteo-toulouse-valade
Elapsed time :  2.0257508754730225


### Query

In [57]:
from pprint import pprint

query_api = client.query_api()
query = """from(bucket: "meteo_toulouse")
  |> range(start: -60d, stop: now())
  |> filter(fn: (r) => r["_measurement"] == "meteo")
  |> filter(fn: (r) => r["station"] == "12-montaudran")
  |> filter(fn: (r) => r["_field"] == "humidite")"""

print("Querying...")
result = query_api.query(org=org, query=query)
print("Response received !")
results = []
for table in result:
    for record in table.records:
        results.append((record.get_time().strftime("%d/%m/%Y, %H:%M:%S"), record.get_field(), record.get_value()))

print(f"Entries : {len(results)}")
pprint(results[:10])

Querying...
Response received !
Entries : 5518
[('17/08/2021, 07:45:00', 'humidite', 74.0),
 ('17/08/2021, 08:00:00', 'humidite', 73.0),
 ('17/08/2021, 08:15:00', 'humidite', 71.0),
 ('17/08/2021, 08:30:00', 'humidite', 69.0),
 ('17/08/2021, 08:45:00', 'humidite', 66.0),
 ('17/08/2021, 09:00:00', 'humidite', 64.0),
 ('17/08/2021, 09:15:00', 'humidite', 62.0),
 ('17/08/2021, 09:30:00', 'humidite', 62.0),
 ('17/08/2021, 09:45:00', 'humidite', 62.0),
 ('17/08/2021, 10:00:00', 'humidite', 62.0)]


### Delete fulll DB

In [20]:
def delete_db():
    delete_api = client.delete_api()
    start = "1970-01-01T00:00:00Z"
    stop = "2022-02-01T00:00:00Z"
    delete_api.delete(start, stop, '_measurement="meteo"', bucket=bucket, org=org)
    print("Done !")
    
delete_db()

Done !


## Downloader

In [7]:
import urllib.request

from tqdm.autonotebook import tqdm


class DownloadProgressBar(tqdm):
    def update_to(self, b=1, bsize=1, tsize=None):
        if tsize is not None:
            self.total = tsize
        self.update(b * bsize - self.n)


def download_url(url, output_path):
    with DownloadProgressBar(unit='B', unit_scale=True,
                             miniters=1, desc=output_path) as t:
        urllib.request.urlretrieve(url, filename=output_path, reporthook=t.update_to)
        



for filename in station_list:
    url = f'https://data.toulouse-metropole.fr/explore/dataset/{filename}/download/?format=csv&timezone=Europe/Berlin&lang=fr&use_labels_for_header=true&csv_separator=%3B'
    download_url(url, f"data/{filename}.csv")

HBox(children=(HTML(value='data/36-station-meteo-toulouse-purpan.csv'), FloatProgress(value=1.0, bar_style='in…




HBox(children=(HTML(value='data/12-station-meteo-toulouse-montaudran.csv'), FloatProgress(value=1.0, bar_style…




HBox(children=(HTML(value='data/53-station-meteo-toulouse-ponsan.csv'), FloatProgress(value=1.0, bar_style='in…




HBox(children=(HTML(value='data/04-station-meteo-toulouse-ile-empalot.csv'), FloatProgress(value=1.0, bar_styl…




HBox(children=(HTML(value='data/15-station-meteo-l-union-ecole.csv'), FloatProgress(value=1.0, bar_style='info…




HBox(children=(HTML(value='data/01-station-meteo-toulouse-meteopole.csv'), FloatProgress(value=1.0, bar_style=…




HBox(children=(HTML(value='data/00-station-meteo-toulouse-valade.csv'), FloatProgress(value=1.0, bar_style='in…


