In [1]:
# Installing required packages
!pip install pyspark==3.1.2
!pip install findspark==2.0.1
!pip install psycopg2==2.9.3
!pip install pyarrow==1.0.0
!pip install pandas==1.1.4
!pip install numpy==1.19.5
!pip install python-Levenshtein
!pip install python-translator
!pip install fuzzywuzzy
!pip install openpyxl
!pip install requests
!pip install bs4



In [3]:
import findspark
import os
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField,IntegerType,LongType,StringType,FloatType,DoubleType,DecimalType
import numpy as np
import pandas as pd
from python_translator import Translator
from fuzzywuzzy import process
import re
import math as mt

In [3]:
findspark.init("C:/******/**********/Desktop/ETL/.env/Lib/site-packages/pyspark")

In [4]:
configure = SparkConf().set("spark.jars.packages","com.crealytics:spark-excel_2.12:0.13.7")

In [5]:
spark = SparkSession.builder.appName("get_tried").config(conf = configure).master("local[3]").getOrCreate()

In [6]:
spark

In [7]:
Target_d = spark.read.format("com.crealytics.spark.excel").option("inferSchema", "True").\
    option("header","True").load("C:/****/*************/Downloads/*****************/***************/Target Data.xlsx")

In [237]:
Supplier_d = spark.read.json("C:/******/*************/Downloads/**************/**********/supplier_car.json")

In [74]:
Target_d.printSchema()

