<a href="https://colab.research.google.com/github/Pikarz/data_clean_and_validation_from_olddoc_to_cardmaster/blob/main/05%20-%20insert_corrected_data.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Insert the data
This notebook inserts the corrected data into the database. It takes as input the output of notebook 04 and inserts each row that did not generate errors in the previous notebooks.

The notebook also performs additional error checks that the pipeline should already have made, just to be sure that the inserted data is correct.

In [4]:
import pandas as pd
import psycopg2
from datetime import datetime, time

def get_datetime(str_date: str) -> datetime:
    """Parse a date string into a ``datetime``.

    Two layouts are accepted: ``YYYY-MM-DD`` (the time part is set to
    midnight) and ``YYYY-MM-DD HH:MM:SS``.

    Raises:
        ValueError: if ``str_date`` matches neither layout.
    """
    try:
        day_only = datetime.strptime(str_date, "%Y-%m-%d")
    except ValueError:
        # Not a bare date: fall back to the full timestamp layout.
        return datetime.strptime(str_date, "%Y-%m-%d %H:%M:%S")
    return day_only.replace(hour=0, minute=0, second=0)

def create_selling_card(curr, card, s_date):
  """Insert a row into SellingCollectable for ``card`` dated ``s_date``.

  Returns the row produced by ``RETURNING collectable`` (whatever
  ``curr.fetchone()`` yields), or None when psycopg2 reports a database error.
  """
  insert_sql = "INSERT INTO SellingCollectable (collectable, date) VALUES (%s, %s) RETURNING collectable"
  try:
    curr.execute(insert_sql, (card, s_date))
    inserted_row = curr.fetchone()
  except psycopg2.DatabaseError as e:
    print(f"Error while inserting a selling row: {e}")
    return None
  if inserted_row:
    print("Selling card inserted successfully")
  return inserted_row

def create_disposed_card(curr, card, selling_date, s_price):
  """Record the sale of ``card``: selling row, transaction and disposed row.

  Steps, all executed on ``curr``:
    1. insert a SellingCollectable row via ``create_selling_card``;
    2. look up (or create) the source and counterparty named
       'default_counterparty';
    3. insert a CollectableTransaction dated ``selling_date``;
    4. insert a disposedCollectable row linking ``card`` to that transaction
       with ``s_price`` as the amount received.

  Args:
    curr: open database cursor.
    card: identifier of the collectable, passed through as a query parameter.
    selling_date: date string understood by ``get_datetime``.
    s_price: amount received for the card.

  Returns:
    The row returned by the final ``RETURNING collectable``, or None if any
    step failed. Nothing is committed or rolled back here; that is left to
    the caller.
  """
  s_date = get_datetime(selling_date)
  buyer_name = 'default_counterparty'

  # Create a selling card record
  selling_card = create_selling_card(curr, card, s_date)
  if selling_card is None:  # explicit None test: the value is a DB row, not a pandas scalar
    return None

  source_id = get_or_insert_source(curr, buyer_name)
  counterparty_id = get_or_insert_counterparty(curr, buyer_name)

  # Insert a transaction record and get its ID
  transaction_query = "INSERT INTO CollectableTransaction (date, source, counterparty) VALUES (%s, %s, %s) RETURNING id"
  try:
    curr.execute(transaction_query, (s_date, source_id, counterparty_id,))
    transaction = curr.fetchone()
  except psycopg2.DatabaseError as e:
    print(f"Error while inserting a selling transaction: {e}")
    return None
  if transaction is None:
    return None
  # fetchone() yields a row; pass the id itself rather than the whole row
  transaction_id = transaction[0]

  # Insert the disposed card record
  disposed_query = "INSERT INTO disposedCollectable (collectable, transaction, amount_received) VALUES (%s, %s, %s) RETURNING collectable"
  try:
    curr.execute(disposed_query, (card, transaction_id, s_price,))
    disposed_collectable = curr.fetchone()
  except psycopg2.DatabaseError as e:
    print(f"Error while inserting a disposed card: {e}")
    return None
  if disposed_collectable:
    print("Disposed card inserted successfully")
  return disposed_collectable

