In [1]:
import pyspark
import pandas as pd
from pyspark.sql import SparkSession

In [7]:
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [8]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('Homework') \
    .getOrCreate()

In [22]:
df = spark.read \
    .option("header", "true") \
    .csv('fhv_tripdata_2019-10.csv')

In [23]:
df.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropOff_datetime', StringType(), True), StructField('PUlocationID', StringType(), True), StructField('DOlocationID', StringType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [24]:
!head -n 101 fhv_tripdata_2019-10.csv > head_october.csv

In [26]:
!wc -l head_october.csv

101 head_october.csv


In [27]:
df_pandas = pd.read_csv('head_october.csv')

In [28]:
df_pandas

Unnamed: 0,dispatching_base_num,pickup_datetime,dropOff_datetime,PUlocationID,DOlocationID,SR_Flag,Affiliated_base_number
0,B00009,2019-10-01 00:23:00,2019-10-01 00:35:00,264,264,,B00009
1,B00013,2019-10-01 00:11:29,2019-10-01 00:13:22,264,264,,B00013
2,B00014,2019-10-01 00:11:43,2019-10-01 00:37:20,264,264,,B00014
3,B00014,2019-10-01 00:56:29,2019-10-01 00:57:47,264,264,,B00014
4,B00014,2019-10-01 00:23:09,2019-10-01 00:28:27,264,264,,B00014
...,...,...,...,...,...,...,...
95,B00310,2019-10-01 00:06:02,2019-10-01 00:14:04,264,242,,B00310
96,B00310,2019-10-01 00:03:43,2019-10-01 00:07:26,264,213,,B02534
97,B00310,2019-10-01 00:37:14,2019-10-01 00:51:58,264,241,,B02879
98,B00310,2019-10-01 00:42:41,2019-10-01 00:54:42,264,213,,B02875


In [30]:
from pyspark.sql import types

In [43]:
schema = types.StructType([
    types.StructField('hvfhs_license_num', types.StringType(), True),
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True)
])

In [45]:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhv_tripdata_2019-10.csv')

In [46]:
df = df.repartition(6)

In [47]:
df.write.parquet('homework/', mode='overwrite')

In [48]:
df = spark.read.parquet('homework/')

In [82]:
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)



In [94]:
df.createOrReplaceTempView('fhv_october_data')

In [95]:
spark.sql("""
SELECT
    count(1)
FROM
    fhv_october_data
WHERE
    DATE(pickup_datetime) = '2019-10-15'
""").show()

+--------+
|count(1)|
+--------+
|   62295|
+--------+



In [96]:
spark.sql("""
    SELECT 
        CAST(dropoff_datetime - pickup_datetime AS INTERVAL HOUR) AS trip_duration_hours
    FROM
        fhv_october_data
    ORDER BY
        trip_duration_hours DESC
""").show()

+-------------------+
|trip_duration_hours|
+-------------------+
|               null|
|               null|
|               null|
|               null|
|               null|
|               null|
|               null|
|               null|
|               null|
|               null|
|               null|
|               null|
|               null|
|               null|
|               null|
|               null|
|               null|
|               null|
|               null|
|               null|
+-------------------+
only showing top 20 rows



In [3]:
pyspark.version

<module 'pyspark.version' from 'C:\\tools\\spark-3.3.2-bin-hadoop3\\python\\pyspark\\version.py'>

In [6]:
pyspark.__version__

'3.3.2'