In [1]:
"""
For setting up local imports in an Ipython Shell
This is a workaround for ipython, dont need it for basic python scripts
"""
import os
import sys
module_path = os.path.abspath(os.path.join('..'))
if module_path not in sys.path:
    sys.path.append(module_path)


In [47]:
import pandas as pd
import numpy as np
# from library.lib_aws import AddData
from library.lib_dyna import CardFunctions
from config import username, password, endpoint, data_path
import boto3
from library import lib_aws

# PLotting
import seaborn as sns
import matplotlib.pyplot as plt
import matplotlib as mpl

# Options
mpl.rcParams['figure.figsize'] = (15,5)
mpl.rcParams['axes.grid'] = False
plt.style.use('dark_background')
pd.set_option('display.max_rows', 1000)

In [3]:
import sqlalchemy
from sqlalchemy.orm import Session
# from config import username, password, endpoint
from io import StringIO 
import csv
import time
from geoalchemy2 import Geometry
import sys
import os

from geoalchemy2.shape import from_shape
from pyefd import elliptic_fourier_descriptors
from shapely.geometry import Polygon
from shapely.wkb import loads
import struct

# Helps with relative imports from outside
module_path = os.path.abspath(os.path.join('..'))
if module_path not in sys.path:
    sys.path.append(module_path)

# Clean up strings
def node_clean(node_str):
    """
    Function that cleans up NodeID strings
    """
    node_str = " ".join(node_str.split())  # remove empty white spaces
    node_str = node_str.replace('#', "").strip().lower().title()  # remove # character, plus clean characters
    node_str =  node_str[0:-2] + node_str[-2:].upper() # last 2 characters whill alwsy be upper case
    return node_str

# DataBase Classes
class PostgresRDS(object):
    """
    Class Connects to a PostgreSQL DB with password access
    Need to input the database that needs to be connected to
    Note Set the username, password and endpoint in the config file via env variables
    """

    def __init__(self, db, verbose=0):
        self.engine = None
        self.Session = None
        self.db = db
        self.vprint = print if verbose != 0 else lambda *a, **k: None

    def connect(self):
        """
        Connects to the db and gives us the engine
        :return: engine
        """
        engine_config = {
            'sqlalchemy.url': 'postgresql+psycopg2://{user}:{pw}@{host}/{db}'.format(
                user=username,
                pw=password,
                host=endpoint,
                db=self.db
            ),
            'sqlalchemy.pool_pre_ping': True,
            'sqlalchemy.pool_recycle': 3600
        }

        engine = sqlalchemy.engine_from_config(engine_config, prefix='sqlalchemy.')
        self.Session = Session(engine)

        return engine

    def __enter__(self):
        self.engine = self.connect()
        self.vprint("Connected to {} DataBase".format(self.db))
        return self.engine

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.Session.close()
        self.engine.dispose()
        self.vprint("Connection Closed")


class AddData:
    """
    Class which has methods to add data into a postgres db
    """

    @staticmethod
    def psql_insert_copy(table, conn, keys, data_iter):
        """
        Execute SQL statement inserting data

        Parameters
        ----------
        table : pandas.io.sql.SQLTable
        conn : sqlalchemy.engine.Engine or sqlalchemy.engine.Connection
        keys : list of str
            Column names
        data_iter : Iterable that iterates the values to be inserted
        """
        # gets a DBAPI connection that can provide a cursor
        dbapi_conn = conn.connection
        with dbapi_conn.cursor() as cur:
            s_buf = StringIO()
            writer = csv.writer(s_buf)
            writer.writerows(data_iter)
            s_buf.seek(0)

            columns = ', '.join('"{}"'.format(k) for k in keys)
            if table.schema:
                table_name = '{}.{}'.format(table.schema, table.name)
            else:
                table_name = table.name

            sql = 'COPY {} ({}) FROM STDIN WITH CSV'.format(
                table_name, columns)
            cur.copy_expert(sql=sql, file=s_buf)

    @staticmethod
    def add_data(df, db, table, schema=None, merge_type='append', card_col=None, index_col=None):
        """
        Method to add data to a postgres db
        :param df: Data in the form of a pandas DataFrame
        :param db: Database Name (str)
        :param table: Table Name (str)
        :param schema: Schema Name (Default is None, in this case will add to the public schema)
        :param merge_type: How to add data. Either 'append' or 'replace'. Default: 'append'
        :param card_col: If Card Columns are present. Default('None')
        :param index_col: If an index column is needed. Default('None')
        :return:
        """
        t0 = time.time()
        if card_col is not None:
            dtype_dict = {i: Geometry("POLYGON") for i in card_col}
        else:
            dtype_dict = None

        if index_col is not None:
            try:
                df.set_index(index_col, inplace=True)
            except KeyError:  # Index Column is already set
                pass

        with PostgresRDS(db=db) as engine:
            try:
                df.to_sql(table, con=engine, schema=schema, if_exists=merge_type, method=AddData.psql_insert_copy,
                          dtype=dtype_dict)
            except Exception as e:
                print(e)
                print("Data Not Added")
                return False

        t1 = time.time()
        print("Data {}ed on Table {} in time {:.2f}s".format(merge_type, table, t1 - t0))
        return True


# Checking how the hourly Imports look

## XDIAG

In [16]:
file_name = 'XDiagResults.E2E.20200912.1900.csv'
full_path = os.path.join(data_path, file_name)

