# This notebook imports and processes data for an agricultural price index

Instrucciones: hay que cambiar el mes de la base nueva.

In [0]:
import pyspark.sql.functions as F
from pyspark.sql import Window
import os
import pandas as pd
import numpy as np
import re
import sklearn as skl
import seaborn as sns
import plotly.express as px

import matplotlib.pyplot as plt
import matplotlib.patches as mpatches

from datetime import datetime, timedelta, date
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, IntegerType, BooleanType, DoubleType, ArrayType, DateType

from operator import add
from functools import reduce

spark.conf.set("fs.azure.account.key.rosadevdatalake.dfs.core.windows.net", "TrY8v62FPF8bWK6ZVNvT619Z2nFRCw4EoWhrVHEQmsj1yJ6ibrfCrtEvxhGUSMomOTDBNKx/SUUY+AStEHIjxw==")
spark.conf.set("fs.azure.account.key.rosaproddatalake.dfs.core.windows.net", dbutils.secrets.get(scope = "rosa-prod-kv", key = "rosaproddatalake-accesskey"))


This notebook assumes we get parquet files data in a defined folder, structured as follows:

- full_dataset
  - Folder [UK_prices_{date}]
    - UK_prices_{date}

 Where the date has a consistent date format YYYYmmdd

In [0]:
raw_path_file = '/mnt/rosadev_databricks/ROSA/Unilever/BE_CAR/full_dataset/'
clean_path_file = '/mnt/rosadev_databricks/ROSA/Unilever/BE_CAR/clean_data/'

list = dbutils.fs.ls(raw_path_file)
list = [list[x][1].split('_')[1] for x in range(len(list))]
list.sort()

last_date_file = list[-1]
last_date_added = pd.to_datetime(last_date_file)

print(last_date_file, last_date_added)

In [0]:
# First and last date of the week
last = last_date_added
first = last_date_added - timedelta(weeks=1)
print(first, last)

# New data - processing

In [0]:
col_names = ["period_date", "store_city", "store_key", "barcode", "item_desc", "total_sales_amount", "total_qty"]
data = spark.read.parquet(raw_path_file + 'Carrefour_' + last_date_file)

data = (data
  .drop_duplicates()
  .select(col_names)
  .withColumn('period_date', F.col('period_date').cast(DateType()))
  .withColumn('store_city', F.regexp_replace(F.col('store_city').cast(StringType()), '\..*$', ''))
  .withColumn('store_key', F.regexp_replace(F.col('store_key').cast(StringType()), '\..*$', ''))
  .withColumn('total_sales_amount', F.col('total_sales_amount').cast(DoubleType()))
  .withColumn('total_qty', F.col('total_qty').cast(DoubleType()))
)

data.limit(10).display()

period_date,store_city,store_key,barcode,item_desc,total_sales_amount,total_qty
2023-09-04,UNKNOWN CITY_KEY,B654,50097425,DOVE DEO ROLL ON ORIGINAL 50ML/DOVE DEO ROLL ON ORIGINAL 50ML,8.78,3.0
2023-09-04,UNKNOWN CITY_KEY,1722,54024502,REXONA STICK COTON 40ML/REXONA STICK KATOEN 40ML,6.99,1.0
2023-09-04,UNKNOWN CITY_KEY,B899,54024786,SIGNAL BAD CLASSIC FRESH HARD/SIGNAL TB CLASSIC FRESH HARD,2.49,1.0
2023-09-04,UNKNOWN CITY_KEY,1866,54024786,SIGNAL BAD CLASSIC FRESH HARD/SIGNAL TB CLASSIC FRESH HARD,2.49,1.0
2023-09-04,UNKNOWN CITY_KEY,B406,54024786,SIGNAL BAD CLASSIC FRESH HARD/SIGNAL TB CLASSIC FRESH HARD,5.36,2.0
2023-09-04,UNKNOWN CITY_KEY,B284,54024878,SIGNAL BD EXP.COMF MED/SIGNAL TB EXP.COMF MED,3.69,1.0
2023-09-04,UNKNOWN CITY_KEY,B468,54024878,SIGNAL BD EXP.COMF MED/SIGNAL TB EXP.COMF MED,4.29,1.0
2023-09-04,UNKNOWN CITY_KEY,B062,59082903,REXONA ROLL CLEAN SENT 50ML/REXONA ROLL CLEAN SENT 50ML,4.05,1.0
2023-09-04,UNKNOWN CITY_KEY,B598,59085744,REXONA FM INTENSE SPORT 100ML/REXONA FM INTENSE SPORT 100ML,7.59,1.0
2023-09-04,UNKNOWN CITY_KEY,B719,73103714,REXONA ST.COBALT BLUE 50ML/REXONA ST.COBALT BLUE 50ML,4.35,1.0



