# In [None]:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.sqlite.hooks.sqlite import SqliteHook
from airflow.providers.sqlite.operators.sqlite import SqliteOperator
from pathlib import Path
import json
import pandas as pd
import requests
import zipfile

# Task defaults shared by the whole DAG: a failed task is retried up to
# 5 times, waiting 2 minutes between attempts.
default_args = dict(
    owner='airflow',
    retries=5,
    retry_delay=timedelta(minutes=2),
)


# Writes parsed vacancies into the vacancies table
def insert_ex(name, company, desc, skills):
    """Insert parsed vacancies into the ``hh_api`` SQLite table.

    The four arguments are parallel sequences: element ``i`` of each one
    describes the same vacancy. If the sequences differ in length, only as
    many rows as the shortest one are written.

    Args:
        name: Vacancy titles.
        company: Employer names.
        desc: Vacancy descriptions.
        skills: Key skills per vacancy; each value is stored as ``str(value)``.
    """
    # Build every row up front so the hook is called once for the whole
    # batch instead of once per vacancy.
    rows = [
        (title, employer, text, str(skill_set))
        for title, employer, text, skill_set in zip(name, company, desc, skills)
    ]
    if not rows:
        return  # nothing to write

    sqlite_hook = SqliteHook(sqlite_conn_id="sqlite_default")
    sqlite_hook.insert_rows(
        table='hh_api',
        rows=rows,
        target_fields=['name', 'company', 'description', 'skills'],
    )

# Writes companies into the companies table
def insert_company(inn, kpp, okved_o, full_name):
    """Insert companies into the ``telecom_companies`` SQLite table.

    The four arguments are parallel sequences: element ``i`` of each one
    describes the same company. If the sequences differ in length, only as
    many rows as the shortest one are written.

    Args:
        inn: INN values, one per company.
        kpp: KPP values, one per company.
        okved_o: Primary OKVED codes, one per company.
        full_name: Full company names, one per company.
    """
    # Each row takes its own KPP value; passing the whole ``kpp`` sequence
    # into every row (as the code used to) stores the wrong data.
    rows = [
        (code, company_inn, title, company_kpp)
        for code, company_inn, title, company_kpp
        in zip(okved_o, inn, full_name, kpp)
    ]
    if not rows:
        return  # nothing to write

    sqlite_hook = SqliteHook(sqlite_conn_id="sqlite_default")
    sqlite_hook.insert_rows(
        table='telecom_companies',
        rows=rows,
        target_fields=['okved', 'inn', 'full_name', 'kpp'],
    )

# Vacancy parsing
def api_hh():
    """Fetch "middle python" vacancies from the hh.ru API and store them.

    Requests page 0 of the vacancy search (``per_page=100``), then loads the
    detail document of every returned item through its ``url`` field and
    collects the vacancy name, employer name, description and the key skills
    joined with ``", "``. The collected values are passed to ``insert_ex``.

    Raises:
        requests.HTTPError: If any response has a 4xx/5xx status.
        requests.Timeout: If a request does not complete within the timeout.
        KeyError: If a vacancy document lacks one of the expected fields.
    """
    # Seconds. Without an explicit timeout a stalled connection would block
    # the task indefinitely.
    request_timeout = 30

    names_list = []
    company_list = []
    desc_list = []
    skills_l = []

    # One session so the many detail requests can reuse the connection.
    with requests.Session() as session:
        search = session.get(
            "https://api.hh.ru/vacancies",
            # Let requests encode the query (the search text contains a space).
            params={"text": "middle python", "per_page": 100, "page": 0},
            timeout=request_timeout,
        )
        search.raise_for_status()
        # An absent or null 'items' is treated as "no vacancies found".
        vacancies = search.json().get('items') or []

        for item in vacancies:
            response = session.get(item['url'], timeout=request_timeout)
            # Fail loudly instead of parsing an error payload as a vacancy.
            response.raise_for_status()
            vacancy = response.json()

            names_list.append(vacancy['name'])
            company_list.append(vacancy['employer']['name'])
            desc_list.append(vacancy['description'])
            skills_l.append(
                ", ".join(skill['name'] for skill in vacancy['key_skills'])
            )

    insert_ex(names_list, company_list, desc_list, skills_l)
 
# Helper for company parsing: flags rows whose primary OKVED code starts with 61
def okved(row):
    """Tell whether a record's primary OKVED code starts with ``'61'``.

    Args:
        row: Mapping-like record (for example a DataFrame row) in which the
            code is looked up at
            ``row['data']['СвОКВЭД']['СвОКВЭДОсн']['КодОКВЭД']``.

    Returns:
        ``"yes"`` when the first two characters of the code equal ``'61'``;
        ``"no"`` otherwise, including when any part of the nested path is
        missing or is not subscriptable.
    """
    try:
        code = row['data']['СвОКВЭД']['СвОКВЭДОсн']['КодОКВЭД']
        matches = code[:2] == '61'
    except (KeyError, IndexError, TypeError):
        # Only lookup failures mean "no code here"; anything else (for
        # example KeyboardInterrupt) must not be swallowed.
        return "no"
    return "yes" if matches else "no"

