In [43]:
import pandas as pd
import polars as pl
from pathlib import Path
import duckdb
from src.enbridgescrape import formatOA, paths

In [2]:
# Directory whose entries are read as metadata CSV files by the cells below.
metaPath = paths.downloads / 'MetaData'
# Directory that holds the "GFPipes.db" DuckDB file opened by later cells.
configsPath = paths.configs 

In [3]:
# Load every metadata CSV into its own pandas frame.
# Only "*.csv" entries are read, so stray files in the directory (for example a
# macOS ".DS_Store") or sub-directories are never handed to pd.read_csv; this
# also matches the "*.csv" glob used for the lazy polars scan of the same
# directory. Sorting makes the load order deterministic across runs.
dfList = [pd.read_csv(filePath) for filePath in sorted(metaPath.glob("*.csv"))]

In [4]:
# Distinct (TSP, TSP Name) pairs across all metadata frames, wrapped in a
# polars LazyFrame. The name column is additionally exposed under the
# space-free name 'TSP_Name' (the original 'TSP Name' column is kept), which is
# the name the DuckDB MERGE statement refers to.
df_TSP = (
    pl.LazyFrame(
        pd.concat(dfList).loc[:, ['TSP', 'TSP Name']].drop_duplicates()
    )
    .with_columns(TSP_Name=pl.col('TSP Name'))
)

In [5]:
# Lazy polars scan over every "*.csv" file in the metadata directory.
# NOTE(review): metaDf is not referenced by any other cell visible in this
# notebook — confirm whether it is still needed.
metaDf = pl.scan_csv( metaPath / "*.csv")

In [None]:
with duckdb.connect(configsPath / "GFPipes.db") as con:
    # Sequence that supplies the surrogate ids (100..999) used as table default.
    con.execute("""
                CREATE SEQUENCE IF NOT EXISTS GFPipeID_sequence
                INCREMENT BY 1
                MINVALUE 100
                MAXVALUE 999
                START WITH 100
                """)
    # Lookup table: one row per TSP; `id` is filled from the sequence on insert.
    con.execute("""
        CREATE TABLE IF NOT EXISTS GFPipeID_table (
            id INTEGER PRIMARY KEY DEFAULT nextval('GFPipeID_sequence'),
            TSP INTEGER UNIQUE NOT NULL,
            TSP_Name VARCHAR UNIQUE NOT NULL
        );
    """)

    # Upsert the (TSP, TSP_Name) pairs from the in-scope `df_TSP` frame.
    # The match is on the key column TSP only: matching on TSP *and* TSP_Name
    # would route a TSP whose name changed to the INSERT branch, where it
    # collides with the UNIQUE constraint on TSP. The update is restricted to
    # rows whose name actually differs so that re-running the cell with
    # unchanged data does not rewrite any row.
    con.execute("""
        MERGE INTO GFPipeID_table AS target
        USING df_TSP AS source
        ON target.TSP = source.TSP
        WHEN MATCHED AND target.TSP_Name <> source.TSP_Name THEN
            UPDATE SET TSP_Name = source.TSP_Name
        WHEN NOT MATCHED THEN
            INSERT (TSP, TSP_Name) VALUES (source.TSP, source.TSP_Name);
    """)

# with duckdb.connect(configsPath / "GFPipes.db") as con:
    # con.execute("INSERT INTO GFPipeID_table (TSP,TSP_Name ) SELECT * FROM df_TSP")

In [23]:
with duckdb.connect(configsPath / "GFPipes.db") as con:
    # Columns are listed explicitly so the positional schema below cannot
    # drift if the table ever gains or reorders columns.
    pipeRows = con.execute(
        "SELECT id, TSP, TSP_Name FROM GFPipeID_table"
    ).fetchall()

# TSP -> GFPipeID lookup (GFPipeID is the table id rendered as a string).
# Dtypes are spelled out so that the TSP join key keeps a concrete integer
# type even when the table returns no rows.
res = (
    pl.LazyFrame(
        data=pipeRows,
        schema={'id': pl.Int64, 'TSP': pl.Int64, 'TSP_Name': pl.String},
        orient='row',
    )
    .with_columns(pl.col('id').cast(pl.String).alias('GFPipeID'))
    .select(['GFPipeID', 'TSP'])
)