root
 |-- carType: string (nullable = true)
 |-- color: string (nullable = true)
 |-- condition: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- drive: string (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- make: string (nullable = true)
 |-- manufacture_year: string (nullable = true)
 |-- mileage: string (nullable = true)
 |-- mileage_unit: string (nullable = true)
 |-- model: string (nullable = true)
 |-- model_variant: string (nullable = true)
 |-- price_on_request: string (nullable = true)
 |-- type: string (nullable = true)
 |-- zip: string (nullable = true)
 |-- manufacture_month: string (nullable = true)
 |-- fuel_consumption_unit: string (nullable = true)



In [284]:
Supplier_d.printSchema()

root
 |-- Attribute Names: string (nullable = true)
 |-- Attribute Values: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- MakeText: string (nullable = true)
 |-- ModelText: string (nullable = true)
 |-- ModelTypeText: string (nullable = true)
 |-- TypeName: string (nullable = true)
 |-- TypeNameFull: string (nullable = true)
 |-- entity_id: string (nullable = true)



In [285]:
#view = {Target_d.select('*').dropDuplicates().show()}
#view

In [286]:
#view = {col:Target_d.select(F.col(col)).dropDuplicates().sort(F.col(col),ascending = False).show() for col in Target_d.columns}
#view

In [287]:
#view_d = {col:Target_d.filter(F.col(col).like('null')).count() for col in Target_d.columns}
#view_d

In [288]:
#vw = {col:Target_d.select(F.col(col)).distinct().rdd.map(lambda x: x[0]).collect() for col in Target_d.columns}
#vw

In [13]:
pnd = Target_d.toPandas()
vw = {col : pnd[col].unique() for col in Target_d.columns}
vw

{'carType': array(['Convertible / Roadster', 'Coupé', 'Custom', 'null', 'SUV',
        'Other', 'Saloon', 'Single seater', 'Station Wagon', 'Targa'],
       dtype=object),
 'color': array(['White', 'Other', 'Blue', 'Black', 'Silver', 'Brown', 'Red',
        'Gray', 'Green', 'Beige', 'Yellow', 'Orange', 'Purple', 'Gold'],
       dtype=object),
 'condition': array(['Used', 'Original Condition', 'Restored', 'New',
        'Used with guarantee', 'Restoration Project'], dtype=object),
 'currency': array(['USD', 'JPY', 'GBP', 'EUR', 'CHF', 'AUD'], dtype=object),
 'drive': array(['LHD', 'RHD', 'null'], dtype=object),
 'city': array(['Zuzwil', 'London', 'Scotts Valley', 'Hong Kong', 'Bovenden',
        'Isernhagen', 'De Lier', 'Berlin', 'Waalwijk', 'Lyon', 'Aachen',
        'Düsseldorf', 'Harelbeke', 'Neustadt', 'Baierbrunn bei München',
        'Bramley', 'CH-8852 Altendorf / SZ', 'Singen', 'St. Louis',
        'Erkelenz', 'HETEREN', 'Courbevoie', 'Mill', 'Overijse', 'Paris',
        'Hunting

Schema mapped from the Supplier data including the Attribute Names 

In [290]:
"""
schemaMap - A function that maps the column names
    from the source data to form a new schema
@supplier_PandasDF: a source data
@Returns: the new shema
"""

def schemaMap(supplier_PandasDF):
    ls = supplier_PandasDF.columns.tolist()
    nw = supplier_PandasDF["Attribute Names"].unique().tolist()
    col = ls + nw
    [col.remove(i) for i in ["Attribute Values","Attribute Names","entity_id"]]
    sch = StructType()
    for a in range(0,len(col)):
        sch.add(StructField(str(col[a]),StringType(),True))
    return sch

In [261]:
# result of schema with all column names and Attribute names
schemaMap(Supplier_d.toPandas())

StructType(List(StructField(ID,StringType,true),StructField(MakeText,StringType,true),StructField(ModelText,StringType,true),StructField(ModelTypeText,StringType,true),StructField(TypeName,StringType,true),StructField(TypeNameFull,StringType,true),StructField(Seats,StringType,true),StructField(Hp,StringType,true),StructField(FuelTypeText,StringType,true),StructField(Ccm,StringType,true),StructField(BodyColorText,StringType,true),StructField(ConsumptionTotalText,StringType,true),StructField(Doors,StringType,true),StructField(Co2EmissionText,StringType,true),StructField(TransmissionTypeText,StringType,true),StructField(FirstRegYear,StringType,true),StructField(Properties,StringType,true),StructField(ConditionTypeText,StringType,true),StructField(Km,StringType,true),StructField(DriveTypeText,StringType,true),StructField(InteriorColorText,StringType,true),StructField(ConsumptionRatingText,StringType,true),StructField(FirstRegMonth,StringType,true),StructField(City,StringType,true),StructFi

processing the supplier data and filtering on the product ID

In [292]:
"""
ProcessTranspose - A function that filters
    each product data with same ID and completes
    it's attributes and values
@iterator: a set of rows passed as collection
    from dataset
@Returns: A processed dataframe
"""


def ProcessTranspose(iterator):
    i = 1
    for pdf in iterator:
        if i:
            column = schemaMap(pdf)
            NewSupply_d = pd.DataFrame(data = [], columns = column.names)
            i -= 1
        #pdf.sort_values(by = "ID", inplace = True)
        id_list = pdf["ID"].unique().tolist()
        d_pdf  = pdf.set_index("Attribute Names")
        for id in id_list:
            data = d_pdf[d_pdf["ID"] == id]
            data = data.transpose()
            for key in data.columns.tolist():
                NewSupply_d.loc[id, key] = data.loc["Attribute Values"][key]

            NewSupply_d.loc[id,"MakeText"] = data.loc["MakeText"][0]
            NewSupply_d.loc[id,"ModelText"] = data.loc["ModelText"][0]
            NewSupply_d.loc[id,"ModelTypeText"] = data.loc["ModelTypeText"][0]
            NewSupply_d.loc[id,"TypeName"] = data.loc["TypeName"][0]
            NewSupply_d.loc[id,"TypeNameFull"] = data.loc["TypeNameFull"][0]
            NewSupply_d.loc[id,"ID"] = data.loc["ID"][0]

        NewSupply_d.reset_index(inplace = True)
                
    yield NewSupply_d


Result of prcessing

In [328]:
schema = schemaMap(Supplier_d.toPandas())
data1 = Supplier_d.select("*").sort("ID").mapInPandas(ProcessTranspose,schema).toPandas()
data1

Unnamed: 0,ID,MakeText,ModelText,ModelTypeText,TypeName,TypeNameFull,Seats,Hp,FuelTypeText,Ccm,...,FirstRegYear,Properties,ConditionTypeText,Km,DriveTypeText,InteriorColorText,ConsumptionRatingText,FirstRegMonth,City,BodyTypeText
0,1.0,MERCEDES-BENZ,E 320,E 320 Elégance 4-Matic,E 320 Elégance 4-Matic,MERCEDES-BENZ E 320 Elégance 4-Matic,5,224,Benzin,3199,...,1999,"""Ab MFK""",Occasion,31900,Allrad,grau,,1,Zuzwil,Limousine
1,10.0,LAMBORGHINI,,Espada GT 400 Serie 3,Espada GT 400 Serie 3,LAMBORGHINI Espada GT 400 Serie 3,4,350,Benzin,3929,...,1973,"""Ab MFK""",Oldtimer,48000,Hinterradantrieb,schwarz,,4,Zuzwil,Coupé
2,100.0,FERRARI,F360,F360 Modena Berlinetta,F360 Modena Berlinetta,FERRARI F360 Modena Berlinetta,2,400,Benzin,3586,...,2004,"""Ab MFK""",Occasion,42600,Hinterradantrieb,beige,G,2,Zuzwil,Coupé
3,1000.0,ALFA ROMEO,8C,8C,,ALFA ROMEO 8C,0,450,Benzin,4691,...,2011,"""Ab MFK""",Occasion,23900,Hinterradantrieb,schwarz,,4,Porrentruy,Cabriolet
4,1001.0,BMW,2002,2002 Turbo,Turbo,BMW 2002 Turbo,4,170,Benzin,1977,...,1974,"""Ab MFK""",Occasion,97100,Hinterradantrieb,schwarz,,3,Zuzwil,Limousine
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1148,995.0,MERCEDES-BENZ,CLS 350,CLS 350 CGI,CLS 350 CGI,MERCEDES-BENZ CLS 350 CGI,4,292,Benzin,3498,...,2010,"""Ab MFK""",Occasion,25300,Hinterradantrieb,schwarz,G,1,Zuzwil,Limousine
1149,996.0,LAMBORGHINI,DIABLO,Diablo 5.7 VT,Diablo 5.7 VT,LAMBORGHINI Diablo 5.7 VT,2,492,Benzin,5707,...,1993,"""Ab MFK""",Occasion,46700,Allrad,,,6,Zuzwil,Coupé
1150,997.0,AUDI,RS6,RS6 Avant 5.0 V10 quattro,RS6 Avant 5.0 V10 quattro,AUDI RS6 Avant 5.0 V10 quattro,5,580,Benzin,4991,...,2009,"""Ab MFK""",Occasion,118800,Allrad,schwarz,G,2,Zuzwil,Kombi
1151,998.0,FERRARI,612,612,612,FERRARI 612,4,540,Benzin,5748,...,2004,"""Ab MFK""",Occasion,42200,Hinterradantrieb,beige,G,10,Zuzwil,Coupé


In [330]:
newdata1 = spark.createDataFrame(data = data1)

In [331]:
"""
Translate - A function that translates certain
    columns (CarType, color, condition, e.t.c) using
    a translator and word processing
@line: the rows from the spark column to
    be normalized
@b_list: a list of values from the target data use
    for normalization
@Returns: A normalized column
"""

def Translate(line, b_list):
    translator = Translator()
    look_up = {"Vorführmodell":"Used with guarantee","Oldtimer":"Restored","Occasion":"Used","violett":"Purple",\
            "Kleinwagen":"Saloon","km":["kilometers","l_km_consumption"]}
    
    compare = b_list
    backup = {}
    getval = ""
    value = re.search('^\s*[\w]*',str(line))
    if value:
        value = value.group()
        if value not in backup.keys() and value not in look_up.keys():
            result = translator.translate(str(value),"english","german")
            if (result == "nan" or result == "zero" or result == "well"):
                backup[value] = np.nan
                getval = np.nan
            else:
                bestMatch , score = process.extractOne(str(result), compare)
                if score > 70:
                    backup[value] = bestMatch
                    getval = bestMatch
                else:
                    backup[value] = "Other"
                    getval = "Other"
        else:
            if value in backup.keys():
                getval = backup[value]
            elif value in look_up.keys():
                getval = look_up[value]
    return getval


In [332]:
"""
Extract - A function that extracts the certain
    columns (City, make, e.t.c) using word processing
@line: the rows from the spark column to
    be normalized
@b_list: a list of values from the target data use
    for normalization
@Returns: A normalized column
"""

def Extract(line,b_list):

    look_up = {"Vorführmodell":"Used with guarantee","Oldtimer":"Restored","Occasion":"Used","violett":"Purple",\
            "Kleinwagen":"Saloon","km":["kilometers","l_km_consumption"]}
    
    compare = b_list
    back_up = {}
    getval = ""
    value = re.search('^\s*[\w\s.-/\d]*', line)
    if value:
        value = value.group()
        if value not in back_up.keys() and value not in look_up.keys():
            if value == "null" or value == "zero" or value == "well" or value == "nan" or value == "None":
                back_up[value] = np.nan
                getval = np.nan
            else:
                bestMatch, score = process.extractOne(str(value), compare)
                if score > 90:
                    back_up[value] = bestMatch
                    getval = bestMatch
                else:
                    back_up[value] = "null"
                    getval = "null"
        else:
            if value in back_up.keys():
                getval = back_up[value]
            elif value in look_up.keys():
                getval = look_up[value]
    return getval
    
    


In [333]:
"""
ExtractMileage - A function that extracts the
    mileage value in Kilometers per litre.
@line: the rows from the spark column to
    be normalized
@Returns: A normalized mileage column
"""

def ExtractMileage(line):

    value = re.search('(^\s*[\w\.\d]*)(\s*\w*l)/*\d*([\w]*)', str(line))
    getval = ""
    if value:
        num = value.group(1)
        dim = value.group(3)
        if str(num) == "null" or str(num) == "zero" or str(num) == "nul" or str(num) == "nan":
            getval = np.nan
        elif (round(float(num),1) != 0.0):
            getval = str(round((100 / float(num)) ,1)) + dim + "/l"
        else:
            getval = "null"
    return getval


In [334]:
"""
pass_as_closure - A pointer to a function, returning a
    function depending on the column name passed.
@col_name: the name of the column to normalize
@Target_d: the target data use in normalization
@Returns: the normalized result of the column
"""


def pass_as_closure(col_name,Target_d):
    
    dict = {"BodyTypeText":"carType","BodyColorText":"color","ConditionTypeText":"condition",\
        "DriveTypeText":"drive","City":"city","MakeText":"make","FirstRegYear":"manufacture_year",\
        "FirstRegMonth":"manufacture_month","ModelText":"model","ModelTypeText":"model_variant",\
        "ConsumptionTotalText":"mileage"}
    targ = Target_d.toPandas()[dict[col_name]].unique().tolist()
    
    def f(s):
        if dict[col_name] == "carType" or dict[col_name] == "color" or dict[col_name] == "condition":
            return Translate(s, targ)
        elif dict[col_name] == "city" or dict[col_name] == "make":
            return Extract(s, targ)
        elif dict[col_name] == "mileage":
            return ExtractMileage(s)
            
    return F.udf(f)

In [335]:
data2 = newdata1.withColumn("BodyColorText",pass_as_closure("BodyColorText",Target_d)(F.col("BodyColorText"))).\
        withColumn("BodyTypeText",pass_as_closure("BodyTypeText",Target_d)(F.col("BodyTypeText"))).\
        withColumn("ConditionTypeText",pass_as_closure("ConditionTypeText",Target_d)(F.col("ConditionTypeText"))).\
        withColumn("City",pass_as_closure("City",Target_d)(F.col("City"))).\
        withColumn("MakeText",pass_as_closure("MakeText",Target_d)(F.col("MakeText"))).\
        withColumn("ConsumptionTotalText",pass_as_closure("ConsumptionTotalText",Target_d)(F.col("ConsumptionTotalText"))).toPandas()
        
data2

Unnamed: 0,ID,MakeText,ModelText,ModelTypeText,TypeName,TypeNameFull,Seats,Hp,FuelTypeText,Ccm,...,FirstRegYear,Properties,ConditionTypeText,Km,DriveTypeText,InteriorColorText,ConsumptionRatingText,FirstRegMonth,City,BodyTypeText
0,1.0,,E 320,E 320 Elégance 4-Matic,E 320 Elégance 4-Matic,MERCEDES-BENZ E 320 Elégance 4-Matic,5,224,Benzin,3199,...,1999,"""Ab MFK""",Used,31900,Allrad,grau,,1,Zuzwil,Other
1,10.0,Lamborghini,,Espada GT 400 Serie 3,Espada GT 400 Serie 3,LAMBORGHINI Espada GT 400 Serie 3,4,350,Benzin,3929,...,1973,"""Ab MFK""",Restored,48000,Hinterradantrieb,schwarz,,4,Zuzwil,Coupé
2,100.0,Ferrari,F360,F360 Modena Berlinetta,F360 Modena Berlinetta,FERRARI F360 Modena Berlinetta,2,400,Benzin,3586,...,2004,"""Ab MFK""",Used,42600,Hinterradantrieb,beige,G,2,Zuzwil,Coupé
3,1000.0,Alfa Romeo,8C,8C,,ALFA ROMEO 8C,0,450,Benzin,4691,...,2011,"""Ab MFK""",Used,23900,Hinterradantrieb,schwarz,,4,Porrentruy,Convertible / Roadster
4,1001.0,BMW,2002,2002 Turbo,Turbo,BMW 2002 Turbo,4,170,Benzin,1977,...,1974,"""Ab MFK""",Used,97100,Hinterradantrieb,schwarz,,3,Zuzwil,Other
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1148,995.0,,CLS 350,CLS 350 CGI,CLS 350 CGI,MERCEDES-BENZ CLS 350 CGI,4,292,Benzin,3498,...,2010,"""Ab MFK""",Used,25300,Hinterradantrieb,schwarz,G,1,Zuzwil,Other
1149,996.0,Lamborghini,DIABLO,Diablo 5.7 VT,Diablo 5.7 VT,LAMBORGHINI Diablo 5.7 VT,2,492,Benzin,5707,...,1993,"""Ab MFK""",Used,46700,Allrad,,,6,Zuzwil,Coupé
1150,997.0,Audi,RS6,RS6 Avant 5.0 V10 quattro,RS6 Avant 5.0 V10 quattro,AUDI RS6 Avant 5.0 V10 quattro,5,580,Benzin,4991,...,2009,"""Ab MFK""",Used,118800,Allrad,schwarz,G,2,Zuzwil,Station Wagon
1151,998.0,Ferrari,612,612,612,FERRARI 612,4,540,Benzin,5748,...,2004,"""Ab MFK""",Used,42200,Hinterradantrieb,beige,G,10,Zuzwil,Coupé


In [336]:

"""
Integration - Reserves only the columns in
    target dataset as final dataset
@normalize_df: dataset from normalization
@Target_data: the target dataset
@Returns: target data mapped with normalized data
"""


def Integration(normalize_df,Target_data):

    Target_schema = pd.DataFrame(data= [], columns= Target_data.columns)
    
    dict = {"BodyTypeText":"carType","BodyColorText":"color","ConditionTypeText":"condition",\
        "DriveTypeText":"drive","City":"city","MakeText":"make","FirstRegYear":"manufacture_year",\
        "FirstRegMonth":"manufacture_month","ModelText":"model","ModelTypeText":"model_variant",\
        "ConsumptionTotalText":"mileage"}

    mapped_col = ["carType","color","condition","city","make","manufacture_year","manufacture_month",\
            "mileage"]

    for col in dict.keys():
        if dict[col] in mapped_col:
            for row in range(len(normalize_df)):
                if dict[col] != 'mileage':   
                    Target_schema.loc[row, dict[col]] = normalize_df.iloc[row][col]
                else:
                    val = normalize_df.iloc[row][col]
                    if (str(val) == 'nan' or str(val) == 'zero' or str(val) == 'null' or str(val) == 'nul'):
                            Target_schema.loc[row, dict[col]] = np.nan
                    else:
                        value = re.search('(^\s*[.\d]*)(\w*)(/\w)', val)
                        if (value):
                            num = value.group(1)
                            dim = value.group(2)
                            Target_schema.loc[row, dict[col]] = num
                            if (str(dim) == 'km'):
                                Target_schema.loc[row,'mileage_unit'] = 'kilometers'
                                Target_schema.loc[row,'fuel_consumption_unit'] = 'l_km_consumption'      
    return Target_schema
                    


In [337]:
data3 = Integration(new, Target_d.toPandas())
data3

Unnamed: 0,carType,color,condition,currency,drive,city,country,make,manufacture_year,mileage,mileage_unit,model,model_variant,price_on_request,type,zip,manufacture_month,fuel_consumption_unit
0,Other,Other,Used,,,Zuzwil,,MERCEDES,1999,8.7,kilometers,,,,,,1,l_km_consumption
1,Coupé,Other,Restored,,,Zuzwil,,Lamborghini,1973,,,,,,,,4,
2,Coupé,Blue,Used,,,Zuzwil,,Ferrari,2004,5.2,kilometers,,,,,,2,l_km_consumption
3,Convertible / Roadster,Silver,Used,,,Porrentruy,,Alfa Romeo,2011,,,,,,,,4,
4,Other,Silver,Used,,,Zuzwil,,BMW,1974,,,,,,,,3,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1148,Other,Silver,Used,,,Zuzwil,,MERCEDES,2010,10.8,kilometers,,,,,,1,l_km_consumption
1149,Coupé,Silver,Used,,,Zuzwil,,Lamborghini,1993,,,,,,,,6,
1150,Station Wagon,Silver,Used,,,Zuzwil,,Audi,2009,7.1,kilometers,,,,,,2,l_km_consumption
1151,Coupé,Silver,Used,,,Zuzwil,,Ferrari,2004,4.8,kilometers,,,,,,10,l_km_consumption


In [338]:
# please replace the directory path with your path
with pd.ExcelWriter("C:/********/****************/Downloads/************.xlsx") as writer:
    data1.to_excel(writer, index = False, sheet_name= "Preprocessing")
    data2.to_excel(writer, index = False, sheet_name= "Normalization")
    data3.to_excel(writer, index = False, sheet_name = "Integration")