In [1]:
import time

# Directory with the input files; the trailing slash matters because paths are built by concatenation.
data_folder = 'app/data1/'
# Path to the large source CSV file.
csv_path = f"{data_folder}big_data.csv"

# Parquet counterpart of the CSV: same location and base name, different extension.
parquet_path = csv_path.replace('.csv', '.parquet')
# Timestamp of this run, printed so the recorded output shows when the notebook was last executed.
print(f"Последний запуск {time.ctime()}")


Последний запуск Fri May 30 01:20:52 2025


In [2]:
import sys
# Install/upgrade the third-party packages with the interpreter that runs this kernel.
!{sys.executable} -m pip install pyarrow duckdb polars plotly jupyter_dash dash --upgrade
# Dependencies: import each package and print its version as a smoke check.
print("Проверка зависимостей")

import duckdb
import polars as pl
import plotly
import jupyter_dash

print("DuckDB OK:", duckdb.__version__)
print("Polars OK:", pl.__version__)
print("Plotly OK:", plotly.__version__)
print("Jupiter_dash OK:", jupyter_dash.__version__)


Проверка зависимостей
DuckDB OK: 1.3.0
Polars OK: 1.30.0
Plotly OK: 6.1.2
Jupiter_dash OK: 0.4.2


In [3]:
import duckdb
import polars as pl


def read_parquet_with_duckdb(data_path: str, no_metro=False, no_overground=False):
    """Load the transaction columns from a file via DuckDB and return a polars DataFrame.

    Args:
        data_path: Path of the file DuckDB should scan (the notebook passes a Parquet file).
        no_metro: When true, keep only rows with ``TRANSPORT_TYPE_ID > 1``.
        no_overground: When true, keep only rows with ``TRANSPORT_TYPE_ID = 1``.
            If both flags are set the conditions are AND-ed, so the result is empty.

    Returns:
        A polars DataFrame with ``TRAN_DATE``, ``DEVICE_NO``, ``TRANSPORT_TYPE_ID``,
        ``PLACE_ID``, ``BUS_RT_NO``, ``row_num`` plus the derived columns
        ``TRAN_ONLY_DATE``, ``TRAN_ONLY_TIME`` and ``DAY_NAME``.
    """
    # Collect the conditions first so the query never ends up with two WHERE keywords.
    conditions = []
    if no_overground:
        conditions.append("TRANSPORT_TYPE_ID = 1")
    if no_metro:
        conditions.append("TRANSPORT_TYPE_ID > 1")
    where_clause = f"WHERE {' AND '.join(conditions)}" if conditions else ""

    # The path is embedded in a SQL string literal, so single quotes must be doubled.
    quoted_path = data_path.replace("'", "''")

    # ROW_NUMBER() has no ORDER BY: row_num is just a running id in scan order.
    query = f"""
        SELECT *
        FROM (
            SELECT
                TRAN_DATE,
                DEVICE_NO,
                TRANSPORT_TYPE_ID,
                PLACE_ID,
                BUS_RT_NO,
                ROW_NUMBER() OVER () AS row_num
            FROM '{quoted_path}'
            {where_clause}
        ) AS sub
    """

    tran_date = pl.col("TRAN_DATE")
    # Rows of transport type 1 without a route number get the sentinel route -239.
    missing_route_type_1 = (pl.col("TRANSPORT_TYPE_ID") == 1) & pl.col("BUS_RT_NO").is_null()

    # All expressions read only the original columns, so one with_columns call is enough.
    return duckdb.query(query).pl().with_columns(
        tran_date.dt.date().alias("TRAN_ONLY_DATE"),
        tran_date.dt.time().alias("TRAN_ONLY_TIME"),
        pl.when(missing_route_type_1)
        .then(-239)
        .otherwise(pl.col("BUS_RT_NO"))
        .alias("BUS_RT_NO"),
        tran_date.dt.strftime("%A").alias("DAY_NAME"),
    )


