# Лабораторная работа по теме “Парсинг данных”

## Импорт необходимых библиотек

In [48]:
import os

import requests
import json
import pandas as pd
import psycopg2

## Парсинг Open Weather API

Для получения данных о погоде используется сервис Open Weather API. Для того, чтобы начать им пользоваться, необходимо зарегистрироваться и получить APIKEY. В бесплатной подписке дается 60 запросов в минуту до 1 млн запросов в месяц. Доступны различные эндпоинты: например в бесплатной версии можно запрашивать информацию по текущей погоде, погоде за пять дней с шагом в 3 часа, данные о координатах интересующей вас локации, данные по загрязнению воздуха и погодные дашборды.
<br>
Для того, чтобы получить данные по погоде в интересующем нас месте (например город Казань) необходимо получить ее координаты и передать в запрос к API. Для этого можно воспользоваться Geocoding API <a href='https://openweathermap.org/api/geocoding-api'>ТЫК!</a>. Далее запрашиваем данные о погоде из Current Weather API <a href='https://openweathermap.org/current'>ТЫК2!</a>.

In [2]:
APIKEY = '<secret-api-key>'
location_part = 'Kazan,Russia'      # Указываем город,страну
limit = 5       # Количество получаемых подходящих вариантов (по дефолту 5)

In [75]:
url = 'https://api.openweathermap.org'
api_weather_template = 'data/2.5/weather?lat={lat}&lon={lon}&units=metric&lang=ru&appid={api_key}'      # Weather API
api_geocoding_template = 'geo/1.0/direct?q={location_part}&limit={limit}&appid={api_key}'       # Geocoding API

Отправим запрос к Geocoding API для получения широты и долготы города Казань

In [4]:
api_geocoding = api_geocoding_template.format(location_part=location_part, api_key=APIKEY, limit=limit)
geocoding_response = requests.get(f'{url}/{api_geocoding}')
geocoding_response.status_code

200

In [5]:
geo_data = json.loads(geocoding_response.text)

In [68]:
lat, lon, name = geo_data[0]['lat'], geo_data[0]['lon'], geo_data[0]['name']
lat, lon

(55.7823547, 49.1242266)

Эти данные мы будем использовать далее для получения данных о погоде. Но так как эта информация статическая, мы можем ее сохранить в базе данных, чтобы сразу обращаться к ней вместо обращения к API. Сформируем INSERT запрос, который выполним позже. (схемы таблиц будут приведены позже)

In [72]:
insert_city_expr = f"""
INSERT INTO city
VALUES ('{name}', {lat}, {lon});
"""

Используем Current Weather API для получения данных о погоде города Казань

In [53]:
api_weather = api_weather_template.format(lat=lat, lon=lon, api_key=APIKEY)
response = requests.get(f'{url}/{api_weather}')
response.status_code

200

Какой ответ мы получаем: координаты, данные о погоде (пасмурно, солнечно), количественные данные (температура, давление, влажность, видимость, скорость и направление ветра, облачность), время запроса, время рассвета и заката, данные о часовом поясе и другие внутренние параметры API. Преобразуем временные параметры, добавим поле - название города в EN, и распарсим поле <strong>weather</strong> - он представляет собой список словарей, нам нужно описание состояния погоды (поле <strong>description</strong>) в каждом состоянии.

In [54]:
response.text

'{"coord":{"lon":49.1242,"lat":55.7824},"weather":[{"id":804,"main":"Clouds","description":"пасмурно","icon":"04d"}],"base":"stations","main":{"temp":5.85,"feels_like":4.68,"temp_min":3.79,"temp_max":6.66,"pressure":1015,"humidity":80,"sea_level":1015,"grnd_level":1009},"visibility":10000,"wind":{"speed":1.7,"deg":251,"gust":2.2},"clouds":{"all":99},"dt":1697801055,"sys":{"type":2,"id":48937,"country":"RU","sunrise":1697772194,"sunset":1697808793},"timezone":10800,"id":551487,"name":"Казань","cod":200}'

In [55]:
df = pd.json_normalize(json.loads(response.text), sep='_')

