# ETL init
## Imports

In [1]:
import requests
import time
from datetime import datetime
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure
from bson.json_util import dumps
import access
# MongoDB connection details
USER = "OneInit"
PWD = "<tkfzDtcnf844"
HOST = "194.58.102.147"
URI = "mongodb://{}:{}@{}:27017/".format(USER,PWD,HOST)

# External data resource
cntr_number = 'NYKU9733409'
bill_number = "OSAB42346500"
URL = "https://ecomm.one-line.com/ecom/CUP_HOM_3301GS.do"

## Functions
### 1.Log function

In [12]:
def log(message):
    """Log function to log errors."""
    timestamp = datetime.strftime(datetime.now(), "%Y-%m-%d %H:%M:%S")
    with open("etl.log", "a") as f:
        f.write(timestamp + " " + message + "\n")

In [20]:
log("UPDATE")

### 2.Check database entry

In [13]:
def check_record(bill_number):
    """Check that init and tracking database does not have container record yet."""
    conn = MongoClient(access.init)
    query = {"blNo": bill_number, "trackEnd": None}
    try:
        conn.admin.command("ping")
        init = conn.one.init.count_documents(query)
        tracking = conn.one.tracking.count_documents(query)
        if init == 0 and tracking == 0:
            conn.close()
            return True
        else:
            conn.close()
            log(f"[ETL Init] [Check record]"\
                + f" [Record already exists for {bill_number}]")
            return False
    except ConnectionFailure:
        log("[ETL Init] [Check record]"\
            + f" [DB Connection failure for {bill_number}]")
        conn.close()
        return False
    except BaseException as err:
        log("[ETL Init] [Check record]"\
            + f" [{err.details} for {bill_number}]")
        conn.close()
        return False

In [12]:
check_record("OSAB42346500")

False

### 3.Extract

In [14]:
def extract_container_details(bill_number):
    """Post request to extract container details."""
    if isinstance(bill_number, str):
        # Create payload for get request
        payload = {
            '_search': 'false', 'nd': str(time.time_ns())[:-6],
            'rows': '10000', 'page': '1', 'sidx': '',
            'sord': 'asc', 'f_cmd': '121', 'search_type': 'A',
            'search_name': bill_number, 'cust_cd': '',
        }
        # Run request and fetch json data
        r = requests.get(URL, params=payload)
        data = r.json()
        # Extract container details data
        if "list" in data:
            container_details = data["list"][0]
            # Remove unnecessary data
            if "hashColumns" in container_details:
                del container_details["hashColumns"]
            return container_details
        else:
            log(f"[ETL Init] [Extract container details]"\
                + f" [No details data for {bill_number}]")
            return False
    else:
        log("[ETL Init] [Extract container details]"\
            + f" [Wrong argument type {bill_number}]")
        return False

In [16]:
cntr_details = extract_container_details("OSAB42346500")
#cntr_details

In [15]:
def extract_schedule_details(cntr_details):
    """Extract schedule details."""
    if cntr_details:
        # Create payload for get request
        payload = {
            '_search': 'false', 'f_cmd': '125', 'cntr_no': cntr_details["cntrNo"],
            'bkg_no': '', 'cop_no': cntr_details["copNo"]
        }
        # Run request and fetch json data
        r = requests.get(URL, params=payload)
        data = r.json()
        # Extract container schedule data
        if "list" in data:
            schedule_details = data["list"]
            if "hashColumns" in schedule_details[0]:
                del schedule_details[0]["hashColumns"]
            return schedule_details
        else:
            log("[ETL Init] [Extract schedule details]"\
                + f" [No schedule for container {cntr_details['cntrNo']}]")
            return False
    else: 
        return False

In [10]:
schedule_details = extract_schedule_details(cntr_details)

