In [1]:
import os
import sys

import dagster
import pandas as pd
import pendulum
# Make the `repositories` package (imported in the next cell) importable.
# MAESTRO_ROOT can be overridden through the environment instead of editing the
# notebook; the membership check keeps re-runs of this cell from appending the
# same entry to sys.path over and over.
MAESTRO_ROOT = os.environ.get("MAESTRO_ROOT", "/home/lmoraes/maestro")
if MAESTRO_ROOT not in sys.path:
    sys.path.append(MAESTRO_ROOT)

In [2]:
import repositories.capturas.br_rj_riodejaneiro_onibus_gps.registros as registros
import repositories.capturas.solids as solids
import repositories.capturas.resources as resources
from repositories.helpers.helpers import read_config


In [3]:
# Load the capture's run config, then force the timezone resource to
# America/Sao_Paulo (the IANA zone covering Rio de Janeiro).
config = read_config(
    "/home/lmoraes/maestro/repositories/capturas/br_rj_riodejaneiro_onibus_gps/registros.yaml"
)
config["resources"]["timezone_config"]["config"].update({"timezone": "America/Sao_Paulo"})

In [4]:
@dagster.solid(required_resource_keys={"basedosdados_config", "timezone_config"})
def pre_treatment(context, data):
    """Turn the raw GPS API response into a DataFrame ready to be saved.

    Adds a ``timestamp_captura`` column with the capture time (now, in the
    configured timezone) and rebuilds ``datahora`` so that its wall-clock
    time is labelled with that same timezone.

    Args:
        context: Dagster solid context; its ``timezone_config`` resource must
            expose a ``"timezone"`` key.
        data: Upstream output exposing ``.json()`` — presumably a
            ``requests`` response (the notebook displays ``<Response [200]>``).

    Returns:
        pandas.DataFrame: one row per JSON record plus ``timestamp_captura``.
    """
    tz_name = context.resources.timezone_config["timezone"]

    def _epoch_ms_to_local(epoch_ms):
        # Value is divided by 1000 and read as a UTC timestamp (assumed epoch
        # milliseconds); its tzinfo is then dropped and the same wall-clock
        # time is re-labelled with the configured timezone.
        naive = pendulum.from_timestamp(epoch_ms / 1000.0).replace(tzinfo=None)
        return pd.to_datetime(naive.set(tz=tz_name).isoformat())

    frame = pd.DataFrame(data.json())
    frame["timestamp_captura"] = pd.to_datetime(pendulum.now(tz_name).isoformat())
    frame["datahora"] = frame["datahora"].astype(float).apply(_epoch_ms_to_local)
    return frame

@dagster.pipeline(
    mode_defs=[
        dagster.ModeDefinition(
            "dev",
            resource_defs={
                "basedosdados_config": resources.basedosdados_config,
                "timezone_config": resources.timezone_config,
                "discord_webhook": resources.discord_webhook,
            },
        ),
    ]
)
def br_rj_riodejaneiro_onibus_gps_jupyter():
    """Fetch one GPS snapshot and save it both raw and pre-treated.

    Wires the ``pre_treatment`` solid defined in this notebook cell rather than
    ``registros.pre_treatment``; previously the local definition was never
    used, so edits to it in the notebook had no effect on the run.
    """
    file_path, _partitions = solids.get_file_path_and_partitions()
    raw_response = solids.get_raw()
    # Both the raw and the treated copies are written relative to `file_path`.
    solids.save_raw_local(raw_response, file_path)
    treated_data = pre_treatment(raw_response)
    solids.save_treated_local(treated_data, file_path)

In [5]:
# Run the whole pipeline in-process with the config loaded from registros.yaml.
result = dagster.execute_pipeline(
    br_rj_riodejaneiro_onibus_gps_jupyter,
    run_config=config,
)

2021-03-05 16:53:36 - dagster - DEBUG - br_rj_riodejaneiro_onibus_gps_jupyter - 30ea779c-25bf-48ff-90c5-ad2289a9a02f - 16688 - ENGINE_EVENT - Starting initialization of resources [basedosdados_config, io_manager, timezone_config].
2021-03-05 16:53:36 - dagster - DEBUG - br_rj_riodejaneiro_onibus_gps_jupyter - 30ea779c-25bf-48ff-90c5-ad2289a9a02f - 16688 - ENGINE_EVENT - Finished initialization of resources [basedosdados_config, io_manager, timezone_config].
2021-03-05 16:53:36 - dagster - DEBUG - br_rj_riodejaneiro_onibus_gps_jupyter - 30ea779c-25bf-48ff-90c5-ad2289a9a02f - 16688 - PIPELINE_START - Started execution of pipeline "br_rj_riodejaneiro_onibus_gps_jupyter".
2021-03-05 16:53:36 - dagster - DEBUG - br_rj_riodejaneiro_onibus_gps_jupyter - 30ea779c-25bf-48ff-90c5-ad2289a9a02f - 16688 - ENGINE_EVENT - Executing steps in process (pid: 16688)
2021-03-05 16:53:36 - dagster - DEBUG - br_rj_riodejaneiro_onibus_gps_jupyter - 30ea779c-25bf-48ff-90c5-ad2289a9a02f - 16688 - get_file_path_

