# <center>Energy Market Tracker<center> <center> Pipeline for Apache Cassandra DW<center>

This project aims to **collect daily data** on energy sector prices (such as oil, natural gas, solar energy, wind energy, nuclear energy, and coal) and **store it** in an **Apache Cassandra Data Warehouse** for analysis.

For this project I have used the following sources:
* www.oilprice.com - Oilprice.com is the most popular energy news site in the world
* www.trandingeconomics.com - provides over 20 million economic indicators for 196 countries
* www.markets.businessinsider.com - an American media company known for publishing financial news

In [None]:
from bs4 import BeautifulSoup as bs
import urllib.request
import datetime
from cassandra.cluster import Cluster 
import requests

## System TIMESTAMP

In [2]:
# SYSTEM TIMESTAMP

system_timestamp = datetime.datetime.now()
system_time = str(system_timestamp).split('.')[0]

## Oil & natural gas

In [3]:
#OIL, NATURAL GAS

page = urllib.request.urlopen("https://oilprice.com/oil-price-charts/#prices")
soup1 = bs(page)

oil1 = soup1.find_all('span', class_='blend_name_span')
lprice = soup1.find_all('td', class_ = 'last_price')
oilchange = soup1.find_all('td', class_ = 'change_up flat_change_cell')
oilchange_percent = soup1.find_all('td', class_ = 'change_up_percent percent_change_cell')
op = soup1.find_all('td', class_ = 'change_up')
op1 = soup1.find_all('td', class_ = 'change_up_percent')

wti_crude_name = oil1[0].text
brent_crude_name= oil1[1].text
wti_crude_price = lprice[0].text
brent_crude_price = lprice[1].text
wti_crude_change = oilchange[0].text
brent_crude_change = oilchange[1].text
wti_crude_percent = oilchange_percent[0].text[:6]
brent_crude_percent = oilchange_percent[1].text[:6]
ng_name = oil1[2].text
ng_price = lprice[2].text
ng_change = oilchange[2].text
ng_percent = oilchange_percent[2].text[:6]
opec_name = oil1[5].text
opec_price = lprice[5].text
opec_change = op[5].text
opec_percent = op1[5].text[:6]

wti = {'name':wti_crude_name, 'last':wti_crude_price, 'change':wti_crude_change, '% change':wti_crude_percent}
brent = {'name':brent_crude_name, 'last':brent_crude_price, 'change':brent_crude_change, '% change':brent_crude_percent}
opec = {'name':opec_name, 'last':opec_price, 'change':opec_change, '% change':opec_percent}
natural_gas = {'name':ng_name, 'last':ng_price, 'change':ng_change, '% change':ng_percent}

In [4]:
print(opec)

{'name': 'Heating Oil', 'last': '2.642', 'change': '+0.000', '% change': '+0%(13'}


## Solar, wind and nuclear energy

In [None]:
# SOLAR, WIND, NUCLEAR

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.163 Safari/537.36'
}

url2 = 'https://tradingeconomics.com/commodities'

response = requests.get(url2, headers=headers)
soup2 = bs(response.content, 'html.parser')

In [None]:
# SOLAR, WIND, NUCLEAR
#page = urllib.request.urlopen("https://tradingeconomics.com/commodities")
#soup2 = bs(page)

w = soup2.find_all('tr', class_ = 'datatable-row')
pr = soup2.find_all('tr', class_ = 'datatable-row')
n = soup2.find_all('tr', class_ = 'datatable-row-alternating')

wind1 = str(w[36]).split('<')
wind_name = wind1[4][2::]
wind_price = wind1[10].split(" ")[83]
wind_change = wind1[12].split('"')[3]
wind_percent = wind1[16].split('"')[3] + '%'

solar = str(pr[35]).split('<')
solar_name = solar[4][2::]
solar_price = solar[10].split(" ")[83]
solar_change = solar[12].split('"')[3]
solar_percent = solar[16].split('"')[3] + '%'

nuclear1 = str(n[30]).split('<')
nuclear_name = nuclear1[4][2::]
nuclear_price = nuclear1[10].split(" ")[83]
nuclear_change = nuclear1[12].split('"')[3]
nuclear_percent = nuclear1[16].split('"')[3] + '%'

wind = {'name':wind_name, 'last':wind_price, 'change':wind_change, '% change':wind_percent}
solar = {'name':solar_name, 'last':solar_price, 'change':solar_change, '% change':solar_percent}
nuclear = {'name':nuclear_name, 'last':nuclear_price, 'change':nuclear_change, '% change':nuclear_percent}