# Load the whole data set (no transport-type filter) and display a random sample as a sanity check.
big_data = read_parquet_with_duckdb(parquet_path)
big_data.sample()

TRAN_DATE,DEVICE_NO,TRANSPORT_TYPE_ID,PLACE_ID,BUS_RT_NO,row_num,TRAN_ONLY_DATE,TRAN_ONLY_TIME,DAY_NAME
datetime[μs],i64,i64,i64,i64,i64,date,time,str
2025-03-12 13:04:36,43726,1,1524,-239,8298333,2025-03-12,13:04:36,"""Wednesday"""


In [4]:
def show(fig):
    """Add Play/Pause animation buttons, a slider and a fixed size to ``fig``, then display it.

    Args:
        fig: A figure object exposing ``update_layout(**kwargs)`` and ``show()``.
    """
    play_button = {
        "label": "Play",
        "method": "animate",
        "args": [None, {
            # Frame duration is in milliseconds: 2000 ms = 2 seconds per frame.
            "frame": {"duration": 2000, "redraw": True},
            "fromcurrent": True,
            # Smooth transition between frames.
            "transition": {"duration": 500, "easing": "linear"},
        }],
    }
    pause_button = {
        "label": "Pause",
        "method": "animate",
        "args": [[None], {
            "frame": {"duration": 0, "redraw": False},
            "mode": "immediate",
            "transition": {"duration": 0},
        }],
    }
    button_menu = {
        "type": "buttons",
        "showactive": False,
        "buttons": [play_button, pause_button],
    }
    day_slider = {
        "transition": {"duration": 500},
        "currentvalue": {"prefix": "День: "},
        "pad": {"t": 30},
        "len": 0.9,
    }

    fig.update_layout(
        coloraxis_showscale=False,
        updatemenus=[button_menu],
        sliders=[day_slider],
        height=500,
        width=1200,
    )

    fig.show()

In [5]:
def filter_by_time(data, start_dt, end_dt, start_tm, end_tm, continuous):
    """Keep only the rows of ``data`` that fall inside the requested time window.

    Args:
        data: polars DataFrame with ``TRAN_DATE``, ``TRAN_ONLY_DATE`` and ``TRAN_ONLY_TIME``.
        start_dt: First day, as ``"YYYY-MM-DD"``.
        end_dt: Last day, as ``"YYYY-MM-DD"``.
        start_tm: Start time, as ``"HH:MM"``.
        end_tm: End time, as ``"HH:MM"``.
        continuous: If true, the window is one span from ``start_dt start_tm`` to
            ``end_dt end_tm``. Otherwise every day in ``[start_dt, end_dt]`` is
            restricted to the daily interval ``[start_tm, end_tm]``.

    Returns:
        The filtered DataFrame. All bounds are inclusive; times are parsed with zero
        seconds, so an end time of ``"23:59"`` means 23:59:00.

    Raises:
        ValueError: If a date or time string does not match the expected format.
    """
    # Imported locally: the notebook-level `from datetime import datetime` sits in a later
    # cell, so this helper must not depend on that cell having been executed.
    from datetime import datetime

    if continuous:
        window_start = datetime.strptime(start_dt + " " + start_tm, "%Y-%m-%d %H:%M")
        window_end = datetime.strptime(end_dt + " " + end_tm, "%Y-%m-%d %H:%M")
        return data.filter(
            (pl.col("TRAN_DATE") >= window_start) &
            (pl.col("TRAN_DATE") <= window_end)
        )

    first_day = datetime.strptime(start_dt, "%Y-%m-%d").date()
    last_day = datetime.strptime(end_dt, "%Y-%m-%d").date()
    daily_start = datetime.strptime(start_tm, "%H:%M").time()
    daily_end = datetime.strptime(end_tm, "%H:%M").time()
    return data.filter(
        (pl.col("TRAN_ONLY_DATE") >= first_day) &
        (pl.col("TRAN_ONLY_DATE") <= last_day) &
        (pl.col("TRAN_ONLY_TIME") >= daily_start) &
        (pl.col("TRAN_ONLY_TIME") <= daily_end)
    )