In [6]:
# Response object returned by the `get_raw` solid.
data = result.output_for_solid("get_raw")
data

2021-03-05 16:53:47 - dagster - DEBUG - br_rj_riodejaneiro_onibus_gps_jupyter - 30ea779c-25bf-48ff-90c5-ad2289a9a02f - 16688 - ENGINE_EVENT - Starting initialization of resources [basedosdados_config, io_manager, timezone_config].
2021-03-05 16:53:47 - dagster - DEBUG - br_rj_riodejaneiro_onibus_gps_jupyter - 30ea779c-25bf-48ff-90c5-ad2289a9a02f - 16688 - ENGINE_EVENT - Finished initialization of resources [basedosdados_config, io_manager, timezone_config].


<Response [200]>

In [7]:
# Path of the raw JSON file written by `save_raw_local`.
result.output_for_solid("save_raw_local")

2021-03-05 16:53:47 - dagster - DEBUG - br_rj_riodejaneiro_onibus_gps_jupyter - 30ea779c-25bf-48ff-90c5-ad2289a9a02f - 16688 - ENGINE_EVENT - Starting initialization of resources [basedosdados_config, io_manager, timezone_config].
2021-03-05 16:53:47 - dagster - DEBUG - br_rj_riodejaneiro_onibus_gps_jupyter - 30ea779c-25bf-48ff-90c5-ad2289a9a02f - 16688 - ENGINE_EVENT - Finished initialization of resources [basedosdados_config, io_manager, timezone_config].


'/home/lmoraes/maestro/notebooks/data/raw/br_rj_riodejaneiro_onibus_gps/registros/data=2021-03-05/hora=16/2021-03-05-16-53-36.json'

In [8]:
# DataFrame produced by the `pre_treatment` solid.
treated_data = result.output_for_solid("pre_treatment")
treated_data

2021-03-05 16:53:47 - dagster - DEBUG - br_rj_riodejaneiro_onibus_gps_jupyter - 30ea779c-25bf-48ff-90c5-ad2289a9a02f - 16688 - ENGINE_EVENT - Starting initialization of resources [basedosdados_config, io_manager, timezone_config].
2021-03-05 16:53:47 - dagster - DEBUG - br_rj_riodejaneiro_onibus_gps_jupyter - 30ea779c-25bf-48ff-90c5-ad2289a9a02f - 16688 - ENGINE_EVENT - Finished initialization of resources [basedosdados_config, io_manager, timezone_config].


Unnamed: 0,ordem,latitude,longitude,datahora,velocidade,linha,timestamp_captura
0,B10002,-2281085,-4319459,2021-03-05 16:50:52-03:00,41,323,2021-03-05 16:53:46.221625-03:00
1,B10004,-2284231,-432364,2021-03-05 16:51:39-03:00,22,327,2021-03-05 16:53:46.221625-03:00
2,B10006,-2280553,-4320086,2021-03-05 16:53:08-03:00,9,328,2021-03-05 16:53:46.221625-03:00
3,B10008,-2278897,-4316213,2021-03-05 16:53:08-03:00,0,328,2021-03-05 16:53:46.221625-03:00
4,B10011,-228141,-4322097,2021-03-05 16:53:28-03:00,59,901,2021-03-05 16:53:46.221625-03:00
...,...,...,...,...,...,...,...
4881,B71146,-2289952,-4328073,2021-03-05 16:53:11-03:00,0,455,2021-03-05 16:53:46.221625-03:00
4882,B71148,-2290461,-4327509,2021-03-05 16:53:02-03:00,11,455,2021-03-05 16:53:46.221625-03:00
4883,B71155,-2290401,-4327171,2021-03-05 16:53:18-03:00,36,247,2021-03-05 16:53:46.221625-03:00
4884,B71156,-2291356,-431786,2021-03-05 16:53:15-03:00,0,247,2021-03-05 16:53:46.221625-03:00
