In [1]:
import setuptools
import os
from dotenv import load_dotenv
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pandas as pd
from datetime import datetime, timedelta
from tqdm.notebook import trange, tqdm

# Read the connection settings from the environment; load_dotenv() is called
# first so values can also be supplied through a dotenv file.
load_dotenv()
SPARK_HOST = os.environ.get('SPARK_HOST')
SPARK_APP_NAME = os.environ.get('SPARK_APP_NAME')

# Build (or fetch) the session against the remote Spark endpoint.
spark = (
    SparkSession.builder
    .remote(SPARK_HOST)
    .appName(SPARK_APP_NAME)
    .getOrCreate()
)

In [None]:
# Load datasets

ROOT = os.getenv('WORKING_DIR')
if ROOT is None:
    # Fail with a readable message instead of a TypeError on the first `ROOT + ...`.
    raise RuntimeError('WORKING_DIR is not set; it must point at the project root (with trailing slash)')

trafidata = spark.read.parquet(ROOT + 'data/traficom_open_data_for_vehicles')
trafidata.show(5)

# Add index to traficom data for merging.
# monotonically_increasing_id() is non-deterministic (its values depend on how
# the data is partitioned), so the indexed frame is written once and then read
# back: every later cell sees exactly the indices that are stored on disk.
# mode('overwrite') keeps the cell re-runnable when the output already exists.
indexed_path = ROOT + 'data/trafidata_with_indices'
trafidata.withColumn('index', F.monotonically_increasing_id()) \
    .write.options(encoding="ISO-8859-1", header=True, delimiter=";") \
    .mode('overwrite').parquet(indexed_path)
trafidata = spark.read.parquet(indexed_path)

taxprice = spark.read.parquet(ROOT + 'data/tax_data_with_price_predictions')
taxprice.show(5)

In [3]:
# Keep references to the frames as loaded, before any filtering below.
TRAFIDATA_ORIG = trafidata
TAXPRICE_ORIG = taxprice

# Select passenger cars (vehicle classification M1)
# https://www.traficom.fi/en/transport/road/vehicle-categories
is_passenger_car = trafidata['vehicle_classification'] == 'M1'
trafidata = trafidata.filter(is_passenger_car)

DATA_START_DATE = '2011-01-01'

# Disabled: filter out old vehicles so the data matches the tax data start date
#trafidata = trafidata.filter(F.col('date_of_first_registration') >= DATA_START_DATE) \
#    .filter((F.col('date_of_use') >= DATA_START_DATE) | (F.col('date_of_use') == None))

# Lower-case the make/model text columns on both sides
trafidata = trafidata.withColumn('make_plaintext', F.lower(F.col('make_plaintext')))
taxprice = taxprice.withColumn('make', F.lower(F.col('make')))
taxprice = taxprice.withColumn('model', F.lower(F.col('model')))

# 'date' is the first registration date when present, otherwise the date of use
first_known_date = F.coalesce(F.col('date_of_first_registration'), F.col('date_of_use'))
taxprice = taxprice.withColumn('date', first_known_date)

# Disabled: placeholder columns for the traficom matched make and model
# taxprice = taxprice.withColumn('matched_make', F.lit(None).cast('string'))\
#     .withColumn('matched_model', F.lit(None).cast('string'))\
#     .withColumn('matched_trade_name', F.lit(None).cast('string'))

In [4]:
# Narrow the traficom frame to the columns the later cells read, plus the row index.
trafi_columns = [
    'make_plaintext',
    'model',
    'manufac_trade_name',
    'date_of_first_registration',
    'date_of_use',
    'n_doors',
    'drive_power',
    'max_net_engine_power_kw',
    'transmission',
    'odometer',
    'index',
]
trafidata = trafidata.select(*trafi_columns)

