# Bot twitter

Este bot genera reportes diarios de la calidad de aire en las últimas 24 horas usando estaciones Davis con WeatherLink y se publican en twitter.

In [3]:
# Instalar librerías

!pip install requests
!pip install folium
!pip install tweepy
!pip install selenium


Defaulting to user installation because normal site-packages is not writeable
Collecting APIMakeSens
  Downloading APIMakeSens-1.4.12-py3-none-any.whl (19 kB)
Collecting datetime
  Downloading DateTime-5.5-py3-none-any.whl (52 kB)
[2K     [38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m52.6/52.6 KB[0m [31m103.0 kB/s[0m eta [36m0:00:00[0m1m106.3 kB/s[0m eta [36m0:00:01[0m
[?25hCollecting zope.interface
  Downloading zope.interface-7.2-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl (254 kB)
[2K     [38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m254.5/254.5 KB[0m [31m1.9 MB/s[0m eta [36m0:00:00[0m[31m1.9 MB/s[0m eta [36m0:00:01[0m
Installing collected packages: zope.interface, datetime, APIMakeSens
Successfully installed APIMakeSens-1.4.12 datetime-5.5 zope.interface-7.2
Defaulting to user installation because normal site-packages is not writeable
Collecting folium
  Downloading fo

### Cómo ejecutar este código automáticamente a una hora específica:

1. Descarga el notebook como un archivo .py (Asegurarse que esté en el mismo directorio que el archivo "plantilla.png".


2. Ejecuta en la terminal sudo nano /etc/crontab.


3. Añade una nueva línea al final del archivo con el siguiente formato:

    \<minuto> \<hora> * * * \<usuario> python3 /ruta/del/script.py

    \<minuto> representa el minuto de la hora en la que deseas ejecutar el comando, \<hora> representa la hora del día en formato 24 horas, y * * * significa que el comando se ejecutará cualquier día del mes y cualquier día de la semana. \<usuario> es el usuario bajo el cual se ejecutará el comando. Asegúrate de reemplazar \<usuario> con el nombre de usuario correcto.
    

4. Guarda y cierra el archivo /etc/crontab.

    Después de guardar los cambios en /etc/crontab, la tarea programada se ejecutará automáticamente según la hora especificada. Recuerda asegurarte de proporcionar la ruta correcta hacia el intérprete de Python (para saberla ejecutar en una terminal el comando "which python3") y la ruta correcta hacia tu script (/ruta/del/script.py). Además, ten en cuenta que el usuario especificado debe tener los permisos adecuados para ejecutar el script y acceder a cualquier archivo o recurso requerido por el script.

    
Finalmente es importante cambiar la ruta de la variable path dentro del código para que desde cron pueda acceder a ellas correctamente.

In [4]:
# Librerias
import requests
import hmac
import hashlib
import time
import numpy as np
import datetime
import pytz
import folium
from folium.plugins import BeautifyIcon
import os
import pandas as pd
import json

In [None]:
class WeatherLinkAPI:
    def __init__(self, api_key, api_secret):
        self.api_key = api_key
        self.api_secret = api_secret
        self.base_url = "https://api.weatherlink.com/v2"
    
    def _generate_signature(self, parameters):
        sorted_params = sorted(parameters.items())
        param_string = '&'.join([f"{k}={v}" for k, v in sorted_params])
        
        signature = hmac.new(
            self.api_secret.encode('utf-8'),
            param_string.encode('utf-8'),
            hashlib.sha256
        ).hexdigest()
        
        return signature
    
    def get_stations(self):
        timestamp = str(int(time.time()))
        
        parameters = {
            'api-key': self.api_key,
            't': timestamp
        }
        
        signature = self._generate_signature(parameters)
        parameters['api-signature'] = signature
        
        response = requests.get(f"{self.base_url}/stations", params=parameters)
        
        if response.status_code == 200:
            return response.json()
        else:
            print(f"Error al obtener estaciones: {response.status_code}")
            return None
    
    def get_historic_data(self, station_id, start_timestamp, end_timestamp):
        timestamp = str(int(time.time()))
        
        parameters = {
            'api-key': self.api_key,
            'station-id': str(station_id),
            'start-timestamp': str(start_timestamp),
            'end-timestamp': str(end_timestamp),
            't': timestamp
        }
        
        signature = self._generate_signature(parameters)
        parameters['api-signature'] = signature
        
        response = requests.get(f"{self.base_url}/historic", params=parameters)
        
        if response.status_code == 200:
            return response.json()
        else:
            print(f"Error al obtener datos históricos para estación {station_id}: {response.status_code}")
            return None

In [None]:
WEATHERLINK_API_KEY = "tu_api_key_aqui"
WEATHERLINK_API_SECRET = "tu_api_secret_aqui"

In [None]:
# Inicializar API
weather_api = WeatherLinkAPI(WEATHERLINK_API_KEY, WEATHERLINK_API_SECRET)

In [5]:
fechaFin = datetime.datetime.now(pytz.timezone('America/Bogota')).strftime('%Y-%m-%d %H:%M:%S')
past_date = datetime.datetime.now(pytz.timezone('America/Bogota')) - datetime.timedelta(hours=24)
fechaInicio = past_date.strftime('%Y-%m-%d %H:%M:%S')

end_timestamp = int(datetime.datetime.now(pytz.timezone('America/Bogota')).timestamp())
start_timestamp = int((datetime.datetime.now(pytz.timezone('America/Bogota')) - datetime.timedelta(hours=24)).timestamp())

print(f"Período: {fechaInicio} - {fechaFin}")

2025-06-27 03:13:45 2025-06-26 03:13:45


In [3]:
# Datos de Estaciones
Estaciones = [
    (123456, 'Estación Davis 1', 7.1393716, -73.1210968),
    (123457, 'Estación Davis 2', 7.1381572, -73.1180935),
    (123458, 'Estación Davis 3', 7.1385332, -73.121327),
    (123459, 'Estación Davis 4', 7.1277056, -73.116534),
    (123460, 'Estación Davis 5', 7.108717813305484, -73.12173474310134),
    (123461, 'Estación Davis 6', 7.06256, -73.0912),
    (123462, 'Estación Davis 7', 7.136457064606619, -73.12795546334126),
    (123463, 'Estación Davis 8', 7.093382047019532, -73.14229310567076),
    (123464, 'Estación Davis 9', 7.163165408989283, -73.14033300778485),
    (123464, 'Estación Davis 10', 7.163165408989283, -73.14033300778485)
]

In [None]:
def extract_pm_data_from_weatherlink(data):
    pm_data = {
        'pm1': [],
        'pm25': [],
        'pm10': []
    }
    
    if not data or 'sensors' not in data:
        return pm_data
    
    for sensor in data['sensors']:
        if 'data' in sensor:
            for record in sensor['data']:
                
                if 'pm_1' in record:
                    pm_data['pm1'].append(record['pm_1'])
                elif 'pm1' in record:
                    pm_data['pm1'].append(record['pm1'])
                elif 'particulate_matter_1' in record:
                    pm_data['pm1'].append(record['particulate_matter_1'])
                
               
                if 'pm_2p5' in record:
                    pm_data['pm25'].append(record['pm_2p5'])
                elif 'pm25' in record:
                    pm_data['pm25'].append(record['pm25'])
                elif 'particulate_matter_2p5' in record:
                    pm_data['pm25'].append(record['particulate_matter_2p5'])
                
                
                if 'pm_10' in record:
                    pm_data['pm10'].append(record['pm_10'])
                elif 'pm10' in record:
                    pm_data['pm10'].append(record['pm10'])
                elif 'particulate_matter_10' in record:
                    pm_data['pm10'].append(record['particulate_matter_10'])
    
    return pm_data

In [None]:
def get_pm_data_weatherlink(Estaciones):
    pm_data = {
        'pm1': [],
        'pm25': [],
        'pm10': []
    }
    Ntotal = int(24 * 30)  # Número esperado de mediciones en 24 horas
    
    for est in Estaciones:
        station_id = est[0]
        station_name = est[1]
        
        print(f"Obteniendo datos de {station_name} (ID: {station_id})")
        
        data = weather_api.get_historic_data(station_id, start_timestamp, end_timestamp)
        
        if data:
            pm_values = extract_pm_data_from_weatherlink(data)
            
            for pm_type in ['pm1', 'pm25', 'pm10']:
                if pm_values[pm_type]:
                    max_limits = {'pm1': 80, 'pm25': 120, 'pm10': 200}
                    max_val = max_limits[pm_type]
                    
                    pm_filtered = [val for val in pm_values[pm_type] if 0 <= val <= max_val]
                    
                    Ndata = len(pm_filtered)
                    print(f"{station_name} - {pm_type.upper()}: {Ndata} mediciones válidas")
                    
                    if Ndata > 0.5 * Ntotal:
                        pm_mean = np.mean(pm_filtered)
                        pm_data[pm_type].append(pm_mean)
                    else:
                        print(f"{station_name} - {pm_type.upper()}: Datos insuficientes ({Ndata}/{Ntotal})")
                        pm_data[pm_type].append(0.0)
                else:
                    print(f"{station_name}: No se encontraron datos de {pm_type.upper()}")
                    pm_data[pm_type].append(0.0)
        else:
            print(f"{station_name}: Error al obtener datos")
            for pm_type in ['pm1', 'pm25', 'pm10']:
                pm_data[pm_type].append(0.0)
    
    return pm_data

In [None]:
pm_data = get_pm_data_weatherlink(Estaciones)
print(f"Valores PM1: {pm_data['pm1']}")
print(f"Valores PM2.5: {pm_data['pm25']}")
print(f"Valores PM10: {pm_data['pm10']}")

In [None]:
pm25 = pm_data['pm25']

### Mapa con valores de material particulado

In [7]:
latitudes = np.array(Estaciones)[:,2].astype('float64')
latitud = (np.max(latitudes)+np.min(latitudes))/2
longitudes = np.array(Estaciones)[:,3].astype('float64')
longitud = (np.max(longitudes)+np.min(longitudes))/2

z = int(13-np.log2(10*(np.max(latitudes)-np.min(latitudes))))

shift1 = 0.001
latitudes[0] += 4*shift1
longitudes[1] += shift1
longitudes[2] -= 2*shift1
latitudes[-2] -=2*shift1

shift2 = 0.005
ubicaciones = []
for i in range(len(latitudes)):
    ubicaciones.append([latitudes[i],longitudes[i]])

ubicaciones[0][0] += shift2
ubicaciones[1][1] += shift2
ubicaciones[2][0] += 0.65*shift2
ubicaciones[2][1] -= 0.9*shift2
ubicaciones[3][0] -= shift2
ubicaciones[4][0] -= shift2
ubicaciones[5][0] += shift2
ubicaciones[6][0] -= shift2
ubicaciones[7][0] += shift2
ubicaciones[8][0] -= shift2


bucaramanga_map = folium.Map(location=[latitud, longitud], 
                              zoom_start=z, tiles='CartoDB Positron',
                              width=677, height=660, zoom_control=False)

def get_color_pm1(number):
    """Determina el color según el nivel de PM1 (basado en estándares WHO)"""
    if number == 0:
        return "#808080"  # Gris
    elif 0 < number <= 5.0:
        return '#00FF2E'  # Verde
    elif 5 < number <= 15.0:
        return '#FFE000'  # Amarillo
    elif 15 < number <= 25.0:
        return '#FF6100'  # Naranja
    elif 25 < number:
        return '#FF0000'  # Rojo

def get_color_pm25(number):
    """Determina el color según el nivel de PM2.5"""
    if number == 0:
        return "#808080"  # Gris
    elif 0 < number <= 12.0:
        return '#00FF2E'  # Verde
    elif 12 < number <= 37.0:
        return '#FFE000'  # Amarillo
    elif 37 < number <= 55:
        return '#FF6100'  # Naranja
    elif 55 < number:
        return '#FF0000'  # Rojo

def get_color_pm10(number):
    """Determina el color según el nivel de PM10"""
    if number == 0:
        return "#808080"  # Gris
    elif 0 < number <= 25.0:
        return '#00FF2E'  # Verde
    elif 25 < number <= 50.0:
        return '#FFE000'  # Amarillo
    elif 50 < number <= 90.0:
        return '#FF6100'  # Naranja
    elif 90 < number:
        return '#FF0000'  # Rojo

def get_color(number):
    """Determina el color según el nivel de PM2.5 (mantener compatibilidad)"""
    return get_color_pm25(number)


for i in range(len(pm25)):
    if pm25[i] >= 0.0:
        if pm25[i] == 0.0:
            valor = 'X'
        else:
            valor = np.round(pm25[i], 1)
        
        icon_number = BeautifyIcon(
            border_color=get_color(pm25[i]),
            text_color='#000000',
            number=valor,
            background_color='transparent',
            icon_size=(40, 40),
            inner_icon_style='line-height:25px; font-size: 12px;',
            spin=True,
            border_width=2,
        )

        folium.Marker(
            [latitudes[i], longitudes[i]],
            popup=Estaciones[i][1],
            icon=icon_number
        ).add_to(bucaramanga_map)
        
        location_marker = BeautifyIcon(
            icon_shape='square',
            text_color='#ffffff',
            number=i+1,
            border_color='#2f5786',
            background_color='#2f5786',
            icon_size=(20, 20),
            inner_icon_style='line-height:10px; font-size: 12px;',
            spin=True,
            border_width=2,
        )

        folium.Marker(
            ubicaciones[i],
            popup=Estaciones[i][1],
            icon=location_marker
        ).add_to(bucaramanga_map)

# Ajustar límites del mapa
ne = [7.164165408989283, -73.14033300778485]
sw = [7.0530, -73.0979968]
bucaramanga_map.fit_bounds([sw, ne])

In [None]:
# Mostrar el mapa
bucaramanga_map

Para guardar el mapa como imagen debe tenerse la plantilla en el mismo directorio que este código

In [9]:
import io
from PIL import Image

path = './'

def create_combined_image():
    """Crear imagen combinada con los tres tipos de PM"""
    
    pm25_img_data = pm25_map._to_png(5)
    pm25_img = Image.open(io.BytesIO(pm25_img_data))
    
    pm1_img_data = pm1_map._to_png(5)
    pm1_img = Image.open(io.BytesIO(pm1_img_data))
    
    pm10_img_data = pm10_map._to_png(5)
    pm10_img = Image.open(io.BytesIO(pm10_img_data))
    
    try:
        plantilla = Image.open(path + 'plantilla.png')
        map_width = plantilla.width // 3
        map_height = int(pm25_img.height * (map_width / pm25_img.width))
        
        pm25_resized = pm25_img.resize((map_width, map_height))
        pm1_resized = pm1_img.resize((map_width, map_height))
        pm10_resized = pm10_img.resize((map_width, map_height))
        
        combined_height = plantilla.height + map_height + 100  
        combined_img = Image.new('RGB', (plantilla.width, combined_height), 'white')
        
        combined_img.paste(plantilla, (0, 0))


        y_pos = plantilla.height + 50
        x_positions = [0, map_width, map_width * 2]
        
        
        combined_img.paste(pm1_resized, (x_positions[0], y_pos))
        combined_img.paste(pm25_resized, (x_positions[1], y_pos))
        combined_img.paste(pm10_resized, (x_positions[2], y_pos))
        
        
        draw = ImageDraw.Draw(combined_img)
        try:
            font = ImageFont.truetype("arial.ttf", 24)
        except:
            font = ImageFont.load_default()
        
        titles = ["PM1 (μg/m³)", "PM2.5 (μg/m³)", "PM10 (μg/m³)"]
        for i, title in enumerate(titles):
            text_width = draw.textlength(title, font=font)
            x_center = x_positions[i] + (map_width - text_width) // 2
            draw.text((x_center, y_pos - 40), title, fill='black', font=font)
        
        combined_img.save(path + 'mapa_full_combined.png')
        return combined_img
        
    except FileNotFoundError:
        print("Plantilla no encontrada, creando imagen simple")
       
        map_width = 677
        map_height = 660
        
        combined_width = map_width * 3
        combined_height = map_height + 100
        
        combined_img = Image.new('RGB', (combined_width, combined_height), 'white')
        
        
        combined_img.paste(pm1_img, (0, 50))
        combined_img.paste(pm25_img, (map_width, 50))
        combined_img.paste(pm10_img, (map_width * 2, 50))
        
        
        draw = ImageDraw.Draw(combined_img)
        font = ImageFont.load_default()
    

        
        titles = ["PM1 (μg/m³)", "PM2.5 (μg/m³)", "PM10 (μg/m³)"]
        for i, title in enumerate(titles):
            draw.text((i * map_width + 20, 10), title, fill='black', font=font)
        
        combined_img.save(path + 'mapa_full_combined.png')
        return combined_img

combined_image = create_combined_image()

pm25_img_data = pm25_map._to_png(5)
pm25_img = Image.open(io.BytesIO(pm25_img_data))

try:
    plantilla = Image.open(path + 'plantilla.png')
    plantilla.paste(pm25_img, (0, 80))
    plantilla.save(path + 'mapa_full.png')
except FileNotFoundError:
    pm25_img.save(path + 'mapa_full.png')


In [None]:
combined_image = create_combined_image()

pm25_img_data = pm25_map._to_png(5)
pm25_img = Image.open(io.BytesIO(pm25_img_data))

try:
    plantilla = Image.open(path + 'plantilla.png')
    plantilla.paste(pm25_img, (0, 80))
    plantilla.save(path + 'mapa_full.png')
except FileNotFoundError:
    pm25_img.save(path + 'mapa_full.png')