## Inserting Trimmed Data into MongoDB

In [25]:
import json
from pymongo import MongoClient
import csv

# Connect to MongoDB
client = MongoClient("mongodb://localhost:27017/")
db = client["Project"]

# Drop old collections if they exist
# db["business"].drop()


# === Trim and load only first 1000 rows of Bike data ===
with open("project_file.csv", "r", encoding="utf-8") as f:
    reader = csv.DictReader(f) 
    BikeData = [row for _, row in zip(range(1000), reader)]
db["Project"].insert_many(BikeData)
print(f"✅ Inserted {len(BikeData)} rows of data")


✅ Inserted 1000 rows of data


## Starting Spark Session

In [38]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Bike Data Analysis").getOrCreate()

## Loading data from MongoDB via PyMongo

In [39]:
## Read documents, exclude '_id'
BikeData_docs = list(db["Project"].find({}, {"_id": 0}))

# Convert to Spark DataFrames
BikeData = spark.createDataFrame(BikeData_docs)

#BikeData.show(10 )

In [40]:
print(BikeData.columns)

['Covered distance (m)', 'Departure station id', 'Departure station name', 'Duration (sec.)', 'Return', 'Return station id', 'Return station name', '\ufeffDeparture']


In [41]:
from pyspark.sql.functions import avg
# Select only necessary fields

data_selected = BikeData.select(
    "Covered distance (m)",
    "Departure station name",
    "Return station name",
    "Duration (sec.)"
)
# Compute average
df_avg = data_selected.groupBy("business_id", "name", "categories") \
    .agg(avg("review_stars").alias("avg_rating")) \
    .orderBy("avg_rating", ascending=False)

df_avg.show(10, truncate=False)

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `Duration (sec`.`)` cannot be resolved. Did you mean one of the following? [`Duration (sec.)`, `Return`, `Return station id`, `Return station name`, `﻿Departure`].;
'Project [Covered distance (m)#839, Departure station name#841, Return station name#845, 'Duration (sec.)]
+- LogicalRDD [Covered distance (m)#839, Departure station id#840, Departure station name#841, Duration (sec.)#842, Return#843, Return station id#844, Return station name#845, ﻿Departure#846], false


In [42]:
from pyspark.sql.functions import avg

data_df = spark.createDataFrame(BikeData_docs)
# Valitaan olennaiset sarakkeet
data_selected = data_df.select(
    "Covered distance (m)",
    "Departure station name",
    "Return station name",
    "Duration (sec.)"
)

# Lasketaan keskiarvot lähtö- ja paluuaseman mukaan
df_avg = data_selected.groupBy(
    "Departure station name",
    "Return station name"
).agg(
    avg("Covered distance (m)").alias("avg_distance_m"),
    avg("Duration (sec.)").alias("avg_duration_sec")
).orderBy("avg_distance_m", ascending=False)

# Näytetään 10 pisimmän keskimatkan asemaparia
df_avg.show(10, truncate=False)


AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `Duration (sec`.`)` cannot be resolved. Did you mean one of the following? [`Duration (sec.)`, `Return`, `Return station id`, `Return station name`, `﻿Departure`].;
'Project [Covered distance (m)#855, Departure station name#857, Return station name#861, 'Duration (sec.)]
+- LogicalRDD [Covered distance (m)#855, Departure station id#856, Departure station name#857, Duration (sec.)#858, Return#859, Return station id#860, Return station name#861, ﻿Departure#862], false


In [None]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# Returning the most visited departure stations
data_selected = BikeData.select(
    "Departure station name",
    "departure station id",
    
)
df_Max = data_selected.groupBy("departure station id","Departure station name")\
.agg(F.count("*").alias("counts")
)

window_spec = Window.partitionBy("departure station id").orderBy(F.col("visit_count").desc())

# Add a row number column to pick the station with the highest count
station_rank =df_Max .withColumn("rank", F.row_number().over(window_spec))
most_visited_stations = station_rank.filter(F.col("rank") == 1)

# Show the result
most_visited_stations.select("departure station id", "Departure station name", "visit_count").show()


## Visualization 

In [None]:
#Most visited  visited departure stations 
import matplotlib.pyplot as plt
import pandas as pd

top_df = df_avg.limit(10).toPandas()
plt.barh(top_df['Departure station name'], top_df['counts'])
plt.ylabel("Count of Max Visits")
plt.title("Top 10 Departure Stations by Maximun number of Visits")
plt.gca().invert_xaxis()
plt.show()
