In [0]:
%sql
-- Peek at 10 rows
SELECT * FROM samples.nyctaxi.trips LIMIT 10;

tpep_pickup_datetime,tpep_dropoff_datetime,trip_distance,fare_amount,pickup_zip,dropoff_zip
2016-02-13T21:47:53.000Z,2016-02-13T21:57:15.000Z,1.4,8.0,10103,10110
2016-02-13T18:29:09.000Z,2016-02-13T18:37:23.000Z,1.31,7.5,10023,10023
2016-02-06T19:40:58.000Z,2016-02-06T19:52:32.000Z,1.8,9.5,10001,10018
2016-02-12T19:06:43.000Z,2016-02-12T19:20:54.000Z,2.3,11.5,10044,10111
2016-02-23T10:27:56.000Z,2016-02-23T10:58:33.000Z,2.6,18.5,10199,10022
2016-02-13T00:41:43.000Z,2016-02-13T00:46:52.000Z,1.4,6.5,10023,10069
2016-02-18T23:49:53.000Z,2016-02-19T00:12:53.000Z,10.4,31.0,11371,10003
2016-02-18T20:21:45.000Z,2016-02-18T20:38:23.000Z,10.15,28.5,11371,11201
2016-02-03T10:47:50.000Z,2016-02-03T11:07:06.000Z,3.27,15.0,10014,10023
2016-02-19T01:26:39.000Z,2016-02-19T01:40:01.000Z,4.42,15.0,10003,11222


In [0]:
# Do the same in Python
df = spark.read.table("samples.nyctaxi.trips")
df.printSchema()
df.limit(5).display()

tpep_pickup_datetime,tpep_dropoff_datetime,trip_distance,fare_amount,pickup_zip,dropoff_zip
2016-02-13T21:47:53.000Z,2016-02-13T21:57:15.000Z,1.4,8.0,10103,10110
2016-02-13T18:29:09.000Z,2016-02-13T18:37:23.000Z,1.31,7.5,10023,10023
2016-02-06T19:40:58.000Z,2016-02-06T19:52:32.000Z,1.8,9.5,10001,10018
2016-02-12T19:06:43.000Z,2016-02-12T19:20:54.000Z,2.3,11.5,10044,10111
2016-02-23T10:27:56.000Z,2016-02-23T10:58:33.000Z,2.6,18.5,10199,10022


In [0]:
%sql
SELECT
  pickup_zip,
  COUNT(*)                    AS trips,
  ROUND(SUM(fare_amount), 2)  AS total_fares
FROM samples.nyctaxi.trips
GROUP BY pickup_zip
ORDER BY trips DESC
LIMIT 20;

pickup_zip,trips,total_fares
10001,1227,13028.01
10003,1181,12965.5
10011,1129,12321.5
10021,1021,10424.0
10018,1012,11541.51
10023,1008,10123.0
10028,929,9481.5
10012,834,9467.0
10110,763,8316.0
10065,702,6885.5


In [0]:
from pyspark.sql import functions as F

df_feat = (
    df
    .select("trip_distance", "fare_amount", "tpep_pickup_datetime")
    .withColumn("hour", F.hour("tpep_pickup_datetime"))
    .withColumn("fare_per_mile", F.when(F.col("trip_distance") > 0, F.col("fare_amount")/F.col("trip_distance")).otherwise(None))
).na.drop()

df_feat.limit(10).display()

trip_distance,fare_amount,tpep_pickup_datetime,hour,fare_per_mile
1.4,8.0,2016-02-13T21:47:53.000Z,21,5.714285714285714
1.31,7.5,2016-02-13T18:29:09.000Z,18,5.7251908396946565
1.8,9.5,2016-02-06T19:40:58.000Z,19,5.277777777777778
2.3,11.5,2016-02-12T19:06:43.000Z,19,5.0
2.6,18.5,2016-02-23T10:27:56.000Z,10,7.115384615384615
1.4,6.5,2016-02-13T00:41:43.000Z,0,4.642857142857143
10.4,31.0,2016-02-18T23:49:53.000Z,23,2.980769230769231
10.15,28.5,2016-02-18T20:21:45.000Z,20,2.8078817733990147
3.27,15.0,2016-02-03T10:47:50.000Z,10,4.587155963302752
4.42,15.0,2016-02-19T01:26:39.000Z,1,3.3936651583710407


In [0]:
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error, r2_score
from pyspark.sql import functions as F

# Ensure numeric types (if you haven't already)
df_num = (
    spark.read.table("samples.nyctaxi.trips")
    .select(
        F.col("trip_distance").cast("double").alias("trip_distance"),
        F.col("fare_amount").cast("double").alias("fare_amount")
    )
    .na.drop()
)

# Convert a modest sample to pandas to avoid memory issues on serverless
sample_rows = 100000  # adjust smaller if you hit memory limits
pdf = df_num.limit(sample_rows).toPandas()

X = pdf[["trip_distance"]].values
y = pdf["fare_amount"].values

# Simple train/test split
split = int(0.8 * len(pdf))
X_train, X_test = X[:split], X[split:]
y_train, y_test = y[:split], y[split:]

model = LinearRegression()
model.fit(X_train, y_train)
pred = model.predict(X_test)

rmse = mean_squared_error(y_test, pred, squared=False)
r2 = r2_score(y_test, pred)

print("Coefficient:", float(model.coef_[0]))
print("Intercept:", float(model.intercept_))
print("RMSE:", float(rmse))
print("R2:", float(r2))

import pandas as pd
preview = pd.DataFrame({
    "trip_distance": X_test.flatten()[:10],
    "actual_fare": y_test[:10],
    "predicted_fare": pred[:10]
})
display(preview)


trip_distance,actual_fare,predicted_fare
8.96,26.0,28.97000871931997
1.2,7.0,7.816504845267724
3.46,11.5,13.977190251886032
0.7,4.5,6.453521348228276
0.66,6.0,6.34448266846512
0.56,4.5,6.07188596905723
0.59,4.5,6.153664978879597
0.85,4.5,6.862416397340111
18.48,52.0,54.921214502951074
1.43,6.5,8.44347725390587


In [0]:
%sql
SELECT fare_amount FROM samples.nyctaxi.trips WHERE fare_amount < 20 LIMIT 25

fare_amount
8.0
7.5
9.5
11.5
18.5
6.5
15.0
15.0
13.5
6.0
