<a href="https://colab.research.google.com/github/SravanGatla/AWS-Snowflake-DataPipeline/blob/main/processing_and_analytics_practice.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
import boto3
import pandas as pd
from snowflake.connector import connect, PermissionError
from snowflake.connector.pandas_tools import write_pandas


class DataExtractor:
  """Base class for extractors that read one object from an S3 bucket.

  Subclasses (the CSV and JSON extractors in this notebook) implement
  ``extract_data``; this base only records where the object lives and how
  many rows to read per chunk.
  """

  def __init__(self, bucket_name, file_key, chunk_size = 10000):
    self.bucket_name = bucket_name  # Name of the S3 bucket holding the object.
    self.file_key = file_key  # Key (path) of the object inside the bucket to extract.
    self.chunk_size = chunk_size  # Rows per chunk when a subclass reads the object in chunks.

  def extract_data(self):
    """Return the extracted data; must be overridden by subclasses.

    Raises:
      NotImplementedError: always, on the base class.
    """
    raise NotImplementedError("Subclasses must implement extract_data method")

  def create_external_stage(self, conn, stage_name):
    """Run ``create or replace stage <stage_name>`` on ``conn``.

    Args:
      conn: an open DB-API style connection exposing ``cursor()``.
      stage_name: stage identifier. It is interpolated directly into the SQL
        text (identifiers cannot be bound as parameters), so it must come
        from trusted code, never from user input.
    """
    cursor = conn.cursor()
    try:
      cursor.execute(f"create or replace stage {stage_name}")
    finally:
      # Release the cursor even when execute() raises.
      cursor.close()

In [None]:
from your_module import DataExtractor

class CSVDataExtractor(DataExtractor):
  """Extractor for a CSV object stored in S3."""

  def extract_data(self):
    """Download ``file_key`` from ``bucket_name`` and parse it as CSV.

    Returns:
      list of pandas DataFrames, each holding at most ``chunk_size`` rows,
      in file order.

    Errors from boto3 (e.g. missing bucket/key) and from pandas parsing
    propagate unchanged.
    """
    s3 = boto3.client('s3')
    response = s3.get_object(Bucket=self.bucket_name, Key=self.file_key)
    body = response['Body']
    try:
      # With chunksize, read_csv returns a TextFileReader; the with-block
      # closes the reader once every chunk has been materialized.
      with pd.read_csv(body, chunksize=self.chunk_size) as reader:
        return list(reader)
    finally:
      # Release the underlying HTTP stream whether or not parsing succeeded.
      body.close()

In [None]:
from your_module import DataExtractor

class JSONDataExtractor(DataExtractor):
  """Extractor for a newline-delimited JSON (JSON Lines) object stored in S3."""

  def extract_data(self):
    """Download ``file_key`` from ``bucket_name`` and parse it as JSON Lines.

    Returns:
      list of pandas DataFrames, each holding at most ``chunk_size`` records,
      in file order.

    Errors from boto3 (e.g. missing bucket/key) and from pandas parsing
    propagate unchanged.
    """
    s3 = boto3.client('s3')
    response = s3.get_object(Bucket=self.bucket_name, Key=self.file_key)
    body = response['Body']
    try:
      # pandas only supports chunked JSON reading with lines=True; the
      # returned JsonReader is closed by the with-block.
      with pd.read_json(body, lines=True, chunksize=self.chunk_size) as reader:
        return list(reader)
    finally:
      # Release the underlying HTTP stream whether or not parsing succeeded.
      body.close()

In [None]:
from your_module import JSONDataExtractor, CSVDataExtractor

