In [1]:
from pyspark.sql import SparkSession

We create a Spark session. To monitor, visit <http://localhost:4040/jobs/>.  
Then, we load the data and create a temporary view to work with SQL.

In [2]:
# Start the Spark session for this notebook (or reuse one that already exists).
spark = (
    SparkSession.builder
    .appName("Cyclistic cleaning data")
    .getOrCreate()
)

# Read the raw trip data: the first CSV row is the header and the column
# types are inferred from the file contents.
df = spark.read.csv(
    "Data/cyclistic/2023_cyclistic_tripdata.csv",
    header=True,
    inferSchema=True,
)

# Expose the DataFrame to Spark SQL under the view name `cyclistic_data`.
df.createOrReplaceTempView("cyclistic_data")

24/02/11 21:37:02 WARN Utils: Your hostname, DS-A90101.local resolves to a loopback address: 127.0.0.1; using 192.168.100.217 instead (on interface en0)
24/02/11 21:37:02 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/11 21:37:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/02/11 21:37:20 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
                                                                                

## Checking columns

### `ride_id`
We count the distinct `ride_id` values and compare that number with the total number of rows. The two counts match, so there are no repeated ids.

In [6]:
# Key sanity check: compare the total number of rows with the number of
# distinct `ride_id` values in `cyclistic_data`.
query = """
SELECT
    COUNT(*) AS Rows,
    COUNT(DISTINCT ride_id) AS Unique_IDs
FROM
    cyclistic_data;
"""

ride_id_counts = spark.sql(query)
ride_id_counts.toPandas()

24/02/11 15:07:18 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/11 15:07:18 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/11 15:07:18 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/11 15:07:18 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/11 15:07:18 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/11 15:07:18 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/11 15:07:18 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/11 15:07:18 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/11 15:07:23 WARN RowBasedKeyValueBatch: Calling spill() on

Unnamed: 0,Rows,Unique_IDs
0,5719877,5719877


### `rideable_type`

We can see that there are three unique bicycle types. This column is also clean.

In [7]:
# Number of rides per bicycle type in `cyclistic_data`, most frequent type first.
query = """
SELECT
    rideable_type,
    COUNT(*) AS Rides
FROM
    cyclistic_data
GROUP BY
    rideable_type
ORDER BY
    Rides DESC
"""

rides_per_type = spark.sql(query)
rides_per_type.toPandas()

                                                                                

Unnamed: 0,rideable_type,Rides
0,electric_bike,2945579
1,classic_bike,2696011
2,docked_bike,78287


### Dates

We can see that in both columns, there are entries from the year 2022, and rides that ended in 2024. Since we are interested in data from 2023, we'll remove these entries.


In [8]:
# Earliest and latest values of `started_at` and `ended_at` in `cyclistic_data`,
# used to spot rides that fall outside the year of interest.
query = """
SELECT
    MIN(started_at),
    MAX(started_at),
    MIN(ended_at),
    MAX(ended_at)
FROM
    cyclistic_data;
"""

date_bounds = spark.sql(query)
date_bounds.toPandas()

                                                                                

Unnamed: 0,min(started_at),max(started_at),min(ended_at),max(ended_at)
0,2022-12-31 18:01:58,2023-12-31 17:59:38,2022-12-31 18:02:41,2024-01-01 17:50:51


#### Removing entries that are not from the year 2023

In [3]:
# Keep only the rides that both started and ended in 2023.
#
# The bounds are half-open (>= 2023-01-01, < 2024-01-01), so the whole year is
# covered, including any timestamp in the last second of Dec 31 that carries
# fractional seconds (a closed `<= 23:59:59` bound would drop those).
# String literals use single quotes, the standard SQL form: double-quoted text
# is only read as a string under Spark's default identifier settings.
query = """
SELECT
    *
FROM
    cyclistic_data
WHERE
    started_at >= '2023-01-01 00:00:00' AND
    started_at <  '2024-01-01 00:00:00' AND
    ended_at   >= '2023-01-01 00:00:00' AND
    ended_at   <  '2024-01-01 00:00:00'
ORDER BY
    started_at ASC;
"""

# Register the filtered rides as `cyclistic_data_2` for the next cleaning steps.
df2 = spark.sql(query)
df2.createOrReplaceTempView("cyclistic_data_2")

After removing those entries, our table has 5,718,838 rows.

In [10]:
# Row count of `cyclistic_data_2`, i.e. the table left after the date filter.
query = """
SELECT
    COUNT(*) AS Rows
FROM
    cyclistic_data_2
"""

rows_after_date_filter = spark.sql(query)
rows_after_date_filter.toPandas()

                                                                                

