In [9]:
import pyspark
from pyspark.sql import SparkSession

In [10]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

In [11]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz

--2024-03-04 12:07:02--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz
Resolving github.com (github.com)... 140.82.121.4
Connecting to github.com (github.com)|140.82.121.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/efdfcf82-6d5c-44d1-a138-4e8ea3c3a3b6?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240304%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240304T120530Z&X-Amz-Expires=300&X-Amz-Signature=fae9e96168db7b2ae54406a1aae86724935fea4c6d09aa230596eb8647acc482&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dfhv_tripdata_2019-10.csv.gz&response-content-type=application%2Foctet-stream [following]
--2024-03-04 12:07:02--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/efdfcf82-6d5c-

In [12]:
!gzip -fd "fhv_tripdata_2019-10.csv.gz"

In [14]:
!wc -l fhv_tripdata_2019-10.csv

1897494 fhv_tripdata_2019-10.csv


In [15]:
df = spark.read \
    .option("header", "true") \
    .csv('fhv_tripdata_2019-10.csv')

In [16]:
df.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropOff_datetime', StringType(), True), StructField('PUlocationID', StringType(), True), StructField('DOlocationID', StringType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [17]:
!head -n 1001 fhv_tripdata_2019-10.csv > head.csv

In [18]:
import pandas as pd

In [19]:
df_pandas = pd.read_csv('head.csv')

In [20]:
df_pandas.dtypes

dispatching_base_num       object
pickup_datetime            object
dropOff_datetime           object
PUlocationID              float64
DOlocationID              float64
SR_Flag                   float64
Affiliated_base_number     object
dtype: object

In [21]:
spark.createDataFrame(df_pandas).schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropOff_datetime', StringType(), True), StructField('PUlocationID', DoubleType(), True), StructField('DOlocationID', DoubleType(), True), StructField('SR_Flag', DoubleType(), True), StructField('Affiliated_base_number', StringType(), True)])

Integer - 4 bytes
Long - 8 bytes

In [22]:
from pyspark.sql import types

In [23]:
schema = types.StructType([
	 types.StructField('dispatching_base_num', types.StringType(), True), 
	 types.StructField('pickup_datetime', types.TimestampType(), True), 
	 types.StructField('dropOff_datetime', types.TimestampType(), True), 
	 types.StructField('PUlocationID', types.DoubleType(), True), 
	 types.StructField('DOlocationID', types.DoubleType(), True), 
	 types.StructField('SR_Flag', types.DoubleType(), True), 
	 types.StructField('Affiliated_base_number', types.StringType(), True)
 ])

In [24]:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhv_tripdata_2019-10.csv')

In [25]:
df = df.repartition(6)

In [27]:
df.write.parquet('fhvhv/2019/10/')

In [28]:
!ls -lh --block-size=M fhvhv/2019/10

total 37M
-rw-r--r-- 1 jovyan users 7M Mar  4 12:08 part-00000-fdbd02f0-6e7f-4b91-a55f-8f867a97edf4-c000.snappy.parquet
-rw-r--r-- 1 jovyan users 7M Mar  4 12:08 part-00001-fdbd02f0-6e7f-4b91-a55f-8f867a97edf4-c000.snappy.parquet
-rw-r--r-- 1 jovyan users 6M Mar  4 12:08 part-00002-fdbd02f0-6e7f-4b91-a55f-8f867a97edf4-c000.snappy.parquet
-rw-r--r-- 1 jovyan users 7M Mar  4 12:08 part-00003-fdbd02f0-6e7f-4b91-a55f-8f867a97edf4-c000.snappy.parquet
-rw-r--r-- 1 jovyan users 6M Mar  4 12:08 part-00004-fdbd02f0-6e7f-4b91-a55f-8f867a97edf4-c000.snappy.parquet
-rw-r--r-- 1 jovyan users 7M Mar  4 12:08 part-00005-fdbd02f0-6e7f-4b91-a55f-8f867a97edf4-c000.snappy.parquet
-rw-r--r-- 1 jovyan users 0M Mar  4 12:08 _SUCCESS


In [93]:
df = spark.read.parquet('fhvhv/2019/10/')

