In [1]:
import requests
import csv 
from datetime import datetime


In [11]:
def get_access_token(client_id, client_secret, refresh_token):
    # URL for token endpoint
    token_url = "https://api.netatmo.com/oauth2/token"

    # Parameters for token request
    params = {
        "grant_type": "refresh_token",
        "refresh_token": refresh_token,
        "client_id": client_id,
        "client_secret": client_secret
    }

    # Send POST request to get access token
    response = requests.post(token_url, data=params)

    # Check if request was successful
    if response.status_code == 200:
        # Parse JSON response
        token_data = response.json()
        # Access token
        access_token = token_data["access_token"]
        return access_token
    else:
        print("Error:", response.status_code, response.text)
        return None

# Example usage:
client_id = "6639b5281c1e11c3320623e9"
client_secret = "y4UauIiAIWvNiPAeXoWZHgRx9f"
refresh_token = "6639b0eb66bcd29b9809e8f4|e7a7a25cd535430315068879948c04f0"

access_token = get_access_token(client_id, client_secret, refresh_token)


In [14]:
def get_ids(access_token, lat_ne, lon_ne, lat_sw, lon_sw):
    params = {
        'access_token': access_token,
        'lat_ne' : lat_ne,
        'lon_ne' : lon_ne,
        'lat_sw' : lat_sw,
        'lon_sw' : lon_sw,
    }

    ids = {}
    NoResponse = True
    retry_count = 0
    while NoResponse:
        #try to get stations in given region, 5 attempts before moving on to next area
        try:
            #try to get stations
            response = requests.post("https://api.netatmo.com/api/getpublicdata", params=params)
            response.raise_for_status()
            data = response.json()["body"]
            for station in data:
            #find each value for each station
                _id = station['_id']
                mod = [n for n in station['modules'] if n.startswith('02:')]
                location = station['place']['location']
                altitude = station['place']['altitude']
                if 'city' in station['place'].keys():
                    city = station['place']['city']
                else:
                    city = 'no city'
                ids[_id] = ({'module_name':mod, 'location':location, 'altitude':altitude,
                             'city':city, 'full_modules':station['modules']})

            #Checking that some data has been returned
            if len(ids) == 0:
            #if everything works but we have no data returned in the given box, raise
                raise NameError('length')                
        except requests.exceptions.HTTPError as error:
            #if there's an error, try four more times before moving on
            print(error.response.status_code, error.response.text)
            if retry_count < 5:
                retry_count += 1
            else:
                return({})
        except NameError:
            if retry_count < 5:
                retry_count += 1
            else:
                return({})
        else:
            NoResponse = False
            return(ids)
        
lat_ne = -36.580214
lon_ne = 175.189765
lat_sw = -37.119948
lon_sw = 174.385622
        
ids = get_ids(access_token, lat_ne, lon_ne, lat_sw, lon_sw)

In [15]:
def save_netatmo_data_to_csv(ids, csv_file):
    with open(csv_file, mode='w', newline='', encoding='utf-8') as file:
        writer = csv.writer(file)
        writer.writerow(['MAC_address', 'module_name', 'latitude', 'longitude', 'altitude', 'city', 'full_modules'])
        for mac_address, value in ids.items():
            latitude, longitude = value['location']
            writer.writerow([mac_address, value['module_name'], latitude, longitude, value['altitude'], value['city'], value['full_modules']])
    return csv_file

csv_file = "netatmo_datatest.csv"
save_netatmo_data_to_csv(ids, csv_file)

Data saved to netatmo_datatest.csv


'netatmo_datatest.csv'

In [16]:
def get_historical_measurements(access_token, device_id, module_id, scale, types, date_begin, date_end, limit=1024):
    url = "https://api.netatmo.net/api/getmeasure"
    
    params = {
        'access_token': access_token,
        'device_id': device_id,
        'module_id': module_id,
        'scale': scale,
        'type': types,
        'date_begin': date_begin,
        'date_end': date_end,
        'limit': limit
    }

    response = requests.get(url, params=params)

    if response.status_code == 200:
        return response.json()
    else:
        return None

In [25]:
def load_device_and_module_id_from_csv(csv_file):
    with open(csv_file, mode='r', newline='', encoding='utf-8') as file:
        reader = csv.reader(file)
        next(reader)  # Skip the header row
        row = next(reader, None)  # Get the first row of data
        if row:
            device_id = row[0]
            module_id = row[1].strip("[]'")  # Remove square brackets and single quotes
            return device_id, module_id
        else:
            print("Error: CSV file is empty.")
            return None, None


