In [1]:
# default_exp pipeline

<br>

### End-to-End Pipeline

In [2]:
#exports
from percypics import instagram, whatsapp

import os
import dotenv
from typing import Any

from dagster import execute_pipeline, pipeline, solid, Field

In [3]:
# Read the .env file one directory up so its entries are available via os.environ
_ = dotenv.load_dotenv(dotenv_path='../.env')

# Value of TEST_MOB (None when unset) - presumably a mobile number used for manual testing
test_mob = os.environ.get('TEST_MOB')

In [4]:
#exports
@solid()
def download_and_save_posts(_, fp: str):
    """Retrieve posts through the `instagram` helpers and write them to `fp` as CSV.

    Parameters
    ----------
    _ :
        First positional argument supplied by dagster; not used here.
    fp : str
        Path handed to `instagram.retrieve_posts` and then used as the
        destination of the CSV (written without the index column).
    """
    posts = instagram.retrieve_posts(instagram.initialise_loader(), fp)
    posts.to_csv(fp, index=False)

@solid()
def send_posts_to_recipients(_, fp: str, num_mobs: int=5):
    """Send one randomly selected post to each configured WhatsApp recipient.

    Recipients are read from the environment variables ``MOB_1`` to
    ``MOB_<num_mobs>``. A variable that is unset or empty is skipped with a
    warning instead of being forwarded to the WhatsApp helpers as ``None``.

    Parameters
    ----------
    _ :
        First positional argument supplied by dagster; not used here.
    fp : str
        Path passed to `whatsapp.get_random_post` to pick the post to send.
    num_mobs : int
        Number of ``MOB_<n>`` environment variables to look up (starting at 1).
    """
    import warnings

    client = whatsapp.initialise_client()

    for mob_num in range(1, num_mobs + 1):
        env_var = f'MOB_{mob_num}'
        wa_recipient = os.getenv(env_var)

        # os.getenv yields None for a missing variable; never send to that
        if not wa_recipient:
            warnings.warn(f'{env_var} is not set - skipping this recipient')
            continue

        # A fresh random post is drawn for every recipient
        post = whatsapp.get_random_post(fp)
        message = whatsapp.send_post(client, post, wa_recipient)
        whatsapp.check_msg_status(client, message, wa_recipient)

In [5]:
#exports
@pipeline
def retrieve_and_send_posts_pipeline():
    """Pipeline made up of the download solid and the send solid.

    Both solids are invoked without arguments, so their inputs (`fp`,
    `num_mobs`) have to be provided through the run config.

    NOTE(review): no output of `download_and_save_posts` is wired into
    `send_posts_to_recipients`, so no dependency between them is declared
    here - confirm the executor runs the download first.
    """
    download_and_save_posts()
    send_posts_to_recipients()

In [None]:
# The download solid writes this CSV and the send solid is pointed at the same
# path, so it is defined once to keep the two inputs in sync.
posts_fp = '../data/percy_posts.csv'

run_config = {
    'solids': {
        'download_and_save_posts': {
            'inputs': {
                'fp': posts_fp
            },
        },
        'send_posts_to_recipients': {
            'inputs': {
                'fp': posts_fp,
                'num_mobs': 1  # only look up MOB_1 for this run
            },
        }
    }
}

execute_pipeline(retrieve_and_send_posts_pipeline, run_config=run_config)

[32m2020-12-25 03:18:10[0m - dagster - [34mDEBUG[0m - retrieve_and_send_posts_pipeline - 91f3ba57-f9d6-4839-8b9d-551ecf93abb8 - 15436 - ENGINE_EVENT - Starting initialization of resources [asset_store].
[32m2020-12-25 03:18:10[0m - dagster - [34mDEBUG[0m - retrieve_and_send_posts_pipeline - 91f3ba57-f9d6-4839-8b9d-551ecf93abb8 - 15436 - ENGINE_EVENT - Finished initialization of resources [asset_store].
[32m2020-12-25 03:18:10[0m - dagster - [34mDEBUG[0m - retrieve_and_send_posts_pipeline - 91f3ba57-f9d6-4839-8b9d-551ecf93abb8 - 15436 - PIPELINE_START - Started execution of pipeline "retrieve_and_send_posts_pipeline".
[32m2020-12-25 03:18:10[0m - dagster - [34mDEBUG[0m - retrieve_and_send_posts_pipeline - 91f3ba57-f9d6-4839-8b9d-551ecf93abb8 - 15436 - ENGINE_EVENT - Executing steps in process (pid: 15436)
[32m2020-12-25 03:18:10[0m - dagster - [34mDEBUG[0m - retrieve_and_send_posts_pipeline - 91f3ba57-f9d6-4839-8b9d-551ecf93abb8 - 15436 - download_and_save_posts.comp

In [None]:
# TODO: This is going to be very slow on GitHub Actions;
# it should only retrieve new posts.