In [34]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

import sys
# Make the shared helper modules under <project>/scripts importable from this
# notebook. The folder is located two levels above the working directory.
current_dir = os.getcwd()
scripts_path = os.path.join(current_dir, '../../scripts')
scripts_abs_path = os.path.abspath(scripts_path)

# Guarded so that re-running this cell does not keep appending the same
# entry to sys.path.
if scripts_abs_path not in sys.path:
    sys.path.append(scripts_abs_path)
import PTV_preprocess_function as process



In [22]:
# Create a spark session
# Settings are applied to the builder one by one, in the order listed here.
spark_settings = [
    ("spark.sql.repl.eagerEval.enabled", True),
    ("spark.sql.parquet.cacheMetadata", "true"),
    ("spark.sql.session.timeZone", "Etc/UTC"),
    ("spark.executor.memory", "4g"),
    ("spark.driver.memory", "4g"),
]

session_builder = SparkSession.builder.appName("MAST30034 Project 2")
for setting_name, setting_value in spark_settings:
    session_builder = session_builder.config(setting_name, setting_value)

# Reuses an already-running session if there is one, otherwise starts a new one.
spark = session_builder.getOrCreate()

In [23]:
# Create folder to save data after simple preprocessing
preprocess = 'preprocessed'      # sub-folder for the cleaned output
unprepro = 'un_preprocess'       # sub-folder holding the source files
raw_dir = '../../data/raw/PTV/'

# exist_ok=True turns this into a no-op when the folder is already present,
# which avoids the check-then-create race of testing os.path.exists first.
os.makedirs(os.path.join(raw_dir, preprocess), exist_ok=True)

In [24]:
# define variables
# os.path.join only inserts a separator when one is needed, so the result is
# well-formed whether or not raw_dir ends with '/' (plain string formatting
# with an extra '/' produces a doubled separator when it does).
input_dir = os.path.join(raw_dir, unprepro)
output_dir = os.path.join(raw_dir, preprocess)

# Union dataset

In [25]:
# Merge every per-mode stop file found in input_dir into a single DataFrame,
# tagging each row with the transport mode taken from its source file name,
# then persist the result next to the source files.
union_file = 'PTV_union.parquet'

# initialize an empty dataframe to store merged data
all_data = None
trans_type = []

# sorted() gives a reproducible read order; os.listdir order is arbitrary.
for file in sorted(os.listdir(input_dir)):

    # Skip entries that are not parquet datasets (e.g. hidden OS files), and
    # skip the union output itself: it is written into this same folder below,
    # so on a re-run it would otherwise be read back in as a source, labelled
    # 'PTV_union', and every stop would be counted twice.
    if not file.endswith('.parquet') or file == union_file:
        continue

    # read file
    file_path = os.path.join(input_dir, file)
    df = spark.read.parquet(file_path)

    # create new feature - transportation_type
    transportation_type = file.replace('.parquet', '')
    trans_type.append(transportation_type)

    df = df.withColumn('public_transportation_type', lit(transportation_type))

    # union dataset
    # NOTE(review): union() matches columns by position, not by name, so this
    # assumes every source file stores its columns in the same order - confirm.
    all_data = df if all_data is None else all_data.union(df)

# Fail with a clear message instead of an AttributeError on None when the
# folder holds no source files.
if all_data is None:
    raise FileNotFoundError(f"No source parquet files found in {input_dir}")

# save as parquet
all_data.write.mode('overwrite').parquet(f"{input_dir}/{union_file}")



In [26]:
# Load the merged dataset back from the parquet output of the union step.
parquet_path = f"{input_dir}/PTV_union.parquet"
sdf = spark.read.format("parquet").load(parquet_path)

In [27]:
# Print each column's name, data type and nullability for the merged dataset.
sdf.printSchema()

root
 |-- stop_id: integer (nullable = true)
 |-- stop_name: string (nullable = true)
 |-- stop_lat: double (nullable = true)
 |-- stop_lon: double (nullable = true)
 |-- public_transportation_type: string (nullable = true)



In [28]:
# Report the row and column counts of the merged dataset before any cleaning,
# using the project helper with a label for the printed line.
process.print_dataset_shape("original dataset shape: ", sdf)

original dataset shape:  - Shape: (28316 rows, 5 columns)


In [29]:
# Summary statistics (count, mean, stddev, min, max) for every column.
summary_stats = sdf.describe()
summary_stats.show()

+-------+------------------+--------------------+-------------------+------------------+--------------------------+
|summary|           stop_id|           stop_name|           stop_lat|          stop_lon|public_transportation_type|
+-------+------------------+--------------------+-------------------+------------------+--------------------------+
|  count|             28316|               28316|              28316|             28316|                     28316|
|   mean|22286.645359514056|                NULL|-37.691457563928594|144.92578657647985|                      NULL|
| stddev|15951.415071739373|                NULL| 0.6274758685737729|0.8152798667253476|                      NULL|
|    min|                 4|0-Bourke St/Sprin...|   -38.777123382502|  138.595751207359|          Metropolitan Bus|
|    max|             52183|opp 68 The Elms B...|  -34.1652286164826|   150.17814893804|            Regional Train|
+-------+------------------+--------------------+-------------------+---

# Preprocessing

In [30]:
# check missing value
# Project helper; its return value is displayed as the cell result.
# NOTE(review): presumably a mapping of column name -> missing-value count,
# with an empty dict meaning no column has missing values - confirm against
# PTV_preprocess_function.
process.check_missing_values(sdf)

{}

In [31]:
# check duplicate record
# Group on every column of the merged dataset (sdf) and keep the groups that
# occur more than once. This must run on sdf: `df` is only the loop variable
# left over from the union cell, i.e. the last single source file that was
# read, so checking it would ignore most of the data.
duplicate_rows = sdf.groupBy(sdf.columns).count().filter("count > 1")

if duplicate_rows.count() > 0:
    duplicate_rows.show()
else:
    print("no duplicate")

no duplicate


In [38]:
# Number of stops recorded for each public transportation type.
stops_per_type = sdf.groupBy("public_transportation_type").count()
stops_per_type.show()


+--------------------------+-----+
|public_transportation_type|count|
+--------------------------+-----+
|          Metropolitan Bus|18612|
|              Regional Bus| 6854|
|         Metropolitan Tram| 1626|
|            Regional Coach|  893|
|        Metropolitan Train|  221|
|            Regional Train|  110|
+--------------------------+-----+



In [None]:
# Shut down the Spark session created at the top of the notebook now that
# the analysis is finished.
spark.stop()