<a href="https://colab.research.google.com/github/NetoRibeiro/DATA8001Assignment1/blob/main/R00206995_util.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Import Libraries

In [1]:
%matplotlib inline

import matplotlib.pyplot as plt
from matplotlib import colors
import numpy as np
import pandas as pd
from datetime import datetime as dt

import calendar
import re
import string

import math
import pickle
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error, r2_score, accuracy_score

##ETL Functions

In [2]:
def processed_csv(df_original, df_registration_final, 
                  df_car_model_final, df_tax_band_final, df_colour_final):
  """
  Purpose of this funtion: Join all Data Frames into one
  :param df_original: Data Frame
  :param df_registration_final: Data Frame
  :param df_car_model_final: Data Frame
  :param df_tax_band_final: Data Frame
  :param df_colour_final: Data Frame
  return: Processed Data Frame
  """
  try:
    df_in_list = [df_registration_final, df_car_model_final,
                  df_tax_band_final, df_colour_final]

    df = pd.DataFrame.join(df_original, df_in_list)

    df.drop(['car_reg',  'county', 'make', 'model', 'type',
                              'colour', 'purchase_date', 'tax_band'],
                      axis=1, inplace=True)
    
    df.rename(columns={"car_reg_new": "car_reg",
                                "county_new": "county",
                                "make_new": "make",
                                "model_new": "model", "type_new": "type",
                                "colour_new": "colour",
                                "purchase_date_new": "purchase_date",
                                "tax_band_new": "tax_band"},
                        inplace=True)
        
    df['car_reg'] = df['car_reg'].str.upper()
    df['year'] = pd.to_numeric(df['year'])
    df['month'] = pd.to_numeric(df['month'])
    df['county'] = df['county'].str.upper()
    df['make'] = df['make'].str.upper()
    df['model'] = df['model'].str.upper()
    df['type'] = df['type'].str.upper()
    df['colour'] = df['colour'].str.upper()
    df['price'] = df['price'].astype(float)

    df_processed = df[
                      ['car_reg', 'purchase_date', 'year', 'month', 'county',
                       'make', 'model', 'type', 'colour',  'tax_band', 'price']
                      ].copy()

    return df_processed

  except Exception as ex:
    raise Exception(f'Error when try to process data frames:: {ex}')

In [3]:
def etl_process(file_name):

  """
  Purpose of this function: Validate the provided data set.
  :param file_name: String variable with full file adrress.
  return: transformed data set
  
  Validation: Only allows these columns on the raw csv file:

    ['car_reg', 'purchase_date', 'county', 'make', 'model', 'type', 'colour',
       'tax_band', 'price']

  """
  expected_features_names = ['car_reg',  'county', 'make', 'model', 'type',
                             'colour', 'purchase_date', 'tax_band', 'price']
  
  try:

    df_original = pd.read_csv(file_name)
    
  except Exception as ex:
    raise Exception(f'CSV file does not match with the spected layout:: {ex}') 

  try:
    
    df_columns = df_original.columns

    received_features_name = []
    for feature in df_columns:
      received_features_name.append(feature)

    missing_feature = []
    for column in received_features_name:
      if column not in expected_features_names:
        missing_feature.append(column)

    if len(missing_feature) > 0:
      raise Exception(f'Column Name is not matching')
    
    else:
      print('Looks good')
      
      return df_original
    
  except Exception as ex:
    raise Exception(f'CSV file does not match with the spected layout:: {ex}') 

