Get your ngrok authtoken from the ngrok dashboard: https://dashboard.ngrok.com/get-started/your-authtoken

In [None]:
!pip install streamlit pyngrok


Collecting streamlit
  Downloading streamlit-1.45.1-py3-none-any.whl.metadata (8.9 kB)
Collecting pyngrok
  Downloading pyngrok-7.2.8-py3-none-any.whl.metadata (10 kB)
Collecting watchdog<7,>=2.1.5 (from streamlit)
  Downloading watchdog-6.0.0-py3-none-manylinux2014_x86_64.whl.metadata (44 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.3/44.3 kB[0m [31m1.4 MB/s[0m eta [36m0:00:00[0m
Collecting pydeck<1,>=0.8.0b4 (from streamlit)
  Downloading pydeck-0.9.1-py2.py3-none-any.whl.metadata (4.1 kB)
Downloading streamlit-1.45.1-py3-none-any.whl (9.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m9.9/9.9 MB[0m [31m36.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading pyngrok-7.2.8-py3-none-any.whl (25 kB)
Downloading pydeck-0.9.1-py2.py3-none-any.whl (6.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.9/6.9 MB[0m [31m48.8 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading watchdog-6.0.0-py3-none-manylinux2014_x86_64.whl (79 

In [None]:
import os

# Build the directory tree for the Streamlit app (no error if it already exists)
os.makedirs("streamlit_app/tabs", exist_ok=True)

# Write an empty __init__.py so Python treats `tabs` as an importable package
f = open("streamlit_app/tabs/__init__.py", "w")
f.close()


## schema_tab.py

In [None]:
%%writefile streamlit_app/tabs/schema_tab.py
import streamlit as st
from pyspark.sql import functions as F

def show_schema(df):
    """Render the "Read data" tab: preview, schema, quality checks and basic cleaning.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Dataset as loaded by the app.

    Returns
    -------
    pyspark.sql.DataFrame
        The dataset without the 'order_id' column (if present) and without rows
        whose 'transaction_type' is null (if that column is present). The same
        frame is stored in ``st.session_state.df`` for the other tabs.
    """
    # Dataset preview (only the first 100 rows are collected to the driver)
    st.subheader("📋 Dataset")
    st.dataframe(df.limit(100).toPandas())

    # Data schema
    st.subheader("🧬 Data Schema")
    st.code(df._jdf.schema().treeString())

    # Row / column counts. count() triggers a Spark job, so run it once and reuse it.
    st.subheader("📊 Data Dimensions")
    total_rows = df.count()
    st.write(f"Rows: {total_rows}")
    st.write(f"Columns: {len(df.columns)}")

    # Duplicate check: rows lost when collapsing to distinct rows
    st.subheader("🔍 Check duplicate")
    distinct_rows = df.distinct().count()
    duplicates = total_rows - distinct_rows

    if duplicates > 0:
        st.warning(f"🚨 Number of duplicate rows: `{duplicates}`")
    else:
        st.success("✅ No duplicate rows found.")

    # Missing values per column (null, plus NaN for floating-point columns)
    st.subheader("🔍 Check missing values")
    # isnan() is only valid on floating-point input; applying it to date,
    # timestamp or boolean columns makes the whole query fail analysis.
    float_cols = {name for name, dtype in df.dtypes if dtype in ("float", "double")}

    def _is_missing(column_name):
        """Build the 'value is missing' predicate for one column."""
        predicate = F.col(column_name).isNull()
        if column_name in float_cols:
            predicate = predicate | F.isnan(F.col(column_name))
        return predicate

    null_counts = df.select([
        F.count(F.when(_is_missing(c), c)).alias(c)
        for c in df.columns
    ])

    # Convert to pandas (one row per column after transposing) for display
    null_df = null_counts.toPandas().T
    null_df.columns = ['Missing Count']
    null_df = null_df[null_df['Missing Count'] > 0]

    if null_df.empty:
        st.success("✅ No missing values found.")
    else:
        st.dataframe(null_df)

    # Data reduction: drop the identifier column
    st.subheader("🔍 Data Reduction")
    if 'order_id' in df.columns:
        df = df.drop("order_id")
        st.success("'order_id' column removed.")
    else:
        st.info("'order_id' column not found in dataset.")

    # Data cleaning: drop rows with a null transaction type
    st.subheader("🔍 Data Cleaning/Wrangling")
    if 'transaction_type' in df.columns:
        # Dropping a column above does not change the row count, so reuse it.
        before_drop = total_rows
        df = df.dropna(subset=['transaction_type'])
        after_drop = df.count()

        st.write(f"Rows before drop: `{before_drop}`")
        st.write(f"Rows after drop: `{after_drop}`")
        st.success(f"✅ Dropped `{before_drop - after_drop}` rows with null in 'transaction_type'.")
    else:
        # Mirror the 'order_id' handling instead of failing on a missing column.
        st.info("'transaction_type' column not found in dataset.")

    # Preview after dropping nulls
    st.subheader("📋 Dataset after drop Null")
    st.dataframe(df.limit(100).toPandas())

    # Summary statistics from Spark's describe(), converted to pandas for display
    st.subheader("📊 Statistics Summary")
    stats_summary = df.describe().toPandas()
    st.dataframe(stats_summary)

    # Publish the cleaned frame so the other tabs can pick it up
    st.session_state.df = df
    return df



Overwriting streamlit_app/tabs/schema_tab.py


## univariate_analysis.py

In [None]:
# EDA Univariate Analysis
%%writefile streamlit_app/tabs/univariate_analysis.py
import streamlit as st
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
from pyspark.sql import functions as F

def calculate_percentage(df, column_name):
    """Count rows per distinct value of ``column_name`` and their share of the total.

    Returns a Spark DataFrame with the grouping column, a ``count`` column and a
    ``percentage`` column (rounded to 2 decimals), ordered by ``count`` descending.
    """
    n_rows = df.count()
    share = F.round((F.col("count") / n_rows) * 100, 2)
    counts = df.groupBy(column_name).agg(F.count("*").alias("count"))
    return counts.withColumn("percentage", share).orderBy(F.desc("count"))

def univariate_analysis(df):
    """Render the "Univariate Analysis" tab.

    Numeric columns get a skewness value plus a histogram and boxplot drawn from
    a roughly 10% sample; categorical columns get a count/percentage table and a
    pie chart computed on the full Spark frame.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Only used as the return value when no frame is stored in
        ``st.session_state``; otherwise the stored frame is analysed.

    Returns
    -------
    pyspark.sql.DataFrame
        The frame that was analysed (or the ``df`` argument when the session
        state holds no frame).
    """
    if 'df' not in st.session_state:
        st.error("❌ Data is not available. Please run the 'Schema' tab first.")
        # Hand the caller's frame back rather than None so that code which
        # reassigns the result still holds a DataFrame.
        return df

    df = st.session_state.df  # frame published by the schema tab

    st.subheader("📋 Dataset")
    st.dataframe(df.limit(5).toPandas())
    st.write(f"Rows: {df.count()}")
    st.write(f"Columns: {len(df.columns)}")

    numeric_cols = ["item_price", "quantity", "transaction_amount"]
    categorical_cols = ["item_name", "item_type", "transaction_type", "received_by", "time_of_sale"]

    # Pull a small, seeded sample to the driver for plotting
    pandas_df = df.sample(withReplacement=False, fraction=0.1, seed=1).toPandas()

    # ----- 1. Numeric Columns -----
    st.markdown("### 🔢 Numeric Columns")
    for col in numeric_cols:
        st.markdown(f"#### ➤ `{col}`")
        skewness = round(pandas_df[col].skew(), 2)
        st.write(f"Skewness: `{skewness}`")

        fig, axes = plt.subplots(1, 2, figsize=(12, 4))

        # Histogram
        sns.histplot(pandas_df[col].dropna(), kde=True, bins=30, ax=axes[0])
        axes[0].set_title(f"Histogram of {col}")
        axes[0].grid(True)

        # Boxplot
        sns.boxplot(x=pandas_df[col].dropna(), color='orange', ax=axes[1])
        axes[1].set_title(f"Boxplot of {col}")
        axes[1].grid(True)

        plt.tight_layout()
        st.pyplot(fig)
        # pyplot keeps every figure alive until it is closed; without this the
        # figures pile up across Streamlit reruns.
        plt.close(fig)

    # ----- 2. Categorical Columns -----
    st.markdown("### 🏷️ Categorical Columns")

    for col in categorical_cols:
        st.markdown(f"#### ➤ `{col}`")

        # Percentages are computed in Spark, then collected for display
        pct_df = calculate_percentage(df, col)
        pandas_pct = pct_df.toPandas()

        # Table
        st.dataframe(pandas_pct)

        # Pie chart
        labels = pandas_pct[col].tolist()
        sizes = pandas_pct["percentage"].tolist()
        colors = plt.cm.Pastel1(np.linspace(0, 1, len(sizes)))

        fig, ax = plt.subplots(figsize=(5, 5))
        wedges, texts, autotexts = ax.pie(
            sizes,
            labels=labels,
            autopct='%1.1f%%',
            startangle=90,
            colors=colors,
            textprops={'fontsize': 9}
        )
        ax.axis('equal')
        ax.set_title(f"Distribution of {col}")

        st.pyplot(fig)
        plt.close(fig)

    return df


Overwriting streamlit_app/tabs/univariate_analysis.py


## data_transformation.py

In [None]:
# Data Transformation
%%writefile streamlit_app/tabs/data_transformation.py
import streamlit as st
from pyspark.sql.functions import col
from pyspark.sql import functions as F
from functools import reduce
import seaborn as sns
import matplotlib.pyplot as plt

def data_transformation(df):
    """Render the "Data Transformation" tab: IQR outlier report and removal.

    For each numeric column the approximate quartiles are computed, rows outside
    ``[Q1 - 1.5*IQR, Q3 + 1.5*IQR]`` are listed, and finally all such rows are
    filtered out. The filtered frame is stored back into ``st.session_state.df``.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Only used as the return value when no frame is stored in
        ``st.session_state``; otherwise the stored frame is transformed.

    Returns
    -------
    pyspark.sql.DataFrame
        The frame with outliers removed (or the ``df`` argument when the session
        state holds no frame).
    """
    if 'df' not in st.session_state:
        st.error("❌ Data is not available. Please run the 'Schema' tab first.")
        # Hand the caller's frame back rather than None so later tabs that
        # receive this result still get a DataFrame.
        return df

    df = st.session_state.df  # frame published by the schema tab

    # Counted before any filtering so it can be reported as the original size
    original_count = df.count()

    st.subheader("📋 Dataset")
    st.dataframe(df.limit(5).toPandas())
    st.write(f"Rows: {original_count}")
    st.write(f"Columns: {len(df.columns)}")

    numeric_cols = ["item_price", "quantity", "transaction_amount"]
    outlier_info = {}

    # Compute Q1, Q3, IQR and the outlier rows per column
    for col_name in numeric_cols:
        quantiles = df.approxQuantile(col_name, [0.25, 0.75], 0.05)
        if len(quantiles) < 2:
            # No quantiles available for this column; leave it out of the report and filter
            continue
        Q1, Q3 = quantiles
        IQR = Q3 - Q1

        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR
        outliers = df.filter((col(col_name) < lower_bound) | (col(col_name) > upper_bound))
        outlier_count = outliers.count()

        outlier_info[col_name] = {
            "Q1": Q1, "Q3": Q3, "IQR": IQR,
            "Lower Bound": lower_bound,
            "Upper Bound": upper_bound,
            "Outlier Count": outlier_count,
            "Outlier Data": outliers
        }

    # Show the outlier report
    for col_name, stats in outlier_info.items():
        st.subheader(f"📊 Outlier Info - {col_name}")
        st.markdown(f"""
        - **Q1**: {stats['Q1']}
        - **Q3**: {stats['Q3']}
        - **IQR**: {stats['IQR']}
        - **Lower Bound**: {stats['Lower Bound']}
        - **Upper Bound**: {stats['Upper Bound']}
        - **Outlier Count**: {stats['Outlier Count']}
        """)
        st.write(f"🔍 Outliers for column `{col_name}`")
        st.dataframe(stats["Outlier Data"].toPandas(), use_container_width=True)

    # Drop outliers. Only columns that actually have bounds take part, so a
    # column skipped above no longer triggers a KeyError here.
    filter_conditions = [
        (col(col_name) >= stats["Lower Bound"]) & (col(col_name) <= stats["Upper Bound"])
        for col_name, stats in outlier_info.items()
    ]
    if filter_conditions:  # reduce() without an initial value fails on an empty list
        combined_condition = reduce(lambda x, y: x & y, filter_conditions)
        df = df.filter(combined_condition)

    st.markdown("### 📋 Data After Removing Outliers")
    st.dataframe(df.limit(100).toPandas())
    st.success(f"Original row count: {original_count}")
    st.success(f"Row count after removing outliers: {df.count()}")

    # Pull a small, seeded sample to the driver for plotting
    pandas_df = df.sample(False, 0.1, seed=1).toPandas()

    # Histogram + boxplot per numeric column
    for col_name in numeric_cols:
        st.markdown(f"### 📈 Distribution for `{col_name}`")
        st.write("Skewness:", round(pandas_df[col_name].skew(), 2))
        fig, axes = plt.subplots(1, 2, figsize=(12, 4))

        sns.histplot(pandas_df[col_name].dropna(), kde=True, ax=axes[0])
        axes[0].set_title(f"Histogram of {col_name}")

        sns.boxplot(x=pandas_df[col_name].dropna(), color='orange', ax=axes[1])
        axes[1].set_title(f"Boxplot of {col_name}")

        st.pyplot(fig)
        # pyplot keeps every figure alive until it is closed; without this the
        # figures pile up across Streamlit reruns.
        plt.close(fig)

    # Publish the filtered frame for the following tabs
    st.session_state.df = df

    st.markdown("### 📋 Data Summary")
    st.dataframe(df.describe().toPandas())

    return df


Overwriting streamlit_app/tabs/data_transformation.py


## bivariate_analysis.py

In [None]:
%%writefile streamlit_app/tabs/bivariate_analysis.py
import streamlit as st
import seaborn as sns
import matplotlib.pyplot as plt

def bivariate_analysis(df):
    """Render the "EDA Bivariate Analysis" tab.

    Shows a pairplot of the numeric columns, bar charts of the average
    ``transaction_amount`` per category, and a correlation heatmap of all
    numeric columns.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Ignored in favour of the frame stored in ``st.session_state.df``.

    Returns
    -------
    None
    """
    if 'df' not in st.session_state:
        st.error("❌ Data is not available. Please run the 'Schema' tab first.")
        return

    df = st.session_state.df  # frame published by the earlier tabs

    st.subheader("📋 Dataset")
    st.dataframe(df.limit(5).toPandas())
    st.write(f"Rows: {df.count()}")
    st.write(f"Columns: {len(df.columns)}")

    # Collect only the numeric columns needed for the pairplot
    numeric_columns = ['quantity', 'item_price', 'transaction_amount']
    pandas_df = df.select(numeric_columns).toPandas()

    # Pairplot to explore pairwise relationships
    st.subheader("Pairplot of Numerical Columns")
    pairplot = sns.pairplot(pandas_df)
    st.pyplot(pairplot.figure)  # pass the figure explicitly to avoid the deprecation warning
    # pyplot keeps figures alive until closed; release it once rendered
    plt.close(pairplot.figure)

    # (grouping column, chart title, optional top-N limit), in subplot order
    group_specs = [
        ('item_type', "Item Type Vs Transaction Amount", None),
        ('transaction_type', "Transaction Type Vs Transaction Amount", None),
        ('item_name', "Item Name Vs Transaction Amount", 10),
        ('received_by', "Received By Vs Transaction Amount", None),
        ('time_of_sale', "Time of Sale Vs Transaction Amount", 10),
        ('quantity', "Quantity Vs Transaction Amount", None),
    ]

    st.subheader("Bar Plots: Average Transaction Amount by Categories")
    fig, axarr = plt.subplots(3, 2, figsize=(16, 20))

    # axarr.flat walks the grid row by row, matching the order of group_specs
    for ax, (group_col, title, top_n) in zip(axarr.flat, group_specs):
        averages = (
            df.groupBy(group_col)
              .avg('transaction_amount')
              .orderBy('avg(transaction_amount)', ascending=False)
        )
        if top_n is not None:
            averages = averages.limit(top_n)
        averages.toPandas().plot.bar(x=group_col, y='avg(transaction_amount)', ax=ax, fontsize=12)
        ax.set_title(title, fontsize=18)

    # Layout tweaks
    plt.subplots_adjust(hspace=1.0, wspace=.5)
    sns.despine()

    st.pyplot(fig)
    plt.close(fig)

    # Multivariate view: correlation matrix
    st.subheader("Correlation Matrix")
    # Select numeric columns in Spark first so string columns are not shipped
    # to the driver only to be discarded by select_dtypes().
    spark_numeric_types = ("tinyint", "smallint", "int", "bigint", "float", "double")
    numeric_names = [name for name, dtype in df.dtypes if dtype in spark_numeric_types]
    numeric_df = df.select(numeric_names).toPandas().select_dtypes(include=['number'])
    corr_matrix = numeric_df.corr()

    fig_corr, ax_corr = plt.subplots(figsize=(12, 10))
    sns.heatmap(corr_matrix, annot=True, cmap='coolwarm', ax=ax_corr)
    ax_corr.set_title("Correlation Matrix")
    st.pyplot(fig_corr)
    plt.close(fig_corr)


Overwriting streamlit_app/tabs/bivariate_analysis.py


## run_model.py

In [None]:
%%writefile streamlit_app/tabs/run_model.py
import streamlit as st
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.regression import (
    LinearRegression,
    DecisionTreeRegressor,
    RandomForestRegressor,
    GBTRegressor
)
from pyspark.sql.functions import col

def run_model(df):
    """Train four regressors on ``transaction_amount`` and show their test metrics.

    Categorical columns are string-indexed, assembled with ``item_price`` and
    ``quantity`` into a feature vector, and the data is split 80/20 with a fixed
    seed. Each model's R2, RMSE, MSE and MAE on the test split are displayed in
    a table.

    Parameters
    ----------
    df : pyspark.sql.DataFrame or None
        Dataset to model. When None, the frame in ``st.session_state`` (if any)
        is used instead.

    Returns
    -------
    None
    """
    st.subheader("Model Evaluation Results")

    # An upstream tab may hand over None when it bailed out; fall back to the
    # frame kept in the session state, and stop cleanly if there is none.
    if df is None:
        df = st.session_state.get("df")
    if df is None:
        st.error("❌ Data is not available. Please run the 'Schema' tab first.")
        return

    required_cols = ["item_name", "item_type", "transaction_type", "time_of_sale", "item_price", "quantity", "transaction_amount"]
    missing_cols = [name for name in required_cols if name not in df.columns]
    if missing_cols:
        st.error(f"❌ Missing required columns: {missing_cols}")
        return

    # Drop rows with a null in any column the models need
    df = df.dropna(subset=required_cols)

    # String columns to encode as numeric indices
    categorical_cols = ["item_name", "item_type", "transaction_type", "time_of_sale"]
    indexers = [
        StringIndexer(inputCol=name, outputCol=name + "_indexed", handleInvalid="keep")
        for name in categorical_cols
    ]

    # Model input features
    feature_cols = [name + "_indexed" for name in categorical_cols] + ["item_price", "quantity"]
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

    # Train/test split
    train_data, test_data = df.randomSplit([0.8, 0.2], seed=124)

    # Feature preparation is the same for every model, so fit it once on the
    # training split instead of re-fitting the indexers for each regressor.
    feature_model = Pipeline(stages=indexers + [assembler]).fit(train_data)
    train_features = feature_model.transform(train_data)
    test_features = feature_model.transform(test_data)

    # Models to compare
    models = {
        "Linear Regression": LinearRegression(featuresCol="features", labelCol="transaction_amount"),
        "Decision Tree": DecisionTreeRegressor(featuresCol="features", labelCol="transaction_amount"),
        "Random Forest": RandomForestRegressor(featuresCol="features", labelCol="transaction_amount", numTrees=10),
        "Gradient-Boosted Tree": GBTRegressor(featuresCol="features", labelCol="transaction_amount", maxIter=10)
    }

    # One evaluator per metric, keyed by the upper-cased label shown in the table
    evaluators = {
        metric.upper(): RegressionEvaluator(
            labelCol="transaction_amount",
            predictionCol="prediction",
            metricName=metric
        )
        for metric in ["r2", "rmse", "mse", "mae"]
    }

    def evaluate_model(model):
        """Fit ``model`` on the prepared training data and score it on the test data."""
        predictions = model.fit(train_features).transform(test_features)
        return {label: evaluator.evaluate(predictions) for label, evaluator in evaluators.items()}

    # One table row per model
    results_table = []
    for name, model in models.items():
        results_table.append({
            "Model": name,
            **evaluate_model(model)
        })

    st.dataframe(results_table)


Overwriting streamlit_app/tabs/run_model.py


## Spark SQL

## streamlit_app/app.py

In [None]:
%%writefile streamlit_app/app.py
import streamlit as st
from pyspark.sql import SparkSession
from tabs.schema_tab import show_schema
from tabs.univariate_analysis import univariate_analysis
from tabs.bivariate_analysis import bivariate_analysis
from tabs.data_transformation import data_transformation
from tabs.run_model import run_model

# Create (or reuse) the Spark session
spark = SparkSession.builder.appName("Streamlit + Spark").getOrCreate()

# App title
st.title("📊 Big Data Dashboard with PySpark")

# CSV upload widget
uploaded_file = st.file_uploader("📁 Upload CSV", type="csv")

if uploaded_file:
    # Spark reads from a path, so persist the upload to disk first
    with open("streamlit_app/temp.csv", "wb") as f:
        f.write(uploaded_file.getbuffer())

    # Load the CSV into a PySpark DataFrame
    df = spark.read.csv("streamlit_app/temp.csv", header=True, inferSchema=True)

    # One tab per stage of the analysis
    tab1, tab2, tab3, tab4, tab5 = st.tabs([
        "📑 Read data",
        "📊 Univariate Analysis",
        "🔄 Data Transformation",
        "📊 EDA Bivariate Analysis",
        "🤖 Run Models"
    ])

    # Schema / data-quality tab
    with tab1:
        show_schema(df)

    # Univariate analysis tab
    with tab2:
        analysed_df = univariate_analysis(df)
        # The tab function returns None when it bails out; keep the current frame then
        if analysed_df is not None:
            df = analysed_df

    # Data transformation tab (returns the frame with outliers removed)
    with tab3:
        cleaned_df = data_transformation(df)
        # Same None convention as above: fall back to the untransformed frame
        if cleaned_df is None:
            cleaned_df = df

    # Bivariate analysis tab. cleaned_df is always bound by the block above, so
    # no NameError fallback is needed here.
    with tab4:
        bivariate_analysis(cleaned_df)

    # Model tab, run on the cleaned frame
    with tab5:
        run_model(cleaned_df)

else:
    st.info("⬆️ Upload a CSV to get started.")


Overwriting streamlit_app/app.py


In [None]:
# SECURITY: never store an ngrok authtoken in the notebook. The token is read
# from the NGROK_AUTHTOKEN environment variable, or prompted for without echo.
# Any token that was previously committed in this cell should be revoked in the
# ngrok dashboard.
import os
from getpass import getpass

from pyngrok import ngrok

ngrok.set_auth_token(os.environ.get("NGROK_AUTHTOKEN") or getpass("ngrok authtoken: "))


Authtoken saved to configuration file: /root/.config/ngrok/ngrok.yml


In [None]:
import threading
import time

def run():
    !streamlit run streamlit_app/app.py

# Chạy Streamlit trong thread song song
threading.Thread(target=run).start()

# Chờ vài giây để Streamlit khởi động
time.sleep(10)



Collecting usage statistics. To deactivate, set browser.gatherUsageStats to false.
[0m
[0m
[34m[1m  You can now view your Streamlit app in your browser.[0m
[0m
[34m  Local URL: [0m[1mhttp://localhost:8503[0m
[34m  Network URL: [0m[1mhttp://172.28.0.12:8503[0m
[34m  External URL: [0m[1mhttp://34.23.22.199:8503[0m
[0m


In [None]:
from pyngrok import ngrok

# Shut down any ngrok process left over from an earlier run
ngrok.kill()

# Open an HTTP tunnel to the local Streamlit port
public_url = ngrok.connect(addr=8503, proto="http")
print("🌐 Your Streamlit app is live at:", public_url)


🌐 Your Streamlit app is live at: NgrokTunnel: "https://9f46-34-23-22-199.ngrok-free.app" -> "http://localhost:8503"


In [None]:
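# NOTE: superseded early draft of streamlit_app/app.py, kept commented out for reference only.
# It imports tabs.duplicate_tab and tabs.missing_tab, which the cells above do not create,
# so it would not run if re-enabled as-is.
# %%writefile streamlit_app/app.py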
# import streamlit as st
# from pyspark.sql import SparkSession
# from tabs.schema_tab import show_schema
# from tabs.duplicate_tab import check_duplicates
# from tabs.missing_tab import check_missing

# spark = SparkSession.builder.appName("Streamlit + Spark").getOrCreate()

# st.title("📊 Big Data Dashboard with PySpark")

# uploaded_file = st.file_uploader("📁 Upload CSV", type="csv")

# if uploaded_file:
#     with open("streamlit_app/temp.csv", "wb") as f:
#         f.write(uploaded_file.getbuffer())

#     df = spark.read.csv("streamlit_app/temp.csv", header=True, inferSchema=True)

#     tab1, tab2, tab3 = st.tabs(["📑 Schema", "🔁 Duplicates", "🕳️ Missing Values"])

#     with tab1:
#         show_schema(df)
#     with tab2:
#         check_duplicates(df)
#     with tab3:
#         check_missing(df)
# else:
#     st.info("⬆️ Upload a CSV to get started.")


Writing streamlit_app/app.py
