## **ngrok을 사용하여 dagster를 Run !**

## **Step 1: 설치 및 설정**
> ### ngrok에서 인증키 입력


In [None]:
# Install necessary packages
!pip install dagster dagit pyngrok requests pandas

# Import ngrok and set up the authentication token
from pyngrok import ngrok

# Replace 'YOUR_NGROK_AUTH_TOKEN' with your actual ngrok auth token
ngrok.set_auth_token('???')


Collecting dagster
  Downloading dagster-1.7.6-py3-none-any.whl (1.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.5/1.5 MB[0m [31m7.5 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting dagit
  Downloading dagit-1.7.6-py3-none-any.whl (6.2 kB)
Collecting pyngrok
  Downloading pyngrok-7.1.6-py3-none-any.whl (22 kB)
Collecting coloredlogs<=14.0,>=6.1 (from dagster)
  Downloading coloredlogs-14.0-py2.py3-none-any.whl (43 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m43.9/43.9 kB[0m [31m6.0 MB/s[0m eta [36m0:00:00[0m
Collecting alembic!=1.11.0,!=1.6.3,!=1.7.0,>=1.2.1 (from dagster)
  Downloading alembic-1.13.1-py3-none-any.whl (233 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m233.4/233.4 kB[0m [31m22.1 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting croniter>=0.3.34 (from dagster)
  Downloading croniter-2.0.5-py2.py3-none-any.whl (20 kB)
Collecting grpcio-health-checking>=1.44.0 (from dagster)
  Downloading grpcio_hea

## **Step 2: Dagster 파이프라인 코드 작성**

In [None]:
# Source text of the Dagster pipeline module. It is kept as a string so the
# next cell can write it to dagster_pipeline.py. Only `#` comments are used
# inside the text, because a triple-double-quoted docstring would end the
# enclosing string literal early.
pipeline_code = """
import json
import requests
import pandas as pd
from dagster import job, op, repository

# Seconds to wait for the Hacker News API before giving up on a request.
REQUEST_TIMEOUT = 10

@op
def load_topstory_ids():
    # Read the story ids from topstory_ids.json in the working directory.
    with open("topstory_ids.json", "r") as f:
        topstory_ids = json.load(f)
    return topstory_ids

@op
def fetch_topstories(topstory_ids):
    results = []
    for item_id in topstory_ids:
        # Without a timeout a stalled connection would hang the op forever.
        response = requests.get(
            f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json",
            timeout=REQUEST_TIMEOUT,
        )
        # Fail the op on an HTTP error instead of parsing an error body.
        response.raise_for_status()
        item = response.json()
        if item is None:
            # JSON null: no item came back for this id, so there is no row to keep.
            continue
        results.append(item)
        if len(results) % 20 == 0:
            print(f"Got {len(results)} items so far.")
    return results

@op
def save_topstories(results):
    # One CSV row per fetched item, without the DataFrame index column.
    df = pd.DataFrame(results)
    df.to_csv("topstories.csv", index=False)

@job
def topstories_job():
    topstory_ids = load_topstory_ids()
    topstories = fetch_topstories(topstory_ids)
    save_topstories(topstories)

@repository
def my_repository():
    return [topstories_job]
"""

In [None]:
import json

# Materialize the pipeline source text as a module file on disk
with open("dagster_pipeline.py", "w") as module_file:
    module_file.write(pipeline_code)

# Example item ids, stored as JSON for the pipeline's first op to read back
topstory_ids = [8863, 2921983, 121003, 192327, 126809, 102351]
with open("topstory_ids.json", "w") as ids_file:
    ids_file.write(json.dumps(topstory_ids))


## **Step 3: ngrok 터널 설정 및 Dagit 실행**

In [None]:
import subprocess
import time

from pyngrok import ngrok

# Start Dagit in the background, serving dagster_pipeline.py on port 3000
dagit_proc = subprocess.Popen(["dagit", "-f", "dagster_pipeline.py", "-h", "0.0.0.0", "-p", "3000"])

try:
    # ngrok.connect() returns an NgrokTunnel object; its public_url attribute
    # is the address to open in a browser.
    tunnel = ngrok.connect(3000)
    public_url = tunnel.public_url
    print(f"Dagit UI is available at {public_url}")

    # Keep the cell (and with it the tunnel) alive until the user interrupts it
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    print("Stopping ngrok and Dagit...")
finally:
    # Runs on interrupt and on any error above (e.g. the tunnel failing to
    # open), so the Dagit child process is never left running.
    dagit_proc.terminate()
    try:
        dagit_proc.wait(timeout=10)
    except subprocess.TimeoutExpired:
        # Dagit ignored the terminate request; force it down.
        dagit_proc.kill()
    ngrok.kill()


Dagit UI is available at NgrokTunnel: "https://4b75-34-66-63-190.ngrok-free.app" -> "http://localhost:3000"
Stopping ngrok and Dagit...


### **실습**

In [None]:
# Sample data: a nested dictionary with the same shape a JSON document would have
menu = {
    "breakfast": {
        "hours": "7-11",
        "items": {"breakfast burritos": "$6.00", "pancakes": "$4.00"},
    },
    "lunch": {"hours": "11-3", "items": {"hamburger": "$5.00"}},
    "dinner": {"hours": "3-10", "items": {"spaghetti": "$8.00"}},
}

# Serialize the dictionary to a JSON-formatted string, then show that string
menu_json = json.dumps(menu)
print(menu_json)


{"breakfast": {"hours": "7-11", "items": {"breakfast burritos": "$6.00", "pancakes": "$4.00"}}, "lunch": {"hours": "11-3", "items": {"hamburger": "$5.00"}}, "dinner": {"hours": "3-10", "items": {"spaghetti": "$8.00"}}}


In [None]:

# Parse the JSON string menu_json back into a Python dictionary and print it.
# The printed form uses single quotes because it is the dict's repr, not JSON.
menu2 = json.loads(menu_json)
print(menu2)


{'breakfast': {'hours': '7-11', 'items': {'breakfast burritos': '$6.00', 'pancakes': '$4.00'}}, 'lunch': {'hours': '11-3', 'items': {'hamburger': '$5.00'}}, 'dinner': {'hours': '3-10', 'items': {'spaghetti': '$8.00'}}}


In [None]:
# Write the example story ids to ./data/topstory_ids.json, the file that
# load_topstory_ids reads. The content has to be a list of item ids: dumping
# the `menu` dictionary here would make the job iterate over its keys
# ("breakfast", "lunch", "dinner") and request those as item ids.
import os

os.makedirs('data', exist_ok=True)

topstory_ids = [8863, 2921983, 121003, 192327, 126809, 102351]  # Example IDs

with open('./data/topstory_ids.json', 'w') as json_file:
    json.dump(topstory_ids, json_file)

In [None]:
import json
import requests
import pandas as pd
from dagster import job, op

@op
def load_topstory_ids():
    """Return the parsed JSON contents of ./data/topstory_ids.json."""
    with open("./data/topstory_ids.json", "r") as ids_file:
        return json.load(ids_file)

@op
def fetch_topstories(topstory_ids):
    """Download the Hacker News item for every id in ``topstory_ids``.

    Args:
        topstory_ids: Iterable of item ids; each one is interpolated into the
            item endpoint URL.

    Returns:
        list: The decoded JSON item for each id, in request order. Ids that
        the API answers with JSON ``null`` are skipped.

    Raises:
        requests.HTTPError: If the API responds with an error status.
        requests.Timeout: If a request gets no response within 10 seconds.
    """
    results = []
    for item_id in topstory_ids:
        # Without a timeout a stalled connection would hang the op forever.
        response = requests.get(
            f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json",
            timeout=10,
        )
        # Fail loudly on an HTTP error instead of parsing an error body.
        response.raise_for_status()
        item = response.json()
        if item is None:
            # JSON null: no item came back for this id, so there is no row to keep.
            continue
        results.append(item)
        if len(results) % 20 == 0:
            print(f"Got {len(results)} items so far.")
    return results

@op
def save_topstories(results):
    """Write ``results`` to topstories.csv as a table, without the index column."""
    pd.DataFrame(results).to_csv("topstories.csv", index=False)

@job
def topstories_job():
    """Chain the three ops: load the ids, fetch their items, save the result."""
    save_topstories(fetch_topstories(load_topstory_ids()))

# Run the job in the current process when this code executes as the main
# module, and keep the returned result object in `result`.
if __name__ == "__main__":
    result = topstories_job.execute_in_process()


2024-05-19 02:01:06 +0000 - dagster - DEBUG - topstories_job - 92b9e0ba-8eaa-49ed-adfe-fd6457424952 - 348 - RUN_START - Started execution of run for "topstories_job".
2024-05-19 02:01:06 +0000 - dagster - DEBUG - topstories_job - 92b9e0ba-8eaa-49ed-adfe-fd6457424952 - 348 - ENGINE_EVENT - Executing steps in process (pid: 348)
2024-05-19 02:01:06 +0000 - dagster - DEBUG - topstories_job - 92b9e0ba-8eaa-49ed-adfe-fd6457424952 - 348 - RESOURCE_INIT_STARTED - Starting initialization of resources [io_manager].
2024-05-19 02:01:06 +0000 - dagster - DEBUG - topstories_job - 92b9e0ba-8eaa-49ed-adfe-fd6457424952 - 348 - RESOURCE_INIT_SUCCESS - Finished initialization of resources [io_manager].
2024-05-19 02:01:06 +0000 - dagster - DEBUG - topstories_job - 92b9e0ba-8eaa-49ed-adfe-fd6457424952 - 348 - LOGS_CAPTURED - Started capturing logs in process (pid: 348).
2024-05-19 02:01:06 +0000 - dagster - DEBUG - topstories_job - 92b9e0ba-8eaa-49ed-adfe-fd6457424952 - 348 - load_topstory_ids - STEP_STA

In [None]:
%%writefile dagster_pipeline.py

import json
import requests
import pandas as pd
from dagster import job, op

@op
def load_topstory_ids():
    """Return the parsed JSON contents of ./data/topstory_ids.json."""
    with open("./data/topstory_ids.json", "r") as ids_file:
        return json.load(ids_file)

@op
def fetch_topstories(topstory_ids):
    """Download the Hacker News item for every id in ``topstory_ids``.

    Args:
        topstory_ids: Iterable of item ids; each one is interpolated into the
            item endpoint URL.

    Returns:
        list: The decoded JSON item for each id, in request order. Ids that
        the API answers with JSON ``null`` are skipped.

    Raises:
        requests.HTTPError: If the API responds with an error status.
        requests.Timeout: If a request gets no response within 10 seconds.
    """
    results = []
    for item_id in topstory_ids:
        # Without a timeout a stalled connection would hang the op forever.
        response = requests.get(
            f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json",
            timeout=10,
        )
        # Fail loudly on an HTTP error instead of parsing an error body.
        response.raise_for_status()
        item = response.json()
        if item is None:
            # JSON null: no item came back for this id, so there is no row to keep.
            continue
        results.append(item)
        if len(results) % 20 == 0:
            print(f"Got {len(results)} items so far.")
    return results

@op
def save_topstories(results):
    """Write ``results`` to topstories.csv as a table, without the index column."""
    pd.DataFrame(results).to_csv("topstories.csv", index=False)

@job
def topstories_job():
    """Chain the three ops: load the ids, fetch their items, save the result."""
    save_topstories(fetch_topstories(load_topstory_ids()))

# Run the job in the current process when this file is executed as the main
# module; the guard is skipped when the file is imported instead.
if __name__ == "__main__":
    result = topstories_job.execute_in_process()


Writing dagster_pipeline.py


In [None]:
# Run Dagit: serve the code in dagster_pipeline.py on host 0.0.0.0, port 3000.
# NOTE(review): unlike Step 3, this cell opens no ngrok tunnel — confirm how
# the UI is expected to be reached from outside the runtime.
!dagit -f dagster_pipeline.py -h 0.0.0.0 -p 3000

[32m2024-05-19 02:07:21 +0000[0m - dagit - [34mINFO[0m - Using temporary directory /content/tmpdl85pc2c for storage. This will be removed when dagster-webserver exits.
[32m2024-05-19 02:07:21 +0000[0m - dagit - [34mINFO[0m - To persist information across sessions, set the environment variable DAGSTER_HOME to a directory to use.
[32m2024-05-19 02:07:23 +0000[0m - dagster.code_server - [34mINFO[0m - Starting Dagster code server for file dagster_pipeline.py in process 10545
[32m2024-05-19 02:07:24 +0000[0m - dagster.code_server - [34mINFO[0m - Started Dagster code server for file dagster_pipeline.py in process 10545

  [34m[1mTelemetry:[0m

  As an open-source project, we collect usage statistics to inform development priorities. For more
  information, read https://docs.dagster.io/getting-started/telemetry.

  We will not see or store any data that is processed by your code.

  To opt-out, add the following to $DAGSTER_HOME/dagster.yaml, creating that file if necessary