In [1]:
# default_exp pipeline

# Pipeline

<br>

### Imports

In [2]:
#exports
import pandas as pd

from spotifywatch import playlists
from dagster import execute_pipeline, pipeline, solid, Field

import os
import dotenv

<br>

### Loading Environment Variables

In [3]:
#hide
# Load environment variables from the `.env` file one directory above the
# notebook's working directory. The return value is bound to `_` so that it is
# not echoed as the cell's output.
# NOTE(review): presumably this supplies the Spotify API credentials that
# `spotifywatch.playlists` needs — confirm against that module.
_ = dotenv.load_dotenv('../.env')

In [4]:
#exports
@solid()
def retrieve_latest_discover_weekly(_, playlist_id: str='spotify:playlist:37i9dQZEVXcOwrS8NC07JJ'):
    """Fetch the current contents of a playlist.

    Thin dagster solid around ``playlists.retrieve_playlist_df``.

    Parameters
    ----------
    _ :
        First positional argument of the solid (the dagster context); unused.
    playlist_id : str
        Spotify playlist URI to retrieve. The default is presumably the
        user's Discover Weekly playlist — confirm before reusing elsewhere.

    Returns
    -------
    Whatever ``playlists.retrieve_playlist_df`` returns for ``playlist_id``
    (a dataframe, judging by the helper's name).
    """
    return playlists.retrieve_playlist_df(playlist_id)

@solid()
def update_discover_weekly_archive(_, df_latest_discover_weekly, discover_weekly_archive_fp: str='data/playlists/discover_weekly.csv'):
    """Merge the latest playlist snapshot into the CSV archive and save it.

    The existing archive is read from ``discover_weekly_archive_fp``, the new
    rows are stacked beneath it, fully duplicated rows are dropped, and the
    result is written back to the same path without the index column.

    Parameters
    ----------
    _ :
        First positional argument of the solid (the dagster context); unused.
    df_latest_discover_weekly : pandas.DataFrame
        Newly retrieved playlist rows to add to the archive.
        NOTE(review): assumed to share the archive's columns — rows are only
        recognised as duplicates when every column value matches.
    discover_weekly_archive_fp : str
        Path of the archive CSV. It is both read and overwritten, so it must
        already exist.

    Returns
    -------
    pandas.DataFrame
        The combined, de-duplicated archive that was written to disk.
    """
    df_existing_archive = pd.read_csv(discover_weekly_archive_fp)

    # `DataFrame.append` was deprecated in pandas 1.4 and removed in 2.0.
    # `pd.concat` is the supported replacement and, like `append`, keeps the
    # original row order and index labels.
    df_discover_weekly_archive = (pd
                                  .concat([df_existing_archive, df_latest_discover_weekly])
                                  .drop_duplicates()
                                 )

    df_discover_weekly_archive.to_csv(discover_weekly_archive_fp, index=False)

    return df_discover_weekly_archive

In [5]:
#exports
@pipeline
def update_discover_weekly_archive_pipeline():
    """Wire the retrieval solid's output into the archive-update solid."""
    update_discover_weekly_archive(retrieve_latest_discover_weekly())

In [6]:
# Input values for each solid, keyed by solid name. The archive path is given
# with a leading '../', unlike the solid's own default.
# NOTE(review): presumably because this notebook runs from a sub-directory of
# the repository — confirm.
solid_inputs = {
    'retrieve_latest_discover_weekly': {
        'playlist_id': 'spotify:playlist:37i9dQZEVXcOwrS8NC07JJ',
    },
    'update_discover_weekly_archive': {
        'discover_weekly_archive_fp': '../data/playlists/discover_weekly.csv',
    },
}

# Nest each solid's values under an 'inputs' key inside the top-level 'solids'
# mapping, which is the shape passed to `execute_pipeline` below.
run_config = {
    'solids': {
        solid_name: {'inputs': inputs}
        for solid_name, inputs in solid_inputs.items()
    }
}

execute_pipeline(update_discover_weekly_archive_pipeline, run_config=run_config)

[32m2021-01-02 19:34:50[0m - dagster - [34mDEBUG[0m - update_discover_weekly_archive_pipeline - 39b212be-bc36-490a-ad86-da40bbafcc2d - 23452 - ENGINE_EVENT - Starting initialization of resources [asset_store].
[32m2021-01-02 19:34:50[0m - dagster - [34mDEBUG[0m - update_discover_weekly_archive_pipeline - 39b212be-bc36-490a-ad86-da40bbafcc2d - 23452 - ENGINE_EVENT - Finished initialization of resources [asset_store].
[32m2021-01-02 19:34:50[0m - dagster - [34mDEBUG[0m - update_discover_weekly_archive_pipeline - 39b212be-bc36-490a-ad86-da40bbafcc2d - 23452 - PIPELINE_START - Started execution of pipeline "update_discover_weekly_archive_pipeline".
[32m2021-01-02 19:34:50[0m - dagster - [34mDEBUG[0m - update_discover_weekly_archive_pipeline - 39b212be-bc36-490a-ad86-da40bbafcc2d - 23452 - ENGINE_EVENT - Executing steps in process (pid: 23452)
[32m2021-01-02 19:34:50[0m - dagster - [34mDEBUG[0m - update_discover_weekly_archive_pipeline - 39b212be-bc36-490a-ad86-da40bbafc

[32m2021-01-02 19:34:52[0m - dagster - [34mDEBUG[0m - update_discover_weekly_archive_pipeline - 39b212be-bc36-490a-ad86-da40bbafcc2d - 23452 - retrieve_latest_discover_weekly.compute - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
[32m2021-01-02 19:34:52[0m - dagster - [34mDEBUG[0m - update_discover_weekly_archive_pipeline - 39b212be-bc36-490a-ad86-da40bbafcc2d - 23452 - retrieve_latest_discover_weekly.compute - OBJECT_STORE_OPERATION - Stored intermediate object for output result in memory object store using pickle.
[32m2021-01-02 19:34:52[0m - dagster - [34mDEBUG[0m - update_discover_weekly_archive_pipeline - 39b212be-bc36-490a-ad86-da40bbafcc2d - 23452 - retrieve_latest_discover_weekly.compute - STEP_SUCCESS - Finished execution of step "retrieve_latest_discover_weekly.compute" in 2.3s.
[32m2021-01-02 19:34:52[0m - dagster - [34mDEBUG[0m - update_discover_weekly_archive_pipeline - 39b212be-bc36-490a-ad86-da40bbafcc2d - 23452 - update_disco

<dagster.core.execution.results.PipelineExecutionResult at 0x1e46ee06460>

In [7]:
#hide
# Import only the name that is used instead of a wildcard, so the notebook
# namespace is not filled with every public name from `nbdev.export`.
from nbdev.export import notebook2script

# Run nbdev's notebook-to-module export.
notebook2script()

Converted 01-discography.ipynb.
Converted 02-playlists.ipynb.
Converted 03-pipeline.ipynb.
