### Importing the necessary libraries

In [3]:
import os
import requests
import pandas as pd
from datetime import datetime
from dotenv import load_dotenv

### Configuring the Extraction of Data
###### Load the .env environment and
###### ensure all variables are correct (API_KEY and SYMBOLS)

In [4]:
# Read settings from the local .env file into the process environment
load_dotenv()

# API credential (None when API_KEY is not set) and the tickers to download
API_KEY = os.getenv("API_KEY")
SYMBOLS = ["AAPL", "GOOG", "MSFT", "AMZN", "IBM"]

def extract_data(symbols, api_key):
    """Download daily price/volume history for each symbol from Alpha Vantage.

    Args:
        symbols: Iterable of ticker symbols; one HTTP request is made per symbol.
        api_key: API key sent with every request.

    Returns:
        pandas.DataFrame with the columns Date, Symbol, Open, High, Low, Close
        and Volume, one row per (symbol, date) entry in the response. A symbol
        is skipped (with a printed notice) when its request fails or its
        response has no 'Time Series (Daily)' section. When nothing could be
        retrieved the frame is empty but still carries these columns, so
        downstream column lookups keep working.
    """
    columns = ["Date", "Symbol", "Open", "High", "Low", "Close", "Volume"]
    endpoint = 'https://www.alphavantage.co/query'
    all_rows = []

    for symbol in symbols:
        print(f"Fetching data for {symbol}...")

        try:
            # `params` URL-encodes the values; the timeout stops a stalled
            # connection from hanging the notebook indefinitely.
            response = requests.get(
                endpoint,
                params={
                    "function": "TIME_SERIES_DAILY",
                    "symbol": symbol,
                    "apikey": api_key,
                },
                timeout=30,
            )
            response.raise_for_status()
            data = response.json()
        except (requests.RequestException, ValueError) as exc:
            # Only the exception type is printed: the full message can contain
            # the request URL, which includes the API key.
            print(f"Request failed for {symbol} ({type(exc).__name__}). Skipping...")
            continue

        # Extract time series data; a non-dict payload is treated as "no data"
        time_series = data.get('Time Series (Daily)') if isinstance(data, dict) else None
        if not time_series:
            print(f"No data found for {symbol}. Skipping...")
            continue

        # Loop through the time series data
        for date, values in time_series.items():
            all_rows.append({
                "Date": datetime.strptime(date, "%Y-%m-%d"),
                "Symbol": symbol,
                "Open": float(values["1. open"]),
                "High": float(values["2. high"]),
                "Low": float(values["3. low"]),
                "Close": float(values["4. close"]),
                "Volume": int(values["5. volume"]),
            })

    return pd.DataFrame(all_rows, columns=columns)

In [10]:
# Run the extraction for every configured symbol (one API request per symbol)
df = extract_data(SYMBOLS, API_KEY)

Fetching data for AAPL...
Fetching data for GOOG...
Fetching data for MSFT...
Fetching data for AMZN...
Fetching data for IBM...


### Transforming the Data
###### Transform the extracted data by cleaning and deduplicating it:
###### Removing rows with empty cells
###### Dropping duplicates

In [11]:
def transform_data(df):
    """Return a cleaned copy of ``df`` with null rows and duplicate rows removed.

    The input frame is left untouched, so the cell that calls this function can
    be re-run safely and the raw data stays available for inspection.

    Args:
        df: pandas.DataFrame to clean.

    Returns:
        A new pandas.DataFrame without rows containing null values and without
        duplicate rows. The original index labels of the kept rows are preserved.
    """
    # Drop null values first, then duplicates; both calls return new frames
    return df.dropna().drop_duplicates()

In [16]:
# Run the cleaning step; assigning `df` directly would leave nulls and duplicates in place
transformed_df = transform_data(df)

In [17]:
# Display the transformed data as the cell output
transformed_df

Unnamed: 0,Date,Symbol,Open,High,Low,Close,Volume
0,2024-12-31,AAPL,252.44,253.2800,249.4300,250.42,39480718
1,2024-12-30,AAPL,252.23,253.5000,250.7500,252.20,35557542
2,2024-12-27,AAPL,257.83,258.7000,253.0600,255.59,42355321
3,2024-12-26,AAPL,258.19,260.1000,257.6300,259.02,27262983
4,2024-12-24,AAPL,255.49,258.2100,255.2900,258.20,23234705
...,...,...,...,...,...,...,...
495,2024-08-15,IBM,193.51,194.2500,193.2800,193.95,2471985
496,2024-08-14,IBM,191.15,193.0900,190.7300,192.32,1895114
497,2024-08-13,IBM,190.29,191.3100,189.2100,190.99,2178862
498,2024-08-12,IBM,191.25,191.5761,189.0001,189.48,2290421


### Loading the File
###### In this section, the transformed data is loaded to Azure Blob Storage in CSV format. The load is designed so that any previously uploaded data file is identified, downloaded temporarily, read, and concatenated with the new data. Further cleaning then takes place by removing duplicates and dropping null values. The combined data is then named with the latest datetime stamp and uploaded to the container.

In [None]:
### install the relevant libraries


In [27]:
# Azure Blob Storage client class used by the loading step
from azure.storage.blob import BlobServiceClient

# Load environment variables
load_dotenv()

# Defining the storage parameters (each is None when the variable is not set)
CONNECTION_STRING = os.getenv("AZURE_CONNECTION_STRING")
CONTAINER_NAME = os.getenv("AZURE_CONTAINER_NAME")