In [44]:
# NOTE(review): the output recorded for this cell is a polars
# ColumnNotFoundError for column "GFLOC", raised while resolving the query
# plan; the failure originates in formatOA, whose body is not part of this
# notebook.
dfOA = formatOA()

ColumnNotFoundError: unable to find column "GFLOC"; valid columns: ["CycleDesc", "EffGasDayTime", "Loc", "LocName", "LocZn", "FlowInd", "LocPurpDesc", "IT", "AllQtyAvail", "DesignCapacity", "OperatingCapacity", "TotalScheduledQuantity", "OperationallyAvailableCapacity", "TSP", "LocSegment", "QtyReason", "Timestamp"]

Resolved plan until failure:

	---> FAILED HERE RESOLVING 'sink' <---
SELECT [col("Cycle_Desc").alias("CycleDesc"), col("Eff_Gas_Day").alias("EffGasDayTime"), col("Loc"), col("Loc_Name").alias("LocName"), col("Loc_Zn").alias("LocZn"), col("Flow_Ind_Desc").alias("FlowInd"), col("Loc_Purp_Desc").alias("LocPurpDesc"), col("IT"), col("All_Qty_Avail").alias("AllQtyAvail"), col("Total_Design_Capacity").alias("DesignCapacity"), col("Operating_Capacity").alias("OperatingCapacity"), col("Total_Scheduled_Quantity").alias("TotalScheduledQuantity"), col("Operationally_Available_Capacity").alias("OperationallyAvailableCapacity"), col("TSP"), col("LocSegment"), col("QtyReason"), col("Timestamp")]
   WITH_COLUMNS:
   [col("Eff_Gas_Day").python_udf(), col("Total_Design_Capacity").python_udf(), col("Operating_Capacity").python_udf(), col("Total_Scheduled_Quantity").python_udf(), col("Operationally_Available_Capacity").python_udf(), col("Flow_Ind_Desc").python_udf(), null.cast(String).alias("LocSegment"), null.cast(String).alias("QtyReason"), 2025-11-13 21:47:23.008923.alias("Timestamp"), col("TSP")] 
    FILTER [([([([([([([([([([([([(col("Cycle_Desc").is_not_null()) & (col("Eff_Gas_Day").is_not_null())]) & (col("Loc").is_not_null())]) & (col("Loc_Name").is_not_null())]) & (col("Loc_Zn").is_not_null())]) & (col("Flow_Ind_Desc").is_not_null())]) & (col("Loc_Purp_Desc").is_not_null())]) & (col("IT").is_not_null())]) & (col("All_Qty_Avail").is_not_null())]) & (col("Total_Design_Capacity").is_not_null())]) & (col("Operating_Capacity").is_not_null())]) & (col("Total_Scheduled_Quantity").is_not_null())]) & (col("Operationally_Available_Capacity").is_not_null())]
    FROM
      SELECT [col("Cycle_Desc"), col("Eff_Gas_Day"), col("Loc"), col("Loc_Name"), col("Loc_Zn"), col("Flow_Ind_Desc"), col("Loc_Purp_Desc"), col("IT"), col("All_Qty_Avail"), col("Total_Design_Capacity"), col("Operating_Capacity"), col("Total_Scheduled_Quantity"), col("Operationally_Available_Capacity"), col("TSP")]
        Csv SCAN [/Users/saijagadeeshyadavalli/Coding/Particles/GasFundies/enbridgeGitDump/downloads/enbridge/OA/AG_OA_20221114_INTRDY_2022-11-15_0900.csv, ... 1588 other sources]
        PROJECT */21 COLUMNS

In [10]:
from datetime import datetime

import functools
import operator

# Directory that is globbed for "*_OA_*.csv" files by the scan cell below.
OA_path = paths.downloads / 'OA'
# Source columns pulled from each OA CSV; a row is kept only when every one of
# these columns is non-null.
selectedCols = [
    'Cycle_Desc',
    'Eff_Gas_Day',
    'Loc',
    'Loc_Name',
    'Loc_Zn',
    'Flow_Ind_Desc',
    'Loc_Purp_Desc',
    'IT',
    'All_Qty_Avail',
    'Total_Design_Capacity',
    'Operating_Capacity',
    'Total_Scheduled_Quantity',
    'Operationally_Available_Capacity',
]


