# Data Loading

In [None]:
# Install the PySpark package with pip
!pip install pyspark

In [None]:
# See what is already available, to determine which set-up steps are still required
# before reading in the files and handling the data.
# If more than "sample_data" is listed, you can jump to the relevant step below.
!ls

In [None]:
# set-up spark (NB if Apache amend versions on download site we will need to amend path in wget command)
print("\nWelcome to advanced top sites")
!ls
!rm -f spark-3.3.[01]-bin-hadoop3.tgz* 
!rm -rf spark-3.3.[01]-bin-hadoop3
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget https://downloads.apache.org/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
!tar -xf spark-3.3.2-bin-hadoop3.tgz
!ls -alt


In [None]:
# Install findspark if it is not already installed (it is imported by the Spark init cell)
!pip3 install findspark


In [None]:
# init spark (SPARK_HOME must match the Spark version unpacked by the download cell)
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.2-bin-hadoop3"
import findspark
findspark.init()
from pyspark import SparkConf, SparkContext
# 'local[2]' runs Spark in local mode on 2 cores; use 'local' for a single core,
# or a 'spark://host:port' master URL to run on a Spark standalone cluster
spark_conf = SparkConf().setMaster('local[2]').setAppName('MyApp')
# getOrCreate (rather than the bare constructor) keeps this cell re-runnable:
# constructing a second SparkContext in the same kernel raises an error
sc = SparkContext.getOrCreate(conf=spark_conf)
# see what we have by examining the Spark User Interface
# NOTE: the star imports are kept because later cells use names such as `col` and
# `to_timestamp` unqualified; be aware they also shadow Python builtins (e.g. `sum`)
from pyspark.sql import *
from pyspark.sql.functions import *
# "SparkSession" and "sc" are the key handles into the Spark API
spark = SparkSession.builder.appName("bikes").getOrCreate()

In [None]:
# get file for given year from TfL open data
!wget https://cycling.data.tfl.gov.uk/usage-stats/cyclehireusagestats-2014.zip
!unzip cyclehireusagestats-2014.zip



# First Hypothesis

 “In 2014, people ride for longer in Autumn than in Spring” 

Autumn Data

In [None]:
# Several journey extracts are read into ONE DataFrame.
# This presumes every file shares the same schema - worth verifying first!
file1 = "./10a. Journey*csv"  # 14 Sep to 27 Sep
file2 = "./10b. Journey*csv"  # 28 Sep to 11 Oct
file3 = "./11a. Journey*csv"  # 12 Oct to 08 Nov
file4 = "./12b. Journey*csv"  # 09 Nov to 06 Dec
file5 = "./13a. Journey*csv"  # 07 Dec to 21 Dec

# Taken together, these 5 files are what we treat as "autumn" in the UK.
# The reader accepts a Python list of paths and unions them on load.
autumn_df = spark.read.csv(
    [file1, file2, file3, file4, file5],
    header=True,
    inferSchema=True,
)

In [None]:
# Preview the first 10 rows of the loaded autumn data
autumn_df.show(10)

In [None]:
# Cast the Duration column to float
autumn_df = autumn_df.withColumn("Duration", col("Duration").cast("float")) 

In [None]:
# Inspect the column names and their data types after the cast
autumn_df.dtypes

Preprocessing

Removing the rows that are not in autumn season

In [None]:
# Columns that hold journey start/end times, expected in "dd/MM/yyyy HH:mm" form
date_columns = ["End Date", "Start Date"]

# Replace each of them, one at a time, with its parsed timestamp equivalent
for column in date_columns:
    parsed = to_timestamp(autumn_df[column], "dd/MM/yyyy HH:mm")
    autumn_df = autumn_df.withColumn(column, parsed)

In [None]:
from pyspark.sql.types import TimestampType
from pyspark.sql import functions as F
import pandas as pd

# Earliest journey start that is kept, first as text and then as a timestamp
filter_date = "23/09/2014 00:00"
filter_timestamp = pd.to_datetime(filter_date, format="%d/%m/%Y %H:%M")

# Keep only the journeys that started on or after that moment
started_in_season = autumn_df["Start Date"] >= filter_timestamp
autumn_df = autumn_df.where(started_in_season)

In [None]:
# Number of rows currently in autumn_df
autumn_df.count()

Inspecting for the missing values

In [None]:
from pyspark.sql.functions import col, sum

# One aggregate expression per column: the null flag is cast to 0/1 and summed
null_count_exprs = [
    sum(col(name).isNull().cast("integer")).alias(name)
    for name in autumn_df.columns
]
missing_values = autumn_df.agg(*null_count_exprs)

# Single-row table giving the number of missing values in every column
missing_values.show()


Inspecting and removing outliers

In [None]:
import matplotlib.pyplot as plt

# Column inspected for outliers
column_name = "Duration"

# Bring the column's values back to the driver as a plain Python list
autumn_durations = [row[0] for row in autumn_df.select(column_name).collect()]

# Draw the box plot on an explicit Axes object
fig, ax = plt.subplots()
ax.boxplot(autumn_durations)
ax.set_xlabel(column_name)
ax.set_ylabel("Value")
ax.set_title("Box Plot - " + column_name)
plt.show()


In [None]:
from pyspark.sql.functions import col

# Journeys that report a duration below 0
negative_duration_df = autumn_df.where(autumn_df["Duration"] < 0)

# How many such rows there are (shown as the cell output)
negative_duration_df.count()


In [None]:
from pyspark.sql.functions import col

# Keep only rows whose Duration is zero or positive, removing the negative durations counted above.
# NOTE(review): rows with a null Duration presumably fail this predicate too and are dropped - confirm.
autumn_df = autumn_df.filter(col("Duration") >= 0)


In [None]:
# Descriptive statistics for the autumn ride durations
summary_stats = autumn_df.describe("Duration")
summary_stats.show()

Spring Data

In [None]:
# Several journey extracts are read into ONE DataFrame.
# This presumes every file shares the same schema - worth verifying first!
file1 = "./3. Journey*csv"  # 02 Mar to 31 Mar
file2 = "./4. Journey*csv"  # 01 Apr to 26 Apr
file3 = "./5. Journey*csv"  # 27 Apr to 24 May
file4 = "./6. Journey*csv"  # 25 May to 21 Jun

# Taken together, these 4 files are what we treat as "spring" in the UK.
# The reader accepts a Python list of paths and unions them on load.
spring_df = spark.read.csv(
    [file1, file2, file3, file4],
    header=True,
    inferSchema=True,
)

In [None]:
# Preview the first 10 rows of the loaded spring data
spring_df.show(10)

In [None]:
# Cast the Duration column to float
spring_df = spring_df.withColumn("Duration", col("Duration").cast("float")) 

In [None]:
# Columns that hold journey start/end times, expected in "dd/MM/yyyy HH:mm" form
date_columns = ["End Date", "Start Date"]

# Replace each of them, one at a time, with its parsed timestamp equivalent
for column in date_columns:
    parsed = to_timestamp(spring_df[column], "dd/MM/yyyy HH:mm")
    spring_df = spring_df.withColumn(column, parsed)

In [None]:
# Inspect the column names and their data types after the cast and date conversion
spring_df.dtypes

Preprocessing

Removing the rows that are not in spring season

In [None]:
from pyspark.sql.types import TimestampType
from pyspark.sql import functions as F
import pandas as pd

# Earliest journey start that is kept, first as text and then as a timestamp
filter_date = "20/03/2014 00:00"
filter_timestamp = pd.to_datetime(filter_date, format="%d/%m/%Y %H:%M")

# Keep only the journeys that started on or after that moment
started_in_season = spring_df["Start Date"] >= filter_timestamp
spring_df = spring_df.where(started_in_season)

In [None]:
# Number of rows currently in spring_df
spring_df.count()

Inspecting for the missing values

In [None]:
from pyspark.sql.functions import col, sum

# One aggregate expression per column: the null flag is cast to 0/1 and summed
null_count_exprs = [
    sum(col(name).isNull().cast("integer")).alias(name)
    for name in spring_df.columns
]
missing_values = spring_df.agg(*null_count_exprs)

# Single-row table giving the number of missing values in every column
missing_values.show()


Inspecting and removing outliers

In [None]:
import matplotlib.pyplot as plt

# Column inspected for outliers
column_name = "Duration"

# Bring the column's values back to the driver as a plain Python list
spring_durations = [row[0] for row in spring_df.select(column_name).collect()]

# Draw the box plot on an explicit Axes object
fig, ax = plt.subplots()
ax.boxplot(spring_durations)
ax.set_xlabel(column_name)
ax.set_ylabel("Value")
ax.set_title("Box Plot - " + column_name)
plt.show()


In [None]:
from pyspark.sql.functions import col

# Count the rows that report a duration below 0 (before anything is removed)
negative_duration_df = spring_df.filter(col("Duration") < 0)
negative_duration_count = negative_duration_df.count()

# Remove those rows, exactly as is done for the autumn data, so that both seasons
# are compared on equally cleaned data. Without this step the spring statistics
# would still include the invalid negative durations.
spring_df = spring_df.filter(col("Duration") >= 0)

# Display how many rows were negative (0 if this cell is re-run after cleaning)
negative_duration_count


In [None]:
# Descriptive statistics for the spring ride durations
summary_stats = spring_df.describe("Duration")
summary_stats.show()

In [None]:
# Mean ride duration per season, computed by Spark and shown as a bar chart
seasons = ['Autumn', 'Spring']
ride_durations = [
    season_df.agg({'Duration': 'mean'}).first()[0]
    for season_df in (autumn_df, spring_df)
]
autumn_avg_duration, spring_avg_duration = ride_durations

fig, ax = plt.subplots()
ax.bar(seasons, ride_durations)
ax.set_xlabel('Seasons')
ax.set_ylabel('Average Ride Duration')
ax.set_title('Comparison of Average Ride Durations in Autumn and Spring')
plt.show()

In [None]:
seasons = ['Autumn', 'Spring']
# Collect the durations from the CURRENT DataFrames rather than reusing the
# autumn_durations / spring_durations lists: those lists were collected for the
# outlier inspection, before negative durations were removed, so they are stale.
# The explicit filter guarantees both seasons are plotted without negative values.
ride_durations = [
    [row[0] for row in season_df.filter(col("Duration") >= 0).select("Duration").collect()]
    for season_df in (autumn_df, spring_df)
]
plt.boxplot(ride_durations, labels=seasons)
plt.xlabel('Seasons')
plt.ylabel('Ride Duration')
plt.title('Comparison of Ride Durations in Autumn and Spring')
plt.show()

Hypothesis Testing

In [None]:
from scipy.stats import ttest_ind

# Build the two samples from the CURRENT DataFrames. The autumn_durations /
# spring_durations lists were collected for the outlier box plots, before the
# negative durations were removed, so testing on them would use uncleaned data.
autumn_sample = [
    row[0]
    for row in autumn_df.filter(col("Duration") >= 0).select("Duration").collect()
]
spring_sample = [
    row[0]
    for row in spring_df.filter(col("Duration") >= 0).select("Duration").collect()
]

# Independent two-sample t-test assuming equal variances. Autumn is passed
# first, so a positive t-statistic means the autumn mean is the larger one.
t_statistic, p_value = ttest_ind(autumn_sample, spring_sample, equal_var=True)

# Print the test result
print("T-Statistic:", t_statistic)
print("P-Value:", p_value)


T-Statistic: The t-statistic measures the difference between the means of the two groups (Autumn and Spring) relative to the variation within each group. In this case, the calculated t-statistic is approximately -27.74. The negative sign gives the direction of the difference: because Autumn is the first sample, it means the average ride duration in Autumn is shorter than in Spring. The large magnitude of the statistic indicates that this difference is large relative to the sampling variability.

P-Value: The p-value represents the probability of observing a t-statistic at least as extreme as the one calculated, assuming that there is no difference between the group means (the null hypothesis). In this case, the p-value is approximately 2.345e-169, which is extremely close to zero. A p-value this small is strong evidence against the null hypothesis: a difference of this size would be highly unlikely to arise by chance alone if the two seasons really had the same mean ride duration.

Based on these results, we can draw the following conclusions:

The average ride durations in Autumn and Spring are significantly different.
The negative t-statistic shows the direction of the difference: the average ride duration in Autumn is shorter than in Spring.
The extremely small p-value provides strong evidence to reject the null hypothesis and supports the conclusion of a significant difference between Autumn and Spring.
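These findings indicate that the season has an impact on the average ride durations, with Autumn and Spring exhibiting distinct patterns. Because the observed difference runs in the opposite direction, the data do not support the first hypothesis that people ride for longer in Autumn than in Spring.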