In [1]:
import os, socket
# Quick sanity check of the notebook environment: report the host name, the
# current working directory, and whether the taxi Parquet file is visible
# under the notebook's data directory.
for label, value in (
    ("HOSTNAME:", socket.gethostname()),
    ("PWD:", os.getcwd()),
    ("DATA exists?:", os.path.exists("/home/jovyan/data/yellow_tripdata_2019-01.parquet")),
):
    print(label, value)


HOSTNAME: 3fa58c0c40d9
PWD: /home/jovyan
DATA exists?: True


In [2]:
import os
from pyspark.sql import SparkSession

# Smoke test for the Spark cluster: confirm the Parquet file is visible, open a
# session against the standalone master, read the file and count its rows.

# The file is checked under two locations: the notebook home directory and the
# Spark work directory (presumably the same file mounted twice - confirm).
NOTEBOOK_DATA_PATH = "/home/jovyan/data/yellow_tripdata_2019-01.parquet"
SPARK_DATA_PATH = "/opt/spark/work-dir/data/yellow_tripdata_2019-01.parquet"

print("exists /home/jovyan/data ?", os.path.exists(NOTEBOOK_DATA_PATH))
print("exists /opt/spark/work-dir/data ?", os.path.exists(SPARK_DATA_PATH))

spark = (SparkSession.builder
         .appName("NYC-TLC-Check")
         .master("spark://spark-master:7077")
         .getOrCreate())

# try/finally guarantees the session is stopped even when the read or the count
# fails; otherwise the application would stay registered on the master.
try:
    print("spark.version =", spark.version)

    df = spark.read.parquet(SPARK_DATA_PATH)
    # count() is an action: it forces Spark to actually scan the file.
    print("rows =", df.count())
finally:
    spark.stop()


exists /home/jovyan/data ? True
exists /opt/spark/work-dir/data ? True
spark.version = 3.5.0
rows = 7696617


In [3]:
# Print the column names, types and nullability of `df`, the DataFrame loaded
# by the previous cell.
# NOTE(review): the previous cell has already called spark.stop(), so this
# relies on `df` lingering in the kernel; presumably it only works because
# printing the schema needs no new Spark job - confirm, or move this call
# before the stop.
df.printSchema()


root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: integer (nullable = true)



In [12]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Date-window analysis of the January 2019 yellow-taxi file: preview the data,
# find the min/max pickup and dropoff timestamps, then count how many trips
# fall inside vs. outside the [2019-01-01, 2019-02-01) pickup window.
spark = (SparkSession.builder
         .appName("NYC-TLC-01")
         .master("spark://spark-master:7077")
         .getOrCreate())

# try/finally guarantees the session is stopped even if one of the jobs below
# fails; otherwise the application would stay registered on the master.
try:
    # File location
    df = spark.read.parquet("/opt/spark/work-dir/data/yellow_tripdata_2019-01.parquet")

    # First look at the file: a handful of columns, first 5 rows
    df.select(
        "VendorID",
        "tpep_pickup_datetime",
        "tpep_dropoff_datetime",
        "passenger_count",
        "trip_distance",
        "PULocationID",
        "DOLocationID",
        "payment_type",
        "fare_amount",
        "tip_amount",
        "total_amount"
    ).show(5, truncate=False)

    # Range of the dates contained in the file
    df.select(
        F.min("tpep_pickup_datetime").alias("min_pickup"),
        F.max("tpep_pickup_datetime").alias("max_pickup"),
        F.min("tpep_dropoff_datetime").alias("min_dropoff"),
        F.max("tpep_dropoff_datetime").alias("max_dropoff"),
    ).show(truncate=False)

    # Analysis window: pickup in [2019-01-01, 2019-02-01). The predicate is
    # built once and reused by every count below so they cannot drift apart.
    pickup = F.col("tpep_pickup_datetime")
    in_jan = (pickup >= F.lit("2019-01-01")) & (pickup < F.lit("2019-02-01"))

    # Count records inside the window vs. records outside it. A NULL pickup
    # makes `in_jan` NULL, so such rows land in "out_of_window".
    counts = (df
        .select(F.when(in_jan, F.lit("in_jan_2019")).otherwise(F.lit("out_of_window")).alias("bucket"))
        .groupBy("bucket")
        .count()
    )

    counts.show(truncate=False)

    # `cond` is kept as an alias of `in_jan` in case other cells refer to it.
    cond = in_jan

    df_jan = df.filter(cond)

    print("=== Plano (não executa) ===")
    # explain() only prints the plan; it does not run a job.
    df_jan.explain()

    print("\n=== Agora executa (count) ===")
    print("rows jan:", df_jan.count())

    # Finer-grained breakdown: before the window / inside it / after it.
    # NULL pickups get their own bucket. Without the isNull() branch both
    # comparisons evaluate to NULL and the row would fall through to
    # otherwise(), being miscounted as "in_jan_2019" and disagreeing with
    # `counts` and `df_jan` above.
    bucket2 = (
        F.when(pickup.isNull(), F.lit("null_pickup"))
         .when(pickup < F.lit("2019-01-01"), F.lit("before_2019-01-01"))
         .when(pickup >= F.lit("2019-02-01"), F.lit("on_or_after_2019-02-01"))
         .otherwise(F.lit("in_jan_2019"))
    )

    (df
     .select(bucket2.alias("bucket"))
     .groupBy("bucket")
     .count()
     .orderBy("bucket")
    ).show(truncate=False)
finally:
    spark.stop()


+--------+--------------------+---------------------+---------------+-------------+------------+------------+------------+-----------+----------+------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|PULocationID|DOLocationID|payment_type|fare_amount|tip_amount|total_amount|
+--------+--------------------+---------------------+---------------+-------------+------------+------------+------------+-----------+----------+------------+
|1       |2019-01-01 00:46:40 |2019-01-01 00:53:20  |1.0            |1.5          |151         |239         |1           |7.0        |1.65      |9.95        |
|1       |2019-01-01 00:59:47 |2019-01-01 01:18:59  |1.0            |2.6          |239         |246         |1           |14.0       |1.0       |16.3        |
|2       |2018-12-21 13:48:30 |2018-12-21 13:52:40  |3.0            |0.0          |236         |236         |1           |4.5        |0.0       |5.8         |
|2       |2018-11-28 15:52:25 |2018-11-28 15:5