# Extended Case #2: ETL Pipeline 

### Import Libraries

In [1]:
import pandas as pd
import numpy as np
import warnings

# Filter out warnings
warnings.filterwarnings("ignore")



### Download necessary resources from NLTK

In [2]:
import nltk

# Download all the NLTK resources
nltk.download('all')

[nltk_data] Downloading collection 'all'
[nltk_data]    | 
[nltk_data]    | Downloading package abc to /Users/igmark/nltk_data...
[nltk_data]    |   Package abc is already up-to-date!
[nltk_data]    | Downloading package alpino to
[nltk_data]    |     /Users/igmark/nltk_data...
[nltk_data]    |   Package alpino is already up-to-date!
[nltk_data]    | Downloading package averaged_perceptron_tagger to
[nltk_data]    |     /Users/igmark/nltk_data...
[nltk_data]    |   Package averaged_perceptron_tagger is already up-
[nltk_data]    |       to-date!
[nltk_data]    | Downloading package averaged_perceptron_tagger_ru to
[nltk_data]    |     /Users/igmark/nltk_data...
[nltk_data]    |   Package averaged_perceptron_tagger_ru is already
[nltk_data]    |       up-to-date!
[nltk_data]    | Downloading package basque_grammars to
[nltk_data]    |     /Users/igmark/nltk_data...
[nltk_data]    |   Package basque_grammars is already up-to-date!
[nltk_data]    | Downloading package bcp47 to
[nltk_data]

True

### Fetch news headlines

In [3]:
import requests
import json
from bs4 import BeautifulSoup

# Send a request to the CNBC website
url = 'https://www.cnbc.com'
response = requests.get(url)

# Create BeautifulSoup object to parse the HTML content
soup = BeautifulSoup(response.content, 'html.parser')

# Find the HTML elements containing the news headlines
headline_elements = soup.find_all('a', class_='Card-title')

# Extract the headlines and save them
headlines = [headline.text.strip() for headline in headline_elements]

# Get the timestamp of the moment the request was made
timestamp = response.headers['Date']

# Create a dictionary to store the headlines and timestamp
data = {
    "headlines": headlines,
    "timestamp": timestamp
}

# Convert the dictionary to JSON
json_data = json.dumps(data)

# Print the JSON data
print("Headlines as JSON:")
print(json_data)


Headlines as JSON:
{"headlines": ["What's next for stocks partly depends on how much you believe the Fed", "Stocks making the biggest moves before the bell: American Express, Domino's, Coinbase and more", "Bitcoin drops below $25,000, Tether's stablecoin falls under its dollar peg", "Stock futures slip after Fed signals it will restart rate hikes: Live updates"], "timestamp": "Thu, 15 Jun 2023 12:29:19 GMT"}


#### Save Headlines to Dataframe 

In [4]:
import os

# Create a DataFrame
df = pd.DataFrame({'headline': headlines})

# Define the directory path
directory_path = 'data/headlines'

# Create the directory if it doesn't exist
os.makedirs(directory_path, exist_ok=True)

# Define the file path
file_path = os.path.join(directory_path, 'headlines_data.csv')

# Save the DataFrame as a CSV file
df.to_csv(file_path, index=False)

print("Headlines data saved successfully.")


Headlines data saved successfully.


In [5]:
df['url'] = url
df['timestamp'] = timestamp

### Analyze sentiment

In [6]:
import nltk
from nltk.sentiment import SentimentIntensityAnalyzer

# Initialize the sentiment analyzer
sia = SentimentIntensityAnalyzer()

# Create empty lists to store sentiment scores and classifications
sentiment_scores = []
sentiments = []

