In [1]:
import sys
import os

# Get the directory where your notebook is running (which is /app/notebooks)
notebook_dir = os.path.dirname(os.path.abspath(''))

# Add the parent directory (/app) to the Python path
# This allows imports like 'from etl_development.my_etl_module import ...'
if '/app' not in sys.path:
    sys.path.insert(0, '/app') # Add /app to the beginning of the path for highest priority

print(f"Current Python Path: {sys.path}")

Current Python Path: ['/app', '/usr/local/lib/python39.zip', '/usr/local/lib/python3.9', '/usr/local/lib/python3.9/lib-dynload', '', '/usr/local/lib/python3.9/site-packages']


# 01 - Acquire Packages For Data Preparation Work
Includes:
 - loading required PACKAGES
 - provided DIRECTORY for filepaths and github/version control functionality for structuring data and frameworks (not fully used as had limited time)
 - FUNCTIONS (which would have been encapsulated within classes and appropriate github/version control directories - but with limited time only included here).
 - A data type/preparation function (TRANSFORMATION - fn_transform_cast)
 - A function which creates dimensional tables (TRANSFORMATION for Star Schemas - fn_create_dim_table)
 - A function that creates spark dataframes based on user suggestions (EXTRACTION/LOAD - fn_dataframe_selections)
 - A function that extends/creates attributes relevant to dates (TRANSFORMATION - fn_add_period_attributes)
 - Functions include testing, error checking, correction, and validation"

In [51]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DateType, TimestampType
from pyspark.sql.functions import col, to_date, year, month, dayofmonth, hour, month, date_format, count
import shutil # Integration with version control platforms (github, gitlab, etc.)
import pandas as pd # Import pandas
import datetime
from etl_development.joels_etl_class import JoelsETL

# Pathways to (1) data, 
# (2) output data quality/profiling used to examine and determine transformation requirements, 
# (3) star schema output (warehouse)
fpath_tenancy_data = '../src_data/sample.json'
fpath_etl_code = '/app/etl_development/'
fpath_data_quality_profile = '/app/data_quality_profiles/'
fpath_data_star_schema = '../data_star_schema_prep/ais_data_by_multi_periods'



In [52]:
#Test 5

#df.show(10, truncate=False)
#df.printSchema()

#df.describe()
#df.select("DateTime").("min", "25%", "75%", "max").show()
#df2 = df.withColumn("Date_Only",to_date("DateTime"))
#df.select("Date_Only", ").("min", "25%", "75%", "max").show()

# Add hierarchical partition columns
df_with_hour = df.withColumn("year", year(col("DateTime"))) \
                 .withColumn("month", month(col("DateTime"))) \
                 .withColumn("day", dayofmonth(col("DateTime"))) \
                 .withColumn("hour", hour(col("DateTime")))

# Save with hierarchical partitioning
df_with_hour.write \
    .mode("overwrite") \
    .partitionBy("year", "month", "day", "hour") \
    .parquet(fpath_data_star_schema)

print("Data partitioned by year/month/day/hour")