In [4]:
def registration_status(car_registration, looking_for):
  '''
  Purpose of that function: To get patterns on the CAR Registration
  :param car_registration: String value
  :param looking_for: Patterns, Year, County Code, Registration_number
  '''
  # Get the century for the registration
  reg_date = dt.fromordinal(dt.today().toordinal())
  reg_century = reg_date.strftime('%Y')[:2]

  #Create the Car registration patterns
  reg_patterns = '[0-9]+[-]+[a-z]+[-]+[0-9]'
  reg_missing_year_county = '[x]+[-]+[x]+[-]+[0-9]'
  reg_missing_year = '[x]+[-]+[a-z]+[-]+[0-9]'
  reg_missing_county = '[0-9]+[-]+[x]+[-]+[0-9]'
  car_reg_return = ""
  try:
    
    looking_for = looking_for.lower()
    reg_year, reg_county, reg_number = car_registration.split("-")

    if looking_for == "registration_number":
      car_reg_return = reg_number

    elif (re.search(reg_patterns, car_registration)):
      
      if looking_for == "patterns":
        car_reg_return = "match"
      elif looking_for == "year":
        car_reg_return = f'{reg_century}{reg_year[:2]}'
      elif looking_for == "county":
        car_reg_return = reg_county
      else:
        car_reg_return = ""
      
    elif (re.search(reg_missing_year_county, car_registration)):
      
      if looking_for == "patterns":
        car_reg_return = "missing_year_county"
      elif looking_for == "year":
        car_reg_return = ""
      elif looking_for == "county":
        car_reg_return = ""
      else:
        car_reg_return = ""
      
    elif (re.search(reg_missing_year, car_registration)):
      
      if looking_for == "patterns":
        car_reg_return = "missing_year"
      elif looking_for == "year":
        car_reg_return = ""
      elif looking_for == "county":
        car_reg_return = reg_county
      else:
        car_reg_return = ""

    elif (re.search(reg_missing_county, car_registration)):
      
      if looking_for == "patterns":
        car_reg_return = "missing_county"
      elif looking_for == "year":
        car_reg_return = f'{reg_century}{reg_year[:2]}'
      elif looking_for == "county":
        car_reg_return = ""
      else:
        car_reg_return = ""

    else:
      car_reg_return = ""
      
    return car_reg_return
    

  except Exception as ex:
    raise Exception(f'Car registration is not matching with the expected paterns :: {ex}')   

In [5]:
def get_named_colours(named_col, looking_for):
  """
  Purpose of this function: Return transformed colour variable
  :param named_col: str orignal var
  :param looking_for: str Options: NAMED or HEXA
  :return: Named colours when Named or Hexadcimal colour when 
  ###Examples:
  For string "RED":
    When Named  return "red"
    When HEXA "RED" retunr "#FF0000"
  For string "#FF0000":
    When Named  return "red"
    When HEXA "RED" retunr "#FF0000"
  """
  try:
  
    looking_for = looking_for.lower()
  
  except Exception as ex:
    raise Exception(f'Second variable is not a string:: {ex}')  
  
  try:
    
    iscolour = colors.is_color_like(named_col)
  
  except Exception as ex:
    raise Exception(f'Colour name is not a valid format:: {ex}') 

  try:
    
    if iscolour:
      named_col = named_col
    else:
      named_col = named_col.replace("<colour>","").replace("</colour>","")
      
    for name in list_named_colors:

      if named_col.lower() in name:
        inside_name = name[0]
        inside_name = inside_name.replace("xkcd:","")
        inside_hexa = name[1]
        break
      
      elif named_col in name:
        inside_name = name[0]
        inside_name = inside_name.replace("xkcd:","")
        inside_hexa = name[1]
        break
      
      else:
        inside_name = ""
        inside_hexa = ""
    
    if looking_for == "named":
      return inside_name
    
    elif looking_for == "hexa":
      return inside_hexa
    
    else:
      print(f'Please choose NAMED or HEXA columns\t{looking_for}')
    
  except Exception as ex:
    raise Exception(f'Error:: {ex}')

In [6]:
def get_month_num(month):
  month_dict = dict((month.lower(), index) for index, month in enumerate(calendar.month_abbr) if month)
  try:
    month_num = month_dict[month.lower()]
    return month_num
  except Exception as ex:
    raise Exception(f'Month does not matching with the expected patterns :: {ex}')  


def isNumber(numb):
  is_num = bool(re.search(r'\d', numb))
  return is_num

  
def get_semester_num(month):
  '''
  Purpose of that function it is to convert the Month in Semester (1 or 2)
  :param month: Month from the purchase date
  '''
  try:
    
    month_num = pd.to_numeric(month)
    
    if month_num > 6:
      return 2
    
    else:
      return 1
  
  except Exception as ex:
    raise Exception(f'Month number does not matching with the expected patterns :: {ex}')  
                     