temp_xdiag = pd.read_csv(full_path, error_bad_lines=False, parse_dates=['Date', 'AnalysisDate'])
temp_xdiag["NodeID"] = temp_xdiag["NodeID"].apply(node_clean)
temp_xdiag.head()

Unnamed: 0,NodeID,Date,PumpCond1,PumpCond2,FillagePct,NetProd,FluidLevelXDiag,ElecCostPerBO,ElecCostMinTorquePerBO,ElectCostMinEnergyPerBO,...,ElecCostMonthly,GearboxTorqueRating,Friction,MotorLoad,DownholeCapacity24,DownholeCapacityRuntime,DownholeCapacityRuntimeFillage,AdditionalUplift,UpliftCalculationMissingRequirements,AdditionalUpliftGross
0,Hal Federal 41-11T,2020-08-10 02:34:00,,,100,0.0,1262.0,0.0,0.0,0.0,...,0.3,0,1.0,67.0,182.989,0.0,0.0,,,
1,Hal Federal 41-11T,2020-08-17 08:39:00,,,100,0.0,4010.0,0.0,0.0,0.0,...,0.36,0,1.5,67.0,222.42,0.0,0.0,,,
2,Ceynar 11-28B,2020-08-25 11:54:00,,,100,0.0,1197.0,0.0,0.0,0.0,...,0.0,0,3.5,33.0,102.316,0.0,0.0,,Unable to determine uplift opportunity. Missi...,
3,Kline Federal 31-18 15T,2020-03-31 23:17:00,,,100,0.0,4964.0,0.0,0.0,0.0,...,0.0,0,0.5,32.0,308.524,0.0,0.0,,Unable to determine uplift opportunity. Missi...,
4,Kline Federal 31-18 15T,2020-03-27 12:02:00,,,78,0.0,3747.0,0.0,0.0,0.0,...,0.0,0,2.0,32.0,254.235,0.0,0.0,,Unable to determine uplift opportunity. Missi...,


In [18]:
# Timestamp cleaning
max_time = pd.Timestamp(file_name.split('.')[2] + file_name.split('.')[3])  # The time the file was generated is the max time
min_time = max_time - pd.Timedelta('30 days')  # Will keep all those files are present in the last 15 days

bool_ = (temp_xdiag.Date >= min_time) & (temp_xdiag.Date <= max_time)
temp_xdiag = temp_xdiag.loc[bool_].copy()

temp_xdiag.sort_values(by=['Date', 'NodeID'], inplace=True)
temp_xdiag.reset_index(inplace=True, drop=True)
temp_xdiag.head()

Unnamed: 0,NodeID,Date,PumpCond1,PumpCond2,FillagePct,NetProd,FluidLevelXDiag,ElecCostPerBO,ElecCostMinTorquePerBO,ElectCostMinEnergyPerBO,...,ElecCostMonthly,GearboxTorqueRating,Friction,MotorLoad,DownholeCapacity24,DownholeCapacityRuntime,DownholeCapacityRuntimeFillage,AdditionalUplift,UpliftCalculationMissingRequirements,AdditionalUpliftGross
0,Hal Federal 41-11T,2020-08-17 08:39:00,,,100,0.0,4010.0,0.0,0.0,0.0,...,0.36,0,1.5,67.0,222.42,0.0,0.0,,,
1,Flinders Federal 5602 13-18H,2020-08-20 10:01:00,,,100,,,0.0,0.0,0.0,...,0.0,0,0.5,67.0,229.504,0.0,0.0,,Unable to determine uplift opportunity. Missi...,
2,John Federal 41-12B,2020-08-22 09:41:00,,,100,0.0,4553.0,0.0,0.0,0.0,...,0.28,0,3.0,33.0,175.973,0.0,0.0,,Unable to determine uplift opportunity. Missi...,
3,John Federal 41-12B,2020-08-22 10:35:00,,,100,0.0,5096.0,0.0,0.0,0.0,...,0.42,0,0.5,67.0,274.707,0.0,0.0,,,
4,Ceynar 11-28B,2020-08-25 11:54:00,,,100,0.0,1197.0,0.0,0.0,0.0,...,0.0,0,3.5,33.0,102.316,0.0,0.0,,Unable to determine uplift opportunity. Missi...,


In [85]:
%%time
"""
Testing out mutiple files
"""
str_match = "XDiagResults"
frames = []

for filename in os.listdir(data_path):
    if str_match in filename:
        path = os.path.join(data_path, filename)
        temp_df = pd.read_csv(path, error_bad_lines=False, parse_dates=['Date', 'AnalysisDate'])
        temp_df["NodeID"] = temp_df["NodeID"].apply(node_clean)
        
        # use only the correct timestamps
        max_dt = pd.Timestamp(filename.split('.')[2] + filename.split('.')[3])  # Get Upper limit of time
        min_dt = max_dt - pd.Timedelta('15 days')
        bool_ = (temp_df.Date <= max_dt) & (temp_df.Date >= min_dt)
        temp_df = temp_df[bool_]
        frames.append(temp_df)

xdiag = pd.concat(frames)
xdiag.drop_duplicates(subset=['NodeID', 'Date'], keep='last', inplace=True)
xdiag.sort_values(by=['NodeID', 'Date'], inplace=True)
xdiag.reset_index(inplace=True, drop=True)
xdiag.head()
    

Wall time: 3 s


