In [1]:
# Dataset taken from: https://www.kaggle.com/datasets/ealtman2019/ibm-transactions-for-anti-money-laundering-aml?select=HI-Medium_Trans.csv
# 4 different approaches to read the file:
# 1. Dask, 2. Modin, 3. Ray, 4. Pandas
# Size of the file being read-in: 3.03 GB
# Source code adapted from: https://docs.dask.org/en/stable/generated/dask.dataframe.read_csv.html
# Source code adapted from: https://docs.dask.org/en/latest/user-interfaces.html#laziness-and-computing

In [2]:
# Modin reading in the file
"""import modin.pandas as pd
import time

start_time = time.time()

df = pd.read_csv('/content/drive/MyDrive/Colab Notebooks/HI-Medium_Trans.csv')

end_time = time.time()

print("Time taken by Modin to read CSV file: ", end_time - start_time, "seconds")
# Modin not a computationally efficient way to read in CSV file as it takes too long"""

'import modin.pandas as pd\nimport time\n\nstart_time = time.time()\n\ndf = pd.read_csv(\'/content/drive/MyDrive/Colab Notebooks/HI-Medium_Trans.csv\')\n\nend_time = time.time()\n\nprint("Time taken by Modin to read CSV file: ", end_time - start_time, "seconds")\n# Modin not a computationally efficient way to read in CSV file as it takes too long'

In [3]:
# Ray reading in the file
import ray
import time

# Start Ray
ray.init(ignore_reinit_error=True)

# Define the CSV file path
csv_path = "/content/drive/MyDrive/Colab Notebooks/HI-Medium_Trans.csv"

# Time how long it takes to read the CSV file
start_time = time.time()
dataframe = ray.data.read_csv(csv_path)
end_time = time.time()

# Print the time taken
print(f"Time taken: {end_time - start_time} seconds")

# Shutdown Ray
ray.shutdown()

# Computationally inefficient compared to Dask

2023-03-13 00:41:00,288	INFO worker.py:1544 -- Started a local Ray instance. View the dashboard at [1m[32m127.0.0.1:8265 [39m[22m


Time taken: 2.3709144592285156 seconds


In [4]:
# Pandas reading in the file
"""import pandas as pd

start_time = time.time()
df = pd.read_csv('/content/drive/MyDrive/Colab Notebooks/HI-Medium_Trans.csv')
end_time = time.time()

elapsed_time = end_time - start_time
print(f'Time taken to read file: {elapsed_time} seconds')
# Computationally very inefficient compared to Dask"""

"import pandas as pd\n\nstart_time = time.time()\ndf = pd.read_csv('/content/drive/MyDrive/Colab Notebooks/HI-Medium_Trans.csv')\nend_time = time.time()\n\nelapsed_time = end_time - start_time\nprint(f'Time taken to read file: {elapsed_time} seconds')\n# Computationally very inefficient compared to Dask"

In [2]:
# Dask reading in the file
import dask.dataframe as dd
import time

# Start timer
start_time = time.time()

# Read in CSV file
df = dd.read_csv('/content/drive/MyDrive/Colab Notebooks/HI-Medium_Trans.csv')

# Stop timer and calculate elapsed time
elapsed_time = time.time() - start_time

# Print elapsed time
print(f"Time taken by Dask to read CSV file: {elapsed_time} seconds")

Time taken by Dask to read CSV file: 0.04643988609313965 seconds


In [3]:
# Using Dask to perform basic validation on the data
def clean_column_name(col_name):
    # Replace whitespace characters with underscore
    col_name = col_name.replace(' ', '_')
    # Remove special characters
    col_name = ''.join(e for e in col_name if e.isalnum() or e == '_')
    # Convert to lowercase
    col_name = col_name.lower()
    return col_name

# Use the map() function to apply the clean_column_name() function to all column names in the dataframe
df.columns = df.columns.map(clean_column_name)

# Preview the cleaned column names
print(df.columns)

Index(['timestamp', 'from_bank', 'account', 'to_bank', 'account1',
       'amount_received', 'receiving_currency', 'amount_paid',
       'payment_currency', 'payment_format', 'is_laundering'],
      dtype='object')


In [4]:
import yaml

# Define the YAML file paths
yaml_file_path = '/content/drive/MyDrive/Colab Notebooks/output_YAML.yaml'

# Define the separator for the YAML file
yaml_separator = ', '

# Clean the column names
cleaned_columns = [col.strip().replace(' ', '_').replace('.', '_') for col in df.columns]

# Write the cleaned column names to a YAML file
with open(yaml_file_path, 'w') as f:
    yaml.dump(cleaned_columns, f, default_flow_style=False, sort_keys=False)

In [13]:
# Read in YAML file
with open('/content/drive/MyDrive/Colab Notebooks/output_YAML.yaml') as f:
    yaml_data = yaml.safe_load(f)

# Define separator for read and write files
separator = ','

# Read in CSV file
df = dd.read_csv('/content/drive/MyDrive/Colab Notebooks/HI-Medium_Trans.csv', sep=separator)

# Validate number of columns
if len(df.columns) != len(yaml_data):
    print('Error: Number of columns in CSV file does not match YAML file.')
    exit()
print('Validation successful.')
# Total number of columns in the YAML file = 11

# Define function to count rows in each partition
def count_rows(partition):
    return len(partition)

# Map function to each partition and sum the results
# total_rows = df.map_partitions(count_rows).sum().compute()
# Total number of rows in the CSV file = 31,631,294

# Converting to pip-separated gz file
df.to_csv('/content/drive/MyDrive/Colab Notebooks/output_GZ.gz', sep='|', compression='gzip', single_file=True)

Validation successful.


KeyboardInterrupt: ignored