In [13]:
%%writefile file.yaml
file_type: csv
dataset_name: stream_games_reviews
file_name: stream_dataset
table_name: reviews_table
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 0
columns:
  - app_id
  - app_name
  - review_text
  - review_score
  - review_votes

Overwriting file.yaml


In [14]:
%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime
import gc
import re


################
# File Reading #
################

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string)
    return string

def col_header_val(df,table_config):
    '''
    replace whitespaces in the column
    and standardized column names
    '''
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1

    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0


Overwriting testutility.py


In [15]:
import pandas as pd

In [16]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [17]:
source_path = '/content/drive/MyDrive/Colab Notebooks/stream_dataset.csv'
destination_path = '/content'

In [18]:
import shutil

try:
  shutil.copy2(source_path, destination_path)
  print("File copied successfully!")
except FileNotFoundError:
  print("Error: File not found at the source path.")
except Exception as e:
  print(f"An error occurred: {e}")

File copied successfully!


In [19]:
import testutility as utl

In [20]:
config_data=utl.read_config_file("/content/file.yaml")

In [21]:
config_data

{'file_type': 'csv',
 'dataset_name': 'stream_games_reviews',
 'file_name': 'stream_dataset',
 'table_name': 'reviews_table',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 0,
 'columns': ['app_id',
  'app_name',
  'review_text',
  'review_score',
  'review_votes']}

In [22]:
file_type=config_data['file_type']
source_file="./"+config_data['file_name']+f'.{file_type}'
delimiter=config_data['inbound_delimiter']

In [23]:
df=pd.read_csv(source_file,delimiter=delimiter)
df.head()

Unnamed: 0,app_id,app_name,review_text,review_score,review_votes
0,10,Counter-Strike,Ruined my life.,1,0
1,10,Counter-Strike,This will be more of a ''my experience with th...,1,1
2,10,Counter-Strike,This game saved my virginity.,1,0
3,10,Counter-Strike,• Do you like original games? • Do you like ga...,1,0
4,10,Counter-Strike,"Easy to learn, hard to master.",1,1


In [24]:
utl.col_header_val(df,config_data)

column name and column length validation passed


1

In [25]:
if utl.col_header_val(df,config_data)==0:
  print("Validation Failed")
  print("Cols of file are: ",df.columns)
  print("Cols of YAML are: ",config_data['columns'])
  print("Handle mismatched columns or Simply Reject the file")
else:
  print("Validation Passed")
  print("Data has passed basic validation and is ready for further processing.")

column name and column length validation passed
Validation Passed
Data has passed basic validation and is ready for further processing.


In [26]:
import pandas as pd
from timeit import timeit
import gzip

def compress_to_gz(data, output_filename):
  """
  Compresses the provided data (string) to a pipe-separated GZ format.

  Args:
      data: The string data to compress (validation results).
      output_filename: The name of the output GZ file.
  """
  with gzip.open(output_filename, "wb") as f_out:
    f_out.write(data.encode("utf-8"))



In [70]:
if utl.col_header_val(df, config_data) == 0:
  print("Validation Failed")
  print("Cols of file are: ", df.columns)
  print("Cols of YAML are: ", config_data['columns'])
  print("Handle mismatched columns or Simply Reject the file")
  cols=df.cols
  c_cols=config_data['columns']
  validation_message = ("Column mismatch|Handle mismatched columns or Simply Reject the file")
else:
  print("Validation Passed")
  print("Data has passed basic validation and is ready for further processing.")
  validation_message=("Validation Passed")


column name and column length validation passed
Validation Passed
Data has passed basic validation and is ready for further processing.


In [28]:
import pandas as pd
from timeit import timeit

csv_path = "/content/stream_dataset.csv"

def read_csv_pandas():
    start_time = timeit()
    df_pandas = pd.read_csv(csv_path)
    end_time = timeit()
    return end_time - start_time

pandas_time = read_csv_pandas()


In [29]:
pandas_time

-0.014622276001318824

In [30]:
import dask.dataframe as dd

csv_path = "/content/stream_dataset.csv"


def process_data_chunk(chunk):

    return chunk * 2


def process_data_dask(csv_path):
    start_time = timeit()
    df_dask = dd.read_csv(csv_path)

    processed_df_dask = df_dask.map_partitions(process_data_chunk)

    end_time = timeit()
    return processed_df_dask, end_time - start_time


processed_df_dask, dask_time = process_data_dask(csv_path)


panda_time = pandas_time

if dask_time < panda_time:
    print("Dask might be sufficient for this data size or operation.")

else:
    print("Dask is likely faster for parallel processing on this dataset.")



Dask is likely faster for parallel processing on this dataset.


In [31]:
dask_time

-0.013186422000217135

In [32]:
import numpy as np
import ray

array_size = 100_000_000

data = np.random.rand(array_size)


def process_chunk(chunk):
    return chunk * 2


@ray.remote
def process_ray_chunk(chunk):
    return process_chunk(chunk)

def process_ray():
    start_time = timeit()
    ray.init()
    chunks = np.array_split(data, 4)
    processed_chunks = [process_ray_chunk.remote(chunk) for chunk in chunks]
    processed_data = np.concatenate(ray.get(processed_chunks))
    ray.shutdown()
    end_time = timeit()
    return end_time - start_time


processing_time = process_ray()

