In [1]:
import pyspark
from pyspark.sql import SparkSession
# Spark session for this notebook, running in local mode on `local[*]`.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

In [3]:
# !wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-01.csv.gz

In [4]:
# !wc -l fhvhv_tripdata_2021-01.csv

In [6]:
# First pass: read the uncompressed CSV with only the header option.
# NOTE: needs the decompressed file, i.e. the `gzip -d` cell must have run first.
# Without a schema every column comes back as a string (see the next cell's output).
df = (
    spark.read
    .option("header", "true")
    .csv('fhvhv_tripdata_2021-01.csv')
)

In [7]:
# Display the schema of the header-only read: every column is StringType (see output below).
df.schema

StructType([StructField('hvfhs_license_num', StringType(), True), StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropoff_datetime', StringType(), True), StructField('PULocationID', StringType(), True), StructField('DOLocationID', StringType(), True), StructField('SR_Flag', StringType(), True)])

In [None]:
# !gzip -d fhvhv_tripdata_2021-01.csv.gz

In [None]:
# !head -n 1001 fhvhv_tripdata_2021-01.csv > head.csv

In [8]:
import pandas as pd

In [9]:
# Load the small head.csv sample with pandas so its dtype inference can be compared to Spark's.
df_pandas = pd.read_csv(filepath_or_buffer='head.csv')

In [10]:
# Dtypes pandas inferred from the sample: location IDs as int64, SR_Flag as float64 (see output).
df_pandas.dtypes

hvfhs_license_num        object
dispatching_base_num     object
pickup_datetime          object
dropoff_datetime         object
PULocationID              int64
DOLocationID              int64
SR_Flag                 float64
dtype: object

In [11]:
# Let Spark derive a schema from the pandas sample (int64 -> LongType, float64 -> DoubleType).
# NOTE(review): the `pdf.iteritems()` warning in the output is emitted inside PySpark's pandas
# conversion, not by this notebook -- presumably a PySpark/pandas version mismatch; confirm.
spark.createDataFrame(df_pandas).schema

  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():


StructType([StructField('hvfhs_license_num', StringType(), True), StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropoff_datetime', StringType(), True), StructField('PULocationID', LongType(), True), StructField('DOLocationID', LongType(), True), StructField('SR_Flag', DoubleType(), True)])

In [12]:
from pyspark.sql import types

In [13]:
# Explicit schema for the FHVHV CSV. The two datetime columns are parsed as
# timestamps and the location IDs as integers, instead of the all-string
# header-only read. SR_Flag stays a string. Every field is nullable.
schema = types.StructType([
    types.StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('hvfhs_license_num', types.StringType()),
        ('dispatching_base_num', types.StringType()),
        ('pickup_datetime', types.TimestampType()),
        ('dropoff_datetime', types.TimestampType()),
        ('PULocationID', types.IntegerType()),
        ('DOLocationID', types.IntegerType()),
        ('SR_Flag', types.StringType()),
    )
])

In [14]:


# Re-read the same CSV, this time with the explicit `schema`, so the columns get typed values.
df = (
    spark.read
    .schema(schema)
    .option("header", "true")
    .csv('fhvhv_tripdata_2021-01.csv')
)



In [15]:
# Preview the typed frame (Spark's default: first 20 rows, long values truncated).
df.show(n=20, truncate=True)

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0003|              B02682|2021-01-01 00:33:44|2021-01-01 00:49:07|         230|         166|   null|
|           HV0003|              B02682|2021-01-01 00:55:19|2021-01-01 01:18:21|         152|         167|   null|
|           HV0003|              B02764|2021-01-01 00:23:56|2021-01-01 00:38:05|         233|         142|   null|
|           HV0003|              B02764|2021-01-01 00:42:51|2021-01-01 00:45:50|         142|         143|   null|
|           HV0003|              B02764|2021-01-01 00:48:14|2021-01-01 01:08:42|         143|          78|   null|
|           HV0005|              B02510|2021-01-01 00:06:59|2021-01-01 00:43:01|

In [16]:
# Shuffle into 24 partitions. The purpose is presumably evenly sized chunks for the
# commented-out parquet write -- TODO confirm. `df` is rebound here, so re-running
# this cell shuffles the already-repartitioned frame again.
df = df.repartition(numPartitions=24)

In [None]:
# df.write.parquet('fhvhv/')

In [17]:
from pyspark.sql import functions as f

In [19]:
def crazy_stuff(base_num):
    """Map a dispatching base number such as ``'B02877'`` to a bucketed hex id.

    The first character is dropped and the rest is parsed as an integer ``num``.
    The result is ``'<prefix>/<hex>'``:

    * ``<hex>`` is ``num`` in lowercase hex, zero-padded to at least 3 digits.
    * ``<prefix>`` is ``'s'`` if ``num`` is divisible by 7, otherwise ``'a'``
      if it is divisible by 3, otherwise ``'e'``. The 7 check wins for
      multiples of 21.

    Args:
        base_num: A ``dispatching_base_num`` value, or ``None`` for a null
            cell. Spark passes SQL nulls to Python UDFs as ``None``.

    Returns:
        The bucketed id string, or ``None`` when ``base_num`` is ``None``.
        Spark turns that ``None`` into a null result instead of failing the
        whole job with a TypeError.

    Raises:
        ValueError: If the characters after the first do not form an integer.
    """
    if base_num is None:
        return None
    num = int(base_num[1:])
    if num % 7 == 0:
        prefix = 's'
    elif num % 3 == 0:
        prefix = 'a'
    else:
        prefix = 'e'
    return f'{prefix}/{num:03x}'
    
# Wrap the plain-Python function as a Spark UDF, declaring StringType to match the strings it returns.
crazy_stuff_udf = f.udf(crazy_stuff, types.StringType())

In [20]:
# Add calendar-date columns derived from the two timestamps, plus a synthetic
# `base_id` computed by the Python UDF, then preview the result.
(
    df
    .withColumn('pickup_date', f.to_date(f.col('pickup_datetime')))
    .withColumn('dropoff_date', f.to_date(f.col('dropoff_datetime')))
    .withColumn('base_id', crazy_stuff_udf(f.col('dispatching_base_num')))
    .show()
)
    



+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+-----------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|pickup_date|dropoff_date|base_id|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+-----------+------------+-------+
|           HV0003|              B02877|2021-01-04 04:27:31|2021-01-04 04:44:32|          56|         229|   null| 2021-01-04|  2021-01-04|  s/b3d|
|           HV0003|              B02884|2021-01-04 15:09:21|2021-01-04 15:16:34|          28|         215|   null| 2021-01-04|  2021-01-04|  s/b44|
|           HV0003|              B02875|2021-01-05 09:28:45|2021-01-05 09:37:04|         147|          78|   null| 2021-01-05|  2021-01-05|  e/b3b|
|           HV0005|              B02510|2021-01-02 14:54:39|2021-01-02 15:16:06|           7|         112|   nul

                                                                                

In [21]:
# Expose `df` to Spark SQL under the name `fhvhv`.
# registerTempTable has been deprecated since Spark 2.0 (it emits a FutureWarning).
# createOrReplaceTempView is the documented replacement and, like it, replaces any
# existing view of that name, so re-running this cell is safe.
df.createOrReplaceTempView('fhvhv')



In [22]:
# Plain SQL over the `fhvhv` temp view: rows whose PULocationID is 56 (first 20 shown).
spark.sql(sqlQuery='''
select
    *
from
    fhvhv
where PULocationID = 56        
          ''').show(n=20)

[Stage 8:>                                                          (0 + 8) / 8]

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0003|              B02765|2021-01-04 21:02:06|2021-01-04 21:19:16|          56|          53|   null|
|           HV0005|              B02510|2021-01-05 10:20:32|2021-01-05 10:27:59|          56|         173|   null|
|           HV0003|              B02870|2021-01-04 08:50:48|2021-01-04 08:52:45|          56|          56|   null|
|           HV0003|              B02765|2021-01-03 03:17:25|2021-01-03 03:41:43|          56|          16|   null|
|           HV0003|              B02764|2021-01-01 08:40:27|2021-01-01 08:48:21|          56|          82|   null|
|           HV0003|              B02869|2021-01-02 14:01:07|2021-01-02 14:12:57|

                                                                                