# Hello, world!

LEAP - Atmospheric Physics using AI

Hello!

I am Emilly, a Junior Data Engineer

I love Data Science, and I love any topic that involves space, mathematics, physics, chemistry and computing.

When I was young I loved to play Space Station 13, and that's the main reason I loved the idea of this challenge, too!

In the game, you could choose your profession before every round, and Atmospherics Engineer was one of the most complicated jobs you could choose in the entire crew. (Aside from being the Captain)

# Introduction

## CSV Strategies

The main objective of the challenge is to create an AI algorithm that will predict target values of a **Data Science Problem**.

> "Your goal is to create a model that predicts the target variables associated with a given set of input variables." - Data page

However, above that citation, there is a primary concern:

> "However, this multi-scale framework comes at a great computational cost, limiting its usage for experiments and ensemble climate projections. The goal is to train a model to emulate the effects of these small-scale processes at a fraction of the cost of explicitly resolving them." - Data page

So, if we want to improve the algorithm, we need to understand which tools are the fastest and most cost-efficient for dealing with large-scale data.

That is why I developed CSV Strategies

The challenge data is provided as CSV, and there is also an available API for downloading more data from HuggingFace, which is not implemented here yet

However, this notebook will give you a good understanding of what tools to use when dealing with huge datasets

Being a Jr. Data Engineer I am very concerned about performance and I am very grateful to have found this challenge and the people who are submitting to it!

Have a good read.

In [None]:
!pip install pyspark > NULL
print("installed pyspark")
!pip install dask > NULL
print("installed dask")
!pip install datatable > NULL
print("installed datatable")
!pip install ray > NULL
print("installed ray")
!pip install unidist > NULL
print("installed unidist")
!pip install modin > NULL
print("installed modin")

!pip install ipywidgets > NULL
print("installed ipywidgets")

# Library for abstraction
from abc import ABC, abstractmethod
print("abstraction imported")

# Libraries for strategy choosing
import pandas as pd
from pyspark.sql import SparkSession
from io import StringIO
import polars as pl
import dask.dataframe as dd
import datatable as dtt
import modin.pandas as modpd
import os
print("strategies imported")

# util
from datetime import datetime
import ipywidgets
import numpy as np
print("utils imported")

# Index every file under the Kaggle input directory: file name -> full path.
# Keys are bare file names, so a file found later with the same name replaces
# the earlier entry.
files = {}

print("creating files {dictionary:values}...")

for root, _subdirs, names in os.walk('/kaggle/input'):
    for name in names:
        located = os.path.join(root, name)
        print(located)
        files[name] = located

In [None]:
# utils
def now():
    """Return the current local date and time as a naive ``datetime``."""
    timestamp = datetime.now()
    return timestamp

In [None]:
###################################################
#          CSV READ STRATEGY                      #
###################################################
class CSVReadStrategy(ABC):
    """Common interface implemented by every CSV reading strategy."""

    @abstractmethod
    def read_lines(self, file_path, n_lines, **kwargs):
        """Read up to ``n_lines`` lines from the file at ``file_path``.

        Args:
            file_path: Path of the CSV file to read.
            n_lines: Maximum number of lines to read.
            **kwargs: Strategy-specific options. They are part of the
                interface because the RSA wrapper forwards its keyword
                arguments to whichever strategy is active.

        Returns:
            Whatever object the concrete strategy produces for the data read.
        """

###################################################
    
class PandasCSVReadStrategy(CSVReadStrategy):
    """Strategy that reads the first rows of a CSV with ``pandas.read_csv``."""

    def __init__(self):
        # Human-readable label identifying this strategy.
        self.name = "PandasCSVReadStrategy"

    def read_lines(self, file_path, n_lines, **kwargs):
        """Return the result of ``pd.read_csv`` limited to ``n_lines`` rows.

        Extra keyword arguments are passed straight to ``pd.read_csv``.
        """
        return pd.read_csv(file_path, nrows=n_lines, **kwargs)
    
class OSCSVReadStrategy(CSVReadStrategy):
    """Strategy that reads raw text lines with the built-in ``open``."""

    def __init__(self):
        # Human-readable label identifying this strategy.
        self.name = "OSCSVReadStrategy"

    def read_lines(self, file_path, n_lines, **kwargs):
        """Return the first ``n_lines`` raw lines of the file as a list of str.

        Lines keep their trailing newline. Fewer lines are returned when the
        file is shorter than ``n_lines``.

        Args:
            file_path: Path of the file to read.
            n_lines: Maximum number of lines to return.
            **kwargs: Extra keyword arguments for ``open`` (for example
                ``encoding``). Accepting them keeps this strategy callable
                the same way as the other strategies.
        """
        # The ``with`` block closes the file; no explicit close() is needed.
        with open(file_path, 'r', **kwargs) as file:
            # zip() stops as soon as range() is exhausted, so no line beyond
            # the requested count is consumed from the file.
            return [line for _, line in zip(range(n_lines), file)]

