In [1]:
!pip install pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,to_timestamp, hour, dayofweek, when

Collecting pyspark
  Downloading pyspark-3.5.3.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.5.3-py2.py3-none-any.whl size=317840629 sha256=5f8978961457a659d9b5731dbaf146aaf4a9b601dadc1e8a93aab2b7cbeda631
  Stored in directory: /root/.cache/pip/wheels/1b/3a/92/28b93e2fbfdbb07509ca4d6f50c5e407f48dce4ddbda69a4ab
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.3


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/29 08:24:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

root
 |-- Date/Time: string (nullable = true)
 |-- Lat: double (nullable = true)
 |-- Lon: double (nullable = true)
 |-- Base: string (nullable = true)



                                                                                

# **Initialize the Spark session and load the data**

In [36]:
# initialize spark: reuse the active SparkSession if there is one, otherwise start it
spark = (
    SparkSession.builder
    .appName("Uber Data Analysis")
    .getOrCreate()
)

# load dataset: April 2014 Uber pickups. The first row holds the column names and
# column types are inferred from the data.
uber_data = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/kaggle/input/uber-pickups-in-new-york-city/uber-raw-data-apr14.csv")
)
uber_data.printSchema()

root
 |-- Date/Time: string (nullable = true)
 |-- Lat: double (nullable = true)
 |-- Lon: double (nullable = true)
 |-- Base: string (nullable = true)



                                                                                

In [37]:
# Preview the first 20 raw rows (long cell values truncated).
uber_data.show(n=20, truncate=True)

+----------------+-------+--------+------+
|       Date/Time|    Lat|     Lon|  Base|
+----------------+-------+--------+------+
|4/1/2014 0:11:00| 40.769|-73.9549|B02512|
|4/1/2014 0:17:00|40.7267|-74.0345|B02512|
|4/1/2014 0:21:00|40.7316|-73.9873|B02512|
|4/1/2014 0:28:00|40.7588|-73.9776|B02512|
|4/1/2014 0:33:00|40.7594|-73.9722|B02512|
|4/1/2014 0:33:00|40.7383|-74.0403|B02512|
|4/1/2014 0:39:00|40.7223|-73.9887|B02512|
|4/1/2014 0:45:00| 40.762| -73.979|B02512|
|4/1/2014 0:55:00|40.7524| -73.996|B02512|
|4/1/2014 1:01:00|40.7575|-73.9846|B02512|
|4/1/2014 1:19:00|40.7256|-73.9869|B02512|
|4/1/2014 1:48:00|40.7591|-73.9684|B02512|
|4/1/2014 1:49:00|40.7271|-73.9803|B02512|
|4/1/2014 2:11:00|40.6463|-73.7896|B02512|
|4/1/2014 2:25:00|40.7564|-73.9167|B02512|
|4/1/2014 2:31:00|40.7666|-73.9531|B02512|
|4/1/2014 2:43:00| 40.758|-73.9761|B02512|
|4/1/2014 3:22:00|40.7238|-73.9821|B02512|
|4/1/2014 3:35:00|40.7531|-74.0039|B02512|
|4/1/2014 3:35:00|40.7389|-74.0393|B02512|
+----------

# Data Cleaning

In [38]:
# data cleaning
# 1. Remove exact duplicate rows.
# 2. Drop rows with a missing coordinate. Filling them with 0.0 (the previous
#    approach) invented pickups at (0, 0), which the later groupBy('Lat', 'Lon')
#    location analysis would then count as a real place.
# NOTE(review): timestamps here are minute-level (seconds are always :00 in the
# preview), so two genuinely separate rides from the same base, location and
# minute would look identical; confirm dropDuplicates is the intended rule.
uber_data = uber_data.dropDuplicates()
uber_data = uber_data.na.drop(subset=['Lat', 'Lon'])

# Transformations

In [39]:
# transformations