def create_acquired_card(curr, card, transaction, purchase_amount, shipping_costs='0', tax_costs='0'):
  """Insert an acquiredCollectable row linking ``card`` to ``transaction``.

  Args:
    curr: open database cursor.
    card: collectable identifier, passed through as a query parameter.
    transaction: buying transaction identifier, passed through as a parameter.
    purchase_amount: price paid for the card.
    shipping_costs: shipping costs, '0' by default.
    tax_costs: tax costs, '0' by default.

  Returns:
    The first column of the ``RETURNING collectable`` row, or None when the
    insert raised or returned no row. The reason is printed instead of being
    silently discarded.
  """
  query = "INSERT INTO acquiredCollectable (collectable, transaction, purchase_amount, shipping_costs, tax_costs) VALUES (%s, %s, %s, %s, %s) RETURNING collectable"
  try:
    curr.execute(query, (card, transaction, purchase_amount, shipping_costs, tax_costs))
    row = curr.fetchone()
  except Exception as e:
    print(f"Error while inserting an acquired card: {e}")
    return None
  if row is None:
    print("Error while inserting an acquired card: no row was returned")
    return None
  acquired_card = row[0]
  if acquired_card:
    print('Acquired card inserted successfully')
  return acquired_card

def get_or_insert_source(curr, source_name):
    """Return the name of the Source row for ``source_name``, creating it if needed.

    A missing name (anything ``pd.isna`` reports as NA) is replaced by
    'default_counterparty'.

    Args:
        curr: open database cursor.
        source_name: name of the source, or an NA value.

    Returns:
        The ``name`` column of the existing or newly inserted row, always as a
        plain value (never the raw row), so both paths return the same shape.
    """
    if pd.isna(source_name):
        source_name = 'default_counterparty'
    curr.execute("SELECT name FROM Source WHERE name = %s", (source_name,)) # check if it exists
    row = curr.fetchone()

    if row is None:
        curr.execute("INSERT INTO Source (name) VALUES (%s) RETURNING name", (source_name,)) # insert if it doesn't exist
        row = curr.fetchone()
    # unwrap on both paths; the original unwrapped only after an insert
    return row[0]

def get_or_insert_counterparty(curr, counterparty_name):
  """Return the name of the Counterparty row for ``counterparty_name``, creating it if needed.

  A missing name (anything ``pd.isna`` reports as NA) is replaced by
  'default_counterparty', the same default used for sources and for buyers
  of disposed cards.

  Args:
    curr: open database cursor.
    counterparty_name: name of the counterparty, or an NA value.

  Returns:
    The ``name`` column of the existing or newly inserted row, always as a
    plain value (never the raw row), so both paths return the same shape.
  """
  if pd.isna(counterparty_name):
    # was misspelled 'default_counteparty', which created a second default row
    counterparty_name = 'default_counterparty'
  curr.execute("SELECT name FROM Counterparty WHERE name = %s", (counterparty_name,)) # check if it exists
  row = curr.fetchone()

  if row is None:
    curr.execute("INSERT INTO Counterparty (name) VALUES (%s) RETURNING name", (counterparty_name,)) # insert if it doesn't exist
    row = curr.fetchone()
  # unwrap on both paths; the original unwrapped only after an insert
  return row[0]

def get_or_create_buying_transaction(curr, cached_transactions, b_date, counterparty_name, source):
  """Return the buying transaction for ``(b_date, counterparty_name)``, creating it on a cache miss.

  Args:
    curr: open database cursor.
    cached_transactions: dict mapping ``(b_date, counterparty_name)`` to the
      row returned when the transaction was inserted; updated in place.
    b_date: purchase date, used as the transaction date and cache key part.
    counterparty_name: seller name, resolved via ``get_or_insert_counterparty``.
    source: source name, resolved via ``get_or_insert_source``.

  Returns:
    A ``(transaction, cached_transactions)`` pair. ``transaction`` is the row
    returned by ``RETURNING id`` (or the cached one), or None on failure.
    Failed inserts are never cached.
  """
  cache_key = (b_date, counterparty_name)
  try: # cache hit
    return cached_transactions[cache_key], cached_transactions
  except KeyError: # cache miss
    pass

  source_id = get_or_insert_source(curr, source) # get source
  counterparty_id = get_or_insert_counterparty(curr, counterparty_name) # get counterparty
  if source_id is None or counterparty_id is None:
    return None, cached_transactions
  try:
    curr.execute("INSERT INTO CollectableTransaction (date, source, counterparty) VALUES (%s, %s, %s) RETURNING id", (b_date, source_id, counterparty_id,))
    transaction_id = curr.fetchone()
  except Exception as e:
    print(f"Error while inserting a buying transaction: {e}")
    # 'conn' is not defined in this scope; reach the connection through the cursor
    curr.connection.rollback()
    return None, cached_transactions
  if transaction_id is None:
    return None, cached_transactions
  # add to the cached transaction
  cached_transactions[cache_key] = transaction_id
  print("Transaction inserted successfully")
  return transaction_id, cached_transactions

