In [2]:
pip install pyspark



In [3]:
from pyspark.sql import SparkSession

In [4]:
# Create (or reuse) the SparkSession for this notebook.
# Fixed the typo in the application name ("Aanalysis" -> "Analysis").
spark = SparkSession.builder.appName('Uber Data Analysis').getOrCreate()

In [5]:
# Grab the SparkContext from the session; the RDD calls below (textFile, parallelize) go through it.
sc = spark.sparkContext

In [6]:
# Load the raw file as an RDD of text lines (the first line is the CSV header).
# NOTE(review): hard-coded absolute path — looks like a Colab location; confirm before running elsewhere.
data_with_headerRDD = sc.textFile("/content/sample_data/uber_data")

In [8]:
# Total number of lines in the raw file, header line included.
data_with_headerRDD.count()

355

In [9]:
# Peek at the first five raw lines (the header row is among them).
for record in data_with_headerRDD.take(5):
    print(record)

dispatching_base_number,date,active_vehicles,trips
B02512,1/1/2015,190,1132
B02765,1/1/2015,225,1765
B02764,1/1/2015,3427,29421
B02682,1/1/2015,945,7679


In [10]:
# How many partitions the text file was split into.
data_with_headerRDD.getNumPartitions()

2

In [11]:
# The first line of the file is the CSV header; keep it so it can be filtered out of the data.
header=data_with_headerRDD.first()

In [12]:
# Show the header line to confirm the column names and their order.
print(header)

dispatching_base_number,date,active_vehicles,trips


In [13]:
# Keep only the data rows: drop every line that is identical to the header.
uberRDD = data_with_headerRDD.filter(lambda row: row != header)

In [14]:
# Row count after removing the header (expected to be one less than the raw count).
uberRDD.count()

354

In [15]:
# Peek at the first five data rows now that the header is gone.
for record in uberRDD.take(5):
    print(record)

B02512,1/1/2015,190,1132
B02765,1/1/2015,225,1765
B02764,1/1/2015,3427,29421
B02682,1/1/2015,945,7679
B02617,1/1/2015,1228,9537


In [16]:
import datetime

In [17]:
# strptime pattern for the file's date column: month/day/four-digit year (e.g. 1/1/2015).
format_date = "%m/%d/%Y"

In [18]:
# Sanity check: parse a sample date with format_date and print its weekday name.
sample_date = datetime.datetime.strptime("05/11/2025", format_date)
print(sample_date.strftime("%A"))

Sunday


In [35]:
def parse_trip_record(line):
    """Turn one CSV data line into a (base, weekday name, trips) tuple.

    The line is split once. Field 0 (dispatching base number) is passed through,
    field 1 (date) is parsed with ``format_date`` and rendered as a weekday name
    via ``%A``, and field 3 (trips) is kept as a string, exactly as before.
    Field 2 (active vehicles) is not used.
    """
    fields = line.split(",")
    weekday = datetime.datetime.strptime(fields[1], format_date).strftime("%A")
    return (fields[0], weekday, fields[3])


# One split per line instead of three.
tupleRDD = uberRDD.map(parse_trip_record)

In [36]:
# Peek at the first five (base, weekday, trips) tuples.
for record in tupleRDD.take(5):
    print(record)

('B02512', 'Thursday', '1132')
('B02765', 'Thursday', '1765')
('B02764', 'Thursday', '29421')
('B02682', 'Thursday', '7679')
('B02617', 'Thursday', '9537')


In [37]:
# The map is one-to-one, so the row count should match uberRDD.
tupleRDD.count()

354

In [38]:
# Build key/value pairs: key is "<base> <weekday>", value is the trip count as an int.
kvPairRDD = tupleRDD.map(lambda rec: (" ".join(rec[:2]), int(rec[2])))

In [39]:
# Peek at the first five key/value pairs.
for record in kvPairRDD.take(5):
    print(record)

('B02512 Thursday', 1132)
('B02765 Thursday', 1765)
('B02764 Thursday', 29421)
('B02682 Thursday', 7679)
('B02617 Thursday', 9537)


In [40]:
# Sum the trip counts of all rows that share the same "<base> <weekday>" key.
totalTripsRDD = kvPairRDD.reduceByKey(lambda a,b : a+b)

In [42]:
# Print every (key, total trips) pair; collect() brings the whole result to the driver.
for record in totalTripsRDD.collect():
    print(record)

