# Assignment DS-06

# 1. Data Ingestion Pipeline:

## a. Design a data ingestion pipeline that collects and stores data from various sources such as databases, APIs, and streaming platforms.
```python
import json

import pymongo

def collect_data(database, collection, uri="mongodb://localhost:27017/"):
  """Collects all documents from the specified MongoDB database and collection."""
  client = pymongo.MongoClient(uri)
  try:
    db = client[database]
    # Materialize the cursor before the client is closed.
    return list(db[collection].find())
  finally:
    client.close()

def store_data(documents, output_file):
  """Stores the collected documents in the output file, one JSON object per line."""
  with open(output_file, "w") as f:
    for document in documents:
      # default=str serializes BSON types such as ObjectId and datetime.
      f.write(json.dumps(document, default=str) + "\n")

def main():
  """The main function that runs the data ingestion pipeline."""
  database = "my_database"
  collection = "my_collection"
  output_file = "data.jsonl"
  documents = collect_data(database, collection)
  store_data(documents, output_file)

if __name__ == "__main__":
  main()
```
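The code above reads only from MongoDB, while the task also names APIs and streaming platforms. As a minimal sketch, assuming every source can be expressed as a Python callable that returns an iterable of dict records, a REST endpoint that returns a JSON array (read with `urllib.request`) and a bounded batch pulled from a stream consumer (not shown) can share one sink. The names `api_source`, `run_pipeline`, the `source` tag, and the example URL are hypothetical.

```python
import json
import urllib.request
from typing import Callable, Dict, Iterable, Iterator

Record = Dict[str, object]

def api_source(url: str, timeout: float = 10.0) -> Iterator[Record]:
  """Yields records from a REST endpoint that returns a JSON array of objects."""
  with urllib.request.urlopen(url, timeout=timeout) as response:
    for record in json.load(response):
      yield record

def run_pipeline(sources: Dict[str, Callable[[], Iterable[Record]]], output_file: str) -> int:
  """Pulls records from every named source and appends them to one JSON Lines file.

  Each record is tagged with the name of its source; returns the number written.
  """
  count = 0
  with open(output_file, "a") as f:
    for name, source in sources.items():
      for record in source():
        f.write(json.dumps({**record, "source": name}, default=str) + "\n")
        count += 1
  return count

# Example wiring (hypothetical URL; collect_data is the MongoDB reader above):
# sources = {
#   "mongo": lambda: collect_data("my_database", "my_collection"),
#   "orders_api": lambda: api_source("https://example.com/api/orders"),
# }
# run_pipeline(sources, "data.jsonl")
```

Keeping sources as zero-argument callables means nothing is fetched until the pipeline runs, and adding a new source is a one-line change to the dictionary.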


##  b. Implement a real-time data ingestion pipeline for processing sensor data from IoT devices.

```python
import os
import sys
from src.logger import logging
from src.exception import CustomException

import pandas as pd
from sklearn.model_selection import train_test_split

from dataclasses import dataclass

## initialize the Data Ingestion configuration

@dataclass
class DataIngestionconfig:
    train_data_path:str=os.path.join('artifacts','train.csv')
    test_data_path:str=os.path.join('artifacts','test.csv')
    raw_data_path:str=os.path.join('artifacts','raw.csv')

## create the data ingestion class

class DataIngestion:
    def __init__(self):
        self.ingestion_config=DataIngestionconfig()

    def initiate_data_ingestion(self):
        logging.info('Data Ingestion method starts')

        try:
            df=pd.read_csv(os.path.join('notebooks/data','gemstone.csv'))
            logging.info('Dataset read as pandas Dataframe')

            os.makedirs(os.path.dirname(self.ingestion_config.raw_data_path),exist_ok=True)
            df.to_csv(self.ingestion_config.raw_data_path,index=False)

            logging.info('Raw data is created')

            train_set,test_set=train_test_split(df,test_size=0.30,random_state=42)

            train_set.to_csv(self.ingestion_config.train_data_path,index=False,header=True)
            test_set.to_csv(self.ingestion_config.test_data_path,index=False,header=True)

            logging.info('Ingestion of Data is completed')

            return(
                self.ingestion_config.train_data_path,
                self.ingestion_config.test_data_path

            )

        except Exception as e:
            logging.info('Exception occurred at Data Ingestion Stage')
            raise CustomException(e,sys)

```
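The `DataIngestion` class above reads a static CSV file in batch mode, whereas the heading asks for real-time sensor ingestion. As a minimal sketch of the real-time case, assuming readings arrive as JSON messages with the hypothetical fields `device_id`, `ts`, and `temperature`, a `queue.Queue` filled by a producer stands in for a broker consumer (an MQTT or Kafka client is not shown). Each message is validated on arrival and a per-device sliding-window mean is updated immediately; `SensorIngestor` and `parse_reading` are illustrative names.

```python
import json
import queue
import threading
from collections import defaultdict, deque
from typing import Deque, Dict, List, Optional, Tuple

# Hypothetical message schema: {"device_id": str, "ts": number, "temperature": number}
REQUIRED_FIELDS = ("device_id", "ts", "temperature")

def parse_reading(raw: str) -> Optional[dict]:
  """Parses one JSON message; returns None if it is malformed or incomplete."""
  try:
    reading = json.loads(raw)
    if not isinstance(reading, dict) or not all(f in reading for f in REQUIRED_FIELDS):
      return None
    reading["temperature"] = float(reading["temperature"])
    return reading
  except (json.JSONDecodeError, TypeError, ValueError):
    return None

class SensorIngestor:
  """Keeps a sliding window of recent readings per device and reports rolling means."""

  def __init__(self, window_size: int = 5):
    self.windows: Dict[str, Deque[float]] = defaultdict(lambda: deque(maxlen=window_size))
    self.rejected = 0

  def process(self, raw: str) -> Optional[Tuple[str, float]]:
    """Ingests one message; returns (device_id, rolling mean) or None if rejected."""
    reading = parse_reading(raw)
    if reading is None:
      self.rejected += 1
      return None
    window = self.windows[reading["device_id"]]
    window.append(reading["temperature"])
    return reading["device_id"], sum(window) / len(window)

  def consume(self, messages: "queue.Queue", results: List[Tuple[str, float]]) -> None:
    """Reads messages until a None sentinel; stands in for a broker consumer loop."""
    while True:
      raw = messages.get()
      if raw is None:
        break
      result = self.process(raw)
      if result is not None:
        results.append(result)

if __name__ == "__main__":
  messages: "queue.Queue" = queue.Queue()
  results: List[Tuple[str, float]] = []
  ingestor = SensorIngestor(window_size=2)
  worker = threading.Thread(target=ingestor.consume, args=(messages, results))
  worker.start()
  for raw in ['{"device_id": "s1", "ts": 1, "temperature": 20}',
              '{"device_id": "s1", "ts": 2, "temperature": 22}',
              'not json',
              '{"device_id": "s1", "ts": 3, "temperature": 30}']:
    messages.put(raw)
  messages.put(None)  # sentinel: stop the consumer
  worker.join()
  print(results, ingestor.rejected)  # [('s1', 20.0), ('s1', 21.0), ('s1', 26.0)] 1
```

Running the consumer on its own thread mirrors how a broker client would deliver messages independently of the producer; swapping the queue for a real client would leave `process` unchanged.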

## c. Develop a data ingestion pipeline that handles data from different file formats (CSV, JSON, etc.) and performs data validation and cleansing.

```python
import csv
import json
import re

def load_data(file_path):
  """Loads data from the specified file as a list of records (dicts)."""
  if file_path.endswith(".csv"):
    return load_csv_data(file_path)
  elif file_path.endswith(".json"):
    return load_json_data(file_path)
  else:
    raise ValueError("Unsupported file format: " + file_path)

def load_csv_data(file_path):
  """Loads data from a CSV file, using the header row as field names."""
  with open(file_path, "r", newline="") as f:
    reader = csv.DictReader(f, delimiter=",")
    data = list(reader)
  return data

def load_json_data(file_path):
  """Loads data from a JSON file that contains an array of objects."""
  with open(file_path, "r") as f:
    data = json.load(f)
  if not isinstance(data, list):
    raise ValueError("Expected a JSON array of records: " + file_path)
  return data

def cleanse_data(data):
  """Cleanses the data by trimming whitespace and removing thousands separators."""
  for row in data:
    for key, value in row.items():
      row[key] = str(value).strip().replace(",", "")
  return data

def validate_data(data):
  """Validates the data and raises an exception if a field is not a non-negative integer."""
  for row in data:
    for key, value in row.items():
      if not re.fullmatch(r"\d+", value):
        raise ValueError(f"Invalid value for field {key!r}: {value!r}")

def main():
  """The main function that runs the data ingestion pipeline."""
  file_path = "data.csv"
  data = load_data(file_path)
  # Cleanse before validating so that values such as "1,000" pass the check.
  data = cleanse_data(data)
  validate_data(data)
  return data

if __name__ == "__main__":
  main()
```


# 2. Model Training:

##  a. Build a machine learning model to predict customer churn based on a given dataset. Train the model using appropriate algorithms and evaluate its performance.

```python
import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

def build_model(data, target="Churn"):
  """Builds a machine learning model to predict customer churn."""
  # Separate the target from the features so the label cannot leak into X.
  # Categorical columns are one-hot encoded; identifier columns (e.g. a
  # customer ID) should be dropped beforehand.
  X = pd.get_dummies(data.drop(columns=[target]), drop_first=True)
  y = data[target]

  # Split the data into a training set and a test set, preserving the churn ratio.
  X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
  )

  # Create a logistic regression model (extra iterations help it converge).
  model = LogisticRegression(max_iter=1000)

  # Train the model on the training set.
  model.fit(X_train, y_train)

  # Evaluate the model on the test set.
  y_pred = model.predict(X_test)
  accuracy = accuracy_score(y_test, y_pred)

  return model, accuracy

def main():
  """The main function that builds and evaluates the model."""
  data = pd.read_csv("churn_data.csv")
  model, accuracy = build_model(data)
  print("Accuracy:", accuracy)

if __name__ == "__main__":
  main()
```

##  b. Develop a model training pipeline that incorporates feature engineering techniques such as one-hot encoding, feature scaling, and dimensionality reduction.

```python
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler

CATEGORICAL = ["Gender", "Location"]
NUMERICAL = ["Age", "Tenure"]

def build_model(X_train, X_test, y_train, y_test):
  """Builds and evaluates a churn model with one-hot encoding, scaling, and PCA."""
  # One-hot encode the categorical features and scale the numerical features.
  # sparse_output=False (scikit-learn >= 1.2) yields the dense matrix PCA expects.
  preprocess = ColumnTransformer([
    ("cat", OneHotEncoder(handle_unknown="ignore", sparse_output=False), CATEGORICAL),
    ("num", StandardScaler(), NUMERICAL),
  ])

  model = Pipeline([
    ("preprocess", preprocess),
    # Reduce the dimensionality of the combined features; n_components must not
    # exceed the number of encoded feature columns.
    ("pca", PCA(n_components=5)),
    ("classifier", LogisticRegression(max_iter=1000)),
  ])

  # Fitting the whole pipeline on the training set only keeps test statistics
  # out of the encoder, scaler, and PCA.
  model.fit(X_train, y_train)

  # Evaluate the model on the test set.
  y_pred = model.predict(X_test)
  accuracy = accuracy_score(y_test, y_pred)

  return model, accuracy

def main():
  """The main function that builds and evaluates the model."""
  data = pd.read_csv("churn_data.csv")
  X = data[CATEGORICAL + NUMERICAL]
  y = data["Churn"]
  X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
  )
  model, accuracy = build_model(X_train, X_test, y_train, y_test)
  print("Accuracy:", accuracy)

if __name__ == "__main__":
  main()
```

##  c. Train a deep learning model for image classification using transfer learning and fine-tuning techniques.

```python
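import tensorflow as tf
from tensorflow.keras.applications import VGG16
from tensorflow.keras.applications.vgg16 import preprocess_input
from tensorflow.keras.layers import Dense, Flatten
from tensorflow.keras.models import Model

def train_model(data, num_classes=10, input_shape=(224, 224, 3)):
  """Trains an image classifier with transfer learning, then fine-tunes it.

  `data` is assumed to hold image arrays of shape (n, 224, 224, 3) and
  one-hot encoded labels under the keys used below.
  """
  # Load the VGG16 convolutional base pre-trained on ImageNet.
  # A fixed input_shape is required so that Flatten knows its output size.
  base_model = VGG16(weights="imagenet", include_top=False, input_shape=input_shape)

  # Phase 1 (transfer learning): freeze the base model and train only the new head.
  base_model.trainable = False

  # Add a new classification head.
  x = Flatten()(base_model.output)
  outputs = Dense(num_classes, activation="softmax")(x)
  model = Model(base_model.input, outputs)

  # VGG16 expects its own input preprocessing.
  train_images = preprocess_input(data["train_images"])
  test_images = preprocess_input(data["test_images"])

  model.compile(loss="categorical_crossentropy", optimizer="adam", metrics=["accuracy"])
  model.fit(train_images, data["train_labels"], epochs=10, validation_split=0.1)

  # Phase 2 (fine-tuning): unfreeze only the last convolutional block ("block5")
  # and continue training with a much smaller learning rate.
  base_model.trainable = True
  for layer in base_model.layers:
    layer.trainable = layer.name.startswith("block5")
  model.compile(loss="categorical_crossentropy",
                optimizer=tf.keras.optimizers.Adam(learning_rate=1e-5),
                metrics=["accuracy"])
  model.fit(train_images, data["train_labels"], epochs=5, validation_split=0.1)

  # Evaluate the model.
  score = model.evaluate(test_images, data["test_labels"], verbose=0)
  print("Test accuracy:", score[1])
  return model

def main():
  """The main function that trains and evaluates the model."""
  # load_data() must be supplied by the reader and return a dict with
  # train_images, train_labels, test_images, and test_labels.
  data = load_data()
  train_model(data)

if __name__ == "__main__":
  main()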
```

# 3. Model Validation:

##  a. Implement cross-validation to evaluate the performance of a regression model for predicting housing prices.

```python
import numpy as np
import pandas as pd
from sklearn.model_selection import KFold
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error

def cross_validate(data, target="Price"):
  """Cross-validates a regression model for predicting housing prices."""
  # Separate the features from the target; the features are assumed to be numeric.
  X = data.drop(columns=[target])
  y = data[target]

  # Create a KFold object.
  kf = KFold(n_splits=10, shuffle=True, random_state=42)

  # Create a linear regression model.
  model = LinearRegression()

  # Evaluate the model using cross-validation.
  scores = []
  for train_index, test_index in kf.split(X):
    X_train, X_test = X.iloc[train_index], X.iloc[test_index]
    y_train, y_test = y.iloc[train_index], y.iloc[test_index]
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    scores.append(mean_squared_error(y_test, y_pred))

  return scores

def main():
  """The main function that runs the cross-validation."""
  data = pd.read_csv("housing_data.csv")
  scores = cross_validate(data)
  print("Mean squared error: %.2f (std %.2f)" % (np.mean(scores), np.std(scores)))

if __name__ == "__main__":
  main()
```
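scikit-learn can run the same k-fold loop in one call with `cross_val_score`. A minimal sketch, using synthetic data from `make_regression` as a stand-in for `housing_data.csv`: the scorer name is `"neg_mean_squared_error"` because scikit-learn scorers follow a "greater is better" convention, so the sign is flipped back afterwards.

```python
import numpy as np
from sklearn.datasets import make_regression
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import KFold, cross_val_score

# Synthetic stand-in for housing_data.csv (no real prices involved).
X, y = make_regression(n_samples=200, n_features=5, noise=10.0, random_state=0)

kf = KFold(n_splits=10, shuffle=True, random_state=42)
# Scorers maximize, so mean squared error is returned negated.
neg_mse = cross_val_score(LinearRegression(), X, y, cv=kf, scoring="neg_mean_squared_error")
mse_per_fold = -neg_mse

print("Mean squared error: %.2f (std %.2f)" % (mse_per_fold.mean(), mse_per_fold.std()))
```

Reporting the standard deviation alongside the mean shows how much the estimate depends on the particular split.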

## b. Perform model validation using different evaluation metrics such as accuracy, precision, recall, and F1 score for a binary classification problem.

```python
import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

def evaluate_model(model, X_test, y_test):
  """Evaluates a binary classifier using different evaluation metrics."""
  # Predict once and reuse the predictions for every metric.
  y_pred = model.predict(X_test)

  # Precision, recall, and F1 refer to the positive class (pos_label=1 by default).
  accuracy = accuracy_score(y_test, y_pred)
  precision = precision_score(y_test, y_pred)
  recall = recall_score(y_test, y_pred)
  f1 = f1_score(y_test, y_pred)

  # Print the evaluation metrics.
  print("Accuracy:", accuracy)
  print("Precision:", precision)
  print("Recall:", recall)
  print("F1 score:", f1)

def main():
  """The main function that runs the model validation."""
  data = pd.read_csv("binary_classification_data.csv")
  # Keep the label out of the feature matrix to avoid target leakage.
  X = data.drop(columns=["Label"])
  y = data["Label"]
  X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
  )
  model = LogisticRegression(max_iter=1000)
  model.fit(X_train, y_train)
  evaluate_model(model, X_test, y_test)

if __name__ == "__main__":
  main()
```
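To make the four metrics concrete, the following pure-Python sketch derives them from the confusion-matrix counts (true/false positives and negatives). It assumes 0/1 labels with 1 as the positive class, which matches scikit-learn's default `pos_label=1`; `binary_metrics` is an illustrative name, not a library function.

```python
from typing import Dict, Sequence

def binary_metrics(y_true: Sequence[int], y_pred: Sequence[int]) -> Dict[str, float]:
  """Computes accuracy, precision, recall, and F1 from 0/1 labels (1 = positive)."""
  tp = sum(1 for t, p in zip(y_true, y_pred) if t == 1 and p == 1)
  fp = sum(1 for t, p in zip(y_true, y_pred) if t == 0 and p == 1)
  fn = sum(1 for t, p in zip(y_true, y_pred) if t == 1 and p == 0)
  tn = sum(1 for t, p in zip(y_true, y_pred) if t == 0 and p == 0)

  accuracy = (tp + tn) / len(y_true)
  # Return 0.0 when a denominator is zero, as scikit-learn does by default.
  precision = tp / (tp + fp) if tp + fp else 0.0
  recall = tp / (tp + fn) if tp + fn else 0.0
  f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
  return {"accuracy": accuracy, "precision": precision, "recall": recall, "f1": f1}

# TP=3, FP=2, FN=1, TN=2 -> accuracy 0.625, precision 0.6, recall 0.75, F1 about 0.667
print(binary_metrics([1, 0, 1, 1, 0, 0, 1, 0], [1, 1, 0, 1, 0, 1, 1, 0]))
```

F1 is the harmonic mean of precision and recall, so it stays low unless both are reasonably high.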

##   c. Design a model validation strategy that incorporates stratified sampling to handle imbalanced datasets.

```python
import numpy as np
import pandas as pd
from sklearn.model_selection import StratifiedKFold
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

def stratified_validation(data, target="Label"):
  """Performs stratified k-fold validation on an imbalanced dataset."""
  X = data.drop(columns=[target])
  y = data[target]

  # StratifiedKFold keeps the class ratio of y roughly equal in every fold.
  kf = StratifiedKFold(n_splits=10, shuffle=True, random_state=42)

  # Evaluate the model using stratified validation.
  scores = []
  for train_index, test_index in kf.split(X, y):
    X_train, X_test = X.iloc[train_index], X.iloc[test_index]
    y_train, y_test = y.iloc[train_index], y.iloc[test_index]
    model = LogisticRegression(max_iter=1000)
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    acc = accuracy_score(y_test, y_pred)
    # zero_division=0 avoids warnings when a fold has no predicted positives.
    prec = precision_score(y_test, y_pred, zero_division=0)
    rec = recall_score(y_test, y_pred, zero_division=0)
    f1 = f1_score(y_test, y_pred, zero_division=0)
    scores.append((acc, prec, rec, f1))

  # One row per fold, one column per metric.
  return np.array(scores)

def main():
  """The main function that runs the stratified validation."""
  data = pd.read_csv("imbalanced_classification_data.csv")
  scores = stratified_validation(data)
  # Average each metric (column) across the folds.
  mean_acc, mean_prec, mean_rec, mean_f1 = scores.mean(axis=0)
  print("Accuracy:", mean_acc)
  print("Precision:", mean_prec)
  print("Recall:", mean_rec)
  print("F1 score:", mean_f1)

if __name__ == "__main__":
  main()
```
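The same stratified strategy can be expressed with scikit-learn's `cross_validate`, which accepts several scorers at once. A minimal sketch, assuming synthetic data from `make_classification` with roughly a 90/10 class split as a stand-in for `imbalanced_classification_data.csv`. It also checks that every test fold keeps close to the overall minority share, and adds `class_weight="balanced"`, a common complement to stratified splitting that is not part of the original code.

```python
import numpy as np
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import StratifiedKFold, cross_validate

# Synthetic stand-in for imbalanced_classification_data.csv: about 90% / 10% classes.
X, y = make_classification(n_samples=1000, n_features=8, weights=[0.9, 0.1], random_state=0)

skf = StratifiedKFold(n_splits=10, shuffle=True, random_state=42)

# Each test fold should hold close to the overall minority-class share.
fold_minority_share = [y[test].mean() for _, test in skf.split(X, y)]
print("Overall minority share:", y.mean())
print("Per-fold minority share:", np.round(fold_minority_share, 3))

# class_weight="balanced" weights classes inversely to their frequency.
results = cross_validate(
  LogisticRegression(class_weight="balanced", max_iter=1000),
  X, y, cv=skf,
  scoring=["accuracy", "precision", "recall", "f1"],
)
for metric in ["accuracy", "precision", "recall", "f1"]:
  print(metric, results["test_" + metric].mean().round(3))
```

On imbalanced data, the per-fold recall and F1 for the minority class are usually more informative than accuracy, which a majority-class predictor would already push near 0.9.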

# 4. Deployment Strategy:

## a. Create a deployment strategy for a machine learning model that provides real-time recommendations based on user interactions.

When deploying a machine learning model that provides real-time recommendations based on user interactions, you need to consider several factors to ensure a reliable and scalable deployment strategy. Here's an example of a deployment strategy for such a model:

1. **Preprocessing and Feature Engineering**: Prepare the data by performing any necessary preprocessing steps and feature engineering to transform the raw user interaction data into meaningful features that can be used by the machine learning model.

2. **Model Training**: Train the machine learning model using historical user interaction data. Consider using techniques such as collaborative filtering, matrix factorization, or deep learning models like recurrent neural networks (RNNs) or transformers to generate recommendations based on user behavior patterns.

3. **Model Persistence**: Save the trained model to a persistent storage system or file format that can be easily loaded during the deployment process. Common options include pickle files, TensorFlow SavedModel, or ONNX format for interoperability.

4. **Real-time Data Collection**: Set up a mechanism to collect real-time user interaction data. This can be achieved by integrating with tracking systems or by capturing events from user interactions using tools like Apache Kafka, RabbitMQ, or cloud-based event streaming platforms.

5. **Scaling and Performance**: Ensure that your deployment strategy can handle high volumes of incoming user interactions. Consider using distributed processing frameworks like Apache Spark or scalable cloud-based solutions that can handle large-scale data processing and model serving.

6. **Infrastructure and Deployment Options**: Choose the appropriate infrastructure and deployment options based on your requirements and resources. Some options include:
   - **Cloud Services**: Utilize managed services like AWS Lambda, Google Cloud Functions, or Azure Functions for serverless deployments.
   - **Containerization**: Package your model into a container using Docker and deploy it using container orchestration platforms like Kubernetes or cloud-based container services.
   - **Microservices Architecture**: Break down your deployment into microservices, where each service handles a specific aspect of the recommendation system (e.g., data collection, feature generation, model serving).
   - **API-based Deployment**: Build a RESTful API using frameworks like Flask or Django to expose the recommendation functionality for integration with other systems.

7. **Model Serving**: Set up a service that loads the trained model into memory and serves real-time recommendations based on incoming user interactions. This service should be scalable, reliable, and able to handle concurrent requests efficiently. Consider using technologies like TensorFlow Serving, FastAPI, or Flask for model serving.

8. **Monitoring and Logging**: Implement a monitoring system to track the health and performance of the deployed model. This includes monitoring data quality, tracking latency, monitoring prediction accuracy, and logging relevant metrics for troubleshooting and analysis.

9. **A/B Testing and Experimentation**: Consider implementing an A/B testing framework to evaluate the performance of different recommendation strategies. This allows you to test different variations of the model or algorithms and collect feedback to continuously improve the recommendation system.

10. **Versioning and Model Updates**: Establish a versioning system for your models and implement a mechanism to handle model updates and deployment of new versions. This ensures that you can seamlessly roll out improvements or changes to the recommendation system without disrupting the user experience.

11. **Security and Privacy**: Address security and privacy concerns when dealing with user data. Implement appropriate measures such as data encryption, access controls, and anonymization techniques to protect user privacy and secure sensitive information.

12. **Continuous Integration and Deployment (CI/CD)**: Automate the deployment process using CI/CD practices to ensure efficient and consistent deployments. This includes version control, automated testing, and continuous integration pipelines for streamlined updates and releases.
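As a toy illustration of the model-training and model-serving steps, the following sketch keeps an item-to-item co-occurrence table that is updated on every interaction event and answers top-k queries immediately. It is a deliberately simple stand-in for the collaborative-filtering or deep models mentioned above; the class name and the `(user_id, item_id)` event format are hypothetical. In a deployment it would sit behind the serving API and be fed by the event stream.

```python
from collections import Counter, defaultdict
from typing import Dict, List, Set

class CoOccurrenceRecommender:
  """Toy item-to-item recommender updated online from user interaction events."""

  def __init__(self) -> None:
    self.user_items: Dict[str, Set[str]] = defaultdict(set)
    self.co_counts: Dict[str, Counter] = defaultdict(Counter)

  def record_interaction(self, user_id: str, item_id: str) -> None:
    """Updates co-occurrence counts as soon as an interaction event arrives."""
    seen = self.user_items[user_id]
    if item_id in seen:
      return  # repeated interactions add no new co-occurrences
    for other in seen:
      self.co_counts[item_id][other] += 1
      self.co_counts[other][item_id] += 1
    seen.add(item_id)

  def recommend(self, user_id: str, k: int = 3) -> List[str]:
    """Scores unseen items by how often they co-occur with the user's items."""
    seen = self.user_items.get(user_id, set())
    scores: Counter = Counter()
    for item in seen:
      for other, count in self.co_counts[item].items():
        if other not in seen:
          scores[other] += count
    # Sort by score (descending), then by item id so ties are deterministic.
    ranked = sorted(scores.items(), key=lambda kv: (-kv[1], kv[0]))
    return [item for item, _ in ranked[:k]]

if __name__ == "__main__":
  rec = CoOccurrenceRecommender()
  for user, item in [("u1", "a"), ("u1", "b"), ("u2", "a"), ("u2", "b"), ("u2", "c"), ("u3", "a")]:
    rec.record_interaction(user, item)
  print(rec.recommend("u3"))  # ['b', 'c']
```

Because each event only touches the counters of the items that user has already seen, updates stay cheap enough to apply in real time.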



## b. Develop a deployment pipeline that automates the process of deploying machine learning models to cloud platforms such as AWS or Azure.

```python
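import os
import sys

import boto3

def deploy_model(model_path, role_arn, function_name="my-lambda-function"):
  """Deploys a zipped model package to AWS Lambda, creating or updating the function.

  model_path: directory containing model.zip (the model plus an index.py handler).
  role_arn:   ARN of an existing IAM execution role for the function.
  """
  client = boto3.client("lambda")
  # Code expects the zip bytes (or an S3 location), not a file path. Direct
  # uploads are limited to 50 MB zipped; larger packages must go through S3.
  with open(os.path.join(model_path, "model.zip"), "rb") as f:
    zip_bytes = f.read()

  try:
    response = client.create_function(
      FunctionName=function_name,
      Runtime="python3.12",
      Role=role_arn,
      Handler="index.handler",
      Code={"ZipFile": zip_bytes},
      Timeout=300,
      Publish=True,
    )
  except client.exceptions.ResourceConflictException:
    # The function already exists, so publish a new version of its code instead.
    response = client.update_function_code(
      FunctionName=function_name, ZipFile=zip_bytes, Publish=True
    )
  return response["FunctionArn"]

if __name__ == "__main__":
  if len(sys.argv) != 3:
    print("Usage: deploy_model.py <model_path> <role_arn>")
    sys.exit(1)

  model_path, role_arn = sys.argv[1], sys.argv[2]
  print(deploy_model(model_path, role_arn))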
```
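The deployment script expects a `model.zip` but does not show how it is built. A minimal packaging sketch, assuming the model was serialized with `pickle` and that the handler lives in `index.py` so that `Handler="index.handler"` resolves; the handler body, `package_model`, and the `event["features"]` input format are hypothetical. Libraries such as scikit-learn are not part of the base Lambda Python runtime, so they would also need to be bundled (for example as a layer or a container image).

```python
import os
import zipfile

# Hypothetical handler: loads the pickled model once per container, then predicts.
HANDLER_SOURCE = '''\
import json
import os
import pickle

with open(os.path.join(os.path.dirname(__file__), "model.pkl"), "rb") as f:
    MODEL = pickle.load(f)

def handler(event, context):
    prediction = MODEL.predict([event["features"]])
    return {"statusCode": 200, "body": json.dumps(prediction.tolist())}
'''

def package_model(model_file: str, output_dir: str) -> str:
  """Bundles a serialized model and an index.py handler into output_dir/model.zip."""
  os.makedirs(output_dir, exist_ok=True)
  zip_path = os.path.join(output_dir, "model.zip")
  with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
    # Both files sit at the archive root so that "index.handler" can be imported.
    zf.writestr("index.py", HANDLER_SOURCE)
    zf.write(model_file, arcname="model.pkl")
  return zip_path
```

In a CI/CD pipeline this packaging step would run after training and tests, and its output directory would be passed as `model_path` to the deployment script.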


##  c. Design a monitoring and maintenance strategy for deployed models to ensure their performance and reliability over time.

Designing a monitoring and maintenance strategy for deployed machine learning models is crucial to ensure their performance and reliability over time. Here's an example of a strategy that covers key aspects of monitoring and maintenance:

1. **Define Key Performance Metrics**: Determine the relevant performance metrics for your machine learning model based on the problem domain. This may include metrics like accuracy, precision, recall, F1 score, or custom evaluation metrics specific to your use case.

2. **Data Quality Monitoring**: Continuously monitor the quality of the input data fed into the model. This involves checking for missing values, data inconsistencies, drift, or outliers that could affect the model's performance. Implement mechanisms that detect such anomalies and raise alerts.

3. **Model Performance Monitoring**: Monitor the model's performance in real-time or at regular intervals. Track the selected performance metrics and set up thresholds or alerts for abnormal behavior. This helps identify when the model's performance degrades or when recalibration is required.

4. **Error Analysis and Logging**: Log predictions and errors made by the model in production. Analyze these logs to understand the types of errors the model encounters and identify patterns or specific scenarios where the model performs poorly. This information can guide future improvements or adjustments to the model.

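5. **Model Drift Detection**: Continuously monitor for data drift and concept drift over time. Comparing the incoming feature distribution with the training distribution reveals data drift; concept drift (a change in the relationship between features and target) usually becomes visible only when predictions are compared with ground-truth labels as they arrive. If significant drift is detected, retraining or updating the model may be necessary.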

6. **Model Retraining and Updating**: Establish a process for model retraining or updating. Define triggers for initiating retraining, such as degradation in performance beyond a certain threshold or when significant data drift is observed. Set up a pipeline to automate the retraining process, including data collection, preprocessing, retraining, and deployment of the updated model.

7. **Regular Model Evaluation**: Conduct periodic model evaluations using a holdout dataset or by employing techniques such as cross-validation. This allows you to assess the model's performance over time and identify the need for retraining or potential bias issues that may arise.

8. **Security and Privacy Auditing**: Regularly review and audit the security and privacy measures implemented to protect the deployed models and the associated data. Stay up-to-date with the latest security practices and regulations to ensure compliance and maintain data integrity and user privacy.

9. **Version Control and Tracking**: Implement a version control system to track model versions, code changes, and associated artifacts. This facilitates traceability and makes it easier to roll back to previous versions if necessary.

10. **Documentation and Knowledge Sharing**: Maintain thorough documentation about the deployed models, including information on their architecture, data sources, preprocessing steps, and relevant training details. Encourage knowledge sharing within the team to ensure a shared understanding of the models and their maintenance requirements.

11. **Feedback Loop and User Engagement**: Establish channels for receiving user feedback and monitor user satisfaction with the deployed models. Actively engage with users to understand their experiences, collect feedback on recommendations, and identify areas for improvement.

12. **Continual Improvement**: Foster a culture of continual improvement by regularly reviewing the monitoring process, incorporating lessons learned, and making enhancements to the deployed models and their supporting infrastructure.
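For the drift-detection step, one widely used statistic is the Population Stability Index (PSI), which compares binned distributions of a feature at training time and in production. The sketch below is one common formulation, assuming bin edges taken from training quantiles; the 0.1 and 0.25 thresholds are a rule of thumb rather than a fixed standard, and the example data is synthetic.

```python
import numpy as np

def population_stability_index(expected, actual, bins=10, eps=1e-6):
  """PSI between a training (expected) and production (actual) sample of one feature.

  PSI = sum((a_i - e_i) * ln(a_i / e_i)) over bins, where e_i and a_i are the
  fractions of each sample in bin i. Bin edges come from training quantiles.
  """
  expected = np.asarray(expected, dtype=float)
  actual = np.asarray(actual, dtype=float)
  edges = np.quantile(expected, np.linspace(0, 1, bins + 1))
  # Clip production values into the training range so outliers land in the edge bins.
  actual = np.clip(actual, edges[0], edges[-1])
  e_frac = np.histogram(expected, bins=edges)[0] / len(expected)
  a_frac = np.histogram(actual, bins=edges)[0] / len(actual)
  # eps avoids log(0) and division by zero for empty bins.
  e_frac = np.clip(e_frac, eps, None)
  a_frac = np.clip(a_frac, eps, None)
  return float(np.sum((a_frac - e_frac) * np.log(a_frac / e_frac)))

if __name__ == "__main__":
  rng = np.random.default_rng(0)
  training_ages = rng.normal(40, 10, 10_000)   # synthetic training feature
  production_ages = rng.normal(45, 10, 2_000)  # synthetic, shifted production feature
  psi = population_stability_index(training_ages, production_ages)
  # Rule of thumb: below 0.1 stable, 0.1 to 0.25 moderate shift, above 0.25 significant.
  status = "retrain" if psi > 0.25 else "investigate" if psi > 0.1 else "stable"
  print(f"PSI = {psi:.3f} -> {status}")
```

Computing PSI per feature on a schedule, and alerting when it crosses a threshold, gives a concrete trigger for the retraining process described in step 6.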



# Thank You!