In [1]:
# Show the kernel's starting working directory (in the recorded run this is the notebook's research/ folder).
%pwd

'c:\\Users\\HP\\Box\\CMU Fall - 2023\\MLOps\\titanic_mlops_project\\mlops_project_speceship_titanic\\research'

In [2]:
import os

# Move from research/ up to the project root so relative paths such as
# config/config.yaml, params.yaml and artifacts/ resolve. The guard keeps the
# cell idempotent: re-running it will not keep climbing the directory tree.
if os.path.basename(os.getcwd()) == "research":
    os.chdir("../")

In [3]:
# Confirm the working directory is now the project root; config and params YAML files are loaded relative to it.
%pwd

'c:\\Users\\HP\\Box\\CMU Fall - 2023\\MLOps\\titanic_mlops_project\\mlops_project_speceship_titanic'

In [4]:
from dataclasses import dataclass
from pathlib import Path

@dataclass(frozen=True)
class DataIngestionConfig:
    """Read-only settings for the data-ingestion stage.

    ``frozen=True`` makes instances immutable: assigning to a field after
    construction raises ``dataclasses.FrozenInstanceError``.

    NOTE(review): fields are annotated as ``Path``, but ConfigurationManager
    passes values straight from the YAML config, so they may be plain strings
    at runtime — confirm before relying on ``Path`` methods.
    """

    # Directory where ingested data and related artifacts are stored.
    root_dir: Path
    # Local file the ingested data is written to / read from.
    local_data_file: Path
    # Zip file holding the cloud (secure connect bundle) configuration.
    cloud_config_zipfile: Path
    # File containing the authentication token.
    authentication_token: Path


In [5]:
from titanicSpaceShip.utils.common import *
from titanicSpaceShip.utils.common import read_yaml, create_directories

In [6]:
import os 
import urllib.request as request
import zipfile
from titanicSpaceShip import logger
from titanicSpaceShip.utils.common import get_size

In [7]:
import time
import csv
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
import json

class DataIngestion:
    """Export the ``"prediction"."data"`` table from Astra DB (Cassandra) to a local CSV.

    Connection details come from ``config``: ``cloud_config_zipfile`` is passed
    as the secure connect bundle, and ``authentication_token`` is a JSON file
    whose ``clientId`` / ``secret`` keys are used as credentials.
    """

    def __init__(self, config: DataIngestionConfig):
        self.config = config

    def download_file(self, max_retries: int = 1, delay: float = 5) -> None:
        """Write the table to ``config.local_data_file`` unless that file already exists.

        Args:
            max_retries: Total number of attempts; values below 1 are treated as 1.
            delay: Seconds to wait before the next attempt; doubled after every failure.

        Errors are logged, not raised (best-effort behavior), so callers should
        check that ``local_data_file`` exists afterwards.
        """
        local_file = self.config.local_data_file
        if os.path.exists(local_file):
            logger.info(f"File already exists of size: {get_size(Path(local_file))}")
            return

        attempts = max(1, max_retries)
        for attempt in range(1, attempts + 1):
            try:
                self._download_to_csv(local_file)
                return  # success: stop retrying and skip the failure log below
            except Exception as e:
                logger.warning(f"Attempt {attempt}/{attempts} to download from astra db failed: {e}")
                if attempt < attempts:  # no point sleeping after the final attempt
                    time.sleep(delay)
                    delay *= 2
        logger.error(f"Failed connecting to astra/cassandra db after {attempts} attempt(s)")

    def _build_cluster(self):
        """Create an Astra DB ``Cluster`` from the secure connect bundle and token file."""
        with open(self.config.authentication_token, encoding="utf-8") as f:
            secrets = json.load(f)
        auth_provider = PlainTextAuthProvider(secrets["clientId"], secrets["secret"])
        # NOTE(review): the recorded run logs a "protocol v5-beta" error while the
        # driver negotiates a protocol version, and the log continues afterwards.
        # Pinning protocol_version may silence it — confirm before changing.
        return Cluster(
            cloud={"secure_connect_bundle": self.config.cloud_config_zipfile},
            auth_provider=auth_provider,
        )

    def _download_to_csv(self, csv_path) -> None:
        """Query the table and write header + rows to ``csv_path``.

        Rows go to ``<csv_path>.part`` first and are renamed into place only when
        complete, so an interrupted download never leaves a truncated file that
        ``download_file`` would later mistake for a finished one.
        """
        cluster = self._build_cluster()
        try:
            session = cluster.connect()
            rows = iter(session.execute('SELECT * FROM "prediction"."data"'))
            first = next(rows, None)
            if first is None:
                logger.warning('Query on "prediction"."data" returned no rows; nothing written')
                return

            tmp_path = f"{csv_path}.part"
            with open(tmp_path, "w", newline="", encoding="utf-8") as csv_file:
                writer = csv.writer(csv_file)
                writer.writerow(first._fields)  # rows are named tuples; _fields gives the header
                writer.writerow(first)
                writer.writerows(rows)
            os.replace(tmp_path, csv_path)
            logger.info(f"{csv_path} downloaded from astra db with columns: \n {first._fields}")
        finally:
            # Close the cluster's sessions and connections whether or not the download succeeded.
            cluster.shutdown()


