![title](data_glacier_screenshot_of_assignment.png) 

## Week #6 Assignment- Data Ingestion & Schema- Carmelo R Casiraro- Data Glacier- Batch Code: LISUM34 

# Step 1- Generate Training Data (Run Locally)
### This routine writes 40 million data rows, and it builds the full data set in memory before writing it. This will most likely crash the kernel of an online Jupyter notebook editor. Therefore, download this code and run it on your local system.

In [None]:
import csv
import random

def generate_label(feature1, feature2, feature3):
    """
    Generates a label based on the three features.
    For simplicity, we'll use a basic rule-based approach.
    """
    if feature1 + feature2 > feature3:
        return 'A'
    elif feature1 - feature2 < feature3:
        return 'B'
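    else:
        # Effectively unreachable for non-negative features: when the first
        # test fails, feature1 - feature2 <= feature1 + feature2 <= feature3.
        return 'C'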

def generate_data(num_samples):
    """
    Generates synthetic data with three numerical features and a label.
    """
    data = []
    for _ in range(num_samples):
        feature1 = random.uniform(0, 100)
        feature2 = random.uniform(0, 100)
        feature3 = random.uniform(0, 100)
        label = generate_label(feature1, feature2, feature3)
        data.append([feature1, feature2, feature3, label])
    return data

def write_csv(file_name, data):
    """
    Writes the generated data to a CSV file.
    """
    header = ['Feature1', 'Feature2', 'Feature3', 'Label']
    with open(file_name, 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(header)
        writer.writerows(data)

if __name__ == "__main__":
    num_samples = 40000000
    data = generate_data(num_samples)
    write_csv('training_data.csv', data)
    print(f"Generated {num_samples} samples and written to 'training_data.csv'")
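
The routine above holds all 40 million rows in a list before writing anything. As a minimal sketch of a lower-memory variant, the rows can be written in batches so that only one batch is in memory at a time. The function name `write_csv_streaming` and the `batch_size` parameter are assumptions introduced here for illustration; they are not part of the original assignment code.

```python
import csv
import random


def generate_label(feature1, feature2, feature3):
    # Same rule as the cell above.
    if feature1 + feature2 > feature3:
        return 'A'
    elif feature1 - feature2 < feature3:
        return 'B'
    return 'C'


def write_csv_streaming(file_name, num_samples, batch_size=100_000):
    """Write num_samples rows, keeping at most batch_size rows in memory."""
    header = ['Feature1', 'Feature2', 'Feature3', 'Label']
    with open(file_name, 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(header)
        remaining = num_samples
        while remaining > 0:
            batch = []
            for _ in range(min(batch_size, remaining)):
                f1 = random.uniform(0, 100)
                f2 = random.uniform(0, 100)
                f3 = random.uniform(0, 100)
                batch.append([f1, f2, f3, generate_label(f1, f2, f3)])
            writer.writerows(batch)
            remaining -= len(batch)
```

Calling `write_csv_streaming('training_data.csv', 40000000)` should produce a file with the same layout as the original routine, although the random values will differ from run to run.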

### Step 1.1- After running the above code locally (NOT in this notebook), run the following code to verify that the output file is indeed larger than 2 GB

In [None]:
import os
def get_file_size(file_path):
    try:
        size = os.path.getsize(file_path)
        return size
    except OSError as e:
        print(f"Error: {e}")
        return None
def human_readable_size(size, decimal_places=2):
    for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB']:
        if size < 1024.0:
            return f"{size:.{decimal_places}f} {unit}"
        size /= 1024.0
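# Check the file written in Step 1; skip the report if the size lookup failed
size = get_file_size("training_data.csv")
if size is not None:
    print(human_readable_size(size))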
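
Before generating the full file, the expected size can also be estimated from a small sample. The sketch below writes a few thousand rows to an in-memory buffer, measures the average bytes per row, and extrapolates to 40 million rows. The helper name `estimate_csv_bytes`, the sample size, and the fixed label are assumptions made for illustration, and the result is only an approximation because it ignores the header line.

```python
import csv
import io
import random


def estimate_csv_bytes(num_rows, sample_rows=10_000, seed=0):
    """Extrapolate the CSV size from the average encoded length of sample rows."""
    rng = random.Random(seed)
    buffer = io.StringIO(newline='')
    writer = csv.writer(buffer)
    for _ in range(sample_rows):
        # Every label is one character, so a fixed 'A' has the right length.
        writer.writerow([rng.uniform(0, 100), rng.uniform(0, 100),
                         rng.uniform(0, 100), 'A'])
    avg_bytes = len(buffer.getvalue().encode('utf-8')) / sample_rows
    return avg_bytes * num_rows


estimate = estimate_csv_bytes(40_000_000)
print(f"Estimated size: {estimate / 1024**3:.2f} GB")
```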

# Step 2- Upload file to Google Drive for hosting
### Attempting to read the entire >2GB file into memory from this notebook will overload the kernel. Therefore, we need to stream the data from a third-party host. Here, we use Google Drive. Alternatives are available, but the code to interact with them may be slightly different.

### Step 2.1- Go to Google Drive and click + to upload a file
![title](step2.1_googledrive_uploadfile.png)


### Select the data set file on your local system to upload it
![title](step2.1_googledrive_clickondatasetfiletoupload.png)

### Step 2.2- Once the file is uploaded, copy its file ID from the URL link
![title](getfileid_googledriveurllink_image.png)
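
The file ID can also be pulled out of the link programmatically. This is a minimal sketch, assuming the share link has the common form `https://drive.google.com/file/d/<file id>/view?...` or carries the ID in an `id` query parameter. The helper name `extract_file_id` is hypothetical.

```python
import re
from urllib.parse import urlparse, parse_qs


def extract_file_id(share_url):
    """Return the file ID embedded in a Google Drive share link."""
    # Links of the form .../file/d/<id>/view
    match = re.search(r"/file/d/([A-Za-z0-9_-]+)", share_url)
    if match:
        return match.group(1)
    # Links that carry the ID as an "id" query parameter
    query = parse_qs(urlparse(share_url).query)
    if 'id' in query:
        return query['id'][0]
    raise ValueError(f"No file ID found in: {share_url}")


url = "https://drive.google.com/file/d/1bW3a-Ym2WnUlZdqwzm5a9yVGSGLeLtrU/view?usp=sharing"
print(extract_file_id(url))
```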

# Step 3- Define function to get download link from Google Drive
### If you open the link to the file in a browser, you'll see a warning screen saying that this file is too big to scan for viruses. Google will download the file only once you submit the form on that screen. So, we use the requests library to fetch the form and then use BeautifulSoup to parse out the action URL and the fields needed to submit it.

In [1]:
import requests
from bs4 import BeautifulSoup

def get_download_link(file_id):
    URL = "https://docs.google.com/uc?export=download"
    session = requests.Session()
    response = session.get(URL, params={'id': file_id})
    soup = BeautifulSoup(response.content, 'html.parser')
    
    form = soup.find('form', {'id': 'download-form'})
    if not form:
        raise ValueError("Couldn't find the download form in the response.")
    
    # The form's action is the real download URL; its named inputs are the query parameters
    download_url = form['action']
    params = {field['name']: field.get('value', '') for field in form.find_all('input') if 'name' in field.attrs}
    
    return download_url, params

file_id = "1bW3a-Ym2WnUlZdqwzm5a9yVGSGLeLtrU" 
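
To make the return value of `get_download_link` concrete, here is an offline sketch that performs the same extraction on a made-up HTML page. It uses the standard library's `html.parser` in place of BeautifulSoup so that it runs without extra packages. The page contents, the action URL, and the field names are invented for illustration and may not match what Google actually serves.

```python
from html.parser import HTMLParser

# Hypothetical warning page; only the form id matches the code above.
SAMPLE_PAGE = """
<html><body>
<form id="download-form" action="https://example.com/download" method="get">
  <input type="hidden" name="id" value="FILE_ID">
  <input type="hidden" name="confirm" value="t">
  <input type="submit" value="Download anyway">
</form>
</body></html>
"""


class DownloadFormParser(HTMLParser):
    """Collects the action URL and named inputs of the download form."""

    def __init__(self):
        super().__init__()
        self.in_form = False
        self.action = None
        self.params = {}

    def handle_starttag(self, tag, attrs):
        attrs = dict(attrs)
        if tag == 'form' and attrs.get('id') == 'download-form':
            self.in_form = True
            self.action = attrs.get('action')
        elif tag == 'input' and self.in_form and 'name' in attrs:
            self.params[attrs['name']] = attrs.get('value') or ''

    def handle_endtag(self, tag):
        if tag == 'form':
            self.in_form = False


def parse_download_form(html):
    parser = DownloadFormParser()
    parser.feed(html)
    if parser.action is None:
        raise ValueError("Couldn't find the download form in the response.")
    return parser.action, parser.params


download_url, params = parse_download_form(SAMPLE_PAGE)
print(download_url, params)
```

The unnamed submit button is skipped, so only the hidden fields end up in `params`, which is what gets passed to `requests.get` in the later steps.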

# Step 4 - Read with Pandas
### To avoid overloading the kernel, we stream the file and read in chunks of 10000 rows at a time. You can adjust the chunk_size parameter as needed. The larger this value, the faster the routine will run, but the more memory it will consume.

In [3]:
import io
import csv
import time
import pandas as pd

def process_csv_with_pandas(file_id, chunk_size=10000):
    start_time = time.time()
    
    download_url, params = get_download_link(file_id)
    
    with requests.get(download_url, params=params, stream=True) as response:
        response.raise_for_status()
        
        lines = (line.decode('utf-8') for line in response.iter_lines())
        
        header = next(lines)
        print(f"Header: {header}")
        
        csv_reader = csv.reader(lines)
        
        total_rows = 0
        chunk = []
        
        for row in csv_reader:
            if not row:
                continue
            chunk.append(row)
            if len(chunk) == chunk_size:
                df = pd.DataFrame(chunk, columns=header.split(','))
                total_rows += len(df)
                chunk = []
        
        if chunk:
            df = pd.DataFrame(chunk, columns=header.split(','))
            total_rows += len(df)
        
        end_time = time.time()
        elapsed_time = end_time - start_time
        
        print(f"Total rows processed: {total_rows}")
        print(f"Time taken: {elapsed_time:.2f} seconds")
        print(f"Processing speed: {total_rows / elapsed_time:.2f} rows/second")
        
process_csv_with_pandas(file_id, chunk_size=10000)

Header: Feature1,Feature2,Feature3,Label
Total rows processed: 40000000
Time taken: 96.90 seconds
Processing speed: 412797.86 rows/second
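
The chunking logic inside the loop can be isolated into a small generator, which makes the effect of `chunk_size` easier to see on a tiny data set. This is a sketch using only the standard library; the helper name `iter_chunks` and the ten-row sample are assumptions for illustration.

```python
import csv
import io
from itertools import islice


def iter_chunks(rows, chunk_size):
    """Yield lists of up to chunk_size rows until the iterator is exhausted."""
    rows = iter(rows)
    while True:
        chunk = list(islice(rows, chunk_size))
        if not chunk:
            return
        yield chunk


# Ten made-up data rows with the same header as the real file
sample = "Feature1,Feature2,Feature3,Label\n" + "".join(
    f"{i}.0,{i}.5,{i}.25,A\n" for i in range(10)
)
reader = csv.reader(io.StringIO(sample))
header = next(reader)
sizes = [len(chunk) for chunk in iter_chunks(reader, chunk_size=4)]
print(header)
print(sizes)  # [4, 4, 2]
```

Each chunk could be passed to `pd.DataFrame(chunk, columns=header)` exactly as in the cell above; a larger `chunk_size` means fewer, bigger DataFrames.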


# Step 5- Read with Dask 

In [4]:
import io
import csv
import time
import pandas as pd  # plain pandas builds each chunk before dd.from_pandas wraps it
import dask.dataframe as dd
import requests

def process_csv_with_dask(file_id, chunk_size=10000):
    start_time = time.time()
    
    download_url, params = get_download_link(file_id)
    
    with requests.get(download_url, params=params, stream=True) as response:
        response.raise_for_status()
        
        lines = (line.decode('utf-8') for line in response.iter_lines())
        
        header = next(lines)
        print(f"Header: {header}")
        
        csv_reader = csv.reader(lines)
        
        total_rows = 0
        chunk = []
        
        for row in csv_reader:
            if not row:
                continue
            chunk.append(row)
            if len(chunk) == chunk_size:
                df = dd.from_pandas(pd.DataFrame(chunk, columns=header.split(',')), npartitions=1)
                total_rows += len(df.compute())
                chunk = []
        
        if chunk:
            df = dd.from_pandas(pd.DataFrame(chunk, columns=header.split(',')), npartitions=1)
            total_rows += len(df.compute())
        
        end_time = time.time()
        elapsed_time = end_time - start_time
        
        print(f"Total rows processed: {total_rows}")
        print(f"Time taken: {elapsed_time:.2f} seconds")
        print(f"Processing speed: {total_rows / elapsed_time:.2f} rows/second")
        
process_csv_with_dask(file_id, chunk_size=10000)

Header: Feature1,Feature2,Feature3,Label
Total rows processed: 40000000
Time taken: 116.24 seconds
Processing speed: 344117.44 rows/second
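
The two reported timings can be compared directly. The short calculation below uses only the row counts and elapsed times printed above. A plausible explanation for the gap, offered here as an assumption rather than a measured result, is that wrapping every chunk in a one-partition Dask DataFrame and calling `compute()` on it adds overhead without adding any parallelism.

```python
# Figures copied from the pandas and Dask cell outputs above
runs = {
    "pandas": {"rows": 40_000_000, "seconds": 96.90},
    "dask": {"rows": 40_000_000, "seconds": 116.24},
}

for name, run in runs.items():
    print(f"{name}: {run['rows'] / run['seconds']:,.0f} rows/second")

slowdown = runs["dask"]["seconds"] / runs["pandas"]["seconds"]
print(f"The Dask run took {slowdown:.2f}x the pandas time")  # 1.20x
```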


# Step 6- Read with Modin

In [None]:
import os
import io
import csv
import time
import requests

# Select Ray as the execution engine before Modin is imported
os.environ["MODIN_ENGINE"] = "ray"  # or "dask" if you prefer to use Dask as the engine

import modin.pandas as pd

def process_csv_with_modin(file_id, chunk_size=10000):
    start_time = time.time()
    
    download_url, params = get_download_link(file_id)
    
    with requests.get(download_url, params=params, stream=True) as response:
        response.raise_for_status()
        
        lines = (line.decode('utf-8') for line in response.iter_lines())
        
        header = next(lines)
        print(f"Header: {header}")
        
        csv_reader = csv.reader(lines)
        
        total_rows = 0
        chunk = []
        
        for row in csv_reader:
            if not row:
                continue
            chunk.append(row)
            if len(chunk) == chunk_size:
                df = pd.DataFrame(chunk, columns=header.split(','))
                total_rows += len(df)
                chunk = []
        
        if chunk:
            df = pd.DataFrame(chunk, columns=header.split(','))
            total_rows += len(df)
        
        end_time = time.time()
        elapsed_time = end_time - start_time
        
        print(f"Total rows processed: {total_rows}")
        print(f"Time taken: {elapsed_time:.2f} seconds")
        print(f"Processing speed: {total_rows / elapsed_time:.2f} rows/second")
        
process_csv_with_modin(file_id, chunk_size=10000)

Header: Feature1,Feature2,Feature3,Label


2024-07-11 01:50:24,118	INFO worker.py:1771 -- Started a local Ray instance.


# Step 7- Create a YAML schema file

In [10]:
%%writefile schema.yaml
columns:
- name: Feature1
  type: float
- name: Feature2
  type: float
- name: Feature3
  type: float
- name: Label
  type: string

Overwriting schema.yaml


# Step 8- Use the YAML file to validate the data set

In [11]:
import yaml
import requests
import csv
import io

def validate_data(file_id, schema_file):
    with open(schema_file, 'r') as file:
        schema = yaml.safe_load(file)
    
    expected_columns = [col['name'] for col in schema['columns']]
    
    download_url, params = get_download_link(file_id)
    
    with requests.get(download_url, params=params, stream=True) as response:
        response.raise_for_status()
        
        # Read the first line of the file to get the header
        header_line = next(response.iter_lines()).decode('utf-8')
        header = header_line.split(',')
        
        # Check if the number of columns matches the expected count
        if len(header) != len(expected_columns):
            raise ValueError(f"Incorrect number of columns. Expected: {len(expected_columns)}, Got: {len(header)}")
        
        # Check if the column names match the expected names
        if header != expected_columns:
            raise ValueError(f"Column names do not match schema. Expected: {expected_columns}, Got: {header}")
    
    print("Data validation completed successfully.")

validate_data(file_id, 'schema.yaml')

Data validation completed successfully.
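
Note that `validate_data` checks only the header line. A possible extension, sketched below, also checks that the values in the first rows can be parsed as the types named in the schema. To keep the sketch runnable without the YAML file, the schema is inlined as a dict that mirrors `schema.yaml`; the function name `validate_rows`, the `max_rows` parameter, and the sample lines are assumptions for illustration.

```python
import csv

# Mirrors the contents of schema.yaml
SCHEMA = {
    "columns": [
        {"name": "Feature1", "type": "float"},
        {"name": "Feature2", "type": "float"},
        {"name": "Feature3", "type": "float"},
        {"name": "Label", "type": "string"},
    ]
}

# Maps each schema type name to a function that raises ValueError on bad input
CASTERS = {"float": float, "string": str}


def validate_rows(lines, schema, max_rows=1000):
    """Check the header and the types of up to max_rows data rows."""
    reader = csv.reader(lines)
    header = next(reader)
    expected = [col["name"] for col in schema["columns"]]
    if header != expected:
        raise ValueError(f"Column names do not match schema. Expected: {expected}, Got: {header}")

    checked = 0
    for line_number, row in enumerate(reader, start=2):
        if not row:
            continue
        if len(row) != len(expected):
            raise ValueError(f"Line {line_number}: expected {len(expected)} values, got {len(row)}")
        for col, value in zip(schema["columns"], row):
            try:
                CASTERS[col["type"]](value)
            except ValueError:
                raise ValueError(
                    f"Line {line_number}: {value!r} is not a valid {col['type']} for {col['name']}"
                ) from None
        checked += 1
        if checked >= max_rows:
            break
    return checked


good_lines = ["Feature1,Feature2,Feature3,Label", "1.5,2.5,3.5,A", "4.0,5.0,6.0,B"]
print(validate_rows(good_lines, SCHEMA))  # 2
```

In the streaming setting, the decoded `lines` generator from the earlier steps could be passed in place of `good_lines`.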


# Step 9- Output in gz format

In [None]:
import csv
import gzip
import requests

def convert_to_pipe_separated_gz(file_id, output_file, chunk_size=10000):
    download_url, params = get_download_link(file_id)
    
    with requests.get(download_url, params=params, stream=True) as response, gzip.open(output_file, 'wt') as gz_file:
        response.raise_for_status()
        
        lines = (line.decode('utf-8') for line in response.iter_lines())
        
        header = next(lines).split(',')
        gz_file.write('|'.join(header) + '\n')
        
        csv_reader = csv.reader(lines)
        
        for row in csv_reader:
            if row:
                gz_file.write('|'.join(row) + '\n')
    
    print(f"File converted and saved as: {output_file}")

convert_to_pipe_separated_gz(file_id, 'output_data.txt.gz')
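
The conversion can be tried on a small in-memory sample before running it on the full download. The sketch below is a defensive variant, not the code above: it writes through `csv.writer` with `delimiter='|'` instead of `'|'.join(row)`, so a field that happens to contain a pipe is quoted rather than splitting into two columns. The helper names and the sample rows are assumptions for illustration; the generated data set itself contains no pipes.

```python
import csv
import gzip
import os
import tempfile


def write_pipe_gz(rows, output_file):
    """Write rows as pipe-separated text inside a gzip file."""
    with gzip.open(output_file, 'wt', newline='', encoding='utf-8') as gz_file:
        writer = csv.writer(gz_file, delimiter='|', lineterminator='\n')
        writer.writerows(rows)


def read_pipe_gz(path):
    """Read a pipe-separated gzip file back into a list of rows."""
    with gzip.open(path, 'rt', newline='', encoding='utf-8') as gz_file:
        return list(csv.reader(gz_file, delimiter='|'))


rows = [
    ['Feature1', 'Feature2', 'Feature3', 'Label'],
    ['1.5', '2.5', '3.5', 'A'],
    ['4.0', '5.0', '6.0', 'B|C'],  # made-up value containing the delimiter
]

with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, 'sample.txt.gz')
    write_pipe_gz(rows, path)
    restored = read_pipe_gz(path)

print(restored == rows)  # True
```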

# Step 10- Create a summary report of the file

In [None]:
import gzip
import os

def print_file_summary(file_path):
    with gzip.open(file_path, 'rt') as file:
        header = next(file).strip().split('|')
        num_columns = len(header)
        
        # Count the remaining data lines, then add 1 to include the header
        num_rows = sum(1 for line in file) + 1
    
    file_size = os.path.getsize(file_path)
    
    print(f"File Summary for {file_path}:")
    print(f"Number of rows: {num_rows}")
    print(f"Number of columns: {num_columns}")
    print(f"File size: {file_size} bytes")

print_file_summary('output_data.txt.gz')