In [None]:
# Automatically install the necessary libraries
!pip install pandas pyarrow google-cloud-storage google-cloud-bigquery yfinance

In [None]:
# Importing the necessary libraries
try:
    import datetime as dt
    import pytz
    import numpy as np
    import pandas as pd
    import pyarrow as pa
    import pyarrow.parquet as pq
    from google.cloud import storage
    import time
    from google.cloud import bigquery
    import yfinance as yf

    print("Libraries imported correctly.")
except ImportError as e:
    print(f"Error importing libraries: {e}")

In [None]:
def download_parquet_from_gcs(bucket_name, source_blob_name, destination_file_name, credential_path):
    """
    Fetch one object from a Google Cloud Storage bucket and write it to a local file.

    Args:
        bucket_name (str): Bucket that holds the object.
        source_blob_name (str): Object path inside the bucket.
        destination_file_name (str): Local file path the object's bytes are written to.
        credential_path (str): Path to the service-account JSON key used to authenticate.
    """
    # Authenticate, then resolve bucket -> blob in one chain.
    remote_blob = (
        storage.Client.from_service_account_json(credential_path)
        .bucket(bucket_name)
        .blob(source_blob_name)
    )

    remote_blob.download_to_filename(destination_file_name)
    print(f"File {source_blob_name} successfully downloaded to {destination_file_name}")




In [None]:
import os

# Configuration. Every value can be overridden with an environment variable so the
# notebook runs on machines other than the author's; the defaults are the original values.
bucket_name = os.environ.get("FIN_GCS_BUCKET", "parquet-dataset-financial-data")
source_blob_name = os.environ.get("FIN_SOURCE_BLOB", "main_financial_data.parquet")
destination_file_name = os.environ.get(
    "FIN_LOCAL_PARQUET", "/Users/danifila/Downloads/main_financial_data.parquet"
)
# Service-account key path. Prefer setting the standard GOOGLE_APPLICATION_CREDENTIALS
# variable over keeping a key location in the notebook.
credential_path = os.environ.get(
    "GOOGLE_APPLICATION_CREDENTIALS",
    "/Users/danifila/Desktop/UpWork/dani-financial-1ca621e0a4c6.json",
)

download_parquet_from_gcs(bucket_name, source_blob_name, destination_file_name, credential_path)

In [None]:
def _summary_value(summary, key):
    """Return ``summary[key]``, or NaN when the key is absent or explicitly ``None``."""
    value = summary.get(key)
    return np.nan if value is None else value


def _fetch_yahoo_data(ticker_symbol, isin):
    """
    Download price history and summary fields for one Yahoo Finance ticker.

    Args:
        ticker_symbol (str): Identifier passed to ``yf.Ticker`` (an ISIN or an exchange symbol).
        isin (str): ISIN written into the output, even when ``ticker_symbol`` is a symbol.

    Returns:
        dict: ``{'historical_data': DataFrame, 'summary': dict}``.

    Raises:
        ValueError: If no price history is returned for ``ticker_symbol``.
        Exception: Errors raised by yfinance are propagated unchanged.
    """
    ticker = yf.Ticker(ticker_symbol)

    # Full price history, including the corporate-action columns.
    historical_data = ticker.history(period="max", actions=True)
    # yfinance may return an empty frame instead of raising for an unknown ticker;
    # treat that as a failure so the caller can try the fallback symbol.
    if historical_data is None or historical_data.empty:
        raise ValueError(f"no historical data returned for {ticker_symbol}")

    summary = ticker.info or {}
    scraping_time_stamp = dt.datetime.now()
    url_scraping = f"https://finance.yahoo.com/quote/{ticker_symbol}"

    historical_data = historical_data.assign(
        ISIN=isin,
        scraping_time_stamp=scraping_time_stamp,
        url_scraping=url_scraping,
    )

    if historical_data.index.name == 'Date':
        # Turn the date index into a regular column (timezone info is kept here).
        historical_data = historical_data.reset_index()
        historical_data['Date'] = pd.to_datetime(historical_data['Date'])

    # ISIN first; every other column keeps its existing order.
    ordered_columns = ['ISIN'] + [col for col in historical_data.columns if col != 'ISIN']
    historical_data = historical_data[ordered_columns]

    return {
        'historical_data': historical_data,
        'summary': {
            'ISIN': isin,
            'analyst_price_targets': _summary_value(summary, 'targetMeanPrice'),
            'market_cap': _summary_value(summary, 'marketCap'),
            'scraping_time_stamp': scraping_time_stamp,
            'url_scraping': url_scraping,
        },
    }