def create_card_instance(curr, expansion, number, date_acquired, condition, language, in_collection, version, inked, comment):
  """Insert a Collectable row and the CardInstance row that refers to it.

  The condition 'NM+' is stored as 'MT'. The Collectable is always inserted
  with ``is_arrived`` set to True.

  Args:
    curr: open database cursor.
    expansion, number: identify the card type.
    date_acquired, in_collection, comment: stored on the Collectable row.
    condition, language, version, inked: stored on the CardInstance row.

  Returns:
    The row returned by ``RETURNING card`` for the CardInstance insert, or
    None if either insert failed. Failures are printed, not raised; the
    caller is responsible for rolling back.
  """
  if condition == 'NM+':
    condition = 'MT'
  # Bound up front so a failing insert returns None instead of hitting an
  # unbound local (the original returned from a `finally` block).
  card = None
  try:
    query_collectable = """
      INSERT INTO Collectable (in_collection, date_acquired, is_arrived, comment) VALUES (%s, %s, %s, %s) RETURNING id
    """
    curr.execute(query_collectable, (in_collection, date_acquired, True, comment))
    collectable_id = curr.fetchone()[0]
    query_cardinstance = """
      INSERT INTO CardInstance (card, cardtype_number, cardtype_expansion, language, condition, version, inked)
      VALUES (%s, %s, %s, %s, %s, %s, %s) RETURNING card
    """
    curr.execute(query_cardinstance, (collectable_id, number, expansion, language, condition, version, inked))
    card = curr.fetchone()
    if card is not None:
      print(f"CardInstance ({expansion}, {number}, {version}) inserted successfully.")
  except Exception as e:
    print(f"Exception while inserting a CardInstance: {e}")
  return card

def get_actual_expansion(curr, expansion, number):
  """Resolve the expansion under which card ``number`` is stored in CardType.

  First looks for a CardType row with exactly this ``(expansion, number)``.
  If none exists, looks for a Cardexpansion whose ``super_expansion`` is
  ``expansion`` and which has a CardType with this ``number``.

  Returns the first column of the matching row, or None when neither query
  finds anything.
  """
  lookup_args = (expansion, number,)

  main_query = """
          SELECT CardType.expansion
          FROM CardType
          WHERE CardType.expansion = %s AND CardType.number = %s
    """
  curr.execute(main_query, lookup_args)
  match = curr.fetchone()

  if not match:  # not a main expansion: try the sub-expansions
    sub_query = """
        SELECT Cardexpansion.id
        FROM CardType, Cardexpansion
        WHERE Cardexpansion.super_expansion = %s
        AND CardType.expansion = Cardexpansion.id
        AND CardType.number = %s
      """
    curr.execute(sub_query, lookup_args)
    match = curr.fetchone()

  return match[0] if match else None

def check_card_consistency(curr, expansion, number, version):
  """Tell whether ``(expansion, number, version)`` exists in versioncardtype.

  Returns True when a matching row is found and False otherwise.
  """
  lookup_sql = """
    SELECT 1
    FROM versioncardtype
    WHERE card_expansion=%s
    AND card_number=%s
    AND version=%s
  """
  curr.execute(lookup_sql, (expansion, number, version,))
  found = curr.fetchone()
  if found is None:
    return False
  return True

def _record_row_error(df, data_path, row_index, message):
  """Store ``message`` in the row's 'insert_comment' cell and persist the CSV."""
  df.at[row_index, 'insert_comment'] = message
  df.to_csv(data_path, index=False)


