This script retrieves sensor data from http://archive.luftdaten.info/.
It takes as input a list of sensor .csv file URLs and saves the resulting data as .csv files.

**This script does not work on Windows PCs because of asyncio**; in that case, you can upload this notebook to Google Colab.
The asyncio library lets the script run many downloads concurrently (up to 10 open connections at a time, set by the `TCPConnector` limit), which makes scraping the data much faster.

In [1]:
import datetime
import requests
import lxml.etree
from lxml import html
import pandas as pd
import asyncio
from concurrent.futures import ThreadPoolExecutor
import re
import io
import sqlalchemy
from aiohttp import ClientSession,TCPConnector
import nest_asyncio

import config

# asyncio and Jupyter do not play well together: Jupyter already runs an event
# loop, so calling loop.run_until_complete() from a cell fails with
# "This event loop is already running". nest_asyncio patches asyncio to allow
# nested event loops. Background:
# https://markhneedham.com/blog/2019/05/10/jupyter-runtimeerror-this-event-loop-is-already-running/
nest_asyncio.apply()

In [2]:
# Sanity check: this notebook was written against Python 3.7 (for its asyncio
# support), so print the version of the interpreter actually running it.
from platform import python_version

print(python_version())

3.7.1


In [3]:
# SQLite database located at config.DB_PATH. It holds the sensor list
# (table sensors_luftdaten) and the already-scraped data (sensors_luftdaten_data).
engine = sqlalchemy.create_engine(f"sqlite:///{config.DB_PATH}")

### Load the list of Luftdaten sensors (see *luftdaten_geo.ipynb* to get this list)

In [4]:
df_sensors = pd.read_sql('sensors_luftdaten', engine)
# Archive file name of each sensor, lower-cased, e.g. "sds011_sensor_<id>.csv".
df_sensors['endpoint_url'] = (
    df_sensors['sensor_type_name']
    + "_sensor_"
    + df_sensors['sensor_id'].astype(str)
    + ".csv"
).str.lower()
# Notebook display: which sensor models are present in the table.
df_sensors.sensor_type_name.unique()

array(['SDS011', 'PPD42NS', 'DHT22', 'BME280', 'HTU21D', 'BMP180',
       'SDS021', 'PMS7003', 'DS18B20', 'BMP280', 'PMS5003', 'PMS3003',
       'DHT11', 'DS18S20', 'SHT31', 'HPM', 'PMS1003'], dtype=object)

In [5]:
# Notebook display: (rows, columns) of the sensor table.
df_sensors.shape

(17959, 13)

In [6]:
def get_list_of_missing_sensors():
    """Return the ids of German SDS011/BME280 sensors not yet stored in the DB.

    A sensor is kept only if it is located where a BME280 sensor is also still
    missing from the ``sensors_luftdaten_data`` table.

    Returns:
        List of ``sensor_id`` values, in the row order of ``df_sensors``.
    """
    stored = pd.read_sql_query('SELECT DISTINCT sensor_id FROM sensors_luftdaten_data', engine)
    stored_ids = stored.sensor_id.tolist()

    # Same three filters as before, combined into one boolean mask.
    wanted = (
        df_sensors.sensor_type_name.isin(['SDS011', 'BME280'])
        & (df_sensors.location_country == 'DE')
        & ~df_sensors.sensor_id.isin(stored_ids)
    )
    missing = df_sensors[wanted]

    # Locations that still have a BME280 to scrape.
    bme_locations = missing.loc[missing.sensor_type_name == 'BME280', 'location_id'].unique().tolist()

    return missing[missing.location_id.isin(bme_locations)].sensor_id.tolist()

### Set the date range to scrape (Luftdaten data starts in 2017)

In [7]:
def set_dt_range(start='1/1/2017', end=None):
    """Return every day from *start* to *end* (both inclusive) as 'YYYY-MM-DD' strings.

    Args:
        start: First day, in any format ``pd.date_range`` accepts.
        end: Last day, in any format ``pd.date_range`` accepts. Defaults to
            today's date (local time), matching the previous behavior.

    Returns:
        List of date strings, one per day.
    """
    if end is None:
        end = datetime.datetime.now().strftime("%Y-%m-%d")
    return pd.date_range(start, end=end).strftime("%Y-%m-%d").tolist()

