## **Install Spark and Import Necessary Packages**

In [None]:
# Check whether this notebook instance already has files present from a previous run
# by listing the current working directory.
!ls

sample_data


In [None]:
!ls
!rm -f spark-3.4.[01]-bin-hadoop3.tgz*
!rm -rf spark-3.4.[01]-bin-hadoop3
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget https://downloads.apache.org/spark/spark-3.4.3/spark-3.4.3-bin-hadoop3.tgz
!tar -xf spark-3.4.3-bin-hadoop3.tgz
!ls -alt
print("standalone Spark is now installed")

sample_data  spark-3.4.3-bin-hadoop3  spark-3.4.3-bin-hadoop3.tgz
--2024-05-11 04:04:20--  https://downloads.apache.org/spark/spark-3.4.3/spark-3.4.3-bin-hadoop3.tgz
Resolving downloads.apache.org (downloads.apache.org)... 88.99.208.237, 135.181.214.104, 2a01:4f9:3a:2c57::2, ...
Connecting to downloads.apache.org (downloads.apache.org)|88.99.208.237|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 388930980 (371M) [application/x-gzip]
Saving to: ‘spark-3.4.3-bin-hadoop3.tgz.1’


2024-05-11 04:04:35 (26.5 MB/s) - ‘spark-3.4.3-bin-hadoop3.tgz.1’ saved [388930980/388930980]

total 759652
drwxr-xr-x  1 root root      4096 May 11 04:04 .
drwxr-xr-x  1 root root      4096 May 11 04:02 ..
drwxr-xr-x  1 root root      4096 May  9 13:24 sample_data
drwxr-xr-x  4 root root      4096 May  9 13:24 .config
-rw-r--r--  1 root root 388930980 Apr 15 01:30 spark-3.4.3-bin-hadoop3.tgz
-rw-r--r--  1 root root 388930980 Apr 15 01:30 spark-3.4.3-bin-hadoop3.tgz.1
drwxr-xr-x 13 1000

In [None]:
!pip3 install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [None]:
# Initialise Spark. SPARK_HOME must point at the same Spark version that was
# downloaded and unpacked earlier in this notebook.
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.3-bin-hadoop3"
import findspark
findspark.init()  # makes the pyspark package under SPARK_HOME importable
from pyspark import SparkConf, SparkContext
# 'local[2]' runs Spark locally on 2 cores; use 'local' for a single core, or a
# 'master:NNNN' address to run on a Spark standalone cluster at port NNNN.
spark_conf = SparkConf().setMaster('local[2]').setAppName('MyApp')
# getOrCreate (rather than the bare constructor) keeps this cell re-runnable:
# constructing a second SparkContext in the same kernel raises an error.
sc = SparkContext.getOrCreate(conf=spark_conf)
# NOTE: these wildcard imports are kept because later cells rely on the names they
# provide (e.g. `col`); be aware they can shadow Python builtins such as sum/min/max.
from pyspark.sql import *
from pyspark.sql.functions import *
# "spark" (SparkSession) and "sc" (SparkContext) are the key handles into the Spark API
spark = SparkSession.builder.appName("bikes").getOrCreate()

## **Loading the Dataset**

In [None]:
# get file for given year from TfL open data
!wget https://cycling.data.tfl.gov.uk/usage-stats/cyclehireusagestats-2014.zip
!unzip cyclehireusagestats-2014.zip

--2024-05-11 04:08:00--  https://cycling.data.tfl.gov.uk/usage-stats/cyclehireusagestats-2014.zip
Resolving cycling.data.tfl.gov.uk (cycling.data.tfl.gov.uk)... 104.16.98.104, 104.16.97.104
Connecting to cycling.data.tfl.gov.uk (cycling.data.tfl.gov.uk)|104.16.98.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 225215129 (215M) [application/zip]
Saving to: ‘cyclehireusagestats-2014.zip’


2024-05-11 04:08:02 (111 MB/s) - ‘cyclehireusagestats-2014.zip’ saved [225215129/225215129]

Archive:  cyclehireusagestats-2014.zip
  inflating: 1. Journey Data Extract 05Jan14-02Feb14.csv  
  inflating: 10a. Journey Data Extract 14Sep14-27Sep14.csv  
  inflating: 10b. Journey Data Extract 28Sep14-11Oct14.csv  
  inflating: 11a. Journey Data Extract 12Oct14-08Nov14.csv  
  inflating: 11b. Journey Data Extract 12Oct14-08Nov14.csv  
  inflating: 12a. Journey Data Extract 09Nov14-06Dec14.csv  
  inflating: 12b. Journey Data Extract 09Nov14-06Dec14.csv  
  inflating: 13a. Jour

## **Filter the data to only include rides that Start and End in 2014**

