In [None]:
# File location and type
file_location = "/FileStore/tables/clean_tripdata.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# Reader options; they only apply to CSV sources and are ignored by other formats.
csv_options = {
    "inferSchema": infer_schema,
    "header": first_row_is_header,
    "sep": delimiter,
}

# Load the trip data into a Spark DataFrame and show it.
df = spark.read.format(file_type).options(**csv_options).load(file_location)

display(df)

## Top 10 bikes by total distance traveled

In [None]:
from pyspark.sql.functions import col, asc, desc
from pyspark.sql.functions import sum as spark_sum

# Total distance traveled by each bike, largest first.
df1 = (
    df.groupBy("bike_id")
    .agg(spark_sum("distance").alias("total_distance"))
    .sort(desc("total_distance"))
)

# IDs of the 10 bikes with the largest total distance.
# Flag by bike_id, not by total_distance value: matching on the value would also flag every
# other bike that ties the 10th-largest total, so more than 10 bikes could be marked.
# NULL ids are dropped because `isin` with a NULL in the list evaluates to NULL, not False.
top_10_bike_ids = [
    row.bike_id
    for row in df1.limit(10).select("bike_id").collect()
    if row.bike_id is not None
]

# Boolean column: True for the 10 bikes above, False for the other bikes.
df1 = df1.withColumn("is_top_10_distance", col("bike_id").isin(top_10_bike_ids))

display(df1)

## Longest trips (top 10 by distance)

In [None]:
from pyspark.sql.functions import desc

# The 10 individual trips with the longest distance.
# Named `longest_trips` (per-trip rows) so it does not reuse a name from the per-bike totals above.
longest_trips = df.orderBy(desc("distance")).limit(10)
display(longest_trips.select("bike_id", "distance"))

## Trip duration vs user type

In [None]:
# Add a duration_minutes column (duration_seconds / 60, rounded to a whole minute) to make results easier to read.
# PySpark functions are imported under the `F` alias: `from pyspark.sql.functions import round`
# would shadow Python's built-in round() for the rest of the notebook session.
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

df_duration_minute = df.withColumn(
    'duration_minutes',
    F.round(F.col('duration_seconds') / 60).cast(IntegerType()),
)
display(df_duration_minute)

## Analysis of Usage Patterns

In [None]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import desc

# Remove outliers: the 4 trips with the largest duration_seconds.
outliers = df.orderBy(desc("duration_seconds")).limit(4)
# exceptAll() is a multiset difference: it removes exactly those rows and keeps any duplicate
# rows elsewhere in df. subtract() is EXCEPT DISTINCT and would also de-duplicate the dataset.
df_cluster = df.exceptAll(outliers)

# Preprocess for K-means: wrap duration_seconds into a one-element "features" vector.
vec_assembler = VectorAssembler(inputCols=["duration_seconds"], outputCol="features")
df_cluster = vec_assembler.transform(df_cluster)
display(df_cluster)

In [None]:
# Fit K-means (k=3) on trip duration, then assign every trip, including the removed outliers, to a cluster.
KMEANS_SEED = 42  # explicit seed so cluster ids and centers are reproducible between runs
kmeans = KMeans(featuresCol="features", k=3, seed=KMEANS_SEED)  # initialize the algorithm
kmeans_model = kmeans.fit(df_cluster)  # fit the algorithm to our data

# Assign cluster labels to the data
predictions = kmeans_model.transform(df_cluster)

# Vectorize the outliers into a NEW variable. Reassigning `outliers` itself would make this
# cell fail on a second run, because the "features" column would already exist.
outliers_vec = vec_assembler.transform(outliers)
outlier_predictions = kmeans_model.transform(outliers_vec)

# Append outliers to predictions (both frames have the same columns in the same order)
predictions = predictions.union(outlier_predictions)

evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

centers = kmeans_model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

In [None]:
# Show every trip with its assigned K-means cluster in the "prediction" column.
predictions.display()

## Analysis of different pricing systems

We first solve the pricing problem as a linear program with PuLP, then repeat it with a simple grid search in NumPy.

In [None]:
!pip install pulp

In [None]:
from pulp import LpMaximize, LpProblem, LpStatus, lpSum, LpVariable

# Average duration (seconds) of each cluster, taken from the fitted K-means centers so the
# values always line up with the cluster ids instead of being copied by hand.
average_durations = [float(center[0]) for center in centers]
n_clusters = len(average_durations)

# Number of trips per cluster. groupBy() returns groups in no particular order, so index the
# counts by cluster id; a cluster with no trips gets 0 instead of shifting the others.
cluster_counts = predictions.groupBy('prediction').count()
counts_by_cluster = {row['prediction']: row['count'] for row in cluster_counts.collect()}
num_trips = [counts_by_cluster.get(i, 0) for i in range(n_clusters)]

epsilon = 0.05  # minimum gap between the prices of consecutive clusters

# Create the model (named pricing_problem so it does not overwrite the K-means model)
pricing_problem = LpProblem(name="revenue-maximization", sense=LpMaximize)

# Decision variables: one price per cluster, bounded to [0.2, 0.4]
prices = [LpVariable(name=f"price_{i}", lowBound=0.2, upBound=0.4) for i in range(n_clusters)]