class DataProcessor:
  """Normalise column types of a single pandas DataFrame chunk.

  The DataFrame handed to the constructor is kept as ``self.data`` and is
  modified in place by ``process_and_analyse_data``.
  """

  def __init__(self, data):
    self.data = data

  def process_and_analyse_data(self):
    """Cast ``age`` to int, parse ``date`` and encode ``gender`` numerically.

    Gender encoding: 'male' -> 1, 'female' -> 0, 'Not Specified' -> 2; any
    other value is left as it was.

    Returns:
      the same DataFrame object as ``self.data``, after modification.
    """
    frame = self.data
    frame['age'] = frame['age'].astype(int)
    frame['date'] = pd.to_datetime(frame['date'])

    raw_gender = frame['gender']
    encoded_gender = raw_gender.map({'Not Specified': 2, 'female': 0, 'male': 1})
    # Unmapped labels come back as NaN from map(); restore their original value.
    frame['gender'] = encoded_gender.fillna(raw_gender)
    return frame

In [None]:
from your_module import DataProcessor

class DataMasker:
  """Mask sensitive columns in processed pandas data.

  ``processed_data`` may be a single DataFrame or an iterable of DataFrames.
  """

  # Columns whose string values are replaced by 'X' characters of equal length.
  SENSITIVE_COLUMNS = ('SSN', 'account_number')

  def __init__(self, processed_data):
    self.processed_data = processed_data

  def mask_sensitive_data(self):
    """Return a masked copy of ``processed_data``.

    Every string in a column listed in ``SENSITIVE_COLUMNS`` becomes a run of
    'X' of the same length. Non-string cells (e.g. None/NaN) and all other
    columns are left untouched. The input frames are not modified.

    Returns:
      a DataFrame when ``processed_data`` is a DataFrame, otherwise a list
      with one masked DataFrame per input chunk.
    """
    import pandas as pd

    if isinstance(self.processed_data, pd.DataFrame):
      return self._mask_frame(self.processed_data)
    return [self._mask_frame(chunk) for chunk in self.processed_data]

  def _mask_frame(self, frame):
    """Return a copy of ``frame`` with its sensitive columns masked."""
    masked = frame.copy()
    for column in self.SENSITIVE_COLUMNS:
      # Frames without a given sensitive column are passed through as-is.
      if column in masked.columns:
        masked[column] = masked[column].map(self._mask_value)
    return masked

  @staticmethod
  def _mask_value(value):
    """Mask a single string cell; return any other value unchanged."""
    # NOTE(review): only str values are masked, so numeric SSNs/account
    # numbers would pass through unmasked — confirm the column dtype upstream.
    return 'X' * len(value) if isinstance(value, str) else value


In [None]:
from your_module import DataMasker
from snowflake.connector import connect, ProgrammingError
from snowflake.connector.pandas_tools import write_pandas

class SnowflakeLoader:
  """Append pandas data to an existing Snowflake table.

  ``processed_data`` may be a single DataFrame or an iterable of DataFrames.
  """

  def __init__(self, processed_data, snowflake_connection_params, table_name):
    self.processed_data = processed_data
    self.snowflake_connection_params = snowflake_connection_params  # kwargs for snowflake.connector.connect
    self.table_name = table_name

  def load_data_to_snowflake(self):
    """Upload every chunk of ``processed_data`` into ``table_name``.

    Opens a connection with ``snowflake_connection_params`` and always closes
    it. A ``ProgrammingError`` is reported via ``print`` instead of being
    raised; other exceptions propagate.
    """
    import pandas as pd

    chunks = self.processed_data
    # Iterating a bare DataFrame yields its column labels, so wrap it.
    if isinstance(chunks, pd.DataFrame):
      chunks = [chunks]

    conn = None
    try:
      conn = connect(**self.snowflake_connection_params)
      for i, chunk in enumerate(chunks):
        # write_pandas uploads the frame through a temporary stage it manages
        # itself and then runs COPY INTO the named table.
        success, _, _, _ = write_pandas(conn, chunk, self.table_name)
        if not success:
          print(f"Error: chunk {i} was not fully loaded into {self.table_name}")
          return
      conn.commit()
      print("Data Loaded into Snowflake successfully.")
    except ProgrammingError as e:
      print("Error:", e)
    finally:
      if conn is not None:
        conn.close()



