# Liveability Variables

**Jupyter notebook that scrapes grocery stores, healthcare services and schools in each suburb and outputs the information collected into a single parquet file. The parquet file also includes the distance from each suburb to Melbourne CBD**

Below are some functions that will help with scraping

First, we imported the necessary libraries

In [3]:
import requests
import time
import os
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, StringType, FloatType

We then initialised a Spark session

In [4]:
from pyspark.sql import SparkSession
#Create a spark session (which will run spark jobs)
spark = (
    SparkSession.builder.appName("Liveability")
    .config("spark.sql.repl.eagerEval.enabled", True) 
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .config('spark.driver.memory', '4g')
    .config('spark.executor.memory', '2g')
    .getOrCreate()
)



The API key and base URL for the Google Places API were added. Additionally, the postcode data was read using Apache Spark. This postcode dataframe allowed us to iteratively scrape information on the available schools, grocery stores and healthcare services for every postcode.

In [14]:
# Your Google Places API key
API_KEY = 'INPUT YOUR API KEY'

# Base URL for Google Places API
url = 'https://maps.googleapis.com/maps/api/place/textsearch/json'

# Load the postcode data (Assuming the file is correctly loaded into a DataFrame)
postcodes_sdf = spark.read.parquet('../data/postcodes/postcodes.parquet')

We then created necessary data directories to store data

In [5]:
# Relative to the current directory, create the output folder for the raw variables
output_relative_dir = '../data/raw_variables/'
variables = ['Hospitals & Clinics', 'Schools', 'Groceries']

# exist_ok=True avoids the error os.makedirs raises when the directory already exists
os.makedirs(output_relative_dir, exist_ok=True)

As only the postcode and suburb names were required for scraping data, the unnecessary columns (locality, state, longitude and latitude) were removed. Duplicate entries were also removed for a more seamless scraping process. Additionally, the postcodes were sorted in ascending order.

In [6]:
# List of columns to remove
columns = ['locality', 'state', 'long', 'lat']
postcodes_sdf = postcodes_sdf.drop(*columns)
postcodes_sdf = postcodes_sdf.dropDuplicates()
postcodes_sdf = postcodes_sdf.orderBy('postcode')


We then created a schema for the information to be scraped for grocery stores, healthcare services and schools. It includes the:
- name
- address
- postcode
- rating

In [7]:
# Define schema for the Spark DataFrame
schema = StructType([
    StructField("Name", StringType(), True),
    StructField("Address", StringType(), True),
    StructField("Postcode", StringType(), True),
    StructField("Rating", StringType(), True),
])


To avoid out-of-memory errors caused by our limited RAM, additional code was written to scrape the postcodes in chunks of 50.

In [9]:
def get_chunks(postcodes_sdf) -> dict:
    """Split the postcodes into chunks of 50 so that, if we are kicked
    halfway through scraping, we don't lose too much progress.
    """
    chunk_dict = {}
    i = 3000
    j = 3050

    # step through the postcode range 50 at a time, up to the upper bound of 3997
    while i < 3997:

        # filter for all postcodes in the half-open range [i, j)
        temp = postcodes_sdf.filter((postcodes_sdf['postcode'] >= i) & (postcodes_sdf['postcode'] < j))

        # store the chunk in the dictionary, keyed by its starting postcode
        chunk_dict[f'chunk_{i}'] = temp
        j += 50
        i += 50

    return chunk_dict

chunk_dict = get_chunks(postcodes_sdf)
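
As a quick check of the ranges that `get_chunks` produces, the same stepping logic can be run on plain integers without Spark. This is only a sketch: `chunk_bounds` is a hypothetical helper that is not part of the notebook, and it mirrors the loop bounds (3000, 3997, step 50) used above.

```python
def chunk_bounds(start=3000, stop=3997, size=50):
    """Return (lower, upper) postcode bounds, mirroring the while-loop in get_chunks."""
    bounds = []
    lower = start
    while lower < stop:
        # each chunk covers the half-open range [lower, lower + size)
        bounds.append((lower, lower + size))
        lower += size
    return bounds


bounds = chunk_bounds()
print(len(bounds))            # number of chunks: 20
print(bounds[0], bounds[-1])  # (3000, 3050) (3950, 4000)
```

The last chunk starts at 3950, so `chunk_3950` is the final key in `chunk_dict`.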

## Data Scraping 

One of the main problems we encountered when scraping was that buildings (grocery stores, schools or hospitals) in nearby suburbs showed up in the search results. Hence, an additional matching step was needed to ensure that the postcode in each building's address matches the postcode that was searched.

In [10]:
#### Scraping task: hospitals & clinics, schools and groceries
def variables_scrape(chunk, file_suffix):
    """Scrape every variable for the 50 postcodes in `chunk` and write one
    parquet file per variable, named after `file_suffix`.
    """

    # Variables to search for and the schema of the scraped records
    variables = ['Hospitals & Clinics', 'Schools', 'Groceries']
    schema = StructType([
        StructField("Name", StringType(), True),
        StructField("Address", StringType(), True),
        StructField("Postcode", StringType(), True),
        StructField("Rating", StringType(), True),
    ])

    # Collect the postcodes once instead of once per variable
    postcodes = [row['postcode'] for row in chunk.collect()]

    # Iterate over all the variables
    for variable in variables:

        # Start from an empty list so each file only holds its own variable
        matches = []

        for postcode in postcodes:
            print(f'searching for {variable} in {postcode}')

            # Define the search query using the postcode
            params = {
                'query': f'{variable} in {postcode}, Victoria, Australia',
                'key': API_KEY,
                'type': variable,  # passed as a plain string rather than a set
                'region': 'AU'
            }

            response = requests.get(url, params=params)

            # Check if the response was successful
            if response.status_code == 200:
                results = response.json().get('results', [])

                for place in results:
                    address = place.get('formatted_address') or ''
                    status = place.get('business_status')

                    # Keep a place only if its address contains the searched postcode and it is still operating
                    if (f'{postcode}' in address) and (status == 'OPERATIONAL'):
                        name = place.get('name')
                        rating = place.get('rating', 'N/A')
                        # Cast to str so the values match the StringType schema
                        matches.append((name, address, str(postcode), str(rating)))

                # Introduce a short delay to avoid hitting the API's rate limits
                time.sleep(1)  # 1-second delay between requests
            else:
                print(f"{variable}: Error fetching data for postcode {postcode}: {response.status_code}, {response.text}")

        # Output all the data that was scraped for this variable
        try:
            variable_metadata = spark.createDataFrame(matches, schema)
            variable_metadata.write.mode("overwrite").parquet(f'../data/raw_variables/{variable}/{variable}_{file_suffix}.parquet')
            print(f"Data successfully written for {variable}")
        except Exception as e:
            print(f"An error occurred: {e}")
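
The postcode check in `variables_scrape` can be tried in isolation. The sketch below is a slightly stricter variant, not the notebook's exact test: it matches the postcode as a whole token using a regular expression and tolerates a missing address. `is_match` and the sample places are hypothetical, and the addresses are made up in the shape of the `formatted_address` and `business_status` fields read above.

```python
import re


def is_match(place: dict, postcode) -> bool:
    """True when the place is operational and its address contains the postcode as a whole token."""
    address = place.get('formatted_address') or ''
    status = place.get('business_status')
    # \b stops 3053 from matching inside a longer number such as 13053
    has_postcode = re.search(rf'\b{re.escape(str(postcode))}\b', address) is not None
    return has_postcode and status == 'OPERATIONAL'


# Made-up results, for illustration only
sample = [
    {'formatted_address': '1 Example St, Carlton VIC 3053, Australia', 'business_status': 'OPERATIONAL'},
    {'formatted_address': '2 Example Rd, Fitzroy VIC 3065, Australia', 'business_status': 'OPERATIONAL'},
    {'formatted_address': '3 Example Ave, Carlton VIC 3053, Australia', 'business_status': 'CLOSED_TEMPORARILY'},
    {'business_status': 'OPERATIONAL'},  # no address returned
]

flags = [is_match(place, 3053) for place in sample]
print(flags)  # [True, False, False, False]
```

A whole-token match still accepts a street number that happens to equal the postcode, so it reduces false matches rather than eliminating them.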

To split up the scraping task, each person scraped 4 chunks (50 postcodes per chunk, 200 postcodes in total).

In [11]:
def run_chunk(starting_chunk: int) -> None:
    """Scrape the Google Places API for 4 consecutive chunks of 50 postcodes
    (the work is split amongst group members)

    Parameters:
    starting_chunk - first postcode of the first chunk to scrape

    Return:
    None
    """
    i = starting_chunk

    while i < starting_chunk + 200:
        variables_scrape(chunk_dict[f"chunk_{i}"], i)
        i += 50

**Below, we split up the scraping process between team members to scrape more efficiently and minimise the time lost to hardware limitations**

In [None]:
### Davyn 
starting_chunk = 3150
run_chunk(starting_chunk)

In [None]:
### Arpan
starting_chunk = 3000 + 200
run_chunk(starting_chunk)

In [None]:
### Rachel
starting_chunk = 3000 + 400
run_chunk(starting_chunk)

In [None]:
### Nathan
starting_chunk = 3000 + 600
run_chunk(starting_chunk)

In [None]:
### Pris
starting_chunk = 3000 + 800
run_chunk(starting_chunk)
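
Using only the starting values shown in the cells above, a short check can list which chunks those five runs cover. This is a sketch under that assumption: it cannot see any runs made outside this notebook, and the variable names are illustrative.

```python
from collections import Counter

# Every chunk start produced by get_chunks (3000, 3050, ..., 3950)
all_starts = list(range(3000, 3997, 50))

# Starting chunks taken from the five cells above
assigned_starts = [3150, 3200, 3400, 3600, 3800]

# run_chunk scrapes 4 chunks of 50 postcodes from each starting value
covered = Counter(
    chunk
    for start in assigned_starts
    for chunk in range(start, start + 200, 50)
)

missing = [chunk for chunk in all_starts if chunk not in covered]
repeated = sorted(chunk for chunk, count in covered.items() if count > 1)

print('not covered:', missing)     # not covered: [3000, 3050, 3100]
print('covered twice:', repeated)  # covered twice: [3200, 3250, 3300]
```

With these values, chunks 3000 to 3100 are not scraped and chunks 3200 to 3300 are scraped twice, so the first starting value is worth double-checking before running the cells.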

**Combining the data frames for each category (i.e. grocery stores, healthcare services and schools)**

All the data for each variable was coalesced into a single parquet file, resulting in three parquet files, one per variable.

In [6]:
sdf = spark.read.parquet('../data/raw_variables/Groceries/*')
# Create new parquet of raw data
sdf \
    .coalesce(1) \
    .write \
    .mode('overwrite') \
    .parquet('../data/scraped/groceries_data.parquet')

                                                                                

In [18]:
sdf = spark.read.parquet('../data/raw_variables/Hospitals & Clinics/*')
# Create new parquet of raw data
sdf \
    .coalesce(1) \
    .write \
    .mode('overwrite') \
    .parquet('../data/scraped/Hospitals_&_Clinics_data.parquet')

                                                                                

In [19]:
sdf = spark.read.parquet('../data/raw_variables/Schools/*')
# Create new parquet of raw data
sdf \
    .coalesce(1) \
    .write \
    .mode('overwrite') \
    .parquet('../data/scraped/Schools_data.parquet')


**Count the number of buildings based on their postcode**

The number of available schools, grocery stores and healthcare services is calculated for each postcode

In [None]:
school_sdf = spark.read.parquet('../data/scraped/Schools_data.parquet')
# Group by Postcode and count the number of schools
schools_per_postcode = school_sdf.groupBy('Postcode').agg(F.count('Name').alias('School_Count'))
schools_per_postcode

In [None]:
groceries_sdf = spark.read.parquet('../data/scraped/groceries_data.parquet')
# Group by Postcode and count the number of grocery stores
groceries_per_postcode = groceries_sdf.groupBy('Postcode').agg(F.count('Name').alias('groceries_Count'))
groceries_per_postcode

In [None]:
hc_sdf = spark.read.parquet('../data/scraped/Hospitals_&_Clinics_data.parquet')
# Group by Postcode and count the number of healthcare services
hc_per_postcode = hc_sdf.groupBy('Postcode').agg(F.count('Name').alias('Number of Healcare'))
hc_per_postcode

**Combining into a single file**

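The three per-postcode counts were then joined into a single dataframe. An outer join is used so that a postcode missing from one or two of the tables is still kept.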

In [None]:
# Perform a join on Postcode column to combine all three DataFrames
combined_df = schools_per_postcode \
    .join(groceries_per_postcode, on='Postcode', how='outer') \
    .join(hc_per_postcode, on='Postcode', how='outer')

combined_df
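
To illustrate what the outer join keeps, here is a plain-Python sketch with made-up counts. `outer_join` is a hypothetical stand-in for the Spark joins above: every postcode that appears in any table survives, and a count that is missing for a postcode becomes `None` (null in Spark).

```python
def outer_join(*tables):
    """Combine dicts keyed by postcode; a missing count becomes None."""
    keys = sorted(set().union(*tables))
    return {key: tuple(table.get(key) for table in tables) for key in keys}


# Made-up counts, for illustration only
schools = {'3000': 4, '3053': 2}
groceries = {'3000': 7}
healthcare = {'3053': 5, '3065': 1}

combined = outer_join(schools, groceries, healthcare)
for postcode, counts in combined.items():
    print(postcode, counts)
```

In Spark, the resulting nulls could be replaced with `DataFrame.fillna(0)` if a missing count should be read as zero. Whether that is appropriate depends on whether the postcode was searched and returned nothing, or was never scraped.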

**Section to get the longitude and latitude for each postcode**

The latitude and longitude of each postcode were obtained to calculate its proximity to the city

In [13]:
# Define the UDF to get latitude and longitude from Google API
def get_geolocation(postcode):
    url = f"https://maps.googleapis.com/maps/api/geocode/json?address={postcode},Victoria,Australia&key={API_KEY}"
    response = requests.get(url)
    if response.status_code == 200:
        result = response.json()
        if result['results']:
            location = result['results'][0]['geometry']['location']
            return location['lat'], location['lng']
    return None, None

# Split the function into two UDFs: one for latitude, one for longitude
def get_latitude(postcode):
    lat, lng = get_geolocation(postcode)
    return lat

def get_longitude(postcode):
    lat, lng = get_geolocation(postcode)
    return lng


In [None]:
# Register UDFs with PySpark
get_latitude_udf = udf(get_latitude, FloatType())
get_longitude_udf = udf(get_longitude, FloatType())

# Assuming you have a DataFrame `combined_update_sdf` with a 'Postcode' column
# For example:
combined_update_sdf = combined_df

# Add latitude and longitude columns to your DataFrame
combined_update_sdf = combined_update_sdf.withColumn('Latitude', get_latitude_udf(combined_update_sdf['Postcode']))
combined_update_sdf = combined_update_sdf.withColumn('Longitude', get_longitude_udf(combined_update_sdf['Postcode']))

# Show the updated DataFrame with geolocation data
combined_update_sdf

**Get the suburb name from each postcode using Google Maps API**

In [17]:
# Define the function to get suburb/locality using Google API
def get_suburb_name(postcode):
    url = f"https://maps.googleapis.com/maps/api/geocode/json?address={postcode},Victoria,Australia&key={API_KEY}"
    response = requests.get(url)
    if response.status_code == 200:
        result = response.json()
        if result['results']:
            for component in result['results'][0]['address_components']:
                if 'locality' in component['types']:  # Extract the locality (suburb)
                    return component['long_name']
    return None
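
`get_geolocation` and `get_suburb_name` request the same geocoding URL, so the latitude, longitude and suburb could all be read from one response body. The sketch below shows that parsing step only, with no network call. `parse_geocode` is a hypothetical helper, and the sample response is made up in the shape the two functions above already rely on.

```python
def parse_geocode(result: dict):
    """Extract (lat, lng, suburb) from a geocoding response body; None for missing parts."""
    results = result.get('results') or []
    if not results:
        return None, None, None

    top = results[0]
    location = top.get('geometry', {}).get('location', {})

    # The suburb is the first address component tagged as a locality
    suburb = next(
        (
            component['long_name']
            for component in top.get('address_components', [])
            if 'locality' in component.get('types', [])
        ),
        None,
    )
    return location.get('lat'), location.get('lng'), suburb


# Made-up response, for illustration only
sample_response = {
    'results': [{
        'address_components': [
            {'long_name': '3053', 'types': ['postal_code']},
            {'long_name': 'Carlton', 'types': ['locality', 'political']},
        ],
        'geometry': {'location': {'lat': -37.8, 'lng': 144.97}},
    }]
}

parsed = parse_geocode(sample_response)
empty = parse_geocode({'results': []})
print(parsed)
print(empty)
```

Parsing once per postcode would cut the three requests made per row (latitude, longitude and suburb) down to one.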

In [None]:
# Register the UDF with PySpark
get_suburb_name_udf = udf(get_suburb_name, StringType())

# Apply the UDF to the 'Postcode' column of `combined_update_sdf` to add suburb names
combined_sdf_with_names = combined_update_sdf.withColumn('Suburb', get_suburb_name_udf(combined_update_sdf['Postcode']))

# Show the DataFrame with suburb names
combined_sdf_with_names

**Merge the curated dataframe with the geolocation dataframe**

In [53]:
geolocation_sdf = combined_sdf_with_names
merged_sdf = spark.read.parquet('../data/curated/merged_df.parquet')
liveability_sdf = merged_sdf \
            .join(geolocation_sdf, on='Postcode', how='outer')

**Calculate the distance from each suburb to Melbourne CBD, setting distances of less than 1 km to 0**

In [54]:
# Define the Haversine formula using PySpark
def haversine(lat1, lon1, lat2, lon2):
    # Convert degrees to radians
    lat1 = F.radians(lat1)
    lon1 = F.radians(lon1)
    lat2 = F.radians(lat2)
    lon2 = F.radians(lon2)
    
    # Haversine formula
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = F.sin(dlat / 2) ** 2 + F.cos(lat1) * F.cos(lat2) * F.sin(dlon / 2) ** 2
    c = 2 * F.atan2(F.sqrt(a), F.sqrt(1 - a))
    
    # Radius of Earth in kilometers
    r = 6371.0
    return c * r
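
The same formula can be sanity-checked outside Spark with the standard `math` module. This is a sketch for verification only: `haversine_km` is a hypothetical scalar counterpart of the column-based `haversine` above, using the same Earth radius of 6371 km.

```python
import math


def haversine_km(lat1, lon1, lat2, lon2, r=6371.0):
    """Great-circle distance in kilometres between two points given in degrees."""
    lat1, lon1, lat2, lon2 = map(math.radians, (lat1, lon1, lat2, lon2))
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = math.sin(dlat / 2) ** 2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return c * r


# One degree of latitude along a meridian equals r * pi / 180
one_degree = haversine_km(0, 0, 1, 0)
print(round(one_degree, 2))  # 111.19

# A point is at distance zero from itself
same_point = haversine_km(-37.8136, 144.9631, -37.8136, 144.9631)
print(same_point)  # 0.0
```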

In [55]:
# Define Melbourne CBD's coordinates
melbourne_lat, melbourne_lon = -37.8136, 144.9631

In [56]:
# Apply the Haversine formula to the Spark DataFrame
liveability_sdf = liveability_sdf.withColumn(
    "distance_to_melbourne_km",
    haversine(F.col("Latitude"), F.col("Longitude"), F.lit(melbourne_lat), F.lit(melbourne_lon))
)

In [57]:
# Add condition to replace distances < 1 km with 0
liveability_sdf = liveability_sdf.withColumn(
    "distance_to_melbourne_km",
    F.when(F.col("distance_to_melbourne_km") < 1, 0).otherwise(F.col("distance_to_melbourne_km"))
)

In [59]:
# Calculate school per capita and add a new column
liveability_sdf = liveability_sdf.withColumn(
    "school_per_capita", 
    F.col("School_Count") / F.col("total population - 2021")
)

**Save the final dataframe of liveability index to a parquet file**

In [61]:
# Save the final dataframe into a parquet file
liveability_sdf \
    .coalesce(1) \
    .write \
    .mode('overwrite') \
    .parquet('../data/scraped/liveability_data.parquet')

In [62]:
# Stop spark session
spark.stop()