## Sales ETL code

```python
import json
import logging
import os
from contextlib import closing
from datetime import datetime
from datetime import timedelta

from airflow import DAG
from airflow.hooks.filesystem import FSHook
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.providers.postgres.hooks.postgres import PostgresHook


# Arguments shared by every task of the DAG defined below: a single retry
# after a five-minute pause, no dependency on the previous run's outcome, and
# notification e-mails disabled for both failures and retries.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    email=['jakub.kanclerz@gmail.com'],
    email_on_failure=False,
    email_on_retry=False,
    retries=1,
    retry_delay=timedelta(minutes=5),
)

from pprint import pprint as pp
from dateutil.relativedelta import relativedelta

def load_time(**kwargs):
    """Populate ``datetime_dimension`` with one row per hour for 12 months.

    The generated series starts at midnight of the run's ``execution_date``
    and ends at midnight of the same calendar day 12 months later (both ends
    inclusive, one row per hour).  Rows whose primary key already exists are
    skipped via ``ON CONFLICT ... DO NOTHING``, so re-running is harmless.

    Each row's ``id`` is the hour encoded as the integer ``YYYYMMDDHH``.

    :param kwargs: Airflow task context; only ``execution_date`` is read.
    """
    current = kwargs['execution_date']
    # Anchor the range on the logical run date.  The previous
    # ``current.today()`` is a classmethod that ignores ``current`` and
    # returns wall-clock "now", so reruns/backfills produced different ranges.
    start_day = current.date()
    end_day = start_day + relativedelta(months=12)
    logging.info("Loading datetime_dimension from %s to %s", start_day, end_day)

    db = PostgresHook(postgres_conn_id="my_warehouse_1")
    # closing() guarantees the cursor and connection are released even if the
    # INSERT fails.
    with closing(db.get_conn()) as conn, closing(conn.cursor()) as c:
        c.execute('''
            INSERT INTO datetime_dimension
            SELECT
                cast(to_char(ds, 'YYYYMMDDHH24') as INTEGER) as id,
                ds,
                EXTRACT(year from ds) as year,
                EXTRACT(month from ds) as month,
                EXTRACT(day from ds) as day,
                EXTRACT(WEEK from ds) as week,
                EXTRACT(HOUR from ds) as hour,
                EXTRACT(dow from ds) as day_of_week

            from (
            SELECT
                generate_series(
                    %(current)s::timestamp,
                    %(limit)s::timestamp,
                    interval '1 hour') as ds ) as t1
            ON CONFLICT
            ON CONSTRAINT datetime_dimension_pkey DO NOTHING
        ''', {"limit": end_day, "current": start_day})

        conn.commit()

def extract_products(**kwargs):
    """Dump all products from the e-commerce MySQL database to a JSON file.

    The file is written to ``<fs>/<dag_id>/<task_id>/<execution_date>.json``,
    where ``<fs>`` is the path of the ``tmp_fs`` filesystem connection.  It
    holds a JSON list of ``[product_id, name]`` pairs (``name`` is the
    product's ``code`` column).  ``load_products`` reads it back from the
    same location.

    :param kwargs: Airflow task context; ``execution_date``, ``dag`` and
        ``task`` are read.
    """
    fs_root = FSHook(conn_id="tmp_fs").get_path()
    logging.info("FS path: {}".format(fs_root))

    dir_path = os.path.join(
        fs_root,
        kwargs["dag"].dag_id,
        kwargs["task"].task_id)
    # exist_ok avoids the race between an exists() check and makedirs().
    os.makedirs(dir_path, exist_ok=True)
    target = os.path.join(dir_path, str(kwargs["execution_date"]) + '.json')

    db = MySqlHook(mysql_conn_id='my_ecommerce')
    with closing(db.get_conn()) as conn, closing(conn.cursor()) as c:
        c.execute('''
            SELECT
                p.id as product_id,
                p.code as name
            FROM
                ecommerce_product p
        ''')
        res = c.fetchall()

    logging.info("Writing %d products to %s", len(res), target)
    # Use a context manager so the file is flushed and closed deterministically
    # (a bare open() left that to the garbage collector).
    with open(target, 'w') as out:
        json.dump(res, out)

def load_products(**kwargs):
    """Insert the products dumped by ``extract_products`` into the warehouse.

    Reads ``<fs>/<dag_id>/extract_products/<execution_date>.json`` (a JSON
    list of ``[product_id, name]`` pairs) and inserts each pair into the
    ``products`` table.  Pairs violating ``products_product_id_key`` are
    skipped, so already-loaded products are left untouched.

    :param kwargs: Airflow task context; ``execution_date`` and ``dag`` are
        read.
    """
    fs = FSHook(conn_id="tmp_fs")
    source_file_name = str(kwargs["execution_date"]) + '.json'
    source_dir_path = os.path.join(
        fs.get_path(),
        kwargs["dag"].dag_id,
        'extract_products')
    source = os.path.join(source_dir_path, source_file_name)

    with open(source) as json_file:
        products = json.load(json_file)
    logging.info("Loading %d products from %s", len(products), source)

    db = PostgresHook(postgres_conn_id="my_warehouse_1")
    with closing(db.get_conn()) as conn, closing(conn.cursor()) as c:
        c.executemany('''
            Insert into products (product_id, name) VALUES (%s,%s)
            ON CONFLICT
            ON CONSTRAINT products_product_id_key DO NOTHING
        ''', products)

        conn.commit()

def extract_sales(**kwargs):
    """Dump per-order/category/product sales from the e-commerce DB to JSON.

    Output rows are ``[order_id, category_id, product_id, datetime_dim,
    total]``.  ``load_sales`` depends on this column order.
    ``datetime_dim`` is the order's ``created_at`` hour encoded as the
    integer ``YYYYMMDDHH``.  ``total`` is ``SUM(oi.total)`` for the group,
    cast to an integer.

    The file is written to ``<fs>/<dag_id>/<task_id>/<execution_date>.json``,
    where ``<fs>`` is the path of the ``tmp_fs`` filesystem connection.

    NOTE(review): there is no WHERE clause, so every run extracts the full
    order history.

    :param kwargs: Airflow task context; ``execution_date``, ``dag`` and
        ``task`` are read.
    """
    fs_root = FSHook(conn_id="tmp_fs").get_path()
    logging.info("FS path: {}".format(fs_root))

    dir_path = os.path.join(
        fs_root,
        kwargs["dag"].dag_id,
        kwargs["task"].task_id)
    # exist_ok avoids the race between an exists() check and makedirs().
    os.makedirs(dir_path, exist_ok=True)
    target = os.path.join(dir_path, str(kwargs["execution_date"]) + '.json')

    db = MySqlHook(mysql_conn_id='my_ecommerce')
    with closing(db.get_conn()) as conn, closing(conn.cursor()) as c:
        # CAST(... AS SIGNED) is accepted by both MySQL and MariaDB; plain
        # "AS INTEGER" is a syntax error on MySQL.  The query is executed
        # without parameters, so the DATE_FORMAT '%' codes need no escaping.
        c.execute('''
            SELECT
                oi.order_id,
                t.id as category_id,
                pv.product_id,
                CAST(DATE_FORMAT(o.created_at, '%Y%m%d%H') as SIGNED) as datetime_dim,
                CAST(sum(oi.total) as SIGNED) as total
            FROM
                ecommerce_order_item oi
            JOIN
                ecommerce_product_variant pv on pv.id = oi.variant_id
            JOIN
                ecommerce_order o on oi.order_id = o.id
            JOIN
                ecommerce_product p on p.id = pv.product_id
            JOIN
                ecommerce_taxon t on t.id = p.main_taxon_id
            GROUP BY
                oi.order_id,
                t.id,
                pv.product_id
            ;
        ''')
        res = c.fetchall()

    logging.info("Writing %d sales rows to %s", len(res), target)
    with open(target, 'w') as out:
        json.dump(res, out)

def load_sales(**kwargs):
    """Insert the sales rows dumped by ``extract_sales`` into ``sales``.

    Reads ``<fs>/<dag_id>/extract_sales/<execution_date>.json``, whose rows
    are ``[order_id, category_id, product_id, datetime_dim, total]``.  The
    source ``product_id`` is resolved to the warehouse surrogate key through
    the ``products`` table, so ``load_products`` must have run first.

    NOTE(review): the category sub-select returns the constant 1 whenever
    ``categories`` is non-empty and the category id is non-zero.  It looks
    like a placeholder while ``load_categories`` is a dummy task; confirm.

    NOTE(review): the INSERT has no ON CONFLICT clause, so re-running for
    the same data may duplicate rows or fail on a unique key; verify.

    :param kwargs: Airflow task context; ``execution_date`` and ``dag`` are
        read.
    """
    fs = FSHook(conn_id="tmp_fs")
    source_file_name = str(kwargs["execution_date"]) + '.json'
    source_dir_path = os.path.join(
        fs.get_path(),
        kwargs["dag"].dag_id,
        'extract_sales')
    source = os.path.join(source_dir_path, source_file_name)

    with open(source) as json_file:
        rows = json.load(json_file)

    # The extract emits category_id before product_id, but the INSERT below
    # consumes product_id (2nd placeholder) before category_id (3rd).
    # Passing rows through unchanged looked products up by category id.
    params = [
        (order_id, product_id, category_id, datetime_dim, total)
        for order_id, category_id, product_id, datetime_dim, total in rows
    ]
    logging.info("Loading %d sales rows from %s", len(params), source)

    db = PostgresHook(postgres_conn_id="my_warehouse_1")
    with closing(db.get_conn()) as conn, closing(conn.cursor()) as c:
        c.executemany('''
            INSERT INTO sales (order_id, product_id, category_id, datetime_id, total_sales) VALUES
                (
                    %s,
                    (select p.id from products p where p.product_id = %s),
                    (select 1 from categories where 0 != %s LIMIT 1),
                    %s,
                    %s
                )
        ''', params)

        conn.commit()

# Manually triggered (schedule_interval=None) DAG that loads the warehouse
# dimensions (products, categories, channels, time) and then the sales facts.
with DAG('etl-sales', start_date=datetime(2021, 1, 10), default_args=default_args, schedule_interval=None) as dag:

    start = DummyOperator(task_id='start')

    # Task variables carry a ``_task`` suffix so they no longer rebind (shadow)
    # the module-level callables of the same name.
    extract_products_task = PythonOperator(task_id='extract_products', python_callable=extract_products)
    transform_products = DummyOperator(task_id='transform_products')
    load_products_task = PythonOperator(task_id='load_products', python_callable=load_products)

    extract_categories = DummyOperator(task_id='extract_categories')
    load_categories = DummyOperator(task_id='load_categories')

    extract_channels = DummyOperator(task_id='extract_channels')
    load_channels = DummyOperator(task_id='load_channels')

    extract_sales_task = PythonOperator(task_id='extract_sales', python_callable=extract_sales)
    load_sales_task = PythonOperator(task_id='load_sales', python_callable=load_sales)

    load_time_task = PythonOperator(
        task_id='load_time',
        python_callable=load_time
    )

    end = DummyOperator(task_id='end')

    start >> extract_products_task >> transform_products >> load_products_task
    start >> extract_categories >> load_categories
    start >> extract_channels >> load_channels
    start >> extract_sales_task >> load_sales_task
    start >> load_time_task

    # Facts after dimensions.  load_sales looks product ids up in the
    # ``products`` table and reads ``categories``, so it must not race the
    # loads that fill them.  Its datetime_id uses the same YYYYMMDDHH encoding
    # as datetime_dimension.id (presumably a FK; confirm).
    [load_products_task, load_categories, load_time_task] >> load_sales_task

    [load_products_task, load_time_task, load_categories, load_channels, load_sales_task] >> end

```