('B02512 Thursday', 15809)
('B02764 Thursday', 304200)
('B02682 Thursday', 106643)
('B02617 Thursday', 118254)
('B02598 Friday', 93126)
('B02617 Friday', 125067)
('B02512 Friday', 16435)
('B02682 Friday', 114662)
('B02765 Friday', 34934)
('B02764 Friday', 326968)
('B02512 Saturday', 15026)
('B02617 Sunday', 91722)
('B02764 Sunday', 249896)
('B02512 Monday', 11297)
('B02682 Monday', 74939)
('B02617 Monday', 80591)
('B02764 Monday', 214116)
('B02765 Monday', 21974)
('B02765 Tuesday', 22741)
('B02617 Wednesday', 94887)
('B02682 Wednesday', 86252)
('B02764 Wednesday', 241137)
('B02598 Wednesday', 71956)
('B02765 Thursday', 30408)
('B02598 Thursday', 90333)
('B02765 Saturday', 36737)
('B02617 Saturday', 127902)
('B02598 Saturday', 94588)
('B02682 Saturday', 120283)
('B02764 Saturday', 356789)
('B02512 Sunday', 10487)
('B02682 Sunday', 82825)
('B02598 Sunday', 66477)
('B02765 Sunday', 22536)
('B02598 Monday', 60882)
('B02764 Tuesday', 221343)
('B02682 Tuesday', 76905)
('B02617 Tuesday', 8660

In [43]:
# Rank the (key, total trips) pairs from most to fewest trips.
# Descending order is requested with ascending=False rather than by negating the
# value, which only works when the sort key is numeric.
sortbyval = totalTripsRDD.sortBy(lambda pair: pair[1], ascending=False)

In [44]:
# Print the ranked totals, highest trip count first.
for record in sortbyval.collect():
    print(record)

('B02764 Saturday', 356789)
('B02764 Friday', 326968)
('B02764 Thursday', 304200)
('B02764 Sunday', 249896)
('B02764 Wednesday', 241137)
('B02764 Tuesday', 221343)
('B02764 Monday', 214116)
('B02617 Saturday', 127902)
('B02617 Friday', 125067)
('B02682 Saturday', 120283)
('B02617 Thursday', 118254)
('B02682 Friday', 114662)
('B02682 Thursday', 106643)
('B02617 Wednesday', 94887)
('B02598 Saturday', 94588)
('B02598 Friday', 93126)
('B02617 Sunday', 91722)
('B02598 Thursday', 90333)
('B02617 Tuesday', 86602)
('B02682 Wednesday', 86252)
('B02682 Sunday', 82825)
('B02617 Monday', 80591)
('B02682 Tuesday', 76905)
('B02682 Monday', 74939)
('B02598 Wednesday', 71956)
('B02598 Sunday', 66477)
('B02598 Tuesday', 63429)
('B02598 Monday', 60882)
('B02765 Saturday', 36737)
('B02765 Friday', 34934)
('B02765 Thursday', 30408)
('B02765 Wednesday', 24340)
('B02765 Tuesday', 22741)
('B02765 Sunday', 22536)
('B02765 Monday', 21974)
('B02512 Friday', 16435)
('B02512 Thursday', 15809)
('B02512 Saturday', 

**Joins**

In [45]:
# Small in-memory list used to demonstrate sc.parallelize.
mylist = [100, 200, 300]

In [46]:
# Show the plain Python list before it is turned into an RDD.
print(mylist)

[100, 200, 300]


In [47]:
# Distribute the Python list as an RDD, then print each element back on the driver.
listRDD = sc.parallelize(mylist)
for value in listRDD.collect():
    print(value)

100
200
300


In [48]:
# Left-hand pair RDD for the join examples: keys "a" and "b".
rdd1 = sc.parallelize([("a", 1), ("b", 4)])
# Number of (key, value) pairs in rdd1.
rdd1.count()

2

In [49]:
# Print the contents of rdd1.
for pair in rdd1.collect():
    print(pair)

('a', 1)
('b', 4)


In [50]:
# Right-hand pair RDD for the join examples: key "a" appears twice, key "b" is absent.
rdd2 = sc.parallelize([("a", 2), ("a", 3)])
for pair in rdd2.collect():
    print(pair)

('a', 2)
('a', 3)


In [51]:
# Inner join: only keys present in both RDDs survive, one output pair per
# matching (left value, right value) combination. Key "b" has no match in rdd2.
rdd1.join(rdd2).collect()

[('a', (1, 2)), ('a', (1, 3))]

In [52]:
# Left outer join: every key from rdd1 is kept; a key with no match in rdd2
# (here "b") is paired with None on the right side.
rdd1.leftOuterJoin(rdd2).collect()

[('b', (4, None)), ('a', (1, 2)), ('a', (1, 3))]

In [53]:
# Right outer join: every key from rdd2 is kept. rdd2 only holds key "a", which
# also exists in rdd1, so the result here matches the inner join.
rdd1.rightOuterJoin(rdd2).collect()

[('a', (1, 2)), ('a', (1, 3))]

In [54]:
# Full outer join: keys from both RDDs are kept, with None filling the missing side.
# rdd2 has no key that rdd1 lacks, so the result here matches the left outer join.
rdd1.fullOuterJoin(rdd2).collect()

[('b', (4, None)), ('a', (1, 2)), ('a', (1, 3))]