In [1]:
import calendar
import datetime
import json
from io import StringIO
from math import ceil
from urllib.parse import quote

import numpy as np
import pandas as pd
import requests
from google.cloud import storage
from snowflake.connector import connect
from sqlalchemy import create_engine
from sqlalchemy import text
from sqlalchemy.dialects import registry


In [None]:
# Connect to Snowflake.
# Credentials are read from a local JSON config file rather than being
# hard-coded in the notebook.
with open('config_dw.json', 'r') as f:
    config = json.load(f)

snowflake_config = config['snowflake']

# Snowflake connection settings
account_name = snowflake_config['account_name']
user = snowflake_config['user']
password = snowflake_config['password']
database = snowflake_config['database']
schema = snowflake_config['schema']

# Native Snowflake connector session. It is created (and left open) exactly as
# before so that any cell relying on `conn` keeps working.
conn = connect(
    user=user,
    password=password,
    account=account_name,
    database=database,
    schema=schema
)

# Make the 'snowflake://' URL scheme resolve to the snowflake.sqlalchemy dialect
registry.register('snowflake', 'snowflake.sqlalchemy', 'dialect')

# Percent-encode the credentials before placing them in the engine URL: a raw
# '@', ':', '/' or '%' in either value would otherwise be parsed as URL syntax
# and corrupt the connection string. safe="" also encodes '/'.
url_user = quote(user, safe="")
url_password = quote(password, safe="")

# Create a SQLAlchemy engine for Snowflake
snowflake_engine = create_engine(
    f'snowflake://{url_user}:{url_password}@{account_name}/{database}/{schema}'
)
connection = snowflake_engine.connect()

# Smoke test: ask the server for its version. The statement is wrapped in
# text() because SQLAlchemy 2.x no longer accepts a bare SQL string in
# Connection.execute(); text() works on 1.x as well.
results = connection.execute(text('select current_version()')).fetchone()
print(results[0])

In [None]:
# Print the response of ipecho.net/plain, which echoes back the caller's
# public IP address as plain text.
# `requests` is used instead of shelling out to `curl`, so the cell does not
# depend on a curl binary being installed and cannot hang indefinitely.
try:
    ip_response = requests.get('http://ipecho.net/plain', timeout=10)
    ip_response.raise_for_status()
    print(ip_response.text.strip())
except requests.RequestException as exc:
    # Stay best-effort like the shell command: report the problem instead of
    # aborting the rest of the notebook run.
    print(f"Could not determine public IP: {exc}")

In [None]:
# Load the Google Cloud service-account info from a local JSON file
with open('config.json', 'r') as config_file:
    config_data = json.load(config_file)

# Authenticate with Google Cloud Storage using the service account JSON
storage_client = storage.Client.from_service_account_info(config_data)

# Google Cloud Storage Configuration
BUCKET_NAME = "housingproject_cis9440"

# Get the bucket
bucket = storage_client.get_bucket(BUCKET_NAME)

# List all blobs in the bucket
blob_list = bucket.list_blobs()

# Dry run: parse every file-like blob as CSV and report its shape.
# Nothing is written anywhere in this cell.
for blob in blob_list:
    blob_name = blob.name
    print(f"Processing blob: {blob_name}")

    # Heuristic: a name without a '.' is treated as a folder placeholder
    if '.' not in blob_name:
        print("Blob is a directory, skipping...")
        continue

    # The listed blob can be downloaded directly; there is no need to rebuild
    # it with bucket.blob(). download_as_bytes() replaces the deprecated
    # download_as_string() alias and returns the same bytes.
    blob_content = blob.download_as_bytes()

    # Convert blob content to DataFrame (the file is assumed to be UTF-8 CSV)
    df = pd.read_csv(StringIO(blob_content.decode('utf-8')))

    # Display the shape of the DataFrame
    print(f"Shape of {blob_name}: {df.shape}")

In [None]:
# Recreate the blob iterator so the loop below starts from the first blob again
blob_list = bucket.list_blobs()

# Load every file-like blob in the bucket into a Snowflake table named after
# the file. Rows are appended, so re-running this cell inserts them again.
for blob in blob_list:
    blob_name = blob.name
    print(f"Processing blob: {blob_name}")

    # Heuristic: a name without a '.' is treated as a folder placeholder.
    # (The original test read `'.' in blob_name in blob_name`, a typo whose
    # second comparison was always true.)
    if '.' not in blob_name:
        print("Blob is not a file, skipping...")
        continue

    # Download the listed blob directly. download_as_bytes() replaces the
    # deprecated download_as_string() alias and returns the same bytes.
    blob_content = blob.download_as_bytes()

    # Convert blob content to DataFrame (the file is assumed to be UTF-8 CSV)
    df = pd.read_csv(StringIO(blob_content.decode('utf-8')))

    # Display the shape of the DataFrame
    print(f"Shape of {blob_name}: {df.shape}")

    # Table name = file name without its folder path and extension, so that
    # 'SALES/NY/x.csv' becomes table 'x' instead of 'SALES/NY/x'.
    table_name = blob_name.split('/')[-1].split('.')[0]
    df.to_sql(table_name, con=snowflake_engine, if_exists='append', index=False)
    print(f"DataFrame stored in Snowflake table: {table_name}")

In [None]:
# Recreate the blob iterator so the loop below starts from the first blob again
blob_list = bucket.list_blobs()

# Only files under this (case-sensitive) prefix are loaded
SALES_PREFIX = "SALES/NY/"
# 1-based positions, counted over the matching files in listing order:
# files before FIRST_FILE are skipped, and the loop stops once the count
# goes past LAST_FILE.
FIRST_FILE = 13
LAST_FILE = 50

# Number of SALES/NY files seen so far (skipped ones included)
file_count = 0

# Iterate over each blob
for blob in blob_list:
    blob_name = blob.name
    print(f"Processing blob: {blob_name}")

    # Heuristic: a name without a '.' is treated as a folder placeholder
    if '.' not in blob_name:
        print("Blob is not a file, skipping...")
        continue

    # The former `or blob_name == "SALES/NY"` alternative could never match
    # here, because that name contains no '.', so only the prefix test remains.
    if not blob_name.startswith(SALES_PREFIX):
        print("Blob is not in the 'Sales/NY' directory, skipping...")
        continue

    file_count += 1

    # Skip until we reach the starting point
    if file_count < FIRST_FILE:
        continue

    # Stop once the upper bound has been passed
    if file_count > LAST_FILE:
        print(f"Reached {LAST_FILE} files, stopping processing.")
        break

    # Download the listed blob directly. download_as_bytes() replaces the
    # deprecated download_as_string() alias and returns the same bytes.
    blob_content = blob.download_as_bytes()

    # Convert blob content to DataFrame (the file is assumed to be UTF-8 CSV)
    df = pd.read_csv(StringIO(blob_content.decode('utf-8')))

    # Display the shape of the DataFrame
    print(f"Shape of {blob_name}: {df.shape}")

    # Table name = file name without its folder path and extension.
    # Rows are appended, so re-running this cell inserts them again.
    table_name = blob_name.split('/')[-1].split('.')[0]
    df.to_sql(table_name, con=snowflake_engine, if_exists='append', index=False)
    print(f"DataFrame stored in Snowflake table: {table_name}")