25/07/28 14:33:31 WARN TaskSetManager: Stage 62 contains a task of very large size (36174 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

Data partitioned by year/month/day/hour


In [20]:
#Test 4

import json
import sys
from decimal import Decimal, ROUND_DOWN
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, TimestampType
from pyspark.sql.functions import col, from_unixtime, round as spark_round

def process_with_typed_schema(file_path, max_memory_mb=2048):
    """Process with proper data types and conversions"""
    
    # Define schema with proper types
    schema = StructType([
        StructField("UTCTimeStamp", IntegerType(), True),  # Will convert to timestamp later
        StructField("Message_MessageID", IntegerType(), True),
        StructField("Message_UserID", IntegerType(), True),
        StructField("Message_Latitude", DoubleType(), True),
        StructField("Message_Longitude", DoubleType(), True),
        StructField("Message_SOG", DoubleType(), True)
        # Add more numeric fields as needed
    ])
    
    spark = SparkSession.builder \
        .appName("TypedProcessor") \
        .config("spark.sql.adaptive.enabled", "true") \
        .getOrCreate()
    
    def truncate_decimal(value, decimal_places=6):
        """Truncate decimal to specified places"""
        if value is None:
            return None
        try:
            # Convert to Decimal for precise truncation
            decimal_val = Decimal(str(value))
            multiplier = Decimal(10 ** decimal_places)
            return float(decimal_val.quantize(Decimal('1') / multiplier, rounding=ROUND_DOWN))
        except:
            return None
    
    def safe_convert_int(value):
        """Safely convert to integer"""
        if value is None or value == '':
            return None
        try:
            return int(float(value))  # Handle strings like "123.0"
        except:
            return None
    
    def safe_convert_float(value):
        """Safely convert to float"""
        if value is None or value == '':
            return None
        try:
            return float(value)
        except:
            return None



    
    max_memory_bytes = max_memory_mb * 1024 * 1024
    chunk_records = []
    current_memory = 0
    all_dataframes = []
    chunk_number = 0
    total_processed = 0
    
    print(f"Processing with typed schema in chunks of ~{max_memory_mb}MB...")
    
    with open(file_path, 'r') as f:
        for line_num, line in enumerate(f, 1):
            try:
                record = json.loads(line.strip())
                message = record.get('Message', {})
                
                # Create properly typed record
                flat_record = {
                    'UTCTimeStamp': safe_convert_int(record.get('UTCTimeStamp')),
                    'Message_MessageID': safe_convert_int(message.get('MessageID')),
                    'Message_UserID': safe_convert_int(message.get('UserID')),
                    'Message_Latitude': truncate_decimal(safe_convert_float(message.get('Latitude')), 6),
                    'Message_Longitude': truncate_decimal(safe_convert_float(message.get('Longitude')), 6),
                    'Message_SOG': safe_convert_float(message.get('SOG'))
                }
                
                chunk_records.append(flat_record)
                current_memory += sys.getsizeof(str(flat_record))
                
                if current_memory >= max_memory_bytes:
                    chunk_number += 1
                    print(f"Processing chunk {chunk_number} with {len(chunk_records):,} records...")
                    
                    # Create DataFrame with typed schema
                    chunk_df = spark.createDataFrame(chunk_records, schema)
                    all_dataframes.append(chunk_df)
                    
                    total_processed += len(chunk_records)
                    print(f"  Chunk {chunk_number} processed. Total so far: {total_processed:,}")
                    
                    chunk_records = []
                    current_memory = 0
                
                if line_num % 100000 == 0:
                    print(f"  Read {line_num:,} lines...")
                    
            except Exception as e:
                if line_num <= 10:  # Show first 10 errors only
                    print(f"Error at line {line_num}: {e}")




    
    # Process final chunk
    if chunk_records:
        chunk_number += 1
        print(f"Processing final chunk {chunk_number} with {len(chunk_records):,} records...")
        chunk_df = spark.createDataFrame(chunk_records, schema)
        all_dataframes.append(chunk_df)
        total_processed += len(chunk_records)
    
    print(f"Total processed: {total_processed:,} records in {chunk_number} chunks")
    
    # Union all chunks
    if all_dataframes:
        print("Combining chunks and applying final transformations...")
        final_df = all_dataframes[0]
        for df in all_dataframes[1:]:
            final_df = final_df.union(df)
        
        # Convert UTCTimeStamp to datetime and ensure lat/lon precision
        final_df = final_df.withColumn(
            "DateTime", 
            from_unixtime(col("UTCTimeStamp")).cast(TimestampType())
        ).withColumn(
            "Message_Latitude", 
            spark_round(col("Message_Latitude"), 6)
        ).withColumn(
            "Message_Longitude", 
            spark_round(col("Message_Longitude"), 6)
        )
        
        # Reorder columns to put DateTime first
        column_order = ["DateTime", "UTCTimeStamp"] + [c for c in final_df.columns if c not in ["DateTime", "UTCTimeStamp"]]
        final_df = final_df.select(*column_order)
        
        print(f"Final DataFrame created with {final_df.count():,} records")
        return final_df
    
    return None

# Usage
spark = SparkSession.builder \
    .appName("TypedJSONProcessor") \
    .config("spark.driver.memory", "4g") \
    .config("spark.driver.maxResultSize", "2g") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

# Process your file
df = process_with_typed_schema("../src_data/sample.json", max_memory_mb=2048)

if df:
    print("\nSchema:")
    df.printSchema()
    
    print("\nSample data:")
    df.show(10, truncate=False)
    
    # Show data type verification
    print("\nData type verification:")
    sample_row = df.first()
    if sample_row:
        print(f"DateTime type: {type(sample_row['DateTime'])}")
        print(f"Latitude: {sample_row['Message_Latitude']} (type: {type(sample_row['Message_Latitude'])})")
        print(f"Longitude: {sample_row['Message_Longitude']} (type: {type(sample_row['Message_Longitude'])})")
        print(f"MessageID: {sample_row['Message_MessageID']} (type: {type(sample_row['Message_MessageID'])})")
    
    # Cache for reuse
    df.cache()
    print(f"\nDataFrame cached with {df.count():,} total records")

25/07/28 10:30:18 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
25/07/28 10:30:18 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


Processing with typed schema in chunks of ~2048MB...
  Read 100,000 lines...
  Read 200,000 lines...
  Read 300,000 lines...
  Read 400,000 lines...
  Read 500,000 lines...
  Read 600,000 lines...
  Read 700,000 lines...
  Read 800,000 lines...
  Read 900,000 lines...
  Read 1,000,000 lines...
  Read 1,100,000 lines...
  Read 1,200,000 lines...
  Read 1,300,000 lines...
  Read 1,400,000 lines...
  Read 1,500,000 lines...
  Read 1,600,000 lines...
  Read 1,700,000 lines...
  Read 1,800,000 lines...
  Read 1,900,000 lines...
  Read 2,000,000 lines...
  Read 2,100,000 lines...
  Read 2,200,000 lines...
  Read 2,300,000 lines...
  Read 2,400,000 lines...
  Read 2,500,000 lines...
  Read 2,600,000 lines...
  Read 2,700,000 lines...
  Read 2,800,000 lines...
  Read 2,900,000 lines...
  Read 3,000,000 lines...
  Read 3,100,000 lines...
  Read 3,200,000 lines...
  Read 3,300,000 lines...
  Read 3,400,000 lines...
  Read 3,500,000 lines...
  Read 3,600,000 lines...
  Read 3,700,000 lines...
  R

25/07/28 10:54:56 WARN TaskSetManager: Stage 5 contains a task of very large size (36174 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

Final DataFrame created with 8,955,836 records

Schema:
root
 |-- DateTime: timestamp (nullable = true)
 |-- UTCTimeStamp: integer (nullable = true)
 |-- Message_MessageID: integer (nullable = true)
 |-- Message_UserID: integer (nullable = true)
 |-- Message_Latitude: double (nullable = true)
 |-- Message_Longitude: double (nullable = true)
 |-- Message_SOG: double (nullable = true)


Sample data:


25/07/28 10:55:29 WARN TaskSetManager: Stage 8 contains a task of very large size (36174 KiB). The maximum recommended task size is 1000 KiB.
25/07/28 10:55:33 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 8 (TID 43): Attempting to kill Python Worker
                                                                                

+-------------------+------------+-----------------+--------------+----------------+-----------------+-----------+
|DateTime           |UTCTimeStamp|Message_MessageID|Message_UserID|Message_Latitude|Message_Longitude|Message_SOG|
+-------------------+------------+-----------------+--------------+----------------+-----------------+-----------+
|2020-05-05 00:00:00|1588636800  |18               |416004341     |-7.578556       |171.328116       |NULL       |
|2020-05-05 00:00:00|1588636800  |18               |503648300     |-22.647916      |152.895326       |NULL       |
|2020-05-05 00:00:00|1588636800  |18               |503671200     |-22.527651      |152.926453       |NULL       |
|2020-05-05 00:00:00|1588636800  |18               |512235000     |-40.675575      |173.762283       |NULL       |
|2020-05-05 00:00:00|1588636800  |18               |601224000     |-33.024055      |17.954858        |NULL       |
|2020-05-05 00:00:00|1588636800  |18               |664060000     |-34.78765    

25/07/28 10:55:34 WARN TaskSetManager: Stage 9 contains a task of very large size (36174 KiB). The maximum recommended task size is 1000 KiB.
25/07/28 10:55:38 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 9 (TID 44): Attempting to kill Python Worker
                                                                                

DateTime type: <class 'datetime.datetime'>
Latitude: -7.578556 (type: <class 'float'>)
Longitude: 171.328116 (type: <class 'float'>)
MessageID: 18 (type: <class 'int'>)


25/07/28 10:55:39 WARN TaskSetManager: Stage 10 contains a task of very large size (36174 KiB). The maximum recommended task size is 1000 KiB.
25/07/28 10:56:26 WARN TaskSetManager: Stage 11 contains a task of very large size (36174 KiB). The maximum recommended task size is 1000 KiB.
[Stage 11:>                                                         (0 + 8) / 8]


DataFrame cached with 8,955,836 total records


                                                                                

In [19]:
#Test 3
def get_dynamic_schema_with_types(file_path, sample_lines=1000):
    """Analyze first N lines to build typed schema dynamically"""
    
    field_samples = {}
    
    with open(file_path, 'r') as f:
        for i, line in enumerate(f):
            if i >= sample_lines:
                break
            try:
                record = json.loads(line.strip())
                message = record.get('Message', {})
                
                # Collect all field samples
                for key, value in record.items():
                    if key != 'Message':
                        field_samples.setdefault(key, []).append(value)
                
                for key, value in message.items():
                    field_name = f"Message_{key}"
                    field_samples.setdefault(field_name, []).append(value)
                    
            except:
                continue
    
    # Build schema based on samples
    schema_fields = []
    
    for field_name, samples in field_samples.items():
        non_null_samples = [s for s in samples if s is not None]
        
        if field_name == 'UTCTimeStamp':
            schema_fields.append(StructField(field_name, IntegerType(), True))
        elif 'Latitude' in field_name or 'Longitude' in field_name:
            schema_fields.append(StructField(field_name, DoubleType(), True))
        elif any(isinstance(s, (int, float)) for s in non_null_samples):
            # Check if it's integer or float
            if all(isinstance(s, int) or (isinstance(s, float) and s.is_integer()) 
                   for s in non_null_samples if isinstance(s, (int, float))):
                schema_fields.append(StructField(field_name, IntegerType(), True))
            else:
                schema_fields.append(StructField(field_name, DoubleType(), True))
        else:
            schema_fields.append(StructField(field_name, StringType(), True))
    
    return StructType(schema_fields), field_samples.keys()

# Use dynamic schema detection
schema, field_names = get_dynamic_schema_with_types("../src_data/sample.json", 1000)
print("Detected schema:")
for field in schema.fields:
    print(f"  {field.name}: {field.dataType}")

Detected schema:
  UTCTimeStamp: IntegerType()
  Message_MessageID: IntegerType()
  Message_RepeatIndicator: IntegerType()
  Message_UserID: IntegerType()
  Message_Valid: IntegerType()
  Message_Spare1: IntegerType()
  Message_Sog: DoubleType()
  Message_PositionAccuracy: IntegerType()
  Message_Longitude: DoubleType()
  Message_Latitude: DoubleType()
  Message_Cog: DoubleType()
  Message_TrueHeading: IntegerType()
  Message_Timestamp: IntegerType()
  Message_Spare2: IntegerType()
  Message_ClassBUnit: IntegerType()
  Message_ClassBDisplay: IntegerType()
  Message_ClassBDsc: IntegerType()
  Message_ClassBBand: IntegerType()
  Message_ClassBMsg22: IntegerType()
  Message_AssignedMode: IntegerType()
  Message_Raim: IntegerType()
  Message_CommunicationStateIsItdma: IntegerType()
  Message_CommunicationState: IntegerType()
  Message_Name: StringType()
  Message_Type: IntegerType()
  Message_Dimension: StringType()
  Message_FixType: IntegerType()
  Message_Dte: IntegerType()
  Message_Sp

In [18]:
#Test 2

import json

def count_valid_json_records(file_path):
    """Count valid JSON records and track errors"""
    valid_count = 0
    error_count = 0
    
    with open(file_path, 'r') as f:
        for i, line in enumerate(f):
            try:
                json.loads(line.strip())
                valid_count += 1
            except json.JSONDecodeError:
                error_count += 1
                if error_count <= 5:  # Show first 5 errors
                    print(f"Invalid JSON at line {i+1}: {line[:100]}...")
    
    print(f"Valid JSON records: {valid_count}")
    print(f"Invalid JSON lines: {error_count}")
    print(f"Total lines: {valid_count + error_count}")
    
    return valid_count, error_count

# Count valid records
valid, invalid = count_valid_json_records("../src_data/sample.json")

Valid JSON records: 8955836
Invalid JSON lines: 0
Total lines: 8955836


In [17]:

#TEST 1

import json
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

def read_and_flatten_json_lines(file_path, num_lines=100):
    """Read first N lines and flatten the nested structure"""
    flattened_records = []
    
    with open(file_path, 'r') as f:
        for i, line in enumerate(f):
            if i >= num_lines:
                break
            try:
                # Parse the JSON line
                record = json.loads(line.strip())
                
                # Flatten the structure
                flat_record = {
                    'UTCTimeStamp': record.get('UTCTimeStamp'),
                    'MessageID': record.get('Message', {}).get('MessageID'),
                    'UserID': record.get('Message', {}).get('UserID'),
                    'Latitude': record.get('Message', {}).get('Latitude'),
                    'Longitude': record.get('Message', {}).get('Longitude'),
                    'SOG': record.get('Message', {}).get('SOG')
                }
                
                flattened_records.append(flat_record)
                
            except json.JSONDecodeError as e:
                print(f"Skipping invalid JSON at line {i+1}: {e}")
            except Exception as e:
                print(f"Error processing line {i+1}: {e}")
    
    return flattened_records

# Create Spark session
spark = SparkSession.builder \
    .appName("FlattenedAIS") \
    .getOrCreate()

# Read and flatten first 100 lines
flattened_data = read_and_flatten_json_lines("../src_data/sample.json", 100)

print(f"Successfully processed {len(flattened_data)} records")

# Define schema for the flattened data
schema = StructType([
    StructField("UTCTimeStamp", StringType(), True),
    StructField("MessageID", StringType(), True),
    StructField("UserID", StringType(), True),
    StructField("Latitude", StringType(), True),
    StructField("Longitude", StringType(), True),
    StructField("SOG", StringType(), True)
])

# Create DataFrame from the flattened data
df = spark.createDataFrame(flattened_data, schema)

# Show the results
df.show(10, truncate=False)
df.printSchema()
print(f"Total records in DataFrame: {df.count()}")

25/07/28 10:00:34 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


Successfully processed 100 records


                                                                                

+------------+---------+---------+-------------------+-------------------+----+
|UTCTimeStamp|MessageID|UserID   |Latitude           |Longitude          |SOG |
+------------+---------+---------+-------------------+-------------------+----+
|1588636800  |18       |416004341|-7.578556666666667 |171.32811666666666 |NULL|
|1588636800  |18       |503648300|-22.647916666666667|152.89532666666668 |NULL|
|1588636800  |18       |503671200|-22.527651666666667|152.92645333333334 |NULL|
|1588636800  |18       |512235000|-40.675575         |173.76228333333333 |NULL|
|1588636800  |18       |601224000|-33.024055         |17.954858333333334 |NULL|
|1588636800  |18       |664060000|-34.78765          |70.76879000000001  |NULL|
|1588636800  |19       |398701904|-31.747185         |164.2803133333333  |NULL|
|1588636800  |19       |58015872 |-7.881150000000001 |-167.76720166666667|NULL|
|1588636800  |19       |98511881 |-5.960218333333333 |75.88409666666666  |NULL|
|1588636800  |1        |209761000|-23.20

In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

# Create Spark session
spark = SparkSession.builder \
    .appName("ReadJSONSample") \
    .getOrCreate()

# Method 1: Read all data then limit (simple but reads entire file)
df = spark.read \
    .option("inferSchema", "false") \
    .option("multiLine", "true") \
    .option("samplingRatio", 0.1) \
    .json(fpath_tenancy_data) \
    .limit(10)

df.printSchema()
df.show()

root
 |-- active: string (nullable = true)
 |-- age: string (nullable = true)
 |-- department: string (nullable = true)
 |-- email: string (nullable = true)
 |-- hire_date: string (nullable = true)
 |-- id: string (nullable = true)
 |-- location: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- country: string (nullable = true)
 |    |-- state: string (nullable = true)
 |-- name: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- skills: array (nullable = true)
 |    |-- element: string (containsNull = true)



                                                                                

+------+---+-----------+--------------------+----------+---+--------------------+-------------+------+--------------------+
|active|age| department|               email| hire_date| id|            location|         name|salary|              skills|
+------+---+-----------+--------------------+----------+---+--------------------+-------------+------+--------------------+
|  true| 28|Engineering|john.smith@email.com|2022-03-15|001| {New York, USA, NY}|   John Smith| 75000|[Python, SQL, Doc...|
|  true| 32|  Marketing|sarah.johnson@ema...|2021-07-22|002|{San Francisco, U...|Sarah Johnson| 68000|[Analytics, SEO, ...|
| false| 26|     Design| mike.chen@email.com|2023-01-10|003|   {Austin, USA, TX}|    Mike Chen| 62000|[UI/UX, Figma, Pr...|
|  true| 29|Engineering|emma.wilson@email...|2020-11-05|004|  {Seattle, USA, WA}|  Emma Wilson| 82000|[Java, Kubernetes...|
|  true| 35|      Sales|david.brown@email...|2019-09-18|005|  {Chicago, USA, IL}|  David Brown| 71000|[CRM, Negotiation...|
|  true|

In [14]:
spark.stop()

# 02 - Setup Spark to ingest and ETL non-geospatial data
Includes:
- creating a spark session
- accessing tenancy data from original file with ...
- a pre-formed schema (non-parsing) to EXTRACT data (as is)
- basic TRANSFORMATION/PREPARATION of non-geospatial datasets
- examination of data quality leveraging YData Quality Framework tool (and saving to \"data quality profile\" folder in interactive html format."

In [3]:
# Create a SparkSession
spark = SparkSession.builder \
    .appName("ReadCSVLocal") \
        .getOrCreate()

etl_processor = JoelsETL(spark)
# Define the schema explicitly
# All columns are declared as StringType to load all and minimise parsing errors
# due to mixed types or special values like "-1" or "NA" within quoted fields.
# Casting occurs later
"""
df = spark.read \
    .option("header", "true")  # Set to "true" if your CSV has a header row
    .option("inferSchema", "false") # Crucial: This makes all columns StringType
    .csv(csv_file_path)
"""

schema = StructType([
    StructField("Time Frame", StringType(), True),
    StructField("Location Id", StringType(), True),
    StructField("Location", StringType(), True),
    StructField("Lodged Bonds", StringType(), True),
    StructField("Active Bonds", StringType(), True),
    StructField("Closed Bonds", StringType(), True),
    StructField("Median Rent", StringType(), True),
    StructField("Geometric Mean Rent", StringType(), True),
    StructField("Upper Quartile Rent", StringType(), True),
    StructField("Lower Quartile Rent", StringType(), True),
    StructField("Log Std Dev Weekly Rent", StringType(), True)
])

try:
    # EXTRACTION PHASE - Check if the file exists
    if not os.path.exists(fpath_tenancy_data):
        print(f"Error: File not found at {fpath_tenancy_data}")
    else:
        print(f"Attempting to read data from local file: {fpath_tenancy_data}")
        # EXTRACT file into a Spark DataFrame using the defined schema
        df_spark = spark.read.csv(
            f"file://{fpath_tenancy_data}",
            header=True,
            schema=schema,  # Use the explicitly defined schema
            quote='"',      # Specify that double quotes are used for quoting
            escape='"'      # Specify that double quotes are used for escaping
        )

# Review the EXTRACTED DATA
        print("DataFrame head:")
        df_spark.show()
        print("DataFrame schema:")
        df_spark.printSchema()

        # TRANSFORMATIONS
        df_spark_tfm = etl_processor.fn_transform_cast(df_spark, ["Time Frame"], "date")
        df_spark_tfm = etl_processor.fn_transform_cast(df_spark_tfm,
         ["Location Id", "Lodged Bonds", "Active Bonds", "Closed Bonds",
          "Median Rent", "Upper Quartile Rent", "Lower Quartile Rent",
          "Geometric Mean Rent"], "integer")
        df_spark_tfm = etl_processor.fn_transform_cast(df_spark_tfm, ["Log Std Dev Weekly Rent"], "double")

        # Select both transformed and orginal fields to perform data quality reports on
        selected_fields = ["Time Frame", "tfm_Time Frame",
                         "Location Id", "tfm_Location Id",
                         "Location",
                         "Lodged Bonds", "tfm_Lodged Bonds",
                         "Active Bonds", "tfm_Active Bonds",
                         "Closed Bonds", "tfm_Closed Bonds",
                         "Median Rent", "tfm_Median Rent",
                         "Geometric Mean Rent", "tfm_Geometric Mean Rent",
                         "Upper Quartile Rent", "tfm_Upper Quartile Rent",
                         "Lower Quartile Rent", "tfm_Lower Quartile Rent",
                         "Log Std Dev Weekly Rent", "tfm_Log Std Dev Weekly Rent"]
        df_spark_tfm = etl_processor.fn_dataframe_selections(df_spark_tfm, selected_fields)

        # - DATA QUALITY CHECK REPORT
        # - Convert 'Time Frame' column to datetime objects in pandas
        df_pandas = df_spark_tfm.toPandas()
        df_pandas['tfm_Time Frame'] = pd.to_datetime(df_pandas['tfm_Time Frame'])
        ##report = ProfileReport(df_pandas, title="Profiling pyspark DataFrame")
        # - Get the current date and time then save the Data Quality Report to github to eyeball (review manually)
        ##now = datetime.datetime.now()
        ##report.to_file(os.path.join(fpath_data_quality_profile, now.strftime("DataProfile_%Y%m%d_%H%M.html")))

except Exception as e:
    print(f"An error occurred during Spark processing: {e}")

/usr/local/lib/python3.11/site-packages/pyspark/bin/load-spark-env.sh: line 68: ps: command not found
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/07/27 16:34:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Attempting to read data from local file: /app/src_data/
DataFrame head:


                                                                                

+----------+-----------+--------------------+------------+------------+------------+-----------+-------------------+-------------------+-------------------+-----------------------+
|Time Frame|Location Id|            Location|Lodged Bonds|Active Bonds|Closed Bonds|Median Rent|Geometric Mean Rent|Upper Quartile Rent|Lower Quartile Rent|Log Std Dev Weekly Rent|
+----------+-----------+--------------------+------------+------------+------------+-----------+-------------------+-------------------+-------------------+-----------------------+
|1993-02-01|        -99|                 ALL|       9,144|       91635|       7,119|        150|                151|                200|                120|                   0.43|
|1993-02-01|         -1|                  NA|         390|        4932|         336|        130|                128|                165|                100|                   0.46|
|1993-02-01|          1|    Northland Region|         177|        1770|         141|        120

                                                                                

# 03 - Intermediate ETL of non-geospatial data structures
- Dim tables for DATETIME (dim_period including new date/time attributes) and location (dim_location) created
- Fact table (fact_tenancy) is created"

In [4]:
df_facttenancyorder = [
    "tfm_Time Frame",
    "tfm_Location Id",
    "tfm_Lodged Bonds",
    "tfm_Active Bonds",
    "tfm_Closed Bonds",
    "tfm_Median Rent",
    "tfm_Geometric Mean Rent",
    "tfm_Upper Quartile Rent",
    "tfm_Lower Quartile Rent",
    "tfm_Log Std Dev Weekly Rent"
    ]
df_fact_tenancy = etl_processor.fn_dataframe_selections(df_spark_tfm, df_facttenancyorder)

# Create df_dim_location
location_cols = ["tfm_Location Id", "Location"]
df_dim_location = etl_processor.fn_create_dim_table(df_spark_tfm, location_cols)

# Show the location dimension
print("df_dim_location:")
df_dim_location.show()
df_dim_location.printSchema()

# Create df_dim_period (basic) - original core of distinct time periods
period_cols = ["tfm_Time Frame"]
df_dim_period = etl_processor.fn_create_dim_table(df_spark_tfm, period_cols)
df_dim_period = etl_processor.fn_add_period_attributes(df_dim_period, "tfm_Time Frame")

# Show the period dimension with added attributes
print("\ndf_dim_period:")
df_dim_period.show()
df_dim_period.printSchema()


df_dim_location:


                                                                                

+---------------+--------------------+
|tfm_Location Id|            Location|
+---------------+--------------------+
|            -99|                 ALL|
|             -1|                  NA|
|              1|    Northland Region|
|              2|     Auckland Region|
|              3|      Waikato Region|
|              4|Bay of Plenty Region|
|              5|     Gisborne Region|
|              6|  Hawke's Bay Region|
|              7|     Taranaki Region|
|              8|Manawatu-Wanganui...|
|              9|   Wellington Region|
|             12|   West Coast Region|
|             13|   Canterbury Region|
|             14|        Otago Region|
|             15|    Southland Region|
|             16|       Tasman Region|
|             17|       Nelson Region|
|             18|  Marlborough Region|
+---------------+--------------------+

root
 |-- tfm_Location Id: integer (nullable = true)
 |-- Location: string (nullable = true)


df_dim_period:


                                                                                

+--------------+----+--------------+-----------------+
|tfm_Time Frame|Year|Annual Quarter|Financial Quarter|
+--------------+----+--------------+-----------------+
|    1993-02-01|1993|        1993Q1|           1992Q3|
|    1993-03-01|1993|        1993Q1|           1992Q3|
|    1993-04-01|1993|        1993Q2|           1992Q4|
|    1993-05-01|1993|        1993Q2|           1992Q4|
|    1993-06-01|1993|        1993Q2|           1992Q4|
|    1993-07-01|1993|        1993Q3|           1993Q1|
|    1993-08-01|1993|        1993Q3|           1993Q1|
|    1993-09-01|1993|        1993Q3|           1993Q1|
|    1993-10-01|1993|        1993Q4|           1993Q2|
|    1993-11-01|1993|        1993Q4|           1993Q2|
|    1993-12-01|1993|        1993Q4|           1993Q2|
|    1994-01-01|1994|        1994Q1|           1993Q3|
|    1994-02-01|1994|        1994Q1|           1993Q3|
|    1994-03-01|1994|        1994Q1|           1993Q3|
|    1994-04-01|1994|        1994Q2|           1993Q4|
|    1994-

# 04 - ETL of geospatial data structures
- Quality checking and creation of geospatial data structure (df_dim_geospatial)
- Joining of geospatial and location based data table (df_dim_locationV2)
- Parquet files generated for all star schema tables (reasoning below):
    - dim_location, dim_geospatial, dim_period, and fact_tenancy
    - useful for time travel
    - Supports both current and future/potential architectures (including icebergs, delta tables, AI development, and support for improved performance in application of analysis and data science methodologies)
    - high compression and early initial transformations provide for better resource usage (i.e. processing, storage, memory)"

In [None]:
import fiona
from shapely.geometry import shape

fpath_data_geospatialdata = '/content/CommerceCommission202505/src_data/statsnz-regional-council-2023-clipped-generalised-SHP'
data = []

try:
    with fiona.open(fpath_data_geospatialdata, 'r') as source:
        for feature in source:
            attributes = dict(feature['properties'])
            geometry = shape(feature['geometry'])

            # Option 1: Get centroid (for point-based maps in Power BI)
            if geometry.geom_type in ['Point', 'Polygon', 'LineString', 'MultiPolygon', 'MultiLineString', 'MultiPoint']:
                 # Check if geometry is valid before calculating centroid
                 if geometry.is_valid:
                    centroid = geometry.centroid
                    attributes['centroid_longitude'] = centroid.x
                    attributes['centroid_latitude'] = centroid.y
                 else:
                    print(f"Warning: Invalid geometry found for feature {feature.get('id', 'unknown')}. Cannot calculate centroid.")
                    attributes['centroid_longitude'] = None
                    attributes['centroid_latitude'] = None
            else:
                 attributes['centroid_longitude'] = None
                 attributes['centroid_latitude'] = None

            # Option 2: Convert to WKT (for use with custom Power BI visuals)
            if geometry.is_valid:
                attributes['geometry_wkt'] = geometry.wkt
            else:
                attributes['geometry_wkt'] = None # Assign None if geometry is invalid

            data.append(attributes)

    # Create Spark DataFrame and manipulate data
    df_dim_geospatial = spark.createDataFrame(data)
    df_dim_geospatial = fn_transform_cast(df_dim_geospatial, ["REGC2023_V"], "integer")
    geospatial_cols = ["tfm_REGC2023_V", "centroid_longitude",  "centroid_latitude"]
    df_dim_geospatial = fn_dataframe_selections(df_dim_geospatial, geospatial_cols)

    # Join Geospatial data to Location data
    df_dim_locationV2 = df_dim_location.join(df_dim_geospatial, col("tfm_Location Id") == col("tfm_REGC2023_V"), "left")
    location_cols = ["tfm_Location Id", "Location", "centroid_longitude",  "centroid_latitude"]
    df_dim_locationV2 = fn_dataframe_selections(df_dim_locationV2, location_cols).orderBy("tfm_Location Id")

    # Review datasets
    df_dim_locationV2.show()
    df_dim_locationV2.printSchema()

    # Save to parquet files
    df_fact_tenancy.write.parquet(os.path.join(fpath_data_star_schema, "fact_tenancy.parquet"), mode="overwrite")
    df_dim_locationV2.write.parquet(os.path.join(fpath_data_star_schema,"dim_location.parquet"), mode="overwrite")
    df_dim_period.write.parquet(os.path.join(fpath_data_star_schema, "dim_period.parquet"), mode="overwrite")
    df_dim_geospatial.write.parquet(os.path.join(fpath_data_star_schema, "dim_geospatial.parquet"), mode="overwrite")

except fiona.errors.DriverError as e:
    print(f"Error opening shapefile: {e}")
except Exception as e:
    print(f"An error occurred: {e}")

spark.stop()