# Data Preprocessing with Dask 

## Preparation of Dask oriented 

In [ ]:
import numpy as np
import pandas as pd

# dask --> for parallel computing in local devices 
import dask.dataframe as dd
import dask.array as da
import dask.bag as db

In [ ]:
# set up of local cluster using dask + jupyter 
from dask.distributed import Client

client = Client(n_workers=4)
client

# Execution of Data Merging 

In [2]:
import os.path # importing os.path module for directory manipulation 
import glob  # Import glob for working with file patterns

# Define a path or directory containing data files
path = "/home/linux_subsystem/Documents/Thesis_GeospatialTimeSeriesAnalytics/raw dataset"

# Get the directory name   
# from the specified path 
dirname = os.path.dirname(path) 
  
# Print the directory name   
print(dirname)

/home/linux_subsystem/Documents/Thesis_GeospatialTimeSeriesAnalytics


In [27]:
print("Path to raw pricecatcher data files")
print(path)
print()

pricecatcher_files = glob.glob(f"{path}/*.csv")
pricecatcher_files = sorted(pricecatcher_files)
print("Sample List of Pricecatcher Files")
print(pricecatcher_files)

Path to raw pricecatcher data files
/home/linux_subsystem/Documents/Thesis_GeospatialTimeSeriesAnalytics/raw dataset

Sample List of Pricecatcher Files
['/home/linux_subsystem/Documents/Thesis_GeospatialTimeSeriesAnalytics/raw dataset/pricecatcher_2022-01.csv', '/home/linux_subsystem/Documents/Thesis_GeospatialTimeSeriesAnalytics/raw dataset/pricecatcher_2022-02.csv', '/home/linux_subsystem/Documents/Thesis_GeospatialTimeSeriesAnalytics/raw dataset/pricecatcher_2022-03.csv', '/home/linux_subsystem/Documents/Thesis_GeospatialTimeSeriesAnalytics/raw dataset/pricecatcher_2022-04.csv', '/home/linux_subsystem/Documents/Thesis_GeospatialTimeSeriesAnalytics/raw dataset/pricecatcher_2022-05.csv', '/home/linux_subsystem/Documents/Thesis_GeospatialTimeSeriesAnalytics/raw dataset/pricecatcher_2022-06.csv', '/home/linux_subsystem/Documents/Thesis_GeospatialTimeSeriesAnalytics/raw dataset/pricecatcher_2022-07.csv', '/home/linux_subsystem/Documents/Thesis_GeospatialTimeSeriesAnalytics/raw dataset/pr

In [28]:
# Load all CSV files into a Dask DataFrame
ddf = dd.read_csv(pricecatcher_files)

# Example operation: Compute the mean of a column
mean_result = ddf['price'].mean().compute()

# Print the result
print(mean_result)

13.314751659850524


In [29]:
ddf.head()

Unnamed: 0,date,premise_code,item_code,price
0,2022-01-01,2,1,9.1
1,2022-01-01,2,9,36.0
2,2022-01-01,2,14,24.0
3,2022-01-01,2,16,4.3
4,2022-01-01,2,18,4.5


In [30]:
ddf.tail()

Unnamed: 0,date,premise_code,item_code,price
2170653,2023-12-31,20903,1928,6.99
2170654,2023-12-31,20903,1930,5.99
2170655,2023-12-31,20903,1943,13.0
2170656,2023-12-31,20903,1944,5.9
2170657,2023-12-31,20903,1946,0.99


In [None]:
# convert ddf from series into dataframe
ddf = pd.DataFrame(ddf).compute()
print("BEFORE")
print(ddf.head())

# convert 'date' attribute into date format 
ddf['date'] = pd.to_datetime(ddf['date']).compute()
# change the datetime format into YYYY/MM/DD
ddf['date'] = ddf['date'].dt.strftime('%Y/%m/%d').compute()

print("AFTER")
print(ddf) 


In [0]:
# using dictionary to convert remaining columns
convert_dict = {'premise_code': int,
                'item_code': int, 
                'price': float
                }
ddf = ddf.astype(convert_dict).compute()

print(ddf) 

In [ ]:
ddf.astype(str)
print(df.dtypes)

In [31]:
describe = ddf.describe().compute()
describe

Unnamed: 0,premise_code,item_code,price
count,56504340.0,56504340.0,56504340.0
mean,11342.32,1005.108,13.31475
std,7061.466,723.5869,12.50929
min,-1.0,-1.0,0.01
25%,4110.0,152.0,5.5
50%,13369.0,1153.0,10.0
75%,17980.0,1591.0,17.99
max,20903.0,2058.0,3599.0


In [ ]:
ddf.date.astype('int64') 

In [18]:
corr_matrix = ddf.corr().compute()

print(corr_matrix)

ValueError: could not convert string to float: '2022-03-01'

In [ ]:
ddf1 = dd.read_csv('large_dataset1.csv')