In [1]:
import os
import traceback

import numpy as np
import pandas as pd
import psycopg2
from psycopg2 import extras

In [50]:
def db_connect():
    """Open a psycopg2 connection to the PostgreSQL database.

    Connection settings are taken from the standard libpq environment
    variables (PGHOST, PGDATABASE, PGUSER, PGPORT, PGPASSWORD, PGSSLMODE).
    When a variable is not set, the value that used to be hard-coded in this
    function is used instead, so an unconfigured environment behaves as before.

    Returns:
        The open psycopg2 connection.

    Raises:
        Whatever ``psycopg2.connect`` raised; the traceback is printed first.
    """
    print("[db_connect] connecting to database")
    try:
        conn = psycopg2.connect(
            host=os.environ.get("PGHOST", "localhost"),
            database=os.environ.get("PGDATABASE", "postgres"),
            user=os.environ.get("PGUSER", "postgres"),
            port=os.environ.get("PGPORT", "5432"),
            # NOTE(review): the fallback password is kept only so existing
            # runs do not break; set PGPASSWORD, rotate this credential and
            # remove the literal from the notebook.
            password=os.environ.get("PGPASSWORD", "agra12345"),
            sslmode=os.environ.get("PGSSLMODE", "disable"),
        )
    except Exception:
        # psycopg2 errors derive from Exception, so one clause covers them.
        # No connection object exists when connect() fails, so there is
        # nothing to close here.
        print("\n[db_connect] error while connecting to DB:\n")
        traceback.print_exc()
        raise

    print("[db_connect] Successfully established connection")

    return conn

In [51]:
def select(connection, q, args=None):
    """Execute a query and return all rows of its result.

    Args:
        connection: open DB-API connection (psycopg2 in this notebook).
        q: SQL text, optionally containing ``%s`` placeholders.
        args: values bound to the placeholders, or None.

    Returns:
        The list produced by ``cursor.fetchall()``.

    Raises:
        Any exception raised while creating the cursor, executing, committing
        or fetching. Before re-raising, the cursor (if one was created) and
        the connection are closed and the traceback is printed, so the caller
        must reconnect after a failure.
    """
    # Bound up front so the error handler can tell whether a cursor exists;
    # otherwise a failing connection.cursor() would surface as
    # UnboundLocalError instead of the real error.
    cursor = None
    try:
        cursor = connection.cursor()
        cursor.execute(q, args)
        connection.commit()

        result = cursor.fetchall()
        cursor.close()

        return result
    except Exception:
        if cursor is not None:
            cursor.close()
        connection.close()
        print("\n[select] an error occured:\n")
        traceback.print_exc()
        raise


def get(connection, q, args=None):
    """Execute a query and return only the first row of its result.

    Args:
        connection: open DB-API connection (psycopg2 in this notebook).
        q: SQL text, optionally containing ``%s`` placeholders.
        args: values bound to the placeholders, or None.

    Returns:
        Whatever ``cursor.fetchone()`` returns for the executed query.

    Raises:
        Any exception raised while creating the cursor, executing, committing
        or fetching. Before re-raising, the cursor (if one was created) and
        the connection are closed and the traceback is printed, so the caller
        must reconnect after a failure.
    """
    # Bound up front so the error handler can tell whether a cursor exists;
    # otherwise a failing connection.cursor() would surface as
    # UnboundLocalError instead of the real error.
    cursor = None
    try:
        cursor = connection.cursor()
        cursor.execute(q, args)
        connection.commit()

        result = cursor.fetchone()
        cursor.close()

        return result
    except Exception:
        if cursor is not None:
            cursor.close()
        connection.close()
        # The prefix names this function; it used to say "[select]".
        print("\n[get] an error occured:\n")
        traceback.print_exc()
        raise


In [None]:
# Open the database connection that the following cells pass around as `conn`.
conn = db_connect()

In [24]:
def get_voyages(connection, as_of="2022-02-01"):
    """Fetch the LPG voyages that were under way at a given moment.

    A voyage is selected when ``as_of`` lies between its departure_timestamp
    and arrival_timestamp (bounds inclusive, per SQL BETWEEN).

    NOTE: a later cell of this notebook defines another ``get_voyages(conn)``
    with different behaviour; whichever cell ran last wins in the kernel.

    Args:
        connection: open database connection handed to ``select``.
        as_of: the moment to test, bound as a query parameter. Defaults to
            the date that was previously written directly into the SQL.

    Returns:
        The rows returned by ``select``, each holding (departure_port, id,
        imo, departure_timestamp, sub_segment, arrival_port).

    Raises:
        Whatever ``select`` raises.
    """
    q = """
        select departure_port, id, imo, departure_timestamp, sub_segment, arrival_port from lpg_voyages  where %s between departure_timestamp and arrival_timestamp;
    """
    return select(connection, q, (as_of,))

In [25]:
# Fetch the voyage rows with the get_voyages defined in the cell above.
# A later cell rebinds the name get_voyages to a different function, so
# re-running this cell after that one changes what `voyages` holds.
voyages = get_voyages(conn)

