<a href="https://colab.research.google.com/github/Krishnan-Raghavan/Packt/blob/main/DataCleaningAnd_PreparationChapter1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Batch Preprocessing

In [1]:
import time
import random

# Step 1: Generate Mock Data
def generate_mock_data(num_records):
    """Build a list of ``num_records`` randomly generated mock records.

    Each record is a dict with an integer ``'id'`` drawn with
    ``random.randint(1, 1000)`` and a float ``'value'`` computed as
    ``random.random() * 100``.
    """
    # The id is drawn before the value for every record, so seeding the
    # ``random`` module reproduces the exact same sequence.
    return [
        {'id': random.randint(1, 1000), 'value': random.random() * 100}
        for _unused in range(num_records)
    ]

# Step 2: Batch Processing
def process_in_batches(data, batch_size):
    """Split ``data`` into consecutive batches of at most ``batch_size`` items.

    Args:
        data: A sized, sliceable sequence (for example a list) of records.
        batch_size: Maximum number of items per batch; must be at least 1.

    Returns:
        A generator producing slices of ``data`` in order. The final slice
        may be shorter than ``batch_size``; an empty ``data`` produces no
        batches at all.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1. The check runs at
            call time, so a bad size is reported before any batch is consumed.
    """
    # A negative step would make range() empty and silently drop every
    # record; a zero step fails with an unrelated range() message. Reject both.
    if batch_size < 1:
        raise ValueError(
            f"batch_size must be a positive integer, got {batch_size!r}"
        )
    return (
        data[start:start + batch_size]
        for start in range(0, len(data), batch_size)
    )

# Step 3: Transform Data
def transform_data(batch):
    """Return a new list of records, each extended with ``'transformed_value'``.

    Every output record carries the ``'id'`` and ``'value'`` of the matching
    input record plus ``'transformed_value'``, which is ``'value'`` times 1.1.
    The input records themselves are not modified.
    """
    def _enrich(source):
        # Example transformation: scale the value by 1.1
        return {
            'id': source['id'],
            'value': source['value'],
            'transformed_value': source['value'] * 1.1,
        }

    return [_enrich(item) for item in batch]

# Step 4: Load Data
def load_data(batch):
    """Pretend to persist ``batch`` by printing one line per record."""
    # Stand-in for a real database write: every record is simply echoed.
    messages = (f"Loading record into database: {item}" for item in batch)
    for message in messages:
        print(message)

# Main Function
def main():
    """Run the batch demo end to end.

    Generates 100 mock records, prints them, then walks through them in
    batches of 10: each batch is transformed, printed record by record,
    handed to ``load_data`` and followed by a one-second pause.
    """
    total_records = 100      # how many mock records to create
    records_per_batch = 10   # how many records go into each batch

    # Create the whole data set up front and show it
    dataset = generate_mock_data(total_records)
    print("Original data:",dataset)

    # Transform and load one batch at a time
    for chunk in process_in_batches(dataset, records_per_batch):
        ready = transform_data(chunk)
        print("Batch before loading:")
        for row in ready:
            print(row)
        load_data(ready)
        time.sleep(1)  # pause to mimic the gap between batches

if __name__ == "__main__":
    main()

