In [1]:
# Install Java and PySpark
!apt-get install openjdk-11-jdk -y
!pip install pyspark

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following additional packages will be installed:
  fonts-dejavu-core fonts-dejavu-extra libatk-wrapper-java
  libatk-wrapper-java-jni libxt-dev libxtst6 libxxf86dga1 openjdk-11-jre
  x11-utils
Suggested packages:
  libxt-doc openjdk-11-demo openjdk-11-source visualvm mesa-utils
The following NEW packages will be installed:
  fonts-dejavu-core fonts-dejavu-extra libatk-wrapper-java
  libatk-wrapper-java-jni libxt-dev libxtst6 libxxf86dga1 openjdk-11-jdk
  openjdk-11-jre x11-utils
0 upgraded, 10 newly installed, 0 to remove and 35 not upgraded.
Need to get 6,920 kB of archives.
After this operation, 16.9 MB of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu jammy/main amd64 fonts-dejavu-core all 2.37-2build1 [1,041 kB]
Get:2 http://archive.ubuntu.com/ubuntu jammy/main amd64 fonts-dejavu-extra all 2.37-2build1 [2,041 kB]
Get:3 http://archive.ubuntu.com/ubuntu jam

In [2]:
# Point JAVA_HOME at the JDK installed by the apt-get cell so PySpark can find it
import os
os.environ.update(JAVA_HOME="/usr/lib/jvm/java-11-openjdk-amd64")

In [3]:
# Start (or restart) the Spark session
from pyspark.sql import SparkSession

# Re-running this cell should replace the previous session, so stop any
# session left over from an earlier run. On a fresh kernel `spark` is not
# defined yet; that NameError is the only failure we expect and ignore.
# (A bare `except:` would also hide real shutdown errors and KeyboardInterrupt.)
try:
    spark.stop()
except NameError:
    pass

spark = SparkSession.builder.appName("BigDataAnalysis").getOrCreate()
print("Spark Session Started")

Spark Session Started


In [4]:
# Optional: load the raw CSV into pandas for quick local inspection.
# It gets its own name so it is never confused with (or silently replaced by)
# the Spark DataFrame `df` created in the next cell. The Spark cells below do
# not use this pandas copy.
import pandas as pd

df_pd = pd.read_csv('/content/train.csv')

In [5]:
# Read the trips CSV with Spark.
# The explicit schema below matches what inferSchema produced for this file.
# Supplying it skips the extra full pass over the data that inferSchema=True
# costs, and pins the column types. Columns are applied in file order.
TRIPS_SCHEMA = (
    "id STRING, vendor_id INT, "
    "pickup_datetime TIMESTAMP, dropoff_datetime TIMESTAMP, "
    "passenger_count INT, "
    "pickup_longitude DOUBLE, pickup_latitude DOUBLE, "
    "dropoff_longitude DOUBLE, dropoff_latitude DOUBLE, "
    "store_and_fwd_flag STRING, trip_duration INT"
)
df = spark.read.csv(
    "train.csv",
    header=True,
    schema=TRIPS_SCHEMA,
    # Timestamps in the file look like "2016-02-27 21:04:05"
    timestampFormat="yyyy-MM-dd HH:mm:ss",
)

# Show schema and first few rows
df.printSchema()
df.show(5)