def download_yahoo_finance_data(isin,symbol_isin_dict):
    """
    Download historical prices and summary information from Yahoo Finance for an ISIN.

    The ISIN itself is tried first. If that raises or returns no history, the symbol
    mapped to the ISIN in ``symbol_isin_dict`` (if it is present and not null/empty) is
    tried instead. Rows are always tagged with the original ISIN.

    Args:
        isin (str): The ISIN of the financial instrument.
        symbol_isin_dict (dict): Mapping of ISIN -> fallback ticker symbol.

    Returns:
        dict | None: ``{'historical_data': DataFrame, 'summary': dict}``, or ``None`` when
        both attempts fail. Errors are printed, not raised.
    """
    try:
        return _fetch_yahoo_data(isin, isin)
    except Exception as e:
        print(f"Error downloading data for ISIN {isin}: {e}")

    # Fall back to the exchange symbol; skip missing, NaN (from the parquet) or empty symbols.
    symbol = symbol_isin_dict.get(isin)
    if pd.notna(symbol) and symbol:
        print(f"Trying again with Symbol: {symbol}")
        try:
            return _fetch_yahoo_data(symbol, isin)
        except Exception as e2:
            print(f"Error downloading data with Symbol {symbol}: {e2}")

    return None

def isin_to_dict_from_parquet(destination_file_name):
    """
    Build an ISIN -> Symbol mapping from a Parquet file.

    Only the ``ISIN`` and ``Symbol`` columns are read. Rows without an ISIN are dropped,
    and when an ISIN appears more than once the first row's Symbol is kept.

    Args:
        destination_file_name (str): Path to a Parquet file with ``ISIN`` and ``Symbol`` columns.

    Returns:
        dict: ``{isin: symbol}``. A symbol may be null if the file has no value for it.
    """
    isin_symbols = pd.read_parquet(destination_file_name, columns=["ISIN", "Symbol"])
    # A null ISIN would otherwise become a None/NaN key and be "processed" downstream.
    isin_symbols = isin_symbols.dropna(subset=["ISIN"]).drop_duplicates(subset="ISIN")
    return isin_symbols.set_index("ISIN")["Symbol"].to_dict()


def save_to_parquet(data, filename):
    """
    Normalize column types of a DataFrame and write it to a Parquet file.

    The caller's DataFrame is not modified; all normalization happens on a copy:
      * ``Dividends`` and ``Stock Splits`` columns (if present) are dropped.
      * ``Adj Close`` (if present) is coerced to numeric; unparsable values and NaN become 0.
      * ``Date`` and ``scraping_time_stamp`` (if present) become timezone-naive
        ``datetime64[us]``; tz-aware values keep their local wall-clock time.

    Args:
        data (pd.DataFrame | None): Frame to save. ``None`` is reported and skipped.
        filename (str): Destination Parquet file path.
    """
    if data is None:
        print(f"No data to save for {filename}")
        return

    # Work on a copy so the caller's frame is left untouched.
    data = data.drop(columns=['Dividends', 'Stock Splits'], errors='ignore').copy()

    if 'Adj Close' in data.columns:
        # NOTE(review): filling NaN with 0 records a price of 0 for missing values;
        # consider keeping NaN so consumers can tell "missing" from "zero".
        data['Adj Close'] = pd.to_numeric(data['Adj Close'], errors='coerce').fillna(0)

    for column in ('Date', 'scraping_time_stamp'):
        if column in data.columns:
            data[column] = pd.to_datetime(data[column]).dt.tz_localize(None).astype('datetime64[us]')

    table = pa.Table.from_pandas(data)
    pq.write_table(table, filename)
    print(f"Data saved to {filename}")


def process_isins(symbol_isin_dict, batch_size=200, pause_seconds=60):
    """
    Download Yahoo Finance data for every ISIN and save the results as local Parquet files.

    For each ISIN with data, ``{isin}_historical_data.parquet`` and
    ``{isin}_summary_data.parquet`` are written to the current working directory.
    ISINs for which nothing could be downloaded are skipped.

    Args:
        symbol_isin_dict (dict): Mapping of ISIN -> fallback ticker symbol; the keys are processed.
        batch_size (int): Pause after every ``batch_size`` ISINs (a value <= 0 disables pausing).
        pause_seconds (float): Length of each pause, to reduce Yahoo Finance rate limiting.
    """
    total = len(symbol_isin_dict)
    for position, isin in enumerate(symbol_isin_dict, start=1):
        print(f"Processing ISIN: {isin}")
        data = download_yahoo_finance_data(isin, symbol_isin_dict)

        if data:
            save_to_parquet(data.get('historical_data'), f"{isin}_historical_data.parquet")
            save_to_parquet(pd.DataFrame([data.get('summary')]), f"{isin}_summary_data.parquet")

        # Pause between batches, but not after the final ISIN (nothing left to throttle).
        if batch_size > 0 and position % batch_size == 0 and position < total:
            print(f"Waiting for {pause_seconds} seconds to avoid rate limiting...")
            time.sleep(pause_seconds)


