# In [None]:
import requests
from zipfile import ZipFile
from io import BytesIO
import pandas as pd
from datetime import timedelta
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

# In [None]:
# URL of the zip archive that get_data() downloads.
TOP_1M_DOMAINS = 'http://s3.amazonaws.com/alexa-static/top-1m.csv.zip'
# Name of the CSV member read from that archive; the same name is used for the
# local copy that the downstream tasks load with pandas.
TOP_1M_DOMAINS_FILE = 'top-1m.csv'


def get_data():
    """Download the top-1M domains archive and save the CSV it contains.

    Fetches the zip at ``TOP_1M_DOMAINS``, extracts the member named
    ``TOP_1M_DOMAINS_FILE`` and writes it, byte for byte, to a file of the
    same name in the current working directory.

    Raises:
        requests.HTTPError: the server answered with a 4xx/5xx status.
        requests.RequestException: the request failed or timed out.
        zipfile.BadZipFile: the downloaded payload is not a zip archive.
        KeyError: the archive has no member named ``TOP_1M_DOMAINS_FILE``.
    """
    # The whole payload must be in memory to open the zip, so streaming buys
    # nothing; a timeout keeps a stalled connection from hanging the task.
    response = requests.get(TOP_1M_DOMAINS, timeout=60)
    # Fail here on an HTTP error instead of feeding an error page to ZipFile.
    response.raise_for_status()

    with ZipFile(BytesIO(response.content)) as archive:
        top_data = archive.read(TOP_1M_DOMAINS_FILE)

    # Write the raw bytes: a decode/encode round trip through the platform's
    # default text encoding could corrupt or reject non-ASCII content.
    with open(TOP_1M_DOMAINS_FILE, 'wb') as f:
        f.write(top_data)


def get_top_domain_zones():
    """Write the ten most frequent domain zones to ``top_10_domzones.csv``.

    Reads the ``rank,domain`` CSV at ``TOP_1M_DOMAINS_FILE``, takes the last
    dot-separated label of every domain as its zone, counts domains per zone
    and saves the ten largest groups as header-less ``zone,count`` rows.
    """
    top_data_df = pd.read_csv(TOP_1M_DOMAINS_FILE, names=['rank', 'domain'])

    # The zone is the LAST label: taking index 1 would return "google" for
    # "mail.google.com" and fail on names that contain no dot at all.
    top_data_df['dom_zone'] = (
        top_data_df['domain'].str.rsplit('.', n=1).str[-1]
    )

    top_10_domzones = (
        top_data_df.groupby('dom_zone', as_index=False)
        .agg({'domain': 'count'})
        .sort_values('domain', ascending=False)
        .head(10)
    )
    top_10_domzones.to_csv('top_10_domzones.csv', index=False, header=False)

def get_longest_domain():
    """Write the longest domain name to ``longest_domain.csv``.

    Reads the ``rank,domain`` CSV at ``TOP_1M_DOMAINS_FILE``, measures every
    domain's length and saves the single longest entry as one header-less
    ``rank,domain,length`` row.
    """
    top_data_df = pd.read_csv(TOP_1M_DOMAINS_FILE, names=['rank', 'domain'])
    top_data_df['dom_len'] = top_data_df['domain'].str.len()

    # nlargest keeps the first occurrence on ties, so the result is
    # deterministic: among equally long names the earliest row in the file wins.
    longest_domain = top_data_df.nlargest(1, 'dom_len')
    longest_domain.to_csv('longest_domain.csv', index=False, header=False)


def print_data(ds):
    """Print both saved reports, each under a heading that includes ``ds``.

    Reads ``top_10_domzones.csv`` and ``longest_domain.csv`` from the current
    working directory and prints their contents verbatim.

    Args:
        ds: Date label interpolated into the two headings.
    """
    # Load both reports before printing anything, so a missing file fails the
    # call without leaving partial output.
    with open('top_10_domzones.csv', 'r') as zones_file:
        zones_report = zones_file.read()
    with open('longest_domain.csv', 'r') as longest_file:
        longest_report = longest_file.read()

    sections = (
        (f'Top-10 domain zones for date {ds}', zones_report),
        (f'Longest domain for date {ds}', longest_report),
    )
    for heading, report in sections:
        print(heading)
        print(report)

                
# Arguments handed to every operator attached to the DAG.
default_args = {
    'owner': 'a.kosheleva-14',
    'depends_on_past': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2021, 12, 6),
}

# The schedule is a DAG-level setting: default_args is only forwarded to
# operators, so a 'schedule_interval' key placed there is silently ignored and
# the DAG would run on Airflow's default schedule instead of daily at 10:00.
dag = DAG(
    'KC_Airflow_Lesson2_a.kosheleva-14',
    default_args=default_args,
    schedule_interval='0 10 * * *',
)

# One task per pipeline step. Each python_callable must name a function that
# exists in this module, otherwise the DAG fails to import with a NameError.
t1 = PythonOperator(task_id='get_data',
                    python_callable=get_data,
                    dag=dag)

t2 = PythonOperator(task_id='get_top_domain_zones',
                    python_callable=get_top_domain_zones,
                    dag=dag)

t2_top = PythonOperator(task_id='get_longest_domain',
                        python_callable=get_longest_domain,
                        dag=dag)

t3 = PythonOperator(task_id='print_data',
                    python_callable=print_data,
                    dag=dag)

# Download first, compute both reports in parallel, then print them.
t1 >> [t2, t2_top] >> t3