Unnamed: 0,NodeID,Date,PumpCond1,PumpCond2,FillagePct,NetProd,FluidLevelXDiag,ElecCostPerBO,ElecCostMinTorquePerBO,ElectCostMinEnergyPerBO,...,ElecCostMonthly,GearboxTorqueRating,Friction,MotorLoad,DownholeCapacity24,DownholeCapacityRuntime,DownholeCapacityRuntimeFillage,AdditionalUplift,UpliftCalculationMissingRequirements,AdditionalUpliftGross
0,Acadia 31-25H,2020-10-04 10:09:00,,,0,0.0,0.0,0.0,0.0,0.0,...,,0,,,,,,,,
1,Acadia 31-25H,2020-10-06 09:09:00,,,100,,,1.45,1.29,1.29,...,706.37,0,0.5,40.0,120.244,120.244,120.244,,,
2,Acadia 31-25H,2020-10-06 10:33:00,,,100,,,1.4,1.35,1.3,...,683.46,0,0.5,40.0,117.511,117.511,117.511,,,
3,Acadia 31-25H,2020-10-06 11:57:00,,,100,,,1.43,1.37,1.29,...,700.16,0,0.5,40.0,121.155,121.155,121.155,,,
4,Acadia 31-25H,2020-10-06 13:21:00,,,100,,,1.37,1.31,1.31,...,668.38,0,0.5,40.0,118.422,118.422,118.422,,,


In [86]:
%%time
"""
Importing the max dates present in the xspoc.xdiag table 
Droping values lying below the max data to avoid duplicates
"""

# query = """
# select "NodeID", max("Date") as "MaxDate"
# from xspoc.xdiag
# group by "NodeID"
# order by "NodeID"
# """

query = """
select * from xspoc.latestdata order by "NodeID";
"""

with lib_aws.PostgresRDS(db='oasis-prod') as engine:
    main_db = pd.read_sql(query, engine, parse_dates=['MaxDate'])

main_db.reset_index(inplace=True, drop=True)
main_db.head()

Wall time: 3min 9s


Unnamed: 0,NodeID,MaxDate
0,Aagvik 1-35H,2020-07-21 10:18:00
1,Acadia 31-25H,2020-10-04 17:56:00
2,Acklins 12-18H,2020-10-04 17:06:00
3,Aerabelle 5502 43-7T,2020-10-04 18:35:00
4,Ak Strangeland 43-12T,2020-10-04 18:19:00


In [87]:
len(xdiag)

13845

In [88]:
%%time
# Iterating over each well and dropping values less than the correspongind max date

for i in main_db.index:
    well = main_db.loc[i, 'NodeID']
    mx_dt = main_db.loc[i, 'MaxDate']
    bool_ = (xdiag.NodeID == well) & (xdiag.Date <= mx_dt)
    xdiag = xdiag[~bool_]

Wall time: 8.71 s


In [89]:
len(xdiag)

13780

In [90]:
xdiag.groupby('NodeID').agg({'Date': ['min', 'max']})

Unnamed: 0_level_0,Date,Date
Unnamed: 0_level_1,min,max
NodeID,Unnamed: 1_level_2,Unnamed: 2_level_2
Acadia 31-25H,2020-10-06 09:09:00,2020-10-07 10:21:00
Acklins 12-18H,2020-10-06 09:40:00,2020-10-07 06:37:00
Aerabelle 5502 43-7T,2020-10-04 19:38:00,2020-10-07 11:05:00
Ak Strangeland 43-12T,2020-10-04 19:18:00,2020-10-07 11:22:00
Al 44-23B,2020-10-04 18:41:00,2020-10-07 10:55:00
Alder 43-8H,2020-10-04 19:18:00,2020-10-07 11:28:00
Amazing Grace Federal 11-2H,2020-10-06 10:05:00,2020-10-07 10:36:00
Amelia Federal 41-11B,2020-10-06 12:20:00,2020-10-06 12:20:00
Anders Madson 14-25 12T,2020-10-04 19:38:00,2020-10-07 06:33:00
Anders Madson 14-25 13BX,2020-10-06 08:07:00,2020-10-07 11:04:00


In [91]:
xdiag.reset_index(inplace=True, drop=True)
xdiag.head()

Unnamed: 0,NodeID,Date,PumpCond1,PumpCond2,FillagePct,NetProd,FluidLevelXDiag,ElecCostPerBO,ElecCostMinTorquePerBO,ElectCostMinEnergyPerBO,...,ElecCostMonthly,GearboxTorqueRating,Friction,MotorLoad,DownholeCapacity24,DownholeCapacityRuntime,DownholeCapacityRuntimeFillage,AdditionalUplift,UpliftCalculationMissingRequirements,AdditionalUpliftGross
0,Acadia 31-25H,2020-10-06 09:09:00,,,100,,,1.45,1.29,1.29,...,706.37,0,0.5,40.0,120.244,120.244,120.244,,,
1,Acadia 31-25H,2020-10-06 10:33:00,,,100,,,1.4,1.35,1.3,...,683.46,0,0.5,40.0,117.511,117.511,117.511,,,
2,Acadia 31-25H,2020-10-06 11:57:00,,,100,,,1.43,1.37,1.29,...,700.16,0,0.5,40.0,121.155,121.155,121.155,,,
3,Acadia 31-25H,2020-10-06 13:21:00,,,100,,,1.37,1.31,1.31,...,668.38,0,0.5,40.0,118.422,118.422,118.422,,,
4,Acadia 31-25H,2020-10-06 14:45:00,,,100,,,1.4,1.32,1.32,...,681.13,0,0.5,40.0,120.244,120.244,120.244,,,


