# Дипломное задание по курсу
Преподаватель: Игорь Бричко
**Дипломная работа по курсу **

Цель: построить пайплайн через Airflow

**Задание: **

Данные для выполнения дипломного задания

1. Вам необходимо построить airflow пайплайн выгрузки ежедневных отчётов по количеству поездок на велосипедах в городе Нью-Йорк.

2. Рекомендации при выполнении работы:

Пайплайн должен состоять из следующих шагов:

1. Отслеживание появление новых файлов в своём бакете на AWS S3. Представим, что пользователь или провайдер данных будет загружать новые исторические данные по поездкам в Ваш бакет;
2. При появлении нового файла запускается оператор импорта данных в созданную таблицу базы данных Clickhouse;
3. Необходимо сформировать таблицы с ежедневными отчётами по следующим критериям:
 – количество поездок в день
 – средняя продолжительность поездок в день
 – распределение поездок пользователей, разбитых по категории «gender»
4. Данные статистики необходимо загрузить на специальный S3 бакет с хранящимися отчётами по загруженным файлам.

## Step №1
Работа с источником данных:
1. Парсинг данных с сайта - источника информации;
2. Анализ перечня файлов на сервере по их параметрам;
3. Определение наиболее нового файла с данными

In [3]:
# Сайт-источник данных (список архивов): https://s3.amazonaws.com/tripdata/index.html
# Программа для Step №1

import requests                                                   # Импорт библиотеки для парсинга данных с сайта с данными
from datetime import datetime as dt                               # Импорт библиотеки для работой с датами и временем

class FindNewBucket:                                              # Создание класса для парсинга данных с сайта
    def __init__(self, format_='Date Modified'):
        self.format_ = format_
        self.name = 'Name'
        self.date = 'Date Modified'
    
    def get_bucket(self):
        self.db = requests.get('https://s3.amazonaws.com/tripdata/index.html')
        return self.db.json()
    
    def snow_name(self):
        return self.name

    def snow_date(self):
        return self.date

print('============= start ==============')                       # Стартовый экранный буллит,
print('Поиск наиболее новых данных на сервере хранения')          # описывающий процесс исполняемого модуля
print('==================================')                       #

all_base = FindNewBucket(format_='full')                          # Присвоение переменной для использования класса
db = all_base.get_bucket()                                        # Выгрузка данных с сайта через класс

val_base = dict(db['Date Modified'])                              # Отделение сутевого содержимого из общей выгруженной инф-ции
print(val_base)                                                   # Контрольный вывод формата содержимого

result_name = ''                                                  # Объявление и обнуление переменной для результата "Name"
result_date = ''                                                  # Объявление и обнуление переменной для результата "Date Modified"
result_size = ''                                                  # Объявление и обнуление переменной для результата "Size"
result_type = ''                                                  # Объявление и обнуление переменной для результата "Type"

for bucket in val_base:                                           # Цикл обработки каждой из выгруженных записей bucket'ов (подряд)
    date_dt = ''                                                  # Объявление и обнуление переменной для преобразований дат
    line = {}                                                     # Обнуление рабочего словаря для данных по проверяемой
    line = val_base[bucket]                                       # Запись данных для конкретной валюты для анализа
    print(line)                                                   # Контрольный вывод анализируемой строки с данными
    
    try:                                                          # Проверка на корректность формата дат на сайте-источнике
        date_dt = dt.strptime(line['Date Modified'], '%b %d %Y, %I:%M:%S %p') # Приведение дат к единому формату "Дата - время"
    except:
        print ('Incorrected date format: ', line['Date Modified'])# Вывод сообщения об ошибке при невозможности трансформации даты
    
    if result_name == '':                                         # Проверка для выявления 1го шага цикла
        result_name = line['Name']                                # Запись наименования файла bucket'а
        result_date = date_dt                                     # Запись значения даты обновления файла bucket'а
        result_size = line['Size']                                # Запись размера файла bucket'а
        result_type = line['Type']                                # Запись значения типа архива файла bucket'а
        
    elif result_date <= (date_dt):                                # Проверка на превышение найденного ранее МАХ значения даты
        result_name = line['Name']                                # Запись наименования файла bucket'а
        result_date = date_dt                                     # Запись значения даты обновления файла bucket'а
        result_size = line['Size']                                # Запись размера файла bucket'а
        result_type = line['Type']                                # Запись значения типа архива файла bucket'а
        print('Новый лидер: ', result_name, ', обновлённый: ', result_date) # Контрольный вывод рез-та
    
    else:                                                         # Простейшая защита от некорректных дат
        pass