2024-05-12 20:38:27,207	INFO worker.py:1740 -- Started a local Ray instance. View the dashboard at [1m[32m127.0.0.1:8265 [39m[22m


In [33]:
processing_time

0.02002304099914909

In [34]:
print(f"Pandas read time: {pandas_time:.2f} seconds")
print(f"Dask processing time: {dask_time:.2f} seconds")
print(f"Ray processing time: {processing_time:.2f} seconds")


if dask_time < processing_time:
    print("Dask is faster for parallel processing on this dataset.")

elif processing_time < 0:
    print("Pandas might be sufficient for this data size or operation.")

else:
    print("Ray is the fastest option for this data size and operation.")


Pandas read time: -0.01 seconds
Dask processing time: -0.01 seconds
Ray processing time: 0.02 seconds
Dask is faster for parallel processing on this dataset.


In [35]:
pandas_time_str=str(pandas_time)
dask_time_str=str(dask_time)
ray_time_str=str(processing_time)

In [36]:
compare_msg= validation_message + "|Pandas Processing time :" + pandas_time_str + "|Dask processing time :" + dask_time_str + "|Ray processing time:" + ray_time_str + "|Dask is faster for parallel processing on this dataset."
compress_to_gz(compare_msg, "validation_results.pipe.gz")

In [37]:
import pandas as pd

file_path = "/content/validation_results.pipe.gz"

results = pd.read_csv(file_path, compression="gzip", sep="|")

print(results)


Empty DataFrame
Columns: [Validation Passed ,  Data has passed basic validation and is ready for further processing , Pandas Processing time :-0.014622276001318824, Dask processing time :-0.013186422000217135, Ray processing time:0.02002304099914909, Dask is faster for parallel processing on this dataset.]
Index: []


In [38]:
from transformers import pipeline
import gzip

# Path to your GZ file
file_path = "/content/validation_results.pipe.gz"

# Decompress the GZ file
with gzip.open(file_path, "rb") as f_in:
  decompressed_data = f_in.read().decode("utf-8")

# Initialize summarization pipeline with a pre-trained model
summarizer = pipeline("summarization")

# Summarize the decompressed data
summary = summarizer(decompressed_data, max_length=39, min_length=30)  # Adjust lengths as needed

print("Summary:", summary[0]['summary_text'])


No model was supplied, defaulted to sshleifer/distilbart-cnn-12-6 and revision a4f8f3e (https://huggingface.co/sshleifer/distilbart-cnn-12-6).
Using a pipeline without specifying a model name and revision in production is not recommended.
The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


Summary:  Pandas processing time :-0.014622276001318824|Ray processing time:0.013186422000217135|Pandas is faster for


In [39]:
import gzip

# Path to your GZ file
file_path = "/content/validation_results.pipe.gz"

# Decompress the GZ file
with gzip.open(file_path, "rb") as f_in:
  decompressed_data = f_in.read().decode("utf-8")

# Extract key information (validation outcome, processing times)
validation_outcome = decompressed_data.split("|")[0]  # Assuming validation message is first field
processing_times = decompressed_data.split("|")[1:]  # Extract remaining fields
# Generate a summary message (modify as needed)
summary = f"Validation Outcome: {validation_outcome}\nProcessing Times:\n"


# Validate data format before processing (assuming consistent format)
valid_data = [time_str for time_str in processing_times if len(time_str.split("|")) >= 3]

print("Summary:\n", summary)


Summary:
 Validation Outcome: Validation Passed 
Processing Times:



In [88]:
import gzip

validation_result = validation_message
total_cols = len(df.columns)
total_rows = len(df)
file_size = '2.01 GB'
compare_msg = (
    validation_result
    + "| Pandas Processing time: "
    + str(pandas_time)
    + "| Dask Processing time: "
    + str(dask_time)
    + "| Ray Processing time: "
    + str(processing_time)
    + f" | Total Rows: {total_rows} | Total Columns: {total_cols} | File Size: {file_size}"
)



# Compress the message into a GZ file (replace if needed)
file_path = "/content/validation_results.pipe.gz"
with gzip.open(file_path, "wb") as f_out:
    f_out.write(compare_msg.encode("utf-8"))

# Decompress the GZ file and read the data
with gzip.open(file_path, "rb") as f_in:
    decompressed_data = f_in.read().decode("utf-8")

decompressed_data



'Validation Passed| Pandas Processing time: -0.014622276001318824| Dask Processing time: -0.013186422000217135| Ray Processing time: 0.02002304099914909 | Total Rows: 6417106 | Total Columns: 5 | File Size: 2.01 GB'

In [90]:

# Extract key information (validation outcome, processing times)
outcome = decompressed_data.split("|")[0]
processing_times = decompressed_data.split("|")[1:]

# Generate the summary message
summary = (
    f"Validation Outcome: {outcome}\n"
    f"Processing Times:\n"
)

# Process valid data entries (assuming consistent format)
# ... (existing code)

for time_str in processing_times:
    if len(time_str.split("|")) >= 1:  # Adjust if more than 1 element is expected
        system_name = time_str.split("|")[0]
        summary += f"- {system_name}\n"
    else:
        summary += f"- Incomplete processing time data: {time_str}\n"

summary += "Dask is faster for processing on this dataset.\n"





# Print the summary
print("Summary:\n", summary)


Summary:
 Validation Outcome: Validation Passed
Processing Times:
-  Pandas Processing time: -0.014622276001318824
-  Dask Processing time: -0.013186422000217135
-  Ray Processing time: 0.02002304099914909 
-  Total Rows: 6417106 
-  Total Columns: 5 
-  File Size: 2.01 GB
Dask is faster for processing on this dataset.