In [6]:
# Rows with TRANSPORT_TYPE_ID > 1 (metro excluded).
overground = read_parquet_with_duckdb(parquet_path, no_metro=True)
# Rows with TRANSPORT_TYPE_ID = 1 (overground transport excluded).
underground = read_parquet_with_duckdb(parquet_path, no_overground=True)


In [7]:
from datetime import datetime


def load_top_stops_overground(top=20, start_dt="2025-03-10", end_dt="2025-03-16", start_tm="00:00",
                              end_tm="23:59", continuous=False):
    """Rank places in the ``overground`` data by number of transactions in a time window.

    Args:
        top: Keep places whose dense rank by count is ``<= top`` (ties share a rank).
        start_dt, end_dt: Window days as ``"YYYY-MM-DD"``.
        start_tm, end_tm: Window times as ``"HH:MM"``.
        continuous: Passed to ``filter_by_time`` to choose the window semantics.

    Returns:
        A polars DataFrame with columns ``NAME``, ``count`` and ``rank`` sorted by
        ``count`` descending, or the string ``"Wrong data format"`` when the
        date/time arguments cannot be parsed.
    """
    place_lookup = pl.read_csv(data_folder + "REF_PSG_PLACES_202503251822.csv", separator=';')
    # Restrict to the requested time window.
    try:
        filtered = filter_by_time(overground, start_dt, end_dt, start_tm, end_tm, continuous)
    except (ValueError, TypeError):
        # Only malformed (ValueError) or missing (TypeError) date/time strings are a
        # "format" problem; anything else is a real bug and must not be hidden.
        return "Wrong data format"
    # Top places by transaction count.
    top_stops = (
        filtered.join(
            place_lookup.select(["PLACE_ID", "NAME", "LN_NAME_SHORT"]),
            on="PLACE_ID",
            how="left"
        )
        .group_by("NAME")
        .agg(pl.len().alias("count"))
        .sort("count", descending=True)
        # Places missing from the lookup have a null NAME and are dropped here.
        .drop_nulls()
        .with_columns(
            pl.col("count").rank("dense", descending=True).alias("rank")
        )
        .filter(pl.col("rank") <= top))

    return top_stops


# Smoke run with the default arguments; the cell output shows the resulting table.
load_top_stops_overground()


