final outputs:

#### ERP + household + school_count + PTV -> by sa2
SA2_info
- ../data/curated/sa2_info.csv

#### ERP + price_index + interest_rate + price_index -> year, SA2
history_data
- ../data/curated/history_data.csv

#### Rent data:
- ../data/curated/rent.csv

#### PTV data
- ../data/raw/PTV/public_trans.csv


In [1]:
import os
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import re
from itertools import compress
import pandas as pd
import geopandas as gpd
import zipfile


spark = (
    SparkSession.builder.appName("Assignment_2")
    .config("spark.sql.repl.eagerEval.enabled", True)
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .getOrCreate()
)

INPUT_DIR = "../data/raw/"
OUTPUT_DIR = "../data/curated/"


headers = {"accept": "text/csv"}

22/09/14 22:02:57 WARN Utils: Your hostname, Bruce-PC resolves to a loopback address: 127.0.1.1; using 172.21.205.150 instead (on interface eth0)
22/09/14 22:02:57 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/09/14 22:02:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/09/14 22:03:00 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [2]:
def gpd_station_merge(poly_gdf, file_path, by_id_name = "SA2_CODE21",\
    station_id_name = "STOP_ID", method={"STOP_ID": "count"}):
    """
        A function used to merge shape file in path: {file_path} to a 
        geopandas dataframe {poly_gdf} with POLYGON geometry. 
        poly_gdf: a geopandas.GeoDataFrame object contains POLYGON geometry
        file_path: a String of file path to read target shape file
        by_id_name: a String of id name to perform groupby option
        station_id_name: a String of id name stated in the readed gdf
        method: a Dict of operations to perform after groupby
    """

    ### read station file
    station_gdf = gpd.read_file(file_path)

    # metro bus station feature selection
    station_gdf = station_gdf[[station_id_name, "geometry"]]
    


    # merge tabels
    join_gdf = gpd.sjoin(poly_gdf, station_gdf, how="left")
    join_gdf = join_gdf.groupby(by_id_name).agg(method)
    
    return join_gdf

## SA2_info
#### ERP + household + school_count + PTV -> by sa2
Read and prepare all data sets and create TempView

In [3]:
# read school file
school_sdf = spark.read.csv(f"{INPUT_DIR}ACARA/School_location/School_location.csv", header=True)

# count school by SA2
school_count = school_sdf.groupBy("SA2").agg({
    "School Name": "count"
})
school_count = school_count.withColumnRenamed( "count(School Name)", "school_count")
school_count.show(1, vertical = True, truncate=100)
school_count.createOrReplaceTempView("school")



# read ERP file and create tempview
ERP_sdf = spark.read.csv(f"{INPUT_DIR}ABS/ERP/ERP.csv", header=True)
ERP_sdf = ERP_sdf.filter(F.col("year") == 2021)
ERP_sdf.show(1, vertical = True, truncate=100)
ERP_sdf.createOrReplaceTempView("ERP")



# read median household income file and create tempview
household_sdf = spark.read.csv(f"{INPUT_DIR}ABS/Household_income/Household_income.csv", header=True)
household_sdf.show(2, vertical = True, truncate=100)
household_sdf.createOrReplaceTempView("household")

# read PTV station file and create tempview
ptv_sdf = spark.read.csv(f"{INPUT_DIR}PTV/public_trans.csv", header=True)
ptv_sdf.show(1, vertical = True, truncate=100)
ptv_sdf.createOrReplaceTempView("ptv")

                                                                                

-RECORD 0-----------------
 SA2          | 209031212 
 school_count | 9         
only showing top 1 row

-RECORD 0---------------
 SA2        | 201011481 
 year       | 2021      
 population | 9656      
only showing top 1 row

-RECORD 0------------------
 SA2           | 213051589 
 median_income | 1862      
-RECORD 1------------------
 SA2           | 209041437 
 median_income | 1979      
only showing top 2 rows