In [92]:
success = AddData.add_data(df=xdiag, 
                           db='oasis-prod', 
                           schema='xspoc',
                           table='xdiag', 
                           merge_type='append', 
                           index_col='Date') 

Data appended on Table xdiag in time 28.18s


## Xdiag Rod

In [None]:
file_name = 'XDiagRodResults.E2E.20200815.1500.csv'
full_path = os.path.join(data_path, file_name)

temp_xr = pd.read_csv(full_path, error_bad_lines=False, parse_dates=['Date'])
temp_xr.NodeID = temp_xr.NodeID.apply(node_clean)

# Timestamp cleaning
max_time = pd.Timestamp(file_name.split('.')[2] + file_name.split('.')[3])
min_time = max_time - pd.Timedelta('1 hour')

temp_xr.head()

In [None]:
bool_ = (temp_xr.Date <= max_time) 
# & (temp_xr.Date >= min_time)
temp_xr[bool_].sort_values(by=['Date'], ascending=False)

## Well Tests

- Data is coming in once everyday. 
- For a specific day (d) we get data from d-1 and d-2.
- The datapoints from d-2 will be dropped as they will be duplicates which had already come the previous dat.

In [None]:
file_name = 'WellTests.E2E.20200813.1300.csv'
full_path = os.path.join(data_path, file_name)

temp_welltests = pd.read_csv(full_path, error_bad_lines=False, parse_dates=['TestDate'])
import_dt = file_name.split('.')[2]
import_dt = pd.Timestamp(import_dt) - pd.Timedelta('1 day')
# Basic Well Name Cleaning
# temp_welltests["NodeID"] = (temp_welltests["NodeID"].str.replace("#", "")  # remove #
#                                        .str.replace('\s+', ' ', regex=True)  # remove multiple spaces if present
#                                        .str.strip()  # Remove trailing whitespaces
#                                        .str.lower()  # lower all character
#                                        .str.title()  # Uppercase first letter of each word
#                                        .map(lambda x: x[0:-2] + x[-2:].upper()))  # last 2 characters should always be upper case

temp_welltests = temp_welltests[temp_welltests.TestDate == import_dt]
temp_welltests

In [None]:
# OG timestamps
# data_dist = temp_welltests.groupby(["NodeID"]).agg({"TestDate": [min, max, "count"]})

# display(data_dist)

t2 = temp_welltests[temp_welltests.TestDate == '2020-08-12'].copy()
t2.sort_values(by=['NodeID'], inplace=True)
t2.drop_duplicates(subset=['NodeID', 'TestDate'])
t2.reset_index(drop=True, inplace=True)


## Card Data

In [None]:
file_name = 'CardData.E2E.20200816.1200.csv'
full_path = os.path.join(data_path, file_name)

temp_card = pd.read_csv(full_path, error_bad_lines=False, parse_dates=['Date'])
temp_card.NodeID = temp_card.NodeID.apply(node_clean)

# Timestamp cleaning
max_time = pd.Timestamp(file_name.split('.')[2] + file_name.split('.')[3])
min_time = max_time - pd.Timedelta('1 hour')
bool_ = (temp_card.Date <= max_time) & (temp_card.Date >= min_time)
temp_card = temp_card[bool_]

# Basic sort and dropping duplicates
temp_card.drop_duplicates(subset=['NodeID', 'Date'], inplace=True)
temp_card.sort_values(by=['NodeID', 'Date'], inplace=True)
temp_card.reset_index(inplace=True, drop=True)
temp_card.head()



In [None]:
temp_card[temp_cols]

## WELLTESTS

In [None]:
"""
Test out different conditions
1. Finding out a min_time condition to remove repeatable values
"""
local_file_path = r'C:\Users\rai_v\OneDrive\Python Coursera\local-data\oasis'

In [None]:
# XDIAG Rod
str_match = "XDiagRod"
frames = []
for filename in os.listdir(local_file_path):
    if str_match in filename:
        path = os.path.join(local_file_path, filename)
        temp_df = pd.read_csv(path, error_bad_lines=False)
        temp_df.loc[:, 'Date'] = pd.to_datetime(temp_df.loc[:, 'Date'])
        max_time = pd.Timestamp(filename.split('.')[2] + filename.split('.')[3])
        temp_df = temp_df[temp_df['Date'] <= max_time]
        temp_df['ImportDate'] = max_time
        frames.append(temp_df)

data = pd.concat(frames)

In [None]:
"""
Selecting only RodNum 1 and dropping some columns for making the data set smaller
"""
data_cut = data[data.RodNum == 1].copy()
data_cut.sort_values(by=['NodeID', 'Date'], inplace=True)
data_cut.reset_index(inplace=True, drop=True)
data_cut = data_cut[['NodeID', 'Date', 'ImportDate']]
# data_cut.drop(columns = ['Grade', 'Length', 'Diameter', 'RodGuideID', 'DragFrictionCoefficient', 'GuideCountPerRod'], inplace=True)

In [None]:
data_cut.NodeID.unique()

In [None]:
data_cut['Time Diff'] = data_cut.ImportDate - data_cut.Date

In [None]:
data_cut[data_cut.NodeID == "ACADIA 31-25H"]

In [None]:
tdiff = data_cut['Time Diff']

In [None]:
x = tdiff[tdiff <= pd.Timedelta('3 days')]

sns.distplot(x)

In [None]:
data_cut.to_csv("new_xdiagrod_sampled.csv")

