# EX5-BATCH: More advanced RDD API programming

Your assignment: complete the `TODO`s and include the **output of each cell**.

### Download Bike Trip Data (Feb 2025)

In [1]:
!wget -np https://s3.amazonaws.com/tripdata/202502-citibike-tripdata.zip -P data/
![ -e "data/202502-citibike-tripdata_1.csv" ] || (cd data/ && unzip 202502-citibike-tripdata.zip)

Connecting to s3.amazonaws.com (16.15.193.82:443)
wget: can't open 'data/202502-citibike-tripdata.zip': File exists


### The data is split across three files; let us look at one of them (header plus a few lines)

In [2]:
!head -3 data/202502-citibike-tripdata_1.csv

ride_id,rideable_type,started_at,ended_at,start_station_name,start_station_id,end_station_name,end_station_id,start_lat,start_lng,end_lat,end_lng,member_casual
C1F868EC9F7E49A5,electric_bike,2025-02-06 16:54:02.517,2025-02-06 17:00:48.166,Perry St & Bleecker St,5922.07,Watts St & Greenwich St,5578.02,40.73535398,-74.00483091,40.72405549,-74.00965965,member
668DDE0CFA929D5A,electric_bike,2025-02-14 10:09:49.035,2025-02-14 10:21:57.856,Dock 72 Way & Market St,4804.02,Spruce St & Nassau St,5137.10,40.69985,-73.97141,40.71146364,-74.00552427,member


In [None]:
!unzip data/202502-citibike-tripdata.zip

Archive:  data/202502-citibike-tripdata.zip
replace 202502-citibike-tripdata_3.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename: 

### **Dataset Description**
The dataset contains **bike trip records** with the following columns:

| Column Name            | Description |
|------------------------|-------------|
| `ride_id`             | Unique trip identifier |
| `rideable_type`       | Type of bike used (e.g., docked, electric) |
| `started_at`          | Start timestamp of the trip |
| `ended_at`            | End timestamp of the trip |
| `start_station_name`  | Name of the start station |
| `start_station_id`    | ID of the start station |
| `end_station_name`    | Name of the end station |
| `end_station_id`      | ID of the end station |
| `start_lat`          | Latitude of the start location |
| `start_lng`          | Longitude of the start location |
| `end_lat`            | Latitude of the end location |
| `end_lng`            | Longitude of the end location |
| `member_casual`       | User type (`member` for subscribers, `casual` for non-subscribers) |

### Step 1: Load and Preprocess the Data
1. Start a **PySpark session (or SparkContext)**.
2. Load the dataset as an **RDD**.
3. **Remove the header** and filter out malformed rows.
4. `#TODO` Do the same for each file. Use [Spark Union transformation function](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.union.html) for that.
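
The `#TODO` in item 4 can be approached by building one RDD per file and combining them with `SparkContext.union` (the multi-RDD counterpart of the `RDD.union` linked above). The following is a minimal sketch, not the reference solution. It assumes the three files follow the `_1`, `_2`, `_3` naming pattern and share an identical header line. `parse_line` and `load_trips` are hypothetical helper names, and `csv.reader` is swapped in for `split(",")` so that a quoted field containing a comma is not split.

```python
import csv

EXPECTED_COLUMNS = 13  # column count of the header shown above


def parse_line(line):
    """Split one CSV line into fields, keeping quoted commas intact."""
    return next(csv.reader([line]))


def load_trips(sc, paths):
    """Union one RDD per file, then drop header lines and malformed rows.

    `sc` is an existing SparkContext; `paths` is a list of CSV file paths.
    Assumes every file starts with the same header line.
    """
    raw = sc.union([sc.textFile(path) for path in paths])
    header = raw.first()
    return (
        raw.filter(lambda line: line != header)  # drops the header line of every file
        .map(parse_line)
        .filter(lambda cols: len(cols) == EXPECTED_COLUMNS)
    )


# Hypothetical usage; the _2 file name is inferred from the naming pattern:
# paths = [f"data/202502-citibike-tripdata_{i}.csv" for i in (1, 2, 3)]
# valid_rdd = load_trips(sc, paths)
```

Filtering the header after the union works only because the header text is the same in every file; if the headers differed, each file would need its own `first()` and `filter` before the union.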

In [4]:
from pyspark import SparkContext

try:
    sc.stop()
except NameError:
    print("SparkContext not defined")

# Initialize Spark Context
sc = SparkContext(appName="EX5-BIGDATA", master="local[*]") # local execution
# sc = SparkContext(appName="EX5-BIGDATA", master="spark://spark:7077") # cluster execution

# Load data
file_path = "data/202502-citibike-tripdata_1.csv"
raw_rdd = sc.textFile(file_path)

# Remove header
header = raw_rdd.first()
data_rdd = raw_rdd.filter(lambda row: row != header)

# Split CSV rows into lists
rdd = data_rdd.map(lambda row: row.split(","))

# Filter out malformed rows (should have 13 columns)
valid_rdd = rdd.filter(lambda cols: len(cols) == 13)

valid_rdd.take(2)

SparkContext not defined
ERROR! Session/line number was not unique in database. History logging moved to new session 3


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/30 19:44:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

[['C1F868EC9F7E49A5',
  'electric_bike',
  '2025-02-06 16:54:02.517',
  '2025-02-06 17:00:48.166',
  'Perry St & Bleecker St',
  '5922.07',
  'Watts St & Greenwich St',
  '5578.02',
  '40.73535398',
  '-74.00483091',
  '40.72405549',
  '-74.00965965',
  'member'],
 ['668DDE0CFA929D5A',
  'electric_bike',
  '2025-02-14 10:09:49.035',
  '2025-02-14 10:21:57.856',
  'Dock 72 Way & Market St',
  '4804.02',
  'Spruce St & Nassau St',
  '5137.10',
  '40.69985',
  '-73.97141',
  '40.71146364',
  '-74.00552427',
  'member']]

## Creating a single CSV file

In [5]:
!head -n 1 data/202502-citibike-tripdata_1.csv > data/all_citibike.csv
!tail -n +2 -q data/202502-citibike-tripdata_*.csv >> data/all_citibike.csv

In [6]:
from pyspark import SparkContext

try:
    sc.stop()
except NameError:
    print("SparkContext not defined")

# Initialize Spark Context
sc = SparkContext(appName="EX5-BIGDATA", master="local[*]") # local execution
# sc = SparkContext(appName="EX5-BIGDATA", master="spark://spark:7077") # cluster execution

# Load data
file_path = "data/all_citibike.csv"
raw_rdd = sc.textFile(file_path)

# Remove header
header = raw_rdd.first()
data_rdd = raw_rdd.filter(lambda row: row != header)

# Split CSV rows into lists
rdd = data_rdd.map(lambda row: row.split(","))

# Filter out malformed rows (should have 13 columns)
valid_rdd = rdd.filter(lambda cols: len(cols) == 13)

valid_rdd.take(2)

                                                                                

[['C1F868EC9F7E49A5',
  'electric_bike',
  '2025-02-06 16:54:02.517',
  '2025-02-06 17:00:48.166',
  'Perry St & Bleecker St',
  '5922.07',
  'Watts St & Greenwich St',
  '5578.02',
  '40.73535398',
  '-74.00483091',
  '40.72405549',
  '-74.00965965',
  'member'],
 ['668DDE0CFA929D5A',
  'electric_bike',
  '2025-02-14 10:09:49.035',
  '2025-02-14 10:21:57.856',
  'Dock 72 Way & Market St',
  '4804.02',
  'Spruce St & Nassau St',
  '5137.10',
  '40.69985',
  '-73.97141',
  '40.71146364',
  '-74.00552427',
  'member']]

### Step 2: RDD Partitioning
1. Check the **initial number of partitions**.
2. Repartition the data for better performance (change the number at will).
3. See what happens in the Spark UI.
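
To see what repartitioning does to the data layout, it can help to count the records held by each partition before and after the change. A minimal sketch, assuming `rdd` is any RDD such as `valid_rdd`; `count_partition` and `partition_sizes` are hypothetical helper names:

```python
def count_partition(index, records):
    """Yield a single (partition_index, record_count) pair for one partition."""
    yield index, sum(1 for _ in records)


def partition_sizes(rdd):
    """Return a list of (partition_index, record_count) pairs, one per partition."""
    return rdd.mapPartitionsWithIndex(count_partition).collect()


# Hypothetical usage:
# print(partition_sizes(valid_rdd))                  # layout produced by textFile
# print(partition_sizes(valid_rdd.repartition(10)))  # layout after the shuffle
```

`repartition` always shuffles. When the goal is only to lower the partition count, `coalesce(n)` with its default `shuffle=False` avoids the full shuffle.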

In [7]:
# check initial partitions
initial_partitions = valid_rdd.getNumPartitions()
print(f"Initial Partitions: {initial_partitions}")

# change the number of partitions (this will trigger a full shuffle, to reorganize data)
partitioned_rdd = valid_rdd.repartition(10)

Initial Partitions: 12


In [8]:
print(partitioned_rdd.toDebugString().decode("utf-8"))

(10) MapPartitionsRDD[8] at coalesce at NativeMethodAccessorImpl.java:0 []
 |   CoalescedRDD[7] at coalesce at NativeMethodAccessorImpl.java:0 []
 |   ShuffledRDD[6] at coalesce at NativeMethodAccessorImpl.java:0 []
 +-(12) MapPartitionsRDD[5] at coalesce at NativeMethodAccessorImpl.java:0 []
    |   PythonRDD[4] at RDD at PythonRDD.scala:53 []
    |   data/all_citibike.csv MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []
    |   data/all_citibike.csv HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []


### Step 3: Get the Top-3 Most Popular Starting Stations
1. Compute this information and collect it to the driver. Tip: [PySpark RDD sortBy](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.sortBy.html) works, but the [Reduce Action](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.reduce.html) can be more efficient (not to be confused with the [ReduceByKey Transformation](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.reduceByKey.html)).
2. Broadcast this information.
3. Use the broadcast to append a new value, `starting_station_top3`, to each RDD item, set to `yes` or `no`.
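
The `reduce`-based approach hinted at in item 1 can be sketched as follows. Counts are built with `reduceByKey`, each `(station, count)` pair is wrapped in a one-element list, and the lists are merged pairwise while keeping only the `n` largest entries, so no global sort is needed. This is one possible formulation, not the only one. `merge_top` and `top_start_stations` are hypothetical names, and column index 4 is assumed to hold `start_station_name`, as in the table above.

```python
import heapq
from operator import add


def merge_top(a, b, n=3):
    """Merge two lists of (station, count) pairs, keeping the n highest counts."""
    return heapq.nlargest(n, a + b, key=lambda pair: pair[1])


def top_start_stations(rows_rdd, n=3):
    """Return up to n (station, count) pairs with the highest trip counts.

    Raises ValueError on an empty RDD, as RDD.reduce does.
    """
    counts = rows_rdd.map(lambda row: (row[4], 1)).reduceByKey(add)
    return counts.map(lambda pair: [pair]).reduce(lambda a, b: merge_top(a, b, n))


# Hypothetical usage:
# top3 = top_start_stations(valid_rdd)
# broadcast_top3 = sc.broadcast({name for name, _ in top3})
```

Wrapping every pair in a list before `reduce` keeps both arguments of the merge function the same type, which removes the need for `isinstance` checks inside the reducer.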

In [9]:
# Extract only the start station names
station_names_rdd = valid_rdd.map(lambda row: row[4])

# Group by name
grouped_rdd = station_names_rdd.groupBy(lambda name: name)

# Count occurrences
station_counts_rdd = grouped_rdd.map(lambda pair: (pair[0], len(list(pair[1]))))

# Use reduce to keep only the top 3
def top3_reducer(a, b):
    # Ensure 'a' and 'b' are lists of tuples
    if not isinstance(a, list):
        a = [a]
    if not isinstance(b, list):
        b = [b]
    
    # Combine the two sets of data
    combined = a + b
    # Sort and keep the 3 most popular
    top_3_combined = sorted(combined, key=lambda x: x[1], reverse=True)[:3]
    
    return top_3_combined

# Print the 3 most popular stations
top3_stations = station_counts_rdd.reduce(top3_reducer)
print("Top 3 stations (current step):", top3_stations)

# Broadcast
top3_station_names = set([name for name, _ in top3_stations])
broadcast_top3 = sc.broadcast(top3_station_names)

# Map to add the top-3 column
def append_top3_flag(row):
    return row + ["yes" if row[4] in broadcast_top3.value else "no"]

tagged_rdd = valid_rdd.map(append_top3_flag)

tagged_rdd.take(5)

                                                                                

Top 3 stations (current step): [('W 21 St & 6 Ave', 8811), ('Pier 61 at Chelsea Piers', 7761), ('Lafayette St & E 8 St', 7190)]


[['C1F868EC9F7E49A5',
  'electric_bike',
  '2025-02-06 16:54:02.517',
  '2025-02-06 17:00:48.166',
  'Perry St & Bleecker St',
  '5922.07',
  'Watts St & Greenwich St',
  '5578.02',
  '40.73535398',
  '-74.00483091',
  '40.72405549',
  '-74.00965965',
  'member',
  'no'],
 ['668DDE0CFA929D5A',
  'electric_bike',
  '2025-02-14 10:09:49.035',
  '2025-02-14 10:21:57.856',
  'Dock 72 Way & Market St',
  '4804.02',
  'Spruce St & Nassau St',
  '5137.10',
  '40.69985',
  '-73.97141',
  '40.71146364',
  '-74.00552427',
  'member',
  'no'],
 ['67C09DF051FD00FA',
  'classic_bike',
  '2025-02-09 17:15:56.170',
  '2025-02-09 17:18:56.244',
  'E 20 St & 2 Ave',
  '5971.08',
  'E 14 St & 1 Ave',
  '5779.10',
  '40.73587678',
  '-73.98205027',
  '40.73139303364151',
  '-73.98286700248718',
  'member',
  'no'],
 ['69A6F44485E8B57D',
  'electric_bike',
  '2025-02-02 11:31:54.899',
  '2025-02-02 11:35:10.734',
  'E 20 St & 2 Ave',
  '5971.08',
  'E 14 St & 1 Ave',
  '5779.10',
  '40.73587678',
  '-73.9

### Step 4: Use Accumulators for Data Statistics
1. Generate:
   - Total trips
   - Trips with missing data
   - Trips by casual riders vs. members

In [10]:
# Accumulators for statistics
total_trips = sc.accumulator(0)
invalid_trips = sc.accumulator(0)
casual_trips = sc.accumulator(0)
member_trips = sc.accumulator(0)

# Function to count the trips
def count_trips(row):
    total_trips.add(1)
    
    # Check whether any data is missing
    if len(row) != 13 or any(not field for field in row):
        invalid_trips.add(1)
    
    # Count trips by user type
    if row[12] == "casual":  
        casual_trips.add(1)
    elif row[12] == "member": 
        member_trips.add(1)

# Apply the count to the trips
valid_rdd.foreach(count_trips)

# Display the results
print(f"Total trips: {total_trips.value}")
print(f"Invalid trips (with missing data): {invalid_trips.value}")
print(f"Casual trips: {casual_trips.value}")
print(f"Member trips: {member_trips.value}")



Total trips: 2031257
Invalid trips (with missing data): 5295
Casual trips: 192759
Member trips: 1838498
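
Accumulator totals can be cross-checked with a single `aggregate` action that returns the same four counters to the driver. The sketch below is offered as a sanity check rather than a replacement for the accumulators the step asks for. It assumes rows are lists with `member_casual` at index 12. `ZERO`, `seq_op`, `comb_op`, and `trip_stats` are hypothetical names, and the counting rules mirror `count_trips` above.

```python
ZERO = (0, 0, 0, 0)  # (total, missing_data, casual, member)


def seq_op(acc, row):
    """Fold one row into the running counters of a partition."""
    total, missing, casual, member = acc
    has_gap = len(row) != 13 or any(not field for field in row)
    user_type = row[12] if len(row) > 12 else None
    return (
        total + 1,
        missing + has_gap,  # a bool adds as 0 or 1
        casual + (user_type == "casual"),
        member + (user_type == "member"),
    )


def comb_op(a, b):
    """Add the counters of two partitions element-wise."""
    return tuple(x + y for x, y in zip(a, b))


def trip_stats(rdd):
    """Return (total, missing_data, casual, member) for an RDD of rows."""
    return rdd.aggregate(ZERO, seq_op, comb_op)


# Hypothetical usage:
# total, missing, casual, member = trip_stats(valid_rdd)
```

If both approaches run over the same RDD, the four numbers should agree with the accumulator values printed above.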


                                                                                

### Step 5: Other Insights
1. Average trip duration for members vs. casual riders.
2. Peak riding hour, i.e., the hour of the day in which the most people are riding bikes.

Tip: use `datetime` to parse the timestamp strings and compute durations, among other date manipulations. For example:

```
from datetime import datetime

start_str = '2025-02-06 16:54:02.517'
end_str = '2025-02-06 17:00:48.166'
start_time = datetime.strptime(start_str, "%Y-%m-%d %H:%M:%S.%f")  # %f parses the fractional seconds
end_time = datetime.strptime(end_str, "%Y-%m-%d %H:%M:%S.%f")
duration = (end_time - start_time).total_seconds() / 60  # Convert to minutes
```
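
The timestamps in this dataset carry milliseconds, so the format string needs `%f` to parse them. A small sketch of a parser that also tolerates values without a fractional part, under the assumption that such values could occur in some rows; `FORMATS`, `parse_timestamp`, and `duration_minutes` are hypothetical names:

```python
from datetime import datetime

# Formats tried in order: with fractional seconds first, then without.
FORMATS = ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S")


def parse_timestamp(text):
    """Parse a timestamp with or without fractional seconds."""
    for fmt in FORMATS:
        try:
            return datetime.strptime(text, fmt)
        except ValueError:
            continue
    raise ValueError(f"unrecognised timestamp: {text!r}")


def duration_minutes(start_text, end_text):
    """Return the trip duration in minutes."""
    delta = parse_timestamp(end_text) - parse_timestamp(start_text)
    return delta.total_seconds() / 60


print(duration_minutes("2025-02-06 16:54:02.517", "2025-02-06 17:00:48.166"))
```

For the first sample row this gives a duration of about 6.76 minutes (405.649 seconds).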

In [11]:
from datetime import datetime

def parse_row(row):
    try:
        # Parse times
        start_time = datetime.strptime(row[2], "%Y-%m-%d %H:%M:%S.%f")
        end_time = datetime.strptime(row[3], "%Y-%m-%d %H:%M:%S.%f")
        duration = (end_time - start_time).total_seconds() / 60  # minutes
        user_type = row[12]
        ride_hour = start_time.hour
        return (user_type, duration, ride_hour)
    except (ValueError, IndexError):
        return None  # Skip rows that fail to parse

# Extract (user_type, duration, ride_hour)
parsed_rdd = valid_rdd.map(parse_row).filter(lambda x: x is not None)

# Compute the average duration per user type
# => (user_type, (duration, 1)) to sum durations and count trips
durations_rdd = parsed_rdd.map(lambda x: (x[0], (x[1], 1)))  # (type, (duration, 1))
sum_counts = durations_rdd.reduceByKey(lambda a, b: (a[0]+b[0], a[1]+b[1]))
averages = sum_counts.mapValues(lambda x: x[0] / x[1]).collectAsMap()

# Find the peak hour
# => Map to (hour, 1) and count
hours_rdd = parsed_rdd.map(lambda x: (x[2], 1))  # (hour, 1)
hour_counts = hours_rdd.reduceByKey(lambda a, b: a + b)
peak_hour_data = hour_counts.takeOrdered(1, key=lambda x: -x[1])  # highest count

average_casual = averages.get("casual", 0)
average_member = averages.get("member", 0)
print(f"Average trip duration for casual riders: {average_casual:.2f} minutes")
print(f"Average trip duration for members: {average_member:.2f} minutes")

if peak_hour_data:
    peak_hour, count = peak_hour_data[0]
    print(f"The peak hour is {peak_hour}:00, with {count} trips.")
else:
    print("Not enough data to determine the peak hour.")



Average trip duration for casual riders: 15.10 minutes
Average trip duration for members: 9.64 minutes
The peak hour is 17:00, with 194468 trips.
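
The peak hour above counts each trip once, at its start hour. If "riding" is read as "on the bike at some point during that hour", a trip that spans several clock hours would count toward each of them. The following is a hedged sketch of that alternative reading, not a correction of the result above. `hours_covered` is a hypothetical helper, and the 24-entry cap is an assumption that keeps very long trips from counting any hour of the day twice.

```python
from datetime import datetime, timedelta


def hours_covered(start, end):
    """Return the distinct hours of day (0-23) touched by a trip from start to end."""
    if end < start:
        return []  # inconsistent timestamps: ignore the trip
    hours = []
    current = start.replace(minute=0, second=0, microsecond=0)
    # Always count the start hour, then every later hour the trip reaches into.
    while (current < end or not hours) and len(hours) < 24:
        hours.append(current.hour)
        current += timedelta(hours=1)
    return hours


# Hypothetical usage with an RDD of (start, end) datetime pairs named times_rdd:
# hour_counts = (times_rdd.flatMap(lambda t: hours_covered(*t))
#                .map(lambda h: (h, 1))
#                .reduceByKey(lambda a, b: a + b))
```

With this definition the first sample trip (16:54 to 17:00:48) contributes to both hour 16 and hour 17, whereas the start-hour approach counts it only under 16.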


                                                                                