In [4]:
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as func
from pyspark.sql import types

In [None]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

df = spark.read \
    .option("header", "true") \
    .csv('taxi+_zone_lookup.csv')

df.show()

In [None]:
df.write.parquet('zones')

In [None]:
# fhvhv_tripdata_2021-02.parquet

spark = SparkSession.builder \
    .appName("fhv") \
    .config("spark.driver.memory", "15g") \
    .getOrCreate()

#fhv = spark.read.parquet("fhvhv_tripdata_2021-02.parquet")

fhv = spark.read.parquet("fhv_r")

### Old Questions

### 1.

In [None]:
spark.version

In [None]:
# https://sparkbyexamples.com/pyspark/pyspark-repartition-vs-coalesce/

# DataFrame repartition
fhv_r = fhv.repartition(24)
print(fhv_r.rdd.getNumPartitions())

In [None]:
fhv_r.write.parquet("fhv_r")

In [None]:
os.getcwd() + "\\fhv_r"

### 2.

In [None]:
import os
from pathlib import Path
# Modified from https://www.geeksforgeeks.org/how-to-get-size-of-folder-using-python/

output_path = Path(os.getcwd()) / "fhv_r"

size = 0

for path, dirs, files in os.walk(str(output_path)):
    for file in files:
        size += os.path.getsize(output_path / file)

print(f"Folder size: {round(size / 1048576, 1)} MB")

### 3.

In [None]:
fhv.select(func.count(fhv.pickup_datetime)).show()

In [None]:
# https://stackoverflow.com/questions/31407461/datetime-range-filter-in-pyspark-sql

fhv_time = fhv.select(func.to_date(fhv.pickup_datetime).alias("time"))

date_from = "2021-02-15 00:00:00"
date_to = "2021-02-16 00:00:00"
sf = fhv_time.filter(fhv_time.time >= date_from).filter(fhv_time.time < date_to)


In [None]:
fhv.registerTempTable("fhv_temp")

trips_15th = spark.sql("""
SELECT 
    COUNT(pickup_datetime) AS trips
FROM
    fhv_temp
WHERE
    pickup_datetime >= '2021-02-15 00:00:00'
    AND pickup_datetime < '2021-02-16 00:00:00'
""")

In [None]:
sf.select(func.count(sf.time)).show()

In [None]:
trips_15th.show()

### 4.

In [None]:
max_duration = spark.sql("""
SELECT 
    MAX(FLOAT(dropoff_datetime) - FLOAT(pickup_datetime)) AS duration
FROM
    fhv_temp
""")

In [None]:
date_duration = spark.sql("""
SELECT 
    pickup_datetime,
    (FLOAT(dropoff_datetime) - FLOAT(pickup_datetime)) AS duration
FROM
    fhv_temp
""")

In [None]:
date_duration.join(max_duration, on="duration", how="inner").show()

### 5.

In [None]:
top_dispatch_base = spark.sql("""
SELECT 
    dispatching_base_num,
    count(dispatching_base_num) as base_count
FROM
    fhv_temp
GROUP BY dispatching_base_num
ORDER BY 2 DESC
LIMIT 1
""")

top_dispatch_base.show()

In [None]:
top_dispatch_base.explain()

# http://localhost:4040/jobs/

### 6.

In [None]:
zones = spark.read \
    .option("header", "true") \
    .csv('taxi+_zone_lookup.csv')

In [None]:
zones.head()

In [None]:
zones.registerTempTable("zone_temp")

zone_pu = spark.sql("""SELECT LocationID AS PULocationID, Zone AS PUZone FROM zone_temp""")
zone_do = spark.sql("""SELECT LocationID AS DOLocationID, Zone AS DOZone FROM zone_temp""")

zone_merge = fhv.join(zone_pu, on="PULocationID", how='left')

zone_merge = zone_merge.join(zone_do, on="DOLocationID", how='left')

In [None]:
zone_merge.registerTempTable("zone_merge_temp")

top_pairs = spark.sql("""
SELECT zone_pairs, COUNT(zone_pairs)
FROM (
SELECT CONCAT(COALESCE(PUZone, 'unknown'), " / ", COALESCE(DOZone, 'unknown')) as zone_pairs
FROM
    zone_merge_temp
    )
GROUP BY zone_pairs
ORDER BY 2 DESC
LIMIT 1
""")

In [None]:
top_pairs.show(n=1, truncate=False, vertical=True)

In [None]:
spark.sql("""

SELECT COUNT(PUZone)
FROM
    zone_merge_temp
WHERE PUZone = 'East New York' AND DOZone = 'East New York'
""").show()

### New Questions

### 1.

In [6]:
file = "fhvhv_tripdata_2021-06.csv"

spark = SparkSession.builder \
    .appName("fhv") \
    .config("spark.driver.memory", "15g") \
    .getOrCreate()

In [7]:
spark.version

'3.0.3'

### 2.