print('============ result ==============')
print('Послдений по времени загрузки bucket: ', result_name, ' с временем обновления: ', result_date)
print('Размер файла составляет: ', result_size, ', тип архива: ',result_type)
print('Не рекомендуется обрабатывать на стационарном ПК архивы размером более 16 Mb')
print('==================================')
print('1st step finished')
print('==================================')

Поиск наиболее новых данных на сервере хранения
<Response [200]>


TypeError: 'Response' object is not subscriptable

In [12]:
# Вариант кода №2 для Step 1
# import requests
# import pandas as pd
# import time
# from bs4 import BeautifulSoup

# url = 'https://s3.amazonaws.com/tripdata/index.html'    # url страницы
# r = requests.get(url)
# time.sleep(1.2)

# with open('test.html', 'w') as output_file:
#     output_file.write(r.text)
    
# result = pd.DataFrame()

# r = requests.get(url)                                  # Отправляем HTTP запрос и получаем результат
# soup = BeautifulSoup(r.text)                           # Отправляем полученную страницу в библиотеку для парсинга
    
# tables=soup.find_all('table', {'class': 'container'})    # Получаем все таблицы с вопросами
    
# for item in tables:
#     res=parse_table(item)
#     result=result.append(res, ignore_index=True)

# result.to_excel('result.xlsx')

## Step №2
1. Работа с данными в файле
2. Формирование таблицы с результатами обработки данных

In [173]:
import pandas as pd                                               # Импорт библиотеки Pandas
from datetime import datetime as dt                               # Импорт библиотеки для работой с датами и временем

# Обработку данных проводим с помощью Pandas - DataFrame
# Загрузка данных в переменную из файла с исходными данными: для чтения, с кодировкой "utf-8"

# Использовались два файла: 201501-citibike-tripdata.csv и 201502-citibike-tripdata.csv

# После отладки парсера на STEP 1 в строке ниже заменить название файла на result_name (имя самого нового файла, 
# с данными которого будем работать)

content = pd.read_csv('201501-citibike-tripdata.csv', sep=',', encoding='utf-8')

In [174]:
# content.head()                                 # Контрольный вывод содержимого DF и формата таблицы данных

In [175]:
# Преобразование дат в стандартный, короткий вид (ГГГГ-ММ-ДД): для сортировки по дням, без детализации до времени
content['starttime'] = pd.to_datetime(content['starttime']) # Преобразование даты к стандартному виду через PANDAS
content['starttime'].dt                                     # Преобразование столбца в формат datetime-библиотеки
content['starttime'] = content['starttime'].dt.normalize()  # Приведение вида столбца только к дате (обрезка времени)

In [176]:
# content.head()                                 # Контрольный вывод содержимого DF и формата дат в таблице данных

In [177]:
content.sort_values(by='starttime', inplace=True) # Сортировка данных в DataFrame по столбцу с датой поездки, с перезаписью
# content.head()                                    # Контрольный вывод содержимого DF и сортировки таблицы данных

In [178]:
# УЛУЧШАЙЗЕР: создание класса для всех операций над данными файла - для масштабирования при доп. запросах

class OperateWithData:                                              # Создание класса для операций с данными
    def __init__(self, format_='starttime'):
        self.format_ = format_
        self.gender = 'gender'
        self.duration = 'tripduration'
    
    def get_data_date(self):
        return self.content['starttime'][self.format_]
    
    def get_gender(self):
        return self.gender

    def get_duration(self):
        self.duration = content[content['starttime'] == self.format_].sum()['tripduration']
        return self.duration

