<a href="https://colab.research.google.com/github/aavarela/SPBD_Labs/blob/main/docs/labs/projeto1/Projeto1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Java setup (required for PySpark)

In [None]:
#@title Java Setup (needed for pyspark)
!apt-get install -y openjdk-17-jre 2>/dev/null > /dev/null

# Download the 1% sample

In [None]:
#@title Download 1% sample
!wget -q -O taxi_rides_1pc.csv.gz https://www.dropbox.com/scl/fi/v8ei5laqcalrx30z3lsty/taxi_rides_1pc.csv.gz?rlkey=q1lq7l56c4j97h9kymsdroau5&st=iurdwnwj&dl=0

# Inspect the dataset schema

In [None]:
#@title Dataset Schema
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Start (or reuse) a local Spark session that uses all available cores.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('taxis')
    .getOrCreate()
)

# Load the gzipped CSV: the first row is a header, and column types are inferred
# by scanning the data. The read is intentionally NOT wrapped in try/except.
# Every later cell uses `data`, so a failed read must stop "Run All" here rather
# than being printed and resurfacing later as a NameError on `data`.
data = spark.read.csv('taxi_rides_1pc.csv.gz', sep=',', header=True, inferSchema=True)
data.printSchema()

# Register the provided helper functions (`latlon_to_grid` and `inBounds`) as user-defined functions (UDFs)

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, IntegerType, BooleanType, DoubleType

import math

# Longitude and latitude of the upper-left corner of the grid
MIN_LON = -74.916578
MAX_LAT = 41.47718278

# Longitude and latitude offsets that correspond to a shift of 500 meters
LON_DELTA = 0.005986
LAT_DELTA = 0.004491556

# Number of cells along each side of the square grid.
GRID_SIZE = 300

def latlon_to_grid(lat, lon):
    """Map a (lat, lon) point to its 0-based (row, col) grid cell.

    Row 0 is the band just south of MAX_LAT; col 0 is the band just east of
    MIN_LON. ``math.floor`` is used rather than ``int()``. ``int()`` truncates
    toward zero, which would fold points lying up to one cell north or west of
    the grid into row/col 0. With ``floor``, such points get index -1 and are
    rejected by ``inBounds``.

    Returns:
        A ``(row, col)`` tuple of Python ints. Indices may be negative or
        >= GRID_SIZE for points outside the grid.
    """
    row = math.floor((MAX_LAT - lat) / LAT_DELTA)
    column = math.floor((lon - MIN_LON) / LON_DELTA)
    return (row, column)

def inBounds(cell):
    """Return True if ``cell`` is a (row, col) pair inside the grid.

    Valid indices are 0..GRID_SIZE-1 on both axes, so row 0 and col 0 are
    real cells and must be accepted. A missing cell (``None``, e.g. a null
    array reaching the UDF) is treated as out of bounds rather than raising
    inside the Spark worker.
    """
    if cell is None:
        return False
    row, column = cell[0], cell[1]
    return 0 <= row < GRID_SIZE and 0 <= column < GRID_SIZE

# Expose the plain-Python helpers to Spark as column-wise UDFs.
# latlon_to_grid yields a (row, col) pair, which Spark stores as array<int>.
latlon_to_grid_udf = udf(f=latlon_to_grid, returnType=ArrayType(IntegerType()))
# inBounds yields a plain boolean, so its UDF can be used directly in .filter().
inBounds_udf = udf(f=inBounds, returnType=BooleanType())

# Clean the data by removing rows with null or NaN (invalid) coordinates

In [None]:
from pyspark.sql.functions import col, isnan, when

# Drop rows whose coordinates are unusable before any UDF sees them.
# NaN is checked in addition to null: a double column can hold NaN, and
# isNotNull() alone would let it through.
def _is_valid_coordinate(column_name):
    """Column predicate: `column_name` is neither null nor NaN."""
    return col(column_name).isNotNull() & ~isnan(col(column_name))

cleaned_data = data.filter(
    _is_valid_coordinate("pickup_latitude")
    & _is_valid_coordinate("pickup_longitude")
    & _is_valid_coordinate("dropoff_latitude")
    & _is_valid_coordinate("dropoff_longitude")
)

# Convert the coordinates to grid cells using the helper functions

In [None]:
# Map both trip endpoints onto the grid, then split each (row, col) array into
# scalar columns. Element 0 of the array comes from the latitude (the row) and
# element 1 from the longitude (the column), so *_grid_x holds the row index.
data_with_grid_coords = (
    cleaned_data
    .withColumn(
        "pickup_grid_coords",
        latlon_to_grid_udf(col("pickup_latitude"), col("pickup_longitude")),
    )
    .withColumn(
        "dropoff_grid_coords",
        latlon_to_grid_udf(col("dropoff_latitude"), col("dropoff_longitude")),
    )
    .withColumn("pickup_grid_x", col("pickup_grid_coords").getItem(0))
    .withColumn("pickup_grid_y", col("pickup_grid_coords").getItem(1))
    .withColumn("dropoff_grid_x", col("dropoff_grid_coords").getItem(0))
    .withColumn("dropoff_grid_y", col("dropoff_grid_coords").getItem(1))
)

# Filter the data by removing trips whose pickup or dropoff lies outside the grid

In [None]:
# Keep a trip only when BOTH of its endpoints fall inside the grid.
pickup_in_grid = inBounds_udf(col("pickup_grid_coords"))
dropoff_in_grid = inBounds_udf(col("dropoff_grid_coords"))
filtered_data = data_with_grid_coords.where(pickup_in_grid & dropoff_in_grid)

# Calculate the profit of each trip

In [None]:
# Profit of a trip = fare + tip. A null in either operand yields a null profit.
trip_profit_col = col("fare_amount") + col("tip_amount")
final_data = filtered_data.withColumn("trip_profit", trip_profit_col)

# Display the data reduction and the final data schema

In [None]:
# Row counts at each pipeline stage, showing how much data each step removes.
# Nothing in these cells is cached, so each .count() is a separate Spark job.
for stage_label, stage_frame in (
    ("Original DataFrame count:", data),
    ("Cleaned DataFrame count (after removing null coords):", cleaned_data),
    ("DataFrame with grid coordinates, filtered and profit count:", final_data),
):
    print(stage_label, stage_frame.count())

final_data.printSchema()