###Importing the modules

In [None]:
import pandas as pd
import time
import dask.dataframe as dd
import os

###Different methods of file reading

#####Using Pandas

In [41]:
initial = time.time()
data = pd.read_csv('/content/sample_data/Windmill.csv')
time_consumed_pandas = time.time() - initial
print(f"Pandas has taken {time_consumed_pandas} seconds to load the datafile")



Pandas has taken 0.7290947437286377 seconds to load the datafile


#####Using Dask

In [42]:
initial = time.time()
data = dd.read_csv('/content/sample_data/Windmill.csv')
time_consumed_dask = time.time() - initial
print(f"Dask has taken {time_consumed_dask} seconds to load the datafile")

Dask has taken 0.012734413146972656 seconds to load the datafile


###Installing packages for Modin and Ray


In [4]:
!pip install ray



In [5]:
#There exist an error between existing version of pandas and installed package of modin.Modifying the pandas existing package
!pip install --upgrade pandas==2.2.*




In [6]:
#upgrading reinstalling  modin
!pip install --upgrade --force-reinstall modin


Collecting modin
  Using cached modin-0.30.1-py3-none-any.whl (1.2 MB)
Collecting pandas<2.3,>=2.2 (from modin)
  Using cached pandas-2.2.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (13.0 MB)
Collecting packaging>=21.0 (from modin)
  Using cached packaging-24.1-py3-none-any.whl (53 kB)
Collecting numpy<2,>=1.22.4 (from modin)
  Using cached numpy-1.26.4-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (18.2 MB)
Collecting fsspec>=2022.11.0 (from modin)
  Using cached fsspec-2024.6.0-py3-none-any.whl (176 kB)
Collecting psutil>=5.8.0 (from modin)
  Using cached psutil-5.9.8-cp36-abi3-manylinux_2_12_x86_64.manylinux2010_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl (288 kB)
Collecting python-dateutil>=2.8.2 (from pandas<2.3,>=2.2->modin)
  Using cached python_dateutil-2.9.0.post0-py2.py3-none-any.whl (229 kB)
Collecting pytz>=2020.1 (from pandas<2.3,>=2.2->modin)
  Using cached pytz-2024.1-py2.py3-none-any.whl (505 kB)
