In [None]:
# Spark init: install findspark and start a local SparkSession

In [2]:
pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1
Note: you may need to restart the kernel to use updated packages.


In [1]:
# findspark.init() is called before `import pyspark` so that pyspark can be located
# (presumably Spark is installed outside this Python environment — confirm locally).
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession

In [2]:
# Local SparkSession using every available core; getOrCreate() reuses an
# existing session if one is already running in this kernel.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

In [None]:
# Get data: download the January 2021 FHVHV trip records and load them into Spark

In [None]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-01.csv.gz

In [None]:
# Count lines in the raw CSV: data rows plus one header line.
!wc -l fhvhv_tripdata_2021-01.csv

In [None]:
# First look at the CSV: only the header option is set, no schema is supplied.
df = (
    spark.read
    .option("header", "True")
    .csv('fhvhv_tripdata_2021-01.csv')
)

In [None]:
# First five rows as Row objects (DataFrame.head(n) is take(n)).
df.take(5)

In [None]:
# Inspect the schema Spark assigned to the read above, where no schema was supplied.
df.schema

In [None]:
# Write the header plus the first 1000 data rows to head.csv, a small sample for
# exploring column types with pandas.
!head -n 1001 fhvhv_tripdata_2021-01.csv > head.csv

In [None]:
# Check the sample's size (lines, words, bytes); 1001 lines are expected.
!wc  head.csv

In [3]:
import pandas as pd

In [4]:
# Load the sample with pandas and let it infer the column dtypes.
df_pandas = pd.read_csv('head.csv')

In [5]:
# Dtypes pandas inferred from the sample; used below as a starting point for a Spark schema.
df_pandas.dtypes

hvfhs_license_num        object
dispatching_base_num     object
pickup_datetime          object
dropoff_datetime         object
PULocationID              int64
DOLocationID              int64
SR_Flag                 float64
dtype: object

In [10]:
# Let Spark convert the pandas sample and show the resulting schema; this is then
# hand-edited into the explicit schema (timestamps, int IDs) defined below.
spark.createDataFrame(df_pandas).schema