In [None]:
# Build the ISIN -> fallback-symbol map from the downloaded master file, then
# fetch and save Yahoo Finance data for every ISIN in it.
symbol_isin_dict = isin_to_dict_from_parquet(destination_file_name=destination_file_name)
process_isins(symbol_isin_dict=symbol_isin_dict)


In [None]:
def upload_to_gcs(bucket_name, source_file_name, destination_blob_name, credential_path):
    """
    Copy a local file into a Google Cloud Storage bucket (best effort).

    Any exception raised while authenticating or uploading is caught and printed
    instead of propagated, so a single failed upload does not stop a caller's loop.

    :param bucket_name: Target GCS bucket.
    :param source_file_name: Local path of the file to upload.
    :param destination_blob_name: Object name to create inside the bucket.
    :param credential_path: Path to the service-account JSON key used to authenticate.
    """
    try:
        client = storage.Client.from_service_account_json(credential_path)
        target_blob = client.bucket(bucket_name).blob(destination_blob_name)
        target_blob.upload_from_filename(source_file_name)
        print(f"File {source_file_name} successfully uploaded as {destination_blob_name} to bucket {bucket_name}.")
    except Exception as error:
        print(f"Error uploading to GCS: {error}")


def upload_isin_files_to_gcs(bucket_name, symbol_isin_dict, credential_path):
    """
    Upload each ISIN's local Parquet files to GCS, one folder per ISIN.

    For every ISIN key, ``{isin}_historical_data.parquet`` and ``{isin}_summary_data.parquet``
    are taken from the current working directory and uploaded as ``{isin}/<file name>``.
    Files that do not exist locally (for example because the download for that ISIN
    failed) are skipped with a message instead of attempting an upload.

    Args:
        bucket_name (str): Target GCS bucket.
        symbol_isin_dict (dict): Mapping whose keys are the ISINs to upload.
        credential_path (str): Path to the service-account JSON key.
    """
    import os

    for isin in symbol_isin_dict:
        for local_name in (f"{isin}_historical_data.parquet", f"{isin}_summary_data.parquet"):
            # Avoid creating/authenticating a client just to fail on a missing file.
            if not os.path.isfile(local_name):
                print(f"Skipping {local_name}: local file not found")
                continue
            upload_to_gcs(bucket_name, local_name, f"{isin}/{local_name}", credential_path)

In [None]:
# Push every per-ISIN Parquet file written above to the GCS bucket.
upload_isin_files_to_gcs(
    bucket_name=bucket_name,
    symbol_isin_dict=symbol_isin_dict,
    credential_path=credential_path,
)

In [None]:
def load_parquet_files_to_bigquery(symbol_isin_dict, table_historical_id, table_summary_id, credential_path,
                                   bucket_name="parquet-dataset-financial-data"):
    """
    Append each ISIN's historical and summary Parquet files from GCS into BigQuery tables.

    Files are read from ``gs://{bucket_name}/{isin}/{isin}_historical_data.parquet`` and
    ``gs://{bucket_name}/{isin}/{isin}_summary_data.parquet``. A failure for one ISIN is
    printed and the loop continues with the next ISIN. If the historical load fails, the
    summary load for that ISIN is not attempted.

    Args:
        symbol_isin_dict (dict): Mapping whose keys are the ISINs to load.
        table_historical_id (str): BigQuery table ID for historical data
            (e.g. 'project.dataset.table_historical').
        table_summary_id (str): BigQuery table ID for summary data
            (e.g. 'project.dataset.table_summary').
        credential_path (str): Path to the service-account JSON key.
        bucket_name (str): GCS bucket holding the per-ISIN files.

    Note:
        Both loads use WRITE_APPEND, so loading the same ISIN twice appends duplicate rows.
    """
    bigquery_client = bigquery.Client.from_service_account_json(credential_path)

    # Historical and summary loads use identical settings; build the config once.
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.PARQUET,
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,  # Append data to the table
    )
    destinations = (("historical", table_historical_id), ("summary", table_summary_id))

    for isin in symbol_isin_dict:
        try:
            for label, table_id in destinations:
                uri = f"gs://{bucket_name}/{isin}/{isin}_{label}_data.parquet"
                print(f"Starting load job for {label} data ISIN: {isin}")
                load_job = bigquery_client.load_table_from_uri(uri, table_id, job_config=job_config)
                load_job.result()  # Block until the job finishes; raises on failure.
                print(f"Loaded {label} data for ISIN: {isin} into {table_id}")
        except Exception as e:
            print(f"Error processing ISIN {isin}: {e}")

    print("Data load for all ISINs completed.")

In [None]:
# Fully-qualified BigQuery destination tables (project.dataset.table).
table_historical_id = "dani-financial.financial_data.historical_price"
table_summary_id = "dani-financial.financial_data.summary_data"

load_parquet_files_to_bigquery(
    symbol_isin_dict=symbol_isin_dict,
    table_historical_id=table_historical_id,
    table_summary_id=table_summary_id,
    credential_path=credential_path,
)