In [23]:
!pip install pandas --upgrade
!pip install google-cloud-storage
!pip install gcsfs
!pip install sqlalchemy pandas psycopg2
!pip install python-dotenv
!pip install kaggle --upgrade
!pip install google-cloud-secret-manager



In [20]:
from kaggle.api.kaggle_api_extended import KaggleApi
from google.cloud import storage
import pandas as pd
import numpy as np
import os
import tempfile
from io import StringIO
import psycopg2
from concurrent.futures import ThreadPoolExecutor
from psycopg2.extensions import register_adapter, AsIs
from datetime import date
register_adapter(np.int64, AsIs)

In [47]:
# set key credentials file path
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = r'C:/Disk_D/Course Work/Data Warehousing/Project-2/Keys/alien-grove-405422-bb57fda72219.json'
# Set your Kaggle API key
api = KaggleApi()
# api.CONFIG_NAME_USER='saivarunkumarnamburi'
# api.CONFIG_NAME_KEY='4e18a51c732b29ec81207232f7381392'
# api.read_config_environment({"username":"saivarunkumarnamburi","key":"4e18a51c732b29ec81207232f7381392"})
api.authenticate()

def connect_db_bulk(df,tbname):
    #Get Credentials
    db_host = os.getenv("DB_HOST")
    db_port = os.getenv("DB_PORT")
    db_name = os.getenv("DB_NAME")
    db_user = os.getenv("DB_USER")
    db_password = os.getenv("DB_PASSWORD")

    # Use the variables in your database connection logic
    connection_params = {
        'host': db_host,
        'port': db_port,
        'database': db_name,
        'user': db_user,
        'password': db_password,
    }
    # Establish a connection to your PostgreSQL database
    connection = psycopg2.connect(**connection_params)

    # Create a cursor to execute SQL statements
    cursor = connection.cursor()
    table_name = tbname
    schema_name = 'public'
    df['date'] = pd.to_datetime(df['date']).dt.date
    max_dates_by_company = df.groupby('company')['date'].max()
    # max_dates_by_company_df = max_dates_by_company.reset_index()
    max_dates_by_company = max_dates_by_company.reset_index().sort_values(by='company')
    # print(max_dates_by_company)
    sql_query = "SELECT company, MAX(date) as date FROM public.testmonthly GROUP BY company ORDER BY company;"
    # Execute the query and store the result in a DataFrame
    db_result = pd.read_sql_query(sql_query, connection)
    db_result['date'] = pd.to_datetime(db_result['date']).dt.date
    res_cd = pd.DataFrame()

    if len(db_result) == 0:
        # res_cd = max_dates_by_company
        insert_query = f"INSERT INTO {schema_name}.{table_name} VALUES (%s, %s, %s, %s, %s, %s, %s, %s)"
        records = df.to_records(index=False)
        values = [tuple(record) for record in records]
        cursor.executemany(insert_query, values)
    else:
        db_result['date'] = pd.to_datetime(db_result['date']).dt.date
        for i in range(len(db_result)):
            if db_result.loc[i, "date"] < max_dates_by_company.loc[i, "date"]:
                res_cd = res_cd._append(db_result.loc[i])
        # print("Res Cd:", res_cd)
        
        res = pd.DataFrame()
        for i in range(len(res_cd)):
            company_res = df[(df['company'] == res_cd.loc[i, 'company']) & (df['date'] > res_cd.loc[i,'date'])]
            res = res._append(company_res)
    
        print("result: \n",res)
        if(len(res)!=0):
            insert_query = f"INSERT INTO {schema_name}.{table_name} VALUES (%s, %s, %s, %s, %s, %s, %s, %s)"
            records = res.to_records(index=False)
            values = [tuple(record) for record in records]
            cursor.executemany(insert_query, values)

    connection.commit()
    cursor.close()
    connection.close()

def connect_db_bulk_parallel(df):
    # Split the DataFrame into chunks for parallel processing
    chunk_size = 5000  # Adjust the chunk size as needed
    chunks = [df[i:i + chunk_size] for i in range(0, len(df), chunk_size)]

    # Create a ThreadPoolExecutor with the desired number of threads
    with ThreadPoolExecutor(max_workers=4) as executor:  # Adjust max_workers based on your system capacity
        # Submit each chunk for parallel processing
        futures = [executor.submit(connect_db_bulk, chunk,'testmonthly') for chunk in chunks]

        # Wait for all tasks to complete
        for future in futures:
            future.result()

