In [None]:
!pip install dash pandas psycopg2-binary

In [None]:
import pandas as pd
import numpy as np
import psycopg2 as pg
import plotly.express as px

from datetime import date
from dash import Dash, dcc, html, Input, Output, dash_table

In [None]:
engine = pg.connect("dbname='clever' user='clever' host='postgres_clever' port='5432' password='clever'")

### Data load

#### 🚨🚨 The dashboard is based on data processed by the **Spark** based tasks in Airflow. 🚨🚨
Please run this DAG before executing the dashboard.

In [None]:
company_profiles_google_df = pd.read_sql("""SELECT 
	profiles.name,
	profiles.site,
	profiles.category,
	profiles.full_address,
	profiles.city,
	profiles.state,
	profiles.state_abbr,
	reviews.reviews,
	reviews.review_rating,
	reviews.review_text,
    reviews.review_datetime_utc,
	profiles.latitude,
	profiles.longitude
FROM public.company_profiles_google_maps AS profiles
LEFT JOIN public.customer_reviews_google AS reviews
ON profiles.name = reviews.name
WHERE business_status = 'OPERATIONAL';""",
con=engine)

In [None]:
fmcsa_df = pd.read_sql("""SELECT 
	companies.company_name,
	companies.city,
	companies.state,
	companies.state_abbr,
	complaints.complaint_category,
	complaints.complaint_year,
	complaints.complaint_count
FROM public.fmcsa_companies AS companies
LEFT JOIN public.fmcsa_complaints AS complaints
ON companies.usdot_num = complaints.usdot_num;""",
con=engine)

fmcsa_df['complaint_year'] = pd.to_datetime(fmcsa_df['complaint_year'], format='%Y')

In [None]:
cities_col = ['city', 'state']
cities_df = pd.concat([company_profiles_google_df[cities_col], fmcsa_df[cities_col]])\
    .drop_duplicates(['city', 'state'])\
    .sort_values(['city', 'state'])

____

### Dashboard definition

#### Dashboard style

In [None]:
cities_dropdown = [{"label": f"{c['city']} - {c['state']}", "value": f"{c['city']} - {c['state']}"} for c in cities_df.to_dict(orient='index').values()]

In [None]:
widgets_max_height = 500

external_stylesheets = [
    'https://fonts.googleapis.com/css2?family=Mohave:ital,wght@0,300..700;1,300..700&family=Rubik:ital,wght@0,300..900;1,300..900&display=swap',
]

card_style = {
    "borderRadius": "15px",
    "overflow": "hidden",
    "boxShadow": "0px 4px 12px rgba(0, 0, 0, 0.15)",
    "transition": "transform 0.2s ease-in-out", 
    "flex": '1',
    "padding": "2rem"

}

grid_style = {
    "display": "flex",
    "flexWrap": "wrap",
    "justifyContent": "space-between",
    "gap": "20px",
    "width": "100%"
}

reviews_samples_style = {
    "background-color": "aliceblue",
    "border-radius": "10px",
    "padding": "30px",
    "margin-top": "20px",
    "font-size": "small",
    "max-height": "200px",
    "overflow": "auto",
}

card = lambda title, element: html.Div(
    [
        html.H3(title, style={'text-align': 'center', "color":"rgb(28, 82, 110)"}), 
        element,
    ], 
    style=card_style
)

app = Dash(__name__, external_stylesheets=external_stylesheets)