In [None]:
# XdiagRod
file1 = 'XDiagRodResults.E2E.20200815.2300.csv'

tmax1 = pd.Timestamp(file1.split('.')[2] + file1.split('.')[3])
xrod1 = pd.read_csv(os.path.join(local_file_path, file1), parse_dates=['Date'])
xrod1.head()

In [None]:
file2 = 'XDiagRodResults.E2E.20200816.0000.csv'

tmax2 = pd.Timestamp(file2.split('.')[2] + file2.split('.')[3])
xrod2 = pd.read_csv(os.path.join(local_file_path, file2), parse_dates=['Date'])
xrod2.head()

In [None]:
xrod1.sort_values(by=['Date'], ascending=False).head(10)

In [None]:
xrod2[xrod2.Date <= tmax2].sort_values(by=['Date', "NodeID", "RodNum"], ascending=False)

# Generalizing

In [4]:
# Helper fucntions
# Dyna card functions
def get_dyna(card_arr):
    """
    Transforms Hexadecimal Dyna Card Value into Position and Load value
    :param card_arr: Hexadecimal Array
    :return: Position, Load 2D array
    """
    if pd.isnull(card_arr):
        pos = [0, 0, 0]
        load = [0, 0, 0]

    else:
        test_card = card_arr.strip()
        mid = len(test_card) / 2
        mid = int(mid)

        load = []
        pos = []

        for i in range(0, mid, 8):
            load_temp = test_card[i:i + 8]
            load_int = struct.unpack('f', bytes.fromhex(load_temp))[0]
            load.append(load_int)

            pos_temp = test_card[mid + i:mid + i + 8]
            pos_int = struct.unpack('f', bytes.fromhex(pos_temp))[0]
            pos.append(pos_int)

    return np.column_stack(([pos, load]))


def hex_to_wkb(card_arr):
    """
    Transforms the Hexadecimal based card into a WKB element
    Helps store the data in a postgis db
    :param card_arr: Hexadecimal Card Value
    :return: WKB card value
    """
    xy = CardFunctions.get_dyna(card_arr)

    try:
        polygon = Polygon(xy)
        wkb_element = from_shape(polygon)
    except Exception as e:
        print(e)
        wkb_element = np.nan

    return wkb_element

In [5]:
db = 'oasis-dev'
schema = 'stream'

def node_clean(node_str):
    """
    Function that cleans up NodeID strings
    """
    node_str = " ".join(node_str.split())  # remove empty white spaces
    node_str = node_str.replace('#', "").strip().lower().title()  # remove # character, plus clean characters
    node_str =  node_str[0:-2] + node_str[-2:].upper() # last 2 characters whill alwsy be upper case
    return node_str


class OasisStream:

    def __init__(self, file_path=data_path):
        self.file_path = file_path  # Location of sftp files default to env variable 
        
        self.str_match = 'None' 
        self.files = []
        self.add_success = False
        self.transfer_success = False
        self.del_success = False
    
    def import_well_tests(self, table_name):
        """
        This function will import welltests into a pandas df and then upload it into a PostgresDB
        :param table_name: The table in postgres where we need to add the data
        """
        self.str_match = 'WellTests'
        self.files = []  # If runnnig imports empty files array
        self.add_success = False  # if this file is to be run, reinitialize this variable
        frames = []
        for filename in os.listdir(self.file_path):
            if self.str_match in filename:
                self.files.append(filename)  # add filename to files array
                path = os.path.join(self.file_path, filename)  # full location of file to import
                temp_df = pd.read_csv(path, error_bad_lines=False, parse_dates=['TestDate'])
                import_dt = file_name.split('.')[2]  # get the day files were generated
                import_dt = pd.Timestamp(import_dt) - pd.Timedelta('1 day')  # this is the timestamp we need from the file
                temp_df = temp_df[temp_df.TestDate == import_dt]
                temp_df.NodeID = temp_df.NodeID.apply(node_clean)  # clean up NodeID strings
                temp_df.drop_duplicates(subset=['NodeID', 'TestDate'], inplace=True)
                frames.append(temp_df)
        try:
            data = pd.concat(frames)
        except ValueError:
            return print("WellTests file not present")
        
        data.drop_duplicates(subset=['NodeID', 'TestDate'], inplace=True)
        data.sort_values(by=['NodeID', 'TestDate'], inplace=True)
        data.reset_index(inplace=True, drop=True)
        
        self.add_success = AddData.add_data(df=data, db=db, schema=schema,
                                            table=table_name, merge_type='replace',
                                            index_col='TestDate')     
    
    def card_data(self, table_name, cols, card_cols):
        """
        Imports card data into table_name in Postgres
        """
        self.str_match = 'CardData'
        self.files = []
        self.add_success = False
        
        frames = []
        for filename in os.listdir(self.file_path):
            if self.str_match in filename:
                self.files.append(filename)
                path = os.path.join(self.file_path, filename)
                temp_df = pd.read_csv(path, error_bad_lines=False, parse_dates=['Date', 'AnalysisDate'])  # import
                temp_df = temp_df[cols]
                temp_df.NodeID = temp_df.NodeID.apply(node_clean) # clean nodeid
                
                # use only the correct timestamps
                max_dt = pd.Timestamp(filename.split('.')[2] + filename.split('.')[3])  # Get Upper limit of time
                min_dt = max_dt - pd.Timedelta('1 hour')
                bool_ = (temp_df.Date <= max_dt) & (temp_df.Date >= min_dt)
                temp_df = temp_df[bool_]
                temp_df.drop_duplicates(subset=['NodeID', 'Date'], inplace=True)
                frames.append(temp_df)
        try:
            data = pd.concat(frames)
        except ValueError:
            return print("CardData files not present")
        
        data.drop_duplicates(subset=['NodeID', 'Date'], inplace=True)
        data.sort_values(by=['NodeID', 'Date'], inplace=True)
        data.reset_index(inplace=True, drop=True)
        
        # Modifying card columns
        data.fillna(np.nan, inplace=True)
        try:
            for col in card_cols:  # Converting the hex columns to a wkb format
                data.loc[:, col] = data.loc[:, col].apply(hex_to_wkb)
        except Exception as e:
            print(e) 

        self.add_success = AddData.add_data(df=data, db=db, schema=schema,
                                            table=table_name, merge_type='replace', card_col=card_cols,
                                            index_col='Date') 
            
    def transfer_s3(self, location):
        """
        Transfer raw csv files to s3
        :param location: Location in s3 bucket of csv files
        """
        s3 = boto3.resource('s3')

        if self.add_success is False:
            return print("Data hasn't been added to RDS DB")

        for filename in self.files:
            s3.meta.client.upload_file(os.path.join(self.file_path, filename),
                                       "et-oasis", location + filename)
        self.transfer_success = True

    def del_files(self):

        if self.transfer_success is False:
            return print("First Transfer the files")
        if self.add_success is False:
            return print("Add Data to db first")
        
        try:
            for filename in self.files:
                os.remove(os.path.join(self.file_path, filename))
        except Exception as e:
            print(e)
            return print("Files have already been transfered")

        self.del_success = True
        
        return None
            
    def result(self):
        print('Files Worked on:',*self.files, sep='\n')
        print(f'Data added to DB                    : {self.add_success}')
        print(f'Files Transfered to S3              : {self.transfer_success}')
        print(f'Files deleted from {self.file_path} : {self.del_success}')