In [None]:
# Load several journey extracts at once so the analysis covers only 2014 rides
# (rides that both start and end in 2014). Each glob below selects one extract;
# the trailing comment gives the period that extract covers.

file1="./1. Journey*csv" # 05 Jan to 02 Feb
file2="./2. Journey*csv" # 03 Feb to 01 Mar
file3="./3. Journey*csv" # 02 Mar to 31 Mar
file4="./4. Journey*csv" # 01 Apr to 26 Apr
file5="./5. Journey*csv" # 27 Apr to 24 May
file6="./6. Journey*csv" # 25 May to 21 Jun
file7="./7. Journey*csv" # 22 Jun to 19 Jul
file8="./8a Journey*csv" # 20 Jul to 31 Jul
file9="./8b Journey*csv" # 01 Aug to 16 Aug
file10="./9a Journey*csv" # 17 Aug to 31 Aug
file11="./9b Journey*csv" # 01 Sep to 13 Sep
file12="./10a. Journey*csv" # 14 Sep to 27 Sep
file13="./10b. Journey*csv" # 28 Sep to 11 Oct
file14="./11a. Journey*csv" # 12 Oct to 08 Nov
file15="./11b. Journey*csv" # 12 Oct to 08 Nov
file16="./12a. Journey*csv" # 09 Nov to 06 Dec
file17="./12b. Journey*csv" # 09 Nov to 06 Dec
file18="./13a. Journey*csv" # 07 Dec to 21 Dec

# All 18 extracts, in calendar order; together they are treated as the 2014 journeys.
journey_paths = [
    file1, file2, file3, file4, file5, file6,
    file7, file8, file9, file10, file11, file12,
    file13, file14, file15, file16, file17, file18,
]

# CSV reader: the first line of each file is a header, and column types are inferred.
csv_reader = spark.read.format("csv")
csv_reader = csv_reader.option("header", "true")
csv_reader = csv_reader.option("inferSchema", "true")

# Passing a Python list of paths loads every file into one single DataFrame.
journey_df = csv_reader.load(journey_paths)

In [None]:
# Preview the first 10 rows of the combined journey data.
journey_df.show(n=10)

+---------+--------+-------+----------------+-------------+--------------------+----------------+---------------+--------------------+
|Rental Id|Duration|Bike Id|        End Date|EndStation Id|     EndStation Name|      Start Date|StartStation Id|   StartStation Name|
+---------+--------+-------+----------------+-------------+--------------------+----------------+---------------+--------------------+
| 34263367|    1080|   9076|24/06/2014 00:57|          695|Islington Green, ...|24/06/2014 00:39|            311|Foley Street, Fit...|
| 34603487|     660|   6328|03/07/2014 11:51|          695|Islington Green, ...|03/07/2014 11:40|             22|Northington Stree...|
| 34689078|     120|   2006|05/07/2014 15:09|          357|Howland Street, F...|05/07/2014 15:07|            311|Foley Street, Fit...|
| 34724273|    1260|   7904|06/07/2014 16:35|          695|Islington Green, ...|06/07/2014 16:14|            311|Foley Street, Fit...|
| 34956750|    1500|   8251|13/07/2014 00:41|          

In [None]:
# Total number of journey rows loaded across all of the input CSV files.
journey_df.count()

10653857

## **Conversion: ride duration from seconds to minutes**

In [None]:
# Keep only the station columns and express each ride's Duration (recorded in
# seconds) in minutes. Column names are spelled exactly as they appear in the CSV
# header ("StartStation ..."), so the select does not depend on Spark's default
# case-insensitive column resolution.
new_journey_df = journey_df.select(
    "StartStation Id",
    "StartStation Name",
    "EndStation Id",
    "EndStation Name",
    (col("Duration") / 60).alias("minutes"),
)

# Total minutes ridden for each (start station, end station) pair, largest first;
# show the ten pairs with the most accumulated riding time.
(
    new_journey_df
    .groupBy("StartStation Id", "StartStation Name", "EndStation Name")
    .sum("minutes")
    .orderBy("sum(minutes)", ascending=False)
    .show(10)
)

+---------------+--------------------+--------------------+------------+
|StartStation Id|   StartStation Name|     EndStation Name|sum(minutes)|
+---------------+--------------------+--------------------+------------+
|            191|Hyde Park Corner,...|Hyde Park Corner,...|   1438308.0|
|            307|Black Lion Gate, ...|Black Lion Gate, ...|    705803.0|
|            303|Albert Gate, Hyde...|Albert Gate, Hyde...|    569525.0|
|            406|Speakers' Corner ...|Speakers' Corner ...|    559325.0|
|            407|Speakers' Corner ...|Speakers' Corner ...|    392088.0|
|            248|Triangle Car Park...|Triangle Car Park...|    363442.0|
|            404|Palace Gate, Kens...|Palace Gate, Kens...|    332747.0|
|            191|Hyde Park Corner,...|Albert Gate, Hyde...|    268338.0|
|            213|Wellington Arch, ...|Wellington Arch, ...|    223989.0|
|            304|Cumberland Gate, ...|Cumberland Gate, ...|    213129.0|
+---------------+--------------------+-------------