#class PandasStringIOStrategy(CSVReadStrategy):
#    def __init__(self):
#        self.name = "PandasStringIOStrategy"
#    
#    def read_lines(self, file_path, n_lines, **kwargs):
#        # Convert data to a StringIO object
#        string_io = StringIO(file_path)
#        # stringIO enforce n_lines
#        data_lines = string_io.readlines()[:n_lines]
#        # Read CSV from the StringIO object
#        df = pd.read_csv(data_lines, nrows=n_lines, **kwargs)
#        return df
    
class PySparkCSVReadStrategy(CSVReadStrategy):
    """Strategy that reads a CSV through a Spark session."""

    def __init__(self):
        # Human-readable label identifying this strategy.
        self.name = "PySparkCSVReadStrategy"
        # Reuse the running session if there is one, otherwise start it.
        self.spark = SparkSession.builder.appName("CSVReader").getOrCreate()

    def read_lines(self, file_path, n_lines, schema=None, **kwargs):
        """Return a Spark DataFrame limited to ``n_lines`` rows.

        Args:
            file_path: Path of the CSV file to read.
            n_lines: Row limit applied with ``DataFrame.limit``.
            schema: Optional explicit schema for the reader.
            **kwargs: Spark CSV reader options (for example ``lineSep``).
                The special key ``new_schema`` is an explicit schema too and
                takes precedence over ``schema`` when it is not None.

        Returns:
            The DataFrame built by the reader; no Spark action (collect,
            count, write, ...) is triggered here.
        """
        # 'new_schema' belongs to this strategy, not to Spark: remove it so
        # it is not forwarded as a bogus reader option.
        explicit_schema = kwargs.pop('new_schema', None)
        if explicit_schema is None:
            explicit_schema = schema

        options = {"header": True}
        if explicit_schema is None:
            # Spark's option names are camelCase; the snake_case spellings
            # are not recognised, so inference would silently not happen.
            options.update(samplingRatio=0.005, inferSchema=True)
        # Caller-supplied options win over the defaults above.
        options.update(kwargs)

        reader = self.spark.read.options(**options)
        if explicit_schema is not None:
            reader = reader.schema(explicit_schema)
        return reader.csv(file_path).limit(n_lines)

class PolarsCSVReadStrategy(CSVReadStrategy):
    """Strategy that reads the first rows of a CSV with ``polars.read_csv``."""

    def __init__(self):
        # Human-readable label identifying this strategy.
        self.name = "PolarsCSVReadStrategy"

    def read_lines(self, file_path, n_lines, **kwargs):
        """Return the result of ``pl.read_csv`` limited to ``n_lines`` rows.

        Extra keyword arguments are passed straight to ``pl.read_csv``.
        """
        return pl.read_csv(file_path, n_rows=n_lines, **kwargs)
    
class DaskCSVReadStrategy(CSVReadStrategy):
    """Strategy that reads a CSV with ``dask.dataframe.read_csv``."""

    def __init__(self):
        # Human-readable label identifying this strategy.
        self.name = "DaskCSVReadStrategy"

    def read_lines(self, file_path, n_lines, **kwargs):
        """Return ``head(n=n_lines)`` of the dask frame built from the file.

        Extra keyword arguments are passed to ``dd.read_csv``; unlike the
        pandas-style readers, the row limit is applied afterwards via
        ``head`` rather than as a reader argument.
        """
        # NOTE(review): head() is called with dask's defaults; it may look at
        # the first partition only -- confirm for large n_lines.
        lazy_frame = dd.read_csv(file_path, **kwargs)
        return lazy_frame.head(n=n_lines)

class DatatableCSVReadStrategy(CSVReadStrategy):
    """Strategy that pipes ``head`` output into ``datatable.fread``."""

    def __init__(self):
        # Human-readable label identifying this strategy.
        self.name = "DatatableCSVReadStrategy"

    def read_lines(self, file_path, n_lines, **kwargs):
        """Return the frame parsed from the first ``n_lines`` lines of the file.

        ``head -n`` counts physical lines, so a header line, if the file has
        one, is included in ``n_lines``.

        Args:
            file_path: Path of the CSV file to read.
            n_lines: Number of leading lines handed to the parser.
            **kwargs: Extra keyword arguments forwarded to ``dtt.fread``.
                (``**kwargs`` is always a dict, never None, so the options
                are forwarded unconditionally instead of being dropped.)
        """
        import shlex

        # Quote the path so spaces or shell metacharacters in it cannot break
        # (or alter) the command.
        command = f"head -n {int(n_lines)} {shlex.quote(str(file_path))}"
        return dtt.fread(cmd=command, **kwargs)