# Example usage:
device_id, module_id = load_device_and_module_id_from_csv(csv_file)

if device_id and module_id:
    scale = "1hour"  # Change according to your requirement
    types = "Temperature,Humidity,Pressure"  # Add or remove types as needed
    date_begin = 946684800  # Start timestamp
    date_end = 1609459199  # End timestamp, set to "last" to retrieve only the last measurement
    limit = 1024  # Default limit

    measurements = get_historical_measurements(access_token, device_id, module_id, scale, types, date_begin, date_end, limit)
else:
    print("Error: Unable to load device_id and module_id from CSV.")

In [29]:
def save_measurements_to_csv(measurements, filename):
    with open(filename, 'w', newline='') as csvfile:
        fieldnames = ['acquisition_time', 'temperature', 'humidity', 'pressure']
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)

        writer.writeheader()
        acquisition_time = measurements['body'][0]['beg_time']
        step_time = measurements['body'][0]['step_time']
        for entry in measurements['body']:
            for value in entry['value']:
                row = {
                    'acquisition_time': datetime.utcfromtimestamp(acquisition_time).strftime('%Y%m%d%H%M'),
                    'temperature': value[0],
                    'humidity': value[1],
                    'pressure': value[2]
                }
                writer.writerow(row)
                acquisition_time += step_time

# Example usage
save_measurements_to_csv(measurements, 'measurements000.csv')

In [None]:
#--------------------------------------FUNCTION--------------------------------------#

In [42]:
# Your credentials
client_id = "6639b5281c1e11c3320623e9"
client_secret = "y4UauIiAIWvNiPAeXoWZHgRx9f"
refresh_token = "6639b0eb66bcd29b9809e8f4|e7a7a25cd535430315068879948c04f0"

# URL for token endpoint
token_url = "https://api.netatmo.com/oauth2/token"

# Parameters for token request
params = {
    "grant_type": "refresh_token",
    "refresh_token": refresh_token,
    "client_id": client_id,
    "client_secret": client_secret
}

# Send POST request to get access token
response = requests.post(token_url, data=params)

# Check if request was successful
if response.status_code == 200:
    # Parse JSON response
    token_data = response.json()
    # Access token
    access_token = token_data["access_token"]
    print("Access Token:", access_token)
    # Refresh token
    refresh_token = token_data["refresh_token"]
    print("Refresh Token:", refresh_token)
else:
    print("Error:", response.status_code, response.text)

Access Token: 6639b0eb66bcd29b9809e8f4|85bdc9f5cebd48eee2789352ccaa8708
Refresh Token: 6639b0eb66bcd29b9809e8f4|e7a7a25cd535430315068879948c04f0


In [43]:
#get access token

# Your credentials
client_id = "6639b5281c1e11c3320623e9"
client_secret = "y4UauIiAIWvNiPAeXoWZHgRx9f"
refresh_token = "6639b0eb66bcd29b9809e8f4|e7a7a25cd535430315068879948c04f0"

# URL for token endpoint
token_url = "https://api.netatmo.com/oauth2/token"

# Parameters for token request
params = {
    "grant_type": "refresh_token",
    "refresh_token": refresh_token,
    "client_id": client_id,
    "client_secret": client_secret
}

# Send POST request to get access token
response = requests.post(token_url, data=params)

# Check if request was successful
if response.status_code == 200:
    # Parse JSON response
    token_data = response.json()
    # Access token
    access_token = token_data["access_token"]
    # Refresh token
    refresh_token = token_data["refresh_token"]
else:
    print("Error:", response.status_code, response.text)

In [44]:
def get_ids():
    params = {
        'access_token': access_token,
        'lat_ne' : -36.580214,
        'lon_ne' : 175.189765,
        'lat_sw' : -37.119948,
        'lon_sw' : 174.385622,
    }

    ids = {}
    NoResponse = True
    retry_count = 0
    while NoResponse:
        #try to get stations in given region, 5 attempts before moving on to next area
        try:
            #try to get stations
            response = requests.post("https://api.netatmo.com/api/getpublicdata", params=params)
            response.raise_for_status()
            data = response.json()["body"]
            for station in data:
            #find each value for each station
                _id = station['_id']
                mod = [n for n in station['modules'] if n.startswith('02:')]
                location = station['place']['location']
                altitude = station['place']['altitude']
                if 'city' in station['place'].keys():
                    city = station['place']['city']
                else:
                    city = 'no city'
                ids[_id] = ({'module_name':mod, 'location':location, 'altitude':altitude,
                             'city':city, 'full_modules':station['modules']})

            #Checking that some data has been returned
            if len(ids) == 0:
            #if everything works but we have no data returned in the given box, raise
                raise NameError('length')                
        except requests.exceptions.HTTPError as error:
            #if there's an error, try four more times before moving on
            print(error.response.status_code, error.response.text)
            if retry_count < 5:
                retry_count += 1
            else:
                return({})
        except NameError:
            if retry_count < 5:
                retry_count += 1
            else:
                return({})
        else:
            NoResponse = False
            return(ids)
        