def download_dataset(request):
    # Specify the dataset to download
    dataset_name = "nikhil1e9/netflix-stock-price"

    # Set the Cloud Storage bucket name
    bucket_name = "dwdi-de-project"

    # Specify the Cloud Storage directory to store downloaded files
    storage_directory = "testdatasets/"

    # Download the dataset from Kaggle
    with tempfile.TemporaryDirectory() as temp_dir:
        api.dataset_download_files(dataset=dataset_name, path=temp_dir, unzip=True)

        # List downloaded files
        downloaded_files = os.listdir(temp_dir)
        # Upload each file to Cloud Storage
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)

        for file_name in downloaded_files:
            local_path = os.path.join(temp_dir, file_name)
            storage_path = os.path.join(storage_directory, file_name)

            # Upload the file to Cloud Storage
            blob = bucket.blob(storage_path)
            blob.upload_from_filename(local_path)

        # Process the uploaded files
        process_uploaded_files(bucket_name, storage_directory)

    return "Dataset downloaded and processed successfully."

def delete_objects(bucket_name, prefix):
    # Create a Storage client
    storage_client = storage.Client()

    # Get the bucket
    bucket = storage_client.bucket(bucket_name)

    # List all objects with the specified prefix
    blobs = bucket.list_blobs(prefix=prefix)

    # Delete each object
    for blob in blobs:
        blob.delete()

def process_uploaded_files(bucket_name, storage_directory):
    data_dict = {"monthly": []}
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)

    # List files in the Cloud Storage directory
    blobs = list(bucket.list_blobs(prefix=storage_directory))
    monthly_blobs = [blob for blob in blobs if 'monthly' in blob.name]
    print("Monthly blobs: ", monthly_blobs)
    for blob in monthly_blobs:
        # Process only CSV files
        if blob.name.endswith('.csv'):
            # Download the file content
            content = blob.download_as_text()
            # Process the content (example: convert to DataFrame)
            df = pd.read_csv(StringIO(content))
            # Example: Append the DataFrame to the 'monthly' list
            company=blob.name.split('_')[0]
            company_updated = company.split('/')[1]
            data_dict["monthly"].append(df.assign(company=company_updated))

    delete_objects(bucket_name,storage_directory)
    # Concatenate DataFrames
    monthly_data = pd.concat(data_dict["monthly"], ignore_index=True)
    column_mapping = {
    'Date': 'date',
    'Open': 'open',
    'High': 'high',
    'Low': 'low',
    'Close': 'close',
    'Adj Close': 'adj_close',
    'Volume': 'volumn',
    'Company': 'company'
    }
    # Map DataFrame columns to PostgreSQL columns
    df_monthly_mapped = monthly_data.rename(columns=column_mapping)
    connect_db_bulk_parallel(df_monthly_mapped)
    print(monthly_data.tail(10))

download_dataset(None)


Monthly blobs:  [<Blob: dwdi-de-project, testdatasets/AMAZON_monthly.csv, 1701308551221295>, <Blob: dwdi-de-project, testdatasets/APPLE_monthly.csv, 1701308553671219>, <Blob: dwdi-de-project, testdatasets/GOOGLE_monthly.csv, 1701308555877880>, <Blob: dwdi-de-project, testdatasets/META_monthly.csv, 1701308557770466>, <Blob: dwdi-de-project, testdatasets/NETFLIX_monthly.csv, 1701308559722407>]
result: 
 Empty DataFrame
Columns: []
Index: []
            Date        Open        High         Low       Close   Adj Close  \
1402  2023-02-01  353.859985  379.429993  314.299988  322.130005  322.130005   
1403  2023-03-01  321.549988  345.839996  285.329987  345.480011  345.480011   
1404  2023-04-01  341.829987  349.799988  316.100006  329.929993  329.929993   
1405  2023-05-01  329.440002  405.109985  315.619995  395.230011  395.230011   
1406  2023-06-01  397.410004  448.649994  393.079987  440.489990  440.489990   
1407  2023-07-01  439.760010  485.000000  411.880005  438.970001  438.970001 

  db_result = pd.read_sql_query(sql_query, connection)