In [None]:
# Function to extract data from HTML elements
def extract_data(row):
    columns = row.find_all('td')
    name = columns[1].text.strip()
    last_price = columns[2].text.strip()
    change = columns[3].text.strip()
    percent_change = columns[4].text.strip()
    return {'name': name, 'last': last_price, 'change': change, '% change': percent_change}

# Find rows containing data for solar, wind, and nuclear
rows = soup2.find_all('tr', class_='datatable-row') + soup2.find_all('tr', class_='datatable-row-alternating')

# Extract data for solar, wind, and nuclear using try-except blocks
solar_data, wind_data, nuclear_data = {}, {}, {}

for row in rows:
    try:
        name = row.find('a').text.strip().lower()
        if 'solar' in name:
            solar_data = extract_data(row)
        elif 'wind' in name:
            wind_data = extract_data(row)
        elif 'nuclear' in name:
            nuclear_data = extract_data(row)
    except Exception as e:
        print(f"Error occurred: {e}")

# Print the extracted data
print("Solar Data:", solar_data)
print("Wind Data:", wind_data)
print("Nuclear Data:", nuclear_data)

## Coal

In [8]:
#COAL
page = urllib.request.urlopen("https://markets.businessinsider.com/commodities/coal-price")
soup3 = bs(page)

c = soup3.find_all('div', class_ = 'price-section__row')

cc = str(c).split('<')
coal_name = cc[3][34:38]
coal_price = cc[9].split('>')[-1]
coal_change = cc[13].split('>')[-1]
coal_percent = cc[15].split('>')[-1]

coal = {'name':coal_name, 'last':coal_price, 'change':coal_change, '% change':coal_percent}

## Sources

In [9]:
# SOURCES

s1_id = 1
s2_id = 2
s3_id = 3

s1_name = 'oilprice.com'
s2_name = 'TRANDING ECONOMICS'
s3_name = 'U.S. Insider Inc.'

s1_description = 'Oilprice.com is the most popular energy news site in the world'
s2_description = 'provides over 20 million economic indicators for 196 countries'
s3_description = 'an American online media company known for publishing the financial news website Insider'

s1_publisher = 'www.oilprice.com'
s2_publisher = 'www.trandingeconomics.com'
s3_publisher = 'www.markets.businessinsider.com'

s1_attr = {'last':'', 'change':'', '% change':''}
s2_attr = {'last':'', 'change':'', '% change':''}
s3_attr = {'last':'', 'change':'', '% change':''}

s1 = {'id':s1_id, 'name':s1_name, 'description':s1_description, 
      'Publisher':s1_publisher, 'SystemDate':system_time, 'Attributes':s1_attr}
s2 = {'id':s2_id, 'name':s2_name, 'description':s2_description, 
      'Publisher':s2_publisher, 'SystemDate':system_time, 'Attributes':s2_attr}
s3 = {'id':s3_id, 'name':s3_name, 'description':s3_description, 
      'Publisher':s3_publisher, 'SystemDate':system_time, 'Attributes':s3_attr}

## Assets

In [None]:
#ASSET

as1_attr = {'last':wti_crude_price, 'change':wti_crude_change, '% change':wti_crude_percent}
as2_attr = {'last':brent_crude_price, 'change':brent_crude_change, '% change':brent_crude_percent}
as3_attr = {'last':opec_price, 'change':opec_change, '% change':opec_percent}
as4_attr = {'last':ng_price, 'change':ng_change, '% change':ng_percent}
as5_attr = {'last':wind_price, 'change':wind_change, '% change':wind_percent}
as6_attr = {'last':solar_price, 'change':solar_change, '% change':solar_percent}
as7_attr = {'last':nuclear_price, 'change':nuclear_change, '% change':nuclear_percent}
as8_attr = {'last':coal_price, 'change':coal_change, '% change':coal_percent}

as1_name = 'WTI CRUDE'
as2_name = 'BRENT CRUDE'
as3_name = 'OPEC BASKET'
as4_name = 'NATURAL GAS'
as5_name = 'Coal'
as6_name = 'Wind Energy Index'
as7_name = 'Solar Energy Index'
as8_name = 'Nuclear Energy Index'

as1_id = 1
as2_id = 2
as3_id = 3
as4_id = 4
as5_id = 5
as6_id = 6
as7_id = 7
as8_id = 8