StructType([StructField('hvfhs_license_num', StringType(), True), StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropoff_datetime', StringType(), True), StructField('PULocationID', LongType(), True), StructField('DOLocationID', LongType(), True), StructField('SR_Flag', DoubleType(), True)])

In [14]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

In [15]:
# Explicit schema for the FHVHV CSV, refined from the pandas-derived one:
# the datetime columns become timestamps and the location IDs become 32-bit ints.
# SR_Flag stays a nullable string (it is empty in the sampled rows).
# Uses the classes imported by `from pyspark.sql.types import ...`; the bare name
# `types` is not defined in this notebook, so `types.StructField(...)` raised NameError.
schema = StructType([
    StructField('hvfhs_license_num', StringType(), True),
    StructField('dispatching_base_num', StringType(), True),
    StructField('pickup_datetime', TimestampType(), True),
    StructField('dropoff_datetime', TimestampType(), True),
    StructField('PULocationID', IntegerType(), True),
    StructField('DOLocationID', IntegerType(), True),
    StructField('SR_Flag', StringType(), True),
])

In [16]:
# Re-read the full CSV using the explicit schema instead of all-string columns.
df = (
    spark.read
    .schema(schema)
    .option("header", "True")
    .csv('fhvhv_tripdata_2021-01.csv')
)

In [18]:
# First ten typed rows (DataFrame.head(n) is take(n)).
df.take(10)

[Row(hvfhs_license_num='HV0003', dispatching_base_num='B02682', pickup_datetime=datetime.datetime(2021, 1, 1, 0, 33, 44), dropoff_datetime=datetime.datetime(2021, 1, 1, 0, 49, 7), PULocationID=230, DOLocationID=166, SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02682', pickup_datetime=datetime.datetime(2021, 1, 1, 0, 55, 19), dropoff_datetime=datetime.datetime(2021, 1, 1, 1, 18, 21), PULocationID=152, DOLocationID=167, SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_datetime=datetime.datetime(2021, 1, 1, 0, 23, 56), dropoff_datetime=datetime.datetime(2021, 1, 1, 0, 38, 5), PULocationID=233, DOLocationID=142, SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_datetime=datetime.datetime(2021, 1, 1, 0, 42, 51), dropoff_datetime=datetime.datetime(2021, 1, 1, 0, 45, 50), PULocationID=142, DOLocationID=143, SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_dat

In [20]:
# Shuffle into 24 partitions so the Parquet write below produces 24 part files.
df = df.repartition(numPartitions=24)

In [21]:
# Persist the typed, repartitioned trips as Parquet. mode('overwrite') replaces any
# previous output; the default mode ('errorifexists') makes every re-run of the
# notebook fail once the directory exists.
df.write.mode('overwrite').parquet('fhvhv/2021/01/')

In [None]:
# Read the Parquet output back

In [22]:
# Load the Parquet directory written above; the schema comes from the files themselves.
df = spark.read.format('parquet').load('fhvhv/2021/01/')

In [26]:
# Confirm the Parquet files kept the explicit schema (timestamps and integer IDs).
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)



In [34]:
from pyspark.sql import functions as F

In [37]:
def crazystuff(base_num):
    """Map a dispatching base number to an obfuscated 'prefix/hex' id.

    The first character (e.g. 'B' in 'B02884') is dropped and the rest is parsed
    as an integer. The prefix is 's' if that integer is a multiple of 7, else 'a'
    if it is a multiple of 3, else 'e'. The suffix is the integer in lowercase
    hex, zero-padded to at least 3 digits.

    Args:
        base_num: a base number string such as 'B02884', or None.

    Returns:
        The id string, e.g. 's/b44' for 'B02884'. Returns None when base_num is
        None: dispatching_base_num is a nullable column, and raising inside a
        Python UDF would fail the whole Spark job.

    Raises:
        ValueError: if the characters after the first are not an integer.
    """
    if base_num is None:
        return None
    num = int(base_num[1:])
    if num % 7 == 0:
        prefix = 's'
    elif num % 3 == 0:
        prefix = 'a'
    else:
        prefix = 'e'
    return f'{prefix}/{num:03x}'


In [39]:
# Sanity-check crazystuff on one base number before wrapping it as a Spark UDF.
crazystuff('B02884')

's/b44'

In [40]:
# Wrap crazystuff as a Spark UDF producing strings. StringType is imported directly
# from pyspark.sql.types; the bare name `types` is not defined here (NameError).
crazy_stuff_udf = F.udf(crazystuff, returnType=StringType())

In [41]:
# Derive calendar dates from the timestamps and an obfuscated base id via the UDF,
# then show the first rows of the projected columns.
(
    df
    .withColumn('pickup_date', F.to_date(df['pickup_datetime']))
    .withColumn('dropoff_date', F.to_date(df['dropoff_datetime']))
    .withColumn('base_id', crazy_stuff_udf(df['dispatching_base_num']))
    .select('base_id', 'pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID')
    .show()
)

+-------+-----------+------------+------------+------------+
|base_id|pickup_date|dropoff_date|PULocationID|DOLocationID|
+-------+-----------+------------+------------+------------+
|  e/b32| 2021-01-06|  2021-01-06|          74|          69|
|  e/b42| 2021-01-06|  2021-01-06|         148|          34|
|  e/b3c| 2021-01-05|  2021-01-05|         237|         265|
|  e/9ce| 2021-01-06|  2021-01-06|         140|         236|
|  s/b44| 2021-01-06|  2021-01-06|          24|          51|
|  e/b42| 2021-01-03|  2021-01-03|          68|           4|
|  e/b38| 2021-01-06|  2021-01-06|          92|         152|
|  s/b3d| 2021-01-01|  2021-01-01|         181|          61|
|  s/acd| 2021-01-05|  2021-01-05|         147|         116|
|  e/b3b| 2021-01-04|  2021-01-04|          37|          49|
|  s/acd| 2021-01-06|  2021-01-06|         113|         114|
|  s/acd| 2021-01-04|  2021-01-04|          13|         265|
|  e/9ce| 2021-01-05|  2021-01-05|         192|         129|
|  e/9ce| 2021-01-01|  2

In [30]:
# Trips of licence HV0003 only, showing their timestamps and pickup/dropoff zones.
(
    df
    .filter(df.hvfhs_license_num == 'HV0003')
    .select('pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID')
    .show()
)

+-------------------+-------------------+------------+------------+
|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|
+-------------------+-------------------+------------+------------+
|2021-01-06 23:12:28|2021-01-06 23:24:47|          74|          69|
|2021-01-06 08:40:41|2021-01-06 08:53:34|         148|          34|
|2021-01-05 08:27:01|2021-01-05 09:28:42|         237|         265|
|2021-01-06 08:14:29|2021-01-06 08:40:31|          24|          51|
|2021-01-03 14:28:58|2021-01-03 14:47:51|          68|           4|
|2021-01-06 07:20:56|2021-01-06 07:50:33|          92|         152|
|2021-01-01 20:59:13|2021-01-01 21:05:08|         181|          61|
|2021-01-05 19:08:46|2021-01-05 19:27:05|         147|         116|
|2021-01-04 08:37:49|2021-01-04 08:50:25|          37|          49|
|2021-01-06 12:57:48|2021-01-06 13:03:03|         113|         114|
|2021-01-04 23:05:00|2021-01-04 23:25:41|          13|         265|
|2021-01-05 22:50:34|2021-01-05 23:20:04|       

In [28]:
# Peek at the raw sample (header + 9 rows); SR_Flag is empty in these rows.
!head -n 10 head.csv

hvfhs_license_num,dispatching_base_num,pickup_datetime,dropoff_datetime,PULocationID,DOLocationID,SR_Flag
HV0003,B02682,2021-01-01 00:33:44,2021-01-01 00:49:07,230,166,
HV0003,B02682,2021-01-01 00:55:19,2021-01-01 01:18:21,152,167,
HV0003,B02764,2021-01-01 00:23:56,2021-01-01 00:38:05,233,142,
HV0003,B02764,2021-01-01 00:42:51,2021-01-01 00:45:50,142,143,
HV0003,B02764,2021-01-01 00:48:14,2021-01-01 01:08:42,143,78,
HV0005,B02510,2021-01-01 00:06:59,2021-01-01 00:43:01,88,42,
HV0005,B02510,2021-01-01 00:50:00,2021-01-01 01:04:57,42,151,
HV0003,B02764,2021-01-01 00:14:30,2021-01-01 00:50:27,71,226,
HV0003,B02875,2021-01-01 00:22:54,2021-01-01 00:30:20,112,255,
