Import the Necessary Libraries, Define Environment Variables, and Configure Logging

In [51]:
# Import Necessary Libraries
import os
import json
import re
import logging
import pandas as pd
from google.cloud import firestore, bigquery, storage
from google.api_core.exceptions import GoogleAPICallError
from google.cloud import bigquery
from firebase_admin import credentials, firestore
from dotenv import load_dotenv
from google.auth.exceptions import RefreshError
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, concat_ws, sha2, posexplode, substring
from google.oauth2 import service_account

# Load variables from a local .env file (if one exists) into the process environment
load_dotenv()

# Configure logging before anything else so the configuration check below is
# reported in the same format as the rest of the notebook.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Connection settings. os.getenv returns None for a variable that is not set.
project_id = os.getenv("GOOGLE_FIRESTORE_PROJECT_ID")
database_name = os.getenv("GOOGLE_FIRESTORE_DATABASE_NAME")
collection_name = os.getenv("GOOGLE_FIRESTORE_COLLECTION_NAME")
bigquery_project_id = os.getenv("GOOGLE_BIGQUERY_PROJECT_ID")
bigquery_dataset_id = os.getenv("GOOGLE_BIGQUERY_DATASET_NAME")
gcp_credentials_path = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")

# Report unset settings here instead of letting a None value travel into the
# BigQuery query text or the Firestore calls further down.
# GOOGLE_APPLICATION_CREDENTIALS is not checked because the following cell
# recomputes gcp_credentials_path from the project directory.
_settings_by_env_name = {
    "GOOGLE_FIRESTORE_PROJECT_ID": project_id,
    "GOOGLE_FIRESTORE_DATABASE_NAME": database_name,
    "GOOGLE_FIRESTORE_COLLECTION_NAME": collection_name,
    "GOOGLE_BIGQUERY_PROJECT_ID": bigquery_project_id,
    "GOOGLE_BIGQUERY_DATASET_NAME": bigquery_dataset_id,
}
_unset_env_names = [name for name, value in _settings_by_env_name.items() if not value]
if _unset_env_names:
    logging.warning("Environment variables not set: %s", ", ".join(_unset_env_names))

Ensure the Credentials file exists

In [52]:
# Resolve the project paths relative to the notebook's working directory and
# verify that the service-account credentials file exists and parses as JSON.

# Current working directory
print(f"Current working directory: {os.getcwd()}")

# Parent of the working directory (the directory that holds config/ and database/)
parent_dir = os.path.abspath(os.path.join(os.getcwd(), os.pardir))
print(f"Parent directory: {parent_dir}")

# Path to the credentials file
gcp_credentials_path = os.path.join(parent_dir, "config/gcp_credentials.json")
print(f"Credentials path: {gcp_credentials_path}")

# Path to the query file
query_file_path = os.path.join(parent_dir, "database/create_tables.sql")
print(f"Query file path: {query_file_path}")

# Failures raise instead of calling exit(1): inside a Jupyter kernel `exit` is
# IPython's exit helper rather than sys.exit, so it does not stop the cell with
# a traceback. Raising makes this cell (and "Run All") fail visibly.
if not os.path.isfile(gcp_credentials_path):
    logging.error(f"Failed to initialize Firestore and BigQuery: File {gcp_credentials_path} was not found.")
    raise FileNotFoundError(f"Credentials file not found at {gcp_credentials_path}")

# Check that the file contains valid JSON. The parsed content is deliberately
# not kept in a variable: it is only needed for validation here.
try:
    with open(gcp_credentials_path, 'r', encoding='utf-8') as f:
        json.load(f)
except json.JSONDecodeError as e:
    logging.error("Credentials file is not valid JSON.")
    raise ValueError(f"Credentials file at {gcp_credentials_path} is not valid JSON.") from e

logging.info("Credentials file contains valid JSON.")

2025-03-02 09:09:09,857 - INFO - Credentials file contains valid JSON.


Current working directory: /workspaces/shipment-tracking-project/notebooks
Parent directory: /workspaces/shipment-tracking-project
Credentials path: /workspaces/shipment-tracking-project/config/gcp_credentials.json
Query file path: /workspaces/shipment-tracking-project/database/create_tables.sql


Initialize BigQuery Client