In [None]:
# Display the raw rows returned by the voyages query.
voyages

In [None]:
# Wrap the fetched voyage rows in a DataFrame. The labels follow the column
# order of the voyages SELECT, with the `id` column exposed as `voyage_id`.
df = pd.DataFrame(
    data=voyages,
    columns=[
        "departure_port", "voyage_id", "imo",
        "departure_timestamp", "sub_segment", "arrival_port",
    ],
)
df

In [28]:
def get_trajectories(connection, ids, timestamp, end_timestamp="2022-02-01"):
    """Build the trajectory line of one vessel over a time window.

    The query collects the ``position`` values of ``lpg_statuses`` rows for
    the given vessel, ordered by timestamp, joins them with ``st_makeline``
    and returns the line as GeoJSON together with its point count.

    Args:
        connection: open database connection handed to ``select``.
        ids: value compared with the ``imo`` column using ``=``; despite the
            plural name it is a single identifier.
        timestamp: lower bound of the window (inclusive, per SQL BETWEEN).
        end_timestamp: upper bound of the window (inclusive). Defaults to the
            date that was previously written directly into the SQL.

    Returns:
        The rows returned by ``select``; each row is (geojson, length). The
        calling loop in this notebook treats a ``None`` length as "no
        trajectory available".

    Raises:
        Whatever ``select`` raises.
    """
    q = """
           select st_asgeojson(trajectory) as geojson, st_numpoints(trajectory) as length FROM (
             select st_makeline(position) as trajectory from (
            select position from lpg_statuses where imo=%s and timestamp between %s and %s order by timestamp asc
         ) ssq 
        ) sq;   
    """
    return select(connection, q, (ids, timestamp, end_timestamp))

In [29]:

def progressBar(prefix, current, total, barLength=20):
    """Print a one-line text progress bar that overwrites itself.

    Args:
        prefix: text printed before the bar.
        current: amount of work done so far.
        total: total amount of work; a value of 0 raises ZeroDivisionError.
        barLength: width of the bar in characters.

    The line ends with a carriage return instead of a newline, so the next
    call is drawn over the previous one.
    """
    percent = float(current) * 100 / total
    filled = int(percent / 100 * barLength - 1)
    # A non-positive `filled` yields just the ">" head; ljust never truncates,
    # so an over-long bar is printed as is.
    bar = ("=" * filled + ">").ljust(barLength)

    print("%s [%s] %d %%" % (prefix, bar, percent), end="\r")


In [None]:
# Show the voyage rows that have a missing value in at least one column.
df[df.isna().any(axis=1)]

In [None]:
# Query a trajectory for every voyage and store it on the row. Voyages whose
# query reports no point count are only logged and keep their row unchanged.
# The progress bar uses index+1, i.e. it assumes the default 0..n-1 index.
for index, row in df.iterrows():
    progressBar("    progress:", index+1, len(df))
    trajectories = get_trajectories(conn, row["imo"], row["departure_timestamp"])
    # The query returns (geojson, length); only the first row is used.
    traj_geojson, traj_length = trajectories[0]
    print("Length ::", traj_length)
    if traj_length is None:
        print("trajectories::", traj_geojson)
        continue
    df.loc[index, "trajectory"] = traj_geojson
    df.loc[index, "trajectory_length"] = int(traj_length)
    # NOTE(review): the season is a fixed label for every row — confirm intended.
    df.loc[index, "season"] = "autumn"

In [None]:
# Inspect the frame after the loop above added the trajectory columns.
df

In [34]:
# Export the frame to CSV.
# NOTE(review): this is an absolute, user-specific Windows path, so the cell
# fails on any other machine; consider a path relative to a configurable
# data directory. The file name says "chemical" while the rows come from the
# lpg_* tables — confirm the intended name.
df.to_csv(r'C:\Users\NISHU\Desktop\chemical_data.csv')

In [56]:
def insert_training_sample(connection, data):
    """Bulk-insert voyage samples into the ``lpg_test_voyages`` table.

    Args:
        connection: open psycopg2 connection.
        data: sequence of rows; each row supplies, in order, departure_port,
            voyage_id, imo, departure_timestamp, trajectory,
            trajectory_length, sub_segment, season and arrival_port.

    Raises:
        Any exception raised while creating the cursor, inserting or
        committing. The traceback is printed and the transaction is rolled
        back before the exception is re-raised.
    """

    q = """
        INSERT into lpg_test_voyages(
            departure_port,
            voyage_id,
            imo,
            departure_timestamp,
            trajectory,
            trajectory_length,
            sub_segment,
            season,
            arrival_port
        ) VALUES %s
    """

    # Bound up front so cleanup works even when connection.cursor() fails.
    cursor = None
    try:
        cursor = connection.cursor()
        extras.execute_values(
            cursor, q, data, template=None, page_size=100
        )
        connection.commit()
    except Exception:
        print("\n[execute] an error occured:\n")
        traceback.print_exc()
        # Without a rollback the shared connection stays in an aborted
        # transaction and every later statement on it fails.
        connection.rollback()
        raise
    finally:
        # Close the cursor on success as well; it used to be left open.
        if cursor is not None:
            cursor.close()


