In [None]:
from pyspark import SparkConf, SparkContext

# Reuse the already-running context when this cell is executed again: calling
# the SparkContext constructor a second time in the same kernel raises
# "ValueError: Cannot run multiple SparkContexts at once".
# The app name is only a label (e.g. in the Spark UI); it now names the
# used-bikes dataset this notebook actually analyses.
sc = SparkContext.getOrCreate(
    SparkConf().setMaster("local").setAppName("Used Bikes Analysis")
)

In [None]:
# Read the CSV as an RDD of raw text lines (one element per line).
data = sc.textFile("/content/Used_Bikes.csv")

# The first line is the column header; every line equal to it is dropped so
# that only data rows remain.
# NOTE: data.first() raises ValueError if the file is empty.
header = data.first()

# `rows` is the input of several independent Spark actions in this notebook.
# Without persistence each action re-reads and re-filters the file, so the
# RDD is cached in memory after its first evaluation.
rows = data.filter(lambda line: line != header).cache()

# Question 1: Which city has the highest average bike price?

In [14]:
# Build (city, price) pairs.
# Each line is split exactly once. Rows that are too short to contain a city
# (index 2), or whose price (index 1) or city is blank, are skipped rather
# than raising IndexError inside the Spark task or forming a nameless group.
city_price_rdd = (
    rows
    .map(lambda row: row.split(','))
    .filter(lambda parts: len(parts) > 2 and parts[2] and parts[1])
    .map(lambda parts: (parts[2], float(parts[1])))
)

# Per city, accumulate a (price_sum, bike_count) tuple.
city_stats_rdd = (
    city_price_rdd
    .mapValues(lambda price: (price, 1))
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
)

# Average price per city = price_sum / bike_count.
city_avg_rdd = city_stats_rdd.mapValues(lambda x: x[0] / x[1])

# takeOrdered sorts ascending, so the average is negated to get the maximum.
# The result is a list with at most one (city, average) tuple.
highest_avg_city = city_avg_rdd.takeOrdered(1, key=lambda x: -x[1])

print("City with highest average bike price:", highest_avg_city)

City with highest average bike price: [('Chandrapur', 1250000.0)]


# Question 2: Which brand has the lowest average bike price?

In [15]:
# Build (brand, price) pairs. Every line is split once; a row is kept only
# when it has at least 8 fields and both its price (index 1) and its brand
# (index 7) are non-empty.
brand_price_rdd = (
    rows
    .map(lambda line: line.split(','))
    .filter(lambda fields: len(fields) >= 8 and fields[1] and fields[7])
    .map(lambda fields: (fields[7], float(fields[1])))
)

# Per brand, fold the prices into a (price_sum, bike_count) tuple.
brand_stats_rdd = (
    brand_price_rdd
    .mapValues(lambda price: (price, 1))
    .reduceByKey(lambda left, right: (left[0] + right[0], left[1] + right[1]))
)

# Average price per brand = price_sum / bike_count.
brand_avg_rdd = brand_stats_rdd.mapValues(
    lambda sum_and_count: sum_and_count[0] / sum_and_count[1]
)

# Ascending order on the average, so the single element taken is the minimum.
lowest_avg_brand = brand_avg_rdd.takeOrdered(
    1, key=lambda brand_and_avg: brand_and_avg[1]
)

print("Brand with lowest average bike price:", lowest_avg_brand)

Brand with lowest average bike price: [('LML', 4400.0)]


# Question 3: Which ownership type (First, Second, etc.) is most common?

In [16]:
# Emit (ownership, 1) for every row.
# The line is split once, and rows too short to have an ownership field
# (index 4) or with a blank one are skipped instead of raising IndexError
# inside the Spark task.
ownership_rdd = (
    rows
    .map(lambda row: row.split(','))
    .filter(lambda parts: len(parts) > 4 and parts[4])
    .map(lambda parts: (parts[4], 1))
)

# Sum the ones per ownership type to get its number of bikes.
ownership_counts = ownership_rdd.reduceByKey(lambda a, b: a + b)

# takeOrdered sorts ascending, so the count is negated to get the largest.
# The result is a list with at most one (ownership, count) tuple.
most_common_ownership = ownership_counts.takeOrdered(1, key=lambda x: -x[1])

print("Most common ownership type:", most_common_ownership)

Most common ownership type: [('First Owner', 29964)]


# Question 4: Which brand has the highest average power per bike?

In [17]:
# Build (brand, power) pairs. Every line is split once; a row is kept only
# when it has at least 8 fields and both its power (index 6) and its brand
# (index 7) are non-empty.
brand_power_rdd = (
    rows
    .map(lambda line: line.split(','))
    .filter(lambda cols: len(cols) >= 8 and cols[6] and cols[7])
    .map(lambda cols: (cols[7], float(cols[6])))
)

# Per brand, fold the power values into a (power_sum, bike_count) tuple.
brand_power_stats_rdd = (
    brand_power_rdd
    .mapValues(lambda power: (power, 1))
    .reduceByKey(lambda acc, nxt: (acc[0] + nxt[0], acc[1] + nxt[1]))
)

# Average power per brand = power_sum / bike_count.
brand_avg_power_rdd = brand_power_stats_rdd.mapValues(
    lambda total_and_n: total_and_n[0] / total_and_n[1]
)

# takeOrdered sorts ascending; negating the average makes the first element
# the brand with the largest average.
highest_avg_power_brand = brand_avg_power_rdd.takeOrdered(
    1, key=lambda brand_and_avg: -brand_and_avg[1]
)

print("Brand with highest average power:", highest_avg_power_brand)

Brand with highest average power: [('Indian', 1353.3333333333333)]


# Question 5: What is the probability that a bike is from "Royal Enfield"?

In [18]:
# Denominator: every data row counts as one bike.
total_bikes = rows.count()

# Numerator: rows whose brand field (index 7) contains "Royal Enfield".
# The length guard skips rows too short to have a brand field instead of
# raising IndexError inside the Spark task.
royal_enfield_bikes = (
    rows
    .map(lambda row: row.split(','))
    .filter(lambda parts: len(parts) > 7 and "Royal Enfield" in parts[7])
    .count()
)

# Empirical probability = matching rows / all rows (0 when there are no rows).
probability_royal_enfield = royal_enfield_bikes / total_bikes if total_bikes > 0 else 0

print(f"Probability that a bike is from 'Royal Enfield': {probability_royal_enfield:.4f}")

Probability that a bike is from 'Royal Enfield': 0.1280


In [None]:
# Preview the first five data rows (raw CSV lines, header already removed).
display(rows.take(num=5))

['TVS Star City Plus Dual Tone 110cc,35000.0,Ahmedabad,17654.0,First Owner,3.0,110.0,TVS',
 'Royal Enfield Classic 350cc,119900.0,Delhi,11000.0,First Owner,4.0,350.0,Royal Enfield',
 'Triumph Daytona 675R,600000.0,Delhi,110.0,First Owner,8.0,675.0,Triumph',
 'TVS Apache RTR 180cc,65000.0,Bangalore,16329.0,First Owner,4.0,180.0,TVS',
 'Yamaha FZ S V 2.0 150cc-Ltd. Edition,80000.0,Bangalore,10000.0,First Owner,3.0,150.0,Yamaha']