In [1]:
import os
import warnings
from datetime import datetime

import pandas as pd
import pymongo
# pd.set_option('display.max_rows', None)

# Silence every warning of category RuntimeWarning for the rest of the run.
warnings.filterwarnings(action='ignore', category=RuntimeWarning)
# Turn off pandas' chained-assignment check by setting the option to None.
pd.set_option('mode.chained_assignment', None)

# Setup MongoDB connection (local by default).
# Every setting can be overridden through an environment variable so that real
# credentials never have to be written into this file. When a variable is not
# set, the previous hard-coded local values are used, so existing runs behave
# exactly as before.
mongo_host = os.environ.get("MONGO_HOST", "localhost")
mongo_port = int(os.environ.get("MONGO_PORT", "27017"))
mongo_user = os.environ.get("MONGO_USER", "admin")
mongo_password = os.environ.get("MONGO_PASSWORD", "password")
auth_db = os.environ.get("MONGO_AUTH_DB", "admin")
client_mongo = pymongo.MongoClient(
    host=mongo_host,
    port=mongo_port,
    username=mongo_user,
    password=mongo_password,
    authSource=auth_db,
)

# Handles on the two databases used below: "datalake" is read from,
# "denormalization" is both read from and written to.
db_mongo_denormalization = client_mongo.get_database("denormalization")
db_mongo_datalake = client_mongo.get_database("datalake")


# GET brent_eur_liter (daily data)
# Only the "Date" and "Close" fields are fetched; "Close" is exposed downstream
# under the column name "BRENT_eur_liter".
collection_mongo = db_mongo_datalake.get_collection("stockmarket_brent_eur_liter")
cursor = collection_mongo.find({}, {"_id": 0, "Date": 1, "Close": 1})
df_brent_eur_liter = (
    pd.DataFrame(list(cursor))
    .rename(columns={'Close': 'BRENT_eur_liter'})
    # Normalise the key column to datetime so it can be merged on later.
    .assign(Date=lambda frame: pd.to_datetime(frame['Date']))
)
print("df_brent_eur_liter\n", df_brent_eur_liter.head())


# GET refining_margin_brent_eur (monthly data)
collection_mongo = db_mongo_denormalization.get_collection("refining_margin_brent_eur")
cursor = collection_mongo.find(
    {},
    {"_id": 0, "Date_monthly": 1, "refining_margin_brent_eur_liter": 1},
)
# Rename the key to "Date", parse it, then upsample to one row per calendar
# day, carrying each value forward until the next available timestamp.
# NOTE(review): resample('D') only spans from the first to the last timestamp
# present, so no rows are generated for days after the last monthly date —
# confirm that is intended.
df_refining_margin_brent_eur_liter = (
    pd.DataFrame(list(cursor))
    .rename(columns={'Date_monthly': 'Date'})
    .assign(Date=lambda frame: pd.to_datetime(frame['Date']))
    .set_index('Date')
    .resample('D')
    .ffill()
    .reset_index()
)
print("df_refining_margin_brent_eur_liter\n", df_refining_margin_brent_eur_liter.head())



# GET calc_transp_fees_gas_eur_liter (monthly data)
# All fields are fetched; the Mongo "_id" column is discarded afterwards.
collection_mongo = db_mongo_denormalization.get_collection("calc_transp_fees_gas_eur_liter")
cursor = collection_mongo.find({})
# Parse the key, then upsample to one row per calendar day with forward fill.
# NOTE(review): as with any resample('D'), the daily range stops at the last
# timestamp present in the collection — confirm that is intended.
calc_transp_fees_gas_eur_liter = (
    pd.DataFrame(list(cursor))
    .drop('_id', axis=1)
    .assign(Date=lambda frame: pd.to_datetime(frame['Date']))
    .set_index('Date')
    .resample('D')
    .ffill()
    .reset_index()
)
print("calc_transp_fees_gas_eur_liter\n", calc_transp_fees_gas_eur_liter.head())


# GET station_ttc_gas_eur_liter (daily data)
# Fetch the "Date" key and the three station price columns.
collection_mongo = db_mongo_denormalization.get_collection("station_ttc_gas_eur_liter")
cursor = collection_mongo.find({},{"_id": 0, "Date": 1, "station_ttc_GAZOLE_eur_liter": 1,
                                   "station_ttc_SP95_eur_liter": 1,  "station_ttc_E10_eur_liter": 1})
df_station_ttc_gas_eur_liter = pd.DataFrame(list(cursor))
# Parse "Date" like every other loader in this script does: the frame is
# merged on "Date" against datetime64 keys, and pandas refuses to merge a
# datetime64 column with an object (string) column. This is a no-op when the
# values are already datetimes.
df_station_ttc_gas_eur_liter['Date'] = pd.to_datetime(df_station_ttc_gas_eur_liter['Date'])
print("df_station_ttc_gas_eur_liter\n", df_station_ttc_gas_eur_liter.head())