Collecting tzdata>=2022.7 (from pa

In [7]:
!pip uninstall -y pandas modin


Found existing installation: pandas 2.2.2
Uninstalling pandas-2.2.2:
  Successfully uninstalled pandas-2.2.2
Found existing installation: modin 0.30.1
Uninstalling modin-0.30.1:
  Successfully uninstalled modin-0.30.1


In [8]:
!pip install pandas==2.2.2 modin==0.30.1


Collecting pandas==2.2.2
  Using cached pandas-2.2.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (13.0 MB)
Collecting modin==0.30.1
  Using cached modin-0.30.1-py3-none-any.whl (1.2 MB)
Installing collected packages: pandas, modin
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
cudf-cu12 24.4.1 requires pandas<2.2.2dev0,>=2.0, but you have pandas 2.2.2 which is incompatible.
google-colab 1.0.0 requires pandas==2.0.3, but you have pandas 2.2.2 which is incompatible.[0m[31m
[0mSuccessfully installed modin-0.30.1 pandas-2.2.2


######Using Modin

In [10]:
import modin.pandas as mpd
import ray
ray.init(ignore_reinit_error=True)
initial = time.time()
data_modin = mpd.read_csv('/content/sample_data/Windmill.csv')
time_consumed_modin = time.time() - initial
print(f"Modin has taken {time_consumed_modin} seconds to load the datafile")


2024-06-11 15:59:43,045	INFO worker.py:1586 -- Calling ray.init() again after it has already been called.


Modin has taken 2.365089178085327 seconds to load the datafile


######Using Ray

In [11]:
initial = time.time()
ray.init(ignore_reinit_error=True)
data_ray = ray.data.read_csv('/content/sample_data/Windmill.csv')
time_consumed_ray = time.time() - initial
print(f"Ray has taken {time_consumed_ray} seconds to load the datafile")



2024-06-11 16:01:30,127	INFO worker.py:1586 -- Calling ray.init() again after it has already been called.


Ray has taken 1.566192626953125 seconds to load the datafile


###Computational efficiency

In [59]:
# The computational efficiency of loading files using different modules
Efficiancy = {
    'pandas': time_consumed_pandas,
    'Dask': time_consumed_dask,
    'Modin': time_consumed_modin,
    'Ray': time_consumed_ray
}

print("Computational Efficiency Findings:", Efficiancy)

Computational Efficiency Findings: {'pandas': 0.7290947437286377, 'Dask': 0.012734413146972656, 'Modin': 2.365089178085327, 'Ray': 1.566192626953125}


###Creating Utilis function for data validation

In [43]:
#Writing a utilis function for validation
%%writefile utility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime
import gc
import re

#Function to read the Yaml file
def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)

def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string)
    return string

#Function to perform validation between the Windmill dataset and the schema we require for the data
def col_header_val(df, table_config):
    '''
This Function transform all column names to lowecase,ensure any character in the column that is not a word to _,
Removes leading and trailing zeros in colum names,replace multiple undescores to single underscores
    '''
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]', '_', regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x, '_'), list(df.columns)))


    #Convert the columns in configration file to lowercase and sort

    expected_col = list(map(lambda x: x.lower(), table_config['columns']))
    expected_col.sort()

    df.columns = list(map(lambda x: x.lower(), list(df.columns)))


    if len(df.columns) == len(expected_col) and list(expected_col) == list(df.columns):
        print("column name and  length are same in the givendataset and congiration -> validation passed")
        return 1
    else:
        print("column name and length mismatch, validation failed")
        mismatched_columns = list(set(df.columns).difference(expected_col))

        print("These columns are not found in the YAML file", mismatched_columns)
        missing_YAML_file = list(set(expected_col).difference(df.columns))

        print("Following YAML columns are not in the file uploaded", missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0


Overwriting utility.py


###Creating the Yaml File for required Schema

In [44]:
#Writing a yaml file name Windmill.yaml with the required column names and delimeters for configuration

%%writefile windmill.yaml
file_type: csv
dataset_name: Windmill
file_name: Windmill
table_name: datas
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns:
    - Temperature
    - Pressure
    - Humidity
    - Speed

Overwriting windmill.yaml


###Validating number of columns and column name of ingested file with YAML

In [53]:
import utility
data_schema = utility.read_config_file("windmill.yaml")

# Read the windmill file w.r.t our schema and verifying
file_type = data_schema['file_type']
source_file = "/content/sample_data/" + data_schema['file_name'] + f'.{file_type}'
print(source_file)

/content/sample_data/Windmill.csv


In [54]:
data = pd.read_csv(source_file, delimiter=data_schema['inbound_delimiter'])

# Validating the column headers using utility function
validation_result = utility.col_header_val(data, data_schema)

if validation_result == False:
    print("Validation failed")
else:
    print("Validation passed")


column name and  length are same in the givendataset and congiration -> validation passed
Validation passed


###Writing the file in pipe separated text file (|) in gz format.

In [55]:
data.to_csv('Windmill_compressed.gz', sep='|', index=False, compression='gzip')
data

Unnamed: 0,temperature,pressure,humidity,speed
0,19.116226,76.328420,68.494706,61.486759
1,46.548725,18.997384,54.300660,19.784080
2,83.847663,37.060320,3.783252,83.036573
3,85.344576,32.875587,36.586661,50.075240
4,81.941988,31.862048,59.735974,39.877795
...,...,...,...,...
1048570,54.544765,82.745748,30.575210,27.743963
1048571,85.365583,71.352061,4.009105,20.056982
1048572,55.923561,76.383234,90.885080,2.673864
1048573,97.751169,75.644167,80.672092,43.748530


###Summary of the file

In [58]:

compresse_summary = {
    'Total number of rows': data.shape[0],
    'Total number of columns': data.shape[1],
    'File size': os.path.getsize('Windmill_compressed.gz')
}

print("Compressed Windmill file has", compresse_summary)

Compressed Windmill file has {'Total number of rows': 1048575, 'Total number of columns': 4, 'File size': 24308878}