In [None]:
# Export the (index, make, model) columns of the tax data through Parquet so
# they can be loaded into pandas for the string matching below.
path = ROOT + 'data/temp'
# NOTE(review): these look like CSV writer options; they are passed to the
# Parquet writer unchanged from the original cell — confirm they are needed.
options = {'encoding':"ISO-8859-1", 'delimiter':";", 'header': True}

# mode('ignore') skips the write when the path already holds data, which is the
# one failure the previous bare `except: pass` was there to tolerate. Any other
# error (bad path, permissions, lost connection) now surfaces instead of being
# swallowed and followed by a confusing read failure.
# NOTE: an existing export is reused as-is; delete the directory to refresh it.
taxprice.select('index', 'make', 'model').write.options(**options).mode('ignore').parquet(path)

hammeasure = pd.read_parquet(path)
hammeasure.head()

In [None]:
import scipy
import numpy as np
import jellyfish

# Sorted list of the distinct, non-null traficom makes.
maker_rows = (
    trafidata
    .filter(F.col('make_plaintext').isNotNull())
    .select('make_plaintext')
    .distinct()
    .collect()
)
unique_makers = sorted(maker_row[0] for maker_row in maker_rows)

# Per-make caches of distinct lower-cased column values, filled lazily by
# get_col_unique_given_make (one dict per supported column).
manufacdata = dict()
modeldata = dict()

def get_col_unique_given_make(make, col):
    """Return the distinct lower-cased values of ``col`` for one traficom make.

    The first call for a (make, col) pair queries ``trafidata`` for the
    non-null values of ``col`` among rows whose ``make_plaintext`` equals
    ``make``; the result is cached and later calls return the cached list.

    Args:
        make: value to compare against the ``make_plaintext`` column.
        col: ``'manufac_trade_name'`` or ``'model'``.

    Returns:
        List of strings (possibly empty). The same list object is returned on
        later calls, so callers should not mutate it.

    Raises:
        ValueError: if ``col`` is not one of the two supported columns.
    """
    if col == 'manufac_trade_name':
        cache = manufacdata
    elif col == 'model':
        cache = modeldata
    else:
        # Previously this fell through to an UnboundLocalError.
        raise ValueError(f"unsupported column {col!r}; expected 'manufac_trade_name' or 'model'")

    cached = cache.get(make)
    if cached is not None:
        # Cached entries are already plain strings. Unpacking them a second
        # time (as the old code did) would keep only each value's first letter.
        return cached

    rows = trafidata.filter((F.col('make_plaintext') == make) & (F.col(col).isNotNull())).select(F.lower(col)).distinct().collect()
    values = [row[0] for row in rows]
    cache[make] = values

    return values

# Match make
# For every tax row, pick the traficom make with the smallest Levenshtein
# distance to the tax make (ties go to the alphabetically first make, because
# unique_makers is sorted). Model and trade-name matching were prototyped with
# get_col_unique_given_make but are not enabled; only the make is resolved here.
results = []

# tax make -> closest traficom make. There are far fewer distinct makes than
# rows, so each distinct spelling is compared against the maker list only once.
closest_make = {}

for index, make in tqdm(zip(hammeasure['index'], hammeasure['make']), total=hammeasure.shape[0]):
    # Rows without a make (None/NaN after the parquet round trip) cannot be
    # matched; skip them instead of letting jellyfish raise on a non-string.
    if not isinstance(make, str):
        continue

    make0 = closest_make.get(make)
    if make0 is None:
        make0 = min(unique_makers, key=lambda candidate: jellyfish.levenshtein_distance(make, candidate))
        closest_make[make] = make0

    results.append([int(index), make0])

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, LongType

# Seed for the shuffle below. The greedy matching that follows depends on the
# processing order, so the order has to be reproducible between runs.
SHUFFLE_SEED = 42

# Explicit schema so Spark does not have to infer column types from the data.
matched_make_schema = StructType([
    StructField('index', LongType(), True),
    StructField('matched_make_plaintext', StringType(), True),
])

