# Ds_ASSIGNMENT_06

`1. Data Ingestion Pipeline:`

#### a. Design a data ingestion pipeline that collects and stores data from various sources such as databases, APIs, and streaming platforms.

A Python data ingestion pipeline that collects and stores data from databases, APIs, and streaming platforms can be designed in the following general steps:

1. Identify Data Sources:
- Determine the data sources you want to collect from, such as databases, APIs, or streaming platforms.
- Understand the data formats, access methods, and any authentication or authorization requirements for each source.

2. Choose Data Collection Tools and Libraries:
- Select appropriate Python libraries and tools that can interact with the different data sources.
- For databases, you can use libraries like SQLAlchemy, psycopg2, or pymongo, depending on the database type.
- For APIs, libraries like requests or specialized API wrappers can be used.
- For streaming platforms, client libraries like kafka-python (Apache Kafka), pulsar-client (Apache Pulsar), or the Apache Beam SDK can be utilized.

3. Establish Connection and Authentication:
- Set up connections to the data sources using appropriate connection parameters or credentials.
- Configure authentication methods, such as API keys or OAuth tokens, as required by the data sources.

4. Fetch and Collect Data:
- Write functions or classes to fetch data from each source, using the corresponding libraries.
- For databases, you can write SQL queries or use ORM (Object-Relational Mapping) techniques.
- For APIs, make HTTP requests and process the responses to extract the required data.
- For streaming platforms, set up consumer or subscriber clients to consume the data stream.

5. Handle Data Transformations and Preprocessing:
- Perform any necessary data transformations or preprocessing steps to clean, format, or enrich the collected data.
- Use appropriate libraries for data manipulation, cleaning, and transformation, such as pandas or NumPy.

6. Define Storage Mechanisms:
- Determine the storage mechanisms based on your requirements, such as databases, data lakes, or file systems.
- Choose suitable storage technologies like PostgreSQL, MySQL, MongoDB, Apache Hadoop (HDFS), or Amazon S3, and a file format such as Apache Parquet for data stored as files.

7. Write Data to Storage:
- Develop code to write the collected and processed data to the chosen storage mechanisms.
- Utilize appropriate libraries or database connectors to insert or write the data.
- Ensure data integrity, consistency, and error handling during the writing process.

8. Implement Scheduling and Automation:
- Set up scheduling mechanisms, such as cron jobs or task schedulers, to automate the data ingestion pipeline.
- Determine the frequency of data collection and define the intervals or triggers accordingly.

9. Implement Error Handling and Logging:
- Include error handling mechanisms to handle exceptions or failures during data collection or storage.
- Use logging frameworks, such as Python's built-in logging module or third-party libraries like loguru or structlog, to log pipeline activities, errors, and information.

10. Monitor and Maintain:
- Monitor the data ingestion pipeline for performance, data quality, and any potential issues.
- Implement monitoring and alerting mechanisms to identify and address any pipeline failures or anomalies.
- Regularly review and maintain the pipeline to adapt to changes in data sources or requirements.

Remember, the specific implementation details and libraries used may vary depending on the exact data sources, storage mechanisms, and requirements of your data ingestion pipeline.

Here's an example Python code snippet that demonstrates the data ingestion pipeline for collecting and storing data from various sources:

```python
import json

import pandas as pd
import psycopg2
import pymongo
import requests
from kafka import KafkaConsumer
from sqlalchemy import create_engine


# Placeholder processing steps; replace them with your own logic
def process_api_data(data):
    return data


def process_kafka_data(data):
    return data


def transform_data(df):
    return df


# Connect to the database
db_conn = psycopg2.connect(database="your_database", user="your_username", password="your_password", host="localhost", port="5432")
db_cursor = db_conn.cursor()

# Connect to MongoDB
mongo_client = pymongo.MongoClient("mongodb://localhost:27017")
mongo_db = mongo_client["your_database"]
mongo_collection = mongo_db["your_collection"]

# Create an engine for SQLAlchemy
db_engine = create_engine('postgresql://your_username:your_password@localhost:5432/your_database')

# Connect to Kafka; consumer_timeout_ms stops iteration after 10 s without
# messages, so the steps after the loop below are reached
consumer = KafkaConsumer('your_topic', bootstrap_servers=['localhost:9092'], consumer_timeout_ms=10000)

# Fetch data from API, failing fast on timeouts and HTTP error statuses
response = requests.get('your_api_url', timeout=30)
response.raise_for_status()
api_data = response.json()

# Process and store the API data
processed_api_data = process_api_data(api_data)
db_cursor.execute("INSERT INTO your_table (column1, column2) VALUES (%s, %s)", (processed_api_data['value1'], processed_api_data['value2']))
db_conn.commit()

# Fetch and process data from Kafka
for message in consumer:
    kafka_data = json.loads(message.value)
    processed_kafka_data = process_kafka_data(kafka_data)
    mongo_collection.insert_one(processed_kafka_data)

# Fetch data from a database table
query = "SELECT * FROM your_table"
df = pd.read_sql_query(query, db_engine)

# Perform data transformations and preprocessing
transformed_data = transform_data(df)

# Store the transformed data in a file
transformed_data.to_csv('transformed_data.csv', index=False)

# Close database connections and Kafka consumer
db_cursor.close()
db_conn.close()
db_engine.dispose()
mongo_client.close()
consumer.close()
```

Please note that this is just a basic example to give you an idea of how the data ingestion pipeline can be implemented in Python. You would need to customize and expand this code according to your specific requirements, including proper error handling, authentication, and other necessary components.
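The error handling and logging mentioned in step 9 could be sketched as a small retry wrapper around each pipeline step. This is a minimal illustration using only the standard library; `run_with_retries`, `flaky_fetch`, and the retry count are hypothetical names and values chosen for the example, not part of the code above.

```python
import logging
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("ingestion")


def run_with_retries(step, retries=3, delay_seconds=0.0):
    """Call step() until it succeeds or the retry budget is exhausted."""
    for attempt in range(1, retries + 1):
        try:
            return step()
        except Exception:
            # Log the traceback, then either retry or re-raise on the last attempt.
            logger.exception("Step failed (attempt %d of %d)", attempt, retries)
            if attempt == retries:
                raise
            time.sleep(delay_seconds)


# Hypothetical flaky source: fails twice, then returns a record.
calls = {"count": 0}


def flaky_fetch():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("source temporarily unavailable")
    return {"value1": 1, "value2": 2}


record = run_with_retries(flaky_fetch)
```

In a real pipeline the wrapped step would be one of the fetch or write operations, and a non-zero `delay_seconds` (possibly growing per attempt) would usually be preferable to retrying immediately.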

#### b. Implement a real-time data ingestion pipeline for processing sensor data from IoT devices.

To implement a real-time data ingestion pipeline for processing sensor data from IoT devices in Python, you can utilize a combination of technologies such as MQTT (Message Queuing Telemetry Transport) protocol, MQTT broker, and a Python MQTT client library. Here's an example code snippet that demonstrates the basic implementation of such a pipeline:

```python
import paho.mqtt.client as mqtt
import json
import time

# MQTT broker settings
broker_address = "mqtt_broker_address"
broker_port = 1883
topic = "your_topic"

# Placeholder handler; replace it with your own processing logic
def process_sensor_data(data):
    print(data)

# Define callback functions for MQTT events
def on_connect(client, userdata, flags, rc):
    print("Connected to MQTT broker with result code: " + str(rc))
    client.subscribe(topic)

def on_message(client, userdata, msg):
    payload = msg.payload.decode("utf-8")
    data = json.loads(payload)

    # Process and analyze the received sensor data
    process_sensor_data(data)

# Create an MQTT client instance. paho-mqtt 2.x expects a callback API version;
# VERSION1 keeps the callback signatures used above.
if hasattr(mqtt, "CallbackAPIVersion"):
    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1)
else:
    client = mqtt.Client()

# Set MQTT event callbacks
client.on_connect = on_connect
client.on_message = on_message

# Connect to the MQTT broker
client.connect(broker_address, broker_port, 60)

# Start the MQTT client loop to handle incoming messages
client.loop_start()

# Continuously process sensor data until interrupted
try:
    while True:
        time.sleep(1)

except KeyboardInterrupt:
    print("Interrupted, stopping data ingestion.")

# Disconnect from the MQTT broker and stop the background network loop
client.disconnect()
client.loop_stop()
```

In the above code snippet, we're using the Paho MQTT client library (`paho.mqtt.client`) to connect to an MQTT broker, subscribe to a specific topic, and receive real-time sensor data messages from IoT devices. The `on_connect` and `on_message` callback functions handle the corresponding MQTT events. The `on_message` function processes and analyzes the received sensor data using the `process_sensor_data` function (which you can define as per your requirements).

To utilize this code, you need to replace `"mqtt_broker_address"` with the actual address of your MQTT broker, update the `"your_topic"` placeholder with the desired topic to subscribe to, and implement the `process_sensor_data` function to handle the received data.

Remember to install the `paho-mqtt` library before running the code. You can install it via pip using the command: `pip install paho-mqtt`.

Additionally, you may need to handle authentication, encryption, and other security aspects based on the MQTT broker configuration and requirements of your IoT infrastructure.
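Because IoT devices can send malformed messages, the processing step may also need to validate each payload before using it. The sketch below shows one way this might look, assuming a hypothetical message schema with `device_id`, `timestamp`, and `value` fields; the function names and the schema are illustrative assumptions, not something defined by MQTT or the Paho library.

```python
import json


def parse_sensor_payload(payload: bytes) -> dict:
    """Decode one MQTT payload and check the fields this sketch assumes."""
    data = json.loads(payload.decode("utf-8"))
    if not isinstance(data, dict):
        raise ValueError("payload must be a JSON object")
    # Hypothetical schema: these field names are assumptions for the example.
    for field in ("device_id", "timestamp", "value"):
        if field not in data:
            raise ValueError(f"missing field: {field}")
    value = data["value"]
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        raise ValueError("value must be numeric")
    return data


def process_sensor_data_safely(payload: bytes, sink: list) -> bool:
    """Append a valid reading to sink; return False for a malformed one."""
    try:
        sink.append(parse_sensor_payload(payload))
        return True
    except (UnicodeDecodeError, ValueError):
        # json.JSONDecodeError is a subclass of ValueError.
        return False


readings = []
ok = process_sensor_data_safely(
    b'{"device_id": "d1", "timestamp": 1700000000, "value": 21.5}', readings
)
bad = process_sensor_data_safely(b"not json", readings)
```

Calling a function like this from `on_message` with `msg.payload` keeps one bad message from raising inside the callback, while valid readings accumulate in `readings` (a stand-in for a real database or queue).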

#### c. Develop a data ingestion pipeline that handles data from different file formats (CSV, JSON, etc.) and performs data validation and cleansing.

To develop a data ingestion pipeline in Python that handles data from different file formats (such as CSV, JSON) and performs data validation and cleansing, you can utilize libraries like `pandas` and `json`. Here's an example code snippet that demonstrates the basic implementation of such a pipeline:

```python
import pandas as pd
import json

# Function to read and process CSV files
def process_csv_file(file_path):
    try:
        df = pd.read_csv(file_path)
        
        # Perform data validation and cleansing operations on the DataFrame
        # ...
        
        # Save the cleaned data to a new file or perform further processing
        df.to_csv('cleaned_data.csv', index=False)
        
    except Exception as e:
        print(f"Error processing CSV file: {str(e)}")

# Function to read and process JSON files
def process_json_file(file_path):
    try:
        with open(file_path) as json_file:
            data = json.load(json_file)
        
        # Perform data validation and cleansing operations on the JSON data
        # ...
        
        # Save the cleaned data to a new file or perform further processing
        with open('cleaned_data.json', 'w') as json_output:
            json.dump(data, json_output)
        
    except Exception as e:
        print(f"Error processing JSON file: {str(e)}")

# File paths for example CSV and JSON files
csv_file_path = 'example.csv'
json_file_path = 'example.json'

# Process CSV file
process_csv_file(csv_file_path)

# Process JSON file
process_json_file(json_file_path)
```

In the above code snippet, we define two functions, `process_csv_file` and `process_json_file`, to handle CSV and JSON files, respectively. These functions use `pandas` and `json` libraries to read the files and perform data validation and cleansing operations.

Inside the functions, you can add your specific data validation and cleansing logic to handle missing values, incorrect data types, outliers, or any other necessary checks. Once the data is validated and cleaned, you can save it to a new file or perform further processing as per your requirements.

To use this code, replace the `csv_file_path` and `json_file_path` variables with the actual file paths of your CSV and JSON files, respectively. Also, make sure `pandas` is installed before running the code; the `json` module is part of the Python standard library and needs no installation.

Remember to adapt the code based on the specific data validation and cleansing operations you need to perform on your files. This example provides a starting point, and you can modify and expand it according to your specific needs.
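As one possible illustration of the validation and cleansing step, the sketch below loads records from either format based on the file extension and drops rows with missing or non-numeric values. It uses only the standard library; the `id` and `amount` field names, and the rule of discarding invalid rows rather than repairing them, are assumptions made for the example. JSON input is assumed to be an object or a list of objects.

```python
import csv
import json
from pathlib import Path


def load_records(file_path):
    """Return a list of dict records from a .csv or .json file."""
    path = Path(file_path)
    suffix = path.suffix.lower()
    if suffix == ".csv":
        with path.open(newline="", encoding="utf-8") as handle:
            return list(csv.DictReader(handle))
    if suffix == ".json":
        with path.open(encoding="utf-8") as handle:
            data = json.load(handle)
        # Accept either a single object or a list of objects.
        return data if isinstance(data, list) else [data]
    raise ValueError(f"unsupported file format: {suffix}")


def clean_records(records, required=("id", "amount")):
    """Drop records with missing required fields or a non-numeric amount."""
    cleaned = []
    for record in records:
        if any(record.get(field) in (None, "") for field in required):
            continue  # missing value
        try:
            amount = float(record["amount"])
        except (TypeError, ValueError):
            continue  # incorrect data type
        cleaned.append({**record, "amount": amount})
    return cleaned
```

For example, `clean_records(load_records("example.csv"))` would return only the rows whose `id` is present and whose `amount` parses as a number.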

`2. Model Training:`

a. Build a machine learning model to predict customer churn based on a given dataset. Train the model using appropriate algorithms and evaluate its performance.<br>
b. Develop a model training pipeline that incorporates feature engineering techniques such as one-hot encoding, feature scaling, and dimensionality reduction.

In [78]:
#libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

In [36]:
df = pd.read_csv("WA_Fn-UseC_-Telco-Customer-Churn.csv").drop("customerID",axis=1)

In [37]:
df.head()

| | gender | SeniorCitizen | Partner | Dependents | tenure | PhoneService | MultipleLines | InternetService | OnlineSecurity | OnlineBackup | DeviceProtection | TechSupport | StreamingTV | StreamingMovies | Contract | PaperlessBilling | PaymentMethod | MonthlyCharges | TotalCharges | Churn |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Female | 0 | Yes | No | 1 | No | No phone service | DSL | No | Yes | No | No | No | No | Month-to-month | Yes | Electronic check | 29.85 | 29.85 | No |
| 1 | Male | 0 | No | No | 34 | Yes | No | DSL | Yes | No | Yes | No | No | No | One year | No | Mailed check | 56.95 | 1889.5 | No |
| 2 | Male | 0 | No | No | 2 | Yes | No | DSL | Yes | Yes | No | No | No | No | Month-to-month | Yes | Mailed check | 53.85 | 108.15 | Yes |
| 3 | Male | 0 | No | No | 45 | No | No phone service | DSL | Yes | No | Yes | Yes | No | No | One year | No | Bank transfer (automatic) | 42.3 | 1840.75 | No |
| 4 | Female | 0 | No | No | 2 | Yes | No | Fiber optic | No | No | No | No | No | No | Month-to-month | Yes | Electronic check | 70.7 | 151.65 | Yes |


In [38]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 7043 entries, 0 to 7042
Data columns (total 20 columns):
 #   Column            Non-Null Count  Dtype  
---  ------            --------------  -----  
 0   gender            7043 non-null   object 
 1   SeniorCitizen     7043 non-null   int64  
 2   Partner           7043 non-null   object 
 3   Dependents        7043 non-null   object 
 4   tenure            7043 non-null   int64  
 5   PhoneService      7043 non-null   object 
 6   MultipleLines     7043 non-null   object 
 7   InternetService   7043 non-null   object 
 8   OnlineSecurity    7043 non-null   object 
 9   OnlineBackup      7043 non-null   object 
 10  DeviceProtection  7043 non-null   object 
 11  TechSupport       7043 non-null   object 
 12  StreamingTV       7043 non-null   object 
 13  StreamingMovies   7043 non-null   object 
 14  Contract          7043 non-null   object 
 15  PaperlessBilling  7043 non-null   object 
 16  PaymentMethod     7043 non-null   object 


In [39]:
for i in range(len(df)):
    try:
        float(df["TotalCharges"].iloc[i])
    except Exception as e:
        print(e)
        print(i)
        # Assign through df.loc so the DataFrame itself is updated;
        # the chained form df["TotalCharges"].iloc[i] = "0" may write to a copy
        # and triggers pandas' SettingWithCopyWarning.
        df.loc[i, "TotalCharges"] = "0"

could not convert string to float: ''
488
could not convert string to float: ''
753
could not convert string to float: ''
936
could not convert string to float: ''
1082
could not convert string to float: ''
1340
could not convert string to float: ''
3331
could not convert string to float: ''
3826
could not convert string to float: ''
4380
could not convert string to float: ''
5218
could not convert string to float: ''
6670
could not convert string to float: ''
6754

In [40]:
df["TotalCharges"] = df["TotalCharges"].astype(float)

In [41]:
df["TotalCharges"] = df["TotalCharges"].replace(0 , np.mean(df["TotalCharges"]))
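The blank `TotalCharges` entries could also be handled without a row-by-row loop. The sketch below is a variant, not the method used in the cells above: it runs on a small made-up frame and, unlike the cells above, computes the mean only from the values that were actually observed rather than including the zero placeholders.

```python
import pandas as pd

# Toy frame standing in for the Telco data; the values are illustrative only.
toy = pd.DataFrame({"TotalCharges": ["29.85", "", "108.15", ""]})

# Empty strings cannot be parsed, so errors="coerce" turns them into NaN.
toy["TotalCharges"] = pd.to_numeric(toy["TotalCharges"], errors="coerce")

# Fill the gaps with the mean of the observed values (NaN is skipped by mean()).
observed_mean = toy["TotalCharges"].mean()
toy["TotalCharges"] = toy["TotalCharges"].fillna(observed_mean)
```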

In [42]:
df.isnull().sum()

gender              0
SeniorCitizen       0
Partner             0
Dependents          0
tenure              0
PhoneService        0
MultipleLines       0
InternetService     0
OnlineSecurity      0
OnlineBackup        0
DeviceProtection    0
TechSupport         0
StreamingTV         0
StreamingMovies     0
Contract            0
PaperlessBilling    0
PaymentMethod       0
MonthlyCharges      0
TotalCharges        0
Churn               0
dtype: int64

In [43]:
df.Churn = df.Churn.map({"No":0,"Yes":1})

In [44]:
categorical_cols = df.select_dtypes(include="object").columns
numerical_cols = df.select_dtypes(exclude='object').columns

In [45]:
for i in df[categorical_cols]:
    print({i:df[i].unique()})

{'gender': array(['Female', 'Male'], dtype=object)}
{'Partner': array(['Yes', 'No'], dtype=object)}
{'Dependents': array(['No', 'Yes'], dtype=object)}
{'PhoneService': array(['No', 'Yes'], dtype=object)}
{'MultipleLines': array(['No phone service', 'No', 'Yes'], dtype=object)}
{'InternetService': array(['DSL', 'Fiber optic', 'No'], dtype=object)}
{'OnlineSecurity': array(['No', 'Yes', 'No internet service'], dtype=object)}
{'OnlineBackup': array(['Yes', 'No', 'No internet service'], dtype=object)}
{'DeviceProtection': array(['No', 'Yes', 'No internet service'], dtype=object)}
{'TechSupport': array(['No', 'Yes', 'No internet service'], dtype=object)}
{'StreamingTV': array(['No', 'Yes', 'No internet service'], dtype=object)}
{'StreamingMovies': array(['No', 'Yes', 'No internet service'], dtype=object)}
{'Contract': array(['Month-to-month', 'One year', 'Two year'], dtype=object)}
{'PaperlessBilling': array(['Yes', 'No'], dtype=object)}
{'PaymentMethod': array(['Electronic check', 'Mailed 

In [46]:
numerical_cols = numerical_cols[:-1]

In [47]:
numerical_cols

Index(['SeniorCitizen', 'tenure', 'MonthlyCharges', 'TotalCharges'], dtype='object')

In [110]:
# independent features (X) and dependent target (y)
X = df.drop('Churn', axis=1)
y = df.Churn

In [111]:
from sklearn.preprocessing import OneHotEncoder , StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import  Pipeline
from sklearn.impute import SimpleImputer

In [112]:
X_train , X_test , y_train , y_test  = train_test_split(X,y,test_size=0.3 , random_state=0)

In [113]:
categorical_pipeline = Pipeline(
    steps = [
        ('imputer',SimpleImputer(strategy='most_frequent')),
        ('encoder',OneHotEncoder(drop='if_binary'))
    ]
)

numerical_pipeline = Pipeline(
        steps=[
            ('imputer',SimpleImputer(strategy='median')),
            ('scaler', StandardScaler())
        ]
    )

preprocessor = ColumnTransformer([
    ('categorical_pipeline',categorical_pipeline , categorical_cols),
    ('numerical_pipeline',numerical_pipeline , numerical_cols)
])


In [114]:
X_train =  pd.DataFrame(preprocessor.fit_transform(X_train),columns=preprocessor.get_feature_names_out())
X_test = pd.DataFrame(preprocessor.transform(X_test),columns=preprocessor.get_feature_names_out())

In [115]:
from sklearn.decomposition import PCA
from sklearn.metrics import classification_report

In [116]:
pipe = Pipeline(
    [
        ('pca', PCA(n_components=5)),
        ('clf' , RandomForestClassifier())
    ]
)

In [117]:
pipe.fit(X_train , y_train)
y_pred = pipe.predict(X_test)
report = classification_report(y_test , y_pred)
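The preprocessing, dimensionality reduction, and classifier can also be chained into a single `Pipeline`, so that raw feature frames are passed straight to `fit` and `predict`. The sketch below illustrates that arrangement on synthetic data; the toy columns, the made-up target rule, and the hyperparameters (`n_components=3`, `n_estimators=50`, the fixed `random_state`) are assumptions for the example and not the notebook's settings.

```python
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.decomposition import PCA
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler

# Synthetic stand-in for the churn data; columns and values are made up.
rng = np.random.RandomState(0)
n_rows = 200
toy = pd.DataFrame({
    "Contract": rng.choice(["Month-to-month", "One year", "Two year"], n_rows),
    "PaperlessBilling": rng.choice(["Yes", "No"], n_rows),
    "tenure": rng.randint(0, 72, n_rows),
    "MonthlyCharges": rng.uniform(20, 120, n_rows),
})
target = (toy["tenure"] < 24).astype(int)  # arbitrary rule, for illustration

preprocess = ColumnTransformer(
    [
        ("categorical", OneHotEncoder(drop="if_binary"), ["Contract", "PaperlessBilling"]),
        ("numerical", StandardScaler(), ["tenure", "MonthlyCharges"]),
    ],
    sparse_threshold=0,  # always return a dense array for the PCA step
)

model = Pipeline([
    ("preprocess", preprocess),
    ("pca", PCA(n_components=3)),
    ("clf", RandomForestClassifier(n_estimators=50, random_state=0)),
])

X_tr, X_te, y_tr, y_te = train_test_split(
    toy, target, test_size=0.3, random_state=0, stratify=target
)
model.fit(X_tr, y_tr)
toy_pred = model.predict(X_te)
```

Keeping every step inside one pipeline means the encoder, scaler, and PCA are fitted on the training split only, and the same object can later be used for cross-validation or saved as a single artifact.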


In [118]:
print(report)

              precision    recall  f1-score   support

           0       0.82      0.89      0.85      1560
           1       0.59      0.45      0.51       553

    accuracy                           0.78      2113
   macro avg       0.71      0.67      0.68      2113
weighted avg       0.76      0.78      0.76      2113
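To make the averages in the report easier to read, the short check below recomputes the F1 scores from the precision and recall figures printed above. Because those figures are already rounded to two decimals, the recomputed values agree with the report only up to rounding; the variable and function names are chosen for this example.

```python
# Rounded figures copied from the classification report above.
report_rows = {
    0: {"precision": 0.82, "recall": 0.89, "support": 1560},
    1: {"precision": 0.59, "recall": 0.45, "support": 553},
}


def f1_score_from(precision, recall):
    """F1 is the harmonic mean of precision and recall."""
    return 2 * precision * recall / (precision + recall)


f1_by_class = {
    label: f1_score_from(row["precision"], row["recall"])
    for label, row in report_rows.items()
}
total_support = sum(row["support"] for row in report_rows.values())

# Macro average: every class counts equally.
macro_f1 = sum(f1_by_class.values()) / len(f1_by_class)

# Weighted average: each class counts in proportion to its support.
weighted_f1 = sum(
    f1_by_class[label] * row["support"] for label, row in report_rows.items()
) / total_support
```

The gap between the macro and weighted averages reflects the class imbalance: the churn class (1) has 553 test samples against 1560 for class 0, and its lower recall pulls the macro average down more than the weighted one.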