# --- Merge the 4 df ---
# The Brent frame is the left side of every join, so its rows define the
# result; the other three frames only contribute columns matched on "Date".
# Rows without a refining margin are then discarded and the index is rebuilt.
df_merged = (
    df_brent_eur_liter
    .merge(df_refining_margin_brent_eur_liter, on='Date', how='left')
    .merge(calc_transp_fees_gas_eur_liter, on='Date', how='left')
    .merge(df_station_ttc_gas_eur_liter, on='Date', how='left')
    .loc[lambda frame: frame['refining_margin_brent_eur_liter'].notna()]
    .reset_index(drop=True)
)
print("df_merged\n", df_merged.head())

# For each fuel, the "taxes" column is the residual of the station price once
# the Brent price, the refining margin and that fuel's transport fees have
# been subtracted, rounded to 5 decimals. The frame keeps df_merged's index
# and its "Date" column.
df_gas_taxes = pd.DataFrame({
    'Date': df_merged['Date'],
    **{
        f'taxes_{fuel}': (
            df_merged[f'station_ttc_{fuel}_eur_liter'] - (
                df_merged['BRENT_eur_liter']
                + df_merged['refining_margin_brent_eur_liter']
                + df_merged[f'calc_transp_fees_{fuel}_eur_liter']
            )
        ).round(5)
        for fuel in ('GAZOLE', 'SP95', 'E10')
    },
})
print("df_gas_taxes\n", df_gas_taxes.head())

# ----- Push to MongoDB -----
# Build the documents BEFORE touching the collection: insert_many() rejects an
# empty list, and failing after drop_collection() would leave "gas_taxes"
# wiped with nothing to replace it.
records = df_gas_taxes.to_dict(orient="records")
if not records:
    raise ValueError(
        "df_gas_taxes is empty: refusing to drop and reload the gas_taxes collection"
    )

# Full refresh: drop the previous collection, recreate it with an ascending
# index on "Date", then load the new documents.
db_mongo_denormalization.drop_collection("gas_taxes")
collection_mongo = db_mongo_denormalization.get_collection("gas_taxes")
collection_mongo.create_index([("Date", pymongo.ASCENDING)])

collection_mongo.insert_many(records)
print("correctly loaded df_gas_taxes to denormalized collection MongoDB")




# --- Merge the 2 df to create table_all_segments for visualisation on Metabase ---
# df_gas_taxes was built row by row from df_merged and shares its index, so
# the tax columns are attached side by side on that index. Joining on "Date"
# instead would multiply rows whenever a Date value appears more than once.
# "Date" is dropped from the right-hand frame so it is not duplicated.
df_all_segments_for_ttc_gas = pd.concat(
    [df_merged, df_gas_taxes.drop(columns=['Date'])],
    axis=1,
)
print("df_all_segments_for_ttc_gas\n", df_all_segments_for_ttc_gas.head())

# ----- Push to MongoDB -----
# Build the documents BEFORE touching the collection: insert_many() rejects an
# empty list, and failing after drop_collection() would leave
# "all_segments_for_ttc_gas" wiped with nothing to replace it.
records = df_all_segments_for_ttc_gas.to_dict(orient="records")
if not records:
    raise ValueError(
        "df_all_segments_for_ttc_gas is empty: refusing to drop and reload "
        "the all_segments_for_ttc_gas collection"
    )

# Full refresh: drop the previous collection, recreate it with an ascending
# index on "Date", then load the new documents.
db_mongo_denormalization.drop_collection("all_segments_for_ttc_gas")
collection_mongo = db_mongo_denormalization.get_collection("all_segments_for_ttc_gas")
collection_mongo.create_index([("Date", pymongo.ASCENDING)])

collection_mongo.insert_many(records)
print("correctly loaded df_all_segments_for_ttc_gas to denormalized collection MongoDB")

df_brent_eur_liter
         Date  BRENT_eur_liter
0 2007-07-30            0.347
1 2007-07-31            0.354
2 2007-08-01            0.347
3 2007-08-02            0.348
4 2007-08-03            0.341
df_refining_margin_brent_eur_liter
         Date  refining_margin_brent_eur_liter
0 2015-01-01                          0.03769
1 2015-01-02                          0.03769
2 2015-01-03                          0.03769
3 2015-01-04                          0.03769
4 2015-01-05                          0.03769
calc_transp_fees_gas_eur_liter
         Date  calc_transp_fees_GAZOLE_eur_liter  \
0 2015-01-01                            0.09499   
1 2015-01-02                            0.09499   
2 2015-01-03                            0.09499   
3 2015-01-04                            0.09499   
4 2015-01-05                            0.09499   

   calc_transp_fees_SP95_eur_liter  calc_transp_fees_E10_eur_liter  
0                          0.08502                         0.08115  
1          