def date_patterns(purchase_dates, looking_for, registration):
  '''
  Purpose of that function: Find patterns on the date format
  :param purchase_dates: string date in original format
  :param looking_for: Patterns, Year, Month, Day
  :param registration: String registration number
  '''
  dt_patterns_yyyymmdd = '[0-9]+[-]+[0-9]+[-]+[0-9]'
  dt_patterns_yyyymmmdd = '[0-9]+[-]+[a-z]+[-]+[0-9]'
  dt_missing_year_ddmmm = '[0-9]+[-]+[a-z]'
  dt_missing_year_dd_mmm = '[0-9]+[ ]+[a-z]'
  car_purchase_date_return = ""
  try:
    looking_for = looking_for.lower()
    str(purchase_dates)
    if (re.search(dt_patterns_yyyymmdd, purchase_dates)):
      if looking_for == "patterns":
        car_purchase_date_return = "yyyy-mm-dd"
      elif looking_for == "year":
        car_purchase_date_return = purchase_dates[:4]
      elif looking_for == "month":
        car_purchase_date_return = purchase_dates[5:7]
      elif looking_for == "day":
        car_purchase_date_return = purchase_dates[8:10]
      else:
        car_purchase_date_return = ""

      
    elif (re.search(dt_patterns_yyyymmmdd, purchase_dates)):
      if looking_for == "patterns":
        car_purchase_date_return = "yyyy-mmm-dd"
      elif looking_for == "year":
        car_purchase_date_return = purchase_dates[:4]
      elif looking_for == "month":
        month = purchase_dates[5:8]
        car_purchase_date_return = get_month_num(month)
      elif looking_for == "day":
        car_purchase_date_return = purchase_dates[9:11]
      else:
        car_purchase_date_return = ""
      
      
    elif (re.search(dt_missing_year_ddmmm, purchase_dates)):
      if looking_for == "patterns":
        car_purchase_date_return = "ddmmm"
      elif looking_for == "year":
        car_purchase_date_return = registration_status(registration, "YEAR")
      elif looking_for == "month":
        month = purchase_dates[2:5]
        car_purchase_date_return = get_month_num(month)
      elif looking_for == "day":
        car_purchase_date_return = purchase_dates[:2]
      else:
        car_purchase_date_return = ""

    elif (re.search(dt_missing_year_dd_mmm, purchase_dates)):
      if looking_for == "patterns":
        car_purchase_date_return = "dd_mmm"
      elif looking_for == "year":
        car_purchase_date_return = registration_status(registration, "YEAR")
      elif looking_for == "month":
        month = purchase_dates[3:6]
        car_purchase_date_return = get_month_num(month)
      elif looking_for == "day":
        car_purchase_date_return = purchase_dates[:2]
      else:
        car_purchase_date_return = ""
      
    else:
      
      car_purchase_date_return = "verify"
      
    return car_purchase_date_return
    #return car_reg_status

  except Exception as ex:
    raise Exception(f'Purchase date is not matching with the expected patterns :: {ex}')  

