In [1]:
# ==========================================================
#   NYC TAXI BIG DATA + SPARK SQL – WERSJA DLA COLAB
#   (bez apt-get, bez instalacji Sparka, z użyciem wget)
# ==========================================================

# 1. (optional) make sure pyspark is available
#    It is usually already present in Colab, but a specific version can be forced:
!pip install -q pyspark==3.5.1

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import os

# 2. Start the SparkSession (Colab already provides a Java environment)
session_builder = SparkSession.builder.appName("NYC-Taxi-Colab-Fixed")
# getOrCreate() reuses an already running session if there is one.
spark = session_builder.getOrCreate()

print("Spark version:", spark.version)

# 3. Download the NYC Taxi data over HTTPS to the local disk
import subprocess

url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2022-01.parquet"
local_path = "yellow_tripdata_2022-01.parquet"

# An empty file is treated as missing: a failed `wget -O` leaves one behind,
# and it must not be mistaken for a finished download on the next run.
already_downloaded = os.path.isfile(local_path) and os.path.getsize(local_path) > 0

if not already_downloaded:
    print(f"\n[DOWNLOAD] Ściągam plik z {url} ...")
    # Download under a temporary name and rename only on success, so that an
    # interrupted transfer never leaves a truncated file at `local_path`.
    partial_path = local_path + ".part"
    try:
        # check=True raises CalledProcessError when wget exits non-zero,
        # instead of silently continuing with a broken file.
        subprocess.run(["wget", "-q", url, "-O", partial_path], check=True)
        os.replace(partial_path, local_path)
    finally:
        # After a successful rename the partial file no longer exists;
        # after a failure this removes the leftover.
        if os.path.exists(partial_path):
            os.remove(partial_path)
    print("[DOWNLOAD] Zakończono pobieranie.")
else:
    print("\n[DOWNLOAD] Plik już istnieje lokalnie, pomijam pobieranie.")

# 4. Read the local Parquet file with Spark
print(f"\n[LOAD] Wczytuję dane z pliku: {local_path}")
parquet_reader = spark.read
df = parquet_reader.parquet(local_path)

# Print the column names and types of the loaded DataFrame.
print("\n=== SCHEMA ===")
df.printSchema()

# Preview the first five rows without truncating long cell values.
print("\n=== SAMPLE (5 wierszy) ===")
df.show(n=5, truncate=False)

# Register the DataFrame as a temporary SQL view so it can be queried by name
df.createOrReplaceTempView("taxi")
print("\n[INFO] Zarejestrowano widok SQL: taxi")

# ==========================================================
# 5. ANALIZY SQL – BIG DATA NA PRAWDZIWYCH DANYCH
# ==========================================================



# --- SQL 1: Basic metrics ---
# Trip count, average total amount and summed total amount over the whole view.
print("\n=== SQL 1: Podstawowe metryki (liczba kursów, średnia kwota, suma) ===")
basic_metrics_sql = """
    SELECT
        COUNT(*) AS total_trips,
        AVG(total_amount) AS avg_total_amount,
        SUM(total_amount) AS total_revenue
    FROM taxi
"""
basic_metrics = spark.sql(basic_metrics_sql)
basic_metrics.show()

# --- SQL 2: TOP 10 pickup zones (PULocationID) by revenue ---
# Groups trips by pickup zone and keeps the ten zones with the highest
# summed total_amount.
print("\n=== SQL 2: TOP 10 stref (PULocationID) wg przychodu ===")
top_zones_sql = """
    SELECT
        PULocationID AS zone,
        COUNT(*) AS trips,
        SUM(total_amount) AS revenue
    FROM taxi
    GROUP BY PULocationID
    ORDER BY revenue DESC
    LIMIT 10
"""
top_zones = spark.sql(top_zones_sql)
top_zones.show()

