# 1. Filtering
Before we are going to clean up the data and apply pre-processing steps, we will first see if there are not already products and services which are already directly matching with the EP catalogue (ASSUMING THAT THE EP CATALOGUS CONTAINS CLEAN DATA). This reduces the number of records we would need to pre-process. Since we still need those records for the final output, we will flag them by adding a link to the EP catalogue.

## 1.1 Import the required libraries

In [0]:
import pandas as pd
import os 

from utils import *

## 1.2 Load in the raw data

In [0]:
base_path = "abfss://preprocessing@storagetiltdevelop.dfs.core.windows.net/data/example_data/input/"

In [0]:
if 'DATABRICKS_RUNTIME_VERSION' in os.environ:
    def file_exists(path):
        try:
            dbutils.fs.ls(path)
            return True
        except Exception as e:
            if 'java.io.FileNotFoundException' in str(e):
                raise FileNotFoundError("File could not be found. Are you sure the file exists in the provided directory?")
            else:
                raise
    # Determine the location of the dataframe containing the company based products and services
    europages_activities_catalogue_location = "abfss://preprocessing@storagetiltdevelop.dfs.core.windows.net/data/example_data/input/scraped_data/scraped_EP_products_catalogue.csv"
    file_path = dbutils.widgets.get("FilePath")
    file_path = str(os.path.join(base_path, file_path).replace("\\", "/"))
    
    # Read the dataframe
    europages_activities_catalogue = spark.read.option("header",True).csv(europages_activities_catalogue_location).toPandas()
    # check if file exists
    file_exists(file_path)
    file_df = spark.read.option("header",True).csv(file_path).toPandas()

else:
    # Determine the location of the dataframe EuroPages catalogue
    europages_activities_catalogue_location = "../../data/example_data/input/scraped_data/scraped_EP_products_catalogue.csv"
    # Determine the location of the dataframe containing unprocessed products and services
    base_tilt_data_location = "../../data/example_data/input/tilt_base_products_and_services_unprocessed.csv"
    # Determine the location of the dataframe containing the unprocessed products and services
    italy_tilt_data_location = "../../data/example_data/input/tilt_italy_products_and_services_unprocessed.csv"
    
    # Read the dataframe
    europages_activities_catalogue  = pd.read_csv(europages_activities_catalogue_location)
    # Read the base_data
    base_tilt_data = pd.read_csv(base_tilt_data_location)
    # Read the italy data
    italy_tilt_data = pd.read_csv(italy_tilt_data_location)

## 1.3 Filtering

In [0]:
def exclude_covered_records(df_1, df_2):
    # Merge the two dataframes based on the 'products_and_services' column
    merged_df = df_1.merge(df_2, on='products_and_services', how='inner')

    # Remove ID column and rename products_id_y to linke_EP_products_id
    merged_df = merged_df.drop(['ID', 'products_id_y'], axis=1)
    merged_df = merged_df.rename(columns={'products_id_x': 'products_id'})
    
    # add column called to_process which labels the records that need to be processed
    merged_df['to_process'] = False
    

    # concatentate the merged_df with the records in df_1 that arent in merged_df based of products_id
    df_1 = df_1[~df_1['products_id'].isin(merged_df['products_id'])]
    merged_df = pd.concat([merged_df, df_1], ignore_index=True)	
    # replace the NaN values in the to_process column with "yes"
    merged_df['to_process'] = merged_df['to_process'].fillna(True)

    return merged_df

### Provided input Data

In [0]:
filtered_file_df = exclude_covered_records(file_df, europages_activities_catalogue)

### Base Data

In [0]:
# filtered_base_data = exclude_covered_records(base_tilt_data, europages_activities_catalogue)

### tilt Italy Data

In [0]:
# filtered_italy_data = exclude_covered_records(italy_tilt_data, europages_activities_catalogue)

## 1.4 Export the filtered dataframe

In [0]:
if 'DATABRICKS_RUNTIME_VERSION' in os.environ:
    # Define the path for the new dataframe
    output_path = str(os.path.join(os.path.dirname(os.path.dirname(file_path)), "output/flagged_" + os.path.basename(file_path))).replace("\\", "/")    
    # Convert the pandas dataframe to a spark sql dataframe
    filtered_df_spark = spark.createDataFrame(filtered_file_df)
    # Write the new dataframe to the path
    filtered_df_spark.write.csv(output_path, mode="overwrite", header=True)
    dbutils.jobs.taskValues.set(key = 'OutPath', value = output_path)
else:
    # Define the path for the new dataframe
    output_filtered_base_data = "../../data/example_data/output/base_data/base_flagged_products.csv"
    # Define the path for the new dataframe
    output_filtered_italy_data = "../../data/example_data/output/italy_data/italy_flagged_products.csv"

    # Write the new dataframe to the path
    filtered_base_data.to_csv(output_filtered_base_data,index = False)
    # Write the new dataframe to the path
    filtered_italy_data.to_csv(output_filtered_italy_data,index = False)