# Stores the given companies in the companies table
def record_or_replace(df):
    """Pull the company fields out of ``df`` and pass them to ``insert_company``.

    For every row, reads the ``inn``, ``kpp`` and ``full_name`` columns and the
    primary OKVED code found at ``data['СвОКВЭД']['СвОКВЭДОсн']['КодОКВЭД']``,
    collecting each field into its own list (one entry per row, in row order).

    Args:
        df: DataFrame with ``inn``, ``kpp``, ``full_name`` and ``data`` columns.
    """
    # Positional access, so the DataFrame's index labels do not matter.
    records = [df.iloc[position] for position in range(len(df))]

    inns = [record['inn'] for record in records]
    kpps = [record['kpp'] for record in records]
    codes = [
        record['data']['СвОКВЭД']['СвОКВЭДОсн']['КодОКВЭД']
        for record in records
    ]
    titles = [record['full_name'] for record in records]

    insert_company(inns, kpps, codes, titles)

# Top-level routine: parse companies from the archive and keep those with OKVED 61
def parse_company_okved(archive_path='/home/foggy/test/test.zip'):
    """Load company records from a zip of JSON files and store the matches.

    Every file in the archive is extracted into a temporary directory, read
    with ``pandas.read_json``, filtered down to the rows for which ``okved``
    returns ``"yes"`` and handed to ``record_or_replace``.

    Args:
        archive_path: Path to the zip archive. Defaults to the location this
            routine has always read from.
    """
    import tempfile  # function-scope import: only this routine needs it

    # Both context managers guarantee cleanup (archive handle, extracted
    # files) even when parsing one of the members fails.
    with zipfile.ZipFile(archive_path) as archive, \
            tempfile.TemporaryDirectory() as work_dir:
        for member in archive.infolist():
            if member.is_dir():
                continue  # directory entries carry no JSON payload

            extracted = Path(archive.extract(member, path=work_dir))
            try:
                data = pd.read_json(extracted, encoding='utf-8')
            finally:
                # Drop the file right away so only one member is on disk
                # at a time.
                extracted.unlink()

            if data.empty:
                continue

            is_telecom = data.apply(okved, axis=1) == 'yes'
            record_or_replace(data[is_telecom])

# Prints the most frequent key skills
def top():
    """Print how often each key skill occurs in the ``hh_api`` table.

    Reads the ``skills`` column, removes any ``[``, ``]`` and ``'``
    characters, splits every value on commas and prints the frequency of each
    resulting skill, most frequent first.
    """
    sqlite_hook = SqliteHook(sqlite_conn_id='sqlite_default')
    sqlite_conn = sqlite_hook.get_conn()
    try:
        cur = sqlite_conn.cursor()
        stored = cur.execute("SELECT skills FROM hh_api").fetchall()
    finally:
        sqlite_conn.close()

    list_chars = str.maketrans('', '', "[]'")
    skills = []
    for (raw,) in stored:
        if not raw:
            continue  # NULL or empty: the vacancy listed no skills
        for part in raw.translate(list_chars).split(','):
            # Values are joined with ", ", so strip the padding; otherwise
            # " SQL" and "SQL" would be counted as different skills.
            skill = part.strip()
            if skill:
                skills.append(skill)

    # An explicit dtype keeps this well-defined when no skills were found.
    print(pd.Series(skills, dtype='object').value_counts())

# DAG wiring: create both tables, load vacancies from the hh.ru API, copy and
# parse the company archive, then print key-skill frequencies.
# NOTE(review): the description below contains "donwload" (likely a typo of
# "download"); it is a runtime string, so it is left untouched here.
# NOTE(review): start_date is in 2023 with a daily schedule and no explicit
# catchup setting -- confirm whether backfill runs are intended.
with DAG(
    dag_id='download_hh',
    default_args=default_args,
    description='DAG for donwload okved file',
    start_date=datetime(2023, 8, 1, 8),
    schedule_interval='@daily'
) as dag:

    # Table that insert_ex writes vacancies into.
    task1 = SqliteOperator(
        task_id='create_table_for_vacancies',
        sqlite_conn_id='sqlite_default',
        sql='''CREATE TABLE IF NOT EXISTS hh_api (name varchar,
                                                        company varchar,
                                                        description text,
                                                        skills varchar);'''
    )

    # Table that insert_company writes companies into.
    task2 = SqliteOperator(
        task_id='create_table_for_okved',
        sqlite_conn_id='sqlite_default',
        sql='''CREATE TABLE IF NOT EXISTS telecom_companies (okved varchar,
                                                             inn bigint,
                                                             full_name varchar,
                                                             kpp bigint);'''
    )

    # Download vacancies from the hh.ru API and store them in hh_api.
    task3 = PythonOperator(
        task_id='parse',
        python_callable=api_hh,
    )

    # Copy the archive to /home/foggy/test -- presumably a directory, since
    # parse_company_okved opens /home/foggy/test/test.zip; verify on the host.
    task4 = BashOperator(
        task_id='download_file',
        bash_command="cp /home/foggy/test.zip /home/foggy/test"
    )
    
    # Parse the copied archive and store the companies that okved() accepts.
    task5 = PythonOperator(
        task_id='parse_company_okved',
        python_callable=parse_company_okved,
    )
    
    # Print key-skill frequencies computed from the hh_api table.
    task6 = PythonOperator(
        task_id='top-10',
        python_callable=top,
    )
    

    # Strictly sequential chain: each task runs only after the previous one.
    task1>>task2>>task3>>task4>>task5>>task6
