In [1]:
!pip install pyspark==3.1.2

Collecting pyspark==3.1.2
  Downloading pyspark-3.1.2.tar.gz (212.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m212.4/212.4 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9 (from pyspark==3.1.2)
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m198.6/198.6 kB[0m [31m17.5 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880745 sha256=fa0fd17cd7c5fc4ea1c23b36981d55ecf88af6b02c113be751a1eb31e1ea1e86
  Stored in directory: /root/.cache/pip/wheels/ef/70/50/7882e1bcb5693225f7cc86698f10953201b48b3f36317c2d18
Successfully built pyspark
Installing collected packages: py4j, pyspark
  Attempting uninstall: py4j
    Found existing installation: py4j 0

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import col, sum as spark_sum

# Initialize Spark session
spark = SparkSession.builder.appName("TaxiAnalysis").getOrCreate()

# Load data into DataFrame with appropriate column names
column_names = ["medallion", "hack_license", "pickup_datetime", "dropoff_datetime",
           "trip_time_in_secs", "trip_distance", "pickup_longitude", "pickup_latitude",
           "dropoff_longitude", "dropoff_latitude", "payment_type", "fare_amount",
           "surcharge", "mta_tax", "tip_amount", "tolls_amount", "total_amount"]

taxi_df = spark.read.csv('/taxi-data-sorted-small (1).csv', header=True, inferSchema=True)

# Rename the columns using alias
for old_col, new_col in zip(taxi_df.columns, column_names):
    taxi_df = taxi_df.withColumnRenamed(old_col, new_col)


cleaned_taxi_df = taxi_df.filter((col("trip_time_in_secs").between(120, 3600)) &
               (col("fare_amount").between(3, 200)) &
               (col("trip_distance").between(1, 50)) &
               (col("tolls_amount") >= 3))
#count
n = cleaned_taxi_df.count()

# Calculate sum of products (xy), sum of trip_distance, sum of fare_amount, and sum of squared trip_distance
sum_xy = cleaned_taxi_df.select(spark_sum(col("trip_distance") * col("fare_amount"))).first()[0]
sum_x = cleaned_taxi_df.agg(spark_sum("trip_distance")).first()[0]
sum_y = cleaned_taxi_df.agg(spark_sum("fare_amount")).first()[0]
sum_x_squared = cleaned_taxi_df.select(spark_sum(col("trip_distance") ** 2)).first()[0]

# Calculate slope (m) and y-intercept (b)
m = (n * sum_xy - sum_x * sum_y) / (n * sum_x_squared - sum_x ** 2)
b = (sum_y - m * sum_x) / n

print("Slope (m):", m)
print("Y-intercept (b):", b)






Slope (m): 2.3692447119543347
Y-intercept (b): 8.645236596570763


In [None]:
#task 2
# Initial parameters
m, b = 0.1, 0.1
learning_rate = 0.0001
num_iterations = 100
#start_time = time.time()

# Cost function
def calculate_cost(data, m, b):
    return data.select(spark_sum((col("fare_amount") - (m * col("trip_distance") + b)) ** 2)).first()[0]

# Gradient Descent
for i in range(num_iterations):
    count = cleaned_taxi_df.count()

    gradients = cleaned_taxi_df.withColumn(
        "gradient_x",
        (-2 / count) * col("trip_distance") * (col("fare_amount") - (m * col("trip_distance") + b))
    ).withColumn(
        "gradient_b",
        (-2 / count) * (col("fare_amount") - (m * col("trip_distance") + b))
    )

    # Update parameters
    m = m - learning_rate * gradients.select(spark_sum("gradient_x")).first()[0]
    b = b - learning_rate * gradients.select(spark_sum("gradient_b")).first()[0]

    cost = calculate_cost(cleaned_taxi_df, m, b)
    print(f"Iteration {i + 1}, Cost: {cost}, Parameters: (m={m}, b={b})")


Iteration 1, Cost: 59892252.86775915, Parameters: (m=0.20862494023752476, b=0.10755512445390783)
Iteration 2, Cost: 55598767.582484365, Parameters: (m=0.3130872900306293, b=0.11482911704746726)
Iteration 3, Cost: 51627992.361712664, Parameters: (m=0.41354654142778896, b=0.12183274929043918)
Iteration 4, Cost: 47955671.32929683, Parameters: (m=0.5101560754484187, b=0.12857637997603277)
Iteration 5, Cost: 44559371.75498222, Parameters: (m=0.6030633962304445, b=0.1350699709943762)
Iteration 6, Cost: 41418347.02118597, Parameters: (m=0.6924103562063821, b=0.14132310254008523)
Iteration 7, Cost: 38513409.88960439, Parameters: (m=0.7783333726516581, b=0.14734498773714513)
Iteration 8, Cost: 35826815.29349564, Parameters: (m=0.860963635935766, b=0.15314448670343178)
Iteration 9, Cost: 33342151.939649776, Parameters: (m=0.9404273097941548, b=0.15873012007634263)
Iteration 10, Cost: 31044242.05788538, Parameters: (m=1.0168457239265882, b=0.16411008202018548)
Iteration 11, Cost: 28919048.6856877

In [3]:
#task 3
import numpy as np

def calculate_cost(X, y, m, b):
    predictions = np.dot(X, m) + b  #vectorization
    error = y - predictions.reshape(-1, 1)

    cost = np.sum(error ** 2) / (2 * X.shape[0])
    return cost

def gradient_descent(X, y, m, b, learning_rate, beta):
    num_samples = X.shape[0]
    predictions = np.dot(X, m) + b
    errors = y.flatten() - predictions

    gradient_m = -2 * np.dot(X.T, errors) / num_samples
    gradient_b = -2 * np.sum(errors) / num_samples

    new_m = m - learning_rate * gradient_m
    new_b = b - learning_rate * gradient_b

    # Calculate new cost
    new_cost = calculate_cost(X, y, new_m, new_b)

    # Check if the new cost is lower, if not, decrease the learning rate
    if new_cost > calculate_cost(X, y, m, b): #Bold driver
        learning_rate /= beta

    return new_m, new_b, learning_rate


X = np.array(cleaned_taxi_df.select("trip_time_in_secs", "trip_distance", "fare_amount", "tolls_amount").collect())
y = np.array(cleaned_taxi_df.select("total_amount").collect())

# Add bias term to X
X = np.c_[np.ones(X.shape[0]), X]

m = np.ones(X.shape[1]) * 0.1
b = 0.1
learning_rate = 0.001
max_iterations = 100
beta = 1.5  # Bold Driver parameter

y = y.reshape(-1, 1)

# Gradient Descent with Bold Driver
for iteration in range(max_iterations):


    # Calculate cost
    cost = calculate_cost(X, y, m, b)

    # Update parameters with Bold Driver
    m, b, learning_rate = gradient_descent(X, y, m, b, learning_rate, beta)


    print(f"Iteration {iteration + 1}, Cost: {cost}")
    print("Model Parameters (m):", m)
    print("Model Parameter (b):", b)
    print("Learning Rate:", learning_rate)
    print()

# Final model parameters
print("Final Model Parameters (m):", m)
print("Final Model Parameter (b):", b)







Iteration 1, Cost: 8834.457072136147
Model Parameters (m): [-1.42930580e-01 -4.64875968e+02 -3.27844664e+00 -1.00505221e+01
 -1.13981603e+00]
Model Parameter (b): -0.14293058005427406
Learning Rate: 0.0006666666666666666

Iteration 2, Cost: 334306302351.55817
Model Parameters (m): [1.02142459e+03 1.91605607e+06 1.42024543e+04 4.29866256e+04
 5.28391774e+03]
Model Parameter (b): 1021.4245915370462
Learning Rate: 0.0004444444444444444

Iteration 3, Cost: 5.678758317474787e+18
Model Parameters (m): [-2.80589281e+06 -5.26403383e+09 -3.90182487e+07 -1.18096933e+08
 -1.45162029e+07]
Model Parameter (b): -2805892.8124737036
Learning Rate: 0.0002962962962962963

Iteration 4, Cost: 4.286218537701719e+25
Model Parameters (m): [5.13820290e+09 9.63959590e+12 7.14509374e+10 2.16261284e+11
 2.65823390e+10]
Model Parameter (b): 5138202900.254034
Learning Rate: 0.00019753086419753085

Iteration 5, Cost: 1.4373238208853006e+32
Model Parameters (m): [-6.27106858e+12 -1.17649241e+16 -8.72043665e+13 -2.63