as1_desc = 'West Texas Intermediate'
as2_desc = 'North Sea'
as3_desc = 'OPEC Reference Basket (ORB)'

a1 = {'Id':as1_id, 'Name':as1_name, 'Description':as1_desc, 'SystemDate':system_time, 'Attributes':as1_attr}
a2 = {'Id':as2_id, 'Name':as2_name, 'Description':as2_desc, 'SystemDate':system_time, 'Attributes':as2_attr}
a3 = {'Id':as3_id, 'Name':as3_name, 'Description':as3_desc, 'SystemDate':system_time, 'Attributes':as3_attr}
a4 = {'Id':as4_id, 'Name':as4_name, 'SystemDate':system_time, 'Attributes':as4_attr}
a5 = {'Id':as5_id, 'Name':as5_name, 'SystemDate':system_time, 'Attributes':as5_attr}
a6 = {'Id':as5_id, 'Name':as5_name, 'SystemDate':system_time, 'Attributes':as6_attr}
a7 = {'Id':as6_id, 'Name':as6_name, 'SystemDate':system_time, 'Attributes':as7_attr}
a8 = {'Id':as7_id, 'Name':as7_name, 'SystemDate':system_time, 'Attributes':as8_attr}

## Apache Cassandra ingestion

In [57]:
# CLUSTER connection
############### INSERT INTO energy.asset
cluster = Cluster()
session = cluster.connect('energy')

insert_sql = ("INSERT INTO energy.asset (id, name, description, SystemDate, attributes) "
              "VALUES(%s, %s, %s, %s, %s) ")

insert_data = (1, as1_name, as1_desc, system_time, as1_attr)
session.execute(insert_sql, insert_data)
insert_data = (2, as2_name, as2_desc, system_time, as2_attr)
session.execute(insert_sql, insert_data)
insert_data = (3, as3_name, as3_desc, system_time, as3_attr)
session.execute(insert_sql, insert_data)
insert_data = (4, as4_name, '', system_time, as4_attr)
session.execute(insert_sql, insert_data)
insert_data = (5, as5_name, '', system_time, as5_attr)
session.execute(insert_sql, insert_data)
insert_data = (6, as6_name, '', system_time, as6_attr)
session.execute(insert_sql, insert_data)
insert_data = (7, as7_name, '', system_time, as7_attr)
session.execute(insert_sql, insert_data)
insert_data = (8, as8_name, '', system_time, as8_attr)

In [70]:
# CLUSTER connection
############### INSERT INTO energy.source

cluster = Cluster()
session = cluster.connect('energy')

insert_sql = ("INSERT INTO energy.source (id, name, description, publisher, systemdate, attributes) "
              "VALUES(%s, %s, %s, %s, %s, %s) ")

insert_data = (1, s1_name, s1_description, s1_publisher, system_time, {'last', 'change', '% change'})
session.execute(insert_sql, insert_data)
insert_data = (2, s2_name, s2_description, s2_publisher, system_time, {'last', 'change', '% change'})
session.execute(insert_sql, insert_data)
insert_data = (3, s3_name, s3_description, s3_publisher, system_time, {'last', 'change', '% change'})
session.execute(insert_sql, insert_data)

<cassandra.cluster.ResultSet at 0x244cb4032e0>

In [91]:
assetid = as1_id
sourceid = s1_id
businessdate = '2022-05-09'
systemdata = system_time
valuesdouble1 = {'last':float(wti_crude_price)}
valuestext1 = {'change':wti_crude_change, '% change':wti_crude_percent}

valuesdouble1 = {'last':float(brent_crude_price)}
valuestext1 = {'change':brent_crude_change, '% change':brent_crude_percent}

 

In [92]:
# CLUSTER connection
############### INSERT INTO energy.data

cluster = Cluster()
session = cluster.connect('energy')

insert_sql = ("INSERT INTO energy.data (assetid, sourceid, businessdate, systemdate, valuesdouble, valuesinteger, valuestext) "
              "VALUES(%s, %s, %s, %s, %s, %s, %s) ")

insert_data = (assetid, sourceid, businessdate, system_time, valuesdouble1, None, valuestext1)
session.execute(insert_sql, insert_data)
insert_data = (as2_id, s1_id, businessdate, system_time, valuesdouble2, None, valuestext2)
session.execute(insert_sql, insert_data)

<cassandra.cluster.ResultSet at 0x244caae1400>