# Exploring Big Data with PySpark and Dask

Welcome! In this notebook, we'll dive into scalable data analysis using two popular Python tools: PySpark and Dask. Instead of working with randomly generated data, we'll use a real-world dataset to make our exploration more meaningful and practical. Let's see how these frameworks help us handle and analyze large datasets efficiently!

## 1. Getting Started: Installing and Importing Libraries

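Before we begin, make sure PySpark, Dask, and Pandas are installed, for example with `pip install pyspark "dask[dataframe]" pandas`; PySpark also needs a Java runtime. We'll use PySpark and Dask for big data processing, and Pandas for some quick data wrangling. The first cell imports Pandas, downloads the January 2020 NYC Yellow Taxi file if it isn't already on disk, and draws a 100,000-row sample to keep the PySpark examples fast.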
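If you'd like to confirm the setup before running anything heavier, here is a minimal sketch that only reports which of the three libraries cannot be imported; it does not install anything. The helper name `find_missing` is made up for this example.

```python
import importlib.util

# Libraries this notebook relies on
REQUIRED = ["pyspark", "dask", "pandas"]


def find_missing(packages):
    """Return the names in `packages` that cannot be found on the import path."""
    return [name for name in packages if importlib.util.find_spec(name) is None]


missing = find_missing(REQUIRED)
if missing:
    print("Missing packages:", ", ".join(missing))
else:
    print("All required packages are importable.")
```

Anything listed as missing can then be installed with your usual package manager before continuing.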

In [1]:
# Load the dataset and draw a sample for faster processing
import os
import urllib.request

import pandas as pd

nyc_url = 'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2020-01.csv.gz'
local_path = 'yellow_tripdata_2020-01.csv.gz'

# Download the file only if it is not already on disk
if not os.path.exists(local_path):
    print('Downloading NYC taxi data...')
    urllib.request.urlretrieve(nyc_url, local_path)
    print('Download complete.')

# low_memory=False parses each column in one pass, avoiding mixed-type inference
df = pd.read_csv(local_path, compression='gzip', low_memory=False)

# Use a reproducible 100,000-row sample for demonstration
sample_df = df.sample(100000, random_state=42)
sample_df.head()

| | VendorID | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | RatecodeID | store_and_fwd_flag | PULocationID | DOLocationID | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 3340840 | 2.0 | 2020-01-17 18:18:36 | 2020-01-17 18:46:24 | 1.0 | 3.6 | 1.0 | N | 264 | 264 | 1.0 | 18.5 | 1.0 | 0.5 | 4.56 | 0.0 | 0.3 | 27.36 | 2.5 |
| 4887036 | 2.0 | 2020-01-25 10:49:58 | 2020-01-25 11:07:35 | 1.0 | 3.28 | 1.0 | N | 142 | 246 | 1.0 | 14.0 | 0.0 | 0.5 | 1.7 | 0.0 | 0.3 | 19.0 | 2.5 |
| 2737398 | 2.0 | 2020-01-15 07:30:08 | 2020-01-15 07:40:01 | 1.0 | 1.75 | 1.0 | N | 238 | 166 | 1.0 | 8.5 | 0.0 | 0.5 | 1.2 | 0.0 | 0.3 | 13.0 | 2.5 |
| 1452246 | 2.0 | 2020-01-09 06:29:09 | 2020-01-09 06:35:44 | 1.0 | 0.87 | 1.0 | N | 100 | 164 | 2.0 | 5.5 | 0.0 | 0.5 | 0.0 | 0.0 | 0.3 | 8.8 | 2.5 |
| 5124188 | 2.0 | 2020-01-26 12:24:04 | 2020-01-26 12:29:15 | 2.0 | 0.98 | 1.0 | N | 161 | 43 | 2.0 | 5.5 | 0.0 | 0.5 | 0.0 | 0.0 | 0.3 | 8.8 | 2.5 |
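The cell above reads the whole month into memory before sampling. If memory is tight, one alternative is to read the file in chunks and sample each chunk as it arrives. The sketch below illustrates the idea on a tiny in-memory CSV rather than the taxi file. The helper `sample_in_chunks`, the toy data, and the per-chunk fraction are assumptions made for this example, and sampling a fraction of each chunk is not identical to `df.sample(100000)` on the full frame.

```python
import io

import pandas as pd

# Tiny stand-in CSV (hypothetical values), so the sketch runs without the taxi file
csv_text = "fare_amount,trip_distance\n" + "\n".join(f"{i}.5,{i}" for i in range(10))


def sample_in_chunks(source, chunksize, frac, seed):
    """Read `source` in chunks and keep a random fraction of each chunk."""
    pieces = []
    for i, chunk in enumerate(pd.read_csv(source, chunksize=chunksize)):
        # Vary the seed per chunk so chunks do not all pick the same positions
        pieces.append(chunk.sample(frac=frac, random_state=seed + i))
    return pd.concat(pieces)


sampled = sample_in_chunks(io.StringIO(csv_text), chunksize=4, frac=0.5, seed=42)
print(sampled)
```

For the real file, `source` would be the local path (passed together with `compression='gzip'`), with a much larger `chunksize`.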


## 2. Loading a Real-World Dataset

To make our analysis more relevant, we use a real dataset: the NYC Yellow Taxi Trip data, a classic big data resource, which the first cell downloaded and sampled. The cell below starts a Spark session, loads the 100,000-row Pandas sample into a PySpark DataFrame, and shows the first five rows.

In [2]:
# PySpark Analysis
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('NYCTaxiAnalysis').getOrCreate()
spark_df = spark.createDataFrame(sample_df)
spark_df.show(5)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|     2.0| 2020-01-17 18:18:36|  2020-01-17 18:46:24|            1.0|          3.6|       1.0|                 N|         264|         264|         1.0|       18.5|  1.0|    0.5|      4.56|         0.0|                  0.3

## 3. Analyzing Taxi Data with PySpark

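PySpark does the processing in Sections 4 and 5, using the `spark_df` created above. Before that, the next cell builds a Dask DataFrame from the same sample, split into 8 partitions, and previews it with `head()`. Note that Section 6 later rebuilds `dask_df` from the full dataset.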

In [3]:
# Dask Analysis
import dask.dataframe as dd
dask_df = dd.from_pandas(sample_df, npartitions=8)
dask_df.head()

| | VendorID | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | RatecodeID | store_and_fwd_flag | PULocationID | DOLocationID | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | 1.0 | 2020-01-01 00:28:15 | 2020-01-01 00:33:03 | 1.0 | 1.2 | 1.0 | N | 238 | 239 | 1.0 | 6.0 | 3.0 | 0.5 | 1.47 | 0.0 | 0.3 | 11.27 | 2.5 |
| 15 | 1.0 | 2020-01-01 00:15:35 | 2020-01-01 00:27:06 | 3.0 | 1.6 | 1.0 | N | 211 | 234 | 2.0 | 9.0 | 3.0 | 0.5 | 0.0 | 0.0 | 0.3 | 12.8 | 2.5 |
| 42 | 1.0 | 2020-01-01 00:54:57 | 2020-01-01 00:58:50 | 6.0 | 0.9 | 1.0 | N | 95 | 95 | 2.0 | 5.0 | 0.5 | 0.5 | 0.0 | 0.0 | 0.3 | 6.3 | 0.0 |
| 122 | 1.0 | 2020-01-01 00:31:16 | 2020-01-01 01:05:43 | 1.0 | 9.9 | 1.0 | N | 231 | 82 | 2.0 | 33.0 | 2.5 | 0.5 | 0.0 | 0.0 | 0.3 | 36.3 | 2.5 |
| 169 | 1.0 | 2020-01-01 00:44:41 | 2020-01-01 00:50:09 | 1.0 | 1.2 | 1.0 | N | 125 | 158 | 1.0 | 6.0 | 3.0 | 0.5 | 1.96 | 0.0 | 0.3 | 11.76 | 2.5 |


## 4. Summary Statistics with PySpark

Let's get a quick overview of our taxi trip data. We'll use PySpark's `describe()` to calculate the count, mean, standard deviation, minimum, and maximum for three numeric columns: `passenger_count`, `trip_distance`, and `fare_amount`.

In [4]:
summary = spark_df.select('passenger_count', 'trip_distance', 'fare_amount').describe().toPandas()
print('PySpark Summary Statistics:')
display(summary)

PySpark Summary Statistics:


| | summary | passenger_count | trip_distance | fare_amount |
| --- | --- | --- | --- | --- |
| 0 | count | 100000.0 | 100000.0 | 100000.0 |
| 1 | mean | NaN | 2.9104052999999954 | 12.7381706 |
| 2 | stddev | NaN | 3.8134234594018808 | 11.861580672417016 |
| 3 | min | 0.0 | -21.94 | -52.0 |
| 4 | max | NaN | 61.0 | 304.0 |
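Two things stand out in this summary: `passenger_count` has no usable mean, standard deviation, or maximum, and both `trip_distance` and `fare_amount` have negative minimums. The first is consistent with missing passenger counts being carried over from Pandas as NaN; Spark treats NaN as larger than any other number, which would also explain the maximum. One way to handle both issues is to clean the Pandas sample before handing it to Spark. The sketch below shows the idea on a toy frame with made-up values. The helper `drop_implausible_trips` and the "non-negative" thresholds are assumptions for illustration, not rules taken from the dataset's documentation.

```python
import numpy as np
import pandas as pd

# Toy stand-in for sample_df (hypothetical values)
toy = pd.DataFrame({
    "passenger_count": [1.0, 2.0, np.nan, 1.0, 0.0],
    "trip_distance": [3.6, -21.94, 1.2, 0.87, 2.0],
    "fare_amount": [18.5, 9.0, 31.9, -52.0, 7.5],
})


def drop_implausible_trips(frame):
    """Drop rows with a missing passenger count or a negative distance or fare."""
    cleaned = frame.dropna(subset=["passenger_count"])
    plausible = (cleaned["trip_distance"] >= 0) & (cleaned["fare_amount"] >= 0)
    return cleaned[plausible]


clean_toy = drop_implausible_trips(toy)
print(clean_toy)
```

Applying the same function to `sample_df` before `spark.createDataFrame(...)` should give `describe()` finite values to work with, at the cost of discarding the affected rows.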


## 5. Average Fare by Passenger Count (PySpark)

Let's see how the average fare amount varies depending on the number of passengers in the taxi. We'll group the data by `passenger_count` and calculate the mean fare.

In [5]:
grouped = spark_df.groupBy('passenger_count').mean('fare_amount').orderBy('passenger_count').toPandas()
print('Average Fare by Passenger Count:')
display(grouped)

Average Fare by Passenger Count:


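| | passenger_count | avg(fare_amount) |
| --- | --- | --- |
| 0 | 0.0 | 12.287666 |
| 1 | 1.0 | 12.406684 |
| 2 | 2.0 | 13.027548 |
| 3 | 3.0 | 12.967511 |
| 4 | 4.0 | 12.86975 |
| 5 | 5.0 | 12.528996 |
| 6 | 6.0 | 12.64218 |
| 7 | 8.0 | 8.0 |
| 8 | 9.0 | 9.0 |
| 9 | NaN | 31.935594 |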
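A group mean is easier to interpret when you can see how many trips are behind it. The round averages for passenger counts 8 and 9 may come from only a handful of trips, though the table alone cannot confirm that. The sketch below shows the pattern in Pandas on a toy frame with made-up values: it reports the mean and the row count side by side and keeps the missing-value group visible with `dropna=False`.

```python
import numpy as np
import pandas as pd

# Toy stand-in for the taxi sample (hypothetical values)
toy = pd.DataFrame({
    "passenger_count": [1.0, 1.0, 2.0, 8.0, np.nan, np.nan],
    "fare_amount": [10.0, 14.0, 13.0, 8.0, 30.0, 34.0],
})

# Mean fare and number of trips per passenger count, keeping the NaN group
fare_by_passengers = (
    toy.groupby("passenger_count", dropna=False)["fare_amount"]
    .agg(["mean", "count"])
)
print(fare_by_passengers)
```

In PySpark the same idea is usually expressed by passing several aggregate expressions to `groupBy(...).agg(...)`.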


## 6. Analyzing Taxi Data with Dask

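Dask is another great tool for working with big data in Python. Here we convert the full Pandas DataFrame `df`, not the 100,000-row sample, into a Dask DataFrame with 8 partitions and take a peek at the data. As a result, the Dask results in the next two sections cover the whole month (about 6.4 million trips), while the PySpark results above cover only the sample.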

In [6]:
import dask.dataframe as dd
dask_df = dd.from_pandas(df, npartitions=8)
dask_df.head()

| | VendorID | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | RatecodeID | store_and_fwd_flag | PULocationID | DOLocationID | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | 1.0 | 2020-01-01 00:28:15 | 2020-01-01 00:33:03 | 1.0 | 1.2 | 1.0 | N | 238 | 239 | 1.0 | 6.0 | 3.0 | 0.5 | 1.47 | 0.0 | 0.3 | 11.27 | 2.5 |
| 1 | 1.0 | 2020-01-01 00:35:39 | 2020-01-01 00:43:04 | 1.0 | 1.2 | 1.0 | N | 239 | 238 | 1.0 | 7.0 | 3.0 | 0.5 | 1.5 | 0.0 | 0.3 | 12.3 | 2.5 |
| 2 | 1.0 | 2020-01-01 00:47:41 | 2020-01-01 00:53:52 | 1.0 | 0.6 | 1.0 | N | 238 | 238 | 1.0 | 6.0 | 3.0 | 0.5 | 1.0 | 0.0 | 0.3 | 10.8 | 2.5 |
| 3 | 1.0 | 2020-01-01 00:55:23 | 2020-01-01 01:00:14 | 1.0 | 0.8 | 1.0 | N | 238 | 151 | 1.0 | 5.5 | 0.5 | 0.5 | 1.36 | 0.0 | 0.3 | 8.16 | 0.0 |
| 4 | 2.0 | 2020-01-01 00:01:58 | 2020-01-01 00:04:16 | 1.0 | 0.0 | 1.0 | N | 193 | 193 | 2.0 | 3.5 | 0.5 | 0.5 | 0.0 | 0.0 | 0.3 | 4.8 | 0.0 |


## 7. Summary Statistics with Dask

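Let's use Dask to quickly summarize the main numeric columns in our taxi trip data. Dask builds the computation lazily; calling `.compute()` runs it across the partitions and returns an ordinary Pandas DataFrame.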

In [7]:
dask_summary = dask_df[['passenger_count', 'trip_distance', 'fare_amount']].describe().compute()
print('Dask Summary Statistics:')
display(dask_summary)

Dask Summary Statistics:


| | passenger_count | trip_distance | fare_amount |
| --- | --- | --- | --- |
| count | 6339567.0 | 6405008.0 | 6405008.0 |
| mean | 1.515333 | 2.929644 | 12.69411 |
| std | 1.151594 | 83.15911 | 12.1273 |
| min | 0.0 | -30.62 | -1238.0 |
| 25% | 1.0 | 1.0 | 6.5 |
| 50% | 1.0 | 1.7 | 10.0 |
| 75% | 2.0 | 3.3 | 16.5 |
| max | 9.0 | 210240.1 | 4265.0 |
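These figures describe all 6,405,008 trips, whereas the PySpark summary in Section 4 describes a 100,000-row sample, so the two mean fares differ slightly (12.738 versus 12.694). A rough way to judge whether that gap is ordinary sampling noise is to express it in standard errors of the sample mean. The sketch below does that arithmetic with the numbers reported above. It is a back-of-the-envelope check that assumes a simple random sample, not a formal test.

```python
import math

# Values copied from the PySpark (sample) and Dask (full data) summaries
sample_n = 100_000
sample_mean = 12.7381706
sample_std = 11.861580672417016
full_mean = 12.69411

# Standard error of the sample mean: std / sqrt(n)
standard_error = sample_std / math.sqrt(sample_n)

# Gap between the sample mean and the full-data mean, in standard errors
gap = sample_mean - full_mean
gap_in_standard_errors = gap / standard_error

print(f"standard error: {standard_error:.4f}")
print(f"gap: {gap:.4f} ({gap_in_standard_errors:.2f} standard errors)")
```

A gap of a little over one standard error is well within what random sampling would be expected to produce.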


## 8. Average Fare by Passenger Count (Dask)

Now let's do the same group-wise analysis with Dask: what's the average fare for each passenger count?

In [8]:
dask_grouped = dask_df.groupby('passenger_count')['fare_amount'].mean().compute()
print('Average Fare by Passenger Count (Dask):')
display(dask_grouped)

Average Fare by Passenger Count (Dask):


| passenger_count | fare_amount |
| --- | --- |
| 0.0 | 12.211342 |
| 1.0 | 12.385213 |
| 2.0 | 12.969849 |
| 3.0 | 12.760572 |
| 4.0 | 13.220595 |
| 5.0 | 12.231725 |
| 6.0 | 12.355435 |
| 7.0 | 62.008276 |
| 8.0 | 49.745882 |
| 9.0 | 69.770526 |