#Loading the Data
def load_data_to_azure(df, connection_string, container_name):
    """Merge ``df`` with the latest stored CSV and upload the result to Azure Blob Storage.

    Steps:
      1. Find the most recently modified ``stock_data_*.csv`` blob in the
         container (backup blobs are ignored).
      2. If one exists, download it, drop duplicate/null rows, and concatenate
         it with ``df`` (dropping duplicate rows again).
      3. Save the downloaded previous content under
         ``stock_data_<timestamp>_backup.csv``.
      4. Upload the combined data as ``stock_data_<timestamp>.csv``.
      5. Only after a successful upload, delete the previous blob.

    If the previous blob cannot be read or parsed, the error is logged and only
    ``df`` is uploaded; the previous blob is then left in place. Upload errors
    are logged rather than raised.

    Args:
        df: pandas.DataFrame holding the new data to store.
        connection_string: Azure Storage connection string.
        container_name: Name of the target blob container.

    Returns:
        None.
    """
    from azure.storage.blob import BlobServiceClient
    import pandas as pd
    from datetime import datetime
    import logging
    import io

    logging.basicConfig(level=logging.INFO)

    # Connect to Azure Blob Storage
    blob_service_client = BlobServiceClient.from_connection_string(connection_string)
    container_client = blob_service_client.get_container_client(container_name)

    # Naming pattern of the data files
    file_prefix = "stock_data_"
    file_extension = ".csv"
    backup_suffix = f"_backup{file_extension}"

    # Pick the latest data file by its last-modified time. The names embed a
    # ddmmyy_HHMM timestamp, which does NOT sort chronologically as text, so
    # max() over the names could select an older file.
    candidates = [
        blob
        for blob in container_client.list_blobs(name_starts_with=file_prefix)
        if blob.name.endswith(file_extension) and not blob.name.endswith(backup_suffix)
    ]
    previous_blob_name = None
    if candidates:
        previous_blob_name = max(candidates, key=lambda blob: blob.last_modified).name

    # One timestamp for both names so they cannot straddle a minute boundary
    timestamp = datetime.now().strftime('%d%m%y_%H%M')
    new_blob_name = f"{file_prefix}{timestamp}{file_extension}"
    backup_blob_name = f"{file_prefix}{timestamp}{backup_suffix}"

    combined_data = df
    previous_bytes = None  # set only when the previous file was read and merged

    if previous_blob_name:
        try:
            logging.info(f"Previous data file found: {previous_blob_name}")

            # Download the existing blob content
            previous_blob_client = container_client.get_blob_client(previous_blob_name)
            downloaded = previous_blob_client.download_blob().readall()
            previous_data = pd.read_csv(io.BytesIO(downloaded))

            # Clean existing data
            previous_data["Date"] = pd.to_datetime(previous_data["Date"])
            previous_data = previous_data.drop_duplicates().dropna()

            # Concatenate with new data
            combined_data = (
                pd.concat([previous_data, df]).drop_duplicates().reset_index(drop=True)
            )
            previous_bytes = downloaded
        except Exception as e:
            logging.error(f"Error processing previous blob: {e}")
            combined_data = df
    else:
        logging.info("No previous data file found. Creating a new one.")

    # Save to a temporary buffer and upload
    output_stream = io.BytesIO()
    combined_data.to_csv(output_stream, index=False)
    output_stream.seek(0)

    try:
        if previous_bytes is not None:
            # Write the backup from the bytes already downloaded, before the
            # previous blob is touched, so the old data is never lost.
            container_client.get_blob_client(backup_blob_name).upload_blob(
                previous_bytes, overwrite=True
            )
            logging.info(f"Previous data backed up to: {backup_blob_name}")

        new_blob_client = container_client.get_blob_client(new_blob_name)
        new_blob_client.upload_blob(output_stream, overwrite=True)
        logging.info(f"Data successfully uploaded to: {new_blob_name}")
    except Exception as e:
        logging.error(f"Error uploading new blob: {e}")
        return

    # Remove the superseded file only after the new one is safely stored. When
    # both names are equal (re-run within the same minute) the upload above has
    # already overwritten it, so there is nothing to delete.
    if previous_bytes is not None and previous_blob_name != new_blob_name:
        try:
            container_client.get_blob_client(previous_blob_name).delete_blob()
            logging.info(f"Previous blob renamed to: {backup_blob_name}")
        except Exception as e:
            logging.error(f"Error deleting previous blob {previous_blob_name}: {e}")


In [28]:
# Upload the transformed data to the configured Azure Blob Storage container
load_data_to_azure(transformed_df, CONNECTION_STRING, CONTAINER_NAME)

INFO:azure.core.pipeline.policies.http_logging_policy:Request URL: 'https://capitaledgestorage.blob.core.windows.net/real-data?restype=REDACTED&comp=REDACTED'
Request method: 'GET'
Request headers:
    'x-ms-version': 'REDACTED'
    'Accept': 'application/xml'
    'User-Agent': 'azsdk-python-storage-blob/12.23.1 Python/3.12.6 (Windows-10-10.0.19045-SP0)'
    'x-ms-date': 'REDACTED'
    'x-ms-client-request-id': 'e322c5a9-c877-11ef-ad60-448500d68147'
    'Authorization': 'REDACTED'
No body was attached to the request
INFO:azure.core.pipeline.policies.http_logging_policy:Response status: 200
Response headers:
    'Transfer-Encoding': 'chunked'
    'Content-Type': 'application/xml'
    'Server': 'Windows-Azure-Blob/1.0 Microsoft-HTTPAPI/2.0'
    'x-ms-request-id': 'cb35d1b0-701e-0083-5d84-5c78c3000000'
    'x-ms-client-request-id': 'e322c5a9-c877-11ef-ad60-448500d68147'
    'x-ms-version': 'REDACTED'
    'Date': 'Wed, 01 Jan 2025 19:37:58 GMT'
INFO:root:No previous data file found. Creati