In [7]:
def return_to_patterns(col1, col2, col3, col4, pattern):
  """
  The purpose of the function:
  Concatenate string columns into the requested pattern
  :param col1: String (Year)
  :param col2: String (County or Month)
  :param col3: String (Registration Number or Day)
  :param col4: String (Semester)
  :param pattern: String to create Car Registration or Date
  :return: New string combined the columns into the requested pattern
  """
  try:
    reg_date = dt.fromordinal(dt.today().toordinal())
    reg_century = reg_date.strftime('%Y')[:2]
    pattern = pattern.lower()
    
    if pattern == "registration":
      
      try:
        if len(col1) == 4:
          year = col1[2:4]
        elif len(col1) == 2:
          year = col1
        else:
          year = ""
          
        if len(col2) == 1:
          county_code =  col2.upper()
        else:
          county_code = ""

        if len(col3) > 0:
          reg_number = col3
        else:
          reg_number = ""
          
        if 0 < col4 < 3:
          semester = col4
        else:
          semester = ""
        
        #Only in 2013 the car registration started to receive the semester
        try:
          year = int(year)
        except Exception as ex:
          raise Exception(f'Try to convert Year into Integer :: {ex}')

        if year > 2012:
          pattern_return = f'{year}-{county_code}-{reg_number}'  
        else:
          pattern_return = f'{year}{semester}-{county_code}-{reg_number}'  

        return pattern_return
        
          
      except Exception as ex:
        raise Exception(f'Error Registration Patterns :: {ex}')
    elif pattern == "date":
      try:
        if len(col1) == 4:
          year = col1
        elif len(col1) == 2:
          year = f'{reg_century}{col1}'
        else:
          year = ""
          
        if col2 < 10:
          month =  f'0{col2}'
        elif col2 > 0:
          month =  col2
        else:
          month = ""

        if len(col3) == 1:
          day = f'0{col3}'
        elif len(col3) == 2:
          day = col3
        else:
          day = ""
        
        string_date = f'{year}-{month}-{day}'
        pattern_return = dt.fromisoformat(string_date)
        return pattern_return
      
      except Exception as ex:
          raise Exception(f'Error Date Patterns: {ex}')
    
    
  except Exception as ex:
    raise Exception(f'error: {ex}')

In [8]:
def split_car_model_pattern(make, model, cartype, looking_for):
  """
  Purpose of this function: To find the correct information for
  :param make: Str
  :param model: Str
  :param cartype: Str
  :param looking_for: Str: waiting for the following options: 

                                        AllInOne, Make,
                                        Model, type and looking_for
  return: Transformed requested option.
  """

  inside_make = ""
  inside_model_type = ""
  inside_model = ""
  inside_type = ""
  make_pattern_col = '[]:[]'
  make_pattern_obra = '[]([]'
  make_pattern_cbra = '[])[]'
  return_column = looking_for.lower()
  
  if (re.search(make_pattern_col, make)):
    start_c, end_c = re.search(make_pattern_col, make).span()
    inside_make = make[:start_c].strip().lower()
    inside_model_type = make[end_c:].strip().lower()

    if (re.search(make_pattern_obra, inside_model_type)):
      start_o, end_o = re.search(make_pattern_obra, inside_model_type).span()
      inside_type = inside_model_type[end_o:].replace(")","").strip().lower()
      inside_model = inside_model_type[:start_o].strip().lower()
      ma_mo_ty_return = f'{inside_make}|{inside_model}|{inside_type}'
      
    else:
      inside_model = make[end_c:].strip().lower()
      inside_type = cartype.lower()
      ma_mo_ty_return = f'{inside_make}|{inside_model}|{inside_type}'

  elif (re.search(make_pattern_col, model)):
    start_t, end_t = re.search(make_pattern_col, model).span()
    inside_make = make.strip().lower()
    inside_model = model[:start_t].strip().lower()
    inside_type = model[end_t:].strip().lower()
    ma_mo_ty_return = f'{inside_make}|{inside_model}|{inside_type}'
  
  else:
    inside_make = make.strip().lower()
    inside_model = model.strip().lower()
    inside_type = cartype.strip().lower()
    ma_mo_ty_return = (f'{inside_make}|{inside_model}|{inside_type}')

  if return_column == "allinone":
    return ma_mo_ty_return
  elif return_column == "make":
    return inside_make
  elif return_column == "model":
    return inside_model
  elif return_column == "type":
    return inside_type
  else:
    return ""

##Transform File

###Registration ETL

In [9]:
df_original = etl_process('/content/drive/MyDrive/Data Science and Analytics/R00206995/data/R00206995_original.csv')

Looks good


In [10]:
#Create a Registration related features
df_registration = df_original[{'car_reg', 'purchase_date', 'county'}].copy()

In [11]:
#Transform features to lower case
df_registration['purchase_date'] = df_registration['purchase_date'].str.lower()
df_registration['county'] = df_registration['county'].str.lower()
df_registration['car_reg'] = df_registration['car_reg'].str.lower()