NAME,count,rank
str,u32,u32
"""Южный филиал (1 площадка-Домод…",438787,1
"""Юго-Западный филиал (1 площадк…",325018,2
"""Северо-западный филиал (1 площ…",314371,3
"""Центральный филиал (1 площадка…",168994,4
"""Северо-Восточный филиал (1 пло…",164820,5
…,…,…
"""Стартранс (Троицк) 6102""",63216,16
"""Центральный филиал (4 площадка…",61382,17
"""Трансавтолиз (Батюнинский)""",60946,18
"""Северо-Восточный филиал (4 пло…",59425,19


In [8]:
def load_top_stops_underground(top=20, start_dt="2025-03-10", end_dt="2025-03-16", start_tm="00:00",
                               end_tm="23:59", continuous=False):
    """Rank places in the ``underground`` data by number of transactions in a time window.

    Args:
        top: Keep places whose dense rank by count is ``<= top`` (ties share a rank).
        start_dt, end_dt: Window days as ``"YYYY-MM-DD"``.
        start_tm, end_tm: Window times as ``"HH:MM"``.
        continuous: Passed to ``filter_by_time`` to choose the window semantics.

    Returns:
        A polars DataFrame with columns ``PLACE_ID``, ``NAME``, ``LN_NAME_SHORT``,
        ``count`` and ``rank`` sorted by ``count`` descending. On failure a string is
        returned instead: a message about the missing lookup file, or
        ``"Wrong data format"`` when the date/time arguments cannot be parsed.
    """
    try:
        place_lookup = pl.read_csv(data_folder + "REF_PSG_PLACES_202503251822.csv", separator=';')
    except FileNotFoundError:
        return "No file to look up places: REF_PSG_PLACES_202503251822.csv"

    try:
        filtered = filter_by_time(underground, start_dt, end_dt, start_tm, end_tm, continuous)
    except (ValueError, TypeError):
        # Only malformed (ValueError) or missing (TypeError) date/time strings are a
        # "format" problem; anything else is a real bug and must not be hidden.
        return "Wrong data format"
    top_stops = (
        filtered.join(
            place_lookup.select(["PLACE_ID", "NAME", "LN_NAME_SHORT"]),
            on="PLACE_ID",
            how="left"
        )
        .group_by("PLACE_ID", "NAME", "LN_NAME_SHORT")
        .agg(pl.len().alias("count"))
        .sort("count", descending=True)
        # Rows with a null in any column (e.g. places missing from the lookup) are dropped.
        .drop_nulls()
        .with_columns(
            pl.col("count").rank("dense", descending=True).alias("rank")
        )
        .filter(pl.col("rank") <= top))

    return top_stops


# Smoke run with the default arguments; the cell output shows the resulting table.
load_top_stops_underground()

PLACE_ID,NAME,LN_NAME_SHORT,count,rank
i64,str,str,u32,u32
1327,"""Щёлковская ( Северный )""","""Арбатско-Покровская""",39993,1
1459,"""Павелецкая КЛ""","""Кольцевая""",38218,2
1294,"""Бауманская""","""Арбатско-Покровская""",38205,3
1322,"""Славянский бульвар (Восток)""","""Арбатско-Покровская""",36779,4
1691,"""Текстильщики (Западный)""","""Таганско-Краснопресненская""",36366,5
…,…,…,…,…
1453,"""Комсомольская К (подземный зал…","""Кольцевая""",25648,16
1561,"""Бульвар Дмитрия Донского (Южны…","""Серпуховско-Тимирязевская""",25401,17
38954,"""Некрасовка (Первый)""","""Некрасовская""",25017,18
1461,"""Парк культуры КЛ""","""Кольцевая""",24982,19


In [9]:
def load_top_routes_overground(top=20, start_dt="2025-03-10", end_dt="2025-03-16", start_tm="00:00",
                               end_tm="23:59", continuous=False, popularity=False):
    """Rank routes in the ``overground`` data for a time window.

    Per route it computes ``count`` (number of rows), ``vehicle_count`` (distinct
    ``DEVICE_NO`` values) and ``rides_per_vehicle`` (their ratio), then two dense
    ranks: ``popularity`` (by ``count``) and ``overload`` (by ``rides_per_vehicle``).

    Args:
        top: Keep routes whose selected rank is ``<= top``.
        start_dt, end_dt: Window days as ``"YYYY-MM-DD"``.
        start_tm, end_tm: Window times as ``"HH:MM"``.
        continuous: Passed to ``filter_by_time`` to choose the window semantics.
        popularity: If truthy filter by the ``popularity`` rank, otherwise by ``overload``.

    Returns:
        A polars DataFrame with the per-route metrics, both ranks and the columns
        joined in from the transport-type lookup.
    """
    routes = pl.read_csv(data_folder + "REF_TRANSPORT_WAY_202503251803.csv", separator=';')
    transport_types = pl.read_csv(data_folder + "REF_TRANSPORT_TYPE_202503251727.csv", separator=';')

    rides = filter_by_time(overground, start_dt, end_dt, start_tm, end_tm, continuous)

    # Attach route name and transport id, discarding rides that carry no route number.
    rides_with_route = rides.join(
        routes.select(["WAY_ID", "NAME", "TRANSPORT_ID"]),
        left_on="BUS_RT_NO",
        right_on="WAY_ID",
        how="left",
    ).filter(pl.col("BUS_RT_NO").is_not_null())

    # One row per route with its ride and vehicle counts.
    per_route = (
        rides_with_route
        .group_by(["BUS_RT_NO", "NAME", "TRANSPORT_ID"])
        .agg([
            pl.len().alias("count"),
            pl.col("DEVICE_NO").n_unique().alias("vehicle_count"),
        ])
        .drop_nulls()
        .with_columns([
            (pl.col("count") / pl.col("vehicle_count")).alias("rides_per_vehicle"),
        ])
    )

    ranked = per_route.with_columns([
        pl.col("count").rank("dense", descending=True).alias("popularity"),
        pl.col("rides_per_vehicle").rank("dense", descending=True).alias("overload"),
    ]).join(
        transport_types, left_on="TRANSPORT_ID", right_on="TRANSPORT_ID", how="left"
    )

    rank_column = "popularity" if popularity else "overload"
    return ranked.filter(pl.col(rank_column) <= top)


# Smoke run with the default arguments (filters by the "overload" rank, since popularity=False).
load_top_routes_overground()



BUS_RT_NO,NAME,TRANSPORT_ID,count,vehicle_count,rides_per_vehicle,popularity,overload,NAME_right
i64,str,i64,u32,u32,f64,u32,u32,str
2083,"""ТыСбТ01 автобус""",2,8018,90,89.088889,57,12,"""Автобус"""
605,"""608 автобус""",2,15583,189,82.449735,14,20,"""Автобус"""
90,"""92 автобус""",2,10108,121,83.53719,31,18,"""Автобус"""
1944,"""МАН01 автобус""",2,11746,108,108.759259,27,2,"""Автобус"""
707,"""710 автобус""",2,4544,51,89.098039,121,11,"""Автобус"""
…,…,…,…,…,…,…,…,…
937,"""878 автобус пригород МО""",2,15646,156,100.294872,13,7,"""Автобус"""
1947,"""СВА01 автобус""",2,5088,61,83.409836,108,19,"""Автобус"""
247,"""249 автобус""",2,7238,68,106.441176,71,3,"""Автобус"""
1407,"""320 коммерческий автобус""",2,14665,170,86.264706,18,14,"""Автобус"""


In [10]:
def plot(data: pl.DataFrame, x: str, y: str, title="", color=None, do_show=False,
         xaxis_title="Остановка", yaxis_title="Количество поездок"):
    """Build a bar chart from a polars DataFrame.

    Args:
        data: Source frame; it is converted to pandas before plotting.
        x: Column used for the categories on the x axis.
        y: Column used for the bar heights.
        title: Figure title.
        color: Optional column used to colour the bars; ``None`` means no colouring.
        do_show: If true, the figure is also displayed through ``show``.
        xaxis_title: Label of the x axis (the default keeps the original fixed label).
        yaxis_title: Label of the y axis (the default keeps the original fixed label).

    Returns:
        The plotly figure.
    """
    df_pandas = data.to_pandas()
    # color=None is px.bar's own default, so one call covers both the coloured and plain case.
    fig = px.bar(
        df_pandas,
        x=x,
        y=y,
        title=title,
        color=color,
    )

    fig.update_layout(
        xaxis_title=xaxis_title,
        yaxis_title=yaxis_title,
        # Order the bars by their total value, largest first.
        xaxis_categoryorder="total descending",
        xaxis_tickangle=-45,
        margin=dict(t=50, b=100),
        coloraxis_showscale=False
    )
    if do_show:
        show(fig)

    return fig

In [11]:
def output_size():
    """Return the text input (id ``top-number``, default ``'20'``) with its caption."""
    number_input = dcc.Input(id='top-number', type='text', value='20')
    caption = html.Span("Размер вывода", className="")
    return html.Div(
        [number_input, caption],
        className="field border max",
        style={'padding': 10},
    )



In [12]:

def date_picker(min_date, max_date):
    """Return the date-range picker (id ``date-picker``) with its caption.

    Args:
        min_date: Earliest selectable date; also the initial start date.
        max_date: Latest selectable date; also the initial end date.
    """
    return html.Div([
        html.Div([
            dcc.DatePickerRange(
                id='date-picker',
                min_date_allowed=min_date,
                max_date_allowed=max_date,
                start_date=min_date,
                end_date=max_date,
                className="center transparent-datepicker",
                # Dash style dicts use camelCase keys, as the rest of this notebook does.
                style={'backgroundColor': 'transparent'},
            ), ]),
        html.Span(
            "Дата от/до",
            className="",
            style={
                'display': 'block',
                'textAlign': 'center',
                'margin': '0 auto'
            }),
    ], className="column", style={'padding': 10})


In [13]:

def time_picker():
    """Return a row with two text inputs: ``start-time`` ('00:00') and ``end-time`` ('23:59')."""
    start_field = html.Div([
        dcc.Input(id='start-time', type='text', value='00:00', className=""),
        html.Span("Начало", className=""),
    ], className="field border max")

    end_field = html.Div([
        dcc.Input(id='end-time', type='text', value='23:59', className=""),
        html.Span("Конец", className=""),
    ], className="field border max")

    fields_row = html.Div([
        html.Div([start_field], className="max"),
        html.Div([end_field], className="max"),
    ], className="row")

    return html.Div([fields_row], style={'padding': 10})

In [14]:
def time_segment_type():
    """Return the dropdown (id ``time-type``) that selects how the time window is read.

    The values are ``'h0'`` (label "Непрерывно", the default) and ``'h1'`` (label "Кусочно").
    """
    # The original also built a Label + RadioItems Div here and threw it away; that dead
    # expression is removed.
    return html.Div([
        html.Div([
            dcc.Dropdown(
                id='time-type',
                options=[
                    {'label': 'Непрерывно', 'value': 'h0'},
                    {'label': 'Кусочно', 'value': 'h1'},
                ],
                value='h0',
                className="beer",
                style={'backgroundColor': 'transparent'},
            ),
            html.Span("Выбор отсчёта времени", className="",
                      style={'display': 'inline-block', 'marginTop': '-15px'}),
        ], className="field border")
        # Dash style dicts use camelCase keys, as the rest of this notebook does.
    ], className="max", style={'paddingRight': 10})

In [15]:
def graph_type():
    """Return the dropdown (id ``data-source``) that selects which chart to build.

    The values are ``'t0'`` .. ``'t3'``; ``'t0'`` is selected initially.
    """
    return html.Div([
        html.Div([
            dcc.Dropdown(
                id='data-source',
                options=[
                    {'label': 'Маршруты популярность', 'value': 't0'},
                    {'label': 'Маршруты перегруженность', 'value': 't1'},
                    {'label': 'Остановки', 'value': 't2'},
                    {'label': 'Метро популярность', 'value': 't3'},
                ],
                value='t0',  # initial selection
                className="beer",
                style={'backgroundColor': 'transparent'},

            ),
            html.Span("Выбор графика", className="", style={'display': 'inline-block', 'marginTop': '-15px'}),
        ], className="field border")
        # Dash style dicts use camelCase keys, as the rest of this notebook does.
    ], className="max", style={'paddingLeft': 10})

In [16]:
def submit_button():
    """Return the wrapper Div holding the button (id ``submit-button``) that triggers plotting."""
    button = html.Button("Построить график", id='submit-button', n_clicks=0)
    return html.Div([button], className="")

In [17]:
def layout(df):
    """Assemble the page: a heading, the controls fieldset and the graph between two side columns.

    Args:
        df: Frame whose ``TRAN_ONLY_DATE`` column supplies the date-picker bounds.
    """
    dates = df["TRAN_ONLY_DATE"]

    heading = html.H3(
        "Инфографика транспорта Москвы",
        className="center",
        style={"color": "orange", "textAlign": "center"},
    )

    controls = html.Fieldset([
        date_picker(dates.min(), dates.max()),
        time_picker(),
        html.Br(),
        html.Div([graph_type(), time_segment_type()], className="row"),
        html.Br(),
        html.Div([output_size(), submit_button()], className="row"),
        html.Br(),
    ])

    main_column = html.Div(
        [
            html.Br(),
            heading,
            controls,
            dcc.Graph(id='graph', style={'width': '100%'}),
        ],
        className="l8",
    )

    # Empty "l2" columns on both sides of the "l8" content column.
    return html.Div(
        [html.Div(className="l2"), main_column, html.Div(className="l2")],
        className="grid",
    )

In [18]:
from dash import Dash, dcc, html, Input, Output, State
import plotly.express as px
import pandas as pd


def application(df):
    """Create the Dash app, register the graph callback and start the server on port 8239.

    Args:
        df: Frame passed to ``layout`` (its ``TRAN_ONLY_DATE`` column bounds the date picker).
    """
    app = Dash(__name__)
    app.layout = layout(df)

    @app.callback(
        Output('graph', 'figure'),
        Input('submit-button', 'n_clicks'),
        State('date-picker', 'start_date'),
        State('date-picker', 'end_date'),
        State('start-time', 'value'),
        State('end-time', 'value'),
        State("data-source", "value"),
        State('top-number', 'value'),
        State('time-type', 'value'),
    )
    def update_graph(n_clicks, start_date, end_date, start_time_str, end_time_str, data_source, top_number, time_type):
        """Build the figure for the current control values; problems are shown as a titled empty plot."""
        if not n_clicks or data_source == 't-1':
            return px.scatter(title="Выберите параметры и нажмите кнопку")

        # 'h1' selects the per-day (piecewise) window; any other value means one continuous span.
        continuous = time_type != 'h1'

        try:
            top_number = int(top_number)
            # Parsed only to validate the input; the loaders receive the raw strings.
            pd.to_datetime(f"{start_date} {start_time_str}")
            pd.to_datetime(f"{end_date} {end_time_str}")
        except Exception as e:
            return px.scatter(title=f"Ошибка в формате даты/времени: {e}")

        # data_source -> (loader, extra positional loader args, y column, colour column)
        sources = {
            't0': (load_top_routes_overground, (True,), "count", "NAME_right"),
            't1': (load_top_routes_overground, (False,), "rides_per_vehicle", "NAME_right"),
            't2': (load_top_stops_overground, (), "count", None),
            't3': (load_top_stops_underground, (), "count", "LN_NAME_SHORT"),
        }
        if data_source not in sources:
            # Previously this path fell through to `return fig` with `fig` unbound.
            return px.scatter(title=f"Неизвестный тип графика: {data_source}")
        loader, extra_args, y_column, color_column = sources[data_source]

        try:
            data = loader(top_number, start_date, end_date, start_time_str, end_time_str,
                          continuous, *extra_args)
        except ValueError as e:
            # The loaders parse with a stricter format than the check above, so they can still reject input.
            return px.scatter(title=f"Ошибка в формате даты/времени: {e}")

        # The stop loaders report problems by returning a message string instead of a frame.
        if isinstance(data, str):
            return px.scatter(title=data)

        return plot(data, "NAME", y_column, color=color_column)

    # dash.Dash.run takes `jupyter_mode` (Dash >= 2.11); `mode=` was the keyword of the
    # legacy jupyter_dash.JupyterDash.run_server and is not a Dash.run parameter.
    app.run(jupyter_mode='external', port=8239)



# Start the dashboard; the full data set is passed to application(), which hands it to layout().
application(big_data)