In [53]:
# Initialize the BigQuery client from the service-account file.
# Errors are logged and then re-raised (instead of exit(1)) so that the cell
# stops with a traceback; in a Jupyter kernel `exit` is IPython's exit helper,
# not sys.exit, and does not abort the cell.
try:
    client = bigquery.Client.from_service_account_json(gcp_credentials_path)
    logging.info("BigQuery client initialized successfully!")
except RefreshError as e:
    logging.error(f"Failed to initialize BigQuery due to authentication error: {e}")
    raise
except Exception as e:
    logging.error(f"Failed to initialize BigQuery: {e}")
    raise

# List the tables of the configured dataset through INFORMATION_SCHEMA.
# The project and dataset identifiers come from the notebook's environment
# configuration, not from user input.
try:
    query = f"SELECT table_name FROM `{bigquery_project_id}.{bigquery_dataset_id}.INFORMATION_SCHEMA.TABLES`"
    tables = client.query(query).to_dataframe()
    table_names = tables["table_name"].tolist()
    logging.info(f"Tables in BigQuery dataset: {table_names}")
except GoogleAPICallError as e:
    logging.error(f"BigQuery API error: {e}")
    raise

2025-03-02 09:09:09,905 - INFO - BigQuery client initialized successfully!


2025-03-02 09:09:11,919 - INFO - Tables in BigQuery dataset: ['DocumentReferences', 'EquipmentEvent', 'Seals', 'TransportCall', 'EventLocation']


Initialize Firestore client

In [54]:
# Initialize the Firestore client with explicit service-account credentials.
# Errors are logged and then re-raised (instead of exit(1)) so that the cell
# stops with a traceback; in a Jupyter kernel `exit` is IPython's exit helper,
# not sys.exit, and does not abort the cell.
try:
    # NOTE: this rebinds the name `credentials`, which the import cell also
    # imports from firebase_admin; from here on it refers to the credentials object.
    credentials = service_account.Credentials.from_service_account_file(gcp_credentials_path)
    db = firestore.Client(project=project_id, database=database_name, credentials=credentials)
    logging.info("Firestore client initialized successfully!")
except RefreshError as e:
    logging.error(f"Failed to initialize Firestore due to authentication error: {e}")
    raise
except Exception as e:
    logging.error(f"Failed to initialize Firestore: {e}")
    raise

2025-03-02 09:09:11,966 - INFO - Firestore client initialized successfully!


# Snowflake Schema Structure Generation and Testing with Fetched Data from Firestore

In [55]:
# Maximum number of documents pulled from Firestore for schema prototyping
SAMPLE_DOCUMENT_LIMIT = 20

# Fetch at most SAMPLE_DOCUMENT_LIMIT documents from the collection.
# limit() only caps the result size; nothing here randomizes the selection,
# despite the variable name (kept because later cells may refer to it).
random_documents = db.collection(collection_name).limit(SAMPLE_DOCUMENT_LIMIT).get()

# Convert each document snapshot into a plain Python dict
json_data = [doc.to_dict() for doc in random_documents]

In [56]:
# Obtain the Spark session for this notebook (an existing session is reused if there is one)
spark = (
    SparkSession.builder
    .appName("NestedJSONProcessing")
    .getOrCreate()
)

In [57]:
# Create a DataFrame from the fetched documents.
# Each document is serialized to a real JSON string first. Passing the Python
# list of dicts directly makes PySpark fall back to str(), i.e. the Python repr
# (single quotes, None/True/False), which only parses as JSON by accident.
# default=str turns values json cannot encode natively into their string form
# (presumably Firestore timestamps/references, if any are present — verify).
json_records = [json.dumps(document, default=str) for document in json_data]
df = spark.read.json(spark.sparkContext.parallelize(json_records), multiLine=True)
df.printSchema()