# results holds [index, matched make] pairs produced by the make matching loop.
df = pd.DataFrame(results, columns=['index', 'matched_make_plaintext'])
df = spark.createDataFrame(df, schema=matched_make_schema)

# Attach the matched traficom make to every tax row.
makesprices = taxprice.join(df, on='index')

# Shuffle the rows (seeded) so the greedy matching visits them in random order.
makesprices = makesprices.orderBy(F.rand(SHUFFLE_SEED))

makesprices.show(5)

tax_indices = makesprices.select('index').collect()
tax_indices = [x['index'] for x in tax_indices]


In [None]:
from pyspark.storagelevel import StorageLevel

# Short aliases for the two date columns consulted during matching.
dou, dofr = 'date_of_use', 'date_of_first_registration'

# Accumulators filled by the matching loop: the (tax index, traficom index)
# pairs found so far, and the traficom indices that are already taken.
matches, used = list(), set()

# trafidata = trafidata.select('n_doors', 'drive_power', 'max_net_engine_power_kw', 'transmission', 'odometer', 'index')

# Define distance function
def distance(tax, trafi, datecol):
    """Score how poorly a traficom row matches a tax row (lower is better).

    The score is the sum of four terms:

    * model    - Levenshtein distance between the model names squashed into
                 [0, 1); ``inf`` when either model is missing.
    * date     - absolute difference in days between the two ``datecol`` values.
    * doors    - 0 when equal, ``inf`` when different, 1 when either is unknown.
    * odometer - absolute difference in km squashed into [0, 1); 1 when either
                 reading is unknown.

    An infinite score marks the pair as incompatible.

    Args:
        tax: mapping-like tax row with keys ``model``, ``n_doors``,
            ``odometer_unit_1000km`` and ``datecol``.
        trafi: mapping-like traficom row with keys ``model``, ``n_doors``,
            ``odometer`` and ``datecol``.
        datecol: name of the date column to compare; both rows must hold a
            non-null date under it.

    Returns:
        The combined distance as a number (possibly ``inf``).
    """
    # Model
    if tax['model'] is not None and trafi['model'] is not None:
        modeld = jellyfish.levenshtein_distance(tax['model'], trafi['model'])
        modeld = modeld / (1 + modeld) # map to [0, 1]
    else:
        modeld = np.inf

    # Date
    dated = abs((tax[datecol] - trafi[datecol]).days) # 0 or 1 for the +-1 day candidate window

    # Doors
    if tax['n_doors'] is not None and trafi['n_doors'] is not None:
        doord = 0 if trafi['n_doors'] == tax['n_doors'] else np.inf
    else:
        doord = 1

    # Odometer
    if tax['odometer_unit_1000km'] is not None and trafi['odometer'] is not None:
        # The tax reading is in thousands of km (per its column name), so it is
        # the side that gets scaled; the old code multiplied the traficom
        # reading instead, which pushed this term to ~1 for every pair.
        # NOTE(review): assumes the traficom 'odometer' column is in km — confirm.
        odod = abs(trafi['odometer'] - tax['odometer_unit_1000km'] * 1000)
        odod = odod / (1 + odod) # map to [0, 1]
    else:
        odod = 1

    return modeld + dated + doord + odod

# Bring the tax rows to the driver.
makesprices1 = makesprices.select('index', 'matched_make_plaintext', 'date_of_use', 'date_of_first_registration',
    'model', 'odometer_unit_1000km', 'driving_power', 'n_doors', 'Kw', 'transmission', 'body_style', 'drivetrain').collect()

# Two cached copies of the traficom data, each sorted by one of the date columns.
trafidata1 = trafidata.orderBy('date_of_first_registration').persist(StorageLevel.MEMORY_ONLY)
trafidata2 = trafidata.orderBy('date_of_use').persist(StorageLevel.MEMORY_ONLY)

# Candidates must have their date within one day of the tax row's date.
window = timedelta(days=1)

