<a href="https://colab.research.google.com/github/Maestro2496/Machine-Learning/blob/main/BigDataAssessment.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# List the working directory to see whether this notebook instance already holds the Spark
# download and/or the TfL CSV files, and therefore which set-up cells below still need to run.
!ls


In [None]:
# set-up spark (NB if Apache amend versions on download site we will need to amend path in wget command)
## NOTE that this version would make use of Hadoop if installed BUT that HDFS & Hadoop is not installed on our Colab
## (we are only using a single node (probably as a VM) so we will not be able to benefit from parallelism)
!clear
!echo welcome

!rm -f spark-3.4.[01]-bin-hadoop3.tgz*
!rm -rf spark-3.4.[01]-bin-hadoop3

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget https://downloads.apache.org/spark/spark-3.4.2/spark-3.4.2-bin-hadoop3.tgz
!tar -xf spark-3.4.2-bin-hadoop3.tgz

!ls -alt
print("standalone Spark is now installed")

In [None]:
# init spark (ensure SPARK_HOME set to same version as we download earlier)
!pip3 install findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.2-bin-hadoop3"
import findspark
findspark.init()
from pyspark import SparkConf, SparkContext
# the next line gives us 'local' mode. try 'local[2]' to use 2 cores or 'master:NNNN' to run on Spark standalone cluster at port NNNN
spark_conf = SparkConf().setMaster('local[2]').setAppName('MyApp')
sc = SparkContext(conf=spark_conf)
# see what we have by examining the Spark User Interface
from pyspark.sql import *
from pyspark.sql.functions import *
SparkSession.builder.getOrCreate()
##

In [None]:
## this is how one could upload a file into Colab using the Colab GUI (uncomment both lines if you want to try it)

#from google.colab import files
#files.upload()


In [None]:
# get file for given year from TfL open data
!wget https://cycling.data.tfl.gov.uk/usage-stats/cyclehireusagestats-2014.zip
!unzip cyclehireusagestats-2014.zip

In [None]:
# at this point we have Spark initialised and we have a number of CSV files.
# NB you can also download the zip file to your host machine and open the CSVs in Excel (Windows)
# (on Linux, the easiest route is to open a file manager and double-click a .csv file to open it in the associated spreadsheet app)

In [None]:
# read in file
!ls
file="./1. Journey*csv"
file_1="./10a. Journey*csv"
files = os.listdir("./")
csv_files = [file for file in files if file.endswith(".csv")]
spark = SparkSession.builder.appName("bikes").getOrCreate()
j_df = (spark.read.format("csv")
         .option("header", "true")
         .option("inferSchema", "true")
         .load(csv_files))

# show top 10
j_df.show(10)

## Data cleaning

In [None]:
j_df.schema.names  # column names, in schema order

In [None]:
j_df.filter(j_df["Duration"].isNull()).count()  # rows whose Duration is missing

In [None]:
# Drop only the rows with a missing Duration, then confirm none remain (expect 0).
cleaned_df = j_df.na.drop(subset=["Duration"])
cleaned_df.where(cleaned_df["Duration"].isNull()).count()

In [None]:
# Per-column null counts: when() yields the (non-null) column-name literal only for null cells
# and count() tallies non-null values, so each output column holds that column's null count.
j_df.select(*(count(when(isnull(name), name)).alias(name) for name in j_df.columns)).show()

In [None]:
# Drop every row that has a null in any column, then display the resulting DataFrame.
j_df = j_df.na.drop()
j_df

In [None]:
# Re-check per-column null counts after dropping incomplete rows (all should now be 0).
j_df.select([count(when(col(field).isNull(), field)).alias(field) for field in j_df.columns]).show()

## Total duration, and total duration for journeys starting at Baylis Road

In [None]:
from pyspark.sql.functions import col, count, isnan, isnull, sum, when
# Number of journeys that started at "Baylis Road, Waterloo".
j_df.where(col("StartStation Name") == "Baylis Road, Waterloo").count()

In [None]:
# Total duration summed over all journeys, as a one-row DataFrame.
total_duration = j_df.agg(sum(col("Duration")).alias("Duration_sum"))
total_duration.show()

In [None]:
# Journeys starting at "Baylis Road, Waterloo" and their summed Duration.
baylis_filter = j_df.where(col("StartStation Name") == "Baylis Road, Waterloo")
(baylis_filter
    .agg(sum(col("Duration")).alias("Duration_sum"))
    .show())


## Aggregation (group by start station name and order by total duration)


In [None]:
# Inspect the column types inferred from the CSVs before casting/aggregating Duration.
j_df.printSchema()
# Exploratory alternative (kept for reference): per-station minimum Duration.
# j_df.groupby("StartStation Name").min("Duration").show()

In [None]:
# Force Duration to an integer column. Under Spark's default (non-ANSI) settings, a value that
# cannot be parsed is cast to null instead of raising an error. That would silently reintroduce
# nulls after the dropna() cleaning, so verify that none appeared.
j_df = j_df.withColumn("Duration", col("Duration").cast("int"))
assert j_df.filter(col("Duration").isNull()).count() == 0, "null Durations present after cast to int"

In [None]:
# Summed Duration per start station, restricted to "Baylis Road, Waterloo" (a single row).
# Filtering on the grouping key before aggregating gives the same row as filtering afterwards.
(j_df.filter(col("StartStation Name") == "Baylis Road, Waterloo")
     .groupBy("StartStation Name")
     .sum("Duration")
     .orderBy("sum(Duration)", ascending=True)
     .show())