In [179]:
# Основной блок анализа данных на STEP №2:
# После отладки УЛУЧШАЙЗЕРА заменить операции в цикле FOR (ниже) на функции класса OperateWithData

in_memory = pd.DataFrame(columns=['Date', 'NumberOfTrips', 'SumDuration', 'AverangeDuration', 'TripsForGenders']) # Создание
# дата-фрейма для сохранения результатов анализа данных по дням, по условиям задания работы. С названиями столбцов:
# 1. 'Date': дата дня, для которого сохраняются данные статистики;
# 2. 'NumberOfTrips': суммарное количество поездок за день (дату);
# 3. 'SumDuration': суммарная длительность всех поездок всех пользователей за день, в секундах;
# 4. 'AverangeDuration': средняя длительность всех поездок по всем пользователям и всем поездкам за день, в секундах;
# 5. 'TripsForGenders': распределение поездок за день по половому признаку пользователя сервисом, где:
# (1) - первый пол; (2) - второй пол; (0) - пол не указан в анкете пользователя

day_content = pd.DataFrame() # Создание промежуточного DataFrame для обработки данных одного дня из месяца
uni_date = content.starttime.unique() # Определение и сохранение уникальных дат в месяце

for date_ in uni_date:       # Цикл анализа данных: по дням (дате) из списка уникальных
    day_content = content[content['starttime'] == date_].sort_values(by='gender') # Save в DF данных дня, с сорт. по полу
    mem_all_trip_duration = (day_content[day_content['starttime'] == date_].sum()['tripduration']) # Суммирование длительностей
    mem_trip_of_gender = day_content.gender.value_counts()                  # Счёт поездок с разбивкой по полу пользователя
    mem_number_of_trips = len(day_content.index)                            # Счёт общего кол-ва поездок за день
    mem_averange_trip = mem_all_trip_duration / mem_number_of_trips         # Расчёт средней длительности поездки за день

#     БЛОК контрольного вывода результатов каждого из расчётов, для каждого дня анализируемого месяца
#     print('STATISTICS for ', date_, ':')                                    # Заголовок с датой, для которой результат
#     print('1) Number of Trips: ', mem_number_of_trips)                      # Общее кол-во поездок, в штуках
#     print('2) Trip duration, [sec.]: ', mem_all_trip_duration)              # Общая длителность всех поездок за день, в сек.
#     print('3) Averange duration of Trips, [sec.]: ', mem_averange_trip)     # Средняя длительность поездок за день, в сек.
#     print('4) Gender Trips: ', dict(mem_trip_of_gender))                    # Разрез кол-ва поездок по полам, в штуках

    _date_ = str(date_)                             # Преобразование длинного формата даты со временем
    _date_ = _date_[:10]                            # Обрезка даты в формат ГГГГ-ММ-ДД, для сохранения в результирующий DF
    
#   # Запись результатов в отдельный DF, с преобразованием рез-тов по поездкам в разрезе половой принадлежности в виде словаря
    in_memory.loc[len(in_memory.index)] = [_date_, mem_number_of_trips, mem_all_trip_duration,mem_averange_trip,dict(mem_trip_of_gender)]

in_memory                                           # Контрольный вывод результирующего DF

Unnamed: 0,Date,NumberOfTrips,SumDuration,AverangeDuration,TripsForGenders
0,2015-01-01,5317,4263206,801.806658,"{1: 3584, 2: 1067, 0: 666}"
1,2015-01-02,11304,8265948,731.240977,"{1: 8488, 2: 2174, 0: 642}"
2,2015-01-03,4478,2934363,655.284279,"{1: 3215, 2: 1037, 0: 226}"
3,2015-01-04,7849,5333821,679.554211,"{1: 5734, 2: 1722, 0: 393}"
4,2015-01-05,14506,9252098,637.811802,"{1: 11785, 2: 2471, 0: 250}"
5,2015-01-06,8739,5548587,634.922417,"{1: 7354, 2: 1364, 0: 21}"
6,2015-01-07,9646,5683529,589.210968,"{1: 8006, 2: 1604, 0: 36}"
7,2015-01-08,8779,5231063,595.860918,"{1: 7282, 2: 1451, 0: 46}"
8,2015-01-09,7930,5048697,636.657881,"{1: 6680, 2: 1211, 0: 39}"
9,2015-01-10,6109,3641121,596.0257,"{1: 4766, 2: 1268, 0: 75}"


