# Assignment-5

## Data Ingestion Pipeline

1. Design a data ingestion pipeline that collects and stores data from various sources such as databases, APIs, and streaming platforms.
2. Implement a real-time data ingestion pipeline for processing sensor data from IoT devices.
3. Develop a data ingestion pipeline that handles data from different file formats (CSV, JSON, etc.) and performs data validation and cleansing.


In [None]:
# 1
# Example code for data ingestion pipeline


# Import necessary libraries
import requests
import json
import csv
import pandas as pd
from datetime import datetime

# Database ingestion
def ingest_from_database():
    # Code for connecting to the database and extracting data
    # ...

    # Placeholder: an empty DataFrame stands in for the extracted data
    extracted_data = pd.DataFrame()
    return extracted_data

# API ingestion
def ingest_from_api():
    # Code for making API requests and fetching data
    # ...

    # Placeholder: an empty DataFrame stands in for the fetched data
    fetched_data = pd.DataFrame()
    return fetched_data

# Streaming platform ingestion
def ingest_from_streaming_platform():
    # Code for subscribing to the streaming platform and receiving data
    # ...

    # Placeholder: an empty DataFrame stands in for the received data
    received_data = pd.DataFrame()
    return received_data

# Data ingestion pipeline
def data_ingestion_pipeline():
    # Ingest data from database
    database_data = ingest_from_database()

    # Ingest data from API
    api_data = ingest_from_api()

    # Ingest data from streaming platform
    streaming_data = ingest_from_streaming_platform()

    # Store the collected data in a suitable storage system
    store_data(database_data, api_data, streaming_data)

# Store data
def store_data(*datasets):
    # Code for storing data in a storage system
    # ...

    # Example: Store data in a CSV file
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"data_{timestamp}.csv"

    # Write the header only once; this assumes all datasets share the same columns
    for i, dataset in enumerate(datasets):
        dataset.to_csv(filename, index=False, mode='a' if i > 0 else 'w', header=(i == 0))

# Run the data ingestion pipeline
data_ingestion_pipeline()
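
The database stub above leaves the connection logic open. A minimal sketch of that step, assuming a SQLite source; the `readings` table, its columns, and the `fetch_rows` helper are hypothetical and only serve as an illustration:

```python
import sqlite3

def fetch_rows(conn, table):
    """Return every row of `table` as a list of dicts (hypothetical helper)."""
    # Table names cannot be bound as SQL parameters, so accept plain identifiers only.
    if not table.isidentifier():
        raise ValueError(f"Invalid table name: {table!r}")
    conn.row_factory = sqlite3.Row
    cursor = conn.execute(f"SELECT * FROM {table}")
    return [dict(row) for row in cursor.fetchall()]

# Build a throwaway in-memory database so the sketch runs on its own.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE readings (id INTEGER PRIMARY KEY, value REAL)")
conn.executemany("INSERT INTO readings (value) VALUES (?)", [(1.5,), (2.5,)])
conn.commit()

rows = fetch_rows(conn, "readings")
print(rows)
```

The list of dicts can be passed to `pd.DataFrame(rows)` if the rest of the pipeline expects a DataFrame.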


In [None]:
# 2
# Example code for real-time data ingestion pipeline


# Import necessary libraries
import paho.mqtt.client as mqtt

# Define MQTT callback functions
def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))
    # Subscribe to the desired topic(s)
    client.subscribe("sensors/#")

def on_message(client, userdata, msg):
    # Process the received sensor data
    process_sensor_data(msg.payload)

# Process sensor data
def process_sensor_data(data):
    # Code for processing the sensor data
    # ...

    # Example: Print the received data
    print(data)

# Configure and connect to the MQTT broker
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message

client.connect("mqtt.broker.com", 1883, 60)

# Loop to continuously listen for incoming messages
client.loop_forever()
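
`process_sensor_data` above only prints the raw payload. One possible processing step, assuming each message is a JSON object with `sensor_id` and `value` fields (the field names, the value range, and the `parse_sensor_payload` helper are assumptions, not part of the assignment):

```python
import json

def parse_sensor_payload(payload, min_value=-50.0, max_value=150.0):
    """Decode an MQTT payload (bytes) into a reading dict, or return None if invalid."""
    try:
        reading = json.loads(payload.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None
    if not isinstance(reading, dict) or "sensor_id" not in reading:
        return None
    value = reading.get("value")
    # bool is a subclass of int, so exclude it explicitly.
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return None
    # Reject readings outside the plausible range (this also rejects NaN).
    if not (min_value <= value <= max_value):
        return None
    return {"sensor_id": str(reading["sensor_id"]), "value": float(value)}

print(parse_sensor_payload(b'{"sensor_id": "t1", "value": 21.5}'))
print(parse_sensor_payload(b"not json"))
```

Inside `on_message`, the result of `parse_sensor_payload(msg.payload)` could be stored or forwarded when it is not `None`.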


In [None]:
# 3
# Example code for data ingestion pipeline handling different file formats


# Import necessary libraries
import pandas as pd

# CSV ingestion
def ingest_from_csv(file_path):
    # Read the CSV file into a DataFrame
    data = pd.read_csv(file_path)

    # Perform data validation and cleansing
    # ...

    # Return the processed data
    return data

# JSON ingestion
def ingest_from_json(file_path):
    # Read the JSON file into a DataFrame
    data = pd.read_json(file_path)

    # Perform data validation and cleansing
    # ...

    # Return the processed data
    return data

# Data ingestion pipeline for handling different file formats
def data_ingestion_pipeline(file_path, file_format):
    if file_format == 'csv':
        data = ingest_from_csv(file_path)
    elif file_format == 'json':
        data = ingest_from_json(file_path)
    else:
        raise ValueError("Unsupported file format.")

    # Store the processed data or perform further operations
    # ...

# Run the data ingestion pipeline
data_ingestion_pipeline('data.csv', 'csv')
data_ingestion_pipeline('data.json', 'json')
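
Both ingestion functions above leave validation and cleansing as a placeholder. A minimal sketch of that step on plain records, assuming each record has an `id` and a numeric `amount` field (both field names and the `clean_records` helper are hypothetical):

```python
def clean_records(records):
    """Drop incomplete, non-numeric, or duplicate records and normalize the rest."""
    cleaned, seen_ids = [], set()
    for record in records:
        # Validation: both fields must be present and non-empty.
        if any(record.get(field) in (None, "") for field in ("id", "amount")):
            continue
        # Cleansing: strip whitespace and coerce the amount to float.
        try:
            amount = float(str(record["amount"]).strip())
        except ValueError:
            continue
        record_id = str(record["id"]).strip()
        # Deduplication: keep only the first record for each id.
        if record_id in seen_ids:
            continue
        seen_ids.add(record_id)
        cleaned.append({"id": record_id, "amount": amount})
    return cleaned

raw = [
    {"id": "1", "amount": " 10.5 "},
    {"id": "1", "amount": "10.5"},   # duplicate id
    {"id": "2", "amount": "abc"},    # not numeric
    {"id": "", "amount": "3"},       # missing id
    {"id": "3", "amount": 7},
]
print(clean_records(raw))
```

With pandas, the same records can be obtained from a DataFrame via `data.to_dict("records")`.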


## Model Training

4. Build a machine learning model to predict customer churn based on a given dataset. Train the model using appropriate algorithms and evaluate its performance.
5. Develop a model training pipeline that incorporates feature engineering techniques such as one-hot encoding, feature scaling, and dimensionality reduction.
6. Train a deep learning model for image classification using transfer learning and fine-tuning techniques.


In [None]:
# 4
# Example code for building a customer churn prediction model

# Import necessary libraries
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

# Load the dataset
dataset = pd.read_csv('churn_dataset.csv')

# Split the dataset into features (X) and target variable (y)
X = dataset.drop('churn', axis=1)
y = dataset['churn']

# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Initialize the logistic regression model
model = LogisticRegression()

# Train the model
model.fit(X_train, y_train)

# Make predictions on the test set
y_pred = model.predict(X_test)

# Evaluate the model's performance
accuracy = accuracy_score(y_test, y_pred)
print("Accuracy:", accuracy)


In [None]:
# 5
# Example code for a model training pipeline with feature engineering

# Import necessary libraries
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.decomposition import PCA
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

# Load the dataset
dataset = pd.read_csv('dataset.csv')

# Split the dataset into features (X) and target variable (y)
X = dataset.drop('target', axis=1)
y = dataset['target']

# Split first so the transformers are fitted on the training data only
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# One-hot encode categorical columns and scale numerical columns
categorical_cols = X.select_dtypes(include=['object', 'category']).columns
numerical_cols = X.select_dtypes(include='number').columns
preprocessor = ColumnTransformer(
    [
        ('onehot', OneHotEncoder(handle_unknown='ignore'), categorical_cols),
        ('scale', StandardScaler(), numerical_cols),
    ],
    sparse_threshold=0.0,  # always return a dense array for PCA
)

# Chain preprocessing, dimensionality reduction, and the classifier
# (n_components must not exceed the number of preprocessed features)
model = Pipeline([
    ('preprocess', preprocessor),
    ('pca', PCA(n_components=10)),
    ('classifier', LogisticRegression()),
])

# Train the model
model.fit(X_train, y_train)

# Make predictions on the test set
y_pred = model.predict(X_test)

# Evaluate the model's performance
accuracy = accuracy_score(y_test, y_pred)
print("Accuracy:", accuracy)


In [None]:
# 6
# Example code for training a deep learning model with transfer learning

# Import necessary libraries
import tensorflow as tf
from tensorflow.keras.applications import VGG16
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D
from tensorflow.keras.models import Model

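# Hypothetical settings; replace them with values for your dataset
num_classes = 10
batch_size = 32
num_epochs = 10
train_dir = 'data/train'
validation_dir = 'data/validation'

# Load the pre-trained VGG16 model
base_model = VGG16(weights='imagenet', include_top=False, input_shape=(224, 224, 3))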

# Freeze the layers in the base model
for layer in base_model.layers:
    layer.trainable = False

# Add custom dense layers for the classification task
x = base_model.output
x = GlobalAveragePooling2D()(x)
x = Dense(128, activation='relu')(x)
predictions = Dense(num_classes, activation='softmax')(x)

# Create the model
model = Model(inputs=base_model.input, outputs=predictions)

# Compile the model
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

# Data augmentation and preprocessing
train_datagen = ImageDataGenerator(rescale=1./255, shear_range=0.2, zoom_range=0.2, horizontal_flip=True)
test_datagen = ImageDataGenerator(rescale=1./255)

train_generator = train_datagen.flow_from_directory(train_dir, target_size=(224, 224), batch_size=batch_size, class_mode='categorical')
validation_generator = test_datagen.flow_from_directory(validation_dir, target_size=(224, 224), batch_size=batch_size, class_mode='categorical')

# Train the model
model.fit(train_generator, epochs=num_epochs, validation_data=validation_generator)

# Save the trained model
model.save('trained_model.h5')


## Model Validation

7. Implement cross-validation to evaluate the performance of a regression model for predicting housing prices.
8. Perform model validation using different evaluation metrics such as accuracy, precision, recall, and F1 score for a binary classification problem.
9. Design a model validation strategy that incorporates stratified sampling to handle imbalanced datasets.


In [None]:
#7
# Example code for cross-validation of a regression model

import numpy as np
from sklearn.model_selection import cross_val_score
from sklearn.linear_model import LinearRegression
from sklearn.datasets import fetch_california_housing

# Load the California housing dataset
# (load_boston was removed in scikit-learn 1.2)
X, y = fetch_california_housing(return_X_y=True)

# Initialize the regression model
model = LinearRegression()

# Perform cross-validation
scores = cross_val_score(model, X, y, cv=5, scoring='neg_mean_squared_error')

# Calculate the mean squared error
mse_scores = -scores
mean_mse = np.mean(mse_scores)

print("Mean Squared Error:", mean_mse)


In [None]:
#8
# Example code for model validation using evaluation metrics

from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.datasets import load_breast_cancer

# Load the breast cancer dataset
X, y = load_breast_cancer(return_X_y=True)

# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Initialize the logistic regression model
# (a higher max_iter lets the solver converge on the unscaled features)
model = LogisticRegression(max_iter=10000)

# Train the model
model.fit(X_train, y_train)

# Make predictions on the test set
y_pred = model.predict(X_test)

# Calculate evaluation metrics
accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred)
recall = recall_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)

print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)
print("F1 Score:", f1)
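
To show what these four metrics measure, the sketch below computes them directly from the confusion-matrix counts. The `binary_metrics` helper and the toy labels are illustrative assumptions, not scikit-learn code:

```python
def binary_metrics(y_true, y_pred):
    """Compute accuracy, precision, recall, and F1 for 0/1 labels."""
    pairs = list(zip(y_true, y_pred))
    tp = sum(1 for t, p in pairs if t == 1 and p == 1)  # true positives
    fp = sum(1 for t, p in pairs if t == 0 and p == 1)  # false positives
    fn = sum(1 for t, p in pairs if t == 1 and p == 0)  # false negatives
    tn = sum(1 for t, p in pairs if t == 0 and p == 0)  # true negatives

    accuracy = (tp + tn) / len(pairs)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return {"accuracy": accuracy, "precision": precision, "recall": recall, "f1": f1}

y_true_toy = [1, 1, 1, 0, 0, 0, 1, 0]
y_pred_toy = [1, 0, 1, 0, 1, 0, 1, 0]
toy_metrics = binary_metrics(y_true_toy, y_pred_toy)
print(toy_metrics)
```

Here there are 3 true positives, 1 false positive, 1 false negative, and 3 true negatives, so all four metrics come out at 0.75.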


In [None]:
#9
# Example code for model validation with stratified sampling for imbalanced datasets

import numpy as np
from sklearn.model_selection import StratifiedKFold
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score
from sklearn.datasets import load_iris

# Load the Iris dataset
# (Iris is balanced; it is used here only to illustrate the API)
X, y = load_iris(return_X_y=True)

# Initialize the support vector classifier
model = SVC()

# Perform stratified k-fold cross-validation
skf = StratifiedKFold(n_splits=5, random_state=42, shuffle=True)

accuracies = []
for train_index, test_index in skf.split(X, y):
    X_train, X_test = X[train_index], X[test_index]
    y_train, y_test = y[train_index], y[test_index]

    # Train the model
    model.fit(X_train, y_train)

    # Make predictions on the test set
    y_pred = model.predict(X_test)

    # Calculate accuracy
    accuracy = accuracy_score(y_test, y_pred)
    accuracies.append(accuracy)

# Calculate the mean accuracy
mean_accuracy = np.mean(accuracies)

print("Mean Accuracy:", mean_accuracy)
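
To illustrate why stratification matters for imbalanced data, the sketch below assigns samples to folds class by class, so each fold keeps the overall class ratio. It is a simplified stand-in for the idea behind `StratifiedKFold` (no shuffling), and `stratified_folds` is a hypothetical helper:

```python
from collections import Counter, defaultdict

def stratified_folds(labels, n_splits):
    """Assign each sample index to a fold so class proportions stay similar."""
    by_class = defaultdict(list)
    for index, label in enumerate(labels):
        by_class[label].append(index)

    folds = [[] for _ in range(n_splits)]
    for indices in by_class.values():
        # Deal the indices of each class round-robin across the folds.
        for position, index in enumerate(indices):
            folds[position % n_splits].append(index)
    return folds

imbalanced_labels = [0] * 16 + [1] * 4  # 80/20 class imbalance
folds = stratified_folds(imbalanced_labels, n_splits=4)
for fold in folds:
    print(Counter(imbalanced_labels[i] for i in fold))
```

Every fold ends up with four samples of class 0 and one of class 1, matching the 80/20 ratio of the full label list.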


## Deployment Strategy

10. Create a deployment strategy for a machine learning model that provides real-time recommendations based on user interactions.
11. Develop a deployment pipeline that automates the process of deploying machine learning models to cloud platforms such as AWS or Azure.
12. Design a monitoring and maintenance strategy for deployed models to ensure their performance and reliability over time.


#### Solution

10. A deployment strategy for a machine learning model that provides real-time recommendations based on user interactions depends on the specific requirements and the infrastructure in place. It typically involves designing APIs, microservices, or an event-driven architecture that updates and serves the model as interactions arrive.
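
Without more context, only a minimal sketch of the serving logic is possible. The example below assumes an in-memory co-occurrence recommender that is updated on every interaction event; the class name, method names, and event format are hypothetical, and a real deployment would put this behind an API or event consumer with persistent storage:

```python
from collections import Counter, defaultdict

class CoOccurrenceRecommender:
    """Recommend items that other users interacted with together (in-memory sketch)."""

    def __init__(self):
        self.user_items = defaultdict(set)      # user -> items seen so far
        self.co_counts = defaultdict(Counter)   # item -> co-occurring item counts

    def record_interaction(self, user_id, item_id):
        """Update co-occurrence counts as soon as an interaction event arrives."""
        seen = self.user_items[user_id]
        if item_id in seen:
            return
        for other in seen:
            self.co_counts[item_id][other] += 1
            self.co_counts[other][item_id] += 1
        seen.add(item_id)

    def recommend(self, user_id, k=3):
        """Return up to k unseen items ranked by co-occurrence with the user's items."""
        seen = self.user_items.get(user_id, set())
        scores = Counter()
        for item in seen:
            for other, count in self.co_counts[item].items():
                if other not in seen:
                    scores[other] += count
        return [item for item, _ in scores.most_common(k)]

recommender = CoOccurrenceRecommender()
events = [("u1", "a"), ("u1", "b"), ("u2", "a"), ("u2", "b"), ("u2", "c"), ("u3", "a")]
for user_id, item_id in events:
    recommender.record_interaction(user_id, item_id)

print(recommender.recommend("u3"))
```

User `u3` has only seen item `a`, which co-occurs twice with `b` and once with `c`, so `b` is ranked first.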

In [None]:
#11
# Example code for deploying a machine learning model using AWS SageMaker

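import tarfile

import joblib
import sagemaker
from sagemaker import get_execution_role
from sagemaker.serializers import CSVSerializer
from sagemaker.sklearn.model import SKLearnModel
from sklearn.datasets import load_iris
from sklearn.ensemble import RandomForestClassifier

# Load the Iris dataset
X, y = load_iris(return_X_y=True)

# Initialize the random forest classifier
model = RandomForestClassifier()

# Train the model
model.fit(X, y)

# Save the model and package it as the model.tar.gz archive SageMaker expects
joblib.dump(model, "model.joblib")
with tarfile.open("model.tar.gz", "w:gz") as archive:
    archive.add("model.joblib")

# Set up SageMaker session and role
sagemaker_session = sagemaker.Session()
role = get_execution_role()

# Upload the archive to the session's default S3 bucket
model_data = sagemaker_session.upload_data("model.tar.gz", key_prefix="iris-model")

# Create a SageMaker model
# Assumptions: inference.py is a hypothetical script that defines model_fn to load
# model.joblib; framework_version must match an available scikit-learn container
sagemaker_model = SKLearnModel(
    model_data=model_data,
    role=role,
    entry_point="inference.py",
    framework_version="1.2-1",
    sagemaker_session=sagemaker_session,
)

# Deploy the model to an endpoint
endpoint_name = "your-endpoint-name"
predictor = sagemaker_model.deploy(initial_instance_count=1, instance_type="ml.m5.large", endpoint_name=endpoint_name)

# Make predictions using the deployed model
input_data = [[5.1, 3.5, 1.4, 0.2], [6.2, 3.4, 5.4, 2.3]]
predictor.serializer = CSVSerializer()
predictions = predictor.predict(input_data)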

print("Predictions:", predictions)

# Delete the endpoint
predictor.delete_endpoint()


12. A monitoring and maintenance strategy for deployed models should ensure their performance and reliability over time. It involves setting up monitoring tools, logging, and alerting mechanisms, and retraining the model periodically or when its performance degrades. The details depend on the deployment environment.
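
Without more context, only a minimal sketch of one monitoring component is possible. The example below assumes that ground-truth labels eventually become available for each prediction and tracks accuracy over a sliding window; the `AccuracyMonitor` class, the window size, and the threshold are illustrative assumptions:

```python
from collections import deque

class AccuracyMonitor:
    """Track accuracy over the most recent predictions and flag degradation."""

    def __init__(self, window_size=100, threshold=0.8):
        self.outcomes = deque(maxlen=window_size)  # True where prediction was correct
        self.threshold = threshold

    def record(self, prediction, actual):
        """Store whether the latest prediction matched the observed label."""
        self.outcomes.append(prediction == actual)

    def accuracy(self):
        """Return accuracy over the current window, or None if it is empty."""
        if not self.outcomes:
            return None
        return sum(self.outcomes) / len(self.outcomes)

    def needs_attention(self):
        """True once the window is full and accuracy is below the threshold."""
        window_full = len(self.outcomes) == self.outcomes.maxlen
        return window_full and self.accuracy() < self.threshold

monitor = AccuracyMonitor(window_size=4, threshold=0.75)
for prediction, actual in [(1, 1), (0, 0), (1, 0), (1, 1)]:
    monitor.record(prediction, actual)
print(monitor.accuracy(), monitor.needs_attention())

monitor.record(0, 1)  # the oldest outcome drops out of the window
print(monitor.accuracy(), monitor.needs_attention())
```

When `needs_attention()` returns `True`, the surrounding system could raise an alert or trigger retraining.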