def _rollback_row(conn, cached_transactions, row_cache_key):
  """Roll back the current row and forget the transaction it cached, if any."""
  conn.rollback()
  if row_cache_key is not None:
    # The cached id was created inside the transaction that was just rolled
    # back, so it no longer exists in the database.
    cached_transactions.pop(row_cache_key, None)


def insert_corrected_data(db_params, data_path):
  """Insert every validated row of the CSV at ``data_path`` into the database.

  After an interactive confirmation, the CSV is read and each row is inserted
  in its own database transaction: the card instance, then (when 'B-Price'
  is present) the buying transaction and acquired card, then (when 'S-Date'
  and 'S-Price' are present) the selling and disposed records. The outcome is
  written back to the CSV after every row in the 'inserted' and
  'insert_comment' columns, which are created if 'inserted' is missing.

  Rows are skipped when 'date_comment' or 'language_comment' is not 'OK',
  'status_name' is not 0, 'version' is missing or 'ERROR_VERSION', or the row
  is already marked as inserted.

  Args:
    db_params: keyword arguments forwarded to ``psycopg2.connect``.
    data_path: path of the CSV file to read and update in place.

  Returns:
    None. Progress and errors are printed.
  """
  input_check = input("Did you run check_date_consistency(), fix_names_inconsistencies() and insert_version()?")
  if input_check.lower() != 'y':
    return
  conn = None
  curr = None
  try:
    conn = psycopg2.connect(**db_params)
    curr = conn.cursor()
  except Exception as e:
    print(f"Error while connecting to the DB: {e}")
    if conn is not None:
      conn.close()
    return

  try:
    df = pd.read_csv(data_path)
    if 'inserted' not in df.columns:
      df['inserted'] = False
      df['insert_comment'] = ''
      df.to_csv(data_path, index=False)

    cached_transactions = {} # cache to store transactions so that it's possible to deduce, given the date and the counterparty, a transaction
    for i, row in df.iterrows():
      # 'inserted' comes back from the CSV as a boolean (or as text if the
      # file was edited by hand), so compare its textual form rather than
      # the literal 'TRUE', which never matches a boolean.
      if  row['date_comment'] != 'OK' or \
          row['language_comment'] != 'OK' or \
          row['status_name'] != 0 or \
          pd.isna(row['version']) or \
          row['version']=='ERROR_VERSION' or \
          str(row['inserted']).strip().lower() == 'true': # unprocessed, erroneous or already inserted row
          continue

      result_msg = 'Inserted\n' # final message that will be displayed in the comments if all goes well
      row_cache_key = None # key of the transaction cached while processing this row, if any

      # all the data of a card
      date_acquired = row['B-Date']
      if pd.isna(date_acquired):
        date_acquired = '2020-01-01' # handle missing information by setting a default value
      number = str(row['#'])
      actual_expansion = get_actual_expansion(curr, row['Set'], number) # to handle sub-expansions
      version = row['version']
      condition = row['Condition']
      if condition == 'M':
        condition = 'MT'
      if pd.isna(condition):
        print(f"ERROR: the card at row {i} has no condition")
        _record_row_error(df, data_path, i, 'ERROR: The card has no condition')
        continue
      language = row['Lang']
      in_collection = row['in collection']
      comment = row['Comments']
      inked = False
      if pd.notna(comment): # check inked status
        inked = 'inked' in comment.lower()
        # Remove 'inked' from comments
        # NOTE(review): this also lowercases the whole stored comment, inked or not
        comment = comment.lower().replace('inked', '').strip()

      if inked and condition != 'PO': # do not add cards that are stated as inked but their state is not poor
        print(f'ERROR: The card at row {i} is inked but its condition is not "PO"')
        _record_row_error(df, data_path, i, 'ERROR: The card has inked in its comment but its condition is not PO')
        continue

      check_card = check_card_consistency(curr, actual_expansion, number, version)
      if not check_card:
        print(f"Error! The card at row {i} has a combination of (set: {actual_expansion}, number: {number}, version: {version}) that doesn't exist in the DB.")
        _record_row_error(df, data_path, i, 'ERROR: The (set, number, version) does not exist')
        continue
      result_msg += 'Card checked\n'

      card = create_card_instance(curr, actual_expansion, number, date_acquired, condition, language, in_collection, version, inked, comment)
      if card is None:
        print(f"Error! An error occurred while inserting a card!")
        _record_row_error(df, data_path, i, 'ERROR: an error occurred while inserting')
        _rollback_row(conn, cached_transactions, row_cache_key)
        continue

      result_msg += 'inserted\n'

      buying_price = row['B-Price']
      if not pd.isna(buying_price):
        seller = row['seller']
        source = row['source']
        cache_key = (date_acquired, seller)
        was_cached = cache_key in cached_transactions
        transaction, cached_transactions = get_or_create_buying_transaction(curr, cached_transactions, date_acquired, seller, source)
        if not was_cached:
          # remember it so a later rollback of this row also evicts the entry
          row_cache_key = cache_key
        if transaction is None:
          _record_row_error(df, data_path, i, 'ERROR: The card was inserted but while inserting its buying transaction an error occurred.')
          _rollback_row(conn, cached_transactions, row_cache_key)
          continue
        result_msg += 'buying transaction created\n'
        acquired_card = create_acquired_card(curr, card, transaction, buying_price)
        if acquired_card is None:
          _record_row_error(df, data_path, i, 'ERROR: The transaction was inserted but an error occurred while inserting the acquired card.')
          _rollback_row(conn, cached_transactions, row_cache_key)
          continue
        result_msg += 'acquired_card created\n'

      selling_price = row['S-Price']
      selling_date = row['S-Date']
      if not pd.isna(selling_date) or not pd.isna(selling_price): # if one exists
        if not (pd.notna(selling_date) == pd.notna(selling_price)): # if one is missing then there's an error (either both or none)
          print("ERROR! Selling date or selling price is missing")
          _record_row_error(df, data_path, i, 'ERROR: Missing selling date or selling price')
          _rollback_row(conn, cached_transactions, row_cache_key)
          continue
        result_msg += 'selling card inserted\n'

        disposed_card = create_disposed_card(curr, card, selling_date, selling_price)
        if disposed_card is None:
          print("ERROR! An error occurred while inserting a selling or disposed card")
          _record_row_error(df, data_path, i, 'ERROR: An error occurred while inserting a selling or disposed card')
          _rollback_row(conn, cached_transactions, row_cache_key)
          continue
        else:
          result_msg += 'disposed card inserted\n'
      result_msg += 'Done!\n'
      # Commit before marking the row, so the CSV never claims an insert that
      # was not actually committed.
      conn.commit()
      df.at[i, 'inserted'] = True
      df.at[i,'insert_comment'] = result_msg
      df.to_csv(data_path, index=False)
  finally:
    # release the cursor and the connection even when a row raises
    curr.close()
    conn.close()
  print("### Completed! ###")