In [None]:
from your_module import process_and_analyse_data, mask_sensitive_data, load_data_to_snowflake

def process_data_in_parallel(process, data=None):
  """Run ``process.process_and_analyse_data()`` and return its result.

  Intended as a ``Pool.starmap`` target. ``process_and_analyse_data`` takes
  no arguments and works on the data the processor was constructed with, so
  ``data`` is accepted only for existing ``(processor, chunk)`` argument
  pairs and is not used.
  """
  return process.process_and_analyse_data()

def mask_data_in_parallel(process, data=None):
  """Run ``process.mask_sensitive_data()`` and return its result.

  Intended as a ``Pool.starmap`` target. ``mask_sensitive_data`` takes no
  arguments and works on the data the masker was constructed with, so
  ``data`` is accepted only for existing ``(masker, chunk)`` argument pairs
  and is not used.
  """
  return process.mask_sensitive_data()

def load_data_in_parallel(process, data=None):
  """Run ``process.load_data_to_snowflake()`` and return its result.

  Usable with both ``Pool.map`` (loader only) and ``Pool.starmap``
  (``(loader, chunk)`` pairs). The loader already holds its data, so
  ``data`` is not used.
  """
  return process.load_data_to_snowflake()


In [None]:
import boto3
import pandas as pd
import multiprocessing
from snowflake.connector import connect,ProgrammingError
from your_module import CSVDataExtractor, JSONDataExtractor
from your_module import process_and_analyse_data, mask_sensitive_data, load_data_to_snowflake
from your_module import process_data_in_parallel, mask_data_in_parallel, load_data_in_parallel

def main():
  """Run extract -> process -> mask -> load for one CSV object in S3.

  Settings are read from environment variables when set, falling back to the
  placeholder values below, so real credentials need not be stored in the
  notebook.
  """
  import os

  env = os.environ
  s3_bucket_name = env.get('S3_BUCKET_NAME', 'name of the S3 bucket')
  s3_file_key = env.get('S3_FILE_KEY', 'path/your_file_key.csv')
  snowflake_connection_params = {
      'user': env.get('SNOWFLAKE_USER', 'your-username'),
      'password': env.get('SNOWFLAKE_PASSWORD', 'your-password'),
      'account': env.get('SNOWFLAKE_ACCOUNT', 'your-account'),
      'warehouse': env.get('SNOWFLAKE_WAREHOUSE', 'your-warehouse'),
      'database': env.get('SNOWFLAKE_DATABASE', 'your-database'),
      'schema': env.get('SNOWFLAKE_SCHEMA', 'your-schema'),
  }
  table_name = env.get('SNOWFLAKE_TABLE', 'your-table-name')

  extractor = CSVDataExtractor(s3_bucket_name, s3_file_key)
  data_chunks = extractor.extract_data()

  # One worker pool serves all three stages instead of starting a new pool
  # per stage; each starmap call blocks until its stage has finished.
  with multiprocessing.Pool() as pool:
    processors = [DataProcessor(chunk) for chunk in data_chunks]
    processed_chunks = pool.starmap(process_data_in_parallel, zip(processors, data_chunks))

    maskers = [DataMasker(chunk) for chunk in processed_chunks]
    masked_chunks = pool.starmap(mask_data_in_parallel, zip(maskers, processed_chunks))

    # SnowflakeLoader iterates processed_data chunk by chunk, so give each
    # loader a one-element list rather than a bare DataFrame (iterating a
    # DataFrame yields its column labels).
    snowflake_loaders = [
        SnowflakeLoader([chunk], snowflake_connection_params, table_name)
        for chunk in masked_chunks
    ]
    # starmap supplies both (loader, data) arguments the helper declares.
    pool.starmap(load_data_in_parallel, zip(snowflake_loaders, masked_chunks))

# Entry point: start the pipeline when this code runs as the main program
# rather than being imported. In a notebook kernel __name__ is also
# "__main__", so executing this cell runs main() immediately.
if __name__ == "__main__":
  main()