# Onedot | DATA ENGINEER

 The first step consists in the initialization of the Spark session. We will import some libraries from the Maven repository.
 - `com.crealytics:spark-excel_2.11` allows to read `.xlsx` data as Spark DataFrame
 - `io.delta:delta-core_2.11` allows to write tables in the delta format.
   It introduces many advantages, as well as automatic optimization mechanisms and performance improvements in most cases          (https://www.bbva.com/en/delta-lake-power-up-your-data/) 
 
 The first time this step is executed, It may require few moments depending on the internet speed. After that, the libraries with be locally cached.

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

libraries = [# antlr4-runtime 4.7.1 or older resolve a bug with delta-core 0.6.x dependencies
             'org.antlr:antlr4-runtime:4.7.2',  
             'io.delta:delta-core_2.11:0.6.1',
             'com.crealytics:spark-excel_2.11:0.13.1']

conf = pyspark.SparkConf().setAll([('spark.sql.repl.eagerEval.enabled', True),
                                   ('spark.jars.packages',  ','.join(libraries)),
                                   ('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension'),
                                   ('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog')])

spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark

In the following snippet, we are reading the `Target Data.xlsx` file and saving its content into the table `default.target`.

It is a simple proof of concept, so we are allowing the schema inference, but It is usually a good practice in a real production environment to specify the schema.

We finally read both the "ingested" json input file and the just created `target` table as DataFrames.

In [2]:
from delta.tables import *

if 'target' not in spark.catalog.listTables():
    target_data_path = './Target Data.xlsx'

    (spark.read
    .format("com.crealytics.spark.excel")
    .option('header', 'true')
    .option('inferSchema', 'true')
    .option('dataAddress', "'Sheet1'!A1")
    .load(target_data_path)).write.saveAsTable('target', mode='overwrite', format='delta')

supplier_data = spark.read.json('./supplier_car.json')
target = spark.table('target')

This is a brief preview of the data we just read:

In [3]:
target.limit(3)

carType,color,condition,currency,drive,city,country,make,manufacture_year,mileage,mileage_unit,model,model_variant,price_on_request,type,zip,manufacture_month,fuel_consumption_unit
Convertible / Roa...,Blue,Restored,GBP,LHD,Reggio nell'Emilia,IT,Jaguar,1954,53.0,,XK 120,,False,car,RE,,
Convertible / Roa...,Other,Used,GBP,LHD,Hemel Hempstead,GB,Aston Martin,2003,53.0,mile,DB7 Vantage,Coupe GTA,True,car,,,
Convertible / Roa...,Silver,Used,GBP,LHD,Zuzwil,CH,Bentley,2000,53.6,kilometer,Arnage,Red Label,False,car,,5.0,l_km_consumption


In [4]:
supplier_data.limit(3)

Attribute Names,Attribute Values,ID,MakeText,ModelText,ModelTypeText,TypeName,TypeNameFull,entity_id
Seats,2,976.0,MERCEDES-BENZ,SLR,SLR McLaren,McLaren,MERCEDES-BENZ SLR...,0001fda6-192b-46a...
Hp,235,1059.0,MERCEDES-BENZ,ML 350,ML 350 Inspiration,ML 350 Inspiration,MERCEDES-BENZ ML ...,00107c2d-0071-447...
FuelTypeText,Benzin,524.0,AUDI,S6,S6 Avant quattro 4.2,S6 Avant quattro 4.2,AUDI S6 Avant qua...,00126794-a8ef-48f...


### 1. Pre-processing

The following code groups all the attributes having the same ID and stores them in the `attributes` column.

**NOTICE**: It could have been possibile to directly store each attribute on a personal own separate column with a `pivot` operation, even though It is more resources expensive.

In [5]:
preprocessing = (supplier_data
                .select(
                    F.col('ID'),
                    F.col('MakeText'),
                    F.col('ModelText'),
                    F.col('TypeName'),
                    F.col('Attribute Names'), 
                    F.col('Attribute Values')
                ).groupBy('ID', 'MakeText', 'ModelText', 'TypeName')
                 .agg(F.map_from_arrays(F.collect_list('Attribute Names'), 
                                        F.collect_list('Attribute Values')).alias('attributes'))
               )

preprocessing.limit(3)

ID,MakeText,ModelText,TypeName,attributes
1017.0,VW,JETTA,Jetta 2.0 TDI Com...,"[Properties -> ""A..."
1140.0,McLAREN,12C,MP4-12C Cabriolet...,[FuelTypeText -> ...
195.0,MITSUBISHI,LANCER,Lancer Evo VII,[FirstRegMonth ->...


The following is an example of what the content of an `attributes` row may look like.

In [6]:
preprocessing.select('attributes').take(1)[0][0]

{'BodyTypeText': 'Limousine',
 'TransmissionTypeText': 'Automatisiertes Schaltgetriebe',
 'InteriorColorText': 'grau',
 'Km': '33500',
 'ConsumptionTotalText': '5.8 l/100km',
 'FuelTypeText': 'Diesel',
 'Hp': '140',
 'FirstRegYear': '2009',
 'DriveTypeText': 'Vorderradantrieb',
 'City': 'Zuzwil',
 'Properties': '"Ab MFK"',
 'Ccm': '1968',
 'FirstRegMonth': '3',
 'Seats': '5',
 'ConsumptionRatingText': 'E',
 'ConditionTypeText': 'Occasion',
 'BodyColorText': 'silber mét.',
 'Doors': '4',
 'Co2EmissionText': '154 g/km'}

In order to reach the desired format, the `attributes` items have to be transformed. The following function does so with the `color` key, when It is present.

Not each possible case has been translated.

The translation this method does could also be acquired by an external library (Google Translate API?). This would remove the limit of having a manual translation mapping but would need an UDF function and therefore a little loss in performance.

In [7]:
def translate_color_to_english(attributes_col):
    return (F.when(F.col('attributes').getItem('BodyColorText') == F.lit('schwarz'), F.lit('Black'))
             .when(F.col('attributes').getItem('BodyColorText') == F.lit('grün'), F.lit('Green'))
             .when(F.col('attributes').getItem('BodyColorText') == F.lit('blau'), F.lit('Blue'))
             .when(F.col('attributes').getItem('BodyColorText') == F.lit('braun'), F.lit('Brown'))
             .when(F.col('attributes').getItem('BodyColorText') == F.lit('grau'), F.lit('Grey'))
             .when(F.col('attributes').getItem('BodyColorText') == F.lit('weiss'), F.lit('White'))
             .when(F.col('attributes').getItem('BodyColorText') == F.lit('gelb'), F.lit('Yellow'))
             .when(F.col('attributes').getItem('BodyColorText') == F.lit('beige'), F.lit('Beige'))
             # ... with an external mapping file this could be narrowed into a line of code
             .otherwise(F.col('attributes.BodyColorText')))

## 2. Normalization

`attributes.BodyColorText` needs `attributes.MakeText` to be normalized.

We are here excluding all the fields that are unnecessary for the exercise.

In [8]:
normalized = (preprocessing
                .select(
                    F.when(
                        F.array_contains(F.map_keys('attributes'), 'BodyColorText'),
                        translate_color_to_english(F.col('attributes'))
                    ).alias('BodyColorText'),
                    F.initcap('MakeText').alias('MakeText'),
                    F.col('ModelText'),
                    F.col('TypeName'),
                    F.when(
                        F.array_contains(F.map_keys('attributes'), 'City'),
                        F.col('attributes').getItem('City')
                    ).alias('City'),
                    F.col('attributes.ConsumptionTotalText')
                ))

normalized.limit(3)

BodyColorText,MakeText,ModelText,TypeName,City,ConsumptionTotalText
silber mét.,Vw,JETTA,Jetta 2.0 TDI Com...,Zuzwil,5.8 l/100km
weiss mét.,Mclaren,12C,MP4-12C Cabriolet...,Zuzwil,12.3 l/100km
bordeaux,Mitsubishi,LANCER,Lancer Evo VII,Zuzwil,


## 3. Extraction

The following code decomposes and extract `ConsumptionTotalText` in different columns.

In [9]:
extraction_data = (normalized
                   .withColumn('extracted-value-ConsumptionTotalText', F.split('ConsumptionTotalText', ' ')[0])
                   .withColumn('extracted-unit-ConsumptionTotalText', F.split('ConsumptionTotalText', ' ')[1])
                  ).drop('ConsumptionTotalText')

extraction_data.limit(3)

BodyColorText,MakeText,ModelText,TypeName,City,extracted-value-ConsumptionTotalText,extracted-unit-ConsumptionTotalText
silber mét.,Vw,JETTA,Jetta 2.0 TDI Com...,Zuzwil,5.8,l/100km
weiss mét.,Mclaren,12C,MP4-12C Cabriolet...,Zuzwil,12.3,l/100km
bordeaux,Mitsubishi,LANCER,Lancer Evo VII,Zuzwil,,


## 4. Integration

We can chance the columns names in order to make the input compliant with the output.

In [10]:
integration_data = (extraction_data
                    .select(
                        F.col('BodyColorText').alias('color'),
                        F.col('MakeText').alias('make'),
                        F.col('ModelText').alias('model'),
                        F.col('TypeName').alias('model_variant'),
                        F.col('City').alias('city'))
                    )

integration_data.limit(3)

color,make,model,model_variant,city
silber mét.,Vw,JETTA,Jetta 2.0 TDI Com...,Zuzwil
weiss mét.,Mclaren,12C,MP4-12C Cabriolet...,Zuzwil
bordeaux,Mitsubishi,LANCER,Lancer Evo VII,Zuzwil


## 5. Product matching, enriching existing products, adding new products

The following is a common task, called `upsertion`. 

It can easily be achieved through the `delta` tables support library, which we previously imported.

In order to do It, we have to first fix one or more ID keys that determine a record matching criterion between input and target. In this case the fields `make`, `model`, `model_variant`.

If this match is found, then an arbitrarily complex (even conditional) update operation can be defined on any field of the matching record. In this exemplifying case, when a match is found and its potentially new value is `NOT NULL`, then the field `color` gets updated.

If not, then the input record is inserted into the target table.

Of course this same operation may be performed in different ways, in case in which for example `delta` is not viable or with different business rules.

It once happened to me for example, in a context with a legacy `PostgreSQL` target database, to use the `RDD`s `mapPartition` method in combination with the `pg8000` library in order to implement an upsertion task.

In [11]:
deltaTable = DeltaTable.forPath(spark, "./spark-warehouse/target/")

(deltaTable.alias("target")
 .merge(integration_data.alias("updates"), 
        condition = ((F.expr("target.make = updates.make")) & 
                     (F.expr("target.model = updates.model")) & 
                     (F.expr("target.model_variant = updates.model_variant")))
       ).whenMatchedUpdate(condition=F.expr("updates.color IS NOT NULL"), set = { "color" : "updates.color" } )
        .whenNotMatchedInsertAll()
)

target = spark.table('target') 
target.limit(5)

carType,color,condition,currency,drive,city,country,make,manufacture_year,mileage,mileage_unit,model,model_variant,price_on_request,type,zip,manufacture_month,fuel_consumption_unit
Convertible / Roa...,Blue,Restored,GBP,LHD,Reggio nell'Emilia,IT,Jaguar,1954,53.0,,XK 120,,False,car,RE,,
Convertible / Roa...,Other,Used,GBP,LHD,Hemel Hempstead,GB,Aston Martin,2003,53.0,mile,DB7 Vantage,Coupe GTA,True,car,,,
Convertible / Roa...,Silver,Used,GBP,LHD,Zuzwil,CH,Bentley,2000,53.6,kilometer,Arnage,Red Label,False,car,,5.0,l_km_consumption
Convertible / Roa...,Red,Used,GBP,LHD,Böblingen (bei St...,DE,Maserati,2016,55.0,kilometer,Ghibli,Diesel MASERATI S...,False,car,,4.0,l_km_consumption
Convertible / Roa...,Blue,Used,GBP,LHD,Aalst,BE,Chrysler,1953,56.0,,Special Coupé by ...,,True,car,,,


## CSV OUTPUTS

In order to get precisely 1 file, a repartition/coalesce is needed. 

The latter tries to avoid shuffles (https://stackoverflow.com/a/40983145/3004162) so It is generally better.

In [13]:
(preprocessing
 .withColumn('attributes', F.col('attributes').cast('string'))
 .coalesce(1)
 .write.csv(path='./csv/preprocessing', header=True, encoding='utf-8')
)

In [14]:
normalized.coalesce(1).write.csv(path='./csv/normalized', header=True, encoding='utf-8')

In [15]:
extraction_data.coalesce(1).write.csv(path='./csv/extraction_data', header=True, encoding='utf-8')

In [16]:
integration_data.coalesce(1).write.csv(path='./csv/integration_data', header=True, encoding='utf-8')