In [5]:
# CSV file that is read and updated in place by insert_corrected_data()
data_path = 'H:/My Drive/pkmn/pok_swap.csv'

# Connection settings forwarded to psycopg2.connect() (placeholder values)
db_params = dict(
    host='192.168.example.example',
    port='5432',
    database='cardmaster',
    user='example',
    password='example',
)

insert_corrected_data(db_params, data_path)

Did you run check_date_consistency(), fix_names_inconsistencies() and insert_version()?y
CardInstance (BS, 16, default) inserted successfully.
Transaction inserted successfully
Acquired card inserted successfully
CardInstance (BS, 10, default) inserted successfully.
Acquired card inserted successfully
CardInstance (GH, 94, default) inserted successfully.
Transaction inserted successfully
Acquired card inserted successfully
CardInstance (GH, 60, default) inserted successfully.
Transaction inserted successfully
Acquired card inserted successfully
CardInstance (GH, 62, default) inserted successfully.
Transaction inserted successfully
Acquired card inserted successfully
CardInstance (GH, 63, default) inserted successfully.
Acquired card inserted successfully
CardInstance (GH, 67, default) inserted successfully.
Acquired card inserted successfully
CardInstance (GH, 84, default) inserted successfully.
Acquired card inserted successfully
CardInstance (GH, 112, default) inserted successfully.
