# Process Raw Comparis Result Data

Comparis.ch is a central platform for comparing insurance product prices. Its many daily users generate a substantial amount of valuable data. Our main goal is to extract the relevant product and user data for a market analysis focused on private liability insurance, household insurance, and the combination of both. With this dataset, we aim to gain insight into insurance trends, consumer preferences, and market dynamics that can guide strategic decisions within the insurance industry.

Each Comparis request is forwarded to the insurance providers, who respond with whether they cover the specified risk and at what premium. Every request is assigned a unique reference, which is used to build the exact result page shown to the user for comparing insurance products. This result page can be scraped, yielding 200-300 kB of JSON data per request. However, this JSON is highly nested and contains a large amount of irrelevant information, so the relevant parts must be extracted before any analysis can be performed.

In particular, we process and analyze Comparis results that were scraped during the second half of June 2023.

This notebook focuses on the data extraction part and contains the following sections.

* [1. Data Explanation](#data_explanation)
* [2. Data Download](#data_download)
* [3. Data Processing](#data_processing)
* [4. Data Dump](#data_dump)
* [5. Measurements](#measurements)
* [6. Clean-up](#clean_up)

## 1. Data Explanation
<a id='data_explanation'></a>

Before processing the raw Comparis result data, we have a look at an example JSON file to gain a better understanding of its nested structure and the information it contains. Key fields, their corresponding values, and the hierarchy of the data structure are identified.

In the following example, we examine the results for the GUID (Globally Unique Identifier) `bc3c32c0-ff5e-47c1-95e9-acc903fddf35`, so the result page can be viewed at

[comparis.ch/hausrat-versicherung/result/result?inputGuid=bc3c32c0-ff5e-47c1-95e9-acc903fddf35](https://www.comparis.ch/hausrat-versicherung/result/result?inputGuid=bc3c32c0-ff5e-47c1-95e9-acc903fddf35)

In [1]:
import os
from pathlib import Path
import json
import pandas as pd

In [2]:
# read in the example file representing a single raw Comparis home result 
example_file = Path("example_data") / "bc3c32c0-ff5e-47c1-95e9-acc903fddf35.json"
with open(example_file, "r") as file:
    content = file.read()

In [3]:
# print out one complete JSON result
result = json.loads(content)
result

{'props': {'pageProps': {'resultListInitialValues': {'Guid': 'bc3c32c0-ff5e-47c1-95e9-acc903fddf35',
    'InsuranceType': 3,
    'IsRequestValid': True,
    'ResultItemsMatching': [{'ProductId': 37,
      'ProductName': 'M',
      'ProductNameForTracking': 'm',
      'ProviderId': 6,
      'ProviderName': 'Baloise',
      'ProviderNameForTracking': 'baloise',
      'Canton': 'BL',
      'TotalPremium': 358.7,
      'Rating': 5.3,
      'PremiumFirstYear': None,
      'PremiumFirstYearDiscount': None,
      'ResultDetail': {'Taxes': 25.7,
       'HouseholdCoverageAllText': 'Feuer, Elementar, Wasser und Diebstahl zu Hause: <strong>versichert</strong>',
       'CoverageDetailsList': [{'Premium': 'versichert',
         'InsuranceSum': '-',
         'InsuranceSumRequested': None,
         'HasInsuranceSumDifference': False,
         'Deductible': '-',
         'DeductibleRequested': None,
         'HasDeductibleDifference': False,
         'CoverageInfobase': 'VHH_Result_CoverageDetails_Cov

The `guid` can be used to access the Comparis result page, from which the results were scraped.

The `insurance_type` indicates which insurances were requested:
* 1 household (Hausrat)
* 2 private liability (Privathaftpflicht)
* 3 household and private liability (Hausrat & Privathaftpflicht)

Here, the insurance type is equal to 3, meaning household and private liability insurance were requested.
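
As a small illustration, the codes listed above can be kept in a lookup table. This is a minimal sketch: the names `INSURANCE_TYPE_LABELS` and `describe_insurance_type` are hypothetical helpers and not part of the notebook, and the fallback for unknown codes is an assumption, since only the codes 1 to 3 are documented here.

```python
# Mapping taken from the list above; the English labels are our own wording.
INSURANCE_TYPE_LABELS = {
    1: "household",
    2: "private liability",
    3: "household and private liability",
}


def describe_insurance_type(code: int) -> str:
    """Return a readable label for a Comparis insurance type code."""
    # Unknown codes are reported instead of raising an error (assumption:
    # codes other than 1 to 3 might show up in the scraped data).
    return INSURANCE_TYPE_LABELS.get(code, f"unknown ({code})")


print(describe_insurance_type(3))
```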

In [4]:
guid = result["props"]["pageProps"]["resultListInitialValues"]["Guid"]
insurance_type = result["props"]["pageProps"]["resultListInitialValues"]["InsuranceType"]
print(guid)
print(insurance_type)

bc3c32c0-ff5e-47c1-95e9-acc903fddf35
3


As can be seen, the raw Comparis data is highly nested, consisting of nested objects and arrays, and much of it holds no value for the analysis.

In fact, only three parts contain relevant information:
* __request input data__ (`props.pageProps.resultListInitialValues.ResultFilterData.InputData`)
* __matching results__, meaning "Angebote mit gewünschter Deckung." (`props.pageProps.resultListInitialValues.ResultItemsMatching`)
* __non-matching results__, meaning "Angebote mit abweichender Deckung" (`props.pageProps.resultListInitialValues.ResultItemsNonMatching`)
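
The three paths listed above can be wrapped in one small helper. The following is only a sketch: `extract_relevant_parts` and `toy_result` are hypothetical names, and the toy document is made up so that the block runs on its own. It only imitates the key layout of a real result.

```python
from typing import Any, Dict

# Common prefix of all three paths
BASE_KEYS = ("props", "pageProps", "resultListInitialValues")


def extract_relevant_parts(result: Dict[str, Any]) -> Dict[str, Any]:
    """Return the request input data and both result lists of one parsed result."""
    base = result
    for key in BASE_KEYS:
        base = base[key]
    return {
        "input_data": base["ResultFilterData"]["InputData"],
        # fall back to an empty list if a result list is missing or null
        "matching": base.get("ResultItemsMatching") or [],
        "non_matching": base.get("ResultItemsNonMatching") or [],
    }


# Made-up miniature document with the same key layout (illustration only)
toy_result = {
    "props": {"pageProps": {"resultListInitialValues": {
        "Guid": "toy-guid",
        "ResultFilterData": {"InputData": {"InsuranceType": 3}},
        "ResultItemsMatching": [{"ProviderName": "Baloise", "ProductName": "M"}],
        "ResultItemsNonMatching": [{"ProviderName": "Baloise", "ProductName": "S"}],
    }}}
}

parts = extract_relevant_parts(toy_result)
print(len(parts["matching"]), len(parts["non_matching"]))
```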

In [5]:
# Comparis request input data
result["props"]["pageProps"]["resultListInitialValues"]["ResultFilterData"]["InputData"]

{'InsuranceType': 3,
 'InsuranceStartDate': '2023-08-01T00:00:00',
 'InsuranceTaker': {'Nationality': 'CH',
  'DateOfBirth': '1959-01-03T00:00:00',
  'LivesWithParents': False,
  'NumberOfAdults': 2,
  'NumberOfChildren': 0,
  'YoungestChildCurrentAge': None,
  'Email': None,
  'WorkPermit': 5,
  'NationalityString': 'Schweiz'},
 'Building': {'HouseType': 2,
  'Floor': None,
  'OwnershipStatus': 1,
  'Occupancy': 1,
  'BuildingZipCode': '4123 Allschwil (BL) Allschwil',
  'BuildingTownId': '17630',
  'NumberOfRooms': 3.5,
  'SquareMeters': 85,
  'HouseConstruction': 1,
  'FireHydrant': True},
 'Household': {'HouseCondition': 2,
  'InsuranceSum': 180000,
  'FurnitureGlassCoverageAmount': None,
  'BathroomGlassAmount': None,
  'BuildingGlassAmount': None,
  'GlassIsIsolated': False,
  'PreviousInsurer': 99999,
  'GlassCoverage': False,
  'ContentDeductible': -1,
  'SimpleTheftDeductible': 2000,
  'SportsEquipmentValueCompensation': True},
 'PrivateLiability': {'PreviousInsurer': 99999,
  

In [6]:
# matching results (Angebote mit gewünschter Deckung. Nach dem günstigsten Preis sortiert.)
matching_results = result["props"]["pageProps"]["resultListInitialValues"]["ResultItemsMatching"]
print(f"{len(matching_results)} matching results")
matching_results

20 matching results


[{'ProductId': 37,
  'ProductName': 'M',
  'ProductNameForTracking': 'm',
  'ProviderId': 6,
  'ProviderName': 'Baloise',
  'ProviderNameForTracking': 'baloise',
  'Canton': 'BL',
  'TotalPremium': 358.7,
  'Rating': 5.3,
  'PremiumFirstYear': None,
  'PremiumFirstYearDiscount': None,
  'ResultDetail': {'Taxes': 25.7,
   'HouseholdCoverageAllText': 'Feuer, Elementar, Wasser und Diebstahl zu Hause: <strong>versichert</strong>',
   'CoverageDetailsList': [{'Premium': 'versichert',
     'InsuranceSum': '-',
     'InsuranceSumRequested': None,
     'HasInsuranceSumDifference': False,
     'Deductible': '-',
     'DeductibleRequested': None,
     'HasDeductibleDifference': False,
     'CoverageInfobase': 'VHH_Result_CoverageDetails_Covered',
     'CoverageTitleInfobase': None,
     'DeductibleInfobase': None,
     'HasDifference': False,
     'ResultDetailIconType': 1,
     'CoverageType': 12},
    {'Premium': 'versichert',
     'InsuranceSum': "2'000",
     'InsuranceSumRequested': None,
 

In [7]:
# non-matching results (Angebote mit abweichender Deckung)
non_matching_results = result["props"]["pageProps"]["resultListInitialValues"]["ResultItemsNonMatching"]
print(f"{len(non_matching_results)} non-matching results")
non_matching_results

2 non-matching results


[{'ProductId': 36,
  'ProductName': 'S',
  'ProductNameForTracking': 's',
  'ProviderId': 6,
  'ProviderName': 'Baloise',
  'ProviderNameForTracking': 'baloise',
  'Canton': 'BL',
  'TotalPremium': 318.1,
  'Rating': 5.3,
  'PremiumFirstYear': None,
  'PremiumFirstYearDiscount': None,
  'ResultDetail': {'Taxes': 23.7,
   'HouseholdCoverageAllText': 'Feuer, Elementar, Wasser und Diebstahl zu Hause: <strong>versichert</strong>',
   'CoverageDetailsList': [{'Premium': 'versichert',
     'InsuranceSum': '-',
     'InsuranceSumRequested': None,
     'HasInsuranceSumDifference': False,
     'Deductible': '-',
     'DeductibleRequested': None,
     'HasDeductibleDifference': False,
     'CoverageInfobase': 'VHH_Result_CoverageDetails_Covered',
     'CoverageTitleInfobase': None,
     'DeductibleInfobase': None,
     'HasDifference': False,
     'ResultDetailIconType': 1,
     'CoverageType': 12},
    {'Premium': 'versichert',
     'InsuranceSum': "1'000",
     'InsuranceSumRequested': "2'000"

As can be seen, both the matching and the non-matching results are lists of insurance products, where each list entry corresponds to a single product. Both lists share the same structure.
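
Because both lists have the same layout, they can be handled by the same code. The sketch below flattens the top-level scalar fields of both lists into one list of rows. The function name `flatten_products` and the `IsMatching` flag are assumptions made for illustration, and the two toy products carry only a few fields copied from the outputs above.

```python
from typing import Any, Dict, List


def flatten_products(
    matching: List[Dict[str, Any]],
    non_matching: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """Combine both result lists into flat rows with an 'IsMatching' flag."""
    rows = []
    for is_matching, products in ((True, matching), (False, non_matching)):
        for product in products:
            # keep only top-level scalar fields; nested parts such as
            # 'ResultDetail' need their own treatment and are skipped here
            row = {
                key: value
                for key, value in product.items()
                if not isinstance(value, (dict, list))
            }
            row["IsMatching"] = is_matching
            rows.append(row)
    return rows


# Shortened toy products (values taken from the outputs above)
toy_matching = [{"ProviderName": "Baloise", "ProductName": "M",
                 "TotalPremium": 358.7, "ResultDetail": {"Taxes": 25.7}}]
toy_non_matching = [{"ProviderName": "Baloise", "ProductName": "S",
                     "TotalPremium": 318.1, "ResultDetail": {"Taxes": 23.7}}]

rows = flatten_products(toy_matching, toy_non_matching)
for row in rows:
    print(row)
```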

In [8]:
# single product matching result
result_box = matching_results[0]
result_box

{'ProductId': 37,
 'ProductName': 'M',
 'ProductNameForTracking': 'm',
 'ProviderId': 6,
 'ProviderName': 'Baloise',
 'ProviderNameForTracking': 'baloise',
 'Canton': 'BL',
 'TotalPremium': 358.7,
 'Rating': 5.3,
 'PremiumFirstYear': None,
 'PremiumFirstYearDiscount': None,
 'ResultDetail': {'Taxes': 25.7,
  'HouseholdCoverageAllText': 'Feuer, Elementar, Wasser und Diebstahl zu Hause: <strong>versichert</strong>',
  'CoverageDetailsList': [{'Premium': 'versichert',
    'InsuranceSum': '-',
    'InsuranceSumRequested': None,
    'HasInsuranceSumDifference': False,
    'Deductible': '-',
    'DeductibleRequested': None,
    'HasDeductibleDifference': False,
    'CoverageInfobase': 'VHH_Result_CoverageDetails_Covered',
    'CoverageTitleInfobase': None,
    'DeductibleInfobase': None,
    'HasDifference': False,
    'ResultDetailIconType': 1,
    'CoverageType': 12},
   {'Premium': 'versichert',
    'InsuranceSum': "2'000",
    'InsuranceSumRequested': None,
    'HasInsuranceSumDifference

A large part of each product result consists of the coverage details, which are stored as a list within the product result. Each entry in this list describes one coverage component offered by that product.

In [9]:
# coverage details
result_box_coverage_details = result_box["ResultDetail"]["CoverageDetailsList"]
result_box_coverage_details

[{'Premium': 'versichert',
  'InsuranceSum': '-',
  'InsuranceSumRequested': None,
  'HasInsuranceSumDifference': False,
  'Deductible': '-',
  'DeductibleRequested': None,
  'HasDeductibleDifference': False,
  'CoverageInfobase': 'VHH_Result_CoverageDetails_Covered',
  'CoverageTitleInfobase': None,
  'DeductibleInfobase': None,
  'HasDifference': False,
  'ResultDetailIconType': 1,
  'CoverageType': 12},
 {'Premium': 'versichert',
  'InsuranceSum': "2'000",
  'InsuranceSumRequested': None,
  'HasInsuranceSumDifference': False,
  'Deductible': '500',
  'DeductibleRequested': None,
  'HasDeductibleDifference': False,
  'CoverageInfobase': 'VHH_Result_CoverageDetails_Covered',
  'CoverageTitleInfobase': 'HH_Claim_SimpleTheft',
  'DeductibleInfobase': None,
  'HasDifference': False,
  'ResultDetailIconType': 1,
  'CoverageType': 4},
 {'Premium': 'versichert',
  'InsuranceSum': '-',
  'InsuranceSumRequested': None,
  'HasInsuranceSumDifference': False,
  'Deductible': '-',
  'DeductibleRe

The relevant coverage information can be accessed as follows:

In [10]:
for subbox in result_box_coverage_details:
    # Hausrat
    if subbox["CoverageType"] == 13 and subbox["CoverageTitleInfobase"] == "HH_Calc_InsuranceSum":
        household_insurance_sum = subbox["InsuranceSum"]
        household_premium = subbox["Premium"]
        household_deductible = subbox["Deductible"]
    # Einfacher Diebstahl auswärts
    if subbox["CoverageType"] == 4 and subbox["CoverageTitleInfobase"] == "HH_Claim_SimpleTheft":
        hh_claims_simple_theft_insurance_sum = subbox["InsuranceSum"]
        hh_claims_simple_theft_premium = subbox["Premium"]
        hh_claims_simple_theft_deductible = subbox["Deductible"]
    # Sportgeräte zum Neuwert
    if subbox["CoverageType"] == 6 and subbox["CoverageTitleInfobase"] == "hh_balloon_sportsequipment":
        hh_balloon_sportsequipment_insurance_sum = subbox["InsuranceSum"]
        hh_balloon_sportsequipment_premium = subbox["Premium"]
        hh_balloon_sportsequipment_deductible = subbox["Deductible"]
    # Mobiliarglas
    if subbox["CoverageType"] == 9 and subbox["CoverageTitleInfobase"] == "HH_Claim_Glass":
        hh_claim_glass_insurance_sum = subbox["InsuranceSum"]
        hh_claim_glass_premium = subbox["Premium"]
        hh_claim_glass_deductible = subbox["Deductible"]
    # Privathaftpflicht
    if subbox["CoverageType"] == 14 and subbox["CoverageTitleInfobase"] == "hh_balloon_Garantiesumme_title":
        private_liability_insurance_sum = subbox["InsuranceSum"]
        private_liability_premium = subbox["Premium"]
        private_liability_deductible = subbox["Deductible"]
    # Führen fremder Fahrzeuge
    if subbox["CoverageType"] == 7 and subbox["CoverageTitleInfobase"] == "hh_txt_results_hp_fremdlenker":
        hp_fremdlenker_insurance_sum = subbox["InsuranceSum"]
        hp_fremdlenker_premium = subbox["Premium"]
        hp_fremdlenker_deductible = subbox["Deductible"]
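
The chain of `if` statements above can also be expressed as a lookup table, which keeps the type/title pairs in one place. This is a sketch of an alternative, not the approach used later in the notebook. The names `COVERAGE_KEYS` and `collect_coverage` and the key prefixes are illustrative, while the type/title pairs are copied from the cell above and the two sample entries from the coverage details output.

```python
from typing import Any, Dict, List

# (CoverageType, CoverageTitleInfobase) -> prefix for the extracted values
COVERAGE_KEYS = {
    (13, "HH_Calc_InsuranceSum"): "household",                    # Hausrat
    (4, "HH_Claim_SimpleTheft"): "hh_claims_simple_theft",        # Einfacher Diebstahl auswärts
    (6, "hh_balloon_sportsequipment"): "hh_balloon_sportsequipment",  # Sportgeräte zum Neuwert
    (9, "HH_Claim_Glass"): "hh_claim_glass",                      # Mobiliarglas
    (14, "hh_balloon_Garantiesumme_title"): "private_liability",  # Privathaftpflicht
    (7, "hh_txt_results_hp_fremdlenker"): "hp_fremdlenker",       # Führen fremder Fahrzeuge
}


def collect_coverage(coverage_details: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Collect insurance sum, premium and deductible of the known coverages."""
    collected = {}
    for subbox in coverage_details:
        key = (subbox["CoverageType"], subbox["CoverageTitleInfobase"])
        prefix = COVERAGE_KEYS.get(key)
        if prefix is None:
            continue  # not one of the coverages of interest
        collected[f"{prefix}_insurance_sum"] = subbox["InsuranceSum"]
        collected[f"{prefix}_premium"] = subbox["Premium"]
        collected[f"{prefix}_deductible"] = subbox["Deductible"]
    return collected


# Two entries taken from the coverage details output, reduced to the used fields
sample_details = [
    {"CoverageType": 12, "CoverageTitleInfobase": None,
     "InsuranceSum": "-", "Premium": "versichert", "Deductible": "-"},
    {"CoverageType": 4, "CoverageTitleInfobase": "HH_Claim_SimpleTheft",
     "InsuranceSum": "2'000", "Premium": "versichert", "Deductible": "500"},
]

print(collect_coverage(sample_details))
```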

## 2. Data Download
<a id='data_download'></a>

The raw Comparis result data is stored in a public AWS S3 bucket (temporarily, until the beginning of August 2023).
In the following cells, the data is downloaded from this bucket and saved locally.

The data can be found at https://tmp-raw-comparis-results.s3.eu-central-1.amazonaws.com/raw_comparis_home_results.json

In [11]:
from urllib.request import urlretrieve

In [12]:
# some definitions for raw data access
url = "https://tmp-raw-comparis-results.s3.eu-central-1.amazonaws.com/raw_comparis_home_results.json"
file = Path("raw_comparis_home_results.json")

In [13]:
# download data if not already available
if not file.exists():
    print(f"Downloading {repr(str(file))}")
    urlretrieve(url, file)
else:
    print(f"{repr(str(file))} already downloaded")

'raw_comparis_home_results.json' already downloaded


## 3. Data Processing
<a id='data_processing'></a>

In this section, the raw nested Comparis JSON data is processed using PySpark in order to extract the useful information.

In [14]:
# specify the number of cores and the user
cores = 4
user = "Gruppe_3"
print(f"Using {repr(user)} to connect to sparky and request {cores=}")

Using 'Gruppe_3' to connect to sparky and request cores=4


In [15]:
import sparky
import pyspark
import pyspark.sql
from pyspark.sql.functions import col, concat, explode, lit, posexplode
from pyspark.sql import functions as F

sc = sparky.connect(f"sparknotebook-{user}", cores)
spark = pyspark.sql.SparkSession.builder.getOrCreate()

~~~ Sparky module loaded ~~~


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/09 08:44:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Attached to Sparky cluster context from sparky-collab as sparknotebook-Gruppe_3.
Requested 4 cores; real number might be less.


In [16]:
# read in data
df = spark.read.option("multiline", "true").json(str(file))

23/07/09 08:45:15 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [17]:
# check how many entries are in the DataFrame
df.count()

                                                                                

2325

In [18]:
# check the schema
df.printSchema()

root
 |-- appGip: boolean (nullable = true)
 |-- assetPrefix: string (nullable = true)
 |-- buildId: string (nullable = true)
 |-- customServer: boolean (nullable = true)
 |-- gip: boolean (nullable = true)
 |-- isFallback: boolean (nullable = true)
 |-- page: string (nullable = true)
 |-- props: struct (nullable = true)
 |    |-- breadcrumbs: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- Children: string (nullable = true)
 |    |    |    |-- Href: string (nullable = true)
 |    |    |    |-- IsSelected: boolean (nullable = true)
 |    |    |    |-- NavigationLink: string (nullable = true)
 |    |    |    |-- NavigationLinkText: string (nullable = true)
 |    |    |    |-- NavigationSubtitle: string (nullable = true)
 |    |    |    |-- NavigationSubtitleOrderNumber: string (nullable = true)
 |    |    |    |-- TargetTrackingCrossLabel: string (nullable = true)
 |    |    |    |-- Title: string (nullable = true)
 |    |-- comparisPageC

In [19]:
# check the schema part of the request input data
df.select(
    col("props.pageProps.resultListInitialValues.ResultFilterData.InputData")
).printSchema()

root
 |-- InputData: struct (nullable = true)
 |    |-- Building: struct (nullable = true)
 |    |    |-- BuildingTownId: string (nullable = true)
 |    |    |-- BuildingZipCode: string (nullable = true)
 |    |    |-- FireHydrant: boolean (nullable = true)
 |    |    |-- Floor: long (nullable = true)
 |    |    |-- HouseConstruction: long (nullable = true)
 |    |    |-- HouseType: long (nullable = true)
 |    |    |-- NumberOfRooms: double (nullable = true)
 |    |    |-- Occupancy: long (nullable = true)
 |    |    |-- OwnershipStatus: long (nullable = true)
 |    |    |-- SquareMeters: long (nullable = true)
 |    |-- Discounts: struct (nullable = true)
 |    |    |-- HasHealthInsuranceByCss: boolean (nullable = true)
 |    |    |-- HasOtherInsuranceAtElvia: boolean (nullable = true)
 |    |    |-- HasZurichLifeInsurance: boolean (nullable = true)
 |    |    |-- HasZurichMotorbikeInsurance: boolean (nullable = true)
 |    |-- DoubleOptIn: string (nullable = true)
 |    |-- Househol

In [20]:
# check the schema part of the matching results
df.select(
    col("props.pageProps.resultListInitialValues.ResultItemsMatching")
).printSchema()

root
 |-- ResultItemsMatching: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Canton: string (nullable = true)
 |    |    |-- IsDirectBuy: boolean (nullable = true)
 |    |    |-- LogoName: string (nullable = true)
 |    |    |-- PremiumFirstYear: double (nullable = true)
 |    |    |-- PremiumFirstYearDiscount: long (nullable = true)
 |    |    |-- ProductId: long (nullable = true)
 |    |    |-- ProductName: string (nullable = true)
 |    |    |-- ProductNameForTracking: string (nullable = true)
 |    |    |-- ProductPosition: long (nullable = true)
 |    |    |-- ProviderId: long (nullable = true)
 |    |    |-- ProviderName: string (nullable = true)
 |    |    |-- ProviderNameForTracking: string (nullable = true)
 |    |    |-- Rating: double (nullable = true)
 |    |    |-- ResultDetail: struct (nullable = true)
 |    |    |    |-- CoverageDetailsList: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 | 

In [21]:
# check the schema part of the non-matching results
df.select(
    col("props.pageProps.resultListInitialValues.ResultItemsNonMatching")
).printSchema()

root
 |-- ResultItemsNonMatching: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Canton: string (nullable = true)
 |    |    |-- IsDirectBuy: boolean (nullable = true)
 |    |    |-- LogoName: string (nullable = true)
 |    |    |-- PremiumFirstYear: double (nullable = true)
 |    |    |-- PremiumFirstYearDiscount: long (nullable = true)
 |    |    |-- ProductId: long (nullable = true)
 |    |    |-- ProductName: string (nullable = true)
 |    |    |-- ProductNameForTracking: string (nullable = true)
 |    |    |-- ProductPosition: long (nullable = true)
 |    |    |-- ProviderId: long (nullable = true)
 |    |    |-- ProviderName: string (nullable = true)
 |    |    |-- ProviderNameForTracking: string (nullable = true)
 |    |    |-- Rating: double (nullable = true)
 |    |    |-- ResultDetail: struct (nullable = true)
 |    |    |    |-- CoverageDetailsList: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)


### 3.1 Select useful information
We select the useful information once for the matching results and once for the non-matching results. Both are combined at the end.

In [22]:
def select_useful_columns(
    data: pyspark.sql.dataframe.DataFrame,
    input_data_struct_col: str,
    result_struct_col: str,
) -> pyspark.sql.dataframe.DataFrame:
    """Select useful columns from raw comparis result data."""
    selected_data = data.select(
        col("props.pageProps.inputGuid").alias("Guid"),
        col(f"{result_struct_col}.Canton"),

        ## input data:
        col("props.pageProps.resultListInitialValues.InsuranceType"),
        # insurance taker
        col(f"{input_data_struct_col}.InsuranceTaker.Nationality"),
        col(f"{input_data_struct_col}.InsuranceTaker.DateOfBirth"),
        col(f"{input_data_struct_col}.InsuranceTaker.NumberOfAdults"),
        col(f"{input_data_struct_col}.InsuranceTaker.NumberOfChildren"),
        # building
        col(f"{input_data_struct_col}.Building.BuildingZipCode"),
        col(f"{input_data_struct_col}.Building.SquareMeters"),
        col(f"{input_data_struct_col}.Building.HouseType"),
        col(f"{input_data_struct_col}.Building.Floor"),
        col(f"{input_data_struct_col}.Building.NumberOfRooms"),
        # household
        col(f"{input_data_struct_col}.Household.InsuranceSum").alias("RequestedHouseholdInsuranceSum"),
        col(f"{input_data_struct_col}.Household.GlassCoverage").alias("RequestedHouseholdGlassCoverage"),
        col(f"{input_data_struct_col}.Household.SimpleTheftDeductible").alias("RequestedHouseholdSimpleTheftDeductible"),
        col(f"{input_data_struct_col}.Household.SportsEquipmentValueCompensation").alias("RequestedHouseholdSportsEquipmentValueCompensation"),
        # private liability
        col(f"{input_data_struct_col}.PrivateLiability.InsuranceSum").alias("RequestedPrivateLiabilityInsuranceSum"),
        col(f"{input_data_struct_col}.PrivateLiability.Deductible").alias("RequestedPrivateLiabilityDeductible"),
        col(f"{input_data_struct_col}.PrivateLiability.WantsPrivateLiabilityThirdPartyVehicles").alias("RequestedPrivateLiabilityThirdPartyVehicles"),

        ## results list:
        # product details
        col(f"{result_struct_col}.ProductPosition"),
        col(f"{result_struct_col}.TotalPremium"),
        col(f"{result_struct_col}.Rating"),
        col(f"{result_struct_col}.ProviderName"),
        col(f"{result_struct_col}.ProductName"),
        col(f"{result_struct_col}.ResultDetail.Taxes"),
        # coverage details
        col(f"{result_struct_col}.ResultDetail.CoverageDetailsList"),
    )
    return selected_data

In [23]:
# select useful data for matching and non-matching results
input_data_str = "props.pageProps.resultListInitialValues.ResultFilterData.InputData"
matching_result_str = "props.pageProps.resultListInitialValues.ResultItemsMatching"
non_matching_result_str = "props.pageProps.resultListInitialValues.ResultItemsNonMatching"

df_matching = select_useful_columns(df, input_data_str, matching_result_str)
df_non_matching = select_useful_columns(df, input_data_str, non_matching_result_str)

In [24]:
# the reference to the original DataFrame is not needed anymore
del df

In [25]:
# drop (potential) duplicated entries
df_matching = df_matching.dropDuplicates(["Guid"])
df_non_matching = df_non_matching.dropDuplicates(["Guid"])

In [26]:
# check the schema of the matching results
df_matching.printSchema()

root
 |-- Guid: string (nullable = true)
 |-- Canton: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- InsuranceType: long (nullable = true)
 |-- Nationality: string (nullable = true)
 |-- DateOfBirth: string (nullable = true)
 |-- NumberOfAdults: long (nullable = true)
 |-- NumberOfChildren: long (nullable = true)
 |-- BuildingZipCode: string (nullable = true)
 |-- SquareMeters: long (nullable = true)
 |-- HouseType: long (nullable = true)
 |-- Floor: long (nullable = true)
 |-- NumberOfRooms: double (nullable = true)
 |-- RequestedHouseholdInsuranceSum: long (nullable = true)
 |-- RequestedHouseholdGlassCoverage: boolean (nullable = true)
 |-- RequestedHouseholdSimpleTheftDeductible: long (nullable = true)
 |-- RequestedHouseholdSportsEquipmentValueCompensation: boolean (nullable = true)
 |-- RequestedPrivateLiabilityInsuranceSum: long (nullable = true)
 |-- RequestedPrivateLiabilityDeductible: long (nullable = true)
 |-- RequestedPrivateLiabilityThirdPart

In [27]:
# the schema of the non-matching results is equal to the matching results
df_non_matching.printSchema()

root
 |-- Guid: string (nullable = true)
 |-- Canton: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- InsuranceType: long (nullable = true)
 |-- Nationality: string (nullable = true)
 |-- DateOfBirth: string (nullable = true)
 |-- NumberOfAdults: long (nullable = true)
 |-- NumberOfChildren: long (nullable = true)
 |-- BuildingZipCode: string (nullable = true)
 |-- SquareMeters: long (nullable = true)
 |-- HouseType: long (nullable = true)
 |-- Floor: long (nullable = true)
 |-- NumberOfRooms: double (nullable = true)
 |-- RequestedHouseholdInsuranceSum: long (nullable = true)
 |-- RequestedHouseholdGlassCoverage: boolean (nullable = true)
 |-- RequestedHouseholdSimpleTheftDeductible: long (nullable = true)
 |-- RequestedHouseholdSportsEquipmentValueCompensation: boolean (nullable = true)
 |-- RequestedPrivateLiabilityInsuranceSum: long (nullable = true)
 |-- RequestedPrivateLiabilityDeductible: long (nullable = true)
 |-- RequestedPrivateLiabilityThirdPart

### 3.2 Explode array columns

The DataFrames contain multiple array columns, which need to be exploded. Within a row, all array columns have the same length, but this length differs from row to row.
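
To make the idea concrete, here is a plain-Python sketch of what exploding several equally long array columns means for a single row: the arrays are zipped element-wise and every tuple becomes its own row, while the scalar columns are repeated. The helper `explode_row` and the toy `Guid` are hypothetical, and the premiums and provider names are the first three entries of the first row shown in the outputs of this notebook. Note that Python's `zip` truncates to the shortest input, whereas Spark's `arrays_zip` pads shorter arrays with nulls. Since the arrays within a row are assumed to have equal length, the difference does not matter here.

```python
from typing import Any, Dict, List


def explode_row(row: Dict[str, Any], array_cols: List[str]) -> List[Dict[str, Any]]:
    """Turn one row with parallel array columns into one row per array element."""
    scalars = {key: value for key, value in row.items() if key not in array_cols}
    arrays = [row[col_name] for col_name in array_cols]
    return [
        {**scalars, **dict(zip(array_cols, values))}
        for values in zip(*arrays)  # element-wise pairing of the arrays
    ]


toy_row = {
    "Guid": "toy-guid",  # made-up identifier
    "TotalPremium": [274.2, 298.7, 298.9],
    "ProviderName": ["AXA", "ELVIA by Allianz", "Smile"],
}

exploded = explode_row(toy_row, ["TotalPremium", "ProviderName"])
for item in exploded:
    print(item)
```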

In [28]:
# exemplary check first three entries for 'TotalPremium'
df_matching.select(col("TotalPremium")).take(3)

                                                                                

[Row(TotalPremium=[274.2, 298.7, 298.9, 299.7, 333.3, 345.55, 360.5, 361.5, 384.6, 392.1, 432.9, 448.0, 466.4, 477.9, 484.05, 535.5, 902.1]),
 Row(TotalPremium=[112.8, 120.0, 135.7, 139.8, 148.6, 155.4, 155.5, 163.4, 167.11, 171.3, 178.3, 180.0, 187.0, 188.68, 190.8, 194.6, 195.0, 206.6, 213.47]),
 Row(TotalPremium=[67.5, 84.8, 88.1, 92.03, 93.3, 94.8, 99.9, 105.0, 105.0, 107.9, 110.0, 111.8, 114.75, 114.75, 130.5, 138.6, 146.6, 160.65, 193.6])]

In [29]:
# exemplary check first three entries for 'ProviderName'
df_matching.select(col("ProviderName")).take(3)

                                                                                

[Row(ProviderName=['AXA', 'ELVIA by Allianz', 'Smile', 'Simpego', 'Smile', 'Simpego', 'AXA', 'Helvetia', 'Generali', 'Baloise', 'Allianz Suisse', 'Zurich', 'Simpego', 'Zurich', 'CSS Versicherungen', 'Die Mobiliar', 'Baloise']),
 Row(ProviderName=['ELVIA by Allianz', 'Baloise', 'AXA', 'Helvetia', 'Helvetia', 'Baloise', 'Smile', 'Allianz Suisse', 'Die Mobiliar', 'Smile', 'Zurich', 'Baloise', 'AXA', 'CSS Versicherungen', 'Zurich', 'Generali', 'Allianz Suisse', 'Generali', 'Die Mobiliar']),
 Row(ProviderName=['Helvetia', 'AXA', 'Allianz Suisse', 'Die Mobiliar', 'ELVIA by Allianz', 'Smile', 'Smile', 'Generali', 'Generali', 'Allianz Suisse', 'AXA', 'Baloise', 'Simpego', 'Simpego', 'Simpego', 'Zurich', 'Zurich', 'CSS Versicherungen', 'Baloise'])]

In [30]:
def explode_array_elements(
    data: pyspark.sql.dataframe.DataFrame,
    array_cols: list,
) -> pyspark.sql.dataframe.DataFrame:
    """Explode array columns"""
    # every other column is a non-array column
    # (note: a set does not preserve the original column order)
    non_array_cols = set(data.columns) - set(array_cols)
    # pair the array elements using arrays_zip
    data = data.withColumn(
        "pairs",
        F.explode(F.arrays_zip(*array_cols))
    ).selectExpr(
        *non_array_cols,
        "pairs.*",
    )
    return data

In [31]:
# list of all array columns
array_cols = [
    "Canton",
    "ProductPosition",
    "TotalPremium",
    "Rating",
    "ProviderName",
    "ProductName",
    "Taxes",
    "CoverageDetailsList",
]

# explode the array columns in matching and non-matching results
df_matching = explode_array_elements(df_matching, array_cols)
df_non_matching = explode_array_elements(df_non_matching, array_cols)

In [32]:
# check again 'TotalPremium' and 'ProviderName'
df_non_matching.select("TotalPremium", "ProviderName").take(5)

                                                                                

[Row(TotalPremium=125.0, ProviderName='Baloise'),
 Row(TotalPremium=180.9, ProviderName='Generali'),
 Row(TotalPremium=201.44, ProviderName='Die Mobiliar'),
 Row(TotalPremium=214.1, ProviderName='Zurich'),
 Row(TotalPremium=225.9, ProviderName='Allianz Suisse')]

### 3.3 Get product name

In order to identify the insurance product, the columns `ProviderName` and `ProductName` need to be combined.

In [33]:
# have a look at 'ProviderName' and 'ProductName' columns
df_matching.select("ProviderName", "ProductName").take(5)

                                                                                

[Row(ProviderName='Simpego', ProductName='Home L'),
 Row(ProviderName='Baloise', ProductName='M'),
 Row(ProviderName='Generali', ProductName='Optima'),
 Row(ProviderName='Simpego', ProductName='Home XL'),
 Row(ProviderName='Smile', ProductName='clever')]

In [34]:
def get_product_name(
    data: pyspark.sql.dataframe.DataFrame,
) -> pyspark.sql.dataframe.DataFrame:
    """Get full product name by combining the provider and product name."""
    data = data.withColumn(
        "Product",
        concat(data["ProviderName"], lit(" "), data["ProductName"])
    ).drop("ProviderName", "ProductName")
    return data

In [35]:
# get full product name for matching and non-matching results
df_matching = get_product_name(df_matching)
df_non_matching = get_product_name(df_non_matching)

In [36]:
# check new 'Product' column
df_matching.select("Product").take(5)

                                                                                

[Row(Product='Simpego Home L'),
 Row(Product='Baloise M'),
 Row(Product='Generali Optima'),
 Row(Product='Simpego Home XL'),
 Row(Product='Smile clever')]

### 3.4 Unpack coverage details

The coverage information within the product results is not easy to parse because of its nested structure. Each product has a column called `CoverageDetailsList`, which holds an array of rows, one per coverage component.

To navigate this nested structure and extract specific coverage details, such as private liability, the `CoverageDetailsList.CoverageType` field can be used.
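
In plain Python, selecting a coverage component by its type amounts to taking the first list entry with the requested `CoverageType` and reading one field from it. The sketch below illustrates this for a single product. The helper name `first_coverage_field` is hypothetical, returning `None` for a missing coverage type is an assumption about the desired behaviour, and the sample entries are shortened from the `CoverageDetailsList` values shown in this notebook.

```python
from typing import Any, Dict, List, Optional


def first_coverage_field(
    coverage_details: List[Dict[str, Any]],
    coverage_type: int,
    field: str,
) -> Optional[Any]:
    """Return `field` of the first entry with the given coverage type, else None."""
    match = next(
        (entry for entry in coverage_details if entry["CoverageType"] == coverage_type),
        None,
    )
    return None if match is None else match[field]


# Shortened sample entries (only the fields used here)
sample_coverage = [
    {"CoverageType": 12, "InsuranceSum": "-", "Premium": "284.45", "Deductible": "-"},
    {"CoverageType": 4, "InsuranceSum": "5'000", "Premium": "61.95", "Deductible": "200"},
]

print(first_coverage_field(sample_coverage, 4, "Premium"))
print(first_coverage_field(sample_coverage, 14, "Premium"))
```

The values are still strings at this point (for example `"5'000"` with an apostrophe as thousands separator), so a later step would have to convert them to numbers.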

In [37]:
# check one example of the 'CoverageDetailsList' column
df_matching.select("CoverageDetailsList").take(1)

                                                                                

[Row(CoverageDetailsList=[Row(CoverageInfobase='VHH_Result_CoverageDetails_NotRequested', CoverageTitleInfobase=None, CoverageType=12, Deductible='-', DeductibleInfobase=None, DeductibleRequested=None, HasDeductibleDifference=False, HasDifference=False, HasInsuranceSumDifference=False, InsuranceSum='-', InsuranceSumRequested=None, Premium='284.45', ResultDetailIconType=3), Row(CoverageInfobase='VHH_Result_CoverageDetails_Covered', CoverageTitleInfobase='HH_Claim_SimpleTheft', CoverageType=4, Deductible='200', DeductibleInfobase=None, DeductibleRequested=None, HasDeductibleDifference=False, HasDifference=False, HasInsuranceSumDifference=False, InsuranceSum="5'000", InsuranceSumRequested=None, Premium='61.95', ResultDetailIconType=1), Row(CoverageInfobase='VHH_Result_CoverageDetails_Covered', CoverageTitleInfobase='hh_balloon_sportsequipment', CoverageType=6, Deductible='-', DeductibleInfobase=None, DeductibleRequested=None, HasDeductibleDifference=False, HasDifference=False, HasInsuranc

In [38]:
def unpack_coverage_details(
    data: pyspark.sql.dataframe.DataFrame,
) -> pyspark.sql.dataframe.DataFrame:
    """Unpack 'CoverageDetailsList' column."""
    coverage_type_name_mapping = {
        4: "ClaimsSimpleTheft",       # simple theft away from home
        6: "BalloonSportsequipment",  # sports equipment at replacement value
        7: "HPFremdlenker",           # driving third-party vehicles
        9: "ClaimGlass",              # furniture glass
        13: "Household",              # household contents
        14: "PrivateLiability",       # private liability
    }
    # for every field and coverage type, take the first matching entry of
    # 'CoverageDetailsList' and expose the field as '<CoverageName>_<Field>'
    exprs = [
        F.expr(
            f"filter(CoverageDetailsList, x -> x.CoverageType = {coverage_type})[0].{field}"
        ).alias(f"{coverage_name}_{field}")
        for field in ("InsuranceSum", "Premium", "Deductible")
        for coverage_type, coverage_name in coverage_type_name_mapping.items()
    ]
    data = data.select("*", *exprs)
    # drop obsolete 'CoverageDetailsList' column
    data = data.drop("CoverageDetailsList")
    return data

In [39]:
# unpack coverage details in matching and non-matching results
df_matching = unpack_coverage_details(df_matching)
df_non_matching = unpack_coverage_details(df_non_matching)

In [40]:
# check new matching result schema
df_matching.printSchema()

root
 |-- BuildingZipCode: string (nullable = true)
 |-- RequestedHouseholdSportsEquipmentValueCompensation: boolean (nullable = true)
 |-- RequestedHouseholdInsuranceSum: long (nullable = true)
 |-- NumberOfAdults: long (nullable = true)
 |-- DateOfBirth: string (nullable = true)
 |-- NumberOfChildren: long (nullable = true)
 |-- RequestedPrivateLiabilityDeductible: long (nullable = true)
 |-- HouseType: long (nullable = true)
 |-- Guid: string (nullable = true)
 |-- SquareMeters: long (nullable = true)
 |-- NumberOfRooms: double (nullable = true)
 |-- RequestedHouseholdGlassCoverage: boolean (nullable = true)
 |-- Nationality: string (nullable = true)
 |-- RequestedHouseholdSimpleTheftDeductible: long (nullable = true)
 |-- RequestedPrivateLiabilityThirdPartyVehicles: boolean (nullable = true)
 |-- RequestedPrivateLiabilityInsuranceSum: long (nullable = true)
 |-- InsuranceType: long (nullable = true)
 |-- Floor: long (nullable = true)
 |-- Canton: string (nullable = true)
 |-- Produ

### 3.6 Parse `BuildingZipCode` column

The `BuildingZipCode` column holds a single string that combines the zip code, the city, and the state (canton). The following cells parse the zip code and the state out of this string.
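
To make the string layout concrete, here is a minimal plain-Python sketch that extracts both parts with one regular expression. It is an alternative illustration, not the Spark approach used in this notebook, and it assumes four-digit zip codes and a two-letter canton code in parentheses; the name `parse_zip_and_state` is hypothetical.

```python
import re
from typing import Optional, Tuple

# assumed layout: '<4-digit zip> <city> (<2-letter canton>) <municipality>'
ZIP_STATE_PATTERN = re.compile(r"^(\d{4})\s.*\(([A-Z]{2})\)")


def parse_zip_and_state(value: str) -> Tuple[Optional[str], Optional[str]]:
    """Return (zip_code, state), or (None, None) if the string does not match."""
    match = ZIP_STATE_PATTERN.match(value)
    if match is None:
        return None, None
    return match.group(1), match.group(2)


print(parse_zip_and_state("4123 Allschwil (BL) Allschwil"))
```

Returning `(None, None)` for strings that do not match makes malformed entries easy to spot instead of silently producing partial values.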

In [41]:
# set random seed
random_seed = 12345

# select rows randomly with fixed random seed
randomly_selected_rows = df_matching.orderBy(F.rand(random_seed)).limit(5)

# show 'BuildingZipCode' of randomly selected rows
randomly_selected_rows.select("BuildingZipCode").take(5)

                                                                                

[Row(BuildingZipCode='2017 Boudry (NE) Boudry'),
 Row(BuildingZipCode='8001 Zürich (ZH) Zürich'),
 Row(BuildingZipCode='8400 Winterthur (ZH) Winterthur'),
 Row(BuildingZipCode='3052 Zollikofen (BE) Zollikofen'),
 Row(BuildingZipCode='6500 Bellinzona (TI) Bellinzona')]

In [42]:
def parse_building_zip_code(
    data: pyspark.sql.dataframe.DataFrame,
) -> pyspark.sql.dataframe.DataFrame:
    """Parse 'BuildingZipCode' column.

    Example:
        '4123 Allschwil (BL) Allschwil' gets parsed into
            * ZipCode = 4123
            * State = BL
    """
    # the zip code is the first space-separated token
    data = data.withColumn("ZipCode", F.split(data.BuildingZipCode, " ")[0])
    # the state (canton) is the text between the parentheses
    data = data.withColumn(
        "State",
        F.trim(F.regexp_replace(data.BuildingZipCode, r".*\(|\).*", "")),
    )
    # note: the original 'BuildingZipCode' column is kept
    return data

In [43]:
# parse 'BuildingZipCode' column in matching and non-matching results
df_matching = parse_building_zip_code(df_matching)
df_non_matching = parse_building_zip_code(df_non_matching)

In [44]:
# select rows randomly with fixed random seed (previously defined)
randomly_selected_rows = df_matching.orderBy(F.rand(random_seed)).limit(5)

# show 'ZipCode' and 'State' of randomly selected rows
randomly_selected_rows.select(
    "ZipCode",
    "State",
).show(5)


+-------+-----+
|ZipCode|State|
+-------+-----+
|   2017|   NE|
|   8001|   ZH|
|   8400|   ZH|
|   3052|   BE|
|   6500|   TI|
+-------+-----+



                                                                                

### 3.7 Add `MatchingOffer` column

In [45]:
# add 'MatchingOffer' column 
# True for all matching results
df_matching = df_matching.withColumn("MatchingOffer", F.lit(True))
# False for all non-matching results
df_non_matching = df_non_matching.withColumn("MatchingOffer", F.lit(False))

### 3.8 Convert column types

Some columns, such as the insurance sums, hold numeric values but are still stored as strings. The next cells cast these columns to `double`.
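
The scraped strings are not always plain numbers: the `CoverageDetailsList` example contains values such as `"5'000"` (apostrophe as thousands separator) and `"-"` (no value). As far as we understand Spark's default (non-ANSI) behaviour, casting such strings to `double` yields null. The following plain-Python sketch shows one possible normalisation step before the conversion; it is not part of the notebook's pipeline, and the helper name `to_float` is hypothetical.

```python
from typing import Optional


def to_float(value: Optional[str]) -> Optional[float]:
    """Convert a scraped numeric string to float; return None if not numeric."""
    if value is None:
        return None
    # remove the apostrophe used as thousands separator, e.g. "5'000" -> "5000"
    cleaned = value.replace("'", "").strip()
    try:
        return float(cleaned)
    except ValueError:
        # placeholders such as "-" carry no numeric value
        return None


print([to_float(v) for v in ["61.95", "5'000", "-", None]])
```

In PySpark, the same idea could presumably be expressed by applying `F.regexp_replace` to strip the apostrophes before the cast.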

In [46]:
def convert_column_type(
    data: pyspark.sql.dataframe.DataFrame,
    columns_to_convert: list,
) -> pyspark.sql.dataframe.DataFrame:
    """Cast the given columns of a PySpark DataFrame from string to double."""
    for column in columns_to_convert:
        # values that cannot be parsed as a number become null
        data = data.withColumn(column, F.col(column).cast("double"))
    return data

In [47]:
# columns to convert
columns_to_convert = [
    "ClaimsSimpleTheft_InsuranceSum",
    "BalloonSportsequipment_InsuranceSum",
    "HPFremdlenker_InsuranceSum",
    "ClaimGlass_InsuranceSum",
    "Household_InsuranceSum",
    "PrivateLiability_InsuranceSum",
    "ClaimsSimpleTheft_Premium",
    "BalloonSportsequipment_Premium",
    "HPFremdlenker_Premium",
    "ClaimGlass_Premium",
    "Household_Premium",
    "PrivateLiability_Premium",
    "ClaimsSimpleTheft_Deductible",
    "BalloonSportsequipment_Deductible",
    "HPFremdlenker_Deductible",
    "ClaimGlass_Deductible",
    "Household_Deductible",
    "PrivateLiability_Deductible",
    "ZipCode"
]


# convert the column type
df_matching = convert_column_type(df_matching, columns_to_convert)
df_non_matching = convert_column_type(df_non_matching, columns_to_convert)

In [48]:
# check final matching results schema
df_matching.printSchema()

root
 |-- BuildingZipCode: string (nullable = true)
 |-- RequestedHouseholdSportsEquipmentValueCompensation: boolean (nullable = true)
 |-- RequestedHouseholdInsuranceSum: long (nullable = true)
 |-- NumberOfAdults: long (nullable = true)
 |-- DateOfBirth: string (nullable = true)
 |-- NumberOfChildren: long (nullable = true)
 |-- RequestedPrivateLiabilityDeductible: long (nullable = true)
 |-- HouseType: long (nullable = true)
 |-- Guid: string (nullable = true)
 |-- SquareMeters: long (nullable = true)
 |-- NumberOfRooms: double (nullable = true)
 |-- RequestedHouseholdGlassCoverage: boolean (nullable = true)
 |-- Nationality: string (nullable = true)
 |-- RequestedHouseholdSimpleTheftDeductible: long (nullable = true)
 |-- RequestedPrivateLiabilityThirdPartyVehicles: boolean (nullable = true)
 |-- RequestedPrivateLiabilityInsuranceSum: long (nullable = true)
 |-- InsuranceType: long (nullable = true)
 |-- Floor: long (nullable = true)
 |-- Canton: string (nullable = true)
 |-- Produ

In [49]:
# check final non-matching results schema
df_non_matching.printSchema()

root
 |-- BuildingZipCode: string (nullable = true)
 |-- RequestedHouseholdSportsEquipmentValueCompensation: boolean (nullable = true)
 |-- RequestedHouseholdInsuranceSum: long (nullable = true)
 |-- NumberOfAdults: long (nullable = true)
 |-- DateOfBirth: string (nullable = true)
 |-- NumberOfChildren: long (nullable = true)
 |-- RequestedPrivateLiabilityDeductible: long (nullable = true)
 |-- HouseType: long (nullable = true)
 |-- Guid: string (nullable = true)
 |-- SquareMeters: long (nullable = true)
 |-- NumberOfRooms: double (nullable = true)
 |-- RequestedHouseholdGlassCoverage: boolean (nullable = true)
 |-- Nationality: string (nullable = true)
 |-- RequestedHouseholdSimpleTheftDeductible: long (nullable = true)
 |-- RequestedPrivateLiabilityThirdPartyVehicles: boolean (nullable = true)
 |-- RequestedPrivateLiabilityInsuranceSum: long (nullable = true)
 |-- InsuranceType: long (nullable = true)
 |-- Floor: long (nullable = true)
 |-- Canton: string (nullable = true)
 |-- Produ

### 3.9 Merge matching and non-matching results

In [50]:
# append the matching and non-matching result DataFrames vertically
df_results = df_matching.union(df_non_matching)

df_results.show()


+--------------------+--------------------------------------------------+------------------------------+--------------+-------------------+----------------+-----------------------------------+---------+--------------------+------------+-------------+-------------------------------+-----------+---------------------------------------+-------------------------------------------+-------------------------------------+-------------+-----+------+---------------+------------+------+-----+--------------------+------------------------------+-----------------------------------+--------------------------+-----------------------+----------------------+-----------------------------+-------------------------+------------------------------+---------------------+------------------+-----------------+------------------------+----------------------------+---------------------------------+------------------------+---------------------+--------------------+---------------------------+-------+-----+---------

                                                                                

In [51]:
from pyspark.sql.functions import desc, asc

# reorder rows so matching and non-matching with same 'Guid' are beneath each other
df_results = df_results.orderBy("Guid", desc("MatchingOffer"), asc("TotalPremium"))
df_results.take(2)

                                                                                

[Row(BuildingZipCode='8803 Rüschlikon (ZH) Rüschlikon', RequestedHouseholdSportsEquipmentValueCompensation=True, RequestedHouseholdInsuranceSum=125000, NumberOfAdults=2, DateOfBirth='1972-11-13T00:00:00', NumberOfChildren=0, RequestedPrivateLiabilityDeductible=-1, HouseType=2, Guid='00059c61-b468-4404-9bb9-8cba15612414', SquareMeters=105, NumberOfRooms=5.0, RequestedHouseholdGlassCoverage=True, Nationality='CH', RequestedHouseholdSimpleTheftDeductible=2000, RequestedPrivateLiabilityThirdPartyVehicles=False, RequestedPrivateLiabilityInsuranceSum=5000000, InsuranceType=3, Floor=0, Canton='ZH', ProductPosition=1, TotalPremium=274.2, Rating=5.0, Taxes=13.1, Product='AXA SMALL', ClaimsSimpleTheft_InsuranceSum=None, BalloonSportsequipment_InsuranceSum=None, HPFremdlenker_InsuranceSum=None, ClaimGlass_InsuranceSum=None, Household_InsuranceSum=None, PrivateLiability_InsuranceSum=None, ClaimsSimpleTheft_Premium=None, BalloonSportsequipment_Premium=None, HPFremdlenker_Premium=None, ClaimGlass_Pr

In [52]:
# have a look at the first few rows in pandas
first_few_rows = df_results.limit(40).toPandas()
first_few_rows.head(n=40)

                                                                                

Unnamed: 0,BuildingZipCode,RequestedHouseholdSportsEquipmentValueCompensation,RequestedHouseholdInsuranceSum,NumberOfAdults,DateOfBirth,NumberOfChildren,RequestedPrivateLiabilityDeductible,HouseType,Guid,SquareMeters,...,PrivateLiability_Premium,ClaimsSimpleTheft_Deductible,BalloonSportsequipment_Deductible,HPFremdlenker_Deductible,ClaimGlass_Deductible,Household_Deductible,PrivateLiability_Deductible,ZipCode,State,MatchingOffer
0,8803 Rüschlikon (ZH) Rüschlikon,True,125000,2,1972-11-13T00:00:00,0,-1,2,00059c61-b468-4404-9bb9-8cba15612414,105,...,72.8,,,,,500.0,500.0,8803.0,ZH,True
1,8803 Rüschlikon (ZH) Rüschlikon,True,125000,2,1972-11-13T00:00:00,0,-1,2,00059c61-b468-4404-9bb9-8cba15612414,105,...,66.0,,,,0.0,500.0,500.0,8803.0,ZH,True
2,8803 Rüschlikon (ZH) Rüschlikon,True,125000,2,1972-11-13T00:00:00,0,-1,2,00059c61-b468-4404-9bb9-8cba15612414,105,...,79.5,200.0,,,,500.0,500.0,8803.0,ZH,True
3,8803 Rüschlikon (ZH) Rüschlikon,True,125000,2,1972-11-13T00:00:00,0,-1,2,00059c61-b468-4404-9bb9-8cba15612414,105,...,80.05,300.0,,,,500.0,500.0,8803.0,ZH,True
4,8803 Rüschlikon (ZH) Rüschlikon,True,125000,2,1972-11-13T00:00:00,0,-1,2,00059c61-b468-4404-9bb9-8cba15612414,105,...,91.6,200.0,,,,200.0,200.0,8803.0,ZH,True
5,8803 Rüschlikon (ZH) Rüschlikon,True,125000,2,1972-11-13T00:00:00,0,-1,2,00059c61-b468-4404-9bb9-8cba15612414,105,...,86.6,200.0,,,,200.0,200.0,8803.0,ZH,True
6,8803 Rüschlikon (ZH) Rüschlikon,True,125000,2,1972-11-13T00:00:00,0,-1,2,00059c61-b468-4404-9bb9-8cba15612414,105,...,103.9,,,,,200.0,200.0,8803.0,ZH,True
7,8803 Rüschlikon (ZH) Rüschlikon,True,125000,2,1972-11-13T00:00:00,0,-1,2,00059c61-b468-4404-9bb9-8cba15612414,105,...,81.4,500.0,,500.0,0.0,500.0,500.0,8803.0,ZH,True
8,8803 Rüschlikon (ZH) Rüschlikon,True,125000,2,1972-11-13T00:00:00,0,-1,2,00059c61-b468-4404-9bb9-8cba15612414,105,...,94.8,200.0,,,200.0,200.0,200.0,8803.0,ZH,True
9,8803 Rüschlikon (ZH) Rüschlikon,True,125000,2,1972-11-13T00:00:00,0,-1,2,00059c61-b468-4404-9bb9-8cba15612414,105,...,,500.0,,,0.0,500.0,500.0,8803.0,ZH,True


In [53]:
first_few_rows[["Guid", "Product", "TotalPremium", "ProductPosition", "MatchingOffer"]].head(n=30)

| | Guid | Product | TotalPremium | ProductPosition | MatchingOffer |
|---|---|---|---|---|---|
| 0 | 00059c61-b468-4404-9bb9-8cba15612414 | AXA SMALL | 274.2 | 1 | True |
| 1 | 00059c61-b468-4404-9bb9-8cba15612414 | ELVIA by Allianz nice | 298.7 | 2 | True |
| 2 | 00059c61-b468-4404-9bb9-8cba15612414 | Smile clever | 298.9 | 3 | True |
| 3 | 00059c61-b468-4404-9bb9-8cba15612414 | Simpego Home M | 299.7 | 4 | True |
| 4 | 00059c61-b468-4404-9bb9-8cba15612414 | Smile premium | 333.3 | 5 | True |
| 5 | 00059c61-b468-4404-9bb9-8cba15612414 | Simpego Home L | 345.55 | 6 | True |
| 6 | 00059c61-b468-4404-9bb9-8cba15612414 | AXA SMALL Plus | 360.5 | 7 | True |
| 7 | 00059c61-b468-4404-9bb9-8cba15612414 | Helvetia Privatkundenversicherung | 361.5 | 8 | True |
| 8 | 00059c61-b468-4404-9bb9-8cba15612414 | Generali Optima | 384.6 | 9 | True |
| 9 | 00059c61-b468-4404-9bb9-8cba15612414 | Baloise M | 392.1 | 10 | True |


## 4. Data Dump
<a id='data_dump'></a>

In [54]:
parquet_out_file = "processed_comparis_home_results.parquet"
csv_out_file = "processed_comparis_home_results.csv"

In [55]:
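# save the combined DataFrame as Parquet and CSV file
df_results.write.format("parquet").mode("overwrite").save(parquet_out_file)
# write a header row so that the CSV can later be read back with header=True
df_results.write.format("csv").option("header", True).mode("overwrite").save(csv_out_file)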

                                                                                

## 5. Measurements
<a id='measurements'></a>

In the previous section, we saved the processed Comparis Home result data as Parquet and CSV files.
We now compare the resulting file sizes and read-in times, both of which matter in a business setting.

File size is an essential factor to consider, especially when storing data on platforms like AWS S3. AWS S3 charges based on the amount of stored data in GB. On the other hand, read-in time is significant when it comes to data retrieval and analysis. Faster read-in times can lead to improved efficiency and productivity, especially when using services like AWS Athena. AWS Athena charges based on the amount of data scanned during query execution. Therefore, reducing the scanned data size can help optimize costs.
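
Spark writes each output as a folder of part files, so the size has to be summed over the whole folder, and elapsed time is best taken from a monotonic clock. The following self-contained sketch shows both ideas on a small temporary folder. The helper names `timed` and `folder_size` and the example file names are hypothetical, and `perf_counter` is swapped in here as an alternative to `time()`.

```python
import tempfile
from pathlib import Path
from time import perf_counter
from typing import Callable, Tuple, TypeVar

T = TypeVar("T")


def timed(func: Callable[[], T]) -> Tuple[T, float]:
    """Run `func` once and return its result together with the elapsed seconds."""
    start = perf_counter()
    result = func()
    return result, perf_counter() - start


def folder_size(folder: Path) -> int:
    """Sum the sizes (in bytes) of all files below `folder`."""
    return sum(f.stat().st_size for f in folder.rglob("*") if f.is_file())


with tempfile.TemporaryDirectory() as tmp:
    # mimic an output folder consisting of several part files
    out_dir = Path(tmp) / "example_output"
    out_dir.mkdir()
    (out_dir / "part-0000.csv").write_bytes(b"a,b\n1,2\n")
    (out_dir / "part-0001.csv").write_bytes(b"3,4\n")
    size, seconds = timed(lambda: folder_size(out_dir))
    print(f"{size} bytes, measured in {seconds:.6f} s")
```

Note that Spark evaluates lazily, so a timing wrapped around a read call mostly captures the work that call triggers right away (for example schema inference), not necessarily a full scan of the data.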

In [60]:
from time import time

In [61]:
def get_folder_size(folder: Path) -> int:
    """Get size of a folder in bytes."""
    return sum(f.stat().st_size for f in folder.glob('**/*') if f.is_file())

In [62]:
# measure file size
csv_file_size = get_folder_size(Path(csv_out_file))
parquet_file_size = get_folder_size(Path(parquet_out_file))

print(f"CSV file size: {round(csv_file_size / 1024, 2)} kB")
print(f"Parquet file size: {round(parquet_file_size / 1024, 2)} kB")
print(f"The CSV is {round(csv_file_size / parquet_file_size, 1)} times the size of the Parquet")

CSV file size: 10240.36 kB
Parquet file size: 761.79 kB
The CSV is 13.4 times the size of the Parquet


In [63]:
# measure read time
start_time = time()
df_csv = spark.read.csv(csv_out_file, header=True, inferSchema=True)
csv_read_time = time() - start_time

start_time = time()
df_parquet = spark.read.parquet(parquet_out_file)
parquet_read_time = time() - start_time

print(f"CSV read time: {csv_read_time} seconds")
print(f"Parquet read time: {parquet_read_time} seconds")
print(f"Reading in CSV took {round(csv_read_time / parquet_read_time, 1)} times as long as reading the Parquet")

CSV read time: 4.26403546333313 seconds
Parquet read time: 0.14482927322387695 seconds
Reading in CSV took 29.4 times as long as reading the Parquet


                                                                                

In [68]:
file = Path("raw_comparis_home_results.json")
n_cores = range(2, 97, 2)
processing_times = {}

# loop over number of cores and measure times
for cores in n_cores:
    print(f"Measuring time for {cores=}")
    sc.stop()
    
    sc = sparky.connect("sparknotebook-Gruppe_3", cores)
    spark = pyspark.sql.SparkSession.builder.getOrCreate()
    df = spark.read.option("multiline", "true").json(str(file))
    df.count()
    
    # start measurement
    start_time = time()
    
    # select useful data for matching and non-matching results
    df_matching = select_useful_columns(df, input_data_str, matching_result_str)
    df_non_matching = select_useful_columns(df, input_data_str, non_matching_result_str)
    
    # drop duplicated entries
    df_matching = df_matching.dropDuplicates(["Guid"])
    df_non_matching = df_non_matching.dropDuplicates(["Guid"])
    
    # explode the array columns in matching and non-matching results
    df_matching = explode_array_elements(df_matching, array_cols)
    df_non_matching = explode_array_elements(df_non_matching, array_cols)

    # get full product name for matching and non-matching results
    df_matching = get_product_name(df_matching)
    df_non_matching = get_product_name(df_non_matching)

    # unpack coverage details in matching and non-matching results
    df_matching = unpack_coverage_details(df_matching)
    df_non_matching = unpack_coverage_details(df_non_matching)

    # parse 'BuildingZipCode' column in matching and non-matching results
    df_matching = parse_building_zip_code(df_matching)
    df_non_matching = parse_building_zip_code(df_non_matching)

    # add 'MatchingOffer' column
    df_matching = df_matching.withColumn("MatchingOffer", F.lit(True))
    df_non_matching = df_non_matching.withColumn("MatchingOffer", F.lit(False))

    # convert the column type
    df_matching = convert_column_type(df_matching, columns_to_convert)
    df_non_matching = convert_column_type(df_non_matching, columns_to_convert)

    # append the matching and non-matching result DataFrames vertically
    processed_data = df_matching.union(df_non_matching)

    # reorder rows so matching and non-matching with same 'Guid' are beneath each other
    processed_data = processed_data.orderBy("Guid", desc("MatchingOffer"), asc("TotalPremium"))
    
    processed_data.show()
    # end measurement
    processing_times[cores] = time() - start_time
    
    # write out
    parquet_out_file = f"processed_comparis_home_results_{cores}.parquet"
    processed_data.write.format("parquet").mode('overwrite').save(parquet_out_file)
    
    sc.stop()


Measuring time for cores=84
Attached to Sparky cluster context from sparky-collab as sparknotebook-Gruppe_3.
Requested 84 cores; real number might be less.


                                                                                

+--------------------+--------------------------------------------------+------------------------------+--------------+-------------------+----------------+-----------------------------------+---------+--------------------+------------+-------------+-------------------------------+-----------+---------------------------------------+-------------------------------------------+-------------------------------------+-------------+-----+------+---------------+------------+------+-----+--------------------+------------------------------+-----------------------------------+--------------------------+-----------------------+----------------------+-----------------------------+-------------------------+------------------------------+---------------------+------------------+-----------------+------------------------+----------------------------+---------------------------------+------------------------+---------------------+--------------------+---------------------------+-------+-----+---------

                                                                                

Measuring time for cores=86
Attached to Sparky cluster context from sparky-collab as sparknotebook-Gruppe_3.
Requested 86 cores; real number might be less.


23/07/09 09:48:44 ERROR TaskSchedulerImpl: Lost executor 5 on 10.0.4.5: Command exited with code 50
23/07/09 09:48:44 WARN TaskSetManager: Lost task 0.0 in stage 4.0 (TID 3) (10.0.4.5 executor 5): ExecutorLostFailure (executor 5 exited caused by one of the running tasks) Reason: Command exited with code 50
23/07/09 09:48:44 WARN TaskSetManager: Lost task 0.0 in stage 5.0 (TID 4) (10.0.4.5 executor 5): ExecutorLostFailure (executor 5 exited caused by one of the running tasks) Reason: Command exited with code 50
23/07/09 09:49:30 ERROR TaskSchedulerImpl: Lost executor 4 on 10.0.4.137: Command exited with code 52
23/07/09 09:49:30 WARN TaskSetManager: Lost task 0.1 in stage 4.0 (TID 5) (10.0.4.137 executor 4): ExecutorLostFailure (executor 4 exited caused by one of the running tasks) Reason: Command exited with code 52
23/07/09 09:49:30 WARN TaskSetManager: Lost task 0.1 in stage 5.0 (TID 6) (10.0.4.137 executor 4): ExecutorLostFailure (executor 4 exited caused by one of the running tasks

+--------------------+--------------------------------------------------+------------------------------+--------------+-------------------+----------------+-----------------------------------+---------+--------------------+------------+-------------+-------------------------------+-----------+---------------------------------------+-------------------------------------------+-------------------------------------+-------------+-----+------+---------------+------------+------+-----+--------------------+------------------------------+-----------------------------------+--------------------------+-----------------------+----------------------+-----------------------------+-------------------------+------------------------------+---------------------+------------------+-----------------+------------------------+----------------------------+---------------------------------+------------------------+---------------------+--------------------+---------------------------+-------+-----+---------

                                                                                

Measuring time for cores=88
Attached to Sparky cluster context from sparky-collab as sparknotebook-Gruppe_3.
Requested 88 cores; real number might be less.


23/07/09 09:50:56 WARN TransportChannelHandler: Exception in connection from /209.195.92.20:42102
java.lang.IllegalArgumentException: Too large frame: 4849910940755296249
	at org.sparkproject.guava.base.Preconditions.checkArgument(Preconditions.java:119)
	at org.apache.spark.network.util.TransportFrameDecoder.decodeNext(TransportFrameDecoder.java:148)
	at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:98)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	at io.nett

+--------------------+--------------------------------------------------+------------------------------+--------------+-------------------+----------------+-----------------------------------+---------+--------------------+------------+-------------+-------------------------------+-----------+---------------------------------------+-------------------------------------------+-------------------------------------+-------------+-----+------+---------------+------------+------+-----+--------------------+------------------------------+-----------------------------------+--------------------------+-----------------------+----------------------+-----------------------------+-------------------------+------------------------------+---------------------+------------------+-----------------+------------------------+----------------------------+---------------------------------+------------------------+---------------------+--------------------+---------------------------+-------+-----+---------

                                                                                

Measuring time for cores=90
Attached to Sparky cluster context from sparky-collab as sparknotebook-Gruppe_3.
Requested 90 cores; real number might be less.


                                                                                

+--------------------+--------------------------------------------------+------------------------------+--------------+-------------------+----------------+-----------------------------------+---------+--------------------+------------+-------------+-------------------------------+-----------+---------------------------------------+-------------------------------------------+-------------------------------------+-------------+-----+------+---------------+------------+------+-----+--------------------+------------------------------+-----------------------------------+--------------------------+-----------------------+----------------------+-----------------------------+-------------------------+------------------------------+---------------------+------------------+-----------------+------------------------+----------------------------+---------------------------------+------------------------+---------------------+--------------------+---------------------------+-------+-----+---------

                                                                                

Measuring time for cores=92
Attached to Sparky cluster context from sparky-collab as sparknotebook-Gruppe_3.
Requested 92 cores; real number might be less.


                                                                                

+--------------------+--------------------------------------------------+------------------------------+--------------+-------------------+----------------+-----------------------------------+---------+--------------------+------------+-------------+-------------------------------+-----------+---------------------------------------+-------------------------------------------+-------------------------------------+-------------+-----+------+---------------+------------+------+-----+--------------------+------------------------------+-----------------------------------+--------------------------+-----------------------+----------------------+-----------------------------+-------------------------+------------------------------+---------------------+------------------+-----------------+------------------------+----------------------------+---------------------------------+------------------------+---------------------+--------------------+---------------------------+-------+-----+---------

23/07/09 09:53:04 ERROR TaskSchedulerImpl: Lost executor 7 on 10.0.4.104: Command exited with code 52
23/07/09 09:53:04 WARN TaskSetManager: Lost task 0.0 in stage 9.0 (TID 15) (10.0.4.104 executor 7): ExecutorLostFailure (executor 7 exited caused by one of the running tasks) Reason: Command exited with code 52
23/07/09 09:53:04 WARN TaskSetManager: Lost task 0.0 in stage 10.0 (TID 16) (10.0.4.104 executor 7): ExecutorLostFailure (executor 7 exited caused by one of the running tasks) Reason: Command exited with code 52
23/07/09 09:53:49 ERROR TaskSchedulerImpl: Lost executor 9 on 10.0.1.89: Command exited with code 52
23/07/09 09:53:49 WARN TaskSetManager: Lost task 0.1 in stage 9.0 (TID 17) (10.0.1.89 executor 9): ExecutorLostFailure (executor 9 exited caused by one of the running tasks) Reason: Command exited with code 52
23/07/09 09:53:49 WARN TaskSetManager: Lost task 0.1 in stage 10.0 (TID 18) (10.0.1.89 executor 9): ExecutorLostFailure (executor 9 exited caused by one of the runn

Py4JJavaError: An error occurred while calling o21756.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 9.0 failed 4 times, most recent failure: Lost task 0.3 in stage 9.0 (TID 21) (10.0.4.5 executor 5): java.lang.OutOfMemoryError: Java heap space

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.lang.OutOfMemoryError: Java heap space


23/07/09 09:55:00 ERROR TaskSchedulerImpl: Lost executor 5 on 10.0.4.5: Command exited with code 52
23/07/09 10:06:02 WARN TransportChannelHandler: Exception in connection from /192.241.222.24:41102
java.lang.IllegalArgumentException: Too large frame: 5135603447292250188
	at org.sparkproject.guava.base.Preconditions.checkArgument(Preconditions.java:119)
	at org.apache.spark.network.util.TransportFrameDecoder.decodeNext(TransportFrameDecoder.java:148)
	at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:98)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channe

In [65]:
processing_times

{2: 12.704103946685791,
 4: 12.437473773956299,
 6: 13.61857533454895,
 8: 17.269924879074097,
 10: 13.085744619369507,
 12: 12.249535083770752,
 14: 11.978893280029297,
 16: 12.602339506149292,
 18: 11.8199782371521,
 20: 12.62258005142212,
 22: 11.746447563171387,
 24: 11.34380054473877,
 26: 11.81308126449585,
 28: 12.044515609741211,
 30: 11.422452926635742,
 32: 10.689001321792603,
 34: 11.3423490524292,
 36: 11.387629747390747,
 38: 12.06263780593872,
 40: 11.217017889022827}

In [67]:
processing_times

{40: 11.418594121932983,
 42: 11.71618103981018,
 44: 11.772807359695435,
 46: 11.809372186660767,
 48: 130.89635968208313,
 50: 11.19321060180664,
 52: 11.233144760131836,
 54: 11.164588689804077,
 56: 11.52021837234497,
 58: 11.802715301513672,
 60: 10.973886013031006,
 62: 10.501060962677002,
 64: 11.638586044311523,
 66: 112.1554229259491,
 68: 59.33721303939819,
 70: 11.599371194839478,
 72: 11.393665075302124,
 74: 11.785641193389893,
 76: 11.21257495880127,
 78: 12.127326488494873,
 80: 11.095917463302612,
 82: 11.689680337905884,
 84: 12.439908504486084}

In [69]:
processing_times

{84: 11.557278871536255,
 86: 157.68842697143555,
 88: 11.772629499435425,
 90: 11.375708818435669,
 92: 11.085503101348877}

## 6. Clean-up
<a id='clean_up'></a>

Finally, we can remove the downloaded raw data file and close the sparky connection.

In [70]:
# remove downloaded raw data file
file.unlink()

In [71]:
sc.stop()