In [8]:
def clean_data(content):
    """Parse one Luftdaten archive CSV and reshape it into long ("tidy") format.

    Args:
        content: Text of a ``;``-separated archive CSV file.

    Returns:
        For temperature/humidity sensors (DHT22, BME280, HTU21D, BMP180, BMP280)
        and for SDS011 particulate sensors: a frame with columns ``sensor_id``,
        ``timestamp``, ``variable`` and ``value``. Other sensor types, and
        files without data rows, are returned exactly as parsed.
    """
    df_data = pd.read_csv(io.StringIO(content), sep=';')

    # An empty file has no first row to read the sensor type from.
    if df_data.empty:
        return df_data

    # A scalar (not a 1-element numpy array), so the checks below are plain
    # string comparisons.
    sensor_type = df_data.sensor_type.iloc[0]

    if sensor_type in ('DHT22', 'BME280', 'HTU21D', 'BMP180', 'BMP280'):
        value_vars = ['temperature', 'humidity']
    elif sensor_type == "SDS011":
        value_vars = ['P1', 'P2']
    else:
        return df_data

    # Melt only the measurement columns the file actually contains; a file
    # lacking e.g. a humidity column would otherwise raise KeyError.
    present = [col for col in value_vars if col in df_data.columns]
    if not present:
        return df_data
    return pd.melt(df_data, id_vars=['sensor_id', 'timestamp'], value_vars=present)

def aggregate_data(df_data):
    """Average long-format sensor readings per variable and per clock hour.

    Args:
        df_data: Frame with at least ``timestamp`` and ``variable`` columns,
            such as the melted output of ``clean_data``. It is not modified.

    Returns:
        The hourly means indexed by (``variable``, ``datehour``), or None if
        the frame could not be aggregated (the error is printed).
    """
    try:
        # Work on a copy so the caller's frame does not gain extra columns.
        df_data = df_data.copy()
        df_data['timestamp'] = pd.to_datetime(df_data['timestamp'])
        # Truncate each timestamp to the start of its hour.
        df_data['datehour'] = df_data['timestamp'].apply(
            lambda dt: datetime.datetime(dt.year, dt.month, dt.day, dt.hour, 0))
        return df_data.groupby(['variable', 'datehour']).mean()
    except Exception as e:
        # Best-effort: report and let the caller skip this file. (No URL is in
        # scope here, so only the error itself can be printed.)
        print(e)
        return None
  
# Matches archive links that point to .zip or .csv files. Raw string, so "\."
# is a regex escape rather than an invalid Python string escape.
regex = re.compile(r"^.*?\.(zip|csv)$")


def get_list_of_files(dt, timeout=30):
    """List the .csv/.zip files linked from the archive page of one day.

    Args:
        dt: Day as a 'YYYY-MM-DD' string (the archive folder name).
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The matching ``href`` values. On failure, an empty list (instead of
        None) is returned, so callers can always iterate over the result. The
        error is printed.
    """
    url = f'http://archive.luftdaten.info/{dt}'
    try:
        r = requests.get(url, timeout=timeout)
        # A missing day (e.g. 404) is an error, not a page to scrape links from.
        r.raise_for_status()
        tree = html.fromstring(r.content)
    except (requests.RequestException, lxml.etree.LxmlError) as e:
        print(e)
        return []
    files = tree.xpath('//a/@href')
    return [f for f in files if regex.search(f)]

In [19]:
async def fetch(url, session):
    """Download one archive CSV and return its hourly aggregate.

    Args:
        url: Full URL of one sensor's CSV file for one day.
        session: Open aiohttp ``ClientSession`` shared by all downloads.

    Returns:
        The result of ``aggregate_data`` on the tidied file.

    Raises:
        aiohttp.ClientResponseError: if the server answers with an error status.
    """
    async with session.get(url) as response:
        # Fail fast on HTTP errors instead of parsing an error page as CSV.
        response.raise_for_status()
        content = await response.text()
    # Parse after leaving the block, so the connection goes back to the pool
    # before the (synchronous) pandas work starts.
    df_data = clean_data(content)
    return aggregate_data(df_data)

      

async def run(lista):
    """Download and aggregate every URL in *lista* concurrently.

    All requests share one ``ClientSession``, so connections are kept alive
    and reused. The connector allows at most 10 connections at once, to stay
    polite to the archive server.

    Returns:
        One entry per URL, in input order: what ``fetch`` returned or, because
        ``return_exceptions=True``, the exception it raised.
    """
    async with ClientSession(connector=TCPConnector(limit=10)) as session:
        pending = [asyncio.ensure_future(fetch(link, session)) for link in lista]
        return await asyncio.gather(*pending, return_exceptions=True)
    