root
 |-- documentReferences: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- documentReferenceType: string (nullable = true)
 |    |    |-- documentReferenceValue: string (nullable = true)
 |-- equipmentEventTypeCode: string (nullable = true)
 |-- equipmentReference: string (nullable = true)
 |-- eventClassifierCode: string (nullable = true)
 |-- eventCreatedDateTime: string (nullable = true)
 |-- eventDateTime: string (nullable = true)
 |-- eventID: string (nullable = true)
 |-- eventLocation: struct (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- country: string (nullable = true)
 |    |    |-- floor: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- postCode: string (nullable = true)
 |    |    |-- stateRegion: string (nullable = true)
 |    |    |-- street: string (nullable = true)
 |    |    |-- streetNumber: string (nullable = true

In [58]:
# Top-level scalar attributes that form the core event record, in output order
event_columns = [
    "eventID",
    "eventCreatedDateTime",
    "eventClassifierCode",
    "equipmentEventTypeCode",
    "equipmentReference",
    "eventDateTime",
    "eventType",
]

# Keep only those attributes; nested structures are handled in the cells below
df_events = df.select(*event_columns)

df_events.show(truncate=False)

+------------------------------------+--------------------------+-------------------+----------------------+------------------+--------------------------+---------+
|eventID                             |eventCreatedDateTime      |eventClassifierCode|equipmentEventTypeCode|equipmentReference|eventDateTime             |eventType|
+------------------------------------+--------------------------+-------------------+----------------------+------------------+--------------------------+---------+
|00006ba7-4179-4b4d-a8b9-7020e4227ff4|2024-06-20T08:12:59.656226|ACT                |UNLOAD                |APZU2386924       |2024-06-20T08:12:59.656226|EQUIPMENT|
|00052089-881b-4774-9131-cef97a6e9e7f|2024-05-13T08:12:59.276063|EST                |UNLOAD                |APZU3785346       |2024-05-13T08:12:59.276063|EQUIPMENT|
|0005ee8f-8e98-4015-bd9c-5165a17c66e2|2024-04-18T08:13:00.066059|EST                |ARRIVAL               |APZU7792130       |2024-04-18T08:13:00.066059|EQUIPMENT|
|0007d2dd-

In [59]:
# Build the document-reference table: one row per element of the
# documentReferences array (with its position), keyed by a 16-character prefix
# of the SHA-256 hash over "<type>_<value>".
df_document_references = (
    df.select(
        "eventID",
        posexplode("documentReferences").alias("documentIndex", "document_reference"),
    )
    .select(
        "eventID",
        "documentIndex",
        col("document_reference.documentReferenceType").alias("documentReferenceType"),
        col("document_reference.documentReferenceValue").alias("documentReferenceValue"),
    )
    .withColumn(
        "documentReferenceID",
        substring(
            sha2(concat_ws("_", col("documentReferenceType"), col("documentReferenceValue")), 256),
            1,
            16,  # keep only the first 16 hex characters of the hash
        ),
    )
)

# Attach the generated key to the events.
# NOTE(review): an event with several document references yields one df_events
# row per reference after this join — confirm that is the intended grain.
event_to_document_reference = df_document_references.select("eventID", "documentReferenceID")
df_events = df_events.join(event_to_document_reference, on="eventID", how="left")

# The reference table itself no longer needs eventID; keep one row per key
df_document_references = (
    df_document_references
    .drop("eventID")
    .dropDuplicates(["documentReferenceID"])
)

# Put documentReferenceID first in df_document_references
document_reference_columns = ["documentReferenceID"] + [
    name for name in df_document_references.columns if name != "documentReferenceID"
]
df_document_references = df_document_references.select(document_reference_columns)

# Put eventID first in df_events
event_columns_ordered = ["eventID"] + [
    name for name in df_events.columns if name != "eventID"
]
df_events = df_events.select(event_columns_ordered)

# Show results
df_document_references.show(truncate=False)

+-------------------+-------------+---------------------+------------------------------------+
|documentReferenceID|documentIndex|documentReferenceType|documentReferenceValue              |
+-------------------+-------------+---------------------+------------------------------------+
|0acc32d764707cf6   |0            |BL                   |51770229-8fb7-4874-b277-05916fbd81d4|
|15b270dff3f2dd74   |0            |BL                   |902ae94a-2c8d-481d-a64c-e3e0d0e40552|
|1fc10abdc9ce7953   |0            |BL                   |b510ce35-82e2-418c-be24-23537f216701|
|280bc9e6e5f967fc   |0            |BL                   |55b4e35a-2f9f-4e24-adba-75d6316ebe6c|
|329e6e52e69e4ab5   |0            |BL                   |8a106b97-5dd4-45ad-bb61-a85015e3f230|
|3c0308545bb818fa   |0            |BL                   |0aed64c3-f490-4193-a65b-922a9cf3c95b|
|3ccd9dded5297f21   |0            |BL                   |c484abf3-1e98-4956-b91e-7706ff838e03|
|43354dd846307153   |0            |BL             

In [60]:
# Derive the location, address, facility, transport-call and vessel tables from
# `df`, give each a 16-character hashed key where the source provides none, and
# link them to each other and to df_events through those keys.


def _truncated_sha256(*field_names):
    """Return a column expression: first 16 hex characters of SHA-256 over the "_"-joined fields."""
    return substring(sha2(concat_ws("_", *map(col, field_names)), 256), 1, 16)


def _nested_fields(prefix, field_names):
    """Return `prefix.<field>` columns, each aliased to its bare field name."""
    return [col(f"{prefix}.{field}").alias(field) for field in field_names]


def _attach_key(target, key_source, key_name):
    """Left-join key_source's `key_name` column onto target, matching rows on eventID."""
    return target.join(key_source.select("eventID", key_name), "eventID", "left")


def _to_keyed_table(frame, key_name):
    """Drop eventID, keep one row per `key_name` value and move `key_name` to the front."""
    deduplicated = frame.drop("eventID").dropDuplicates([key_name])
    remaining = [name for name in deduplicated.columns if name != key_name]
    return deduplicated.select(key_name, *remaining)


# --- Per-event extracts ------------------------------------------------------

# eventLocation scalar fields
df_event_location = df.select(
    "eventID",
    *_nested_fields("eventLocation", ["latitude", "longitude", "unlocationCode"]),
)

# transportCall scalar fields (transportCallID already exists in the source data)
df_event_transport_call = df.select(
    "eventID",
    *_nested_fields(
        "transportCall",
        [
            "transportCallID",
            "carrierServiceCode",
            "exportVoyageNumber",
            "importVoyageNumber",
            "modeOfTransport",
        ],
    ),
)

# eventLocation.address fields; addressID is hashed over all of them, in this order
address_fields = [
    "stateRegion",
    "streetNumber",
    "street",
    "country",
    "floor",
    "city",
    "postCode",
    "name",
]
df_event_address = df.select(
    "eventID",
    *_nested_fields("eventLocation.address", address_fields),
).withColumn("addressID", _truncated_sha256(*address_fields))

# Facility fields: two come from eventLocation, facilityTypeCode from transportCall
facility_fields = ["facilityCodeListProvider", "facilityCode", "facilityTypeCode"]
df_event_facility = df.select(
    "eventID",
    *_nested_fields("eventLocation", ["facilityCodeListProvider", "facilityCode"]),
    *_nested_fields("transportCall", ["facilityTypeCode"]),
).withColumn("facilityID", _truncated_sha256(*facility_fields))

# transportCall.vessel fields; vesselID is hashed over all of them, in this order
vessel_fields = [
    "vesselCallSignNumber",
    "vesselFlag",
    "vesselIMONumber",
    "vesselName",
    "vesselOperatorCarrierCode",
    "vesselOperatorCarrierCodeListProvider",
]
df_vessel = df.select(
    "eventID",
    *_nested_fields("transportCall.vessel", vessel_fields),
).withColumn("vesselID", _truncated_sha256(*vessel_fields))

# --- Link the tables and reduce each one to unique keys -----------------------
# Each key is attached to its referencing table while eventID is still
# available; only afterwards is the referenced table deduplicated.

# Vessel -> transport call
df_event_transport_call = _attach_key(df_event_transport_call, df_vessel, "vesselID")
df_vessel = _to_keyed_table(df_vessel, "vesselID")

# Facility -> transport call and location
df_event_transport_call = _attach_key(df_event_transport_call, df_event_facility, "facilityID")
df_event_location = _attach_key(df_event_location, df_event_facility, "facilityID")
df_event_facility = _to_keyed_table(df_event_facility, "facilityID")

# Address -> location
df_event_location = _attach_key(df_event_location, df_event_address, "addressID")
df_event_address = _to_keyed_table(df_event_address, "addressID")

# Transport call -> events
df_events = _attach_key(df_events, df_event_transport_call, "transportCallID")
df_event_transport_call = _to_keyed_table(df_event_transport_call, "transportCallID")

# locationID is hashed over the coordinates, unlocationCode and the two attached keys
df_event_location = df_event_location.withColumn(
    "locationID",
    _truncated_sha256("latitude", "longitude", "unlocationCode", "facilityID", "addressID"),
)

# Location -> events
df_events = _attach_key(df_events, df_event_location, "locationID")
df_event_location = _to_keyed_table(df_event_location, "locationID")

# Show results
for table in (
    df_event_location,
    df_event_facility,
    df_event_address,
    df_event_transport_call,
    df_vessel,
):
    table.show(truncate=False)

+----------------+----------+-----------+--------------+----------------+----------------+
|locationID      |latitude  |longitude  |unlocationCode|facilityID      |addressID       |
+----------------+----------+-----------+--------------+----------------+----------------+
|20295a786296cb50|-71.662949|-163.743302|XYZ123        |ebe17dd288980f4f|fb0841e219ac3c59|
|294aff39498a61ec|46.461771 |24.302402  |XYZ123        |c8b105d32b7aee8e|bb446de5477bb6a7|
|382bb63ebf34a320|14.587769 |142.955459 |XYZ123        |c8b105d32b7aee8e|ede446daad851e9e|
|3dc0406f66c9b097|75.61647  |134.232339 |XYZ123        |c8b105d32b7aee8e|3459f3efd37cd1d8|
|5121df9891c01c6f|-55.313004|104.079924 |XYZ123        |ebe17dd288980f4f|eb2da1adc1b6f947|
|5b5ea0ebd3fa9f20|-23.124252|78.769813  |XYZ123        |c8b105d32b7aee8e|05821b5424c8b7d4|
|7e2cbf0de8bfcbe2|-20.181494|155.9459   |XYZ123        |41d8f6d811d2cce1|8205ef90ab27a362|
|830c09ec3016f92c|78.033694 |114.864879 |XYZ123        |ebe17dd288980f4f|ccfe1ef0a6c99c61|

In [61]:
# Build the seals table: one row per element of the `seals` array (with its
# position), keyed by a 16-character prefix of the SHA-256 hash over
# "<sealNumber>_<sealSource>_<sealType>".
seal_entries = df.select(
    "eventID",
    posexplode("seals").alias("sealIndex", "seal"),
)

# Pull the individual seal attributes out of the exploded struct
df_seals = seal_entries.select(
    "eventID",
    "sealIndex",
    *[col(f"seal.{field}").alias(field) for field in ("sealNumber", "sealSource", "sealType")],
)

# Generate sealID from the seal attributes
seal_digest = sha2(concat_ws("_", col("sealNumber"), col("sealSource"), col("sealType")), 256)
df_seals = df_seals.withColumn(
    "sealID",
    substring(seal_digest, 1, 16),  # keep only the first 16 hex characters of the hash
)

# Attach sealID to the events.
# NOTE(review): an event with several seals yields one df_events row per seal
# after this join — confirm that is the intended grain.
df_events = df_events.join(df_seals.select("eventID", "sealID"), on="eventID", how="left")

# The seals table itself no longer needs eventID; keep one row per sealID
df_seals = df_seals.drop("eventID").dropDuplicates(["sealID"])

# Put sealID first in df_seals
df_seals = df_seals.select(["sealID"] + [name for name in df_seals.columns if name != "sealID"])

# Show results
df_events.show(truncate=False)
df_seals.show(truncate=False)

                                                                                

+------------------------------------+--------------------------+-------------------+----------------------+------------------+--------------------------+---------+-------------------+------------------------------------+----------------+----------------+
|eventID                             |eventCreatedDateTime      |eventClassifierCode|equipmentEventTypeCode|equipmentReference|eventDateTime             |eventType|documentReferenceID|transportCallID                     |locationID      |sealID          |
+------------------------------------+--------------------------+-------------------+----------------------+------------------+--------------------------+---------+-------------------+------------------------------------+----------------+----------------+
|00006ba7-4179-4b4d-a8b9-7020e4227ff4|2024-06-20T08:12:59.656226|ACT                |UNLOAD                |APZU2386924       |2024-06-20T08:12:59.656226|EQUIPMENT|1fc10abdc9ce7953   |0c582809-f5b3-4641-bf50-c5f724ed11f0|8645fda6667