testi =get_ids()

In [45]:
csv_file = "netatmo_data29.csv"
with open(csv_file, mode='w', newline='', encoding='utf-8') as file:
    writer = csv.writer(file)
    writer.writerow(['MAC_address', 'module_name', 'latitude', 'longitude', 'altitude', 'city', 'full_modules'])
    for mac_address, value in testi.items():
        latitude, longitude = value['location']
        writer.writerow([mac_address, value['module_name'], latitude, longitude, value['altitude'], value['city'], value['full_modules']])

print("Data saved to netatmo_data29.csv")

Data saved to netatmo_data29.csv


In [31]:
import time

def get_historical_measurements_batch(access_token, device_id, module_id, scale, types, date_begin, date_end, limit=1024):
    all_measurements = []

    while date_begin < date_end:
        measurements = get_historical_measurements(access_token, device_id, module_id, scale, types, date_begin, date_end, limit)
        if measurements is None or "body" not in measurements or not measurements["body"]:
            # No more data available or an error occurred, stop the loop
            break

        # Extract the last timestamp from the received data
        last_timestamp = measurements["body"][-1]["beg_time"]
        
        # Append the measurements to the list
        all_measurements.extend(measurements["body"])

        # Update date_begin to fetch the next batch of data
        date_begin = last_timestamp + measurements["body"][-1]["step_time"]

        # Introduce a delay to avoid hitting rate limits
        time.sleep(1)

    return all_measurements

# Example usage:
device_id = "70:ee:50:29:0d:30"
module_id = "02:00:00:29:24:3c"
scale = "1hour"  # Change according to your requirement
types = "Temperature,Humidity,Pressure"  # Add or remove types as needed
date_begin = 946684800  # Start timestamp
date_end = 1609459199  # End timestamp, set to "last" to retrieve only the last measurement
limit = 1024  # Default limit

measurementsZZZ = get_historical_measurements_batch(access_token, device_id, module_id, scale, types, date_begin, date_end, limit)
print("Total measurements:", len(measurementsZZZ))


Total measurements: 693


In [60]:
import time

def get_historical_measurements_batch(access_token, device_id, module_id, scale, types, date_begin, date_end, limit=1024):
    all_measurements = []

    while date_begin < date_end:
        measurements = get_historical_measurements(access_token, device_id, module_id, scale, types, date_begin, date_end, limit)
        if measurements is None or "body" not in measurements or not measurements["body"]:
            # No more data available or an error occurred, stop the loop
            break

        # Extract the last timestamp from the received data
        last_timestamp = measurements["body"][-1]["beg_time"]
        
        # Append the measurements to the list
        all_measurements.extend(measurements["body"])

        # Update date_begin to fetch the next batch of data
        date_begin = last_timestamp + measurements["body"][-1]["step_time"]

        # Introduce a delay to avoid hitting rate limits
        time.sleep(1)

    return all_measurements

# Example usage:
device_id = "70:ee:50:29:0d:30"
module_id = "02:00:00:29:24:3c"
scale = "1day"  # Change according to your requirement
types = "Temperature,Humidity,Pressure"  # Add or remove types as needed
date_begin = 946684800  # Start timestamp
date_end = 1609459199  # End timestamp, set to "last" to retrieve only the last measurement
limit = 1024  # Default limit

measurementsZZZ = get_historical_measurements_batch(access_token, device_id, module_id, scale, types, date_begin, date_end, limit)
print("Total measurements:", len(measurementsZZZ))


Total measurements: 0


In [59]:
import time
import csv
from datetime import datetime