# Constraints keeping the price order: price_0 >= price_1 + epsilon >= price_2 + 2 * epsilon ...
for i in range(n_clusters - 1):
    pricing_problem += (prices[i] >= prices[i + 1] + epsilon)

# Objective: total revenue = sum(price * average_duration * num_trips) over clusters.
# All coefficients are non-negative, so the solver pushes every price as high as the bounds
# and ordering constraints allow.
pricing_problem += lpSum(prices[i] * average_durations[i] * num_trips[i] for i in range(n_clusters))

# Solve the problem
pricing_problem.solve()

# Get the results
print(f"status: {pricing_problem.status}, {LpStatus[pricing_problem.status]}")
print(f"objective: {pricing_problem.objective.value()}")

for var in prices:
    print(f"{var.name}: {var.value()}")

In [None]:
import numpy as np

# Grid-search alternative to the LP: start from a baseline price per cluster, try every
# combination of -0.05 / 0 / +0.05 adjustments, and keep the highest-revenue combination
# whose prices are non-increasing from cluster 0 to cluster 2.

# Baseline price per cluster
prices = np.array([0.35, 0.30, 0.25])

# Average duration per cluster: the K-means centers of `duration_seconds`, so the unit is
# seconds (not minutes). Replace these with your actual data.
average_durations = np.array([794.21, 60252.48, 15831.98])

# Number of trips per cluster. Replace these with your actual data.
# NOTE(review): these appear to come from an unordered groupBy().count().collect(); confirm
# they are listed in cluster-id order (0, 1, 2) so they match average_durations.
num_trips = np.array([1128, 6082, 512490])

# Revenue at the baseline prices is the value to beat
revenue = np.sum(prices * average_durations * num_trips)

# Price adjustments tried for each cluster
adjustments = np.array([-0.05, 0, 0.05])

best_revenue = revenue
best_prices = prices

# Every adjusted price vector, in the same order as three nested loops (cluster 0 outermost)
candidate_prices = [
    prices + np.array([step_0, step_1, step_2])
    for step_0 in adjustments
    for step_1 in adjustments
    for step_2 in adjustments
]

for candidate in candidate_prices:
    # Skip combinations that break the price order
    if not (candidate[0] >= candidate[1] >= candidate[2]):
        continue
    candidate_revenue = np.sum(candidate * average_durations * num_trips)
    # Only a strictly better revenue replaces the current best
    if candidate_revenue > best_revenue:
        best_revenue, best_prices = candidate_revenue, candidate

print('Optimal prices:', best_prices)
print('Optimal revenue:', best_revenue)


## Identifying the most popular routes

In [None]:
from pyspark.sql.functions import concat_ws

# Add two route columns that join the start and end station with ' --> ':
#   route_id: the start and end station ids
#   route:    the start and end station names
df_routes = (
    df.select("*")
    .withColumn("route_id", concat_ws(' --> ', df.start_station_id, df.end_station_id))
    .withColumn("route", concat_ws(' --> ', df.start_station_name, df.end_station_name))
)
display(df_routes)

In [None]:
# The 10 routes with the most trips
route_counts = df_routes.groupBy("route").count()
df_top_routes = route_counts.orderBy("count", ascending=False).limit(10)
display(df_top_routes)

In [None]:
# The 10 routes with the fewest trips (orderBy sorts ascending by default)
least_used_counts = df_routes.groupBy("route").count()
df_worst_routes = least_used_counts.orderBy("count").limit(10)
display(df_worst_routes)

## Seasonal usage patterns

In [None]:
# Add a "season" column based on the month of start_date:
#   Mar-May -> Spring, Jun-Aug -> Summer, Sep-Nov -> Fall (Autumn), everything else -> Winter.
# Using the month (instead of comparing with "2017-05-31"-style strings) keeps trips from the
# last day of a season out of "Winter" when start_date carries a time of day, and works for any year.
from pyspark.sql.functions import when, lit, month

start_month = month(df.start_date)
df_seasons = df.withColumn(
    "season",
    when(start_month.isin(3, 4, 5), lit("Spring"))
    .when(start_month.isin(6, 7, 8), lit("Summer"))
    .when(start_month.isin(9, 10, 11), lit("Fall (Autumn)"))
    .otherwise(lit("Winter")),
)
display(df_seasons)

In [None]:
# Date range covered by the data.
# PySpark functions are imported under the `F` alias: importing `max`/`min` directly would
# shadow Python's built-in max()/min() for the rest of the notebook session.
from pyspark.sql import functions as F
df.select(F.min('start_date'), F.max('start_date')).display()

In [None]:
# Trips that started in spring 2017: from 1 March up to, but not including, 1 June.
# `< "2017-06-01"` (rather than `<= "2017-05-31"`) keeps trips later on 31 May when start_date has a time of day.
# NOTE(review): this returned no rows; presumably the data's date range (see the min/max cell) does not cover spring 2017. Confirm.
df.filter((df.start_date >= "2017-03-01") & (df.start_date < "2017-06-01")).display()


In [None]:
# Number of trips for each season label (note: seasons_pd is a Spark DataFrame, not pandas).
trips_by_season = df_seasons.groupBy("season")
seasons_pd = trips_by_season.count()
display(seasons_pd)