In [6]:
stream = OasisStream()
# Well Tests
stream.import_well_tests(table_name='well_tests')
stream.transfer_s3(location="backup/wellTests/")
stream.del_files()
stream.result()

# card
cols_to_keep = [
    "NodeID",
    "Date",
    "AnalysisDate",
    "SPM",
    "StrokeLength",
    "Runtime",
    "FillBasePct",
    "Fillage",
    "SecondaryPumpFillage",
    "POCDownholeCardB",
    "SurfaceCardB",
    "DownholeCardB",
    "PredictedCardB",
    "TorquePlotMinEnergyB",
    "TorquePlotMinTorqueB",
    "PermissibleLoadUpB"
]
card_columns = [
    'POCDownholeCardB',
    'SurfaceCardB',
    'DownholeCardB',
    'PredictedCardB',
    'TorquePlotMinEnergyB',
    'TorquePlotMinTorqueB',
    'PermissibleLoadUpB'
]
stream.card_data(table_name='card', cols=cols_to_keep, card_cols=card_columns)
stream.transfer_s3(location="backup/card/")
stream.result()


WellTests file not present
Data hasn't been added to RDS DB
First Transfer the files
Files Worked on:
Data added to DB                    : False
Files Transfered to S3              : False
Files deleted from C:\Users\rai_v\OneDrive\Python Coursera\local-data\oasis\sftp-files : False
Data replaceed on Table card in time 69.25s
Files Worked on:
CardData.E2E.20200817.0100.csv
CardData.E2E.20200817.0200.csv
CardData.E2E.20200817.0300.csv
CardData.E2E.20200817.0400.csv
CardData.E2E.20200817.0500.csv
CardData.E2E.20200817.0600.csv
CardData.E2E.20200817.0700.csv
CardData.E2E.20200817.0800.csv
CardData.E2E.20200817.0900.csv
CardData.E2E.20200817.1000.csv
CardData.E2E.20200817.1100.csv
CardData.E2E.20200817.1200.csv
CardData.E2E.20200817.1300.csv
CardData.E2E.20200817.1400.csv
CardData.E2E.20200817.1500.csv
CardData.E2E.20200817.1600.csv
CardData.E2E.20200817.1700.csv
CardData.E2E.20200817.1800.csv
CardData.E2E.20200817.1900.csv
CardData.E2E.20200817.2000.csv
CardData.E2E.20200817.2100.csv
Car

## XDIAG

In [None]:
xdiagrod = OasisStream(str_match='XDiagRod', table_name='xdiagrod_test')
xdiagrod.add_to_db()
xdiagrod.transfer_s3(location="backup/xdiagRodResults/")
xdiagrod.result()

In [None]:
xr_cols_drop = [
    'PumpCond1',
    'PumpCond2',
    'MonthlyElecCost',
    'MinEnergyElecBO',
    'MinTorqueElecBO',
    'CurrentElecBO',
    'AvgDHDSLoad',
    'AvgDHUSLoad',
    'AvgDHDSPOLoad',
    'AvgDHUSPOLoad',
    'DownholeAnalysisLocale',
    'RodAnalysisLocale',
    'SurfaceAnalysisLocale',
    'InputAnalysisLocale'
]

In [None]:
xdiagres = OasisStream(str_match='XDiagResults', table_name='xdiagresults_test', drop_cols=xr_cols_drop)
xdiagres.add_to_db()
xdiagres.transfer_s3(location="backup/xdiagresults/")
xdiagres.result()