app.layout = html.Div(
    [
        html.Div(
            [
                html.Img(src="https://homebay.com/wp-content/uploads/2023/04/CleverOrgLogo-e1682902307979.png", style={"height": "5rem"}),
                html.H3("Company analysis dashboard", style={"text-align":"center", "color":"rgb(28, 82, 110)"}),
            ],
            style={
                "justifyContent": "center",
                "alignItems": "center",
                "textAlign": "center"
            }
        ),
        html.Div([
            dcc.Dropdown(id="selected_city",
                options=cities_dropdown,
                multi=False,
                value="Miami - Florida",
                style={"width": "100%", "flex": '1'}
            ),
            dcc.DatePickerRange(
                id="date_filter",
                display_format='DD-MM-YYYY',
                start_date=date(2020, 1, 1),
                end_date=date(2025, 1, 1),
                style={"width": "100%", "flex": '1'}
            )
        ], style=grid_style
        ),
        html.Br(),
        html.Div(
            [
                card(
                    "Company reviews sorted by the number of reviews and average rating", 
                    dash_table.DataTable(id="google_profiles_df", page_size=20, style_table={"overflow": "auto"})
                ),
                card(
                    "Companies' profiles on Google", 
                    html.Div([dcc.Graph(id="google_profiles_map"), dcc.Markdown(id="google_profiles_count"), html.Div(id="google_profiles_main_cat")])
                ),
                card(
                    "Performance history of ratings by company", 
                    html.Div([
                        dcc.Graph(id='google_profiles_line_chart'), 
                        dcc.Dropdown(id="select_profile_history",style={"width": "100%"}), 
                        dcc.Markdown(id="google_profiles_reviews_ex", style=reviews_samples_style)
                    ])
                ),
            ], 
            style=grid_style
        ),
        html.Br(),
        html.Div(
            [
                card("FMCSA Data for the specified period", dash_table.DataTable(id="fmcsa_table", page_size=15, style_table={"overflow": "auto"})),
                card("Total FMCSA Complaints by Company for the specified period", dcc.Graph(id="fmcsa_pie_chart"))
            ],
            style=grid_style
        )
    ],
    style={"fontFamily": '"Rubik", sans-serif', "color": "rgb(28, 82, 110)"}
)


#### Callbacks

In [None]:
@app.callback(
    [Output(component_id='google_profiles_map', component_property='figure')],
    [Output(component_id='google_profiles_count', component_property='children')],
    [Output(component_id='google_profiles_main_cat', component_property='children')],
    [Input(component_id='selected_city', component_property='value')],
)
def update_google_profiles_map(location):
    """Update the Google profiles map widget."""
    city, state = location.split(" - ")
    
    dff = company_profiles_google_df.drop_duplicates(["name"]).copy()
    dff = dff[(dff["city"] == city) & (dff["state"] == state)]

    top_cat = dff\
        .groupby("category")\
        .agg({"name":"count"})\
        .reset_index()
    top_cat.columns = ["category", "total"]
    top_cat = list(top_cat.sort_values("total", ascending=False).to_dict(orient="index").values())

    main_colors = ["#b2182b", "#ef8a62", "#c3987f", "#3f82a7", "#67a9cf", "#2166ac"]

    color_map = {
        **{c["category"]: main_colors[i] for i, c in enumerate(top_cat[:5])},
        **{c["category"]: "#77ad6a" for c in top_cat[5:]}
    }

    fig = px.scatter_mapbox(
        dff,
        lat="latitude",
        lon="longitude",
        color="category",
        color_discrete_map=color_map,
        hover_name="name", 
        hover_data=["category", "full_address", "site"],
        zoom=10,
        height=widgets_max_height
    )

    fig.update_layout(
        mapbox_style="carto-positron",
        mapbox_center={"lat": dff["latitude"].mean(), "lon": dff["longitude"].mean()},
        showlegend=False,
        autosize=True,
        margin=dict(l=0, r=0, t=0, b=0)
    )
    
    legend = []
    for category in top_cat[:5]:
        legend.append(html.Span(f"{category['category']}: {category['total']}", style={"color": color_map[category["category"]], "font-weight": "bold"}))
        legend.append(html.Br())

    return fig, f'Total companies: \n ## {len(dff)}', legend