In [11]:
def batchDateParse(inSeries: pl.Series) -> pl.Series:
    """Parse a string Series of ``MM-DD-YYYY`` dates into a ``pl.Date`` Series.

    Uses polars' native string-to-date parser instead of calling
    ``datetime.strptime`` once per element, so the result is always typed as
    ``pl.Date`` (also for an empty input) and null entries stay null instead of
    raising.

    Args:
        inSeries: String Series holding dates formatted as ``%m-%d-%Y``.

    Returns:
        A ``pl.Date`` Series of the same length, carrying the input's name.

    Raises:
        A polars error when a non-null value does not match the format
        (parsing is strict by default).
    """
    return inSeries.str.to_date("%m-%d-%Y")


def batchFloatParse(inSeries: pl.Series) -> pl.Series:
    """Convert a string Series of comma-grouped numbers into ``pl.Float64``.

    Thousands separators (``,``) are removed and surrounding whitespace is
    stripped before the cast, mirroring what ``float(s.replace(',', ''))``
    accepts. The work is done with native polars string operations rather than
    a per-element Python call, so the result is always ``pl.Float64`` (also for
    an empty input) and null entries stay null instead of raising.

    Args:
        inSeries: String Series such as ``["593,830", "0"]``.

    Returns:
        A ``pl.Float64`` Series of the same length, carrying the input's name.

    Raises:
        A polars error when a non-null value is not a valid number after the
        separators are removed (the cast is strict by default).
    """
    return (
        inSeries.str.replace_all(',', '', literal=True)  # literal: ',' is not a regex
        .str.strip_chars()
        .cast(pl.Float64)
    )


def batchFIMapper(inSeries: pl.Series) -> pl.Series:
    """Translate flow-indicator descriptions into single-letter codes.

    'Delivery' and 'Storage Injection' become 'D'; 'Receipt' and
    'Storage Withdrawal' become 'R'. Any value that is not one of these four
    keys falls back to 'D'.

    Args:
        inSeries: Series of flow-indicator description values.

    Returns:
        A new Series holding the mapped code for every input element.
    """
    fallbackCode = 'D'
    flow_Map = {
        'Delivery': 'D',
        'Receipt': 'R',
        'Storage Injection': 'D',
        'Storage Withdrawal': 'R'
    }
    mappedCodes = [flow_Map.get(description, fallbackCode) for description in inSeries]
    return pl.Series(mappedCodes)



In [33]:
# Lazily scan every "*_OA_*.csv" file, keep the selected columns plus TSP,
# drop rows in which any selected column is null, and attach GFPipeID through a
# left join on TSP (TSP is cast to Int64 before the join).
df = (
    pl.scan_csv(OA_path / "*_OA_*.csv", glob=True, has_header=True)
    .select(selectedCols + ['TSP'])
    .with_columns(pl.col("TSP").cast(pl.Int64))
    # "no selected column is null" == "every selected column is not null"
    .filter(
        pl.all_horizontal(
            [pl.col(colName).is_not_null() for colName in selectedCols]
        )
    )
    .join(res, on='TSP', how='left')
)

In [41]:
# Parse the raw string columns, add the placeholder/timestamp columns, rename
# to the target column names, fix the output column order and materialise the
# result (the collected frame is this cell's displayed output).
(
    df.with_columns(
        pl.col("Eff_Gas_Day").map_batches(batchDateParse, return_dtype=pl.Date),
        # The four quantity columns all go through the same float parser.
        *[
            pl.col(quantityCol).map_batches(batchFloatParse, return_dtype=pl.Float64)
            for quantityCol in (
                "Total_Design_Capacity",
                "Operating_Capacity",
                "Total_Scheduled_Quantity",
                "Operationally_Available_Capacity",
            )
        ],
        pl.col("Flow_Ind_Desc").map_batches(batchFIMapper, return_dtype=pl.String),
        # Columns that are not filled from the CSVs: all-null strings.
        LocSegment=pl.lit(None).cast(pl.String),
        QtyReason=pl.lit(None).cast(pl.String),
        # Single timestamp taken when the expression is built.
        Timestamp=pl.lit(datetime.now()).cast(pl.Datetime),
    )
    .rename(
        {
            'Cycle_Desc': 'CycleDesc',
            'Eff_Gas_Day': 'EffGasDayTime',
            'Loc_Name': 'LocName',
            'Loc_Zn': 'LocZn',
            'Flow_Ind_Desc': 'FlowInd',
            'Loc_Purp_Desc': 'LocPurpDesc',
            'All_Qty_Avail': 'AllQtyAvail',
            'Total_Design_Capacity': 'DesignCapacity',
            'Operating_Capacity': 'OperatingCapacity',
            'Total_Scheduled_Quantity': 'TotalScheduledQuantity',
            'Operationally_Available_Capacity': 'OperationallyAvailableCapacity',
            # 'TSP_Name': 'PipelineName'  (disabled)
        }
    )
    .select(
        'EffGasDayTime',
        # 'PipelineName',  (disabled)
        'CycleDesc',
        'LocPurpDesc',
        'Loc',
        'LocName',
        'LocZn',
        'LocSegment',
        'DesignCapacity',
        'OperatingCapacity',
        'TotalScheduledQuantity',
        'OperationallyAvailableCapacity',
        'IT',
        'FlowInd',
        'AllQtyAvail',
        'QtyReason',
        'Timestamp',
        'GFPipeID',
        'TSP',
    )
    .collect()
)