class ModinPandasCSVReadStrategy(CSVReadStrategy):
    """Strategy that reads the first rows of a CSV with ``modin.pandas``."""

    def __init__(self):
        # Human-readable label identifying this strategy.
        self.name = "ModinPandasCSVReadStrategy"

    def read_lines(self, file_path, n_lines, **kwargs):
        """Return the result of ``modpd.read_csv`` limited to ``n_lines`` rows.

        ``header=0`` is always passed, so callers must not supply ``header``
        themselves; other keyword arguments go straight to ``read_csv``.
        """
        return modpd.read_csv(file_path, nrows=n_lines, header=0, **kwargs)
    
###################################################
#         DEFINE STRATEGIES                       #
###################################################
class RSA:
    """Read Strategy Algorithm: a thin context object around a read strategy.

    It holds the active strategy, delegates reads to it, and can build a
    per-strategy dictionary (``_store``) for collecting benchmark results.
    """

    def __init__(self, strategy: CSVReadStrategy):
        self.strategy = strategy

    def all_strategies(self):
        """Return a list with one new instance of every available strategy."""
        strategy_classes = (
            PandasCSVReadStrategy,
            OSCSVReadStrategy,
            # PandasStringIOStrategy is disabled.
            PySparkCSVReadStrategy,
            PolarsCSVReadStrategy,
            DaskCSVReadStrategy,
            DatatableCSVReadStrategy,
            ModinPandasCSVReadStrategy,
        )
        return [strategy_class() for strategy_class in strategy_classes]

    def set_strategy(self, strategy: CSVReadStrategy):
        """Replace the active strategy."""
        self.strategy = strategy

    def read_lines(self, file_path, n_lines, **kwargs):
        """Delegate the read to the active strategy and return its result."""
        active = self.strategy
        return active.read_lines(file_path, n_lines, **kwargs)

    def create_store(self):
        """(Re)create ``self._store`` with one empty entry per strategy.

        Each entry has the keys ``time_taken``, ``n_lines`` and ``kwargs``.
        Returns a short confirmation message.
        """
        self._store = {}
        store = self._store
        for strategy in self.all_strategies():
            print("creating strategy in dictionary: ", strategy.name)
            # Blank result slot for this strategy.
            blank_entry = {
                "time_taken": [],
                "n_lines": [],
                "kwargs": None
            }
            store[str(strategy.name)] = blank_entry
        return "store was created. access it with rsa._store"

    @staticmethod
    def help():
        """Return the usage text for this class."""
        return """
Welcome to Read Strategy Algorithm (RSA)

This algorithm helps in reading CSV files using different strategies. 
You can choose from the following strategies:
1. PandasCSVReadStrategy (Pandas)
2. OSCSVReadStrategy (OS)
~3. PandasStringIOStrategy (Pandas IO)~
4. PySparkCSVReadStrategy (Pyspark)
5. PolarsCSVReadStrategy (Polars)
6. DaskCSVReadStrategy (Dask)
7. DatatableCSVReadStrategy (datatable)
8. ModinPandasCSVReadStrategy (modin)

To use this algorithm:
1. Instantiate RSA with your desired strategy.
2. Use the 'set_strategy' method to change the strategy if needed.
3. Use the 'read_lines' method to read CSV files using the selected strategy.

Example:
rsa = RSA(PandasCSVReadStrategy())
rsa.set_strategy(PandasCSVReadStrategy())
rsa.read_lines('example.csv',n_lines=100)
"""

In [None]:
# Print the usage text returned by RSA.help().
print(RSA.help())

In [None]:
# Create the RSA object with pandas as the initial strategy.
rsa = RSA(PandasCSVReadStrategy())

In [None]:
# Build rsa._store with one empty entry per strategy.
rsa.create_store()

In [None]:
# Display the store contents as the cell output.
rsa._store

# first read 500 lines (small sampling)

In [None]:
# Time one 500-line read of train.csv with every strategy and record the
# elapsed seconds, the line count and the returned object in rsa._store.
n_lines = 500