# Step №3
Сохранение резульатов в файле со всеми результатами

In [183]:
import csv

try:                                                                  # Проверка на наличие ранее сохранённых данных в БД
    from_memory = pd.read_csv('FinalWork_DBforResults.csv', sep=',')  # Чтение ранее сохранённых результатов в БД
except:                                                               # Вариант при первичном запуске, с созданием файла БД
    in_memory.to_csv('FinalWork_DBforResults.csv', index=False)       # Создание первичного файла - БД, без столбца с индексом
    from_memory = pd.read_csv('FinalWork_DBforResults.csv', sep=',')  # Чтение ранее сохранённых результатов в БД

# НЕ работающие корректно варианты объединения DF: лезут дубликаты данных. Причина не понятна 
# new_DB = from_memory.merge(in_memory, how = 'outer', on = 'Date')
# new_DB = in_memory.join(from_memory, how='inner', lsuffix='_from', rsuffix='_in')
# new_DB = pd.merge(from_memory, in_memory, how='outer', left_on=['Date'], right_on=['Date'])
# new_DB = from_memory.set_index('Date').join(in_memory.set_index('Date'), how='outer', lsuffix='_from', rsuffix='_in')

# Альтернативный вариант решения для корректного сохраниения результатов в файл Базы данных (БД):
new_DB = from_memory.append(in_memory, sort=False)               # Добавление данных за новый период к ранее сохранённым в БД
new_DB.sort_values(by='Date', ascending=False)                   # Сортировка по столбцу "Date", по убыванию дат (не работает)
new_DB.drop_duplicates(subset = 'Date', keep = 'last', inplace = True) # Вычищаем дубликаты, оставляем последний по времени

new_DB.to_csv('FinalWork_DBforResults.csv', index=False)         # Выгрузка и сохранение всех данных в файл БД

from_memory = pd.read_csv('FinalWork_DBforResults.csv', sep=',') # Контрольная загрузка результата
from_memory                                                      # Контрольный вывод всех сохранённых в БД результатов

# УЛУЧШАЙЗЕР: с ходу запустить в корректную работу не получилось - всё равно требуется подгрузка данных в опер.память
# with open('FinalWork_DBforResults.csv', 'a') as csv_file:
#     in_memory.join(csv_file, how='outer')
#     print(csv_file)

Unnamed: 0,Date,NumberOfTrips,SumDuration,AverangeDuration,TripsForGenders
0,2015-02-01,6441,4110319,638.1492,"{1: 4897, 2: 1359, 0: 185}"
1,2015-02-02,1459,1343538,920.862234,"{1: 1245, 2: 203, 0: 11}"
2,2015-02-03,4754,3647385,767.224443,"{1: 4092, 2: 653, 0: 9}"
3,2015-02-04,8973,6259175,697.556559,"{1: 7457, 2: 1476, 0: 40}"
4,2015-02-05,9208,5740237,623.39672,"{1: 7752, 2: 1427, 0: 29}"
5,2015-02-06,8575,5463278,637.116968,"{1: 7145, 2: 1381, 0: 49}"
6,2015-02-07,6790,4661069,686.460825,"{1: 5217, 2: 1349, 0: 224}"
7,2015-02-08,6662,4443139,666.937706,"{1: 5099, 2: 1371, 0: 192}"
8,2015-02-09,5160,3271088,633.931783,"{1: 4311, 2: 798, 0: 51}"
9,2015-02-10,10328,6591894,638.254648,"{1: 8517, 2: 1718, 0: 93}"