def get_historical_measurements_batch(access_token, device_id, module_id, scale, types, date_begin, date_end, limit=1024):
    all_measurements = []

    while date_begin < date_end:
        measurements = get_historical_measurements(access_token, device_id, module_id, scale, types, date_begin, date_end, limit)
        if measurements is None or "body" not in measurements or not measurements["body"]:
            # No more data available or an error occurred, stop the loop
            break

        # Extract the last timestamp from the received data
        last_timestamp = measurements["body"][-1]["beg_time"]
        
        # Append the measurements to the list
        all_measurements.extend(measurements["body"])

        # Update date_begin to fetch the next batch of data
        date_begin = last_timestamp + measurements["body"][-1]["step_time"]

        # Save measurements to CSV file
        save_measurements_to_csv(measurements, 'measurements.csv')

        # Introduce a delay to avoid hitting rate limits
        time.sleep(1)

    return all_measurements

def save_measurements_to_csv(measurements, filename):
    with open(filename, 'a', newline='') as csvfile:
        fieldnames = ['acquisition_time', 'temperature', 'humidity', 'pressure']
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)

        # Skip writing header if file is not empty
        if csvfile.tell() == 0:
            writer.writeheader()

        acquisition_time = measurements['body'][0]['beg_time']
        step_time = measurements['body'][0]['step_time']
        for entry in measurements['body']:
            for value in entry['value']:
                row = {
                    'acquisition_time': datetime.utcfromtimestamp(acquisition_time).strftime('%Y%m%d%H%M'),
                    'temperature': value[0],
                    'humidity': value[1],
                    'pressure': value[2]
                }
                writer.writerow(row)
                acquisition_time += step_time

# Example usage:
device_id = "70:ee:50:29:0d:30"
module_id = "02:00:00:29:24:3c"
scale = "1day"  # Change according to your requirement
types = "Temperature,Humidity,Pressure"  # Add or remove types as needed
date_begin = 946684800  # Start timestamp
date_end = 1609459199  # End timestamp, set to "last" to retrieve only the last measurement
limit = 1024  # Default limit

measurementss = get_historical_measurements_batch(access_token, device_id, module_id, scale, types, date_begin, date_end, limit)
print("Total measurements:", len(measurementss))

Total measurements: 0


In [61]:
def save_measurements_to_csv(measurements, filename):
    with open(filename, 'w', newline='') as csvfile:
        fieldnames = ['acquisition_time', 'temperature', 'humidity', 'pressure']
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)

        writer.writeheader()
        acquisition_time = measurements['body'][0]['beg_time']
        step_time = measurements['body'][0]['step_time']
        for entry in measurements['body']:
            for value in entry['value']:
                row = {
                    'acquisition_time': datetime.utcfromtimestamp(acquisition_time).strftime('%Y%m%d%H%M'),
                    'temperature': value[0],
                    'humidity': value[1],
                    'pressure': value[2]
                }
                writer.writerow(row)
                acquisition_time += step_time

# Example usage
save_measurements_to_csv(measurementsZZZ, 'measurementszzzzz.csv')


TypeError: list indices must be integers or slices, not str

In [30]:
def get_historical_measurements(access_token, device_id, module_id, scale, types, date_begin, date_end, limit=1024):
    url = "https://api.netatmo.net/api/getmeasure"
    
    params = {
        'access_token': access_token,
        'device_id': device_id,
        'module_id': module_id,
        'scale': scale,
        'type': types,
        'date_begin': date_begin,
        'date_end': date_end,
        'limit': limit
    }

    response = requests.get(url, params=params)

    if response.status_code == 200:
        return response.json()
    else:
        return None

# Example usage:

device_id = "70:ee:50:29:0d:30"
module_id= "02:00:00:29:24:3c"
scale = "1hour"  # Change according to your requirement
types = "Temperature,Humidity,Pressure"  # Add or remove types as needed
date_begin = 946684800  # Start timestamp
date_end = 1609459199  # End timestamp, set to "last" to retrieve only the last measurement
limit = 1024  # Default limit

measurements = get_historical_measurements(access_token, device_id, module_id, scale, types, date_begin, date_end, limit)

KeyboardInterrupt: 

In [7]:
def save_measurements_to_csv(measurements, filename):
    with open(filename, 'w', newline='') as csvfile:
        fieldnames = ['acquisition_time', 'temperature', 'humidity', 'pressure']
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)

        writer.writeheader()
        acquisition_time = measurements['body'][0]['beg_time']
        step_time = measurements['body'][0]['step_time']
        for entry in measurements['body']:
            for value in entry['value']:
                row = {
                    'acquisition_time': datetime.utcfromtimestamp(acquisition_time).strftime('%Y%m%d%H%M'),
                    'temperature': value[0],
                    'humidity': value[1],
                    'pressure': value[2]
                }
                writer.writerow(row)
                acquisition_time += step_time

# Example usage
save_measurements_to_csv(measurements, 'measurements.csv')