-RECORD 0----------------------------------------------------------------------------------------------------------------
 SA2_CODE21       | 201011001                                                                                            
 geometry         | POLYGON ((143.78282104711133 -37.566657808073295, 143.75557764214773 -37.56346721632544, 143.7480... 
 metrobus_count   | 0                                                                                                    
 metrotrain_count | 0                                                                     

Merge data

In [4]:
# inner join
print(school_count.columns)
print(ERP_sdf.columns)
print(household_sdf.columns)
combine_sdf = spark.sql("""
    SELECT  school.SA2, school.school_count, 
        ERP.population AS ERP_population, median_income, 
        metrobus_count, metrotrain_count, metrotram_count, 
        regbus_count, regcoach_count, regtrain_count, skybus_count
    FROM school
    INNER JOIN ERP ON school.SA2 = ERP.SA2
    INNER JOIN household ON school.SA2 = household.SA2
    INNER JOIN ptv ON school.SA2 = ptv.SA2_CODE21
""")
combine_sdf.show(1, vertical = True, truncate=100)
combine_sdf.write.option("header", True).mode("overwrite").csv(
    f"{OUTPUT_DIR}sa2_info.csv"
)

['SA2', 'school_count']
['SA2', 'year', 'population']
['SA2', 'median_income']


                                                                                

-RECORD 0---------------------
 SA2              | 202011018 
 school_count     | 13        
 ERP_population   | 14951     
 median_income    | 1267      
 metrobus_count   | 0         
 metrotrain_count | 0         
 metrotram_count  | 0         
 regbus_count     | 142       
 regcoach_count   | 2         
 regtrain_count   | 1         
 skybus_count     | 0         
only showing top 1 row



## history_data

#### ERP + interest_rate + price_index -> year, SA2

Read and prepare all data sets and create TempView

In [5]:
# read ERP file and create tempview
ERP_sdf = spark.read.csv(f"{INPUT_DIR}ABS/ERP/ERP.csv", header=True)
ERP_sdf = ERP_sdf.filter(F.col("year") == 2021)
ERP_sdf.show(1, vertical = True, truncate=100)
ERP_sdf.createOrReplaceTempView("ERP")

# read population projection file and create tempview
interest_sdf = spark.read.csv(f"{INPUT_DIR}rba/interest_rate/interest_rate.csv", header=True)
interest_sdf = interest_sdf.filter(F.col("year") == 2021)
interest_sdf.show(1, vertical = True, truncate=100)
interest_sdf.createOrReplaceTempView("interest")

# read property price index file and create tempview
index_sdf = spark.read.csv(f"{INPUT_DIR}ABS/Price_index/Price_index.csv", header=True)
index_sdf = index_sdf.filter(F.col("year") == 2021)
index_sdf.show(1, vertical = True, truncate=100)
index_sdf.createOrReplaceTempView("index")

-RECORD 0---------------
 SA2        | 201011481 
 year       | 2021      
 population | 9656      
only showing top 1 row

22/09/14 11:43:26 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , year, bond
 Schema: _c0, year, bond
Expected: _c0 but found: 
CSV file: file:///home/bruce/projects/ass2/data/raw/rba/interest_rate/interest_rate.csv
-RECORD 0----
 _c0  | 112  
 year | 2021 
 bond | 0.93 

-RECORD 0-----------
 year        | 2021 
 price_index | 185  



Merge data

In [6]:

# inner join
print(ERP_sdf.columns)
print(interest_sdf.columns)
print(index_sdf.columns)
combine_sdf = spark.sql("""
    SELECT  SA2, ERP.year, population, bond, price_index
    FROM ERP
    INNER JOIN interest ON ERP.year = interest.year
    INNER JOIN index ON ERP.year = index.year
""")
combine_sdf.show(1, vertical = True, truncate=100)
combine_sdf.write.option("header", True).mode("overwrite").csv(
    f"{OUTPUT_DIR}history_data.csv"
)

['SA2', 'year', 'population']
['_c0', 'year', 'bond']
['year', 'price_index']
-RECORD 0----------------
 SA2         | 201011481 
 year        | 2021      
 population  | 9656      
 bond        | 0.93      
 price_index | 185       
only showing top 1 row



## Match SA2 to Rental data

In [7]:
# read local files
rent_df = pd.read_csv(f"../data/raw/rent/rent_raw.csv").reset_index()


rent_gdf = gpd.GeoDataFrame(rent_df, geometry=gpd\
        .points_from_xy(rent_df["Longitude"], rent_df["Latitude"]))
boundary_gdf = gpd.read_file(f"../data/raw/ABS/digitalBoundary/SA2_2021_AUST_GDA2020.shp")
rent_gdf["geometry"] = rent_gdf["geometry"].set_crs("epsg:7844")

# feature selction on boundary gdf
boundary_gdf = boundary_gdf.loc[boundary_gdf["STE_NAME21"] == "Victoria"]
boundary_gdf = boundary_gdf[["SA2_CODE21", "geometry"]]
boundary_gdf["SA2_CODE21"] = boundary_gdf["SA2_CODE21"].astype("int64")

# assgin sa2 to rent gdf
join_gdf = gpd.sjoin(rent_gdf, boundary_gdf, how="right")
join_gdf = join_gdf.dropna()
join_gdf["index_left"] = join_gdf["index_left"].astype("int")
rent_gdf = rent_gdf.loc[join_gdf["index_left"]]
join_gdf = join_gdf.reset_index()
rent_gdf["SA2"] = join_gdf[["SA2_CODE21"]]
rent_gdf["SA2"] = rent_gdf["SA2"].astype("int64")
rent_df = rent_df.loc[rent_gdf["index"]]
rent_df["SA2"] = rent_gdf["SA2"]

rent_df["rent_id"] = range(rent_df.shape[0])
rent_df = rent_df.set_index("rent_id").drop(["index"], axis=1)

rent_df = rent_df.dropna().drop_duplicates()
print(rent_df.head())
rent_df.to_csv(f"../data/curated/rent.csv", header=True)


          rent  bedroom  baths  parking  \
rent_id                                   
0        300.0        2      1        1   
1        510.0        1      1        1   
2        330.0        1      1        1   
3        620.0        3      2        1   
4        560.0        3      2        1   

                                                       url   Latitude  \
rent_id                                                                 
0        https://www.domain.com.au/3-180-williamson-str... -36.768411   
1        https://www.domain.com.au/3203-118-kavanagh-st... -36.768411   
2        https://www.domain.com.au/6-13-laura-street-mo... -36.768411   
3        https://www.domain.com.au/14-patterson-street-... -36.768411   
4        https://www.domain.com.au/6-gent-street-yarrav... -36.768411   

          Longitude                     geometry        SA2  
rent_id                                                      
0        144.290465  POINT (144.29047 -36.76841)  202011020  


## PTV data

In [10]:
output_dir = "../data/raw/PTV/ll_gda2020/esrishape/whole_of_dataset/victoria/PUBLIC_TRANSPORT/"

# unzip zip file
with zipfile.ZipFile(f"../data/raw/PTV.zip", "r") as zip_ref:
    zip_ref.extractall(f"../data/raw/PTV/")

In [11]:
### read shape file and make geometry readable
boundary_gdf = gpd.read_file("../data/raw/ABS/digitalBoundary/\
SA2_2021_AUST_GDA2020.shp")
boundary_gdf = boundary_gdf.loc[boundary_gdf["STE_NAME21"] == "Victoria"]

# digital boundary feature selection
boundary_gdf = boundary_gdf.reset_index()[["SA2_CODE21", "geometry"]].set_index("SA2_CODE21")
print(boundary_gdf.shape)
print(boundary_gdf.head(1))


(524, 1)
                                                     geometry
SA2_CODE21                                                   
201011001   POLYGON ((143.78282 -37.56666, 143.75558 -37.5...


In [12]:
mix_gdf = boundary_gdf.sort_values(by=["SA2_CODE21"])
file_paths = [
    f"{output_dir}PTV_METRO_BUS_STOP.shp",
    f"{output_dir}PTV_METRO_TRAIN_STATION.shp",
    f"{output_dir}PTV_METRO_TRAM_STOP.shp",
    f"{output_dir}PTV_REGIONAL_BUS_STOP.shp",
    f"{output_dir}PTV_REGIONAL_COACH_STOP.shp",
    f"{output_dir}PTV_REGIONAL_TRAIN_STATION.shp",
    f"{output_dir}PTV_SKYBUS_STOP.shp"
]

col_names = ["metrobus_count", "metrotrain_count", "metrotram_count",
    "regbus_count", "regcoach_count", "regtrain_count", "skybus_count"]


for col_name, file_path in zip(col_names, file_paths):
    print(col_name, file_path)
    cur_gdf = gpd_station_merge(boundary_gdf, file_path).rename({"STOP_ID": col_name}, axis=1)
    cur_gdf = cur_gdf.sort_values(by = col_name)
    mix_gdf = pd.concat([mix_gdf, cur_gdf], axis=1)

print(mix_gdf.shape)
print(mix_gdf.head())
mix_gdf.to_csv("../data/raw/PTV/public_trans.csv")

metrobus_count ../data/raw/PTV/ll_gda2020/esrishape/whole_of_dataset/victoria/PUBLIC_TRANSPORT/PTV_METRO_BUS_STOP.shp
metrotrain_count ../data/raw/PTV/ll_gda2020/esrishape/whole_of_dataset/victoria/PUBLIC_TRANSPORT/PTV_METRO_TRAIN_STATION.shp
metrotram_count ../data/raw/PTV/ll_gda2020/esrishape/whole_of_dataset/victoria/PUBLIC_TRANSPORT/PTV_METRO_TRAM_STOP.shp
regbus_count ../data/raw/PTV/ll_gda2020/esrishape/whole_of_dataset/victoria/PUBLIC_TRANSPORT/PTV_REGIONAL_BUS_STOP.shp
regcoach_count ../data/raw/PTV/ll_gda2020/esrishape/whole_of_dataset/victoria/PUBLIC_TRANSPORT/PTV_REGIONAL_COACH_STOP.shp
regtrain_count ../data/raw/PTV/ll_gda2020/esrishape/whole_of_dataset/victoria/PUBLIC_TRANSPORT/PTV_REGIONAL_TRAIN_STATION.shp
skybus_count ../data/raw/PTV/ll_gda2020/esrishape/whole_of_dataset/victoria/PUBLIC_TRANSPORT/PTV_SKYBUS_STOP.shp
(524, 8)
                                                     geometry  metrobus_count  \
SA2_CODE21                                                        