In [1]:
from pyspark.sql import SparkSession

# Create a spark session (which will run spark jobs)
spark = (
    SparkSession.builder.appName("MAST30034 Project 2")
    .config("spark.sql.repl.eagerEval.enabled", True) 
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .config("spark.executor.memory", "5g")
    .config("spark.driver.memory", "5g")
    .getOrCreate()
)

your 131072x1 screen size is bogus. expect trouble
24/09/10 15:06:16 WARN Utils: Your hostname, Mok resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
24/09/10 15:06:16 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/10 15:06:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
def save_df_to_parquet(name, sdf):
    '''Input: take a spark dataframe, and a desired file name

    A staging folder is created as creating a parquet creates meta data files which are unneeded
    for analysis. Relevent parquet files are then moved into the curated folder
    
    Output: save the new spark data frame into the curated folder ready for analysis'''
    
    #now we want to move this file into the curated data
    import shutil
    import os
    
    folder_path = "../data/staging"

    # Create the folder, including any intermediate directories
    if not os.path.exists(folder_path):
        os.makedirs(folder_path)
        print(f"Folder '{folder_path}' created successfully!")
    else:
        None



    sdf.coalesce(1)\
        .write\
        .mode("overwrite")\
        .parquet(f'../data/staging/{name}')



    starting_directory = '../data/staging/'
    output_directory = "../data/raw/"

    input_directory = os.path.join(starting_directory, name)

    for file in os.listdir(input_directory):
        if file.endswith(".parquet"):
            
            #rename the file first since currently they have a part-00000 name
            part_file_path = os.path.join(input_directory, file)
            new_file_path = os.path.join(input_directory, name)
            os.rename(part_file_path, new_file_path)
                
            #move the file into the curated folder for models
            source_file_path = os.path.join(input_directory, name)
            destination_file_path = os.path.join(output_directory, name)
        
            shutil.move(source_file_path, destination_file_path)
            shutil.rmtree(folder_path)

    return

In [3]:
crime_raw_sdf = spark.read.parquet('../data/landing/crime.parquet')

# we want to remove the subdivisions and LGAS as these are not relevent features 
offence_division_raw = crime_raw_sdf.drop("Offence Subdivision", "Offence Subgroup", "Year ending", "Local Government Area")

                                                                                

In [None]:
#save this file as a parquet in the raw folder
save_df_to_parquet("offence_division_raw.parquet", offence_division_raw)

In [4]:
crime_raw_sdf = spark.read.parquet('../data/raw/offence_division_raw.parquet')

crime_raw_sdf.createOrReplaceTempView("offence_data")

# SQL query to group by 'Year', 'Suburb/Town Name', and 'Offence Division' and sum 'Offence Count'
query = """
    SELECT 
        Year,
        `Suburb/Town Name`,
        `Offence Division`,
        SUM(`Offence Count`) AS total_offence_count
    FROM offence_data
    GROUP BY Year, `Suburb/Town Name`, `Offence Division`
"""

# Execute the SQL query
result_df = spark.sql(query)

# Show the result
result_df.show()



                                                                                

+----+----------------+--------------------+-------------------+
|Year|Suburb/Town Name|    Offence Division|total_offence_count|
+----+----------------+--------------------+-------------------+
|2023|     Bakery Hill|E Justice procedu...|                  6|
|2023|    Lake Gardens|A Crimes against ...|                 12|
|2023| Trafalgar South|B Property and de...|                  3|
|2023|      Cheltenham|     C Drug offences|                 81|
|2023|  Echuca Village|A Crimes against ...|                  5|
|2023|           Patho|D Public order an...|                  4|
|2023|     Maryborough|A Crimes against ...|                221|
|2023|         Warrion|B Property and de...|                  2|
|2023|       Ellaswood|E Justice procedu...|                  3|
|2023|     Leitchville|D Public order an...|                  8|
|2023|      Lake Mundi|D Public order an...|                 11|
|2023|      Lethbridge|E Justice procedu...|                  8|
|2023|    Golden Gully|E 

In [5]:
from pyspark.sql import Window
from pyspark.sql.functions import lag, col

# Step 1: Assuming your DataFrame is already grouped by year, Suburb/Town Name, and Offence Division.
# Use the result DataFrame that already has total_offence_count.
# For this example, letâ€™s call the DataFrame `df_grouped`

# Step 2: Create a window specification that partitions by 'Suburb/Town Name' and 'Offence Division' and orders by 'Year'
window_spec = Window.partitionBy('Suburb/Town Name', 'Offence Division').orderBy('Year')

# Step 3: Use the lag function to get the previous year's total offence count
df_with_prev_year = result_df.withColumn(
    'prev_year_offence_count',
    lag('total_offence_count').over(window_spec)
)

# Step 4: Calculate the percentage change
df_with_percentage_change = df_with_prev_year.withColumn(
    'percentage_change',
    ((col('total_offence_count') - col('prev_year_offence_count')) / col('prev_year_offence_count')) * 100
)

# Step 5: Show the result
df_with_percentage_change.show()


+----+----------------+--------------------+-------------------+-----------------------+-------------------+
|Year|Suburb/Town Name|    Offence Division|total_offence_count|prev_year_offence_count|  percentage_change|
+----+----------------+--------------------+-------------------+-----------------------+-------------------+
|2018|        Abbeyard|A Crimes against ...|                  2|                   NULL|               NULL|
|2016|        Abbeyard|B Property and de...|                  1|                   NULL|               NULL|
|2019|        Abbeyard|B Property and de...|                  1|                      1|                0.0|
|2021|        Abbeyard|B Property and de...|                  1|                      1|                0.0|
|2014|        Abbeyard|D Public order an...|                  2|                   NULL|               NULL|
|2014|        Abbeyard|    F Other offences|                  1|                   NULL|               NULL|
|2015|        Abbey

In [7]:
df_with_percentage_change.count()

77258