## Task: File Ingestion and Schema validation

* Take any CSV/text file of 2+ GB of your choice (you can do this assignment on Google Colab).

* Read the file (present your approach to reading the file).

* Try different methods of reading the file, e.g. Dask, Modin, Ray and pandas, and present your findings in terms of computational efficiency.

* Perform basic validation on the data columns, e.g. remove special characters and white space from the column names.

* Since the schema is already known, create a YAML file and write the column names in it. Also define the separators for reading and writing the file in the YAML file.

* Validate the number of columns and the column names of the ingested file against the YAML file.

* Write the file as a pipe-separated (|) text file in gz format.

* Create a summary of the file:

    * Total number of rows
    * Total number of columns
    * File size

In [16]:
import os
import time

In [17]:
import gdown
gdown.download('https://drive.google.com/uc?id=1jHl_LAXu26kndGpd2J5W-Xz0s5Pqkp6Q', quiet=False)


Downloading...
From (original): https://drive.google.com/uc?id=1jHl_LAXu26kndGpd2J5W-Xz0s5Pqkp6Q
From (redirected): https://drive.google.com/uc?id=1jHl_LAXu26kndGpd2J5W-Xz0s5Pqkp6Q&confirm=t&uuid=63491ba3-62c6-4e9b-83d7-5ccdbb4b5767
To: /content/IBM Transactions for Anti Money Laundering.zip
100%|██████████| 583M/583M [00:05<00:00, 114MB/s]


'IBM Transactions for Anti Money Laundering.zip'

In [18]:
import zipfile

# Path to the ZIP file
zip_file_path = "IBM Transactions for Anti Money Laundering.zip"

# Unzip the file
with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
    zip_ref.extractall()  # Extracts all files to the current directory
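Before extracting a multi-gigabyte archive, it can be worth listing what it contains and how large each member is once uncompressed. The sketch below uses only the standard library `zipfile` module; the helper name `list_csv_members` and the small in-memory sample archive are illustrative assumptions, not part of the original notebook.

```python
import io
import zipfile

def list_csv_members(zip_source):
    """Return (name, uncompressed size in bytes) for every CSV member of a ZIP archive."""
    with zipfile.ZipFile(zip_source, "r") as zf:
        return [(info.filename, info.file_size)
                for info in zf.infolist()
                if info.filename.lower().endswith(".csv")]

# Usage example with a tiny in-memory archive standing in for the real download
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as zf:
    zf.writestr("sample.csv", "a,b\n1,2\n")
    zf.writestr("readme.txt", "notes")

print(list_csv_members(buffer))  # → [('sample.csv', 8)]
```

On the real download, `list_csv_members(zip_file_path)` would show the CSV name and its uncompressed size, which is one way to confirm the 2+ GB requirement before spending time on extraction.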


In [19]:
File_path='/content/IBM Transactions for Anti Money Laundering.csv'

In [20]:
#Size of the file
file_size=os.path.getsize(File_path)
print(f"The file size is {file_size} bytes")

# Convert size to GB
file_size_gb = file_size / (1024**3)

print(f"The file size is {file_size_gb:.2f} GB")

The file size is 3031783420 bytes
The file size is 2.82 GB


# Read the file using Dask

In [21]:
!pip install dask[dataframe]



In [26]:
from dask import dataframe as dd
start = time.time()
dask_df = dd.read_csv('/content/IBM Transactions for Anti Money Laundering.csv')
end = time.time()
print("Read csv with dask: ",(end-start),"sec")

Read csv with dask:  0.0005664825439453125 sec
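The sub-millisecond figure is a consequence of lazy evaluation: `dd.read_csv` samples the start of the file to infer dtypes and builds a task graph, but defers parsing until a result is requested. The standard-library sketch below reproduces the effect with a generator on hypothetical sample data: creating the reader costs almost nothing, and the parsing time only appears when the rows are consumed.

```python
import csv
import io
import time

def lazy_rows(text):
    """Generator: no parsing happens until the caller starts iterating."""
    yield from csv.reader(io.StringIO(text))

# Hypothetical sample data: 200,000 small CSV rows
sample_text = "".join(f"{i},{i * 2},US Dollar\n" for i in range(200_000))

start = time.perf_counter()
rows = lazy_rows(sample_text)       # only creates the generator object
setup_time = time.perf_counter() - start

start = time.perf_counter()
row_count = sum(1 for _ in rows)    # the actual parsing happens here
consume_time = time.perf_counter() - start

print(f"setup: {setup_time:.6f} s, consume: {consume_time:.3f} s, rows: {row_count}")
```

To time the real Dask read, the timed block would need to trigger computation, for example `len(dask_df)`, which forces a full pass over the file.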


# Read the file using Pandas

In [36]:
import pandas as pd
import time
start = time.time()
pandas_df = pd.read_csv('/content/IBM Transactions for Anti Money Laundering.csv')
end = time.time()
print("Read csv with pandas: ",(end-start),"sec")
print(f"File shape: {pandas_df.shape}")

Read csv with pandas:  108.39546656608582 sec
File shape: (31898238, 11)


# Read the file using Ray

In [37]:
!pip install Modin[ray] ray

Collecting ray
  Downloading ray-2.35.0-cp310-cp310-manylinux2014_x86_64.whl.metadata (16 kB)
Collecting Modin[ray]
  Downloading modin-0.32.0-py3-none-any.whl.metadata (17 kB)
Collecting pandas<2.3,>=2.2 (from Modin[ray])
  Downloading pandas-2.2.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (19 kB)
Downloading ray-2.35.0-cp310-cp310-manylinux2014_x86_64.whl (65.0 MB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 65.0/65.0 MB 11.3 MB/s eta 0:00:00
Downloading pandas-2.2.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (13.0 MB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 13.0/13.0 MB 61.9 MB/s eta 0:00:00
Downloading modin-0.32.0-py3-none-any.whl (1.1 MB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 1.1/1.1 MB 38.9 MB/s eta 0:00:00
Installing collected packages: pandas, Modin, ray
  Attempting uninstall: pandas
    F

In [38]:
import ray
import time

# Initialize Ray
ray.init(ignore_reinit_error=True)

# Measure time to read CSV using Ray Data
start = time.time()
ray_df = ray.data.read_csv('/content/IBM Transactions for Anti Money Laundering.csv')
end = time.time()

print("Read CSV with Ray Data:", (end - start), "sec")


2024-09-16 12:19:41,978	INFO worker.py:1783 -- Started a local Ray instance.


Read CSV with Ray Data: 7.36477518081665 sec


# Read the file using Modin with Ray

In [39]:
!pip install pandas==2.2.2



In [1]:
import modin.pandas as mpd
import ray
import time

# Initialize Ray
#ray.init(ignore_reinit_error=True) #Already Initialised

# Measure time to read CSV using Modin with Ray
start = time.time()
modin_df = mpd.read_csv('/content/IBM Transactions for Anti Money Laundering.csv', nrows=1000000)
end = time.time()

print("Read CSV with Modin (Ray):", (end - start), "sec")


2024-09-16 12:23:35,320	INFO worker.py:1783 -- Started a local Ray instance.


Read CSV with Modin (Ray): 21.758262157440186 sec


**Summary**

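Measured times: Dask 0.0006 sec, pandas 108.4 sec, Ray Data 7.4 sec, Modin (Ray) 21.8 sec. Taken at face value Dask looks fastest, but the numbers are not like-for-like. `dd.read_csv` and `ray.data.read_csv` execute lazily, so their timings mostly cover building the task graph or dataset rather than parsing all 2.82 GB, and the Modin run read only the first 1,000,000 rows (`nrows=1000000`). Only the pandas figure covers a complete, eager read of all 31,898,238 rows. A fair comparison would force each library to materialise the full file inside the timed block.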
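A like-for-like comparison times a callable that fully materialises the data for every library and keeps the best of several runs. The sketch below shows such a harness with two stand-in readers on a small temporary file; `benchmark`, the readers and the file are hypothetical. In the notebook, each callable would instead wrap one library's complete read, for example `lambda: pd.read_csv(path)` for pandas or `lambda: len(dd.read_csv(path))` for Dask, with Modin run without `nrows`.

```python
import csv
import os
import tempfile
import time

def benchmark(readers, repeats=3):
    """Return the best wall-clock time (seconds) over `repeats` runs for each reader.

    Every reader is a zero-argument callable that must fully materialise its data;
    otherwise a lazy library is only timed on building its task graph.
    """
    results = {}
    for name, reader in readers.items():
        best = float("inf")
        for _ in range(repeats):
            start = time.perf_counter()
            reader()
            best = min(best, time.perf_counter() - start)
        results[name] = best
    return results

# Hypothetical small CSV standing in for the 2.82 GB file
with tempfile.NamedTemporaryFile("w", suffix=".csv", delete=False, newline="") as tmp:
    tmp.write("a,b\n" + "".join(f"{i},{i * 2}\n" for i in range(50_000)))
    sample_path = tmp.name

def read_with_csv_module():
    with open(sample_path, newline="") as f:
        return list(csv.reader(f))

def read_with_split():
    with open(sample_path) as f:
        return [line.rstrip("\n").split(",") for line in f]

timings = benchmark({"csv module": read_with_csv_module, "str.split": read_with_split})
for name, seconds in sorted(timings.items(), key=lambda item: item[1]):
    print(f"{name}: {seconds:.4f} s")
os.remove(sample_path)
```

Keeping the best of several runs reduces noise from background work; the first run of each reader may also pay for warming the OS file cache, so repeating the runs matters for a fair ranking.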

# Exploration

In [3]:
from dask import dataframe as dd
dask_df = dd.read_csv('/content/IBM Transactions for Anti Money Laundering.csv')

In [4]:
dask_df.head()

| | Timestamp | From Bank | Account | To Bank | Account.1 | Amount Received | Receiving Currency | Amount Paid | Payment Currency | Payment Format | Is Laundering |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2022/09/01 00:17 | 20 | 800104D70 | 20 | 800104D70 | 6794.63 | US Dollar | 6794.63 | US Dollar | Reinvestment | 0 |
| 1 | 2022/09/01 00:02 | 3196 | 800107150 | 3196 | 800107150 | 7739.29 | US Dollar | 7739.29 | US Dollar | Reinvestment | 0 |
| 2 | 2022/09/01 00:17 | 1208 | 80010E430 | 1208 | 80010E430 | 1880.23 | US Dollar | 1880.23 | US Dollar | Reinvestment | 0 |
| 3 | 2022/09/01 00:03 | 1208 | 80010E650 | 20 | 80010E6F0 | 73966883.0 | US Dollar | 73966883.0 | US Dollar | Cheque | 0 |
| 4 | 2022/09/01 00:02 | 1208 | 80010E650 | 20 | 80010EA30 | 45868454.0 | US Dollar | 45868454.0 | US Dollar | Cheque | 0 |


In [5]:
dask_df.info()

<class 'dask_expr.DataFrame'>
Columns: 11 entries, Timestamp to Is Laundering
dtypes: float64(2), int64(3), string(6)

In [6]:
len(dask_df.index)

31898238

In [7]:
# Number of columns
len(dask_df.columns)

11

In [8]:
dask_df.columns

Index(['Timestamp', 'From Bank', 'Account', 'To Bank', 'Account.1',
       'Amount Received', 'Receiving Currency', 'Amount Paid',
       'Payment Currency', 'Payment Format', 'Is Laundering'],
      dtype='object')

In [9]:
# Clean the column names in the Dask DataFrame
dask_df.columns = dask_df.columns.str.lower()  # Make all column names lowercase for consistency
dask_df.columns = dask_df.columns.str.replace(r'[^\w\s]', '', regex=True)  # Remove special characters such as "."
dask_df.columns = dask_df.columns.str.replace(' ', '_')  # Replace spaces with underscores
dask_df.columns = dask_df.columns.str.strip('_')  # Remove leading/trailing underscores
dask_df.columns = [col.strip() for col in dask_df.columns]  # Remove leading/trailing whitespace



In [10]:
# Verify the new column names
dask_df.columns

Index(['timestamp', 'from_bank', 'account', 'to_bank', 'account1',
       'amount_received', 'receiving_currency', 'amount_paid',
       'payment_currency', 'payment_format', 'is_laundering'],
      dtype='object')
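The cleaning rules can also be expressed as one plain-Python function applied to each name, which makes them easy to test and to reuse in the validation step. A minimal sketch with the standard `re` module; `clean_column_name` is a hypothetical helper, and collapsing runs of whitespace into a single underscore is a small extension of the cell above.

```python
import re

def clean_column_name(name):
    """Trim, lowercase, drop special characters and snake_case one column name."""
    name = name.strip().lower()
    name = re.sub(r"[^\w\s]", "", name)  # remove characters such as "." or "-"
    name = re.sub(r"\s+", "_", name)     # collapse each run of whitespace into one underscore
    return name.strip("_")               # remove leading/trailing underscores

raw_columns = ['Timestamp', 'From Bank', 'Account', 'To Bank', 'Account.1',
               'Amount Received', 'Receiving Currency', 'Amount Paid',
               'Payment Currency', 'Payment Format', 'Is Laundering']
cleaned = [clean_column_name(c) for c in raw_columns]

# Cleaning can map two different headers to the same name, so check for collisions
if len(set(cleaned)) != len(cleaned):
    raise ValueError(f"Duplicate column names after cleaning: {cleaned}")
print(cleaned)
```

Applied to the notebook, `dask_df.columns = [clean_column_name(c) for c in dask_df.columns]` would produce the same names as shown above.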

# Validation

In [11]:
%%writefile store.yaml
file_type: csv
dataset_name: IBM Transactions for Anti Money Laundering
file_name: IBM Transactions for Anti Money Laundering
table_name: ibm_transactions
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 0
columns:
  - timestamp
  - from_bank
  - account
  - to_bank
  - account1
  - amount_received
  - receiving_currency
  - amount_paid
  - payment_currency
  - payment_format
  - is_laundering


Writing store.yaml


In [12]:
import yaml

def read_config_file(filepath):
    """Load a YAML config file into a Python dictionary; return None on a parse error."""
    with open(filepath, 'r') as stream:
        try:
            # safe_load only constructs plain Python types (dicts, lists, strings, numbers)
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            print(f"Error reading YAML file: {exc}")
            return None
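The notebook later indexes `config['inbound_delimiter']`, `config['outbound_delimiter']` and `config['columns']`, so a missing or mistyped key would only surface as a `KeyError` partway through the pipeline. A small check right after loading can report such problems up front. This is a sketch: `check_config` and `REQUIRED_KEYS` are hypothetical names, and the dictionary below stands in for the parsed `store.yaml`.

```python
# Keys the pipeline reads from the YAML file, with the type each value should have
REQUIRED_KEYS = {"inbound_delimiter": str, "outbound_delimiter": str, "columns": list}

def check_config(config):
    """Return a list of problems found in a loaded config (an empty list means OK)."""
    if not isinstance(config, dict):
        return ["config is not a mapping"]
    problems = []
    for key, expected_type in REQUIRED_KEYS.items():
        if key not in config:
            problems.append(f"missing key: {key}")
        elif not isinstance(config[key], expected_type):
            problems.append(f"{key} should be {expected_type.__name__}")
    columns = config.get("columns", [])
    if isinstance(columns, list) and len(columns) != len(set(columns)):
        problems.append("duplicate column names in config")
    return problems

# Usage with a dict shaped like store.yaml after loading
sample_config = {"inbound_delimiter": ",", "outbound_delimiter": "|",
                 "columns": ["timestamp", "from_bank", "account"]}
print(check_config(sample_config))              # → []
print(check_config({"inbound_delimiter": ","}))
```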

In [13]:
import pandas as pd
import dask.dataframe as dd

# Load the configuration
config = read_config_file('/content/store.yaml')

if config is None:
    raise ValueError("Failed to load configuration file.")

# Read the dataset
df = dd.read_csv('/content/IBM Transactions for Anti Money Laundering.csv', delimiter=config['inbound_delimiter'])

# Perform validation
def col_header_val(df, table_config):
    """Clean the column names of df in place and compare them with the YAML column list.

    Column order is ignored: both lists are sorted before comparison.
    """
    df.columns = df.columns.str.lower()  # Convert to lowercase
    df.columns = df.columns.str.replace(' ', '_')  # Replace spaces with underscores
    df.columns = df.columns.str.replace(r'[^\w\s]', '', regex=True)  # Remove special characters
    df.columns = df.columns.str.strip('_')  # Remove leading/trailing underscores
    df.columns = [col.strip() for col in df.columns]  # Remove leading/trailing whitespace

    expected_col = sorted(col.strip().lower() for col in table_config['columns'])
    df_columns = sorted(col.lower() for col in df.columns)

    if len(df_columns) == len(expected_col) and df_columns == expected_col:
        print("Column name and column length validation passed")
        return True
    else:
        print("Column name and column length validation failed")
        mismatched_columns_file = list(set(df_columns).difference(expected_col))
        print("Following columns are in the file but not in the YAML file:", mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df_columns))
        print("Following columns are in the YAML but missing from the file:", missing_YAML_file)
        return False


col_header_val(df, config)


Column name and column length validation passed


True
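`col_header_val` sorts both lists, so it accepts the right columns in any order. Because the output file is written positionally, it can also be useful to report whether the order matches the YAML. A standard-library sketch; `compare_columns` is a hypothetical helper that keeps the same pass/fail rule (the sorted lists must be equal) and adds an order check as extra information.

```python
def compare_columns(actual, expected):
    """Compare the file's column names with the YAML list.

    `passed` uses the same rule as col_header_val (sorted lists must be equal);
    the other fields are diagnostics, including whether the order also matches.
    """
    return {
        "count_ok": len(actual) == len(expected),
        "unexpected": sorted(set(actual) - set(expected)),
        "missing": sorted(set(expected) - set(actual)),
        "order_ok": list(actual) == list(expected),
        "passed": sorted(actual) == sorted(expected),
    }

expected_cols = ["timestamp", "from_bank", "account"]
report = compare_columns(["timestamp", "account", "from_bank"], expected_cols)
print(report)
# → {'count_ok': True, 'unexpected': [], 'missing': [], 'order_ok': False, 'passed': True}
```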

In [14]:
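# Note: to_csv writes the index by default (index=True); pass index=False to keep only the 11 data columns
df.to_csv('/content/IBM_Transactions_for_Anti_Money_Laundering_processed.gz',
          sep=config['outbound_delimiter'],
          compression='gzip',
          single_file=True)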


['/content/IBM_Transactions_for_Anti_Money_Laundering_processed.gz']
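Reading a couple of lines back is a cheap way to confirm that the output really is gzip-compressed and pipe-delimited. The sketch below does this with the standard `csv` and `gzip` modules on a tiny hypothetical sample; `write_pipe_gzip` is illustrative and, unlike a default `to_csv` call, writes no index column.

```python
import csv
import gzip
import os
import tempfile

def write_pipe_gzip(rows, header, path):
    """Write rows as a gzip-compressed, pipe-delimited text file (no index column)."""
    with gzip.open(path, "wt", newline="", encoding="utf-8") as f:
        writer = csv.writer(f, delimiter="|")
        writer.writerow(header)
        writer.writerows(rows)

header = ["timestamp", "from_bank", "amount_paid"]
rows = [["2022/09/01 00:17", "20", "6794.63"], ["2022/09/01 00:02", "3196", "7739.29"]]
out_path = os.path.join(tempfile.mkdtemp(), "sample.txt.gz")
write_pipe_gzip(rows, header, out_path)

# Read the first two lines back to check the delimiter and the compression
with gzip.open(out_path, "rt", newline="", encoding="utf-8") as f:
    first_lines = [next(f).rstrip("\r\n") for _ in range(2)]
print(first_lines)  # → ['timestamp|from_bank|amount_paid', '2022/09/01 00:17|20|6794.63']
```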

In [15]:
import os

# Get the processed file size
file_size = os.path.getsize('/content/IBM_Transactions_for_Anti_Money_Laundering_processed.gz') / (1024 * 1024)  # size in MB

# Summary
print("File Summary:")
print(f"Total rows: {df.shape[0].compute()}")
print(f"Total columns: {len(df.columns)}")
print(f"File size: {file_size:.2f} MB")


File Summary:
Total rows: 31898238
Total columns: 11
File size: 622.42 MB
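The row count above comes from `df.shape[0].compute()`, which parses the source CSV again. An alternative is to summarise the written file itself, so the numbers describe exactly what was produced. A standard-library sketch follows; `summarize_delimited_gzip` and the demo file are hypothetical, and rows are counted as CSV records, so quoted fields containing newlines are not double-counted.

```python
import csv
import gzip
import os
import tempfile

def summarize_delimited_gzip(path, delimiter="|"):
    """Stream a gzip-compressed delimited file and report rows, columns and size on disk."""
    with gzip.open(path, "rt", newline="", encoding="utf-8") as f:
        reader = csv.reader(f, delimiter=delimiter)
        header = next(reader)               # first record holds the column names
        row_count = sum(1 for _ in reader)  # count data rows without keeping them in memory
    size_mb = os.path.getsize(path) / (1024 * 1024)
    return {"rows": row_count, "columns": len(header), "size_mb": size_mb}

# Usage example on a tiny hypothetical file
demo_path = os.path.join(tempfile.mkdtemp(), "demo.txt.gz")
with gzip.open(demo_path, "wt", newline="", encoding="utf-8") as f:
    f.write("timestamp|amount_paid\n2022/09/01 00:17|6794.63\n2022/09/01 00:02|7739.29\n")

summary = summarize_delimited_gzip(demo_path)
print(f"Total rows: {summary['rows']}")
print(f"Total columns: {summary['columns']}")
print(f"File size: {summary['size_mb']:.4f} MB")
```

If the file was written with the index (the `to_csv` default), its header carries an extra unnamed column, so the column count would come out one higher than the 11 data columns.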