for strategy in rsa.all_strategies():
    print(f"executing strategy {strategy.name}...")

    started = now()

    rsa.set_strategy(strategy)

    entry = rsa._store[str(strategy.name)]
    extra = entry["kwargs"]

    # None means no extra options were registered for this strategy.
    if extra is None:
        data = rsa.read_lines(files["train.csv"], n_lines=n_lines)
    else:
        data = rsa.read_lines(files["train.csv"], n_lines=n_lines, **extra)

    elapsed = now() - started

    entry["time_taken"] = elapsed.total_seconds()
    entry["n_lines"] = n_lines
    entry["data"] = data

In [None]:
# For each strategy: its name, the recorded time and the type it returned.
for name, entry in rsa._store.items():
    print(name)
    print(entry["time_taken"])
    print("type of data:", type(entry["data"]))
    print("-"*20)

In [None]:
from pyspark.sql.types import StructType, StructField, FloatType

# Take the object the PySpark strategy returned in the previous run
# (rsa is the RSA helper; df is that stored result).
df = rsa._store["PySparkCSVReadStrategy"]["data"]

# Schema currently attached to it
schema = df.schema

# Rebuild the schema field by field: "sample_id" keeps its data type,
# every other column becomes FloatType; names and nullability are kept.
new_fields = []
for field in schema:
    if field.name == "sample_id":
        field_type = field.dataType
    else:
        field_type = FloatType()
    new_fields.append(StructField(field.name, field_type, field.nullable))

new_schema = StructType(new_fields)

In [None]:
# Register the options the PySpark strategy should receive on later reads:
# an explicit line separator and the float schema built above.
pyspark_options = {"lineSep": "\n", "new_schema": new_schema}
rsa._store["PySparkCSVReadStrategy"]["kwargs"] = pyspark_options

In [None]:
# Repeat the 500-line benchmark; strategies that have options registered in
# rsa._store[...]["kwargs"] now receive them.
n_lines = 500

for strategy in rsa.all_strategies():
    print(f"executing strategy {strategy.name}...")

    started = now()

    rsa.set_strategy(strategy)

    entry = rsa._store[str(strategy.name)]
    extra = entry["kwargs"]

    # None means no extra options were registered for this strategy.
    if extra is None:
        data = rsa.read_lines(files["train.csv"], n_lines=n_lines)
    else:
        data = rsa.read_lines(files["train.csv"], n_lines=n_lines, **extra)

    elapsed = now() - started

    entry["time_taken"] = elapsed.total_seconds()
    entry["n_lines"] = n_lines
    entry["data"] = data

In [None]:

# Print the recorded line count and elapsed seconds of every strategy.
for entry in rsa._store.values():
    x = entry["n_lines"]
    y = entry["time_taken"]
    print(x)
    print(y)

In [None]:
import matplotlib.pyplot as plt

# Bar chart of the time recorded for each strategy in rsa._store.
strategy_names = list(rsa._store)
y = [rsa._store[name]["time_taken"] for name in strategy_names]

# Plotting
plt.bar(strategy_names, y, color='blue')

# Adding labels and title (the x axis shows strategies, not "states";
# time_taken is stored in seconds)
plt.xlabel('Strategy')
plt.ylabel('Time Taken (seconds)')
plt.title('Time Taken vs Strategy when n_lines = 500')

# Rotating x-axis labels for better readability
plt.xticks(rotation=90)

# Displaying the plot
plt.show()


# Results analysis

Looking at the graph above, you might think that Dask has the worst performance among all of these strategies for reading CSV files

**but you would be WRONG**

What is actually happening is that DASK reads the file in a particular way, that partitions the file into N parts

I discovered that because I plotted this graph many times

Whenever a specific strategy is too far apart from the other strategies in terms of performance, **I TROUBLESHOOT**

The things I look for when troubleshooting are the parameters of that library and its modules for CSV reading or file reading, and I look for any parameters that might affect the number of rows being read.

In pandas, I came across some, including nrows and chunksize

But Dask only has a limiting function that works AFTER the partitioning of the file

Which might make it the fastest way to read a CSV file yet!

We will go over other strategies and iterations to test this possibility

In [None]:
import matplotlib.pyplot as plt

# Same bar chart, leaving Dask out so the remaining strategies can be compared.
strategy_names = [name for name in rsa._store if name != "DaskCSVReadStrategy"]
y = [rsa._store[name]["time_taken"] for name in strategy_names]

# Plotting
plt.bar(strategy_names, y, color='blue')

# Adding labels and title (the x axis shows strategies, not "states";
# time_taken is stored in seconds)
plt.xlabel('Strategy')
plt.ylabel('Time Taken (seconds)')
plt.title('Time Taken vs Strategy when n_lines = 500 (Dask excluded)')