# Analyze the sentiment of each headline
print("Sentiment Analysis:")
for headline in df['headline']:
    try:
        # Analyze the sentiment of the headline
        scores = sia.polarity_scores(headline)

        # Calculate the compound sentiment score
        compound_score = scores['compound']

        # Add the sentiment score and classification to the lists
        sentiment_scores.append(compound_score)

        # Classify the sentiment based on the compound score
        if compound_score >= 0.05:
            sentiment = 'Positive'
        elif compound_score <= -0.05:
            sentiment = 'Negative'
        else:
            sentiment = 'Neutral'
        
        sentiments.append(sentiment)

        # Print the headline, sentiment, and sentiment scores
        print("Headline:", headline)
        print("Sentiment:", sentiment)
        print("Sentiment Score:", compound_score)
        print("--------------------------------------")
    except Exception as e:
        print(f"Sentiment analysis failed for headline: {headline}. Error: {str(e)}")
        sentiment_scores.append(None)
        sentiments.append(None)

# Add the sentiment scores and classifications as new columns to the DataFrame
df['sentiment_score'] = sentiment_scores
df['sentiment'] = sentiments

print("Headlines data saved successfully.")


Sentiment Analysis:
Headline: What's next for stocks partly depends on how much you believe the Fed
Sentiment: Neutral
Sentiment Score: 0.0
--------------------------------------
Headline: Stocks making the biggest moves before the bell: American Express, Domino's, Coinbase and more
Sentiment: Neutral
Sentiment Score: 0.0
--------------------------------------
Headline: Bitcoin drops below $25,000, Tether's stablecoin falls under its dollar peg
Sentiment: Neutral
Sentiment Score: 0.0
--------------------------------------
Headline: Stock futures slip after Fed signals it will restart rate hikes: Live updates
Sentiment: Neutral
Sentiment Score: 0.0
--------------------------------------
Headlines data saved successfully.


In [7]:
df

Unnamed: 0,headline,url,timestamp,sentiment_score,sentiment
0,What's next for stocks partly depends on how m...,https://www.cnbc.com,"Thu, 15 Jun 2023 12:29:19 GMT",0.0,Neutral
1,Stocks making the biggest moves before the bel...,https://www.cnbc.com,"Thu, 15 Jun 2023 12:29:19 GMT",0.0,Neutral
2,"Bitcoin drops below $25,000, Tether's stableco...",https://www.cnbc.com,"Thu, 15 Jun 2023 12:29:19 GMT",0.0,Neutral
3,Stock futures slip after Fed signals it will r...,https://www.cnbc.com,"Thu, 15 Jun 2023 12:29:19 GMT",0.0,Neutral


### Translator

In [8]:
from googletrans import Translator

# Initialize the translator
translator = Translator()

# Translate the headlines to Spanish
headlines_spanish = []
for headline in headlines:
    if headline is not None:
        try:
            translated_text = translator.translate(headline, dest='es').text
            headlines_spanish.append(translated_text)
            print(f"Translated headline to Spanish: {headline} -> {translated_text}")
        except Exception as e:
            print(f"Translation to Spanish failed for headline: {headline}. Error: {str(e)}")
            headlines_spanish.append(None)
    else:
        headlines_spanish.append(None)

# Translate the headlines to Italian
headlines_italian = []
for headline in headlines:
    if headline is not None:
        try:
            translated_text = translator.translate(headline, dest='it').text
            if translated_text is not None:
                headlines_italian.append(translated_text)
                print(f"Translated headline to Italian: {headline} -> {translated_text}")
            else:
                print(f"Translation to Italian failed for headline: {headline}. Translated text is None.")
                headlines_italian.append(None)
        except Exception as e:
            print(f"Translation to Italian failed for headline: {headline}. Error: {str(e)}")
            headlines_italian.append(None)
    else:
        headlines_italian.append(None)


Translation to Spanish failed for headline: What's next for stocks partly depends on how much you believe the Fed. Error: the JSON object must be str, bytes or bytearray, not NoneType
Translation to Spanish failed for headline: Stocks making the biggest moves before the bell: American Express, Domino's, Coinbase and more. Error: the JSON object must be str, bytes or bytearray, not NoneType
Translated headline to Spanish: Bitcoin drops below $25,000, Tether's stablecoin falls under its dollar peg -> Bitcoin cae por debajo de $ 25,000, Stablecoin de Tether cae bajo su clavija en dólar
Translated headline to Spanish: Stock futures slip after Fed signals it will restart rate hikes: Live updates -> Slip de futuros de acciones después de las señales alimentadas, reiniciará aumentos de tarifas: actualizaciones en vivo
Translated headline to Italian: What's next for stocks partly depends on how much you believe the Fed -> Ciò che è il prossimo per le azioni dipende in parte da quanto credi all