'Dataset downloaded and processed successfully.'

In [48]:
# set key credentials file path
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = r'C:/Disk_D/Course Work/Data Warehousing/Project-2/Keys/alien-grove-405422-bb57fda72219.json'
# Set your Kaggle API key
api = KaggleApi()
# api.CONFIG_NAME_USER='saivarunkumarnamburi'
# api.CONFIG_NAME_KEY='4e18a51c732b29ec81207232f7381392'
# api.read_config_environment({"username":"saivarunkumarnamburi","key":"4e18a51c732b29ec81207232f7381392"})
api.authenticate()

def connect_db_bulk(df,tbname):
    #Get Credentials
    db_host = os.getenv("DB_HOST")
    db_port = os.getenv("DB_PORT")
    db_name = os.getenv("DB_NAME")
    db_user = os.getenv("DB_USER")
    db_password = os.getenv("DB_PASSWORD")

    # Use the variables in your database connection logic
    connection_params = {
        'host': db_host,
        'port': db_port,
        'database': db_name,
        'user': db_user,
        'password': db_password,
    }
    # Establish a connection to your PostgreSQL database
    connection = psycopg2.connect(**connection_params)

    # Create a cursor to execute SQL statements
    cursor = connection.cursor()
    table_name = tbname
    schema_name = 'public'
    # df['date'] = pd.to_datetime(df['date']).dt.date
    # Convert 'date' to datetime
    df['date'] = pd.to_datetime(df['date'], errors='coerce')
    # Drop rows with missing dates
    df = df.dropna(subset=['date'])
    # Extract date component
    df['date'] = df['date'].dt.date
    
    max_dates_by_company_df = df.groupby('company')['date'].max()
    max_dates_by_company_df = max_dates_by_company_df.reset_index()
    max_dates_by_company = max_dates_by_company_df.reset_index().sort_values(by='company')
    # max_dates_by_company_df = df.groupby('company')['date'].max().reset_index()
    # max_dates_by_company = max_dates_by_company_df.sort_values(by='company')
    print("Main df dates \n",max_dates_by_company)
    sql_query = "SELECT company, MAX(date) as date FROM public.weekly GROUP BY company ORDER BY company;"
    # Execute the query and store the result in a DataFrame
    db_result = pd.read_sql_query(sql_query, connection)
    db_result['date'] = pd.to_datetime(db_result['date']).dt.date
    res_cd = pd.DataFrame()

    if len(db_result) == 0:
        # res_cd = max_dates_by_company
        insert_query = f"INSERT INTO {schema_name}.{table_name} VALUES (%s, %s, %s, %s, %s, %s, %s, %s)"
        records = df.to_records(index=False)
        values = [tuple(record) for record in records]
        cursor.executemany(insert_query, values)
    else:
        db_result['date'] = pd.to_datetime(db_result['date']).dt.date
        print("Database Result \n",db_result)
        for i in range(len(db_result)):
            if db_result.iloc[i]["date"] < max_dates_by_company.loc[i, "date"]:
                res_cd = res_cd._append(db_result.loc[i])
        # print("Res Cd:", res_cd)
        
        res = pd.DataFrame()
        for i in range(len(res_cd)):
            company_res = df[(df['company'] == res_cd.loc[i, 'company']) & (df['date'] > res_cd.loc[i,'date'])]
            res = res._append(company_res)
    
        print("result: \n",res)
        if(len(res)!=0):
            insert_query = f"INSERT INTO {schema_name}.{table_name} VALUES (%s, %s, %s, %s, %s, %s, %s, %s)"
            records = res.to_records(index=False)
            values = [tuple(record) for record in records]
            cursor.executemany(insert_query, values)

    # connection.commit()
    cursor.close()
    connection.close()

def connect_db_bulk_parallel(df):
    # Split the DataFrame into chunks for parallel processing
    chunk_size = 5000  # Adjust the chunk size as needed
    chunks = [df[i:i + chunk_size] for i in range(0, len(df), chunk_size)]

    # Create a ThreadPoolExecutor with the desired number of threads
    with ThreadPoolExecutor(max_workers=4) as executor:  # Adjust max_workers based on your system capacity
        # Submit each chunk for parallel processing
        futures = [executor.submit(connect_db_bulk, chunk,'weekly') for chunk in chunks]

        # Wait for all tasks to complete
        for future in futures:
            future.result()

