# Run Uber Pipeline

This notebook runs and demonstrates parts of our pipeline.

Most of our pipeline is hosted within Amazon Web Services, so this notebook provides an insight into the data nodes within the pipeline. The latter part of the notebook shows how we pull data from PostgreSQL, join some of the tables, convert the result to Parquet, and run some queries.

# Start pipeline

The pipeline starts on an EC2 instance, where we collect data from various APIs and through web scraping. The data is stored locally on the instance before being pushed to Amazon S3. We created a custom API that receives the instruction to start the pipeline. In the rare case that the call below does not work, the pipeline is most likely already running, so please continue to the other sections.

In [1]:
import requests

# Set up the API Gateway endpoint URL
url = '***REMOVED***'
response = requests.get(url)

# Print the response from the API
print(response.content)

b'{\n  "message": "Scheduler started",\n  "status": "success"\n}\n'
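The response body printed above is raw bytes containing JSON. A minimal sketch of decoding it and checking the `status` field follows; it assumes the endpoint returns the two keys seen in the sample output, and the helper name `scheduler_started` is hypothetical rather than part of the pipeline.

```python
import json


def scheduler_started(body: bytes) -> bool:
    """Return True when the decoded response reports a successful start.

    Assumes the payload carries the "status" key seen in the sample output;
    any other shape is treated as a failure instead of raising.
    """
    try:
        payload = json.loads(body)
    except ValueError:  # covers invalid JSON and undecodable bytes
        return False
    return isinstance(payload, dict) and payload.get("status") == "success"


# Sample body copied from the output above
sample = b'{\n  "message": "Scheduler started",\n  "status": "success"\n}\n'
print(scheduler_started(sample))
```

In the notebook this could be called as `scheduler_started(response.content)` before moving on to the S3 preview.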


# Preview S3
The code below connects to our S3 bucket and lists the files it contains:

In [2]:
!pip install boto3

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting boto3
  Downloading boto3-1.26.97-py3-none-any.whl (135 kB)
Collecting jmespath<2.0.0,>=0.7.1
  Downloading jmespath-1.0.1-py3-none-any.whl (20 kB)
Collecting botocore<1.30.0,>=1.29.97
  Downloading botocore-1.29.97-py3-none-any.whl (10.5 MB)
Collecting s3transfer<0.7.0,>=0.6.0
  Downloading s3transfer-0.6.0-py3-none-any.whl (79 kB)
Installing collected packages: jmespath, botocore, s3transfer, boto3
Successfully installed boto3-1.26.97 botocore-1.29.97 jmespath-1.0.1 s3transfer-0.6.0


In [3]:
import boto3

# Replace "my-bucket-name" with the name of your S3 bucket
bucket_name = "***REMOVED***"

# AWS credentials
access_key = "***REMOVED***"
secret_key = "***REMOVED***"

s3 = boto3.client(
    "s3",
    aws_access_key_id=access_key,
    aws_secret_access_key=secret_key,
)

# List all the objects in the bucket
response = s3.list_objects_v2(Bucket=bucket_name)

# Print the names of all the files in the bucket
for obj in response.get("Contents", []):
    print(obj["Key"])

coordinates_hotel.json
coordinates_train.json
hotels.json
routes.json
train.json
tweets.json
uber.json
weather.json
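A single `list_objects_v2` call returns at most 1,000 keys, so the loop above would stop short on a larger bucket. The sketch below uses the boto3 paginator to follow every page. The helper name `list_all_keys` is hypothetical, and it takes the client as an argument so it can be exercised without AWS access.

```python
from typing import Any, Iterator


def list_all_keys(s3_client: Any, bucket: str) -> Iterator[str]:
    """Yield every object key in the bucket, following pagination."""
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket):
        # "Contents" is absent from a page that holds no objects
        for obj in page.get("Contents", []):
            yield obj["Key"]
```

With the client created above, usage would look like `for key in list_all_keys(s3, bucket_name): print(key)`.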


# Preview MongoDB
Our pipeline contains a Lambda function that AWS CloudWatch triggers every hour. It transfers the S3 files into several MongoDB collections. Below you can list the collections and preview some of the records in each one.

In [4]:
%%capture
# first install PyMongo to allow access to MongoDB
# (the %%capture cell magic has to be the first line of the cell)
!pip install pymongo

In [5]:
import pymongo

# set up the MongoDB connection
client = pymongo.MongoClient("***REMOVED***")
db = client["group_db"]

# retrieve the list of collections in the database
collection_list = db.list_collection_names()

# print the collection names
print("Collections in group_db:")
for collection_name in collection_list:
    print(collection_name)

Collections in group_db:
coordinates_hotel
train
tweets
routes
uber
coordinates_train
hotels
weather
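The collection names listed above match the S3 object names with the `.json` suffix removed. The sketch below shows one way a transfer step could derive the collection name and the documents from a single S3 object. It is not the actual Lambda code: the helper name `plan_insert` is hypothetical, and it assumes each file holds either one JSON object or a list of objects.

```python
import json
from pathlib import PurePosixPath
from typing import Any, Dict, List, Tuple


def plan_insert(key: str, body: bytes) -> Tuple[str, List[Dict[str, Any]]]:
    """Map one S3 object to (collection name, documents to insert)."""
    collection = PurePosixPath(key).stem  # "uber.json" -> "uber"
    data = json.loads(body)
    # Wrap a single object so the caller always receives a list
    documents = data if isinstance(data, list) else [data]
    return collection, documents


# Hypothetical file contents, used only to exercise the helper
print(plan_insert("uber.json", b'[{"ride": 1}, {"ride": 2}]'))
```

The returned pair could then be passed to PyMongo as `db[collection].insert_many(documents)`.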


Verify two entries in each collection:

In [6]:
# retrieve the list of collections in the database
collection_list = db.list_collection_names()

# print the first 2 entries of every collection
for collection_name in collection_list:
    collection = db[collection_name]
    print(f"\nFirst 2 entries in {collection_name}:")
    for entry in collection.find().limit(2):
        print(entry)


First 2 entries in coordinates_hotel:
{'_id': ObjectId('641aefc10d7ca1afc2f9a021'), 'latitude': 51.5145337, 'longitude': -0.1595389, 'name': 'Citadines Trafalgar Square'}
{'_id': ObjectId('641aefc10d7ca1afc2f9a022'), 'latitude': 51.5145337, 'longitude': -0.1595389, 'name': 'The Guardsman - Preferred Hotels and Resorts'}

First 2 entries in train:
{'_id': ObjectId('641af064d3eea542f229dfd7'), 'Waterloo Station': {'latitude': 51.5031653, 'longitude': -0.1123051}, 'Paddington Station': {'latitude': 51.5166744, 'longitude': -0.1769328}, "King's Cross St. Pancras Station": {'latitude': 51.53154079999999, 'longitude': -0.1252147}, 'Liverpool Street Station': {'latitude': 51.51875159999999, 'longitude': -0.0814374}, 'Fenchurch Street Station': {'latitude': 51.5117156, 'longitude': -0.0790676}, 'London Bridge Station': {'latitude': 51.5042625, 'longitude': -0.0848024}, 'Victoria Station': {'latitude': 51.4952103, 'longitude': -0.1438979}}
{'_id': ObjectId('641b3d1dd3eea542f229dfdc'), 'Waterlo

# Preview PostgreSQL
We have a PostgreSQL database hosted on Amazon Relational Database Service (RDS). It is set up with the schema pictured below. Below you can preview some of the tables and run some example queries.

In [7]:
import psycopg2

postgres_connection = "***REMOVED***"

# Replace "postgres_connection" with your PostgreSQL connection string
conn = psycopg2.connect(postgres_connection)

# Create a cursor object
cur = conn.cursor()

# Query the database for all table names
cur.execute("SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'")

# Print the names of all tables in the database
for table in cur.fetchall():
    print(table[0])


uber_price
weather_conditions
weather_location
weather_main
tweets
tweet_metadata
train_stations
hotel_info
hotel_location
routes
uber_ride


# Extract from PostgreSQL using Spark
Below we set up Spark and extract data from PostgreSQL into Spark DataFrames.

In [8]:
%%capture
# remove old Spark installations if needed
# (the %%capture cell magic has to be the first line of the cell)
!rm -rf spark*

# check versions
%env SPARK_VERSION=spark-3.3.2
# %env HADOOP_VERSION=2.7
%env HADOOP_VERSION=3
!echo $SPARK_VERSION

!sudo apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop$HADOOP_VERSION.tgz
!tar xf $SPARK_VERSION-bin-hadoop$HADOOP_VERSION.tgz

!pip install -q findspark
!pip install -q pyspark

!echo $SPARK_VERSION
%env JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64

!java -version
!python --version

!wget https://jdbc.postgresql.org/download/postgresql-42.3.2.jar

In [9]:
import os
import tarfile

spark_tgz_path = "spark-3.3.2-bin-hadoop3.tgz"
spark_extraction_path = "" 

# Extract the .tgz file
with tarfile.open(spark_tgz_path, "r:gz") as tar:
    tar.extractall(path=spark_extraction_path)

# Set SPARK_HOME environment variable
spark_home_path = os.path.join(spark_extraction_path, "spark-3.3.2-bin-hadoop3")
os.environ["SPARK_HOME"] = spark_home_path

import findspark
findspark.init()

from pprint import pprint


In [10]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkWithPostgres')\
        .config("spark.driver.extraClassPath", "postgresql-42.3.2.jar")\
        .getOrCreate()

spark

In [11]:
import psycopg2
import pandas as pd

# create a PostgreSQL connection
conn = psycopg2.connect(
    host="***REMOVED***",
    dbname="postgres",
    user="postgres",
    password="***REMOVED***"
)

# get all table names
query = """
    SELECT table_name
    FROM information_schema.tables
    WHERE table_schema = 'public'
"""
tables = pd.read_sql_query(query, conn)["table_name"].tolist()

# get primary key columns for each table
cur = conn.cursor()
cur.execute("""
    SELECT tc.table_name, kcu.column_name
    FROM information_schema.table_constraints AS tc
    JOIN information_schema.key_column_usage AS kcu
      ON tc.constraint_name = kcu.constraint_name
      AND tc.table_schema = kcu.table_schema
    WHERE tc.constraint_type = 'PRIMARY KEY'
      AND tc.table_schema = 'public';
""")
primary_keys = dict(cur.fetchall())
cur.close()

# read all tables into a dictionary of DataFrames
dfs = {}
for table in tables:
    query = f"SELECT * FROM {table}"
    df = pd.read_sql_query(query, conn)
    dfs[table] = df

# display the data for each table
for table, df in dfs.items():
    print(f"Table: {table}")
    print(df.head())

# close the connection
conn.close()

#, index_col=primary_keys[table]



Table: uber_price
   id  ride_id             x           green         assist           access  \
0   1        1          None            None           None             None   
1   2        2  {Lux,£39-43}  {UberX,£16-17}  {Exec,£27-29}  {UberXL,£23-25}   
2   3        3  {Lux,£40-44}  {UberX,£16-17}  {Exec,£27-30}  {UberXL,£23-25}   
3   4        4  {Lux,£39-43}  {UberX,£16-17}  {Exec,£27-29}  {UberXL,£23-25}   
4   5        5  {Lux,£40-44}  {UberX,£16-17}  {Exec,£27-30}  {UberXL,£23-25}   

               pet          comfort                xl            exec  \
0             None             None              None            None   
1  {Assist,£16-17}  {Access,£16-17}  {Comfort,£19-21}  {Green,£16-17}   
2  {Assist,£16-17}  {Access,£16-17}  {Comfort,£19-21}  {Green,£16-17}   
3  {Assist,£16-17}  {Access,£16-17}  {Comfort,£19-21}  {Green,£16-17}   
4  {Assist,£16-17}  {Access,£16-17}  {Comfort,£19-21}  {Green,£16-17}   

                   lux  
0                 None  
1  {"Uber Pe
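In the cell above, `dict(cur.fetchall())` keeps only one column per table, so a table with a composite primary key would lose all but its last column. The sketch below groups the rows instead. The helper name `group_primary_keys` and the sample rows are hypothetical; the notebook does not show whether any table actually has a composite key.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def group_primary_keys(rows: Iterable[Tuple[str, str]]) -> Dict[str, List[str]]:
    """Collect every primary-key column per table, preserving row order."""
    grouped = defaultdict(list)
    for table_name, column_name in rows:
        grouped[table_name].append(column_name)
    return dict(grouped)


# Hypothetical rows in the (table_name, column_name) shape returned by cur.fetchall()
rows = [("uber_ride", "id"), ("example_table", "a"), ("example_table", "b")]
print(group_primary_keys(rows))
```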



### Joining the Uber price table with the others

In [12]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

# Define the schema for the hotel_location table
hotels_loc_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("hotel_id", IntegerType(), True),
    StructField("lat", FloatType(), True),
    StructField("long", FloatType(), True),
])

# define the schema for the tweets table
tweets_schema = StructType([
    StructField("id", StringType()),
    StructField("text", StringType()),
    StructField('edit_history_tweet_ids', StringType())
])

# define the schema for the tweet_metadata table
tweet_metadata_schema = StructType([
    StructField("id", IntegerType()),
    StructField("newest_id", StringType()),
    StructField("oldest_id", StringType()),
    StructField("result_count", IntegerType()),
    StructField("tweet_id", IntegerType())
])
# define the schema for the train_stations table
train_stations_schema = StructType([
    StructField("id", IntegerType()),
    StructField("name", StringType()),
    StructField("lat", FloatType(), True),
    StructField("long", FloatType(), True),
])

uber_ride_df = spark.createDataFrame(dfs['uber_ride'])
uber_price_df = spark.createDataFrame(dfs['uber_price'])
weather_conditions_df = spark.createDataFrame(dfs['weather_conditions'])
weather_location_df = spark.createDataFrame(dfs['weather_location'])
weather_main_df=spark.createDataFrame(dfs['weather_main'])
hotel_info_df=spark.createDataFrame(dfs['hotel_info'])
hotel_location_df=spark.createDataFrame(dfs['hotel_location'],schema =hotels_loc_schema)
tweets_df=spark.createDataFrame(dfs['tweets'], schema = tweets_schema)
#tweet_metadata_df=spark.createDataFrame(dfs['tweet_metadata'], schema =tweet_metadata_schema)
routes_df=spark.createDataFrame(dfs['routes'])
train_stations_df=spark.createDataFrame(dfs['train_stations'], schema=train_stations_schema)

### Merging uber prices with rides, routes and train stations 


In [13]:
# left-join uber_price to uber_ride on uber_price.ride_id == uber_ride.id
uber_df = uber_price_df.join(uber_ride_df, uber_price_df.ride_id == uber_ride_df.id, 'left')
uber_df = uber_df.withColumnRenamed('id', 'uber_price_id')


In [14]:
uber_df2=uber_df.join(routes_df, uber_df.route_id == routes_df.id, 'left')

In [15]:
uber_df2 = uber_df2.withColumnRenamed('id', 'routes_id')

In [16]:
uber_df3=uber_df2.join(train_stations_df, uber_df2.start_location_id == train_stations_df.id, 'left')

In [17]:
uber_df3 = uber_df3.withColumnRenamed('id', 'train_station_id')
uber_df3 = uber_df3.withColumnRenamed('name', 'train_station_name')
uber_df3 = uber_df3.withColumnRenamed('lat', 'train_station_lat')
uber_df3 = uber_df3.withColumnRenamed('long', 'train_station_long')
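After each join above the result holds more than one column named `id`, and the table preview further down shows `uber_price_id` appearing twice, which suggests the rename was applied to both. Giving every key column an explicit alias at join time avoids that ambiguity. The sketch below illustrates the same join chain with `sqlite3` standing in for Spark so that it runs anywhere; the tables are reduced to the key columns visible in this notebook, and the single sample row reuses values from the preview.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE uber_price (id INTEGER, ride_id INTEGER);
    CREATE TABLE uber_ride (id INTEGER, route_id INTEGER);
    CREATE TABLE routes (id INTEGER, start_location_id INTEGER, end_location_id INTEGER);
    CREATE TABLE train_stations (id INTEGER, name TEXT);
    INSERT INTO uber_price VALUES (26, 26);
    INSERT INTO uber_ride VALUES (26, 140);
    INSERT INTO routes VALUES (140, 7, 2);
    INSERT INTO train_stations VALUES (7, 'Victoria Station');
""")

# Every id column gets its own alias, so no two output columns share a name
cursor = conn.execute("""
    SELECT p.id   AS uber_price_id,
           r.id   AS uber_ride_id,
           ro.id  AS routes_id,
           t.id   AS train_station_id,
           t.name AS train_station_name
    FROM uber_price AS p
    LEFT JOIN uber_ride AS r ON p.ride_id = r.id
    LEFT JOIN routes AS ro ON r.route_id = ro.id
    LEFT JOIN train_stations AS t ON ro.start_location_id = t.id
""")
columns = [description[0] for description in cursor.description]
rows = cursor.fetchall()
conn.close()

print(columns)
print(rows)
```

In Spark the equivalent would be to rename each `id` column on its own DataFrame before joining, rather than renaming on the joined result.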

In [18]:
# Convert the Spark DataFrame into a pandas DataFrame
uber_df3_pd = uber_df3.toPandas()
uber_df3_pd



Unnamed: 0,uber_price_id,ride_id,x,green,assist,access,pet,comfort,xl,exec,...,uber_price_id.1,route_id,time_stamp,routes_id,start_location_id,end_location_id,train_station_id,train_station_name,train_station_lat,train_station_long
0,26,26,"{Lux,£33-37}","{UberX,£13-15}","{Exec,£23-25}","{UberXL,£20-22}","{Assist,£13-15}","{Access,£13-15}","{Comfort,£16-18}","{Green,£13-15}",...,26,140,2023-03-22 15:14:21,140,7,2,7,Victoria Station,51.495209,-0.143898
1,29,29,"{Lux,£34-37}","{UberX,£13-15}","{Exec,£23-25}","{UberXL,£20-22}","{Assist,£13-15}","{Access,£13-15}","{Comfort,£17-18}","{Green,£13-15}",...,29,140,2023-03-22 10:48:26,140,7,2,7,Victoria Station,51.495209,-0.143898
2,65,65,"{Lux,£31-34}","{UberX,£12-13}","{Exec,£21-23}","{UberXL,£18-20}","{Assist,£12-13}","{Access,£12-13}","{Comfort,£15-17}","{Green,£12-13}",...,65,140,2023-03-22 14:50:06,140,7,2,7,Victoria Station,51.495209,-0.143898
3,19,19,"{Lux,£31-34}","{UberX,£12-13}","{Exec,£21-23}","{UberXL,£18-20}","{Assist,£12-13}","{Access,£12-13}","{Comfort,£15-17}","{Green,£12-13}",...,19,140,2023-03-22 14:49:02,140,7,2,7,Victoria Station,51.495209,-0.143898
4,54,54,"{Lux,£30-33}","{UberX,£12-13}","{Exec,£21-23}","{UberXL,£18-20}","{Assist,£12-13}","{Access,£12-13}","{Comfort,£15-16}","{Green,£12-13}",...,54,140,2023-03-22 14:57:00,140,7,2,7,Victoria Station,51.495209,-0.143898
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
187,147,147,"{Lux,£33-37}","{UberX,£13-15}","{Exec,£23-25}","{UberXL,£20-21}","{Assist,£13-15}","{Access,£13-15}","{Comfort,£16-18}","{Green,£13-15}",...,147,20,2023-03-19 15:08:11,20,1,2,1,Waterloo Station,51.503166,-0.112305
188,183,183,"{Lux,£34-38}","{UberX,£13-15}","{Exec,£23-26}","{UberXL,£20-22}","{Assist,£13-15}","{Access,£13-15}","{Comfort,£17-18}","{Green,£13-15}",...,183,140,2023-03-22 14:57:00,140,7,2,7,Victoria Station,51.495209,-0.143898
189,152,152,"{Lux,£34-38}","{UberX,£13-15}","{Exec,£23-26}","{UberXL,£20-22}","{Assist,£13-15}","{Access,£13-15}","{Comfort,£17-18}","{Green,£13-15}",...,152,20,2023-03-19 15:13:04,20,1,2,1,Waterloo Station,51.503166,-0.112305
190,148,148,"{Lux,£34-37}","{UberX,£13-15}","{Exec,£23-25}","{UberXL,£20-22}","{Assist,£13-15}","{Access,£13-15}","{Comfort,£17-18}","{Green,£13-15}",...,148,20,2023-03-19 15:13:04,20,1,2,1,Waterloo Station,51.503166,-0.112305


In [19]:
uber_df3 = uber_df3.drop("uber_price_id",'route_id','ride_id','routes_id')


### Merging hotels related tables 

In [20]:
hotel_location_df = hotel_location_df.withColumnRenamed('id', 'hotel_loc_id')

In [21]:
hotel_info_df= hotel_info_df.withColumnRenamed('id', 'hotel_info_id')
hotel_info_df= hotel_info_df.withColumnRenamed('name', 'hotel_name')
hotel_info_df= hotel_info_df.withColumnRenamed('lat', 'hotel_lat')
hotel_info_df= hotel_info_df.withColumnRenamed('long', 'hotel_long')

In [22]:
hotels_df = hotel_location_df.join(hotel_info_df,hotel_location_df.hotel_id==hotel_info_df.hotel_info_id, 'left')
hotels_df.show()

+------------+--------+---+----+-------------+--------------------+--------------------+
|hotel_loc_id|hotel_id|lat|long|hotel_info_id|          hotel_name|       neighbourhood|
+------------+--------+---+----+-------------+--------------------+--------------------+
|           1|       1|NaN| NaN|            1|DoubleTree by Hil...|City of London, L...|
|           2|       2|NaN| NaN|            2|   Stayo Kew Gardens|                 Kew|
|           3|       3|NaN| NaN|            3|               BENQi|Kensington and Ch...|
|           4|       4|NaN| NaN|            4|              Studio|            Stanmore|
|           5|       5|NaN| NaN|            5|   The Resident Soho|Westminster Borou...|
|           6|       6|NaN| NaN|            6|Bromley Court Hot...|             Bromley|
|           7|       7|NaN| NaN|            7|Leonardo Hotel Lo...|             Croydon|
|           8|       8|NaN| NaN|            8|     The Hide London|              Hendon|
|           9|       

In [23]:
# Convert the Spark DataFrame into a pandas DataFrame
hotels_pd = hotels_df.toPandas()

In [24]:
hotels_df = hotels_df.drop("hotel_info_id")
hotels_df.show()

+------------+--------+---+----+--------------------+--------------------+
|hotel_loc_id|hotel_id|lat|long|          hotel_name|       neighbourhood|
+------------+--------+---+----+--------------------+--------------------+
|           1|       1|NaN| NaN|DoubleTree by Hil...|City of London, L...|
|           2|       2|NaN| NaN|   Stayo Kew Gardens|                 Kew|
|           3|       3|NaN| NaN|               BENQi|Kensington and Ch...|
|           4|       4|NaN| NaN|              Studio|            Stanmore|
|           5|       5|NaN| NaN|   The Resident Soho|Westminster Borou...|
|           6|       6|NaN| NaN|Bromley Court Hot...|             Bromley|
|           7|       7|NaN| NaN|Leonardo Hotel Lo...|             Croydon|
|           8|       8|NaN| NaN|     The Hide London|              Hendon|
|           9|       9|NaN| NaN|The White Hart Hotel|Hampton Wick, Kin...|
|          10|      10|NaN| NaN|DoubleTree by Hil...|            Dartford|
|          11|      11|Na

### Merging weather related tables 

In [25]:
weather_conditions_df = weather_conditions_df.withColumnRenamed('id', 'weather_conditions_id')

In [26]:
weather_location_df = weather_location_df.withColumnRenamed('id', 'weather_loc_id')
weather_location_df.show()

+--------------+-----------+-------+------------+-------------------+
|weather_loc_id|location_id|main_id|condition_id|                 dt|
+--------------+-----------+-------+------------+-------------------+
|       2634341|         10|    803|         803|2023-03-22 15:14:21|
|       2640692|         13|    803|         803|2023-03-22 15:13:19|
|       3345437|         15|    803|         803|2023-03-22 15:13:19|
|       2637221|         16|    803|         803|2023-03-22 15:14:21|
|       2643744|    2643744|    803|         803|2023-03-22 15:09:42|
|       2643743|    2643743|    803|         803|2023-03-22 15:05:29|
+--------------+-----------+-------+------------+-------------------+



In [27]:
weather_df1 = weather_conditions_df.join(weather_location_df,weather_conditions_df.weather_conditions_id==weather_location_df.condition_id, 'left')
weather_df1.show()

+---------------------+-----------+----------+------+--------------+-----------+-------+------------+-------------------+
|weather_conditions_id|temperature|feels_like|clouds|weather_loc_id|location_id|main_id|condition_id|                 dt|
+---------------------+-----------+----------+------+--------------+-----------+-------+------------+-------------------+
|                  803|      10.91|      9.75|    75|       2643743|    2643743|    803|         803|2023-03-22 15:05:29|
|                  803|      10.91|      9.75|    75|       2643744|    2643744|    803|         803|2023-03-22 15:09:42|
|                  803|      10.91|      9.75|    75|       2637221|         16|    803|         803|2023-03-22 15:14:21|
|                  803|      10.91|      9.75|    75|       3345437|         15|    803|         803|2023-03-22 15:13:19|
|                  803|      10.91|      9.75|    75|       2640692|         13|    803|         803|2023-03-22 15:13:19|
|                  803| 

In [28]:
weather_main_df = weather_main_df.withColumnRenamed('id', 'weather_main_id')

In [29]:
weather_df = weather_df1.join(weather_main_df,weather_df1.main_id==weather_main_df.weather_main_id, 'left')

In [30]:
# Convert the Spark DataFrame into a pandas DataFrame
weather_pd = weather_df.toPandas()

In [31]:
weather_df = weather_df.drop("weather_main_id",'weather_conditions_id','weather_loc_id','main_id')
weather_df.show()

+-----------+----------+------+-----------+------------+-------------------+------+-------------+
|temperature|feels_like|clouds|location_id|condition_id|                 dt|  main|         desc|
+-----------+----------+------+-----------+------------+-------------------+------+-------------+
|      10.91|      9.75|    75|    2643743|         803|2023-03-22 15:05:29|Clouds|broken clouds|
|      10.91|      9.75|    75|    2643744|         803|2023-03-22 15:09:42|Clouds|broken clouds|
|      10.91|      9.75|    75|         16|         803|2023-03-22 15:14:21|Clouds|broken clouds|
|      10.91|      9.75|    75|         15|         803|2023-03-22 15:13:19|Clouds|broken clouds|
|      10.91|      9.75|    75|         13|         803|2023-03-22 15:13:19|Clouds|broken clouds|
|      10.91|      9.75|    75|         10|         803|2023-03-22 15:14:21|Clouds|broken clouds|
+-----------+----------+------+-----------+------------+-------------------+------+-------------+



In [32]:
uber_weather_df = uber_df3.join(weather_df,uber_df3.train_station_id==weather_df.location_id, 'left')

### Merging the Uber table with hotels and weather 

In [33]:
uber_hotels_df = uber_df3.join(hotels_df,uber_df3.end_location_id==hotels_df.hotel_loc_id, 'left')

In [34]:
# Convert the Spark DataFrame into a pandas DataFrame
uber_hotels_pd = uber_hotels_df.toPandas()

In [35]:
uber_df_final = uber_hotels_df.join(weather_df,uber_hotels_df.train_station_id==weather_df.location_id, 'left')

In [36]:
# Convert the Spark DataFrame into a pandas DataFrame
uber_final_pd = uber_df_final.toPandas()

## Extract prices

In [37]:
import re

def extract_average_price(text):
    """Return the midpoint of the first '£low-high' range in text, or None if absent."""
    match = re.search(r'£(\d+)-(\d+)', text)
    if match:
        return (int(match.group(1)) + int(match.group(2))) / 2
    return None

In [38]:
import re
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# Define the UDF
extract_average_price_udf = udf(lambda x: extract_average_price(x) if x else None, FloatType())
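As a quick check of the extraction logic outside Spark, the sketch below repeats the same regular expression in a self-contained form and applies it to cell values taken from the table previews. The third input, a cell with a single price instead of a range, is a hypothetical case added to show that it yields `None`.

```python
import re
from typing import Optional


def extract_average_price(text: str) -> Optional[float]:
    """Midpoint of the first "£low-high" range in the text, or None."""
    match = re.search(r'£(\d+)-(\d+)', text)
    if match:
        return (int(match.group(1)) + int(match.group(2))) / 2
    return None


print(extract_average_price("{UberX,£13-15}"))    # 14.0
print(extract_average_price("{Comfort,£17-18}"))  # 17.5
print(extract_average_price("{UberX,£16}"))       # None: no range present
```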

In [39]:
spark.udf.register("extract_average_price_udf", extract_average_price_udf)

<function __main__.<lambda>(x)>

In [40]:
#uber_hotels_df['green_avg_price'] = uber_hotels_df['green'].astype(str).apply(extract_average_price)

uber_hotels_df = uber_hotels_df.withColumn('green_avg_price', extract_average_price_udf(uber_hotels_df['green']))
uber_hotels_df = uber_hotels_df.withColumn('x_avg_price', extract_average_price_udf(uber_hotels_df['x']))
uber_hotels_df = uber_hotels_df.withColumn('assist_avg_price', extract_average_price_udf(uber_hotels_df['assist']))
uber_hotels_df = uber_hotels_df.withColumn('access_avg_price', extract_average_price_udf(uber_hotels_df['access']))
uber_hotels_df = uber_hotels_df.withColumn('pet_avg_price', extract_average_price_udf(uber_hotels_df['pet']))
uber_hotels_df = uber_hotels_df.withColumn('comfort_avg_price', extract_average_price_udf(uber_hotels_df['comfort']))
uber_hotels_df = uber_hotels_df.withColumn('xl_avg_price', extract_average_price_udf(uber_hotels_df['xl']))
uber_hotels_df = uber_hotels_df.withColumn('exec_avg_price', extract_average_price_udf(uber_hotels_df['exec']))
uber_hotels_df = uber_hotels_df.withColumn('lux_avg_price', extract_average_price_udf(uber_hotels_df['lux']))
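In the table previews, the product label inside each cell does not match the column name: column `x` holds `{Lux,...}`, `green` holds `{UberX,...}`, and `lux` holds `{"Uber Pet",...}`. As a result, `green_avg_price` is really the UberX midpoint. The sketch below keys each price by the label found inside the cell. The helper name `parse_price_cell` is hypothetical, and it assumes the `{label,£low-high}` format seen in the output, with optional double quotes around the label.

```python
import re
from typing import Optional, Tuple

# {label,£low-high} with optional double quotes around the label
_CELL = re.compile(r'^\{"?([^",]+)"?,£(\d+)-(\d+)\}$')


def parse_price_cell(cell: Optional[str]) -> Optional[Tuple[str, float]]:
    """Return (product label, midpoint price), or None for empty or unparseable cells."""
    if not cell:
        return None
    match = _CELL.match(cell)
    if not match:
        return None
    low, high = int(match.group(2)), int(match.group(3))
    return match.group(1), (low + high) / 2


print(parse_price_cell('{Lux,£33-37}'))
print(parse_price_cell('{"Uber Pet",£18-20}'))
```

Pivoting on the returned label instead of the column name would keep the derived price columns aligned with the product they describe.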

## View for some queries

In [41]:
from time import time

# Convert the Spark DataFrame into a pandas DataFrame
uber_hotels_pd_new = uber_hotels_df.toPandas()

In [42]:
uber_hotels_df.createOrReplaceTempView("uber")

In [43]:
uber_hotels_df

DataFrame[x: string, green: string, assist: string, access: string, pet: string, comfort: string, xl: string, exec: string, lux: string, time_stamp: timestamp, start_location_id: bigint, end_location_id: bigint, train_station_id: int, train_station_name: string, train_station_lat: float, train_station_long: float, hotel_loc_id: int, hotel_id: int, lat: float, long: float, hotel_name: string, neighbourhood: string, green_avg_price: float, x_avg_price: float, assist_avg_price: float, access_avg_price: float, pet_avg_price: float, comfort_avg_price: float, xl_avg_price: float, exec_avg_price: float, lux_avg_price: float]

# Convert to Parquet
Write the joined data to a Parquet file and read it back as a view:

In [44]:
#use the existing uber_df_final to save data to parquet 
uber_df_final.write.mode("overwrite").format("parquet").save("uber.parquet")
uber_parquet=spark.read.parquet("uber.parquet")

# make a view
uber_parquet.createOrReplaceTempView("uber_parquet")

## Example Query 1: 
Average Uber price for rides starting at Waterloo Station

In [45]:
uber_hotels_df.createOrReplaceTempView("uber")
start_time6=time()
average_price_query = """
SELECT
    train_station_name,
    AVG(
        COALESCE(
            extract_average_price_udf(green),
            extract_average_price_udf(x),
            extract_average_price_udf(assist),
            extract_average_price_udf(access),
            extract_average_price_udf(pet),
            extract_average_price_udf(comfort),
            extract_average_price_udf(xl),
            extract_average_price_udf(exec),
            extract_average_price_udf(lux)
        )
    ) as average_price
FROM
    uber
WHERE
    train_station_name = 'Waterloo Station'
GROUP BY
    train_station_name
"""
# NOTE: this timer stops before the query runs, so it only measures the string assignment above
end_time6=time()

waterloo_avg_price = spark.sql(average_price_query)
waterloo_avg_price.show()

+------------------+-------------+
|train_station_name|average_price|
+------------------+-------------+
|  Waterloo Station|         14.4|
+------------------+-------------+



In [46]:
print("Time spent to query table in PostgreSQL: ",end_time6 - start_time6)

Time spent to query table in PostgreSQL:  8.392333984375e-05


In [47]:
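start_time7=time()
# NOTE: average_price_query reads the "uber" view, not "uber_parquet", and spark.sql() is lazy,
# so this times query planning on the same view rather than a Parquet read
Waterloo_Station = spark.sql(average_price_query)
end_time7=time()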

In [48]:
print("Time spent to query table in Parquet: ",end_time7 - start_time7)

Time spent to query table in Parquet:  0.060457468032836914


## Example Query 2: 
Show 10 journeys starting from London Waterloo

In [49]:
start_time=time()
Waterloo_Station = spark.sql("SELECT * FROM uber where train_station_name=='Waterloo Station' LIMIT 10")
end_time=time()

In [50]:
print("Time spent to query table in PostgreSQL: ",end_time - start_time)

Time spent to query table in PostgreSQL:  0.04909324645996094


In [51]:
start_time3=time()
Waterloo_Station = spark.sql("SELECT * FROM uber_parquet where train_station_name=='Waterloo Station' LIMIT 10")
end_time3=time()

In [52]:
print("Time spent to query table in Parquet: ",end_time3 - start_time3)

Time spent to query table in Parquet:  0.0355222225189209


## Example Query 3: 
Average price for each distinct value in the `green` column

In [53]:
start_time2=time()
spark.sql("SELECT green, AVG(green_avg_price) AS green_avg_price FROM uber GROUP BY green").show()
end_time2=time()

+--------------+---------------+
|         green|green_avg_price|
+--------------+---------------+
|          null|           null|
|{UberX,£13-15}|           14.0|
|{UberX,£16-17}|           16.5|
|{UberX,£12-13}|           12.5|
+--------------+---------------+



In [54]:
print("Time spent to query table in PostgreSQL: ",end_time2 - start_time2)

Time spent to query table in PostgreSQL:  1.8801546096801758


# Metric Comparison

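The timings above do not support a clear conclusion. Most of the timers wrap only `spark.sql()`, which builds a lazy query plan without executing it, and the first timer in Example Query 1 wraps only the assignment of the query string. Example Query 1 also runs against the `uber` view both times, so it never reads the Parquet file. Of the recorded figures, only Example Query 2 compares the two views (about 0.049 s for the `uber` view versus 0.036 s for `uber_parquet`), and both views are queried through Spark SQL rather than directly in PostgreSQL.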
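Because `spark.sql()` only builds a lazy query plan, a timer has to wrap an action such as `.collect()` or `.show()` to measure execution. The sketch below is one way to do that. The helper name `timed` is hypothetical, and a plain Python workload stands in for the Spark action so the block runs without a Spark session.

```python
from time import perf_counter
from typing import Any, Callable, Tuple


def timed(action: Callable[[], Any], repeats: int = 3) -> Tuple[Any, float]:
    """Run the action `repeats` times; return the last result and the best wall-clock time in seconds."""
    best = float("inf")
    result = None
    for _ in range(repeats):
        start = perf_counter()
        result = action()  # the work must happen inside the timed region
        best = min(best, perf_counter() - start)
    return result, best


# Stand-in workload; with Spark this would be something like
#   lambda: spark.sql("SELECT ... FROM uber_parquet ...").collect()
result, seconds = timed(lambda: sum(range(100_000)))
print(result, f"{seconds:.6f}s")
```

Timing the same query text against the `uber` and `uber_parquet` views with this helper would give comparable figures.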

# Show in Pandas
To complete the process and show that the data is ready for machine learning, we present it in pandas below.

In [55]:
uber_hotels_pd_new

Unnamed: 0,x,green,assist,access,pet,comfort,xl,exec,lux,time_stamp,...,neighbourhood,green_avg_price,x_avg_price,assist_avg_price,access_avg_price,pet_avg_price,comfort_avg_price,xl_avg_price,exec_avg_price,lux_avg_price
0,"{Lux,£33-37}","{UberX,£13-15}","{Exec,£23-25}","{UberXL,£20-22}","{Assist,£13-15}","{Access,£13-15}","{Comfort,£16-18}","{Green,£13-15}","{""Uber Pet"",£18-20}",2023-03-22 15:14:21,...,Kew,14.0,35.0,24.0,21.0,14.0,14.0,17.0,14.0,19.0
1,"{Lux,£34-37}","{UberX,£13-15}","{Exec,£23-25}","{UberXL,£20-22}","{Assist,£13-15}","{Access,£13-15}","{Comfort,£17-18}","{Green,£13-15}","{""Uber Pet"",£18-20}",2023-03-22 10:48:26,...,Kew,14.0,35.5,24.0,21.0,14.0,14.0,17.5,14.0,19.0
2,"{Lux,£31-34}","{UberX,£12-13}","{Exec,£21-23}","{UberXL,£18-20}","{Assist,£12-13}","{Access,£12-13}","{Comfort,£15-17}","{Green,£12-13}","{""Uber Pet"",£17-19}",2023-03-22 14:50:06,...,Kew,12.5,32.5,22.0,19.0,12.5,12.5,16.0,12.5,18.0
3,"{Lux,£31-34}","{UberX,£12-13}","{Exec,£21-23}","{UberXL,£18-20}","{Assist,£12-13}","{Access,£12-13}","{Comfort,£15-17}","{Green,£12-13}","{""Uber Pet"",£17-19}",2023-03-22 14:49:02,...,Kew,12.5,32.5,22.0,19.0,12.5,12.5,16.0,12.5,18.0
4,"{Lux,£30-33}","{UberX,£12-13}","{Exec,£21-23}","{UberXL,£18-20}","{Assist,£12-13}","{Access,£12-13}","{Comfort,£15-16}","{Green,£12-13}","{""Uber Pet"",£17-19}",2023-03-22 14:57:00,...,Kew,12.5,31.5,22.0,19.0,12.5,12.5,15.5,12.5,18.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
187,"{Lux,£33-37}","{UberX,£13-15}","{Exec,£23-25}","{UberXL,£20-21}","{Assist,£13-15}","{Access,£13-15}","{Comfort,£16-18}","{Green,£13-15}","{""Uber Pet"",£18-20}",2023-03-19 15:08:11,...,Kew,14.0,35.0,24.0,20.5,14.0,14.0,17.0,14.0,19.0
188,"{Lux,£34-38}","{UberX,£13-15}","{Exec,£23-26}","{UberXL,£20-22}","{Assist,£13-15}","{Access,£13-15}","{Comfort,£17-18}","{Green,£13-15}","{""Uber Pet"",£18-20}",2023-03-22 14:57:00,...,Kew,14.0,36.0,24.5,21.0,14.0,14.0,17.5,14.0,19.0
189,"{Lux,£34-38}","{UberX,£13-15}","{Exec,£23-26}","{UberXL,£20-22}","{Assist,£13-15}","{Access,£13-15}","{Comfort,£17-18}","{Green,£13-15}","{""Uber Pet"",£18-20}",2023-03-19 15:13:04,...,Kew,14.0,36.0,24.5,21.0,14.0,14.0,17.5,14.0,19.0
190,"{Lux,£34-37}","{UberX,£13-15}","{Exec,£23-25}","{UberXL,£20-22}","{Assist,£13-15}","{Access,£13-15}","{Comfort,£17-18}","{Green,£13-15}","{""Uber Pet"",£18-20}",2023-03-19 15:13:04,...,Kew,14.0,35.5,24.0,21.0,14.0,14.0,17.5,14.0,19.0


The data has now been through the entire pipeline and is ready for machine learning.