@app.callback(
    [Output(component_id='google_profiles_df', component_property='data')],
    [Output(component_id='select_profile_history', component_property='options')],
    [Input(component_id='selected_city', component_property='value')],
    [Input(component_id='date_filter', component_property='start_date')],
    [Input(component_id='date_filter', component_property='end_date')],
)
def update_google_profiles_ratings(location, start_date, end_date):
    """Update the Google profiles dataframe widget."""
    city, state = location.split(" - ")
    start_date = pd.to_datetime(start_date)
    end_date = pd.to_datetime(end_date)

    dff = company_profiles_google_df.copy()

    date_filter = (dff["review_datetime_utc"] >= start_date) & (dff["review_datetime_utc"] <= end_date)
    location_filter = (dff["city"] == city) & (dff["state"] == state)

    # We will keep the rows when the date filter is not met, but we will not count the calculated values.
    dff["reviews"] = np.where(date_filter, dff["reviews"], np.nan)
    dff["review_rating"] = np.where(date_filter, dff["review_rating"], np.nan)

    dff = dff[location_filter]\
        .groupby(["name"])\
        .agg({
            "reviews": "count",
            "review_rating": ["mean", "max", "min"],
        })
    
    dff.columns = ['_'.join(col).strip() for col in dff.columns.values]
    dff = dff.reset_index().sort_values(by=["reviews_count", "review_rating_mean", "name"], ascending=[False, False, True])
    dff_records = dff.to_dict(orient='records')

    line_chart_opts = [{"label": c["name"], "value":c["name"]} for c in dff_records]

    return dff_records, line_chart_opts

@app.callback(
    [Output(component_id='google_profiles_line_chart', component_property='figure')],
    [Output(component_id='google_profiles_reviews_ex', component_property='children')],
    [Input(component_id='selected_city', component_property='value')],
    [Input(component_id='date_filter', component_property='start_date')],
    [Input(component_id='date_filter', component_property='end_date')],
    [Input(component_id='select_profile_history', component_property='value')],
)
def update_google_profiles_line_chart(location, start_date, end_date, company):
    """Update the Google ratings performance chart."""
    city, state = location.split(" - ")
    start_date = pd.to_datetime(start_date)
    end_date = pd.to_datetime(end_date)

    dff = company_profiles_google_df.copy()

    date_filter = (dff["review_datetime_utc"] >= start_date) & (dff["review_datetime_utc"] <= end_date)
    location_filter = (dff["city"] == city) & (dff["state"] == state)
    company_filter = (dff["name"] == company)
    
    dff = dff[date_filter & location_filter & company_filter]
    dff['year_month'] = dff['review_datetime_utc'].dt.to_period('M').astype(str)

    try:
        reviews_samples = dff["review_text"].dropna().sample(n=min(10, len(dff))).tolist()
    except ValueError:
        reviews_samples = []

    reviews_md =  "#### Sample reviews  \n" + "  \n  \n".join(f"*{r}*" for r in reviews_samples)

    annual_avg = dff.groupby("year_month").agg({"review_rating": "mean", "review_text": list}).reset_index()

    fig = px.line(
        annual_avg, 
        x="year_month", 
        y="review_rating", 
        labels={"year_month": "Year", "review_ratng": "Average rating"}
    )
    
    fig.update_layout(xaxis_title="Year", yaxis_title="Average rating")

    return (fig, reviews_md)

@app.callback(
    [Output(component_id='fmcsa_table', component_property='data')],
    [Output(component_id='fmcsa_pie_chart', component_property='figure')],
    [Input(component_id='selected_city', component_property='value')],
    [Input(component_id='date_filter', component_property='start_date')],
    [Input(component_id='date_filter', component_property='end_date')],
)
def update_fmcsa_complaints(location, start_date, end_date):
    """Update the fmcsa dataframe and pie chart widgets."""
    city, state = location.split(" - ")
    start_date = pd.to_datetime(start_date)
    end_date = pd.to_datetime(end_date)
 
    dff = fmcsa_df.copy()

    date_filter = (dff["complaint_year"] >= start_date) & (dff["complaint_year"] <= end_date)
    location_filter = (dff["city"] == city) & (dff["state"] == state)

    dff = dff[location_filter & date_filter].sort_values(["company_name", "complaint_year", "complaint_count"], ascending=[True, False, False])
    dff_records = dff.to_dict(orient='records')

    # Pie chart
    dff = dff.groupby(["company_name"]).agg({"complaint_count": "sum"}).reset_index()
    fig = px.pie(dff, values="complaint_count", names="company_name", color_discrete_sequence=px.colors.sequential.Teal)
    fig.update_traces(hoverinfo="label+percent", textinfo="value")

    return dff_records, fig

#### Dashboard Execution

In [None]:
# Any change in the code requires a kernel restart to update the dashboard
app.run(debug=True, port=8050, host="0.0.0.0", jupyter_mode="external")