# ТЗ BI Consult
## Выполнил кандидат Тутов Артем
## на позицию Data Engineer
## резюме: (https://spb.hh.ru/resume/7ecc5374ff08b1cf280039ed1f6a36506d3534)

In [1]:
import requests
import pandas as pd
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import pandas as pd
import psycopg2
from airflow.models import Variable

# РАБОТА С ДАННЫМИ УЧЕБНЫХ ЗАВЕДЕНИЙ
(https://github.com/Hipo/university-domains-list-api)

In [2]:
# функция делает гет-запрос на url и возвращает жсон-ответ
def search_universities_by_name(name):
    url = f"http://universities.hipolabs.com/search?name={name}"
    response = requests.get(url)
    return response.json()

# функция делает гет-запрос на url и возвращает жсон-ответ
def search_universities_by_name_and_country(name, country):
    url = f"http://universities.hipolabs.com/search?name={name}&country={country}"
    response = requests.get(url)
    return response.json()

universities = search_universities_by_name("middle")

edu_est = pd.DataFrame(universities)
# для удобвства
edu_est.rename(columns = {'state-province'  : 'state_province'}, inplace = True)

In [3]:
edu_est

Unnamed: 0,alpha_two_code,name,country,domains,web_pages,state_province
0,TR,Middle East Technical University,Turkey,[metu.edu.tr],[http://www.metu.edu.tr/],
1,JO,Middle East University,Jordan,[meu.edu.jo],[http://www.meu.edu.jo/],
2,US,Middle Tennessee State University,United States,[mtsu.edu],[http://www.mtsu.edu/],
3,US,Middle Georgia State College,United States,[mga.edu],[http://www.mga.edu/],
4,KW,American University of Middle East,Kuwait,[aum.edu.kw],[http://www.aum.edu.kw/],
5,US,Middlebury College,United States,[middlebury.edu],[http://www.middlebury.edu/],
6,GB,Middlesbrough College,United Kingdom,"[middlesbro.ac.uk, mbro.ac.uk]",[https://www.mbro.ac.uk/],
7,GB,Middlesex University - London,United Kingdom,[mdx.ac.uk],[https://www.mdx.ac.uk/],
8,US,Middlesex County College,United States,[middlesexcc.edu],[http://www.middlesexcc.edu],
9,US,Middlesex Community College,United States,[middlesex.mass.edu],[http://www.middlesex.mass.edu],


In [4]:
edu_est.dtypes

alpha_two_code    object
name              object
country           object
domains           object
web_pages         object
state_province    object
dtype: object

In [5]:
edu_est.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10 entries, 0 to 9
Data columns (total 6 columns):
 #   Column          Non-Null Count  Dtype 
---  ------          --------------  ----- 
 0   alpha_two_code  10 non-null     object
 1   name            10 non-null     object
 2   country         10 non-null     object
 3   domains         10 non-null     object
 4   web_pages       10 non-null     object
 5   state_province  0 non-null      object
dtypes: object(6)
memory usage: 608.0+ bytes


In [6]:
for i in edu_est['name'].unique():
    print(i)

Middle East Technical University
Middle East University
Middle Tennessee State University
Middle Georgia State College
American University of Middle East
Middlebury College
Middlesbrough College
Middlesex University - London
Middlesex County College
Middlesex Community College


# ОБРАБОТКА ПОЛУЧЕННОГО ДАТАФРЕЙМА

In [7]:
# функция определения типа учебного заведения
## осуществляется проверка на наличие ключевых слов в столбце name
def determine_institution_type(name):
    if 'College' in name:
        return 'College'
    elif 'University' in name:
        return 'University'
    elif 'Institute' in name:
        return 'Institute'
    else:
        return 'No Institution Type'
    
# создаем столбец с типом уч.завед. с применением функции
edu_est['Institution_Type'] = edu_est['name'].apply(determine_institution_type)
edu_est = edu_est.drop(['web_pages', 'domains'], axis=1) # дроп столбцов по условию тз

In [8]:
edu_est

Unnamed: 0,alpha_two_code,name,country,state_province,Institution_Type
0,TR,Middle East Technical University,Turkey,,University
1,JO,Middle East University,Jordan,,University
2,US,Middle Tennessee State University,United States,,University
3,US,Middle Georgia State College,United States,,College
4,KW,American University of Middle East,Kuwait,,University
5,US,Middlebury College,United States,,College
6,GB,Middlesbrough College,United Kingdom,,College
7,GB,Middlesex University - London,United Kingdom,,University
8,US,Middlesex County College,United States,,College
9,US,Middlesex Community College,United States,,College


# Войдем в контейнер Airflow

### docker exec -it airflow-webserver bash

# Создадим переменные для Airflow

#### airflow variables set POSTGRES_HOST 127.0.0.1
#### airflow variables set POSTGRES_DB universities_db
#### airflow variables set POSTGRES_USER ar4i
#### airflow variables set POSTGRES_PASSWORD i17i71i11i77i
#### airflow variables set LAST_SUCCESSFUL_LOAD "2023-05-05 00:00:00"

# РАБОТА С БД PG И НАСТРОЙКА DAG -ОВ

In [9]:
# подключение к ПГ
def load_data_to_postgres():
    # 
    conn = psycopg2.connect(
        host=Variable.get("POSTGRES_HOST"),
        database=Variable.get("POSTGRES_DB"),
        user=Variable.get("POSTGRES_USER"),
        password=Variable.get("POSTGRES_PASSWORD")
    )
    cursor = conn.cursor()

    # gолучаем дату последней успешной загрузки
    ## проверка и фильтрация данных для инкрементальной загрузки на основе времени последней успешной загрузки
    last_successful_load = Variable.get("LAST_SUCCESSFUL_LOAD", default_var=None)
    # блок проверяет, существует ли значение
    if last_successful_load:
        last_successful_load = datetime.strptime(last_successful_load, "%Y-%m-%d %H:%M:%S") ## преобразуем в формат датавремя
        ## выбираем только те строки, у которых индекс больше времени последней успешной загрузки
        edu_est = edu_est[edu_est.index > last_successful_load.timestamp()]

    # Загрузка данных в PostgreSQL
    for _, row in edu_est.iterrows(): # цикл выполняет итерацию по каждой строке
        # затем вставляет данные из каждой строки в таблицу в бд ПГ
        cursor.execute(
            "INSERT INTO universities (country, name, \"Institution_Type\") VALUES (%s, %s, %s)",
            (row['country'], row['name'], row['Institution_Type'])
        )
    conn.commit() # фиксируем изменения
    cursor.close() # закрываем курсор - освобождаем ресурсы
    conn.close() # закрываем соединение с бд

    # Сохранение даты последней успешной загрузки
    Variable.set("LAST_SUCCESSFUL_LOAD", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

# DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 5, 5),
    'email': ['artem_analyze@mail.ru'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'university_data_load',
    default_args=default_args,
    description='Загрузка данных по университетам',
    schedule_interval='0 3 * * *', # расписание DAG будет запускаться каждый день в 3 часа ночи
    catchup=False # не запускаем пропущеные даги
)

# задачи для загрузки данных
load_data_task = PythonOperator(
    task_id='load_data_to_postgres',
    python_callable=load_data_to_postgres,
    dag=dag
)