EffGasDayTime,CycleDesc,LocPurpDesc,Loc,LocName,LocZn,LocSegment,DesignCapacity,OperatingCapacity,TotalScheduledQuantity,OperationallyAvailableCapacity,IT,FlowInd,AllQtyAvail,QtyReason,Timestamp,GFPipeID,TSP
date,str,str,str,str,str,str,f64,f64,f64,f64,str,str,str,str,datetime[μs],str,i64
2022-11-14,"""INTRDY_2022-11-15_0900""","""Delivery Location""","""70004""","""Dominion Energy Transmission, …","""2""",,593830.0,600490.0,9556.0,590934.0,"""N""","""D""","""Y""",,2025-11-13 21:13:56.225080,"""112""",7932908
2022-11-14,"""INTRDY_2022-11-15_0900""","""Delivery Location""","""70011""","""Columbia Gas of PA. - Eagle,…","""3""",,291789.0,444405.0,125122.0,319283.0,"""N""","""D""","""Y""",,2025-11-13 21:13:56.225080,"""112""",7932908
2022-11-14,"""INTRDY_2022-11-15_0900""","""Delivery Location""","""70017""","""Indiana Gas - Greensburg, IN""","""2""",,21573.0,21573.0,2008.0,19565.0,"""N""","""D""","""Y""",,2025-11-13 21:13:56.225080,"""112""",7932908
2022-11-14,"""INTRDY_2022-11-15_0900""","""Delivery Location""","""70018""","""Indiana Gas - Seymour, IN""","""2""",,57331.0,58910.0,400.0,58510.0,"""N""","""D""","""Y""",,2025-11-13 21:13:56.225080,"""112""",7932908
2022-11-14,"""INTRDY_2022-11-15_0900""","""Delivery Location""","""70020""","""Equitrans - Waynesburg, PA""","""2""",,5011.0,5488.0,0.0,5488.0,"""N""","""D""","""Y""",,2025-11-13 21:13:56.225080,"""112""",7932908
…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…
2025-11-13,"""INTRDY_2025-11-13_1801""","""Delivery Location""","""79980""","""UNIONTOWN NORTH""","""2""",,677000.0,677000.0,20420.0,656580.0,"""N""","""D""","""Y""",,2025-11-13 21:13:56.225080,"""112""",7932908
2025-11-13,"""INTRDY_2025-11-13_1801""","""Receipt Location""","""79980""","""UNIONTOWN NORTH""","""2""",,552000.0,552000.0,20420.0,531580.0,"""N""","""R""","""Y""",,2025-11-13 21:13:56.225080,"""112""",7932908
2025-11-13,"""INTRDY_2025-11-13_1801""","""Delivery Location""","""79993""","""NEXUS INTERCONNECT WITH TETLP,…","""2""",,637559.0,637559.0,0.0,637559.0,"""N""","""D""","""Y""",,2025-11-13 21:13:56.225080,"""112""",7932908
2025-11-13,"""INTRDY_2025-11-13_1801""","""Delivery Location""","""79995""","""NEXUS INTERCONNECT WITH TETLP …","""2""",,6.335806e6,6.335806e6,273545.0,6.062261e6,"""Y""","""D""","""Y""",,2025-11-13 21:13:56.225080,"""112""",7932908