# Per the original author's notes, the Date/Time strings are not all in one layout,
# so a single fixed pattern passed to to_timestamp does not cover every row.
# rlike (Spark SQL's regular-expression match, a more powerful counterpart of
# LIKE) picks the parsing pattern per row from the string's shape. Rows matching
# neither pattern get no `when` branch and therefore a null DateTime.
uber_data = uber_data.withColumn(
    "DateTime",
    when(col("Date/Time").rlike(r"^\d{1,2}/\d{1,2}/\d{4} \d{1,2}:\d{2}:\d{2}$"),
         to_timestamp(col("Date/Time"), "M/d/yyyy H:mm:ss"))
    .when(col("Date/Time").rlike(r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$"),
          to_timestamp(col("Date/Time"), "yyyy-MM-dd HH:mm:ss"))
)

# Hour of day (0-23) and day of week (Spark's dayofweek: 1 = Sunday ... 7 = Saturday);
# both are null when DateTime could not be parsed.
uber_data = uber_data.withColumn('Hour', hour(col('DateTime')))
uber_data = uber_data.withColumn('DayOfWeek', dayofweek(col('DateTime')))

# Bucket each ride by hour of day. 'Night' covers 22:00-04:59 and only applies
# when the hour is known. Previously a catch-all otherwise('Night') also labelled
# rows with an unparseable timestamp (null Hour) as 'Night'; they now get a
# null TimeCategory instead.
uber_data = uber_data.withColumn(
    'TimeCategory',
    when((col('Hour') >= 5) & (col('Hour') < 12), 'Morning')
    .when((col('Hour') >= 12) & (col('Hour') < 17), 'Afternoon')
    .when((col('Hour') >= 17) & (col('Hour') < 22), 'Evening')
    .when(col('Hour').isNotNull(), 'Night'),
)


In [40]:
# Preview the first 20 rows including the derived DateTime/Hour/DayOfWeek/TimeCategory columns.
uber_data.show(n=20, truncate=True)



+-----------------+-------+--------+------+-------------------+----+---------+------------+
|        Date/Time|    Lat|     Lon|  Base|           DateTime|Hour|DayOfWeek|TimeCategory|
+-----------------+-------+--------+------+-------------------+----+---------+------------+
|4/1/2014 12:32:00|40.7191|-73.9973|B02512|2014-04-01 12:32:00|  12|        3|   Afternoon|
|4/1/2014 17:46:00| 40.753|-73.9701|B02512|2014-04-01 17:46:00|  17|        3|     Evening|
|4/1/2014 19:05:00|40.7578|-73.9722|B02512|2014-04-01 19:05:00|  19|        3|     Evening|
|4/1/2014 22:44:00|40.7627|-73.9832|B02512|2014-04-01 22:44:00|  22|        3|       Night|
| 4/2/2014 7:42:00|40.7707|-73.9632|B02512|2014-04-02 07:42:00|   7|        4|     Morning|
| 4/2/2014 8:06:00|40.7305|-73.9832|B02512|2014-04-02 08:06:00|   8|        4|     Morning|
| 4/2/2014 8:11:00|40.7288|-73.9994|B02512|2014-04-02 08:11:00|   8|        4|     Morning|
|4/2/2014 12:18:00| 40.758|-73.9849|B02512|2014-04-02 12:18:00|  12|        4|  

                                                                                

# Analysis

In [41]:
# Busiest hours for Uber rides: number of rides per hour of day, highest count first.
# (The result was previously assigned to `business_hours` but displayed as
# `busiest_hours`, which raises NameError on a fresh kernel.)
busiest_hours = uber_data.groupBy('Hour').count().orderBy(col('count').desc())
busiest_hours.show()

[Stage 183:>                                                        (0 + 4) / 4]

+----+-----+
|Hour|count|
+----+-----+
|  17|44888|
|  18|42439|
|  16|41408|
|  19|38380|
|  21|36427|
|  20|35729|
|  15|34835|
|  22|30189|
|  14|26851|
|   7|24624|
|   8|22577|
|  13|22329|
|  23|20335|
|  12|19179|
|  11|18545|
|   6|18224|
|   9|17758|
|  10|17660|
|   0|11716|
|   5| 9302|
+----+-----+
only showing top 20 rows



                                                                                

In [42]:
# Most popular pickup locations: ride count per exact (Lat, Lon) pair, highest first.
popular_locations = (
    uber_data
    .groupBy('Lat', 'Lon')
    .count()
    .sort(col('count').desc())
)
popular_locations.show(10)



+-------+--------+-----+
|    Lat|     Lon|count|
+-------+--------+-----+
|40.6449|-73.7822|  408|
|40.6449|-73.7821|  369|
|40.6449|-73.7823|  364|
| 40.645|-73.7819|  364|
| 40.645| -73.782|  315|
|40.7685|-73.8625|  282|
|40.7741|-73.8725|  233|
|40.7741|-73.8726|  219|
|40.6449|-73.7824|  201|
| 40.774|-73.8726|  189|
+-------+--------+-----+
only showing top 10 rows



                                                                                


# Partition the dataset by the `Base` column

# Save the output to Kaggle's working directory; from there you can also download it to your local machine


In [35]:
# Partition data by 'Base' and write to Parquet: one sub-directory per distinct Base
# value, replacing any output from a previous run.
(
    uber_data.write
    .mode("overwrite")
    .partitionBy("Base")
    .parquet("/kaggle/working/processed_uber_data.parquet")
)


                                                                                

# Why partition?

* **Improved query performance** – when a query filters on the partition column (`Base`), only the matching partitions are read. This reduces the amount of data scanned and speeds up the query.

* **Easier data management** – the data is split into smaller, more manageable chunks, one per value of the `Base` column, which makes large datasets easier to handle.
  
* **Scalability** – partitioning is especially useful on distributed storage systems (e.g., HDFS, S3), because it allows partitions to be processed in parallel.
  
* **Cost-effectiveness** – in cloud environments (e.g., AWS S3, Databricks), partition pruning lowers processing costs because fewer partitions are scanned.

# Save the morning-category rides to a file

In [43]:
# save morning category data in a file:
# keep only rides bucketed as 'Morning' and write them as CSV with a header row,
# overwriting any previous output.
morning_data = uber_data.where(col('TimeCategory') == 'Morning')
(
    morning_data.write
    .mode("overwrite")
    .option("header", True)
    .csv('/kaggle/working/morning_data.csv')
)

                                                                                