# Data cleaning, deduplication and outlier detection

In [0]:
max_date = data.agg(F.max("period_date")).collect()[0][0]
print(max_date)

In [0]:
data = (data
        .filter(F.col('period_date') >= first)
        .filter(F.col('period_date') <= last))

### IQR

In [0]:
iqr = (
  data
  .withColumn('log_quantity', F.log(F.col('total_sales_amount') + 0.001))
  .groupBy(['period_date', 'barcode'])
  .agg(F.percentile_approx('log_quantity', 0.25, 10000).alias('IQR_1'), F.percentile_approx('log_quantity', 0.75, 10000).alias('IQR_3'))
  .withColumn('LB', F.col('IQR_1') - 3 * (F.col('IQR_3') - F.col('IQR_1')))
  .withColumn('UB', F.col('IQR_3') + 3 * (F.col('IQR_3') - F.col('IQR_1')))
)

data = (data
        .join(iqr, on = ['period_date', 'barcode'], how = 'left')
        .withColumn('log_quantity', F.log(F.col('total_sales_amount') + 0.1))
        .filter(F.col('log_quantity') >= F.col('LB'))
        .filter(F.col('log_quantity') <= F.col('UB'))
      )

data.display()

period_date,barcode,store_city,store_key,item_desc,total_sales_amount,total_qty,IQR_1,IQR_3,LB,UB,log_quantity
2023-09-04,50097425,UNKNOWN CITY_KEY,B654,DOVE DEO ROLL ON ORIGINAL 50ML/DOVE DEO ROLL ON ORIGINAL 50ML,8.78,3.0,1.1154694057345325,2.339013449607597,-2.555162725884661,6.00964558122679,2.1838015570040787
2023-09-04,54024502,UNKNOWN CITY_KEY,1722,REXONA STICK COTON 40ML/REXONA STICK KATOEN 40ML,6.99,1.0,1.944623607529848,2.646245716855093,-0.1602427204458862,4.751112044830828,1.958685340544036
2023-09-04,54024786,UNKNOWN CITY_KEY,B899,SIGNAL BAD CLASSIC FRESH HARD/SIGNAL TB CLASSIC FRESH HARD,2.49,1.0,0.9126842362800434,2.2865573319168484,-3.208935050630372,6.408176618827264,0.9516578757114464
2023-09-04,54024786,UNKNOWN CITY_KEY,1866,SIGNAL BAD CLASSIC FRESH HARD/SIGNAL TB CLASSIC FRESH HARD,2.49,1.0,0.9126842362800434,2.2865573319168484,-3.208935050630372,6.408176618827264,0.9516578757114464
2023-09-04,54024786,UNKNOWN CITY_KEY,B406,SIGNAL BAD CLASSIC FRESH HARD/SIGNAL TB CLASSIC FRESH HARD,5.36,2.0,0.9126842362800434,2.2865573319168484,-3.208935050630372,6.408176618827264,1.6974487897568138
2023-09-04,54024878,UNKNOWN CITY_KEY,B284,SIGNAL BD EXP.COMF MED/SIGNAL TB EXP.COMF MED,3.69,1.0,1.3058974240478616,2.50723875499441,-2.2981265687917856,6.111262747834058,1.332366019094335
2023-09-04,54024878,UNKNOWN CITY_KEY,B468,SIGNAL BD EXP.COMF MED/SIGNAL TB EXP.COMF MED,4.29,1.0,1.3058974240478616,2.50723875499441,-2.2981265687917856,6.111262747834058,1.47932922708708
2023-09-04,59082903,UNKNOWN CITY_KEY,B062,REXONA ROLL CLEAN SENT 50ML/REXONA ROLL CLEAN SENT 50ML,4.05,1.0,1.3989637642205537,2.077063716893872,-0.6353360937994028,4.111363574913828,1.4231083342426067
2023-09-04,73103714,UNKNOWN CITY_KEY,B719,REXONA ST.COBALT BLUE 50ML/REXONA ST.COBALT BLUE 50ML,4.35,1.0,1.470405703738543,2.5549767186672407,-1.7833073410475504,5.808689763453334,1.4929040961781488
2023-09-04,96081693,UNKNOWN CITY_KEY,1722,DOVE MEN DEO ROLL CL COMF 50ML/DOVE MEN DEO ROLL CL COMF 50ML,38.01,8.0,1.7211581620445082,2.41956780847966,-0.374070777260947,4.514796747785115,3.640476714885769



# Append and save data

In [0]:
data.write.format('delta').mode("overwrite").save(clean_path_file)