[{'maxRows': 0,
  'models': [],
  'vslCd': '',
  'no': '1',
  'copNo': 'COSA1827848695',
  'eventDt': '2021-09-28 15:31',
  'vslEngNm': '',
  'placeNm': 'NAGOYA, AICHI, JAPAN',
  'skdVoyNo': '',
  'yardNm': 'NAGOYA - NISHI 4-KU RYUTSU VAN POOL',
  'copDtlSeq': '1011',
  'skdDirCd': '',
  'actTpCd': 'A',
  'statusNm': 'Empty Container Release to Shipper',
  'statusCd': 'MOTYDO',
  'nodCd': 'JPNGO11',
  'vvd': '',
  'lloydNo': '',
  'hashFields': []},
 {'maxRows': 0,
  'models': [],
  'vslCd': '',
  'no': '2',
  'copNo': 'COSA1827848695',
  'eventDt': '2021-09-29 14:48',
  'vslEngNm': '',
  'placeNm': 'NAGOYA, AICHI, JAPAN',
  'skdVoyNo': '',
  'yardNm': 'TCB (TOBISHIMA CONTAINER BERTH)',
  'copDtlSeq': '1031',
  'skdDirCd': '',
  'actTpCd': 'A',
  'statusNm': 'Gate In to Outbound Terminal',
  'statusCd': 'FOTMAD',
  'nodCd': 'JPNGO07',
  'vvd': '',
  'lloydNo': '',
  'hashColumns': [],
  'hashFields': []},
 {'maxRows': 0,
  'models': [],
  'vslCd': 'NVST',
  'no': '3',
  'copNo': 'COSA1

In [None]:
#schedule_details

In [16]:
def extract(bill_number):
    """Extract container and schedule details and
    return one document."""
    cntr_details = extract_container_details(bill_number)
    schedule_details = extract_schedule_details(cntr_details)
    if cntr_details and schedule_details:
        return {"container": cntr_details,
                "schedule": schedule_details,
                "number": bill_number}
    else:
        log("[ETL Init] [Extract phase]"\
            + f" [No data for {bill_number}]")
        return False

In [18]:
raw_data = extract('NYKU9733409')
#raw_data

### 4.Transform

In [7]:
def transform(data):
    """Transforms raw data for database load."""
    # Check data argument
    if not data:
        log("[ETL Init] [Transform phase]"\
            + f" [No raw data]")
        return False
    
    # Check contnainer keys and extract container info
    cntr_keys = ["cntrNo", "cntrTpszNm", "copNo", "blNo"]
    if set(cntr_keys).issubset(set(data["container"])):
        result = {
            "cntrNo": data["container"]["cntrNo"],
            "cntrType": data["container"]["cntrTpszNm"],
            "copNo": data["container"]["copNo"],
            "blNo": data["container"]["blNo"],
            "trackStart": datetime.now().replace(microsecond=0),
            "trackEnd": None,
            "outboundTerminal": "",
            "inboundTerminal": "",
            "schedule": None,
        }
    else:
        log("[ETL Init] [Transform]"\
            + f" [Keys do not match in container data {data['number']}]")
        return False
    
    # Check schedule keys and extract schedule data
    schedule_keys = ["no", "statusNm", "placeNm", "yardNm",
                     "eventDt", "actTpCd", "actTpCd", "vslEngNm",
                     "lloydNo"]
    if set(schedule_keys).issubset(set(data["schedule"][0])):
        schedule = [{
            "no": int(i["no"]),
            "event": i["statusNm"],
            "placeName": i["placeNm"],
            "yardName": i["yardNm"],
            "eventDate": datetime.strptime(i["eventDt"], "%Y-%m-%d %H:%M"),
            "status": i["actTpCd"],
            "vesselName": i["vslEngNm"],
            "imo": i["lloydNo"],
        } for i in data["schedule"]]
        result["schedule"] = schedule
        # Find and save outbound and inbound terminals
        for i in data["schedule"]:
            if i["statusNm"].find("Outbound Terminal") > -1:
                result["outboundTerminal"] = i["placeNm"]\
                + "|" + i["yardNm"]
            if i["statusNm"].find("Inbound Terminal") > -1:
                result["inboundTerminal"] = i["placeNm"]\
                + "|" + i["yardNm"]
    else:
        log("[ETL Init] [Transform]"\
            + f" [Keys do not match in schedule data {data['number']}]")
        return False
    return result

In [39]:
transformed_data = transform(raw_data)
#transformed_data

### 5.Load

In [8]:
def load(data):
    """Loads data into init and tracking collections."""
    # Check data argument
    if not data:
        log("[ETL Init] [Load] [No data to load]")
        return None
    
    # Connect to database and load data
    conn = MongoClient(access.init)
    try:
        conn.admin.command("ping")
        cur_init = conn.one.init.insert_one(data)
        if cur_init.acknowledged == False:
            log("[ETL Init] [Load] "\
                + f"[{data['blNo']} not loaded to init]")
        cur_tracking = conn.one.tracking.insert_one(data)
        if cur_tracking.acknowledged == False:
            log("[ETL Init] [Load] "\
                + f"[{data['blNo']} not loaded to tracking]")
        conn.close()
    except ConnectionFailure:
        log("[ETL Init] [Load] "\
            + f"[Connection failure for {data['blNo']}]")
        conn.close()
    except BaseException as err:
        log("[ETL Init] [Load] "\
            + f"[{err.details} for {data['blNo']}]")
        conn.close()
    

In [41]:
load(transformed_data)

## Pipeline code (one record)

In [25]:
if check_record(cntr_number):
    raw_data = extract(cntr_number)
    transformed_data = transform(raw_data)
    load(transformed_data)

## Populate database

In [10]:
numbers = ["OSAB42346500", "OSAB42347600", "OSAB47548700",
           "OSAB47550700", "OSAB49916800", "OSAB49918300",
          "OSAB55715900", "OSAB55716300"]
for num in numbers:
    if check_record(num):
        raw_data = extract(num)
        transformed_data = transform(raw_data)
        load(transformed_data)