Unnamed: 0,Rows
0,5718838


### Station ids and names

We found two issues:
- There is no information about some stations.
- The naming of the stations looks inconsistent in some cases.

#### Removing NAs

We can see that there are rides where we don't have the station information (id, name, latitude, and longitude) for the start or the end of the ride.  
For simplicity in the analysis, we assume that the available data is not enough to infer more information about these stations (i.e., we cannot infer the name of a missing station from its latitude and longitude coordinates or from its station id).

This is the data that we want to remove.

In [4]:
# Select every ride in `cyclistic_data_2` for which at least one of the eight
# start/end station fields (name, id, latitude, longitude) equals the literal
# string "NA". These are the rides to be removed.
query = """
SELECT
    *
FROM
    cyclistic_data_2
WHERE
    start_station_name = "NA" OR
    start_station_id = "NA" OR
    start_lat = "NA" OR
    start_lng = "NA" OR
    end_station_name = "NA" OR
    end_station_id = "NA" OR
    end_lat = "NA" OR
    end_lng = "NA"
"""

# Register the selection as the temporary view `aux` so another query can
# reference it.
aux_df = spark.sql(query)
aux_df.createOrReplaceTempView("aux")

Now we remove it from the table and save it into a new one.

In [5]:
# Keep only the rows of `cyclistic_data_2` whose `ride_id` has no matching row
# in the `aux` view (an anti-join on `ride_id`).
query = """
SELECT
    *
FROM
    cyclistic_data_2
WHERE NOT EXISTS (
    SELECT 1
    FROM aux
    WHERE cyclistic_data_2.ride_id = aux.ride_id
);
"""

# Register the remaining rows as the temporary view `cyclistic_data_3`.
df3 = spark.sql(query)
df3.createOrReplaceTempView("cyclistic_data_3")

After removing the NA rows, our table has 4,330,969 rows.

In [13]:
# Row count of `cyclistic_data_3`, i.e. the table left after dropping NA rides.
query = """
SELECT
    COUNT(*) AS Rows
FROM
    cyclistic_data_3
"""

rows_after_na_removal = spark.sql(query)
rows_after_na_removal.toPandas()

                                                                                

Unnamed: 0,Rows
0,4330969


#### Character length

It seems that, for a few stations, the name is the same as the id (for example, `OH Charging Stx - Test` and `410`).

In [15]:
# For each start station name in `cyclistic_data_3`, count the rides whose
# `start_station_name` equals `start_station_id`; keep only the names with at
# least one such ride and show at most 10 of them.
query = """
SELECT
    start_station_name,
    SUM(CASE
        WHEN start_station_name = start_station_id THEN 1
        ELSE 0
    END) AS same_name_and_id
FROM
    cyclistic_data_3
GROUP BY
    start_station_name
HAVING
    same_name_and_id > 0
LIMIT
    10;
"""

stations_named_like_id = spark.sql(query)
stations_named_like_id.toPandas()

24/02/11 21:51:59 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/11 21:51:59 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/11 21:51:59 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/11 21:51:59 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/11 21:51:59 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/11 21:51:59 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/11 21:51:59 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/11 21:51:59 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/11 21:51:59 WARN RowBasedKeyValueBatch: Calling spill() on

Unnamed: 0,start_station_name,same_name_and_id
0,OH Charging Stx - Test,13
1,410,3


In [19]:
# Show up to 20 rides from `cyclistic_data_3` for which the name-equals-id flag
# is 0, together with the station name, the station id and the flag itself.
query = """
SELECT
    start_station_name,
    start_station_id,
    CASE
        WHEN start_station_name = start_station_id THEN 1
        ELSE 0
    END AS same_name_and_id
FROM
    cyclistic_data_3
WHERE
    CASE
        WHEN start_station_name = start_station_id THEN 1
        ELSE 0
    END = 0
LIMIT
    20;
"""

regular_station_sample = spark.sql(query)
regular_station_sample.toPandas()

                                                                                

Unnamed: 0,start_station_name,start_station_id,same_name_and_id
0,Green St & Washington Blvd,13053,0
1,Rush St & Superior St,15530,0
2,Halsted St & Roscoe St,TA1309000025,0
3,State St & 33rd St,13216,0
4,Wabash Ave & Roosevelt Rd,TA1305000002,0
5,State St & 33rd St,13216,0
6,Morgan Ave & 14th Pl,TA1306000002,0
7,Elston Ave & Wabansia Ave,TA1309000032,0
8,Wabash Ave & 16th St,SL-012,0
9,Southport Ave & Wellington Ave,TA1307000006,0


