
# Sri Lanka Used Bike Market — Big Data Pipeline (PySpark + MongoDB + Analytics + Streamlit Exports)



This notebook processes and analyzes a Sri Lankan used-bike dataset, stores the cleaned data in **MongoDB**, and generates **analytics + exports** ready for a **Streamlit** dashboard.

**What you get:**
- PySpark cleaning & feature engineering
- Save cleaned data to **Parquet** and **MongoDB**
- Analytics: price depreciation, brand/city insights, bike type demand, top models
- A simple linear-trend forecast of average price by year of manufacture
- Exports (CSV + PNG) for Streamlit UI


## 0) Environment Setup

In [5]:

# If needed (Colab/new env):
# !pip install pyspark==3.5.1 pyarrow==15.0.2

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, DoubleType

spark = (
    SparkSession.builder
    .appName("UsedBikes-SriLanka-Clean-Store")
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.3.0")
    .getOrCreate()
)

spark


## 1) Load Raw CSV

In [6]:

raw_path = "used-bikes.csv"   # adjust path as needed
df = spark.read.csv(raw_path, header=True, inferSchema=True)
df.printSchema()
df.show(5, truncate=False)


root
 |-- Summary: string (nullable = true)
 |-- url: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Post_Details: string (nullable = true)
 |-- Bike Type: string (nullable = true)
 |-- Brand: string (nullable = true)
 |-- Trim/Edition: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Mileage: string (nullable = true)
 |-- Seller: string (nullable = true)
 |-- Capacity: string (nullable = true)
 |-- Price: string (nullable = true)

[Output of `df.show(5, truncate=False)`: a wide table with the 13 columns listed in the schema above; the rows are cut off in this export.]

## 2) Cleaning & Feature Engineering

In [7]:

def digits_only(col):
    """Extract digits only (removes Rs, km, cc, commas, spaces, etc.)"""
    return F.regexp_replace(F.col(col).cast("string"), r"[^0-9]", "")

# Clean numerics + Year
df_clean = (
    df
    .withColumn("Price_Clean_str", F.trim(digits_only("Price")))
    .withColumn("Price_Clean", F.when(F.length("Price_Clean_str") > 0, F.col("Price_Clean_str").cast(IntegerType())).otherwise(F.lit(None)))
    .drop("Price_Clean_str")
    .withColumn("Mileage_Clean_str", F.trim(digits_only("Mileage")))
    .withColumn("Mileage_Clean", F.when(F.length("Mileage_Clean_str") > 0, F.col("Mileage_Clean_str").cast(IntegerType())).otherwise(F.lit(None)))
    .drop("Mileage_Clean_str")
    .withColumn("Capacity_Clean_str", F.trim(digits_only("Capacity")))
    .withColumn("Capacity_Clean", F.when(F.length("Capacity_Clean_str") > 0, F.col("Capacity_Clean_str").cast(IntegerType())).otherwise(F.lit(None)))
    .drop("Capacity_Clean_str")
    .withColumn("Year", F.col("Year").cast(IntegerType()))
)

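# BikeAge and City
# Note: the reference year is hardcoded to 2025; update it when rerunning later.
df_feat = df_clean.withColumn("BikeAge", F.lit(2025) - F.col("Year"))
# Caution: as the output below shows, item 0 of Post_Details is the "Posted on ..." timestamp,
# not a place name, so City currently holds the posting time. The location is probably a later
# comma-separated item (for example getItem(1)); confirm against the raw data before relying on City.
df_feat = df_feat.withColumn(
    "City",
    F.when(F.col("Post_Details").isNotNull(), F.split(F.col("Post_Details").cast("string"), ",").getItem(0)).otherwise(F.lit(None))
)
df_feat = df_feat.withColumn("City", F.trim(F.col("City")))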

df_feat.select("Brand","Model","Year","BikeAge","City","Price_Clean","Mileage_Clean","Capacity_Clean").show(10, truncate=False)


+-----------+------------+----+-------+-------------------------+-----------+-------------+--------------+
|Brand      |Model       |Year|BikeAge|City                     |Price_Clean|Mileage_Clean|Capacity_Clean|
+-----------+------------+----+-------+-------------------------+-----------+-------------+--------------+
|Other brand|Other model |2014|11     |Posted on 30 Jan 9:02 am |440000     |35000        |150           |
|Bajaj      |Pulsar 150  |2014|11     |Posted on 11 Mar 1:58 pm |370000     |50737        |150           |
|Other brand|Other model |2005|20     |Posted on 21 Jan 9:45 pm |210000     |5000         |125           |
|Electra    |Alpha       |2019|6      |Posted on 22 Feb 2:26 pm |105000     |600          |49            |
|Hero       |Maestro Edge|2018|7      |Posted on 07 Mar 2:08 pm |80000      |18500        |110           |
|Honda      |CBR         |2020|5      |Posted on 10 Feb 10:00 am|889000     |12700        |650           |
|Honda      |CBR         |2020|5     
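
The cleaning rules in this section can be tried on plain strings without a Spark session. The sketch below mirrors `digits_only` in plain Python and adds a hypothetical `extract_city` helper that skips any comma-separated part starting with "Posted on", since the table above shows that item 0 of `Post_Details` is the posting timestamp. The helper names and all sample strings, including the `Posted on ..., <city>, ...` layout, are assumptions for illustration and are not taken from the dataset.

```python
import re
from typing import Optional


def clean_int(value: Optional[str]) -> Optional[int]:
    """Plain-Python mirror of digits_only + cast: keep digits, None if nothing is left."""
    if value is None:
        return None
    digits = re.sub(r"[^0-9]", "", str(value))
    return int(digits) if digits else None


def extract_city(post_details: Optional[str]) -> Optional[str]:
    """Hypothetical helper: first comma-separated part that is not the 'Posted on' stamp."""
    if post_details is None:
        return None
    for part in post_details.split(","):
        part = part.strip()
        if part and not part.lower().startswith("posted on"):
            return part
    return None


# Assumed sample values (not taken from the dataset)
print(clean_int("Rs 440,000"))   # 440000
print(clean_int("35,000 km"))    # 35000
print(clean_int("Negotiable"))   # None
print(clean_int("12.5 km"))      # 125 (the decimal point is stripped too)
print(extract_city("Posted on 30 Jan 9:02 am, Colombo, Colombo"))  # Colombo
```

The `12.5 km` case shows a limitation of stripping every non-digit character: decimal values are silently inflated, so it is worth checking whether `Mileage` or `Capacity` ever contain decimals.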

## 3) Optional: Trim Extreme Price Outliers

In [8]:

# Compute approx 1st and 99th percentiles
bounds = df_feat.approxQuantile("Price_Clean", [0.01, 0.99], 0.01)
low, high = bounds if len(bounds) == 2 else (None, None)

if low is not None and high is not None:
    df_fair = df_feat.where(
        (F.col("Price_Clean").isNotNull()) &
        (F.col("Year").isNotNull()) &
        (F.col("Price_Clean").between(low, high))
    )
else:
    df_fair = df_feat.where((F.col("Price_Clean").isNotNull()) & (F.col("Year").isNotNull()))

print("Rows after fair-trim:", df_fair.count())


Rows after fair-trim: 5016
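
The third argument of `approxQuantile` is a relative error. With `0.01`, the value returned for the 1st percentile may sit anywhere between roughly the 0th and 2nd percentile, and passing `0.0` asks Spark for exact quantiles at a higher cost. To make the trimming rule concrete, here is a minimal plain-Python sketch using an exact nearest-rank percentile. The helper names and the price list are made up for illustration.

```python
import math
from typing import List, Tuple


def nearest_rank(sorted_values: List[float], p: float) -> float:
    """Exact nearest-rank percentile: the value at rank ceil(p * n), 1-indexed."""
    n = len(sorted_values)
    rank = max(1, math.ceil(p * n))
    return sorted_values[rank - 1]


def trim_outliers(values: List[float], lo_p: float = 0.01,
                  hi_p: float = 0.99) -> Tuple[float, float, List[float]]:
    """Keep values inside the [lo_p, hi_p] percentile band (bounds inclusive, like between())."""
    ordered = sorted(values)
    low, high = nearest_rank(ordered, lo_p), nearest_rank(ordered, hi_p)
    kept = [v for v in values if low <= v <= high]
    return low, high, kept


# Made-up prices: 100 plausible listings plus one typo-low and one typo-high value
prices = list(range(100_000, 600_000, 5_000)) + [5, 90_000_000]
low, high, kept = trim_outliers(prices)
print(low, high, len(kept))  # 100000 595000 100
```

Both bounds are inclusive here, matching the behaviour of `between(low, high)` in the cell above.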


## 4) Save Cleaned Dataset to Parquet

In [9]:

parquet_out = "parquet/used_bikes_cleaned"
(
    df_fair
    .coalesce(1)  # demo-friendly single file (remove on real clusters)
    .write.mode("overwrite").parquet(parquet_out)
)
print("Saved to:", parquet_out)


Saved to: parquet/used_bikes_cleaned


## 5) Save to MongoDB (Integration)

In [1]:
#!pip install pymongo==4.2.0
!pip install "pymongo[srv]" --upgrade



In [10]:
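import os
from pymongo import MongoClient

# Never commit credentials to a notebook. Read the Atlas connection string from the environment.
# Assumes a MONGO_URI variable of the form
# "mongodb+srv://<user>:<password>@<cluster-host>/?retryWrites=true&w=majority"
mongo_uri = os.environ["MONGO_URI"]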

client = MongoClient(mongo_uri)
print(client.list_database_names())


['admin', 'local']
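
A connection string with an inline password is easy to leak when a notebook is shared. A minimal sketch of assembling the URI from separate settings follows. It assumes hypothetical environment variables `MONGO_USER`, `MONGO_PASSWORD`, and `MONGO_HOST`, and placeholder defaults so that it runs as written. The credentials are percent-encoded with `urllib.parse.quote_plus`, which is needed when a password contains characters such as `@`, `/`, or `:`.

```python
import os
from urllib.parse import quote_plus


def build_mongo_uri(user: str, password: str, host: str) -> str:
    """Assemble an SRV connection string, percent-encoding the credentials."""
    return (
        f"mongodb+srv://{quote_plus(user)}:{quote_plus(password)}@{host}"
        "/?retryWrites=true&w=majority"
    )


# Hypothetical variable names; the defaults are placeholders so the sketch runs as-is
example_uri = build_mongo_uri(
    os.environ.get("MONGO_USER", "example_user"),
    os.environ.get("MONGO_PASSWORD", "p@ss/word"),
    os.environ.get("MONGO_HOST", "cluster0.example.mongodb.net"),
)
print(example_uri.split("@")[-1])  # print only the host part, never the credentials
```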


In [12]:
# mongo_uri is reused from the previous cell; do not paste the connection string again.

db_name   = "BikeDb"
coll_name = "used_bikes_cleaned"

(df_fair
 .write
 .format("mongodb")
 .mode("overwrite")
 .option("spark.mongodb.connection.uri", mongo_uri)   # connection string key accepted by the 10.x connector
 .option("spark.mongodb.database", db_name)
 .option("spark.mongodb.collection", coll_name)
 .save()
)

print(f"✅ Saved to MongoDB Atlas → {db_name}.{coll_name}")


✅ Saved to MongoDB Atlas → BikeDb.used_bikes_cleaned


In [15]:
# Read-back verification (sanity check)
df_mongo = (
    spark.read
    .format("mongodb")
    .option("spark.mongodb.connection.uri", mongo_uri)
    .option("spark.mongodb.database", db_name)
    .option("spark.mongodb.collection", coll_name)
    .load()
)

df_mongo.show(5, truncate=False)

+------------+-------+-----------+--------+--------------+------------------------+---------+-------------+------------+------------------------------------------------+----------+-----------+-------------------+------------------------------------------------------------------------------+----------------------------+--------------+----+------------------------+---------------------------------------------------------------------------+
|Bike Type   |BikeAge|Brand      |Capacity|Capacity_Clean|City                    |Mileage  |Mileage_Clean|Model       |Post_Details                                    |Price     |Price_Clean|Seller             |Summary                                                                       |Title                       |Trim/Edition  |Year|_id                     |url                                                                        |
+------------+-------+-----------+--------+--------------+------------------------+---------+-------------+---------

## 6) Analytics (Spark SQL) — Depreciation, Brands, Cities, Demand

In [16]:
df_fair.createOrReplaceTempView("bikes")

avg_price_by_year = spark.sql("""
    SELECT Year, AVG(Price_Clean) AS avg_price
    FROM bikes
    GROUP BY Year
    ORDER BY Year
""")

avg_price_by_brand = spark.sql("""
    WITH topbrands AS (
        SELECT Brand
        FROM bikes
        GROUP BY Brand
        ORDER BY COUNT(*) DESC
        LIMIT 10
    )
    SELECT b.Brand, AVG(b.Price_Clean) AS avg_price
    FROM bikes b
    JOIN topbrands t ON b.Brand = t.Brand
    GROUP BY b.Brand
    ORDER BY avg_price DESC
""")

bike_type_counts = spark.sql("""
    SELECT `Bike Type` AS BikeType, COUNT(*) AS n
    FROM bikes
    GROUP BY `Bike Type`
    ORDER BY n DESC
""")

city_counts = spark.sql("""
    SELECT City, AVG(Price_Clean) AS avg_price, COUNT(*) AS n
    FROM bikes
    WHERE City IS NOT NULL
    GROUP BY City
    HAVING n >= 20
    ORDER BY avg_price DESC
    LIMIT 15
""")

avg_price_by_year.show(10)
avg_price_by_brand.show(truncate=False)
bike_type_counts.show(truncate=False)
city_counts.show(truncate=False)


+----+------------------+
|Year|         avg_price|
+----+------------------+
|1923|           40000.0|
|1930|          150000.0|
|1966|          245000.0|
|1974|          290000.0|
|1975|          550000.0|
|1978|          162500.0|
|1979| 551666.6666666666|
|1980|246818.36363636365|
|1981|          245000.0|
|1982|          513750.0|
+----+------------------+
only showing top 10 rows

+--------+------------------+
|Brand   |avg_price         |
+--------+------------------+
|KTM     |840833.3333333334 |
|Yamaha  |434962.6109510086 |
|Suzuki  |433794.1176470588 |
|Honda   |342632.4549918167 |
|TVS     |285508.13787375414|
|Bajaj   |262651.60820189276|
|Hero    |236859.243697479  |
|Demak   |210042.37288135593|
|Mahindra|202592.59259259258|
|Ranomoto|146107.14285714287|
+--------+------------------+

+------------+----+
|BikeType    |n   |
+------------+----+
|Motorbikes  |3605|
|Scooters    |1404|
|Quadricycles|4   |
|E-bikes     |3   |
+------------+----+

+-------------------------+-
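
The brand query works in two steps: it first ranks brands by listing count and keeps the ten most listed, then averages price within those brands only. A plain-Python sketch of the same logic on made-up rows follows. The function name and the sample data are assumptions for illustration.

```python
from collections import Counter, defaultdict
from typing import Dict, List, Tuple


def avg_price_top_brands(rows: List[Tuple[str, int]], top_n: int) -> List[Tuple[str, float]]:
    """Average price for the top_n brands by listing count, sorted by average descending."""
    counts = Counter(brand for brand, _ in rows)
    top = {brand for brand, _ in counts.most_common(top_n)}
    prices: Dict[str, List[int]] = defaultdict(list)
    for brand, price in rows:
        if brand in top:
            prices[brand].append(price)
    averages = [(brand, sum(p) / len(p)) for brand, p in prices.items()]
    return sorted(averages, key=lambda item: item[1], reverse=True)


# Made-up rows (brand, price); not taken from the dataset
rows = [
    ("Bajaj", 250_000), ("Bajaj", 270_000), ("Bajaj", 260_000),
    ("Honda", 340_000), ("Honda", 360_000),
    ("KTM", 900_000),
]
print(avg_price_top_brands(rows, top_n=2))  # [('Honda', 350000.0), ('Bajaj', 260000.0)]
```

In this toy data the most expensive brand is dropped because it has the fewest listings, which is the intended effect of ranking by count before averaging.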

## 7) Future Price Prediction (Simple Trend)

In [18]:
import pandas as pd
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

train_df = avg_price_by_year.withColumn("label", F.col("avg_price")).withColumn("YearNum", F.col("Year").cast(DoubleType()))
assembler = VectorAssembler(inputCols=["YearNum"], outputCol="features")
train_vec = assembler.transform(train_df).select("features", "label")

lr = LinearRegression(featuresCol="features", labelCol="label")
lr_model = lr.fit(train_vec)

# Extrapolate the trend 3 years past the newest Year in the data
# (Year is the year of manufacture, not the sale date)
last_year = train_df.agg(F.max("Year")).first()[0]
future_pdf = pd.DataFrame({"Year": [last_year + 1, last_year + 2, last_year + 3]})
future_sdf = spark.createDataFrame(future_pdf).withColumn("YearNum", F.col("Year").cast(DoubleType()))
future_vec = assembler.transform(future_sdf).select("Year", "YearNum", "features")
pred = lr_model.transform(future_vec)
pred.select("Year", F.col("prediction").alias("predicted_avg_price")).orderBy("Year").show()


+----+-------------------+
|Year|predicted_avg_price|
+----+-------------------+
|2023| 331931.99753676914|
|2024|  334323.5546865612|
|2025|  336715.1118363524|
+----+-------------------+
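
With its default parameters (no regularization), the `LinearRegression` fit above is an ordinary least-squares line through the yearly averages, so every year counts equally no matter how many listings it has. A minimal plain-Python sketch of the same closed-form fit, on made-up averages that rise by exactly 2,000 per year:

```python
from typing import List, Tuple


def fit_line(xs: List[float], ys: List[float]) -> Tuple[float, float]:
    """Ordinary least squares for y = slope * x + intercept (closed form)."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    sxx = sum((x - mean_x) ** 2 for x in xs)
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    slope = sxy / sxx
    return slope, mean_y - slope * mean_x


# Made-up yearly averages; not taken from the dataset
years = [2018, 2019, 2020, 2021, 2022]
avg_prices = [300_000, 302_000, 304_000, 306_000, 308_000]

slope, intercept = fit_line(years, avg_prices)
for year in range(max(years) + 1, max(years) + 4):
    print(year, round(slope * year + intercept))
# 2023 310000
# 2024 312000
# 2025 314000
```

Because sparse early years (a single 1923 listing, for example) weigh as much as years with hundreds of listings, restricting the fit to recent years or weighting by listing count may give a more meaningful trend.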



## 8) Export Analytics (CSV + PNG) for Streamlit

In [19]:

import os
import matplotlib.pyplot as plt
import numpy as np

exports_dir = "exports"
os.makedirs(exports_dir, exist_ok=True)

apy = avg_price_by_year.toPandas().sort_values("Year")
apb = avg_price_by_brand.toPandas()
btc = bike_type_counts.toPandas()
cty = city_counts.toPandas()

apy.to_csv(f"{exports_dir}/avg_price_by_year.csv", index=False)
apb.to_csv(f"{exports_dir}/avg_price_by_brand.csv", index=False)
btc.to_csv(f"{exports_dir}/bike_type_counts.csv", index=False)
cty.to_csv(f"{exports_dir}/avg_price_by_city.csv", index=False)

if not apy.empty:
    newest = apy["Year"].max()
    base = float(apy.loc[apy["Year"]==newest, "avg_price"].iloc[0])
    apy["depreciation_pct_vs_newest"] = (1.0 - apy["avg_price"] / base) * 100.0 if base>0 else np.nan
    apy.to_csv(f"{exports_dir}/depreciation_percent.csv", index=False)

plt.figure(figsize=(8,5))
plt.plot(apy["Year"], apy["avg_price"], marker="o")
plt.title("Average Used Bike Price by Year (Sri Lanka)")
plt.xlabel("Year of Manufacture"); plt.ylabel("Average Price (LKR)"); plt.grid(True); plt.tight_layout()
plt.savefig(f"{exports_dir}/avg_price_by_year.png", dpi=160); plt.close()

plt.figure(figsize=(8,5))
plt.bar(apb["Brand"], apb["avg_price"])
plt.title("Average Used Bike Price by Top Brands")
plt.xlabel("Brand"); plt.ylabel("Average Price (LKR)"); plt.xticks(rotation=45, ha="right"); plt.tight_layout()
plt.savefig(f"{exports_dir}/avg_price_by_brand.png", dpi=160); plt.close()

plt.figure(figsize=(8,5))
plt.bar(btc["BikeType"], btc["n"])
plt.title("Bike Types by Demand (Listing Counts)")
plt.xlabel("Bike Type"); plt.ylabel("Count"); plt.xticks(rotation=45, ha="right"); plt.tight_layout()
plt.savefig(f"{exports_dir}/bike_types_demand.png", dpi=160); plt.close()

plt.figure(figsize=(8,5))
plt.bar(cty["City"], cty["avg_price"])
plt.title("Top Cities by Average Price")
plt.xlabel("City"); plt.ylabel("Average Price (LKR)"); plt.xticks(rotation=45, ha="right"); plt.tight_layout()
plt.savefig(f"{exports_dir}/avg_price_by_city.png", dpi=160); plt.close()

print("Exports written to:", exports_dir)


Exports written to: exports
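
The `depreciation_pct_vs_newest` column is computed as `(1 - avg_price / base) * 100`, where `base` is the average price for the newest year. A small plain-Python sketch of that formula on made-up averages shows how to read the sign. The function name and the numbers are illustrative only.

```python
from typing import Dict


def depreciation_pct(avg_price_by_year: Dict[int, float]) -> Dict[int, float]:
    """Percent below the newest year's average price; negative means pricier than the newest year."""
    newest = max(avg_price_by_year)
    base = avg_price_by_year[newest]
    if base <= 0:
        return {year: float("nan") for year in avg_price_by_year}
    return {year: (1.0 - price / base) * 100.0 for year, price in avg_price_by_year.items()}


# Made-up averages (LKR); not taken from the exports
sample = {2018: 300_000.0, 2020: 500_000.0, 2022: 400_000.0}
print(depreciation_pct(sample))  # {2018: 25.0, 2020: -25.0, 2022: 0.0}
```

A negative value means that year's average is higher than the newest year's, which can happen when the newest year has few or atypical listings.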


## 9) Streamlit — Minimal App Template (loads exports)

In [20]:

app_code = r'''
import streamlit as st
import pandas as pd

st.set_page_config(page_title="Sri Lanka Used Bike Market", layout="wide")
st.title("Sri Lanka Used Bike Market — Analytics Dashboard")

avg_year = pd.read_csv("exports/avg_price_by_year.csv")
avg_brand = pd.read_csv("exports/avg_price_by_brand.csv")
bike_types = pd.read_csv("exports/bike_type_counts.csv")
avg_city = pd.read_csv("exports/avg_price_by_city.csv")

st.subheader("Average Price by Year (Depreciation)")
st.line_chart(avg_year.set_index("Year")["avg_price"])

st.subheader("Average Price by Top Brands")
st.bar_chart(avg_brand.set_index("Brand")["avg_price"])

st.subheader("Bike Types by Demand (Listing Counts)")
st.bar_chart(bike_types.set_index("BikeType")["n"])

st.subheader("Top Cities by Average Price")
st.bar_chart(avg_city.set_index("City")["avg_price"])

st.caption("Data source: used-bikes.csv (Sri Lanka market)")
'''
with open("app.py", "w", encoding="utf-8") as f:
    f.write(app_code)

print("Wrote Streamlit app → app.py")
print("Run locally:  streamlit run app.py")


Wrote Streamlit app → app.py
Run locally:  streamlit run app.py
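
The generated `app.py` reads four CSV files from `exports/`, and `pd.read_csv` raises `FileNotFoundError` if any of them is missing. A minimal pre-flight check, using only the standard library, could look like this. The `missing_exports` helper is hypothetical and the file names are the ones written in section 8.

```python
from pathlib import Path
from typing import List, Sequence

EXPECTED_EXPORTS = (
    "avg_price_by_year.csv",
    "avg_price_by_brand.csv",
    "bike_type_counts.csv",
    "avg_price_by_city.csv",
)


def missing_exports(exports_dir: str, names: Sequence[str] = EXPECTED_EXPORTS) -> List[str]:
    """Return the expected export files that are absent from exports_dir."""
    root = Path(exports_dir)
    return [name for name in names if not (root / name).is_file()]


missing = missing_exports("exports")
if missing:
    print("Missing exports, rerun section 8:", ", ".join(missing))
else:
    print("All exports present; run: streamlit run app.py")
```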


## 10) Notes on Optimization


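- Use `.coalesce(1)` only for demos; on a real cluster, remove it so Spark writes one file per partition in parallel.
- Apply selective filters before joins and aggregations so that less data is shuffled.
- Cache DataFrames that are reused across several actions, e.g. `df_fair.cache()`; here `df_fair` feeds the row count, the Parquet write, the MongoDB write, and every SQL query.
- On a cluster, tune the number of partitions to the data size and the total number of executor cores.
- When only a few columns are needed, read the Parquet output and select them explicitly so Spark can skip the other columns (column pruning).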
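
As a starting point for the partition-tuning note, here is a rule-of-thumb sketch in plain Python. The `suggest_partitions` helper and its defaults are assumptions, not values from this notebook: about 128 MiB per partition (the default of `spark.sql.files.maxPartitionBytes`) and at least 2 tasks per core (the Spark tuning guide suggests 2 to 3). Treat the result as an initial guess to refine by measurement.

```python
import math


def suggest_partitions(data_bytes: int, total_cores: int,
                       target_partition_bytes: int = 128 * 1024 ** 2,
                       tasks_per_core: int = 2) -> int:
    """Rule-of-thumb partition count: enough partitions to keep each near the target
    size, but never fewer than tasks_per_core tasks for every core."""
    by_size = math.ceil(data_bytes / target_partition_bytes)
    by_cores = total_cores * tasks_per_core
    return max(1, by_size, by_cores)


print(suggest_partitions(10 * 1024 ** 3, total_cores=8))   # 80
print(suggest_partitions(100 * 1024 ** 2, total_cores=8))  # 16
```

The returned number could be passed to `df_fair.repartition(n)` before a wide write. For a dataset of about 5,000 rows like this one, the default partitioning is already more than enough.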