# Rotating x-axis labels for better readability
plt.xticks(rotation=90)

# Displaying the plot
plt.show()


In [None]:
def time_execution_analysis_between_strategies(m, s, reader=None, file_path=None):
    """
        Creates a dictionary of data for data analysis on best strategy to read a CSV

        m = Max number of ROWS to read from the file (positive integer)

        s = Step, in number of ROWS, between two consecutive reads (positive integer)

        reader = object providing all_strategies(), set_strategy(),
                 read_lines() and a _store dictionary; defaults to the
                 notebook-level `rsa`

        file_path = CSV file to read; defaults to files["train.csv"]

        example:

        m = 1500
        s = 500

        starts at reading 500 rows
        reads every 500 rows:
        - 500
        - 1000
        - 1500
        up to 1500

        Row counts never exceed m. The Dask strategy is timed on the last
        (largest) step only; every other strategy is timed on every step.

        Only the read call itself is timed. The last time and row count of
        each strategy are also written to reader._store.

        returns: strategy_data = dictionary datatype
                    {strategy.name: {'x': [500, 1000, 1500],'y': [0.160281, 0.325411, 0.453121]},
    """
    # perf_counter is monotonic and high resolution, which suits short timings.
    from time import perf_counter

    if reader is None:
        reader = rsa
    if file_path is None:
        file_path = files["train.csv"]

    # Instantiate the strategies once and reuse them for every step.
    strategies = reader.all_strategies()

    # Initialize a dictionary to store x and y values for each strategy
    strategy_data = {st.name: {'x': [], 'y': []} for st in strategies}

    # s, 2s, 3s, ... never going past m
    steps = range(s, m + 1, s)
    if not steps:
        return strategy_data

    # Identify the final step by value rather than by comparing float ratios,
    # so Dask still runs when m is not an exact multiple of s.
    last_step = steps[-1]

    for n_lines in steps:
        for st in strategies:
            if st.name == "DaskCSVReadStrategy" and n_lines != last_step:
                continue

            reader.set_strategy(st)

            entry = reader._store[str(st.name)]
            # A missing kwargs entry (None) means "no extra options".
            kwargs = entry["kwargs"] or {}

            started = perf_counter()
            data = reader.read_lines(file_path, n_lines=n_lines, **kwargs)
            elapsed = perf_counter() - started

            # Release the result right away; it is not needed for the analysis
            # and freeing it is kept out of the measured time.
            del data

            entry["time_taken"] = elapsed
            entry["n_lines"] = n_lines

            # Append values to the strategy-specific lists
            strategy_data[st.name]['x'].append(n_lines)
            strategy_data[st.name]['y'].append(elapsed)

    return strategy_data

In [None]:
# Time every strategy at 500, 1000 and 1500 rows.
strat = time_execution_analysis_between_strategies(m=1500,s=500)

In [None]:
# Display the collected x/y values per strategy as the cell output.
strat

In [None]:
# One line per strategy: rows read (x) against seconds taken (y).
figure_size = (10, 6)
plt.figure(figsize=figure_size)

for st_name in strat:
    series = strat[st_name]
    plt.plot(series['x'], series['y'], label=st_name)

plt.xlabel('Number of Lines')
plt.ylabel('Time Taken (seconds)')
plt.title('Time Taken for Different Strategies')
plt.legend()
plt.show()

In [None]:
# provides 10 iterations
#Million_1 = 1000000
#H_Thousand = 100000

#strat = time_execution_analysis_between_strategies(m=Million_1,s=H_Thousand)

In [None]:
# Plotting the results
#plt.figure(figsize=(10, 6))

#for st_name, data in strat.items():
#    plt.plot(data['x'], data['y'], label=st_name)

#plt.xlabel('Number of Lines')
#plt.ylabel('Time Taken (seconds)')
#plt.title('Time Taken for Different Strategies')
#plt.legend()
#plt.show()

# CSV Strategies V5 Analysis : 4 strategies

Analysing READS for CSV file

- Pandas
- OS
- PySpark
    (its performance here is probably related to pyspark not having any action such as querying, saving data, etc.)
- Polars

    - y = time taken (seconds)
    - x = number of lines (up to 10^6)
        - 100.000
        - 200.000
        - ...
        - 1.000.000
        
Time taken: 6 Hours, 26 minutes
    Iterations: 40 (4 strategies, 10 row Iterations)

![alt text](https://github.com/EmaoqFilho-NTTData/csv_strategy/blob/main/strategies_v5.png?raw=true "Title")

# WIP...