In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
import pyspark.sql.functions as F

In [2]:
spark = SparkSession.builder.master("local[*]").appName("test").getOrCreate()

23/03/03 10:58:24 WARN Utils: Your hostname, LX10001 resolves to a loopback address: 127.0.1.1; using 192.168.1.8 instead (on interface wlp0s20f3)
23/03/03 10:58:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/03 10:58:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## question 1 What is the output of spark.version

In [3]:
spark.version

'3.3.2'

In [4]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-06.csv.gz

--2023-03-03 10:58:26--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-06.csv.gz
Resolving github.com (github.com)... 140.82.121.3
Connecting to github.com (github.com)|140.82.121.3|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/4564ad9e-a6da-4923-ad6f-35ff02446a51?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAIWNJYAX4CSVEH53A%2F20230303%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20230303T095854Z&X-Amz-Expires=300&X-Amz-Signature=854b6a171743c84dfa95ab66021f5ff22b03ef99fbcbde0733d45943d3895358&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dfhvhv_tripdata_2021-06.csv.gz&response-content-type=application%2Foctet-stream [following]
--2023-03-03 10:58:26--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/4564ad9e

In [5]:

schema = types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True),
    types.StructField('Affiliated_base_number', types.StringType(), True)
])

df = (spark
      .read
      .option('header', 'true')
      .schema(schema)
      .csv('./fhvhv_tripdata_2021-06.csv.gz')
     )

In [6]:
df \
.repartition(12) \
.write.parquet('data/raw/2021/06/', mode="overwrite")

[Stage 0:>                                                          (0 + 1) / 1]

23/03/03 10:59:23 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
23/03/03 10:59:23 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
23/03/03 10:59:23 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
23/03/03 10:59:23 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
23/03/03 10:59:23 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 63.33% for 12 writers


[Stage 2:>                                                        (0 + 12) / 12]

23/03/03 10:59:28 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
23/03/03 10:59:28 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
23/03/03 10:59:28 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
23/03/03 10:59:28 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers




## Question 2: What is the average size of the Parquet?


In [7]:
files = !ls -lah ./data/raw/2021/06/*parquet

total_size_mb = 0
for f in files:
    total_size_mb+=float(f.split(' ')[4][:-1])
total_size_mb/len(files)

24.0

## Question 3: How many taxi trips were there on June 15?

In [None]:
df.registerTempTable('trips_data')
spark.sql("""
SELECT
    COUNT(*)
FROM
    trips_data
WHERE
    CAST(pickup_datetime AS DATE) == to_date('2021-06-15','yyyy-MM-dd')
""").show()

[Stage 3:>                                                          (0 + 1) / 1]

## Question 4:  Longest trip for each day

In [None]:
df.columns

In [None]:
df \
    .withColumn('duration_h',
                F.round((F.unix_timestamp('dropoff_datetime') - F.unix_timestamp('pickup_datetime'))/3600, 2)) \
    .groupBy(F.to_date('pickup_datetime')) \
    .agg(F.max('duration_h').alias('max_duration_h')) \
    .orderBy(F.col("max_duration_h").desc()) \
    .limit(1) \
    .show()

## Question 5 Spark’s User Interface which shows application's dashboard runs on which local port?

Answer is 4040

## Question 6 : Most frequent pickup location zone

In [None]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv

In [None]:
spark \
    .read \
    .option('header', 'true') \
    .csv('./taxi_zone_lookup.csv') \
    .createOrReplaceTempView("zone")


spark.sql(
    """
    SELECT
        zone,
        COUNT(*) as n_pickup
    FROM
        trips_data t
    JOIN zone z 
        ON t.PULocationID == z.LocationID
    GROUP BY 1
    ORDER BY 2 DESC
    LIMIT 1
    """
    
).show()