root
 |-- id: string (nullable = true)
 |-- vendor_id: integer (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- trip_duration: integer (nullable = true)

+---------+---------+-------------------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+
|       id|vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|  pickup_longitude|   pickup_latitude| dropoff_longitude|  dropoff_latitude|store_and_fwd_flag|trip_duration|
+---------+---------+-------------------+-------------------+---------------+------------------+-----------

In [6]:
# Dataset size: counting rows is a Spark action; the column count comes from the schema
row_count = df.count()
col_count = len(df.columns)
print(f"Rows: {row_count}, Columns: {col_count}")

# Per-column summary statistics (count, mean, stddev, min, max)
summary_stats = df.describe()
summary_stats.show()

Rows: 38123, Columns: 11
+-------+---------+-------------------+------------------+-------------------+--------------------+--------------------+-------------------+------------------+-----------------+
|summary|       id|          vendor_id|   passenger_count|   pickup_longitude|     pickup_latitude|   dropoff_longitude|   dropoff_latitude|store_and_fwd_flag|    trip_duration|
+-------+---------+-------------------+------------------+-------------------+--------------------+--------------------+-------------------+------------------+-----------------+
|  count|    38123|              38123|             38123|              38123|               38123|               38123|              38123|             38123|            38123|
|   mean|     NULL| 1.5303097867429112|1.6676022348713375| -73.97341264974338|  40.751318771554025|  -73.97319497695153|   40.7520642510348|              NULL|937.6292526821079|
| stddev|     NULL|0.49908701708437225|1.3187681581368045|0.03786111765762147|0.02790

In [7]:
# Remove every row with a null in any column (how="any" is dropna's default)
df_clean = df.dropna(how="any")
remaining = df_clean.count()
print(f"After cleaning: {remaining} rows")

After cleaning: 38123 rows


In [8]:
# Most frequent passenger count.
# Uses `df_clean` (the null-free frame from the cleaning cell) so the
# analysis reflects the cleaning step instead of the raw `df`.
(
    df_clean.groupBy("passenger_count")
    .count()
    .orderBy("count", ascending=False)
    .show()
)

+---------------+-----+
|passenger_count|count|
+---------------+-----+
|              1|27008|
|              2| 5477|
|              5| 2031|
|              3| 1541|
|              6| 1285|
|              4|  781|
+---------------+-----+



In [9]:
# Average trip duration in seconds, rounded to 2 decimals.
# Spark's round is imported under an alias: a bare `round` import would shadow
# Python's builtin round() for every later cell in the notebook.
from pyspark.sql.functions import avg, round as spark_round

df_clean.select(
    spark_round(avg("trip_duration"), 2).alias("Avg_Trip_Duration_Seconds")
).show()

+-------------------------+
|Avg_Trip_Duration_Seconds|
+-------------------------+
|                   937.63|
+-------------------------+



In [10]:
# Top 5 longest trips (computed on the cleaned frame `df_clean`).
# NOTE(review): these durations are all just under 86,400 s (24 h), with
# pickup and dropoff about one day apart. They look like outliers (possibly
# meters left running) rather than real trips. Consider filtering them before
# computing averages.
(
    df_clean.orderBy(df_clean["trip_duration"].desc())
    .select("id", "trip_duration", "pickup_datetime", "dropoff_datetime")
    .show(5)
)

+---------+-------------+-------------------+-------------------+
|       id|trip_duration|    pickup_datetime|   dropoff_datetime|
+---------+-------------+-------------------+-------------------+
|id0067152|        86357|2016-02-27 21:04:05|2016-02-28 21:03:22|
|id3431345|        86352|2016-06-07 12:58:48|2016-06-08 12:58:00|
|id1428140|        86345|2016-03-04 08:15:24|2016-03-05 08:14:29|
|id3244242|        86343|2016-06-03 17:14:02|2016-06-04 17:13:05|
|id0222074|        86315|2016-06-26 11:55:51|2016-06-27 11:54:26|
+---------+-------------+-------------------+-------------------+
only showing top 5 rows



In [11]:
# Number of trips per pickup day (computed on the cleaned frame `df_clean`).
# show() prints only the first 20 days by default; pass a larger n to see more.
from pyspark.sql.functions import to_date

(
    df_clean.withColumn("pickup_date", to_date("pickup_datetime"))
    .groupBy("pickup_date")
    .count()
    .orderBy("pickup_date")
    .show()
)

+-----------+-----+
|pickup_date|count|
+-----------+-----+
| 2016-01-01|  183|
| 2016-01-02|  164|
| 2016-01-03|  169|
| 2016-01-04|  173|
| 2016-01-05|  172|
| 2016-01-06|  189|
| 2016-01-07|  201|
| 2016-01-08|  241|
| 2016-01-09|  234|
| 2016-01-10|  208|
| 2016-01-11|  194|
| 2016-01-12|  204|
| 2016-01-13|  216|
| 2016-01-14|  223|
| 2016-01-15|  209|
| 2016-01-16|  259|
| 2016-01-17|  199|
| 2016-01-18|  198|
| 2016-01-19|  201|
| 2016-01-20|  185|
+-----------+-----+
only showing top 20 rows



In [12]:
# Most popular pickup zones (computed on the cleaned frame `df_clean`).
# Coordinates are bucketed into a grid by rounding to 2 decimal places. At the
# data's latitude (~40.75) a cell is roughly 1.1 km north-south by ~0.84 km
# east-west. Pickups are then counted per cell.
from pyspark.sql.functions import round as spark_round

(
    df_clean.withColumn("pickup_lon", spark_round("pickup_longitude", 2))
    .withColumn("pickup_lat", spark_round("pickup_latitude", 2))
    .groupBy("pickup_lon", "pickup_lat")
    .count()
    .orderBy("count", ascending=False)
    .show(5)
)

+----------+----------+-----+
|pickup_lon|pickup_lat|count|
+----------+----------+-----+
|    -73.99|     40.75| 2382|
|    -73.97|     40.76| 2342|
|    -73.98|     40.76| 2038|
|    -73.98|     40.75| 1884|
|    -73.99|     40.76| 1716|
+----------+----------+-----+
only showing top 5 rows