In [57]:
df['weather'] = df.weather.apply(lambda x: ','.join(k['description'] for k in x))
df['report_datetime'] = pd.to_datetime(df['dt'] + df['timezone'], unit='s').astype('str')
df['sunrise_datetime'] = pd.to_datetime(df['sys_sunrise'] + df['timezone'], unit='s').astype('str')
df['sunset_datetime'] = pd.to_datetime(df['sys_sunset'] + df['timezone'], unit='s').astype('str')
df['city_name_en'] = name

Сформируем список необходимых данных, которые мы далее будем загружать в БД.

In [74]:
columns = [
    'city_name_en',
    'name',
    'sys_country',
    'report_datetime',
    'weather',
    'visibility',
    'main_temp',
    'main_feels_like',
    'main_temp_min',
    'main_temp_max',
    'main_pressure',
    'main_humidity',
    'main_sea_level',
    'main_grnd_level',
    'wind_speed',
    'wind_deg',
    'wind_gust',
    'clouds_all',
    'coord_lon',
    'coord_lat',
    'sunrise_datetime',
    'sunset_datetime',
]
df[columns].to_records(index=False).tolist()[0]

('Kazan',
 'Казань',
 'RU',
 '2023-10-20 14:24:15',
 'пасмурно',
 10000,
 5.85,
 4.68,
 3.79,
 6.66,
 1015,
 80,
 1015,
 1009,
 1.7,
 251,
 2.2,
 99,
 49.1242,
 55.7824,
 '2023-10-20 06:23:14',
 '2023-10-20 16:33:13')

In [63]:
insert_expr = f"""
INSERT INTO weather
VALUES {df[columns].to_records(index=False).tolist()[0]};
"""


Создадим подключение к БД и загрузим полученные данные в БД.

In [73]:
postgres_password = os.getenv('POSTGRES_PASSWORD')
with psycopg2.connect(database='weather', user='postgres', password=postgres_password) as conn:
    with conn.cursor() as cur:
        cur.execute(insert_city_expr)
        cur.execute(insert_expr)

Объединим данные шаги в одну функцию, которую потом можно завернуть в ДАГ Airflow или обернуть как эндпоинт своего сервиса по получению данных о погоде. В данной функции у нас будут параметры: интересующий город (+страна) и опциональный параметр (количество получаемых геолокаций). Добавим также обработку ошибок и получение данных о городе из БД, если по этому городу уже до этого получали данные. <br><br> Реализуем на примере локального хранения (в случае использования в Airflow или бэке необходимо будет изменить только способ получения данных из БД - ORM (Flask, FASTApi) или hooks (Airflow) и загрузку данных (аналогично)).

In [95]:
def load_current_weather_data(city: str, country: str, limit=5):
    # Данные параметры можно завернуть в глобальные
    # В данном случае привожу их в функции чтобы алгоритм был в одном месте
    url = 'https://api.openweathermap.org'
    api_weather_template = 'data/2.5/weather?lat={lat}&lon={lon}&units=metric&lang=ru&appid={api_key}'      # Weather API
    api_geocoding_template = 'geo/1.0/direct?q={location_part}&limit={limit}&appid={api_key}'       # Geocoding API
    postgres_password = os.getenv('POSTGRES_PASSWORD')

    # Получаем имеющие данные по данному городу
    with psycopg2.connect(database='weather', user='postgres', password=postgres_password) as conn:
        with conn.cursor() as cur:
            cur.execute(f"SELECT * FROM city WHERE name = '{city.lower()}'")
            city_date = cur.fetchone()
            if city_date is None:
                location_part = city + ',' + country
                api_geocoding = api_geocoding_template.format(location_part=location_part, api_key=APIKEY, limit=limit)
                geocoding_response = requests.get(f'{url}/{api_geocoding}')
                if geocoding_response.status_code != 200:
                    geocoding_response.raise_for_status()
                geo_data = json.loads(geocoding_response.text)[0]
                name, lat, lon = geo_data['name'], geo_data['lat'], geo_data['lon']
                insert_city_expr = f"""
                    INSERT INTO city
                    VALUES ('{name.lower()}', {lat}, {lon});
                """
                cur.execute(insert_city_expr)
                print(f'Данные о городе {city} взяты из API')
            else:
                name, lat, lon = city_date
                print(f'Данные о городе {city} взяты из БД')

            # Запрашиваем данные по погоде
            api_weather = api_weather_template.format(lat=lat, lon=lon, api_key=APIKEY)
            response = requests.get(f'{url}/{api_weather}')
            if response.status_code != 200:
                response.raise_for_status()

            df = pd.json_normalize(json.loads(response.text), sep='_')
            df['weather'] = df.weather.apply(lambda x: ','.join(k['description'] for k in x))
            df['report_datetime'] = pd.to_datetime(df['dt'] + df['timezone'], unit='s').astype('str')
            df['sunrise_datetime'] = pd.to_datetime(df['sys_sunrise'] + df['timezone'], unit='s').astype('str')
            df['sunset_datetime'] = pd.to_datetime(df['sys_sunset'] + df['timezone'], unit='s').astype('str')
            df['city_name_en'] = name

            # Загружаем необходимые данные в БД
            columns = [
                'city_name_en','name','sys_country','report_datetime','weather','visibility','main_temp','main_feels_like','main_temp_min','main_temp_max','main_pressure','main_humidity','wind_speed','wind_deg','wind_gust','clouds_all','coord_lon','coord_lat','sunrise_datetime','sunset_datetime',
            ]
            insert_expr = f"""
                INSERT INTO weather
                VALUES {df[columns].to_records(index=False).tolist()[0]};
            """
            cur.execute(insert_expr)