In [94]:
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropOff_datetime: timestamp (nullable = true)
 |-- PUlocationID: double (nullable = true)
 |-- DOlocationID: double (nullable = true)
 |-- SR_Flag: double (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



SELECT * FROM df WHERE hvfhs_license_num =  HV0003

In [95]:
from pyspark.sql import functions as F

In [96]:
df.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B01485|2019-10-03 09:02:28|2019-10-03 09:18:19|       264.0|        39.0|   NULL|                B02866|
|              B01233|2019-10-03 14:17:25|2019-10-03 14:26:32|       264.0|        81.0|   NULL|                B01233|
|              B02157|2019-10-03 07:47:15|2019-10-03 08:40:07|       264.0|       265.0|   NULL|                B02157|
|              B00350|2019-10-03 14:01:57|2019-10-03 14:15:56|       264.0|        42.0|   NULL|                B00350|
|              B02292|2019-10-04 11:19:04|2019-10-04 11:51:23|       264.0|        80.0|   NULL|                B02869|
|              B02735|2019-10-03 13:58:4

In [98]:
def crazy_stuff(base_num):
    num = int(base_num[1:])
    if num % 7 == 0:
        return f's/{num:03x}'
    elif num % 3 == 0:
        return f'a/{num:03x}'
    else:
        return f'e/{num:03x}'

In [99]:
crazy_stuff('B02884')

's/b44'

In [100]:
crazy_stuff_udf = F.udf(crazy_stuff, returnType=types.StringType())

In [112]:
def interval_hours(init_date, finish_date):
    interval = finish_date.cast("long") - init_date.cast("long")
    interval_hours = interval / 3600
    # interval_hours = 1
    return interval_hours

In [115]:
df2 = df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .withColumn('dropoff_date', F.to_date(df.dropOff_datetime)) \
    .withColumn('base_id', crazy_stuff_udf(df.dispatching_base_num)) \
    .withColumn('length_trip', interval_hours(df.pickup_datetime,df.dropOff_datetime))\
    .select('base_id', 'pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID','length_trip') 

In [116]:
df2.show()

+-------+-----------+------------+------------+------------+--------------------+
|base_id|pickup_date|dropoff_date|PULocationID|DOLocationID|         length_trip|
+-------+-----------+------------+------------+------------+--------------------+
|  a/5cd| 2019-10-03|  2019-10-03|       264.0|        39.0| 0.26416666666666666|
|  a/4d1| 2019-10-03|  2019-10-03|       264.0|        81.0| 0.15194444444444444|
|  a/86d| 2019-10-03|  2019-10-03|       264.0|       265.0|  0.8811111111111111|
|  s/15e| 2019-10-03|  2019-10-03|       264.0|        42.0| 0.23305555555555554|
|  a/8f4| 2019-10-04|  2019-10-04|       264.0|        80.0|  0.5386111111111112|
|  e/aaf| 2019-10-03|  2019-10-03|       264.0|         3.0| 0.12583333333333332|
|  e/479| 2019-10-03|  2019-10-03|       264.0|       212.0| 0.10472222222222222|
|  e/136| 2019-10-01|  2019-10-01|       264.0|       168.0|                0.21|
|  a/4b0| 2019-10-03|  2019-10-03|       264.0|        37.0|   1.051388888888889|
|  e/1bd| 2019-1

In [122]:
df_filtered = df2.filter(df2.pickup_date == '2019-10-15')


In [123]:
df_filtered.count()

62610

In [124]:
from pyspark.sql.functions import col, max
max_valor = df2.agg(max(col("length_trip"))).collect()[0][0]
print(max_valor)

631152.5


In [125]:
!head -n 10 head.csv

dispatching_base_num,pickup_datetime,dropOff_datetime,PUlocationID,DOlocationID,SR_Flag,Affiliated_base_number
B00009,2019-10-01 00:23:00,2019-10-01 00:35:00,264,264,,B00009
B00013,2019-10-01 00:11:29,2019-10-01 00:13:22,264,264,,B00013
B00014,2019-10-01 00:11:43,2019-10-01 00:37:20,264,264,,B00014
B00014,2019-10-01 00:56:29,2019-10-01 00:57:47,264,264,,B00014
B00014,2019-10-01 00:23:09,2019-10-01 00:28:27,264,264,,B00014
B00021         ,2019-10-01 00:00:48,2019-10-01 00:07:12,129,129,,B00021         
B00021         ,2019-10-01 00:47:23,2019-10-01 00:53:25,57,57,,B00021         
B00021         ,2019-10-01 00:10:06,2019-10-01 00:19:50,173,173,,B00021         
B00021         ,2019-10-01 00:51:37,2019-10-01 01:06:14,226,226,,B00021         