In [17]:
# Show up to 20 rides from `cyclistic_data_3` that started at the station named
# "OH Charging Stx - Test", with the flag telling whether name and id are equal.
query = """
SELECT
    start_station_name,
    start_station_id,
    CASE
        WHEN start_station_name = start_station_id THEN 1
        ELSE 0
    END AS same_name_and_id
FROM
    cyclistic_data_3
WHERE
    start_station_name = "OH Charging Stx - Test"
LIMIT
    20;
"""

test_station_sample = spark.sql(query)
test_station_sample.toPandas()

                                                                                

Unnamed: 0,start_station_name,start_station_id,same_name_and_id
0,OH Charging Stx - Test,OH Charging Stx - Test,1
1,OH Charging Stx - Test,OH Charging Stx - Test,1
2,OH Charging Stx - Test,OH Charging Stx - Test,1
3,OH Charging Stx - Test,OH Charging Stx - Test,1
4,OH Charging Stx - Test,OH Charging Stx - Test,1
5,OH Charging Stx - Test,OH Charging Stx - Test,1
6,OH Charging Stx - Test,OH Charging Stx - Test,1
7,OH Charging Stx - Test,OH Charging Stx - Test,1
8,OH Charging Stx - Test,OH Charging Stx - Test,1
9,OH Charging Stx - Test,OH Charging Stx - Test,1


### `member_casual`

We can see that our labels are correct for this column. We expect only two.

In [14]:
# Number of rides per `member_casual` label in the `cyclistic_data` view.
query = """
SELECT
    member_casual,
    COUNT(ride_id) AS Rides
FROM
    cyclistic_data
GROUP BY
    member_casual;
"""

rides_per_rider_type = spark.sql(query)
rides_per_rider_type.toPandas()

                                                                                

Unnamed: 0,member_casual,Rides
0,casual,2059179
1,member,3660698


Now the data is clean and we can export it.  
It is important to note that the unbalanced class problem is still present, but we'll solve it using R.

To export the data, we need to repartition the data to get a single csv file.

In [15]:
# Export the cleaned rides as CSV with a header row.
# `repartition(1)` puts all rows into a single partition so that Spark writes
# one part file inside the output directory.
# `mode="overwrite"` replaces an output directory left behind by an earlier or
# interrupted run, so this cell can be re-run without a "path already exists"
# error.
df3.repartition(1).write.csv(
    "Data/cyclistic/2023_cyclistic_tripdata_clean",
    header=True,
    mode="overwrite",
)

                                                                                

In [16]:
# Move the CSV part file out of the Spark output directory and give it its
# final file name next to the other data files.
!mv Data/cyclistic/2023_cyclistic_tripdata_clean/*.csv Data/cyclistic/2023_cyclistic_tripdata_clean.csv

In [17]:
# Delete the Spark output directory and whatever is still inside it.
!rm -r Data/cyclistic/2023_cyclistic_tripdata_clean/

In [18]:
# List the data directory to confirm that the exported file is in place.
!ls Data/cyclistic/

202301-divvy-tripdata.csv         202308-divvy-tripdata.csv
202302-divvy-tripdata.csv         202309-divvy-tripdata.csv
202303-divvy-tripdata.csv         202310-divvy-tripdata.csv
202304-divvy-tripdata.csv         202311-divvy-tripdata.csv
202305-divvy-tripdata.csv         202312-divvy-tripdata.csv
202306-divvy-tripdata.csv         2023_cyclistic_tripdata.csv
202307-divvy-tripdata.csv         2023_cyclistic_tripdata_clean.csv


In [3]:
# Read the exported file back (header row, inferred column types) to verify it.
df = spark.read.csv(
    "Data/cyclistic/2023_cyclistic_tripdata_clean.csv",
    header=True,
    inferSchema=True,
)

# NOTE: this rebinds `df` and replaces the `cyclistic_data` view, so from here
# on that view name refers to the cleaned data.
df.createOrReplaceTempView("cyclistic_data")

24/02/11 15:19:39 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
                                                                                

In [4]:
# Row count of the `cyclistic_data` view as currently registered.
query = """
SELECT
    COUNT(*) AS Rows
FROM
    cyclistic_data;
"""

rows_in_current_view = spark.sql(query)
rows_in_current_view.toPandas()

                                                                                

Unnamed: 0,Rows
0,4330969


In [15]:
# Stop the Spark session now that the notebook is done with it.
# NOTE(review): the saved output of this cell is a ConnectionRefusedError,
# presumably because the Spark backend was no longer reachable when the cell
# ran. Confirm by re-running the notebook on a fresh kernel.
spark.stop()

ConnectionRefusedError: [Errno 61] Connection refused