def download_dataset(request):
    # Specify the dataset to download
    dataset_name = "nikhil1e9/netflix-stock-price"

    # Set the Cloud Storage bucket name
    bucket_name = "dwdi-de-project"

    # Specify the Cloud Storage directory to store downloaded files
    storage_directory = "testdatasets/"

    # Download the dataset from Kaggle
    with tempfile.TemporaryDirectory() as temp_dir:
        api.dataset_download_files(dataset=dataset_name, path=temp_dir, unzip=True)

        # List downloaded files
        downloaded_files = os.listdir(temp_dir)
        # Upload each file to Cloud Storage
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)

        for file_name in downloaded_files:
            local_path = os.path.join(temp_dir, file_name)
            storage_path = os.path.join(storage_directory, file_name)

            # Upload the file to Cloud Storage
            blob = bucket.blob(storage_path)
            blob.upload_from_filename(local_path)

        # Process the uploaded files
        process_uploaded_files(bucket_name, storage_directory)

    return "Dataset downloaded and processed successfully."

def delete_objects(bucket_name, prefix):
    # Create a Storage client
    storage_client = storage.Client()

    # Get the bucket
    bucket = storage_client.bucket(bucket_name)

    # List all objects with the specified prefix
    blobs = bucket.list_blobs(prefix=prefix)

    # Delete each object
    for blob in blobs:
        blob.delete()

def process_uploaded_files(bucket_name, storage_directory):
    data_dict = {"weekly": []}
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)

    # List files in the Cloud Storage directory
    blobs = list(bucket.list_blobs(prefix=storage_directory))
    weekly_blobs = [blob for blob in blobs if 'weekly' in blob.name]
    print("Weekly blobs: ", weekly_blobs)
    for blob in weekly_blobs:
        # Process only CSV files
        if blob.name.endswith('.csv'):
            # Download the file content
            content = blob.download_as_text()
            # Process the content (example: convert to DataFrame)
            df = pd.read_csv(StringIO(content))
            # Example: Append the DataFrame to the 'monthly' list
            company=blob.name.split('_')[0]
            company_updated = company.split('/')[1]
            data_dict["weekly"].append(df.assign(company=company_updated))

    delete_objects(bucket_name,storage_directory)
    # Concatenate DataFrames
    weekly_data = pd.concat(data_dict["weekly"], ignore_index=True)
    column_mapping = {
    'Date': 'date',
    'Open': 'open',
    'High': 'high',
    'Low': 'low',
    'Close': 'close',
    'Adj Close': 'adj_close',
    'Volume': 'volumn',
    'Company': 'company'
    }
    # Map DataFrame columns to PostgreSQL columns
    df_weekly_mapped = weekly_data.rename(columns=column_mapping)
    # print(weekly_data.tail(10))
    connect_db_bulk(df_weekly_mapped,'weekly')
    

download_dataset(None)


Weekly blobs:  [<Blob: dwdi-de-project, testdatasets/AMAZON_weekly.csv, 1701308752306413>, <Blob: dwdi-de-project, testdatasets/APPLE_weekly.csv, 1701308754670846>, <Blob: dwdi-de-project, testdatasets/GOOGLE_weekly.csv, 1701308756698798>, <Blob: dwdi-de-project, testdatasets/META_weekly.csv, 1701308758960724>, <Blob: dwdi-de-project, testdatasets/NETFLIX_weekly.csv, 1701308760830063>]
Main df dates 
    index  company        date
0      0   AMAZON  2023-11-27
1      1    APPLE  2023-11-27
2      2   GOOGLE  2023-11-27
3      3     META  2023-11-27
4      4  NETFLIX  2023-11-27
Database Result 
    company        date
0   AMAZON  2023-11-13
1    APPLE  2023-11-13
2   GOOGLE  2023-11-13
3     META  2023-11-13
4  NETFLIX  2023-11-13
result: 
             date        open        high         low       close   adj_close  \
1384  2023-11-20  145.130005  147.740005  141.500000  146.740005  146.740005   
1385  2023-11-27  147.529999  149.259995  146.880005  147.729996  147.729996   
3627  202

  db_result = pd.read_sql_query(sql_query, connection)


'Dataset downloaded and processed successfully.'