def scrap_all_data_one_day(dt, sensors_list,
                           root_url="http://archive.luftdaten.info/{}/{}_{}"):
    """Download and aggregate one day of data for the requested sensors.

    Args:
        dt: Day as a 'YYYY-MM-DD' string.
        sensors_list: Archive file names without the date prefix, e.g.
            "sds011_sensor_123.csv" (the ``endpoint_url`` column).
        root_url: URL template, filled with ``(dt, dt, file_name)``. It used to
            be read from a global ``ROOT_URL`` that was never defined at
            module level.

    Returns:
        All successfully aggregated files concatenated, with the
        (variable, datehour) index turned back into columns. An empty
        DataFrame is returned if nothing could be retrieved.
    """
    # The listing may be None if the archive page could not be read.
    lista = get_list_of_files(dt) or []

    # Archive links look like "<date>_<file name>"; drop the date prefix.
    sensors_available = {name.split('_', 1)[-1] for name in lista}
    sensors_to_retrieve = list(sensors_available & set(sensors_list))

    print(f'{dt}:{len(sensors_to_retrieve)} files to retrieve')
    if not sensors_to_retrieve:
        return pd.DataFrame()

    all_url_of_date = [root_url.format(dt, dt, name) for name in sensors_to_retrieve]

    loop = asyncio.get_event_loop()
    data = loop.run_until_complete(asyncio.ensure_future(run(all_url_of_date)))

    # gather(return_exceptions=True) can yield exception objects, and
    # aggregate_data can yield None. Keep only real frames, so that one bad
    # file does not make pd.concat fail and discard the whole day.
    frames = [d for d in data if isinstance(d, pd.DataFrame)]
    n_failed = len(data) - len(frames)
    if n_failed:
        print(f'{dt}: {n_failed} files could not be retrieved or parsed')
    if not frames:
        return pd.DataFrame()

    # One concat instead of a quadratic concat-in-a-loop.
    return pd.concat(frames, axis=0).reset_index()

def scrap_all_data(date_range,sensors_list):
    df_master = pd.DataFrame()

    for i,dt in enumerate(date_range):
      try:
       
        df = scrap_all_data_one_day(dt,sensors_list)
        df_master=pd.concat([df_master,df],axis=0)

        if (i%100==0 and i>0) :
            print(f'saving to master_{str(i)}_{dt}.csv')
            df_master.to_csv(f'master_{str(i)}_{dt}.csv')
            df_master = pd.DataFrame()

      except Exception as e:
        print(f'error for {dt}:')
        print(e)

    df_master.to_csv(f'master_{str(i)}.csv')

In [10]:
def master_scrapper():
    """Scrape every day since 1/1/2017 for the sensors still missing from the DB.

    Sensor ids come from ``get_list_of_missing_sensors``. They are mapped to
    archive file names through the ``endpoint_url`` column of ``df_sensors``.
    ``scrap_all_data`` then writes the results to ``master_*.csv`` files.
    (An unused local ``ROOT_URL`` was removed: it never reached the scraper.)
    """
    date_range = set_dt_range()
    sensors_to_scrap = get_list_of_missing_sensors()
    all_sensors_to_scrap = df_sensors[df_sensors.sensor_id.isin(sensors_to_scrap)].endpoint_url.tolist()
    scrap_all_data(date_range, all_sensors_to_scrap)

In [11]:
# Quick check: list the archive files linked for a single day.
file_list = get_list_of_files('2019-01-06')

In [15]:
# Note: starts in 2015, earlier than set_dt_range's default of 1/1/2017.
date_range = set_dt_range(start='1/1/2015')

In [16]:
# Collect the archive file listing of every day in date_range into one frame.
# (The previous loop overwrote `files` on every iteration and left
# `all_files` empty.)
_daily_listings = [pd.DataFrame(get_list_of_files(dt)) for dt in date_range]
all_files = pd.concat(_daily_listings, ignore_index=True) if _daily_listings else pd.DataFrame()

['2015-01-01',
 '2015-01-02',
 '2015-01-03',
 '2015-01-04',
 '2015-01-05',
 '2015-01-06',
 '2015-01-07',
 '2015-01-08',
 '2015-01-09',
 '2015-01-10',
 '2015-01-11',
 '2015-01-12',
 '2015-01-13',
 '2015-01-14',
 '2015-01-15',
 '2015-01-16',
 '2015-01-17',
 '2015-01-18',
 '2015-01-19',
 '2015-01-20',
 '2015-01-21',
 '2015-01-22',
 '2015-01-23',
 '2015-01-24',
 '2015-01-25',
 '2015-01-26',
 '2015-01-27',
 '2015-01-28',
 '2015-01-29',
 '2015-01-30',
 '2015-01-31',
 '2015-02-01',
 '2015-02-02',
 '2015-02-03',
 '2015-02-04',
 '2015-02-05',
 '2015-02-06',
 '2015-02-07',
 '2015-02-08',
 '2015-02-09',
 '2015-02-10',
 '2015-02-11',
 '2015-02-12',
 '2015-02-13',
 '2015-02-14',
 '2015-02-15',
 '2015-02-16',
 '2015-02-17',
 '2015-02-18',
 '2015-02-19',
 '2015-02-20',
 '2015-02-21',
 '2015-02-22',
 '2015-02-23',
 '2015-02-24',
 '2015-02-25',
 '2015-02-26',
 '2015-02-27',
 '2015-02-28',
 '2015-03-01',
 '2015-03-02',
 '2015-03-03',
 '2015-03-04',
 '2015-03-05',
 '2015-03-06',
 '2015-03-07',
 '2015-03-