In [None]:
# Install the petrinex client into this notebook's Python environment.
# `%pip` (rather than `!pip`) installs into the environment of the running kernel.
#
# Option 1: Install from git repository
# NOTE(review): "yourusername" looks like a placeholder — replace it with the real
# repository owner. Consider pinning a tag or commit (append `@<ref>` to the URL)
# so that re-running the notebook installs the same code.
%pip install git+https://github.com/yourusername/petrinex-python-api.git --quiet

# Option 2: Install from local wheel (if uploaded to DBFS)
# %pip install /dbfs/FileStore/wheels/petrinex-0.1.0-py3-none-any.whl --quiet

# Restart Python kernel to use the newly installed package.
# Keep this cell first: variables defined before the restart are not expected to
# survive it, so imports and setup belong in the cells that follow.
dbutils.library.restartPython()


In [None]:
from petrinex import PetrinexVolumetricsClient
from pyspark.sql import functions as F
from datetime import datetime, timedelta

# Initialize the Petrinex client.
# `spark` is not defined in this notebook; presumably it is the SparkSession
# that the Databricks runtime injects into every notebook.
client = PetrinexVolumetricsClient(
    spark=spark,
    jurisdiction="AB",      # Alberta
    file_format="CSV",
)

print("✓ Petrinex client initialized successfully")


 copy the 

In [None]:
# Check what files have been updated in the last `lookback_days` days and
# print a short, aligned preview of them.
lookback_days = 30
preview_limit = 10  # maximum number of files shown in the preview table

cutoff_date = (datetime.now() - timedelta(days=lookback_days)).strftime("%Y-%m-%d")

files = client.list_updated_after(cutoff_date)

print(f"Found {len(files)} file(s) updated after {cutoff_date}\n")

# Header and rows share the same column widths so the table stays aligned.
month_width = len("Production Month")
updated_width = 19  # len("YYYY-MM-DD HH:MM:SS"); presumably the shape of str(updated_ts)

print(f"{'Production Month':{month_width}} | {'Updated Date':{updated_width}} | URL")
print("-" * 100)

for file_info in files[:preview_limit]:
    # str() keeps the width spec a plain padding instruction even if the
    # attribute is a date-like object rather than a string.
    print(
        f"{str(file_info.production_month):{month_width}} | "
        f"{str(file_info.updated_ts):{updated_width}} | {file_info.url}"
    )

if len(files) > preview_limit:
    print(f"\n... and {len(files) - preview_limit} more files")


'

In [None]:
# Define the cutoff date (e.g., load data updated in 2026)
updated_after = "2026-01-01"

print(f"Loading data updated after {updated_after}...")
print("This may take a few minutes depending on the number of files...\n")

# Read data using the pandas-based method (UC-friendly).
# NOTE(review): `pandas_read_kwargs` is presumably forwarded to the pandas CSV
# reader — confirm against the petrinex client documentation.
df = client.read_updated_after_as_spark_df_via_pandas(
    updated_after,
    pandas_read_kwargs={
        "dtype": str,           # Force all columns to string (avoid mixed types)
        "encoding": "latin1",   # Handle special characters properly
    },
    add_provenance_columns=True,  # Add tracking columns
    union_by_name=True,           # Handle schema drift across months
)

# Mark the DataFrame for caching; the count() below is the first action on it.
# The final cell releases the cache with unpersist().
df.cache()

row_count = df.count()
print(f"✓ Loaded {row_count:,} rows")
print(f"✓ Columns: {len(df.columns)}")


woul

In [None]:
# Print a banner followed by the column names and types of the loaded DataFrame
print("DataFrame Schema:", "=" * 80, sep="\n")
df.printSchema()


In [None]:
# Preview the first rows of the loaded data
print("\nSample Data (first 10 rows):", "=" * 80, sep="\n")

sample_rows = df.limit(10)
display(sample_rows)


In [None]:
# List each distinct (production month, file update timestamp) pair in the
# loaded data, ordered by production month
print("Data Provenance:", "=" * 80, sep="\n")

provenance_columns = ["production_month", "file_updated_ts"]
provenance_df = (
    df.select(*provenance_columns)
    .distinct()
    .orderBy("production_month")
)

display(provenance_df)


## 6. Data Quality Checks


In [None]:
# Number of records per production month, in chronological order
print("Records by Production Month:", "=" * 80, sep="\n")

record_count_col = F.count("*").alias("record_count")
monthly_counts = (
    df.groupBy("production_month")
    .agg(record_count_col)
    .orderBy("production_month")
)

display(monthly_counts)


## 7. Write to Delta Table

Write the data to a Delta table for further analysis.


In [None]:
# Define catalog, schema, and table names
catalog_name = "main"  # or your catalog name
schema_name = "petrinex"  # or your schema name
table_name = "volumetrics_raw"

full_table_name = f"{catalog_name}.{schema_name}.{table_name}"

# Create schema if it doesn't exist
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog_name}.{schema_name}")
print(f"✓ Schema {catalog_name}.{schema_name} ready")

# Add a load timestamp plus year and month columns for partitioning.
# The substring positions (1-based) assume production_month looks like
# "YYYY-MM" — TODO confirm against the source data.
df_final = (
    df.withColumn("data_loaded_at", F.current_timestamp())
    .withColumn("year", F.substring(F.col("production_month"), 1, 4))
    .withColumn("month", F.substring(F.col("production_month"), 6, 2))
)

# Write as Delta table with partitioning.
# CAUTION: "overwrite" replaces the whole table with the rows loaded in this
# run, i.e. only the files updated after the cutoff chosen earlier.
(
    df_final.write
    .format("delta")
    .mode("overwrite")  # Use "append" for incremental loads
    .partitionBy("year", "month")
    .option("overwriteSchema", "true")
    .saveAsTable(full_table_name)
)

print(f"✓ Data written to {full_table_name}")


## 8. Verify and Query the Table


In [None]:
# Load the table that was just written and report its size
result_df = spark.table(full_table_name)

print(f"Table: {full_table_name}")
print("=" * 80)
total_rows = result_df.count()
print(f"Total rows: {total_rows:,}")
print("Partitions: year, month")

# Per-month record counts and earliest load time, newest month first
summary_sql = f"""
    SELECT 
        production_month,
        COUNT(*) as record_count,
        MIN(data_loaded_at) as loaded_at
    FROM {full_table_name}
    GROUP BY production_month
    ORDER BY production_month DESC
"""
query_result = spark.sql(summary_sql)

print("\nData Summary by Production Month:")
display(query_result)


## Summary

✅ **Completed Steps:**
1. Installed petrinex package
2. Listed available files from Petrinex
3. Loaded data using UC-friendly pandas method (no ANY FILE privilege needed)
4. Performed data quality checks
5. Wrote to Delta table with year/month partitioning
6. Verified the results

**Next Steps:**
- Schedule this notebook as a job for regular updates
- Implement incremental loading based on `file_updated_ts`
- Add data validation and alerting
- Create downstream analytics tables
- Build dashboards and reports

**Key Features Used:**
- ✅ Unity Catalog compatible (pandas-based read)
- ✅ Automatic schema alignment across months
- ✅ Provenance tracking (source files, update dates)
- ✅ Delta table with partitioning for performance


In [None]:
# Cleanup: release the DataFrame that was cached right after loading
df.unpersist()

print("✅ Notebook execution complete!")
print(f"📊 Data available at: {full_table_name}")