# --- SQL 3: Daily revenue and number of trips ---
# One row per pickup date with trip count, average amount and revenue.
print("\n=== SQL 3: Dzienne przychody i liczba kursów ===")
daily_stats = spark.sql("""
    SELECT
        DATE(tpep_pickup_datetime) AS day,
        COUNT(*) AS trips,
        ROUND(AVG(total_amount), 2) AS avg_amount,
        ROUND(SUM(total_amount), 2) AS revenue
    FROM taxi
    GROUP BY DATE(tpep_pickup_datetime)
    ORDER BY day
""")
# show() prints only the first 20 rows by default, which is fewer than the
# number of days in a month; raise the limit so the whole month is visible.
# NOTE(review): 50 leaves headroom in case some rows carry pickup dates
# outside the file's month — confirm against the data.
daily_stats.show(n=50)

# --- SQL 4: Most expensive trips per kilometre (anomalies / fraud) ---
# The TLC data dictionary defines trip_distance in MILES, so dividing
# total_amount by it directly yields cost per mile. The distance is converted
# to kilometres (1 mile = 1.609344 km) so the cost_per_km column is what its
# name says. The trip_distance column itself is still shown in miles.
# Rows with non-positive distance or amount are excluded, which also avoids
# division by zero.
print("\n=== SQL 4: Najdroższe kursy w przeliczeniu na kilometr (cost_per_km) ===")
spark.sql("""
    SELECT
        tpep_pickup_datetime,
        tpep_dropoff_datetime,
        passenger_count,
        trip_distance,
        total_amount,
        ROUND(total_amount / (trip_distance * 1.609344), 2) AS cost_per_km
    FROM taxi
    WHERE trip_distance > 0
      AND total_amount > 0
    ORDER BY cost_per_km DESC
    LIMIT 20
""").show(truncate=False)

# --- SQL 5: Distribution of trip counts and average amounts by hour ---
# One row per pickup hour with the trip count and the rounded average amount.
print("\n=== SQL 5: Rozkład liczby kursów i średnich kwot wg godzin ===")
hourly_stats = spark.sql("""
    SELECT
        HOUR(tpep_pickup_datetime) AS hour,
        COUNT(*) AS trips,
        ROUND(AVG(total_amount), 2) AS avg_amount
    FROM taxi
    GROUP BY HOUR(tpep_pickup_datetime)
    ORDER BY hour
""")
# show() prints only 20 rows by default, which would silently drop the last
# hours of the day. 25 = 24 hours plus one possible NULL bucket, since the
# pickup timestamp column is nullable.
hourly_stats.show(n=25)

# --- SQL 6: Average price and distance by number of passengers ---
# One row per passenger_count value with trip count and rounded averages.
print("\n=== SQL 6: Średnia cena i dystans wg liczby pasażerów ===")
by_passengers_sql = """
    SELECT
        passenger_count,
        COUNT(*) AS trips,
        ROUND(AVG(total_amount), 2) AS avg_price,
        ROUND(AVG(trip_distance), 2) AS avg_distance
    FROM taxi
    GROUP BY passenger_count
    ORDER BY passenger_count
"""
by_passengers = spark.sql(by_passengers_sql)
by_passengers.show()

# --- SQL 7: "Suspiciously expensive" trips (simple heuristic) ---
# Flags trips whose total_amount exceeds 200, or exceeds 100 while the
# trip_distance is below 2, and lists the 20 most expensive of them.
print("\n=== SQL 7: Podejrzanie drogie kursy (fraud heurystyka) ===")
suspicious_sql = """
    SELECT
        tpep_pickup_datetime,
        tpep_dropoff_datetime,
        passenger_count,
        trip_distance,
        total_amount
    FROM taxi
    WHERE total_amount > 200
       OR (total_amount > 100 AND trip_distance < 2)
    ORDER BY total_amount DESC
    LIMIT 20
"""
suspicious_trips = spark.sql(suspicious_sql)
suspicious_trips.show(truncate=False)

print("\n[DONE] Demo NYC Taxi + Spark SQL w Colab zakończone ✔")


Spark version: 3.5.1

[DOWNLOAD] Ściągam plik z https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2022-01.parquet ...
[DOWNLOAD] Zakończono pobieranie.

[LOAD] Wczytuję dane z pliku: yellow_tripdata_2022-01.parquet

=== SCHEMA ===
root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double