In [25]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types

In [3]:
# Local Spark session using all available cores; getOrCreate() returns the
# already-running session if this kernel has one instead of starting another.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/03 11:42:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/03/03 11:42:27 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [7]:
# Display the Spark version this session runs on (shown as the cell output).
spark.version

'3.5.0'

In [40]:
# Load the October 2019 FHV trip file. The first row holds column names, and
# inferSchema makes Spark scan the data to choose column types.
df = spark.read.csv(
    'fhv_tripdata_2019-10.csv.gz',
    header=True,
    inferSchema=True,
)

                                                                                

In [6]:
# Print the column names and the types Spark inferred for the trip data.
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropOff_datetime: timestamp (nullable = true)
 |-- PUlocationID: integer (nullable = true)
 |-- DOlocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [8]:
# Persist the trips as parquet split into 6 partitions (6 output files).
# mode('overwrite') keeps this cell re-runnable: the default save mode
# ('errorifexists') raises once 'fhv/2019/10' already exists.
df.repartition(6) \
    .write.mode('overwrite') \
    .parquet('fhv/2019/10')

                                                                                

In [10]:
# Peek at the first five trip rows.
df.show(n=5)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   NULL|                B00009|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   NULL|                B00013|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   NULL|                B00014|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|         264|         264|   NULL|                B00014|
|              B00014|2019-10-01 00:23:09|2019-10-01 00:28:27|         264|         264|   NULL|                B00014|
+--------------------+------------------

In [11]:
# Expose the trips DataFrame to spark.sql() queries under the name fhv_trips.
df.createOrReplaceTempView(name='fhv_trips')

In [16]:
# Count trips whose pickup falls on 2019-10-15. Comparing the calendar date
# covers the entire day; the previous BETWEEN '... 00:00:00' AND '... 23:59:59'
# would miss any timestamp with fractional seconds after 23:59:59.
spark.sql("""
SELECT count(*)
FROM fhv_trips
WHERE to_date(pickup_datetime) = DATE '2019-10-15'
""").show()

[Stage 11:>                                                         (0 + 1) / 1]

+--------+
|count(1)|
+--------+
|   62610|
+--------+



                                                                                

In [35]:
# Trip duration in hours: casting a timestamp to LongType gives epoch seconds,
# so (dropoff - pickup) / 3600 is hours. Longest trips are listed first.
df.withColumn(
    "DiffInHours",
    (df["dropOff_datetime"].cast(types.LongType())
     - df["pickup_datetime"].cast(types.LongType())) / 3600,
).orderBy("DiffInHours", ascending=False).show()


[Stage 24:>                                                         (0 + 1) / 1]

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|       DiffInHours|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+------------------+
|              B02832|2019-10-11 18:00:00|2091-10-11 18:30:00|         264|         264|   NULL|                B02832|          631152.5|
|              B02832|2019-10-28 09:00:00|2091-10-28 09:30:00|         264|         264|   NULL|                B02832|          631152.5|
|              B02416|2019-10-31 23:46:33|2029-11-01 00:13:00|        NULL|        NULL|   NULL|                B02416| 87672.44083333333|
|     B00746         |2019-10-01 21:43:42|2027-10-01 21:45:23|         159|         264|   NULL|       B00746         | 70128.02805555555|
|              B02921|2019-

                                                                                

In [45]:
# Number of trips per pickup location, least frequent first. PUlocationID is
# a secondary sort key so rows with equal counts appear in a stable,
# reproducible order (otherwise ties come back in arbitrary order).
spark.sql("""
SELECT PUlocationID, count(*) record_count
FROM fhv_trips
group by PUlocationID
order by record_count, PUlocationID
""").show()

[Stage 34:>                                                         (0 + 1) / 1]

+------------+------------+
|PUlocationID|record_count|
+------------+------------+
|           2|           1|
|         105|           2|
|         111|           5|
|          30|           8|
|         120|          14|
|          12|          15|
|         207|          23|
|          27|          25|
|         154|          26|
|           8|          29|
|         128|          39|
|         253|          47|
|          96|          53|
|          34|          57|
|          59|          62|
|          58|          77|
|          99|          89|
|         190|          98|
|          54|         105|
|         217|         110|
+------------+------------+
only showing top 20 rows



                                                                                

In [41]:
# Taxi zone lookup table stored as parquet. Parquet files carry their own
# schema, so the CSV-only 'header'/'inferSchema' options are not needed
# (they had no effect on this read and were removed).
df_zone = spark.read.parquet('test_pyspark/zones')

In [46]:
# Resolve LocationID 2 (the least frequent pickup location in the grouped
# counts) to its borough and zone name.
df_zone.where(df_zone["LocationID"] == 2).show()

+----------+-------+-----------+------------+
|LocationID|Borough|       Zone|service_zone|
+----------+-------+-----------+------------+
|         2| Queens|Jamaica Bay|   Boro Zone|
+----------+-------+-----------+------------+