## **Identify the "Baylis Road, Waterloo" station**

In [None]:
# Rides whose starting station is "Baylis Road, Waterloo".
starts_at_baylis = col("StartStation Name") == "Baylis Road, Waterloo"
baylis_road_waterloo = new_journey_df.where(starts_at_baylis)

**Calculate the average duration of rides starting from Baylis Road, Waterloo**

In [None]:
# Mean ride length in minutes for the Baylis Road subset. The aggregation yields a
# single row with a single column, so take the first value of the first row.
baylis_mean_row = baylis_road_waterloo.agg({'minutes': 'mean'}).first()
baylis_road_avg_duration = baylis_mean_row[0]
print(f"Average duration of rides starting from Baylis Road, Waterloo: {baylis_road_avg_duration:.2f} minutes")

Average duration of rides starting from Baylis Road, Waterloo: 17.38 minutes


## **Identify "all other stations"**

In [None]:
# Comparison group: every ride that starts at any station other than Baylis Road.
starts_elsewhere = col("StartStation Name") != "Baylis Road, Waterloo"
all_other_stations = new_journey_df.where(starts_elsewhere)

**Calculate the average duration of rides starting from all other stations**

In [None]:
# Mean ride length in minutes for the comparison group. The aggregation yields a
# single row with a single column, so take the first value of the first row.
others_mean_row = all_other_stations.agg({'minutes': 'mean'}).first()
all_other_avg_duration = others_mean_row[0]
print(f"Average duration of rides starting from all other stations: {all_other_avg_duration:.2f} minutes")

Average duration of rides starting from all other stations: 24.29 minutes


## **Perform a statistical test to compare the means**

In [None]:
# Two-sample t-test (pooled variance, two-sided) comparing ride durations from
# Baylis Road against rides from all other stations.
#
# The summary statistics are computed inside Spark and only three numbers per group
# are brought back to the driver. The earlier approach collected every 'minutes'
# value (millions of rows) into Python lists through an RDD map, which is slow and
# can exhaust driver memory. ttest_ind_from_stats performs the same test as
# ttest_ind when given each sample's mean, sample standard deviation and size.
from pyspark.sql import functions as F
from scipy.stats import ttest_ind, ttest_ind_from_stats  # ttest_ind kept for any cell that still uses it


def _duration_summary(rides_df):
    """Return (mean, sample standard deviation, count) of the 'minutes' column.

    All three aggregates are evaluated by Spark in a single pass; Spark's
    aggregate functions skip null values.
    """
    stats_row = rides_df.agg(
        F.mean("minutes").alias("mean"),
        F.stddev_samp("minutes").alias("std"),
        F.count("minutes").alias("n"),
    ).first()
    return stats_row["mean"], stats_row["std"], stats_row["n"]


baylis_mean, baylis_std, baylis_n = _duration_summary(baylis_road_waterloo)
other_mean, other_std, other_n = _duration_summary(all_other_stations)

# equal_var=True matches the default of ttest_ind, which the notebook used before.
t_stat, p_value = ttest_ind_from_stats(
    mean1=baylis_mean, std1=baylis_std, nobs1=baylis_n,
    mean2=other_mean, std2=other_std, nobs2=other_n,
    equal_var=True,
)


**Evaluate the hypothesis**

In [None]:
# Significance level for the test.
ALPHA = 0.05

# The test is two-sided, so a small p-value only says the means differ. The claim
# "shorter" additionally requires the t statistic to be negative (the Baylis Road
# mean is the first sample, so a negative value means it is below the other mean).
if p_value < ALPHA and t_stat < 0:
    print("The hypothesis is accepted. The average duration of rides starting from Baylis Road, Waterloo is significantly shorter than the average duration of rides starting from other stations.")
elif p_value < ALPHA:
    print("The hypothesis is rejected. The average duration of rides starting from Baylis Road, Waterloo is significantly longer than the average duration of rides starting from other stations.")
else:
    print("The hypothesis is rejected. There is no statistically significant difference between the average duration of rides starting from Baylis Road, Waterloo and the average duration of rides starting from other stations.")

The hypothesis is accepted. The average duration of rides starting from Baylis Road, Waterloo is significantly shorter than the average duration of rides starting from other stations.


The hypothesis is accepted. The average duration of rides starting from Baylis Road, Waterloo is significantly shorter than the average duration of rides starting from other stations.