In [9]:
df['headlines_spanish']= headlines_spanish
df['headlines_italian']= headlines_italian

### Extract relevant words and Tokenzie

In [10]:
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize

# Download stopwords if not already downloaded
nltk.download('stopwords')

# Set the language for stopwords
stop_words = set(stopwords.words('english'))

# Tokenize and extract relevant words from the headlines
relevant_words = []
for headline in headlines:
    if headline is not None:
        tokens = word_tokenize(headline)  # Tokenize the headline
        words = [word.lower() for word in tokens if word.isalpha()]  # Keep only alphabetical words
        words = [word for word in words if word not in stop_words]  # Remove stop words
        relevant_words.append(words)  # Append the list of relevant words for each headline

# Add the relevant_words column to the existing DataFrame
df['relevant_words'] = relevant_words

[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/igmark/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


### Relavent individual words

In [11]:
# Print the relevant words
print("Relevant Words:")
for word in relevant_words:
    print(word)


Relevant Words:
['next', 'stocks', 'partly', 'depends', 'much', 'believe', 'fed']
['stocks', 'making', 'biggest', 'moves', 'bell', 'american', 'express', 'domino', 'coinbase']
['bitcoin', 'drops', 'tether', 'stablecoin', 'falls', 'dollar', 'peg']
['stock', 'futures', 'slip', 'fed', 'signals', 'restart', 'rate', 'hikes', 'live', 'updates']


### Remove stop words from the text

In [13]:
# Convert the list of words in 'relevant_words' column to a string
df['relevant_words'] = df['relevant_words'].apply(lambda words: ' '.join(words))

# Remove stop words from the 'relevant_words' column
stop_words = set(stopwords.words('english'))
df['relevant_words'] = df['relevant_words'].apply(lambda text: ' '.join([word for word in text.split() if word not in stop_words]))

# Print the updated DataFrame
print(df)

                                            headline                   url  \
0  What's next for stocks partly depends on how m...  https://www.cnbc.com   
1  Stocks making the biggest moves before the bel...  https://www.cnbc.com   
2  Bitcoin drops below $25,000, Tether's stableco...  https://www.cnbc.com   
3  Stock futures slip after Fed signals it will r...  https://www.cnbc.com   

                       timestamp  sentiment_score sentiment  \
0  Thu, 15 Jun 2023 12:29:19 GMT              0.0   Neutral   
1  Thu, 15 Jun 2023 12:29:19 GMT              0.0   Neutral   
2  Thu, 15 Jun 2023 12:29:19 GMT              0.0   Neutral   
3  Thu, 15 Jun 2023 12:29:19 GMT              0.0   Neutral   

                                   headlines_spanish  \
0                                               None   
1                                               None   
2  Bitcoin cae por debajo de $ 25,000, Stablecoin...   
3  Slip de futuros de acciones después de las señ...   

            

### Lemmatize the words to their base form

In [14]:
from nltk.stem import WordNetLemmatizer

# Initialize the lemmatizer
lemmatizer = WordNetLemmatizer()

# Lemmatize the words in the relevant_words column
df['relevant_words'] = df['relevant_words'].apply(lambda text: ' '.join([lemmatizer.lemmatize(word) for word in nltk.word_tokenize(text)]))

# Print the updated DataFrame
print(df)


                                            headline                   url  \
0  What's next for stocks partly depends on how m...  https://www.cnbc.com   
1  Stocks making the biggest moves before the bel...  https://www.cnbc.com   
2  Bitcoin drops below $25,000, Tether's stableco...  https://www.cnbc.com   
3  Stock futures slip after Fed signals it will r...  https://www.cnbc.com   

                       timestamp  sentiment_score sentiment  \
0  Thu, 15 Jun 2023 12:29:19 GMT              0.0   Neutral   
1  Thu, 15 Jun 2023 12:29:19 GMT              0.0   Neutral   
2  Thu, 15 Jun 2023 12:29:19 GMT              0.0   Neutral   
3  Thu, 15 Jun 2023 12:29:19 GMT              0.0   Neutral   

                                   headlines_spanish  \
0                                               None   
1                                               None   
2  Bitcoin cae por debajo de $ 25,000, Stablecoin...   
3  Slip de futuros de acciones después de las señ...   

            

In [15]:
df

Unnamed: 0,headline,url,timestamp,sentiment_score,sentiment,headlines_spanish,headlines_italian,relevant_words
0,What's next for stocks partly depends on how m...,https://www.cnbc.com,"Thu, 15 Jun 2023 12:29:19 GMT",0.0,Neutral,,Ciò che è il prossimo per le azioni dipende in...,next stock partly depends much believe fed
1,Stocks making the biggest moves before the bel...,https://www.cnbc.com,"Thu, 15 Jun 2023 12:29:19 GMT",0.0,Neutral,,Titoli che fanno le mosse più grandi davanti a...,stock making biggest move bell american expres...
2,"Bitcoin drops below $25,000, Tether's stableco...",https://www.cnbc.com,"Thu, 15 Jun 2023 12:29:19 GMT",0.0,Neutral,"Bitcoin cae por debajo de $ 25,000, Stablecoin...","Bitcoin scende al di sotto di $ 25.000, lo Sta...",bitcoin drop tether stablecoin fall dollar peg
3,Stock futures slip after Fed signals it will r...,https://www.cnbc.com,"Thu, 15 Jun 2023 12:29:19 GMT",0.0,Neutral,Slip de futuros de acciones después de las señ...,Scivolamento dei futures su azioni dopo i segn...,stock future slip fed signal restart rate hike...


### Save Headlines df to csv

In [16]:
df.to_csv(file_path, index=False)

print(f"Headlines data saved to: {file_path}")

Headlines data saved to: data/headlines/headlines_data.csv


### Fetch and Extract Stock Data

In [17]:
import os
from alpha_vantage.timeseries import TimeSeries

api_key = '0PCKKMC5W8W4AEGV'

symbols = ['AAPL', 'MSFT', 'GOOGL', 'AMZN', 'META']
interval = '1min'
outputsize = 'compact'

ts = TimeSeries(key=api_key, output_format='pandas')

stocks_data = pd.DataFrame()

for symbol in symbols:
    data, meta = ts.get_intraday(symbol, interval=interval, outputsize=outputsize)
    extracted_data = data[['1. open', '2. high', '3. low', '4. close', '5. volume']]
    extracted_data['symbol'] = symbol
    extracted_data = extracted_data.rename(columns={'1. open': 'open_price', '2. high': 'highest_price', '3. low': 'lowest_price', '4. close': 'close_price', '5. volume': 'volume'})
    extracted_data = extracted_data.reset_index().rename(columns={'index': 'date'})
    stocks_data = stocks_data.append(extracted_data)

# Create the directory for stocks
stocks_directory = 'data/stocks'
if not os.path.exists(stocks_directory):
    os.makedirs(stocks_directory)

# Save the data to a CSV file
file_path = os.path.join(stocks_directory, 'stocks_data.csv')
stocks_data.to_csv(file_path, index=False)

# Print the stocks_data DataFrame
print(stocks_data)



                  date  open_price  highest_price  lowest_price  close_price  \
0  2023-06-14 19:59:00    183.8800       183.8800      183.8800       183.88   
1  2023-06-14 19:58:00    183.8800       183.9000      183.8800       183.89   
2  2023-06-14 19:56:00    183.9200       183.9200      183.9200       183.92   
3  2023-06-14 19:54:00    183.9000       183.9200      183.8500       183.85   
4  2023-06-14 19:52:00    183.8400       183.8400      183.8400       183.84   
..                 ...         ...            ...           ...          ...   
95 2023-06-14 16:33:00    273.4100       273.5500      273.4100       273.55   
96 2023-06-14 16:32:00    273.4000       273.4400      273.3999       273.44   
97 2023-06-14 16:31:00    273.2803       273.4000      273.2800       273.40   
98 2023-06-14 16:30:00    273.2500       273.3384      273.2400       273.33   
99 2023-06-14 16:29:00    273.2300       273.2500      273.2200       273.25   

    volume symbol  
0    603.0   AAPL  

In [18]:
stocks_data

Unnamed: 0,date,open_price,highest_price,lowest_price,close_price,volume,symbol
0,2023-06-14 19:59:00,183.8800,183.8800,183.8800,183.88,603.0,AAPL
1,2023-06-14 19:58:00,183.8800,183.9000,183.8800,183.89,448.0,AAPL
2,2023-06-14 19:56:00,183.9200,183.9200,183.9200,183.92,406.0,AAPL
3,2023-06-14 19:54:00,183.9000,183.9200,183.8500,183.85,1395.0,AAPL
4,2023-06-14 19:52:00,183.8400,183.8400,183.8400,183.84,836.0,AAPL
...,...,...,...,...,...,...,...
95,2023-06-14 16:33:00,273.4100,273.5500,273.4100,273.55,1364.0,META
96,2023-06-14 16:32:00,273.4000,273.4400,273.3999,273.44,2437.0,META
97,2023-06-14 16:31:00,273.2803,273.4000,273.2800,273.40,2842.0,META
98,2023-06-14 16:30:00,273.2500,273.3384,273.2400,273.33,7792.0,META


## Load

In [None]:
# Concatenate all the DataFrames into a single DataFrame

#Load the data into a SQLLite database

#Read the data from the CSV files and load into the SQLLite database

### Create SQLite Database

In [33]:
import sqlite3
import csv
import glob

# Step 1: Create SQLite Database
conn = sqlite3.connect('etl_extended_case.db')
cursor = conn.cursor()

In [34]:
# Connect to the database
conn = sqlite3.connect('etl_extended_case.db')
cursor = conn.cursor()

# Check if the "headlines" table exists
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='headlines'")
table_exists = cursor.fetchone()

if table_exists:
    print("Table 'headlines' already exists. Skipping table creation.")
else:
    cursor.execute('''CREATE TABLE headlines (
                        id INTEGER PRIMARY KEY,
                        headline TEXT,
                        sentiment_score REAL,
                        sentiment TEXT,
                        relevant_words TEXT,
                        url TEXT,
                        timestamp TEXT,
                        headlines_spanish TEXT,
                        headlines_italian TEXT
                    )''')
    print("Table 'headlines' created successfully.")

# Check if the "stocks" table exists
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='stocks'")
table_exists = cursor.fetchone()

if table_exists:
    print("Table 'stocks' already exists. Skipping table creation.")
else:
    cursor.execute('''CREATE TABLE stocks (
                        id INTEGER PRIMARY KEY,
                        date TEXT,
                        open_price REAL,
                        highest_price REAL,
                        lowest_price REAL,
                        close_price REAL,
                        volume REAL,
                        symbol TEXT
                    )''')
    print("Table 'stocks' created successfully.")


Table 'headlines' already exists. Skipping table creation.
Table 'stocks' already exists. Skipping table creation.


In [50]:
# Function to load data from CSV into the SQLite database
def load_data_from_csv(filename, table_name):
    with open(filename, 'r') as file:
        csv_data = csv.reader(file)
        next(csv_data)  # Skip the header row
        for row in csv_data:
            # Ensure the number of values matches the number of placeholders in the query
            if len(row) == 9:
                cursor.execute(f"INSERT INTO {table_name} VALUES (?,?,?,?,?,?,?,?,?)", row)
        conn.commit()

# Read data from "headlines_data.csv" and load into the SQLite database
load_data_from_csv('data/headlines/headlines_data.csv', 'headlines')

# Read data from "stocks_data.csv" and load into the SQLite database
load_data_from_csv('data/stocks/stocks_data.csv', 'stocks')

# Close the cursor and the connection
cursor.close()
conn.close()