# Greedy one-to-one matching: visit the tax rows in the (shuffled) order of
# makesprices and give each one the closest traficom row that is still free.

for i in tqdm(range(len(makesprices1))):
    result = makesprices1[i]

    # Prefer date_of_use; fall back to date_of_first_registration. The source
    # frame is the copy sorted by the same column that is filtered on (the two
    # were swapped before; the filter result is the same either way).
    if result[dou] is None:
        datecol = dofr
        datasrc = trafidata1
    else:
        datecol = dou
        datasrc = trafidata2

    date = result[datecol]
    if date is None:
        # Neither date is known: there is nothing to search on.
        continue

    candidates = datasrc.filter((F.col(datecol) >= date - window) & (F.col(datecol) <= date + window)).filter(F.col('make_plaintext') == result['matched_make_plaintext'])\
        .select('date_of_first_registration', 'date_of_use', 'model', 'odometer', 'drive_power', 'n_doors', 'max_net_engine_power_kw', 'transmission', 'index').collect()

    # Drop traficom rows that were already assigned to an earlier tax row.
    candidates = [x for x in candidates if x['index'] not in used]

    if len(candidates) == 0:
        continue

    distances = [distance(result, x, datecol) for x in candidates]
    best = int(np.argmin(distances))

    # distance() uses inf to mark an incompatible pair (missing model, door
    # count mismatch). When every candidate is incompatible argmin still
    # returns position 0, so reject the pair instead of recording a bogus match.
    if not np.isfinite(distances[best]):
        continue

    pair = candidates[best]
    matches.append((result['index'], pair['index']))
    used.add(pair['index'])


In [10]:
# Persist the matched (tax index, traficom index) pairs.
matchesdf = spark.createDataFrame(matches, ['tax_index', 'trafi_index'])
# mode('overwrite'): with the default save mode this cell raises as soon as the
# output directory exists, so the notebook could not be run a second time.
matchesdf.write.options(encoding="ISO-8859-1", header=True, delimiter=";") \
    .mode('overwrite').parquet(ROOT + 'data/tax_trafi_index_pairs')

In [None]:
# Start again from the frames as they were before filtering and lower-casing.
trafidata = TRAFIDATA_ORIG
taxprice = TAXPRICE_ORIG

# Key the pairs by the tax index so they can be joined onto the tax rows.
matches = matchesdf.withColumnRenamed('tax_index', 'index')

# After the join, swap the key: keep the tax index as 'tax_index' and expose the
# matched traficom index as 'index' for the join with the traficom rows.
taxprice = taxprice.join(matches, on='index')
taxprice = taxprice.withColumnRenamed('index', 'tax_index')
taxprice = taxprice.withColumnRenamed('trafi_index', 'index')

trafidata = trafidata.join(taxprice, on='index')
trafidata.show(5)



In [8]:
# import numpy as np
# import matplotlib.pyplot as plt
# %matplotlib inline

# dates = trafidata.select('date_of_first_registration').distinct().orderBy(F.col('date_of_first_registration'), ascending=False).collect()
# dates = [x[0] for x in dates]
# distincts = np.zeros(len(dates))

# merged = None

# for i, date in enumerate(dates[365:365+10]):
#     traf = trafidata.filter(F.col('date_of_first_registration') == date).groupBy('make_plaintext').count().select('make_plaintext', 'count').collect()
#     tax = taxprice.filter(F.col('date_of_first_registration') == date).groupBy('make').count().select('make', 'count').collect()

#     trafcounts = dict([(x[0], x[1]) for x in traf ])
#     taxcounts = dict([(x[0], x[1]) for x in tax ])
#     mergeable = []

#     for k, v in taxcounts.items():
#         if k not in trafcounts:
#             del trafcounts[k]

#         if v == 1 and trafcounts.get(k, None) == 1:
#             mergeable.append(k)

#     print(date)
#     print(f'tra {trafcounts}')
#     print(f'tax {taxcounts}')
#     print(mergeable)