In [57]:
def batch_insert(conn, df):
    """Insert the complete rows of ``df`` through ``insert_training_sample``.

    Args:
        conn: open database connection.
        df: DataFrame containing at least the nine columns listed below.

    Rows with a missing value in ANY column of ``df`` are skipped. The
    remaining rows are projected onto the insert columns, in the order the
    INSERT statement lists them, and passed on as a 2-D array.
    """
    insert_columns = [
        "departure_port",
        "voyage_id",
        "imo",
        "departure_timestamp",
        "trajectory",
        "trajectory_length",
        "sub_segment",
        "season",
        "arrival_port",
    ]

    # dropna() runs on the full frame before the projection, so a NaN in a
    # column that is not inserted still excludes the row.
    complete_rows = df.dropna()
    ordered = complete_rows[insert_columns]

    print("    [batch_insert] inserting {:d} samples".format(len(ordered)))
    insert_training_sample(conn, ordered.values)


In [58]:
# Write the rows of `df` that have no missing values to the database.
batch_insert(conn, df)

    [batch_insert] inserting 498 samples


In [70]:
def fetch_voyages_with_traj(connection):
    """Fetch every stored voyage exploded into one row per trajectory point.

    The inner query dumps the points of each voyage's trajectory and
    left-joins the departure port from ``ports`` by locode; the outer query
    splits each point into lon/lat and orders the rows by voyage id.

    NOTE(review): this reads from "chemical_test_voyages", while
    insert_training_sample in this notebook writes to lpg_test_voyages —
    confirm which table is intended.

    Args:
        connection: open database connection handed to ``select``.

    Returns:
        The rows returned by ``select``, each holding (id, departure_port,
        departure_lon, departure_lat, season, lon, lat, sub_segment, imo).

    Raises:
        Whatever ``select`` raises.
    """
    q = """
        SELECT
            id,
            departure_port,
            departure_lon,
            departure_lat,
            season,
            st_x(points) AS lon,
            st_y(points) AS lat,
            sub_segment,
            imo
        FROM (
            SELECT
                -- voyage info
                a.voyage_id as id,
                a.imo as imo,
                (st_dumppoints (a.trajectory)).geom AS points,

                -- departure port position and id
                st_x(b.position) as departure_lon,
                st_y(b.position) as departure_lat,
                b.locode as departure_port,

                a.sub_segment,
                a.season

            FROM "chemical_test_voyages" a

            LEFT JOIN ports as b ON (b.locode = a.departure_port)

        ) f
        ORDER BY id ASC
    """
    return select(connection, q)

In [53]:
def get_voyages(conn):
    """Load all stored voyages as a DataFrame with one row per voyage.

    NOTE: this shadows the ``get_voyages(connection)`` defined in an earlier
    cell of the notebook; once this cell has run, the earlier function is no
    longer reachable under this name.

    Args:
        conn: open database connection.

    Returns:
        None when ``fetch_voyages_with_traj`` returns None; otherwise a
        DataFrame built from ``parse_trajectories`` with the columns id, imo,
        season, sub_segment, departure_port, departure_port_coords,
        trajectory and trajectory_length.

    Raises:
        Whatever ``fetch_voyages_with_traj`` raises.
    """
    point_rows = fetch_voyages_with_traj(conn)
    if point_rows is None:
        return None

    frame_columns = [
        "id",
        "imo",
        "season",
        "sub_segment",
        "departure_port",
        "departure_port_coords",
        "trajectory",
        "trajectory_length",
    ]
    return pd.DataFrame(parse_trajectories(point_rows), columns=frame_columns)

In [54]:
def parse_trajectories(data):
    """Group per-point rows into one record per voyage.

    Args:
        data: iterable of 9-tuples (id, departure_port, departure_lon,
            departure_lat, season, lon, lat, sub_segment, imo), one tuple per
            trajectory point.

    Returns:
        A ``dict_values`` view of the voyage records in first-seen order.
        Each record takes its voyage-level fields from the first row seen for
        that id and accumulates every ``[lon, lat]`` pair in ``trajectory``;
        ``trajectory_length`` counts those pairs.
    """
    grouped = {}
    for voyage_id, port, port_lon, port_lat, season, lon, lat, sub_segment, imo in data:
        record = grouped.get(voyage_id)
        if record is None:
            # First point of this voyage: create its record.
            grouped[voyage_id] = {
                "id": voyage_id,
                "imo": imo,
                "season": season,
                "sub_segment": sub_segment,
                "departure_port": port,
                "departure_port_coords": [port_lon, port_lat],
                "trajectory": [[lon, lat]],
                "trajectory_length": 1,
            }
            continue

        record["trajectory"].append([lon, lat])
        record["trajectory_length"] += 1

    return grouped.values()

In [None]:
# Build and display the per-voyage DataFrame (uses the get_voyages(conn)
# defined two cells above, not the earlier query helper of the same name).
get_voyages(conn)