Original data: [{'id': 912, 'value': 37.514101941646594}, {'id': 434, 'value': 91.01836485437836}, {'id': 892, 'value': 44.9738569055606}, {'id': 62, 'value': 53.02668536038415}, {'id': 295, 'value': 50.1038323916566}, {'id': 599, 'value': 13.509679120266217}, {'id': 557, 'value': 98.8388533743878}, {'id': 177, 'value': 48.36796342015495}, {'id': 775, 'value': 1.4574926189093729}, {'id': 849, 'value': 87.19827309065298}, {'id': 232, 'value': 99.76613535855212}, {'id': 726, 'value': 64.42022229042259}, {'id': 945, 'value': 9.441939638170737}, {'id': 5, 'value': 74.07366974076345}, {'id': 514, 'value': 25.586711735500568}, {'id': 249, 'value': 74.7778540143523}, {'id': 405, 'value': 8.726823839134369}, {'id': 687, 'value': 59.89786238726163}, {'id': 947, 'value': 82.54217960300006}, {'id': 686, 'value': 93.56489728123313}, {'id': 833, 'value': 86.25237082895885}, {'id': 195, 'value': 82.72211109669986}, {'id': 887, 'value': 10.297149280471697}, {'id': 223, 'value': 33.79289093614036}, {'

Ingest Data In Streaming Mode

In [2]:
import time
import random

# Step 1: Generate Mock Data Continuously
def generate_mock_data(max_records=None, delay=0.5):
    """Yield mock records one at a time, pausing between consecutive records.

    Args:
        max_records: Optional cap on how many records are produced. ``None``
            (the default) keeps the stream running forever, which is the
            original behaviour; a value of 0 or less produces nothing.
        delay: Seconds to wait between two consecutive records (default 0.5).

    Yields:
        dict: ``'id'`` from ``random.randint(1, 1000)`` and ``'value'``
        computed as ``random.random() * 100``.
    """
    produced = 0
    while max_records is None or produced < max_records:
        if produced:
            # Simulate data arriving every `delay` seconds. Sleeping before
            # every record except the first gives the consumer the same
            # timing as sleeping after each yield, without a useless pause
            # once a bounded stream has delivered its last record.
            time.sleep(delay)
        yield {
            'id': random.randint(1, 1000),
            'value': random.random() * 100,
        }
        produced += 1

# Step 2: Stream Processing
def process_stream():
    """Consume the mock stream, transforming and loading records one by one.

    Each record produced by ``generate_mock_data`` is passed through
    ``transform_data`` and the result is handed straight to ``load_data``.
    The function returns only when the stream ends.
    """
    incoming = generate_mock_data()
    for item in incoming:
        load_data(transform_data(item))

# Step 3: Transform Data
def transform_data(record):
    """Return a new record extended with a ``'transformed_value'`` field.

    The result copies ``'id'`` and ``'value'`` from ``record`` and adds
    ``'transformed_value'``, which is ``'value'`` multiplied by 1.1. The
    input record is not modified.
    """
    record_id = record['id']
    raw_value = record['value']
    scaled_value = raw_value * 1.1  # Example transformation
    return dict(id=record_id, value=raw_value, transformed_value=scaled_value)

# Step 4: Load Data
def load_data(record):
    """Pretend to persist ``record`` by printing a single log line for it."""
    # Stand-in for a real database write
    message = f"Loading record into database: {record}"
    print(message)

# Main Function
def main():
    """Start the streaming demo by delegating to ``process_stream``."""
    process_stream()

# Run the demo only when this code executes as the top-level program.
if __name__ == "__main__":
    main()

Loading record into database: {'id': 899, 'value': 94.72406524940448, 'transformed_value': 104.19647177434493}
Loading record into database: {'id': 246, 'value': 50.167885481568185, 'transformed_value': 55.18467402972501}
Loading record into database: {'id': 640, 'value': 6.329472801399183, 'transformed_value': 6.9624200815391015}
Loading record into database: {'id': 169, 'value': 29.519661960438125, 'transformed_value': 32.47162815648194}
Loading record into database: {'id': 783, 'value': 33.21555683990053, 'transformed_value': 36.537112523890585}
Loading record into database: {'id': 754, 'value': 34.455181598374075, 'transformed_value': 37.90069975821149}
Loading record into database: {'id': 513, 'value': 27.04451241043525, 'transformed_value': 29.74896365147878}
Loading record into database: {'id': 309, 'value': 66.52997051196408, 'transformed_value': 73.18296756316049}
Loading record into database: {'id': 901, 'value': 41.59906696215522, 'transformed_value': 45.75897365837075}
Load

KeyboardInterrupt: 

Semi-Real-Time Data Ingestion

In [3]:
import time
import random
from collections import deque

# Step 1: Generate Mock Data Continuously
def generate_mock_data():
    """Endlessly yield mock records, pausing 0.1 seconds after each one.

    Each record is a dict with an integer ``'id'`` from
    ``random.randint(1, 100)`` and a float ``'value'`` computed as
    ``random.random() * 10``.
    """
    while True:
        # id is drawn first, then value, for every record
        yield dict(id=random.randint(1, 100), value=random.random() * 10)
        time.sleep(0.1)  # mimic a new record arriving every 0.1 seconds

# Step 2: Process Semi-Real-Time
def process_semi_real_time(batch_size, interval, max_batches=None):
    """Micro-batch the mock stream, flushing on size or elapsed time.

    Records from ``generate_mock_data`` are buffered. The buffer is flushed
    (transformed, printed and passed to ``load_data``) as soon as it holds
    ``batch_size`` records or ``interval`` seconds have passed since the
    previous flush, whichever happens first. The time check runs only when a
    record arrives.

    Args:
        batch_size: Number of buffered records that triggers a flush.
        interval: Maximum time in seconds between two flushes.
        max_batches: Optional number of batches to process before returning.
            ``None`` (the default) keeps processing for as long as the source
            produces records, which is the original behaviour.
    """
    if max_batches is not None and max_batches <= 0:
        return

    def _flush(records):
        # Transform, display and load one batch.
        transformed_batch = transform_data(records)
        print(f"Batch of {len(transformed_batch)} records before loading:")
        for rec in transformed_batch:
            print(rec)
        load_data(transformed_batch)

    buffer = []
    batches_done = 0
    # monotonic() never jumps when the system clock is adjusted, so the
    # elapsed-time arithmetic below cannot go negative or skip ahead.
    window_start = time.monotonic()

    for record in generate_mock_data():
        buffer.append(record)

        # Check if interval has elapsed or buffer size reached
        window_expired = (time.monotonic() - window_start) >= interval
        if not (window_expired or len(buffer) >= batch_size):
            continue

        _flush(buffer)
        buffer = []  # start a fresh buffer instead of mutating the flushed one
        window_start = time.monotonic()  # Reset start time

        batches_done += 1
        if max_batches is not None and batches_done >= max_batches:
            return

    # Only reached when the source is finite: do not drop a partial batch.
    if buffer:
        _flush(buffer)

# Step 3: Transform Data
def transform_data(batch):
    """Return a new list of records, each with a ``'transformed_value'`` added.

    For every input record the output holds a fresh dict with the same
    ``'id'`` and ``'value'`` plus ``'transformed_value'`` equal to
    ``'value'`` times 1.1. Input records are not modified.
    """
    return [
        {
            'id': entry['id'],
            'value': entry['value'],
            'transformed_value': entry['value'] * 1.1,  # example transformation
        }
        for entry in batch
    ]

# Step 4: Load Data
def load_data(batch):
    """Pretend to persist every record of ``batch`` by printing a line for it."""
    def _announce(entry):
        # Stand-in for a real database write
        print(f"Loading record into database: {entry}")

    for entry in batch:
        _announce(entry)

# Main Function
def main():
    """Run the semi-real-time demo with its fixed settings.

    A batch is processed once 5 records are buffered or 3.0 seconds have
    passed, as decided by ``process_semi_real_time``.
    """
    process_semi_real_time(
        batch_size=5,   # number of records to process per batch
        interval=3.0,   # maximum time (in seconds) before a batch is processed
    )

if __name__ == "__main__":
    main()

Batch of 5 records before loading:
{'id': 23, 'value': 1.6636018712748069, 'transformed_value': 1.8299620584022878}
{'id': 13, 'value': 4.611398678485809, 'transformed_value': 5.072538546334391}
{'id': 36, 'value': 2.0950356384098345, 'transformed_value': 2.304539202250818}
{'id': 28, 'value': 4.17391204853299, 'transformed_value': 4.591303253386289}
{'id': 19, 'value': 0.08059572684749328, 'transformed_value': 0.08865529953224262}
Loading record into database: {'id': 23, 'value': 1.6636018712748069, 'transformed_value': 1.8299620584022878}
Loading record into database: {'id': 13, 'value': 4.611398678485809, 'transformed_value': 5.072538546334391}
Loading record into database: {'id': 36, 'value': 2.0950356384098345, 'transformed_value': 2.304539202250818}
Loading record into database: {'id': 28, 'value': 4.17391204853299, 'transformed_value': 4.591303253386289}
Loading record into database: {'id': 19, 'value': 0.08059572684749328, 'transformed_value': 0.08865529953224262}
Batch of 5 re

KeyboardInterrupt: 

Queues

In [4]:
from queue import Queue

def read_message_queue():
    """Fill an in-memory queue with ten mock messages, then drain it.

    Messages ``"message 0"`` to ``"message 9"`` are enqueued, and each one
    is then taken off the queue and passed to ``process_message``.
    """
    pending = Queue()

    # Enqueue the mock messages
    for index in range(10):
        pending.put(f"message {index}")

    # Drain the queue, handling one message per iteration
    while True:
        if pending.empty():
            break
        process_message(pending.get())
        pending.task_done()  # mark the fetched message as handled

def process_message(message):
    """Handle one message by printing it."""
    line = f"Processing message: {message}"
    print(line)

# Example usage: enqueue the ten mock messages and process each of them
read_message_queue()

Processing message: message 0
Processing message: message 1
Processing message: message 2
Processing message: message 3
Processing message: message 4
Processing message: message 5
Processing message: message 6
Processing message: message 7
Processing message: message 8
Processing message: message 9


Ingesting Data from SQL Database

In [5]:
def read_sql():
    """Pass every row of a small in-memory stand-in table to ``process_row``."""
    # A sequence of dicts plays the role of the SQL table: one dict per row.
    mock_rows = (
        {"id": 1, "name": "Alice", "age": 30},
        {"id": 2, "name": "Bob", "age": 24},
    )
    for current in mock_rows:
        process_row(current)

def process_row(row):
    """Print the ``'id'``, ``'name'`` and ``'age'`` fields of one row."""
    row_id, name, age = row['id'], row['name'], row['age']
    print(f"Processing row: id={row_id}, name={name}, age={age}")

# Example usage: process every row of the mock table
read_sql()

Processing row: id=1, name=Alice, age=30
Processing row: id=2, name=Bob, age=24


Data Ingestion Using a NoSQL Database

In [6]:
def read_nosql():
    """Pass every key/value pair of a small in-memory store to ``process_entry``."""
    # A dict of dicts stands in for the store: string key -> document.
    documents = {
        "1": {"name": "Alice", "age": 30},
        "2": {"name": "Bob", "age": 24},
    }
    for doc_key in documents:
        process_entry(doc_key, documents[doc_key])

def process_entry(key, value):
    """Handle one key/value pair by printing it."""
    line = f"Processing key: {key} with value: {value}"
    print(line)

# Example usage: process every entry of the mock key/value store
read_nosql()

Processing key: 1 with value: {'name': 'Alice', 'age': 30}
Processing key: 2 with value: {'name': 'Bob', 'age': 24}


Data Ingestion Via API

In [7]:
import requests
import pandas as pd

# Define the API endpoint URL
url = "https://www.thecocktaildb.com/api/json/v1/1/search.php?s=margarita"

# Make the API request. requests applies no timeout by default, so set one
# explicitly to keep the cell from hanging if the server never answers.
response = requests.get(url, timeout=10)

# Check if the request was successful (status code 200)
if response.status_code == 200:
    # Extract the response JSON data
    data = response.json()

    # Test the value, not just the presence of the key: 'drinks' can be
    # present but null/empty (this API appears to answer that way when a
    # search has no match -- confirm against the API docs), in which case
    # an empty DataFrame would be printed instead of the message below.
    drinks = data.get('drinks')
    if drinks:
        # Create DataFrame from drinks data
        df = pd.DataFrame(drinks)

        # Print the resulting DataFrame
        print(df.head())
    else:
        print("No drinks found.")
else:
    print(f"Failed to retrieve data from API. Status code: {response.status_code}")

  idDrink              strDrink strDrinkAlternate                  strTags  \
0   11007             Margarita              None  IBA,ContemporaryClassic   
1   11118        Blue Margarita              None                     None   
2   17216     Tommy's Margarita              None               IBA,NewEra   
3   16158    Whitecap Margarita              None                     None   
4   12322  Strawberry Margarita              None                     None   

  strVideo      strCategory                 strIBA strAlcoholic  \
0     None   Ordinary Drink  Contemporary Classics    Alcoholic   
1     None   Ordinary Drink                   None    Alcoholic   
2     None   Ordinary Drink         New Era Drinks    Alcoholic   
3     None  Other / Unknown                   None    Alcoholic   
4     None   Ordinary Drink                   None    Alcoholic   

                   strGlass  \
0            Cocktail glass   
1            Cocktail glass   
2       Old-Fashioned glass   
3  M