In [12]:
# Create new column feature using lambda function
df_registration['reg_county'] = df_registration.apply(lambda row: registration_status(row['car_reg'], "COUNTY"), axis=1)
df_registration['reg_number'] = df_registration.apply(lambda row: registration_status(row['car_reg'], "Registration_number"), axis=1)

In [13]:
df_counties = df_registration.loc[(df_registration['county'].isnull() == False) & (df_registration['reg_county'] != "")][{'county', 'reg_county', 'car_reg'}].copy()

In [14]:
df_counties = df_counties.groupby(['county', 'reg_county'], as_index=False)['car_reg'].count()

In [15]:
# encoder for County and County Code
label_county = LabelEncoder()
label_reg_county = LabelEncoder()

In [16]:
# train the encoder for County and County Code
label_county.fit(df_counties['county'])
label_reg_county.fit(df_counties['reg_county'])

LabelEncoder()

In [17]:
# transform and add new columns with the label encoder
df_counties['county_lbl'] = label_county.transform(df_counties['county'])
df_counties['reg_county_lbl'] = label_reg_county.transform(df_counties['reg_county'])

In [18]:
# Drop unsed column
df_counties.drop(['car_reg'], axis=1, inplace=True)

In [19]:
# Create Labeled data frame
df_county_label = df_counties[{'county', 'county_lbl'}].copy()
df_county_code_label = df_counties[{'reg_county', 'reg_county_lbl'}].copy()

In [20]:
#Merge the county label to the registration data frame
df_registration = pd.merge(df_registration, df_county_label, how='left', on='county')
df_registration = pd.merge(df_registration, df_county_code_label, how='left', on='reg_county')

In [21]:
#Fill county label missing values with the ones is not missing on county code label
df_registration['county_lbl'].fillna(df_registration['reg_county_lbl'], axis=0, inplace=True)

In [22]:
#Drop Reg_County_lbl
df_registration.drop(['reg_county_lbl'], axis=1, inplace=True)

In [23]:
#Convert to integer
df_registration['county_lbl'] = df_registration['county_lbl'].astype(int)

In [24]:
# transform and add new columns with the label encoder
df_registration['county_new'] = label_county.inverse_transform(df_registration['county_lbl'])
df_registration['reg_county'] = label_reg_county.inverse_transform(df_registration['county_lbl'])

###Purchese Date ETL

In [25]:
#Apply lambda function to get year and month from purchase date
df_registration['year'] = df_registration.apply(lambda row: date_patterns(row['purchase_date'], "YEAR", row['car_reg']), axis=1)
df_registration['month'] = df_registration.apply(lambda row: date_patterns(row['purchase_date'], "MONTH", row['car_reg']), axis=1)
df_registration['purchase_day'] = df_registration.apply(lambda row: date_patterns(row['purchase_date'], "DAY", row['car_reg']), axis=1)
df_registration['month'] = pd.to_numeric(df_registration['month'])

In [26]:
#Apply lambda function to get semester from purchase date
df_registration['purchase_semester'] = df_registration.apply(lambda row: get_semester_num(row['month']), axis=1)

In [27]:
# Apply lambda function to get Car Registration or Date with the correct pattern
df_registration['purchase_date_new'] = df_registration.apply(
    lambda row: return_to_patterns(
        row['year'], row['month'], row['purchase_day'], row['purchase_semester'], "DATE"),
    axis=1)

In [28]:
df_registration['car_reg_new'] = df_registration.apply(
    lambda row: return_to_patterns(
        row['year'], row['reg_county'], row['reg_number'], row['purchase_semester'], "REGISTRATION"),
    axis=1)

In [29]:
df_registration_final = df_registration[{'year', 'month', 'car_reg_new', 'purchase_date_new', 'county_new'}].copy()

###Tax Band ETL

In [30]:
# Create a data frame for start the tax band feature
df_tax_band = df_original.loc[df_original['tax_band'].isnull() == False][{'tax_band'}].copy()

In [31]:
# Change to upper case
df_tax_band['tax_band'] = df_tax_band['tax_band'].str.upper()

In [32]:
# Apply lambda function to find numbers
df_tax_band['isNumber'] = df_tax_band['tax_band'].apply(lambda row: isNumber(row))