In [None]:
welltest = OasisStream(str_match='WellTests', table_name='welltest_test', date_col='TestDate')
welltest.add_to_db()
welltest.transfer_s3(location="backup/wellTests/")
# welltest.del_files()
welltest.result()

## Card Data

In [None]:
def get_dyna(card_arr):
    """
    Transforms Hexadecimal Dyna Card Value into Position and Load value
    :param card_arr: Hexadecimal Array
    :return: Position, Load 2D array
    """
    if pd.isnull(card_arr):
        pos = [0, 0, 0]
        load = [0, 0, 0]

    else:
        test_card = card_arr.strip()
        mid = len(test_card) / 2
        mid = int(mid)

        load = []
        pos = []

        for i in range(0, mid, 8):
            load_temp = test_card[i:i + 8]
            load_int = struct.unpack('f', bytes.fromhex(load_temp))[0]
            load.append(load_int)

            pos_temp = test_card[mid + i:mid + i + 8]
            pos_int = struct.unpack('f', bytes.fromhex(pos_temp))[0]
            pos.append(pos_int)

    return np.column_stack(([pos, load]))


def hex_to_wkb(card_arr):
    """
    Transforms the Hexadecimal based card into a WKB element
    Helps store the data in a postgis db
    :param card_arr: Hexadecimal Card Value
    :return: WKB card value
    """
    xy = CardFunctions.get_dyna(card_arr)

    try:
        polygon = Polygon(xy)
        wkb_element = from_shape(polygon)
    except Exception as e:
        print(e)
        wkb_element = np.nan

    return wkb_element

## Backed up Data

### Card Data

In [None]:
import struct
from geoalchemy2.shape import from_shape
from shapely.geometry import Polygon
from shapely.wkb import loads
from pyefd import elliptic_fourier_descriptors

In [None]:
local_file_path = r'C:\Users\rai_v\OneDrive\Python Coursera\local-data\oasis'
file_name = 'CardData.E2E.20200728.1100.csv'
time_max = pd.Timestamp(file_name.split('.')[2] + file_name.split('.')[3])

In [None]:
# global 
cols_to_drop = [
    'SurfaceCard',
    'DownholeCard',
    'PredictedCard',
    'PocDHCard',
    'CorrectedCard',
    'TorquePlotMinEnergy',
    'TorquePlotMinTorque',
    'TorquePlotCurrent',
    'POCDownholeCard',
    'ElectrogramCardB'
]

In [None]:
%%time
data = pd.read_csv(os.path.join(local_file_path, file_name), parse_dates=['Date'], usecols=['NodeID', 'Date', 'POCDownholeCardB', 'SurfaceCardB'])
# data.drop(columns=cols_to_drop, inplace=True)
data = data[data.Date <= time_max]  # Drop points which are greater than the import timestamp
data.sort_values(by=['NodeID', 'Date'], inplace=True)
data.reset_index(inplace=True, drop=True)
data.tail(10)

In [None]:
np.round(data.isnull().sum(axis=0)/len(data) * 100)

In [None]:
"""
Test 
"""
# Convert columns
data_test = data.loc[0:10].copy()

card_cols = [
    'SurfaceCardB',
    'POCDownholeCardB'
]

for c in card_cols:
    data_test.loc[:,c] = data_test.loc[:,c].apply(hex_to_wkb)
    
display(data_test.head())

# Adding data to db
AddData.add_data(df=data_test, db='oasis-dev', table='testcards', schema='stream', merge_type='replace', card_col=card_cols, index_col='Date')

In [None]:
"""
For entire data
"""
card_cols = [
    'SurfaceCardB',
    'POCDownholeCardB'
]

for c in card_cols:
    print(c)
    data.loc[:, c] = data.loc[:, c].apply(hex_to_wkb)
  

In [None]:
data.head()

In [None]:
AddData.add_data(df=data, db='oasis-dev', table='testcards', schema='stream', merge_type='replace', card_col=card_cols, index_col='Date') 

### Test Data

In [None]:
frames = []
files = []
for filename in os.listdir(data_path):
    if 'WellTests' in filename:
        files.append(filename)
        path = os.path.join(data_path, filename)
        print(path)
        temp_df = pd.read_csv(path, error_bad_lines=False)
        frames.append(temp_df)

try:
    df = pd.concat(frames)
except ValueError:
    print("Files were not Present")
    


### XDiagRod

We have 2 file to use
- XDiagRodResults.E2E.20200728.1123  --> Which has data from May 25th to July 28th
- XDiagRodResults.E2E.20200810.1503  --> Which has data from July 28th to Aug 10

In [None]:
# Xdiagrod Results
file_path = r"C:\Users\rai_v\OneDrive\Python Coursera\local-data\oasis\back"

file1 = 'XDiagRodResults.E2E.20200810.1503.csv'
tmax1 = pd.Timestamp(file1.split('.')[2] + file1.split('.')[3])

file2 = 'XDiagRodResults.E2E.20200728.1123.csv'
tmax2 = pd.Timestamp(file2.split('.')[2] + file2.split('.')[3])

In [None]:
xdiag1 = pd.read_csv(os.path.join(file_path, file1), error_bad_lines=False, parse_dates=['Date'])
print(f"# initial: {len(xdiag1)}")
xdiag1 = xdiag1[xdiag1.Date <= tmax1]
print(f"# removing bad dates: {len(xdiag1)}")
xdiag1.sort_values(by=['NodeID', 'Date'], inplace=True)
xdiag1.drop_duplicates(subset=['NodeID', 'Date', 'RodNum'], inplace=True)
print(f"# removing duplicates: {len(xdiag1)}")
xdiag1.reset_index(inplace=True, drop=True)
xdiag1.groupby("NodeID").agg({"Date": [min, max, "count"]})