In [96]:
load_current_weather_data('Kazan', 'Russia')

Данные о городе Kazan взяты из БД


In [91]:
load_current_weather_data('Moscow', 'Russia')

Данные о городе Moscow взяты из БД


In [100]:
from pprint import pprint

postgres_password = os.getenv('POSTGRES_PASSWORD')
with psycopg2.connect(database='weather', user='postgres', password=postgres_password) as conn:
    with conn.cursor() as cur:
        cur.execute(f"SELECT * FROM city;")
        city_data = cur.fetchall()
        pprint(city_data)

[('kazan', Decimal('55.782355'), Decimal('49.124227')),
 ('moscow', Decimal('55.750446'), Decimal('37.617494'))]


In [101]:
postgres_password = os.getenv('POSTGRES_PASSWORD')
with psycopg2.connect(database='weather', user='postgres', password=postgres_password) as conn:
    with conn.cursor() as cur:
        cur.execute(f"SELECT * FROM weather;")
        weather_data = cur.fetchall()
        pprint(weather_data)

[('kazan',
  'Казань',
  'RU',
  datetime.datetime(2023, 10, 20, 15, 28, 18),
  'пасмурно',
  10000,
  Decimal('5.85'),
  Decimal('5.85'),
  Decimal('3.79'),
  Decimal('6.66'),
  1015,
  76,
  Decimal('0.96'),
  255,
  Decimal('1.36'),
  100,
  Decimal('49.124200'),
  Decimal('55.782400'),
  datetime.datetime(2023, 10, 20, 6, 23, 14),
  datetime.datetime(2023, 10, 20, 16, 33, 13)),
 ('moscow',
  'Москва',
  'RU',
  datetime.datetime(2023, 10, 20, 15, 20, 25),
  'пасмурно',
  7965,
  Decimal('5.41'),
  Decimal('2.28'),
  Decimal('3.99'),
  Decimal('7.15'),
  1011,
  90,
  Decimal('4.15'),
  56,
  Decimal('5.88'),
  100,
  Decimal('37.617500'),
  Decimal('55.750400'),
  datetime.datetime(2023, 10, 20, 7, 9, 16),
  datetime.datetime(2023, 10, 20, 17, 19, 14)),
 ('kazan',
  'Казань',
  'RU',
  datetime.datetime(2023, 10, 20, 15, 28, 18),
  'пасмурно',
  10000,
  Decimal('5.85'),
  Decimal('5.85'),
  Decimal('3.79'),
  Decimal('6.66'),
  1015,
  76,
  Decimal('0.96'),
  255,
  Decimal('1.36

Схема БД:
<img src="img/weather_schema.png">