In [8]:
from titanicSpaceShip.constants import *

In [9]:
class ConfigurationManager:
    """Load the project's YAML files and build per-stage configuration objects."""

    def __init__(self, config_filepath = CONFIG_FILE_PATH, params_filepath = PARAMS_FILE_PATH):
        # Load both YAML files up front via read_yaml.
        # NOTE(review): self.params is not used inside this class; presumably
        # consumed by later pipeline stages — confirm.
        self.config = read_yaml(config_filepath)
        self.params = read_yaml(params_filepath)

        # Make sure the top-level artifacts directory exists before any stage runs.
        create_directories([self.config.artifacts_root])

    def get_data_ingestion_config(self) -> DataIngestionConfig:
        """Return a DataIngestionConfig built from the ``data_ingestion`` config section.

        Side effect: creates ``data_ingestion.root_dir`` via create_directories.
        """
        section = self.config.data_ingestion

        create_directories([section.root_dir])

        return DataIngestionConfig(
            root_dir=section.root_dir,
            local_data_file=section.local_data_file,
            cloud_config_zipfile=section.cloud_config_zipfile,
            authentication_token=section.authentication_token,
        )

In [10]:
# Build the ingestion config from the project YAML files and export the table to the local CSV.
# No try/except: re-raising the same exception adds nothing but an extra traceback frame.
config = ConfigurationManager()
data_ingestion_config = config.get_data_ingestion_config()
data_ingestion = DataIngestion(config=data_ingestion_config)
data_ingestion.download_file()

# download_file logs failures instead of raising, so fail the cell loudly if no data landed.
assert os.path.exists(data_ingestion_config.local_data_file), (
    f"data ingestion did not produce {data_ingestion_config.local_data_file}; see log above"
)

[2024-01-28 04:29:18,143: INFO: common: yaml file: config\config.yaml loaded successfully]
[2024-01-28 04:29:18,149: INFO: common: yaml file: params.yaml loaded successfully]
[2024-01-28 04:29:18,151: INFO: common: created directory at: artifacts]
[2024-01-28 04:29:18,154: INFO: common: created directory at: artifacts/data_ingestion]
[2024-01-28 04:29:27,301: ERROR: asyncorereactor: Closing connection <AsyncoreConnection(2068000317968) af73231d-9932-4c00-8d1c-5c541735dce5-us-east1.db.astra.datastax.com:29042:56e52adb-9887-482b-a4ce-9ccf4b574572> due to protocol error: Error from server: code=000a [Protocol error] message="Beta version of the protocol used (5/v5-beta), but USE_BETA flag is unset"]
[2024-01-28 04:29:29,733: INFO: policies: Using datacenter 'us-east1' for DCAwareRoundRobinPolicy (via host 'af73231d-9932-4c00-8d1c-5c541735dce5-us-east1.db.astra.datastax.com:29042:56e52adb-9887-482b-a4ce-9ccf4b574572'); if incorrect, please specify a local_dc to the constructor, or limit co