In [None]:
xdiag2 = pd.read_csv(os.path.join(file_path, file2), error_bad_lines=False, parse_dates=['Date'])
print(f"# initial: {len(xdiag2)}")
xdiag2 = xdiag2[xdiag2.Date <= tmax2]
print(f"# removing bad dates: {len(xdiag2)}")
xdiag2.sort_values(by=['NodeID', 'Date'], inplace=True)
xdiag2.drop_duplicates(subset=['NodeID', 'Date', 'RodNum'], inplace=True)
print(f"# removing duplicates: {len(xdiag2)}")
xdiag2.reset_index(inplace=True, drop=True)
xdiag2.groupby("NodeID").agg({"Date": [min, max, "count"]})['Date']

In [None]:
# Merge the data
xdiagrod = pd.concat([xdiag2, xdiag1])
xdiagrod.sort_values(by=['NodeID', 'Date'], inplace=True)
print(f'Size is {len(xdiagrod)}')
xdiagrod.drop_duplicates(subset=['NodeID', 'Date', 'RodNum'], inplace=True)
print(f'Size is {len(xdiagrod)}')
xdiagrod.reset_index(inplace=True, drop=True)

In [None]:
xdiagrod.head()

In [None]:
AddData.add_data(df=xdiagrod, db='oasis-dev', table='xdiagrod', schema='stream', merge_type='replace', card_col=None, index_col='Date') 

## XDiagResults

In [None]:
# Xdiag Results
file_path = r"C:\Users\rai_v\OneDrive\Python Coursera\local-data\oasis\back"

file1 = 'XDiagResults.E2E.20200728.1116.csv'
tmax1 = pd.Timestamp(file1.split('.')[2] + file1.split('.')[3])

file2 = 'XDiagResults.E2E.20200810.1500.csv'
tmax2 = pd.Timestamp(file2.split('.')[2] + file2.split('.')[3])

In [None]:
xdiag1 = pd.read_csv(os.path.join(file_path, file1), error_bad_lines=False, parse_dates=['Date'])
print(f"# initial: {len(xdiag1)}")
xdiag1 = xdiag1[xdiag1.Date <= tmax1]
print(f"# removing bad dates: {len(xdiag1)}")
xdiag1.sort_values(by=['NodeID', 'Date'], inplace=True)
xdiag1.drop_duplicates(subset=['NodeID', 'Date'], inplace=True)
print(f"# removing duplicates: {len(xdiag1)}")
xdiag1.reset_index(inplace=True, drop=True)
xdiag1.groupby("NodeID").agg({"Date": [min, max, "count"]})

In [None]:
xdiag2 = pd.read_csv(os.path.join(file_path, file2), error_bad_lines=False, parse_dates=['Date'])
print(f"# initial: {len(xdiag2)}")
xdiag2 = xdiag2[xdiag2.Date <= tmax2]
print(f"# removing bad dates: {len(xdiag2)}")
xdiag2.sort_values(by=['NodeID', 'Date'], inplace=True)
xdiag2.drop_duplicates(subset=['NodeID', 'Date'], inplace=True)
print(f"# removing duplicates: {len(xdiag2)}")
xdiag2.reset_index(inplace=True, drop=True)
xdiag2.groupby("NodeID").agg({"Date": [min, max, "count"]})['Date']

In [None]:
# Merge the data
xr = pd.concat([xdiag2, xdiag1])
xr.sort_values(by=['NodeID', 'Date'], inplace=True)
print(f'Size is {len(xr)}')
xr.drop_duplicates(subset=['NodeID', 'Date'], inplace=True)
print(f'Size is {len(xr)}')
xr.reset_index(inplace=True, drop=True)

In [None]:
# list of columns with only null vales
xr_null = xr.isnull().sum(axis=0)/len(xr)
xr_null[xr_null == 1].index.to_list()

In [None]:
xr_cols_drop = [
    'PumpCond1',
    'PumpCond2',
    'MonthlyElecCost',
    'MinEnergyElecBO',
    'MinTorqueElecBO',
    'CurrentElecBO',
    'AvgDHDSLoad',
    'AvgDHUSLoad',
    'AvgDHDSPOLoad',
    'AvgDHUSPOLoad',
    'DownholeAnalysisLocale',
    'RodAnalysisLocale',
    'SurfaceAnalysisLocale',
    'InputAnalysisLocale'
]
xr.drop(columns = xr_cols_drop, inplace=True)

In [None]:
xr.sort_values(by=['NodeID', 'Date'], inplace=True)
xr.reset_index(inplace=True, drop=True)
xr.tail()

In [None]:
AddData.add_data(df=xr, db='oasis-dev', table='xdiagresults', schema='stream', merge_type='replace', card_col=None, index_col='Date') 

## Well Tests

In [None]:
# Card Data
file_path = r"C:\Users\rai_v\OneDrive\Python Coursera\oasis\data\back\WellTests.E2E.20200728.1116.csv"
well_test = pd.read_csv(file_path, parse_dates=['TestDate'])
display(well_test.isnull().sum(axis=0)/well_test.shape[0] * 100)
display(well_test.head())


In [None]:
pd.Timestamp