In [33]:
# Create a data frame with only String rows
df_tax_band_string = df_tax_band.loc[df_tax_band['isNumber'] == False ][{'tax_band', 'isNumber'}].copy()

In [34]:
# All ASCII uppercases
df_tax_band_train = pd.DataFrame(list(string.ascii_uppercase))

In [35]:
# Create a train data frame for string tax band
df_tax_band_train.rename({0: "tax_band"}, axis=1, inplace=True)

In [36]:
# encoder for Tax Band
label_band = LabelEncoder()

In [37]:
# train the encoder for Tax Band
label_band.fit(df_tax_band_train['tax_band'])

LabelEncoder()

In [38]:
# transform and add new columns with the label encoder
df_tax_band_string['tax_band_lbl'] = label_band.transform(df_tax_band_string['tax_band'])

In [39]:
# Apply group by to remove duplication
df_tax_band_group = df_tax_band_string.groupby(['tax_band', 'tax_band_lbl'], as_index=False)['isNumber'].count()

In [40]:
# Remove unsed column
df_tax_band_group.drop(['isNumber'], axis=1, inplace=True)

In [41]:
# Merge data frames
df_tax_band = pd.merge(df_tax_band, df_tax_band_group, how='left', on='tax_band')

In [42]:
# Fill all NA with the tax_band feature
df_tax_band['tax_band_lbl'].fillna(df_tax_band['tax_band'], axis=0, inplace=True)

In [43]:
# Convert label feature to int
df_tax_band['tax_band_lbl'] = df_tax_band['tax_band_lbl'].astype(int)

In [44]:
# Apply inverse function to get the String for tax
df_tax_band['tax_band_new'] = label_band.inverse_transform(df_tax_band['tax_band_lbl'])

In [45]:
# Get final data frame for tax band
df_tax_band_final = df_tax_band[{'tax_band_new'}].copy()

###Make, Mode and Type ETL

In [46]:
# Create a car model data frame
df_car_model = df_original[{'make', 'model', 'type'}].copy()

In [47]:
# Replace NA for ""
df_car_model['model'].fillna("", axis=0, inplace=True)
df_car_model['type'].fillna("", axis=0, inplace=True)

In [48]:
# Apply lambda function to get transformed features
df_car_model['make_new'] = df_car_model.apply(lambda row: split_car_model_pattern(row['make'], row['model'], row['type'], "Make"), axis=1)
df_car_model['model_new'] = df_car_model.apply(lambda row: split_car_model_pattern(row['make'], row['model'], row['type'], "Model"), axis=1)
df_car_model['type_new'] = df_car_model.apply(lambda row: split_car_model_pattern(row['make'], row['model'], row['type'], "Type"), axis=1)

In [49]:
# Filter only used columns
df_car_model_final = df_car_model[{'make_new', 'model_new', 'type_new'}].copy()

###Colour ETL

https://matplotlib.org/stable/api/colors_api.html?highlight=colors#module-matplotlib.colors

In [50]:
# Filter only Colour to the data frame
df_colour = df_original[{'colour'}].copy()

In [51]:
# Get the named colours from Matplotlib colors
table_of_colours = colors.get_named_colors_mapping()

In [52]:
# Transform the table colours into a list
list_named_colors = []
for row in table_of_colours.items():
  list_named_colors.append(row)

In [53]:
# Apply lambda funtion to get the named colour
df_colour["colour_new"] = df_colour['colour'].apply(
    lambda row: get_named_colours(row, "Named")
)

In [54]:
# Apply lambda funtion to get the hexadecimal colour
df_colour["colour_hexa"] = df_colour['colour'].apply(
    lambda row: get_named_colours(row, "Hexa")
)

In [55]:
df_colour_final = df_colour[{'colour_new'}].copy()

### Combine Transformed Data Frame into One

In [56]:
df_processed = processed_csv(df_original, df_registration_final, 
                  df_car_model_final, df_tax_band_final, df_colour_final)

In [57]:
df_processed.to_csv('/content/drive/MyDrive/Data Science and Analytics/R00206995/data/R00206995_processed.csv', index=False)