## 1. Выгрузка данных из API Яндекс.Погоды и преобразование их в csv


In [48]:
import requests
import csv
from datetime import datetime
from geopy.geocoders import Nominatim


yandex_disk_token  = 'y0_AgAAAAADieUXAAs3OQAAAAD5l583AABSW7sLLA5F-a1fdwaAcqq19cGXgQ'
api_key = 'c5827d58-8a0e-4999-ab0d-3563ba95c270'

cities = ['Moscow', 'Kazan', 'Saint Petersburg', 'Tula', 'Novosibirsk']

def get_coordinates(city):
    """Получает координаты города с использованием геокодера."""
    geolocator = Nominatim(user_agent="weather_app", timeout=10)
    location = geolocator.geocode(f'{city}, Russia')
    return {'lat': location.latitude, 'lon': location.longitude}

def get_weather_data(city):
    """Получает прогноз погоды для заданного города."""
    coordinates = get_coordinates(city)
    url = f'https://api.weather.yandex.ru/v2/forecast?lat={coordinates["lat"]}&lon={coordinates["lon"]}&hours=true&extra=true'
    headers = {'X-Yandex-API-Key': api_key}
    response = requests.get(url, headers=headers)
    return response.json()

def transform_data(weather_data):
    """Преобразует данные прогноза погоды в требуемый формат."""
    transformed_data = []
    for day in weather_data['forecasts']:
        date_str = day['date']
        date = datetime.strptime(date_str, '%Y-%m-%d').strftime('%d.%m.%Y')
        for hour_data in day['hours']:
            hour = hour_data['hour']
            temperature_c = hour_data['temp']
            pressure_mm = hour_data['pressure_mm']
            is_rainy = 1 if 'prec_mm' in hour_data and hour_data['prec_mm'] > 0 else 0
            transformed_data.append([date, hour, temperature_c, pressure_mm, is_rainy])
    return transformed_data

def save_to_csv(city, data):
    """Сохраняет данные в CSV файл."""
    with open('weather_data.csv', 'a', newline='') as csvfile:
        csv_writer = csv.writer(csvfile)
        csv_writer.writerows([[city] + row for row in data])

def upload_to_yandex_disk(file_path, token, destination_path):
    """Загружает файл на Яндекс.Диск."""
    url = 'https://cloud-api.yandex.net/v1/disk/resources/upload'
    headers = {'Authorization': f'OAuth {token}'}

    # Получаем ссылку для загрузки файла
    params = {'path': destination_path, 'overwrite': 'true'}
    response = requests.get(url, headers=headers, params=params)
    #print(response.json())
    try:
        response.raise_for_status()
        upload_url = response.json()['href']
    except requests.exceptions.HTTPError as err:
        print(f'HTTP Error: {err.response.status_code}, {err.response.text}')
        return
    except KeyError:
        try:
            # Пробуем получить URL для загрузки из поля 'operation'
            upload_url = response.json()['operation']['href']
        except KeyError:
            # Если и это не сработает, выводим сообщение об ошибке
            print(f'Failed to retrieve upload URL for {destination_path}.')
            return

    # Загружаем файл по полученной ссылке
    with open(file_path, 'rb') as file:
        response = requests.put(upload_url, files={'file': file})

    print(f'File {file_path} uploaded to Yandex.Disk as {destination_path}')
    
    file_url = f'https://disk.yandex.ru/client/disk{destination_path}'
    #print(f'File uploaded successfully. You can access it at: {file_url}')

        
def get_shared_link(token, path):
    """Получает общедоступную ссылку на файл на Яндекс.Диске."""
    url = f'https://cloud-api.yandex.net/v1/disk/resources/publish?path={path}'
    headers = {'Authorization': f'OAuth {token}'}

    try:
        response = requests.put(url, headers=headers)
        response.raise_for_status()
        
        # Получаем информацию о файле, включая общедоступную ссылку
        file_info_url = f'https://cloud-api.yandex.net/v1/disk/resources?path={path}'
        response = requests.get(file_info_url, headers=headers)
        response.raise_for_status()
        print(f'File at {path} has been published successfully.')
        
        # Извлекаем общедоступную ссылку из ответа
        shared_link = response.json().get('public_url')
        
        if shared_link:
            print(f'Shared link for {path}: {shared_link}')
        else:
            print(f'Failed to retrieve shared link for {path}.')
    except requests.exceptions.HTTPError as e:
        print(f'HTTP Error: {e.response.status_code} - {e.response.text}')        

if __name__ == "__main__":
    with open('weather_data.csv', 'w', newline='') as csvfile:
        csv_writer = csv.writer(csvfile)
        csv_writer.writerow(['city', 'date', 'hour', 'temperature_c', 'pressure_mm', 'is_rainy'])

    for city in cities:
        weather_data = get_weather_data(city)
        if 'forecasts' in weather_data:
            transformed_data = transform_data(weather_data)
            save_to_csv(city, transformed_data)
            print(f'Data for {city} saved successfully.')
        else:
            print(f'Failed to retrieve data for {city}. Check your API key or city name.')
           

    # Загрузка файла на Яндекс.Диск
    upload_to_yandex_disk('weather_data.csv', yandex_disk_token, '/weather_data.csv')
    
    # Публикация файла на Яндекс.Диск  и получение общедоступной ссылки
    get_shared_link(yandex_disk_token, '/weather_data.csv')

Data for Moscow saved successfully.
Data for Kazan saved successfully.
Data for Saint Petersburg saved successfully.
Data for Tula saved successfully.
Data for Novosibirsk saved successfully.
File weather_data.csv uploaded to Yandex.Disk as /weather_data.csv
File at /weather_data.csv has been published successfully.
Shared link for /weather_data.csv: https://yadi.sk/d/ZdaGdtyXRL3qZQ


Теперь, касательно ускорения получения данных по API и их преобразования в Airflow:

1. Параллельные запросы: Можно использовать концепцию параллельных запросов, выполняя запросы к API для разных городов параллельно. Это можно сделать, например, с использованием библиотеки concurrent.futures в Python.

2. Кеширование данных: Если данные редко изменяются, то рассмотреть возможность кеширования ответов API, чтобы избежать повторных запросов за теми же данными. В Airflow  использовать Cache операторы или хранить результаты запросов в общем хранилище данных.

3. Использование Airflow Parallelism: В Airflow настроить параллелизм выполнения задач, чтобы ускорить обработку данных.

4. Оптимизация кода: Периодически пересматривайте свой код на предмет оптимизации. Возможно, есть способы улучшить производительность кода.

Использование Airflow позволяет создавать гибкие и масштабируемые рабочие процессы с автоматизированным выполнением задач.