# The objective of this worksheet is to demonstrate how RDDs can be created from a large dataset stored in a CSV file.

# Main steps:

### 1. Read the CSV file using the textFile() method
### 2. Split each line on commas and parse the columns
### 3. Apply the desired transformations and actions

In [1]:
import pyspark

In [2]:
from pyspark import SparkContext

In [3]:
sc = SparkContext("local[*]", "Traffic Data Analysis using RDDs")

### The textFile() method below reads the file as an RDD of strings, where each line of the file becomes one element of the RDD.

In [4]:
TD_RDD = sc.textFile("UK_Traffic_Data_Small.csv")

In [5]:
TD_RDD.count()

500
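Because textFile() produces one element per line, the count of 500 above includes the header line. The plain-Python analogy below (no Spark needed) mimics this on a hypothetical three-line CSV that is made up for illustration and is not taken from UK_Traffic_Data_Small.csv:

```python
import io

# Hypothetical three-line CSV standing in for UK_Traffic_Data_Small.csv
sample_csv = io.StringIO(
    "count_point_id,year,region_name\n"
    "1,2000,South West\n"
    "2,2001,Scotland\n"
)

# Like sc.textFile(): every line (without its newline) becomes one element
lines = [line.rstrip("\n") for line in sample_csv]

# Like TD_RDD.count(): the header line is counted too
print(len(lines))  # 3
```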

### The first row of our CSV file is a header, so we remove it to obtain an RDD containing only data rows for later processing. The filter() transformation below does this by keeping every line that differs from the header returned by first().

In [6]:
header = TD_RDD.first()

data_RDD = TD_RDD.filter(lambda row: row != header)

In [7]:
header

'count_point_id,year,region_id,region_name,region_ons_code,local_authority_id,local_authority_name,local_authority_code,road_name,road_category,road_type,start_junction_road_name,end_junction_road_name,easting,northing,latitude,longitude,link_length_km,link_length_miles,estimation_method,estimation_method_detailed,direction_of_travel,pedal_cycles,two_wheeled_motor_vehicles,cars_and_taxis,buses_and_coaches,LGVs,HGVs_2_rigid_axle,HGVs_3_rigid_axle,HGVs_4_or_more_rigid_axle,HGVs_3_or_4_articulated_axle,HGVs_5_articulated_axle,HGVs_6_articulated_axle,all_HGVs,all_motor_vehicles'
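Note that filter(lambda row: row != header) drops every line equal to the header, not just the first one; that is harmless here and also copes with files that repeat the header. An alternative sometimes used is TD_RDD.mapPartitionsWithIndex(), skipping only the first element of partition 0, which avoids comparing every line with the header string. The filter logic itself, sketched in plain Python on made-up lines:

```python
# Hypothetical lines standing in for the elements of TD_RDD
lines = [
    "count_point_id,year,region_name",
    "1,2000,South West",
    "2,2001,Scotland",
]

header = lines[0]  # analogous to TD_RDD.first()

# Analogous to TD_RDD.filter(lambda row: row != header)
data_lines = [row for row in lines if row != header]

print(data_lines)  # ['1,2000,South West', '2,2001,Scotland']
```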

### Each element of the RDD is still a single line of text, so we split each line on commas to turn it into a list of fields. The map() transformation applies this to every element: each line becomes a list of strings, one per CSV column.

In [8]:
parsed_RDD = data_RDD.map(lambda row: row.split(","))
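Splitting with str.split(",") works as long as no field contains a comma. CSV files may quote fields that do (for example a junction name such as "Smith St, North"; this name is hypothetical, and we have not checked whether this dataset contains any such field). In that case split() shifts every later column, so an index such as row[24] would no longer point at cars_and_taxis. Python's standard csv module understands quoting; the sketch compares the two on one made-up line. In PySpark the same parser could be applied per partition, e.g. data_RDD.mapPartitions(lambda lines: csv.reader(lines)), since csv.reader accepts any iterable of strings.

```python
import csv

# A made-up line whose third field contains a quoted comma
line = '1,2000,"Smith St, North",Major'

naive = line.split(",")           # what row.split(",") does
parsed = next(csv.reader([line]))  # quote-aware parsing

print(naive)   # ['1', '2000', '"Smith St', ' North"', 'Major']
print(parsed)  # ['1', '2000', 'Smith St, North', 'Major']
print(len(naive), len(parsed))  # 5 4
```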

### We want to extract the year and region from each row. Every field is a string, so numeric columns must be type-cast. Year is the second column (index 1), and we convert it to an integer for later comparisons; region_name is the fourth column (index 3). The following map() transformation extracts these two columns.

In [9]:
year_region_RDD = parsed_RDD.map(lambda row: (int(row[1]), row[3]))
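int(row[1]) raises an error if a year field is blank or malformed, and in Spark that error surfaces only when an action runs, failing the whole job. One defensive option, sketched here under that assumption, is a helper that returns an empty list for bad rows so it can be passed to RDD.flatMap(), which drops empty results. The helper name parse_year_region is hypothetical, and the sample rows are made up:

```python
def parse_year_region(row):
    """Hypothetical helper: return [(year, region)], or [] if the row is unusable.

    Returning a list lets the same function be passed to RDD.flatMap(),
    which silently drops rows that produce an empty list.
    """
    try:
        return [(int(row[1]), row[3])]
    except (ValueError, IndexError):
        return []

# Made-up parsed rows: the second has a blank year, the third is too short
rows = [
    ["1", "2000", "9", "South West"],
    ["2", "", "9", "South West"],
    ["3"],
]

# Plain-Python stand-in for parsed_RDD.flatMap(parse_year_region)
year_region = [pair for row in rows for pair in parse_year_region(row)]
print(year_region)  # [(2000, 'South West')]
```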

In [10]:
year_region_RDD.collect()

[(2000, 'South West'),
 (2001, 'South West'),
 (2002, 'South West'),
 (2003, 'South West'),
 (2004, 'South West'),
 (2004, 'South West'),
 (2005, 'South West'),
 (2005, 'South West'),
 (2006, 'South West'),
 (2006, 'South West'),
 (2007, 'South West'),
 (2007, 'South West'),
 (2008, 'South West'),
 (2008, 'South West'),
 (2009, 'South West'),
 (2009, 'South West'),
 (2010, 'South West'),
 (2010, 'South West'),
 (2011, 'South West'),
 (2011, 'South West'),
 (2012, 'South West'),
 (2012, 'South West'),
 (2013, 'South West'),
 (2013, 'South West'),
 (2014, 'South West'),
 (2014, 'South West'),
 (2015, 'South West'),
 (2015, 'South West'),
 (2016, 'South West'),
 (2016, 'South West'),
 (2017, 'South West'),
 (2017, 'South West'),
 (2018, 'South West'),
 (2018, 'South West'),
 (2019, 'South West'),
 (2019, 'South West'),
 (2020, 'South West'),
 (2020, 'South West'),
 (2021, 'South West'),
 (2021, 'South West'),
 (2022, 'South West'),
 (2022, 'South West'),
 (2023, 'South West'),
 ...
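
collect() returns every element to the driver as one Python list, which is why the listing above is so long. For a quick look, an action such as year_region_RDD.take(10) returns only the first 10 elements. The plain-Python analogy below uses itertools.islice on a hypothetical generator of made-up (year, region) pairs:

```python
from itertools import islice

def year_region_pairs():
    """Hypothetical generator standing in for year_region_RDD."""
    for year in range(2000, 2024):
        yield (year, "South West")

# Like collect(): materialise every element
everything = list(year_region_pairs())

# Like take(3): stop after the first three elements
first_three = list(islice(year_region_pairs(), 3))

print(len(everything))  # 24
print(first_three)  # [(2000, 'South West'), (2001, 'South West'), (2002, 'South West')]
```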

### We want to filter the RDD to keep only the rows where the year is greater than 2005.

In [11]:
year_region_RDD.filter(lambda x: x[0] > 2005).collect()

[(2006, 'South West'),
 (2006, 'South West'),
 (2007, 'South West'),
 (2007, 'South West'),
 (2008, 'South West'),
 (2008, 'South West'),
 (2009, 'South West'),
 (2009, 'South West'),
 (2010, 'South West'),
 (2010, 'South West'),
 (2011, 'South West'),
 (2011, 'South West'),
 (2012, 'South West'),
 (2012, 'South West'),
 (2013, 'South West'),
 (2013, 'South West'),
 (2014, 'South West'),
 (2014, 'South West'),
 (2015, 'South West'),
 (2015, 'South West'),
 (2016, 'South West'),
 (2016, 'South West'),
 (2017, 'South West'),
 (2017, 'South West'),
 (2018, 'South West'),
 (2018, 'South West'),
 (2019, 'South West'),
 (2019, 'South West'),
 (2020, 'South West'),
 (2020, 'South West'),
 (2021, 'South West'),
 (2021, 'South West'),
 (2022, 'South West'),
 (2022, 'South West'),
 (2023, 'South West'),
 (2023, 'South West'),
 (2006, 'South West'),
 (2006, 'South West'),
 (2007, 'South West'),
 (2007, 'South West'),
 (2008, 'South West'),
 (2008, 'South West'),
 (2009, 'South West'),
 ...
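
filter() is a transformation: Spark only records it, and the predicate runs when an action such as collect() is called. Python's built-in filter() is lazy in a similar way, which the sketch below makes visible by counting how often the predicate runs. The function name after_2005 and the sample pairs are made up for illustration:

```python
calls = 0

def after_2005(pair):
    """Same test as lambda x: x[0] > 2005, but counts how often it runs."""
    global calls
    calls += 1
    return pair[0] > 2005

pairs = [(2004, "South West"), (2006, "South West"), (2010, "Scotland")]

lazy = filter(after_2005, pairs)  # nothing is evaluated yet
print(calls)  # 0

result = list(lazy)  # like collect(): forces evaluation
print(calls)   # 3
print(result)  # [(2006, 'South West'), (2010, 'Scotland')]
```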

### Keep only the rows for the Scotland region.

In [12]:
year_region_RDD.filter(lambda x: x[1] == 'Scotland').collect()

[(2012, 'Scotland'),
 (2013, 'Scotland'),
 (2014, 'Scotland'),
 (2015, 'Scotland'),
 (2016, 'Scotland'),
 (2017, 'Scotland'),
 (2018, 'Scotland'),
 (2019, 'Scotland'),
 (2020, 'Scotland'),
 (2021, 'Scotland'),
 (2022, 'Scotland'),
 (2023, 'Scotland'),
 (2012, 'Scotland'),
 (2013, 'Scotland'),
 (2014, 'Scotland'),
 (2015, 'Scotland'),
 (2016, 'Scotland'),
 (2016, 'Scotland'),
 (2017, 'Scotland'),
 (2017, 'Scotland'),
 (2018, 'Scotland'),
 (2018, 'Scotland'),
 (2019, 'Scotland'),
 (2019, 'Scotland'),
 (2020, 'Scotland'),
 (2020, 'Scotland'),
 (2021, 'Scotland'),
 (2021, 'Scotland'),
 (2022, 'Scotland'),
 (2022, 'Scotland'),
 (2023, 'Scotland'),
 (2023, 'Scotland'),
 (2012, 'Scotland'),
 (2013, 'Scotland'),
 (2014, 'Scotland'),
 (2015, 'Scotland'),
 (2016, 'Scotland'),
 (2016, 'Scotland'),
 (2017, 'Scotland'),
 (2017, 'Scotland'),
 (2018, 'Scotland'),
 (2018, 'Scotland'),
 (2019, 'Scotland'),
 (2019, 'Scotland'),
 (2020, 'Scotland'),
 (2020, 'Scotland'),
 (2021, 'Scotland'),
 ...

### Keep only the Scotland rows where the year is greater than 2015.

In [13]:
year_region_RDD.filter(lambda x: (x[0] > 2015) and (x[1] == 'Scotland')).collect()

[(2016, 'Scotland'),
 (2017, 'Scotland'),
 (2018, 'Scotland'),
 (2019, 'Scotland'),
 (2020, 'Scotland'),
 (2021, 'Scotland'),
 (2022, 'Scotland'),
 (2023, 'Scotland'),
 (2016, 'Scotland'),
 (2016, 'Scotland'),
 (2017, 'Scotland'),
 (2017, 'Scotland'),
 (2018, 'Scotland'),
 (2018, 'Scotland'),
 (2019, 'Scotland'),
 (2019, 'Scotland'),
 (2020, 'Scotland'),
 (2020, 'Scotland'),
 (2021, 'Scotland'),
 (2021, 'Scotland'),
 (2022, 'Scotland'),
 (2022, 'Scotland'),
 (2023, 'Scotland'),
 (2023, 'Scotland'),
 (2016, 'Scotland'),
 (2016, 'Scotland'),
 (2017, 'Scotland'),
 (2017, 'Scotland'),
 (2018, 'Scotland'),
 (2018, 'Scotland'),
 (2019, 'Scotland'),
 (2019, 'Scotland'),
 (2020, 'Scotland'),
 (2020, 'Scotland'),
 (2021, 'Scotland'),
 (2021, 'Scotland'),
 (2022, 'Scotland'),
 (2022, 'Scotland'),
 (2023, 'Scotland'),
 (2023, 'Scotland')]
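When a predicate grows beyond one comparison, a named function can be easier to read and test than a lambda; filter() accepts any function that returns a boolean, so year_region_RDD.filter(scotland_after_2015) would behave like the cell above. The function name is our own, and the check below runs in plain Python on made-up pairs:

```python
def scotland_after_2015(pair):
    """Keep (year, region) pairs from Scotland with year > 2015."""
    year, region = pair
    return year > 2015 and region == "Scotland"

pairs = [(2015, "Scotland"), (2016, "Scotland"), (2020, "Wales")]
kept = [p for p in pairs if scotland_after_2015(p)]
print(kept)  # [(2016, 'Scotland')]
```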

### We want to calculate the traffic distribution across road types. In our main RDD (parsed_RDD), road_type is at index 10 and cars_and_taxis is at index 24.

In [14]:
traffic_road_RDD = parsed_RDD.map(lambda row: (row[10], int(row[24])))

In [15]:
traffic_road_RDD.collect()

[('Major', 87),
 ('Major', 83),
 ('Major', 102),
 ('Major', 131),
 ('Major', 19),
 ('Major', 18),
 ('Major', 18),
 ('Major', 17),
 ('Major', 19),
 ('Major', 18),
 ('Major', 19),
 ('Major', 18),
 ('Major', 19),
 ('Major', 18),
 ('Major', 20),
 ('Major', 19),
 ('Major', 18),
 ('Major', 17),
 ('Major', 18),
 ('Major', 17),
 ('Major', 49),
 ('Major', 40),
 ('Major', 51),
 ('Major', 42),
 ('Major', 56),
 ('Major', 46),
 ('Major', 54),
 ('Major', 44),
 ('Major', 54),
 ('Major', 44),
 ('Major', 52),
 ('Major', 43),
 ('Major', 51),
 ('Major', 42),
 ('Major', 55),
 ('Major', 45),
 ('Major', 10),
 ('Major', 6),
 ('Major', 10),
 ('Major', 7),
 ('Major', 11),
 ('Major', 7),
 ('Major', 10),
 ('Major', 7),
 ('Major', 617),
 ('Major', 632),
 ('Major', 250),
 ('Major', 270),
 ('Major', 259),
 ('Major', 280),
 ('Major', 264),
 ('Major', 285),
 ('Major', 261),
 ('Major', 282),
 ('Major', 263),
 ('Major', 284),
 ('Major', 256),
 ('Major', 277),
 ('Major', 252),
 ('Major', 272),
 ('Major', 254),
 ...

In [16]:
traffic_road_RDD.filter(lambda x: x[1] > 20000).collect()

[('Major', 41599),
 ('Major', 40997),
 ('Major', 46117),
 ('Major', 46240),
 ('Major', 23559),
 ('Major', 23724),
 ('Major', 23533),
 ('Major', 23699),
 ('Major', 23333),
 ('Major', 23497),
 ('Major', 23691),
 ('Major', 23857),
 ('Major', 20846),
 ('Major', 21328),
 ('Major', 20881),
 ('Major', 21393),
 ('Major', 21226),
 ('Major', 46387),
 ('Major', 49442),
 ('Major', 50812),
 ('Major', 51411),
 ('Major', 26252),
 ('Major', 26309),
 ('Major', 26223),
 ('Major', 26281),
 ('Major', 26000),
 ('Major', 26057),
 ('Major', 26398),
 ('Major', 26456),
 ('Major', 22099),
 ('Major', 22148),
 ('Major', 22713),
 ('Major', 22763),
 ('Major', 36529),
 ('Major', 39057),
 ('Major', 33994),
 ('Major', 38455),
 ('Major', 38583),
 ('Major', 40976),
 ('Major', 37657),
 ('Major', 39993),
 ('Major', 44171),
 ('Major', 45699),
 ('Major', 44171),
 ('Major', 45699),
 ('Major', 40964),
 ('Major', 46701),
 ('Major', 41251),
 ('Major', 47028),
 ('Major', 40302),
 ('Major', 45946),
 ('Major', 46127),
 ...
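
The cells above pair each road type with its cars_and_taxis count and filter for large counts, but stop short of per-type totals. Summing per road type would use the same reduceByKey() pattern applied to regions further below, e.g. traffic_road_RDD.reduceByKey(lambda a, b: a + b). A plain-Python equivalent on made-up (road_type, count) pairs:

```python
from collections import defaultdict

# Made-up (road_type, cars_and_taxis) pairs standing in for traffic_road_RDD
pairs = [("Major", 87), ("Major", 83), ("Minor", 12), ("Major", 102), ("Minor", 5)]

# Plain-Python equivalent of traffic_road_RDD.reduceByKey(lambda a, b: a + b)
totals = defaultdict(int)
for road_type, cars in pairs:
    totals[road_type] += cars

print(dict(totals))  # {'Major': 272, 'Minor': 17}
```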

### We want to calculate the traffic distribution across regions. In our main RDD (parsed_RDD), region_name is at index 3 and cars_and_taxis is at index 24.

In [17]:
traffic_region_RDD = parsed_RDD.map(lambda row: (row[3], int(row[24])))

In [18]:
traffic_region_RDD.reduceByKey(lambda a, b: a + b).collect()

[('South West', 200149),
 ('Scotland', 1158206),
 ('East Midlands', 151884),
 ('Wales', 4467191)]
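reduceByKey() combines the values for each key inside every partition first and then merges the partial results across partitions. Because Spark does not fix the order of these combinations, the function should be associative and commutative, which addition is. The toy model below (our own simplification in plain Python, not Spark's implementation) shows the two stages on made-up pairs split into two "partitions":

```python
def reduce_by_key(partitions, func):
    """Toy model of RDD.reduceByKey: combine within each partition, then merge.

    func should be associative and commutative, because the real engine
    does not guarantee the order in which values are combined.
    """
    partials = []
    for part in partitions:
        local = {}
        for key, value in part:
            local[key] = func(local[key], value) if key in local else value
        partials.append(local)

    merged = {}
    for local in partials:
        for key, value in local.items():
            merged[key] = func(merged[key], value) if key in merged else value
    return merged

# Made-up (region_name, cars_and_taxis) pairs split across two "partitions"
partitions = [
    [("Wales", 10), ("Scotland", 4), ("Wales", 5)],
    [("Scotland", 6), ("Wales", 1)],
]

totals = reduce_by_key(partitions, lambda a, b: a + b)
print(totals)  # {'Wales': 16, 'Scotland': 10}
```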

In [19]:
# Calculating traffic distribution by region
# region_name = index 3, cars_and_taxis = index 24, buses_and_coaches = index 25

traffic_distribution_region_RDD = parsed_RDD.map(lambda row: (row[3], int(row[24]), int(row[25])))

In [20]:
traffic_distribution_region_RDD.collect()

[('South West', 87, 837),
 ('South West', 83, 857),
 ('South West', 102, 893),
 ('South West', 131, 926),
 ('South West', 19, 208),
 ('South West', 18, 268),
 ('South West', 18, 205),
 ('South West', 17, 265),
 ('South West', 19, 207),
 ('South West', 18, 268),
 ('South West', 19, 202),
 ('South West', 18, 260),
 ('South West', 19, 198),
 ('South West', 18, 256),
 ('South West', 20, 199),
 ('South West', 19, 258),
 ('South West', 18, 192),
 ('South West', 17, 248),
 ('South West', 18, 191),
 ('South West', 17, 246),
 ('South West', 49, 297),
 ('South West', 40, 228),
 ('South West', 51, 298),
 ('South West', 42, 229),
 ('South West', 56, 302),
 ('South West', 46, 231),
 ('South West', 54, 306),
 ('South West', 44, 235),
 ('South West', 54, 310),
 ('South West', 44, 238),
 ('South West', 52, 310),
 ('South West', 43, 238),
 ('South West', 51, 308),
 ('South West', 42, 237),
 ('South West', 55, 309),
 ('South West', 45, 237),
 ('South West', 10, 261),
 ('South West', 6, 206),
 ...

In [21]:
traffic_distribution_region_RDD.map(lambda x: (x[0], x[1] + x[2])).collect()

[('South West', 924),
 ('South West', 940),
 ('South West', 995),
 ('South West', 1057),
 ('South West', 227),
 ('South West', 286),
 ('South West', 223),
 ('South West', 282),
 ('South West', 226),
 ('South West', 286),
 ('South West', 221),
 ('South West', 278),
 ('South West', 217),
 ('South West', 274),
 ('South West', 219),
 ('South West', 277),
 ('South West', 210),
 ('South West', 265),
 ('South West', 209),
 ('South West', 263),
 ('South West', 346),
 ('South West', 268),
 ('South West', 349),
 ('South West', 271),
 ('South West', 358),
 ('South West', 277),
 ('South West', 360),
 ('South West', 279),
 ('South West', 364),
 ('South West', 282),
 ('South West', 362),
 ('South West', 281),
 ('South West', 359),
 ('South West', 279),
 ('South West', 364),
 ('South West', 282),
 ('South West', 271),
 ('South West', 212),
 ('South West', 296),
 ('South West', 232),
 ('South West', 318),
 ('South West', 248),
 ('South West', 324),
 ('South West', 254),
 ('South West', 633),
 ...

### We want to find the total number of cars and taxis, and of buses and coaches, for each region.

In [24]:
# Map each entry to (key, (value1, value2)), i.e. (region_name, (cars_and_taxis, buses_and_coaches))

traffic_distribution_region_group_RDD = traffic_distribution_region_RDD.map(lambda row: (row[0], (row[1], row[2])))

In [25]:
traffic_distribution_region_group_RDD.collect()

[('South West', (87, 837)),
 ('South West', (83, 857)),
 ('South West', (102, 893)),
 ('South West', (131, 926)),
 ('South West', (19, 208)),
 ('South West', (18, 268)),
 ('South West', (18, 205)),
 ('South West', (17, 265)),
 ('South West', (19, 207)),
 ('South West', (18, 268)),
 ('South West', (19, 202)),
 ('South West', (18, 260)),
 ('South West', (19, 198)),
 ('South West', (18, 256)),
 ('South West', (20, 199)),
 ('South West', (19, 258)),
 ('South West', (18, 192)),
 ('South West', (17, 248)),
 ('South West', (18, 191)),
 ('South West', (17, 246)),
 ('South West', (49, 297)),
 ('South West', (40, 228)),
 ('South West', (51, 298)),
 ('South West', (42, 229)),
 ('South West', (56, 302)),
 ('South West', (46, 231)),
 ('South West', (54, 306)),
 ('South West', (44, 235)),
 ('South West', (54, 310)),
 ('South West', (44, 238)),
 ('South West', (52, 310)),
 ('South West', (43, 238)),
 ('South West', (51, 308)),
 ('South West', (42, 237)),
 ('South West', (55, 309)),
 ...

In [26]:
traffic_distribution_region_group_RDD.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])).collect()

[('South West', (200149, 18970)),
 ('Scotland', (1158206, 5836)),
 ('East Midlands', (151884, 888)),
 ('Wales', (4467191, 22566))]
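Packing both counts into a tuple lets one reduceByKey() call produce both totals, rather than running two separate reduceByKey jobs. The combine function adds the tuples element by element; the plain-Python sketch below applies the same function per key to made-up records (the name add_pairs is our own):

```python
def add_pairs(a, b):
    """Same combine function as the lambda above: element-wise tuple addition."""
    return (a[0] + b[0], a[1] + b[1])

# Made-up (region, (cars_and_taxis, buses_and_coaches)) records
records = [
    ("South West", (50, 4)),
    ("Wales", (10, 2)),
    ("South West", (30, 6)),
]

totals = {}
for region, counts in records:
    totals[region] = add_pairs(totals[region], counts) if region in totals else counts

print(totals)  # {'South West': (80, 10), 'Wales': (10, 2)}
```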

# Explore the following questions.

#### What are the top 10 roads with the highest number of cars and taxis?

#### What are the top 10 roads with the highest traffic volumes? [Hint: You have to add up all types of traffic for each road.]
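
Both questions follow a reduce-then-rank pattern: sum per road with reduceByKey(), then rank with an action such as takeOrdered(10, key=lambda kv: -kv[1]), which returns the 10 pairs with the largest totals. The ranking step alone is modelled below in plain Python with heapq.nlargest on made-up totals; choosing the right columns from the dataset is left to the exercise:

```python
import heapq

# Made-up (road_name, total_vehicles) pairs, as if already summed per road
road_totals = [("A1", 900), ("M4", 1500), ("B123", 40), ("A30", 700)]

# Like rdd.takeOrdered(2, key=lambda kv: -kv[1]): the two largest totals
top_two = heapq.nlargest(2, road_totals, key=lambda kv: kv[1])
print(top_two)  # [('M4', 1500), ('A1', 900)]
```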