In [25]:
import csv
import pandas as pd
file = "fhvhv_tripdata_2021-06.csv"
with open(file) as r:
    headers= pd.read_csv(r, nrows=0).columns.tolist()
    
headers

['dispatching_base_num',
 'pickup_datetime',
 'dropoff_datetime',
 'PULocationID',
 'DOLocationID',
 'SR_Flag',
 'Affiliated_base_number']

In [26]:
schema = types.StructType([
    # types.StructField('hvfhs_license_num', types.StringType(), True),
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True),
    types.StructField('Affiliated_base_number', types.StringType(), True)
])



#fhv = spark.read.parquet(file)

fhv = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv(file)

In [27]:
# https://sparkbyexamples.com/pyspark/pyspark-repartition-vs-coalesce/

# DataFrame repartition
fhv_r = fhv.repartition(12)
print(fhv_r.rdd.getNumPartitions())

fhv_r.write.parquet("fhv_r")

12


In [28]:
import os
from pathlib import Path
# Modified from https://www.geeksforgeeks.org/how-to-get-size-of-folder-using-python/

output_path = Path(os.getcwd()) / "fhv_r"

size = 0
file_count = 0
for path, dirs, files in os.walk(str(output_path)):
    for file in files:
        file_split = file.split(".")
        if (file_split[len(file_split) - 1]).lower() == "parquet":
            print(file)
            size += os.path.getsize(output_path / file)
            file_count += 1

print(f"Folder size: {round(size / 1048576, 1)} MB")
print(f"Files: {file_count}")

print(f"Average size: {(size / 1048576) / file_count}")

part-00000-548a3904-6f48-4256-8bb0-28745127c741-c000.snappy.parquet
part-00001-548a3904-6f48-4256-8bb0-28745127c741-c000.snappy.parquet
part-00002-548a3904-6f48-4256-8bb0-28745127c741-c000.snappy.parquet
part-00003-548a3904-6f48-4256-8bb0-28745127c741-c000.snappy.parquet
part-00004-548a3904-6f48-4256-8bb0-28745127c741-c000.snappy.parquet
part-00005-548a3904-6f48-4256-8bb0-28745127c741-c000.snappy.parquet
part-00006-548a3904-6f48-4256-8bb0-28745127c741-c000.snappy.parquet
part-00007-548a3904-6f48-4256-8bb0-28745127c741-c000.snappy.parquet
part-00008-548a3904-6f48-4256-8bb0-28745127c741-c000.snappy.parquet
part-00009-548a3904-6f48-4256-8bb0-28745127c741-c000.snappy.parquet
part-00010-548a3904-6f48-4256-8bb0-28745127c741-c000.snappy.parquet
part-00011-548a3904-6f48-4256-8bb0-28745127c741-c000.snappy.parquet
Folder size: 260.6 MB
Files: 12
Average size: 21.717481056849163


### 3.

In [29]:
fhv_r.registerTempTable("fhv_temp")

trips_15th = spark.sql("""
SELECT 
    COUNT(pickup_datetime) AS trips
FROM
    fhv_temp
WHERE
    pickup_datetime >= '2021-06-15 00:00:00'
    AND pickup_datetime < '2021-06-16 00:00:00'
""")

trips_15th.show()

+------+
| trips|
+------+
|452470|
+------+



### 4.

In [37]:
max_duration = spark.sql("""
SELECT 
    MAX((FLOAT(dropoff_datetime) - FLOAT(pickup_datetime)) / 3600) AS duration
FROM
    fhv_temp
WHERE dropoff_datetime IS NOT NULL and pickup_datetime IS NOT NULL
""")

date_duration = spark.sql("""
SELECT 
    pickup_datetime,
    (FLOAT(dropoff_datetime) - FLOAT(pickup_datetime))/ 3600 AS duration
FROM
    fhv_temp
""")

max_duration.show()

#date_duration.join(max_duration, on="duration", how="inner").show()

+--------+
|duration|
+--------+
|   66.88|
+--------+



### 5.

In [None]:
# http://localhost:4040/jobs/

### 6.

In [34]:
zones = spark.read \
    .option("header", "true") \
    .csv('taxi+_zone_lookup.csv')

zones.registerTempTable("zone_temp")

zone_pu = spark.sql("""SELECT LocationID AS PULocationID, Zone AS PUZone FROM zone_temp""")

zone_merge = fhv.join(zone_pu, on="PULocationID", how='inner')


zone_merge.registerTempTable("zone_merge_temp")

top_pu = spark.sql("""
SELECT PUZone, COUNT(PUZone)
FROM
    zone_merge_temp
where PUZone is not null and PUZone != 'NA'
GROUP BY PUZone
ORDER BY 2 DESC
LIMIT 1
""")


top_pu.show(n=1, truncate=False, vertical=True)

-RECORD 0----------------------------
 PUZone        | Crown Heights North 
 count(PUZone) | 231279              

