Populating the tables on the SQL database through excel file

In [5]:
import cx_Oracle
import pandas as pd
import os

# Connect to Oracle Database
dsn = cx_Oracle.makedsn('localhost', '1521', service_name='orcl')  # Update this as per your details
connection = cx_Oracle.connect(user='c##dwh', password='1234', dsn=dsn)

cursor = connection.cursor()

# Path to your Excel file (update this as per your directory)
excel_file_path = r'C:\Users\imran\Desktop\IBA\2nd Semester\Data Analytics and Warehouisng\Assignments\Assignment 2\AgricultureData.xlsx'

# Sheet names in the Excel file and their corresponding table names in the Oracle DB
sheet_to_table = {
    'Farms': 'Farms',
    'Farmers': 'Farmers',
    'Crops': 'Crops',
    'Livestock': 'Livestock',
    'SoilQuality': 'SoilQuality',
    'WeatherData': 'WeatherData',
    'Irrigation': 'Irrigation',
    'Pesticides': 'Pesticides',
    'Fertilizers': 'Fertilizers',
    'FarmEquipment': 'FarmEquipment',
    'Harvest': 'Harvest',
    'MarketPrices': 'MarketPrices',
    'Storage': 'Storage',
    'SupplyChain': 'SupplyChain',
    'GovernmentSubsidies': 'GovernmentSubsidies'
}

def insert_data_into_table(table_name, data_frame):
    # Generate insert SQL query
    columns = ', '.join(data_frame.columns)
    placeholders = ', '.join([f":{i+1}" for i in range(len(data_frame.columns))])
    sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
    
    # Execute insert for each row in the dataframe
    for row in data_frame.itertuples(index=False):
        cursor.execute(sql, row)
    
    # Commit the transaction after inserting data
    connection.commit()

# Load the Excel file
xls = pd.ExcelFile(r'C:\Users\imran\Desktop\IBA\2nd Semester\Data Analytics and Warehouisng\Assignments\Assignment 2\AgricultureData.xlsx')

# Loop through all sheets and load their data into Oracle tables
for sheet_name, table_name in sheet_to_table.items():
    if sheet_name in xls.sheet_names:
        # Load the sheet data into pandas DataFrame
        df = pd.read_excel(xls, sheet_name=sheet_name)
        
        # Insert data into the corresponding table
        insert_data_into_table(table_name, df)
        print(f"Data inserted into {table_name} table.")
    else:
        print(f"Sheet {sheet_name} not found in the Excel file.")

# Close the cursor and connection
cursor.close()
connection.close()

Data inserted into Farms table.
Data inserted into Farmers table.
Data inserted into Crops table.
Data inserted into Livestock table.
Data inserted into SoilQuality table.
Data inserted into WeatherData table.
Data inserted into Irrigation table.
Data inserted into Pesticides table.
Data inserted into Fertilizers table.
Data inserted into FarmEquipment table.
Data inserted into Harvest table.
Data inserted into MarketPrices table.
Data inserted into Storage table.
Data inserted into SupplyChain table.
Data inserted into GovernmentSubsidies table.


Creating Star schema's tables

In [1]:
import cx_Oracle

# Connect to Oracle Database
dsn = cx_Oracle.makedsn('localhost', '1521', service_name='orcl')
connection = cx_Oracle.connect(user='c##dwh', password='1234', dsn=dsn)
cursor = connection.cursor()

# Create DimDate table
cursor.execute("""
CREATE TABLE DimDate (
    DateKey INT PRIMARY KEY,
    Month VARCHAR2(20),
    Year INT,
    Season VARCHAR2(20)
)
""")

# Create DimFarm table
cursor.execute("""
CREATE TABLE DimFarm (
    FarmID INT PRIMARY KEY,
    FarmName VARCHAR2(100),
    FarmLocation VARCHAR2(100),
    FarmSize INT,
    FarmType VARCHAR2(50)
)
""")

# Create DimCrop table
cursor.execute("""
CREATE TABLE DimCrop (
    CropID INT PRIMARY KEY,
    CropName VARCHAR2(100),
    Season VARCHAR2(20)
)
""")

# Create DimMarketPrice table
cursor.execute("""
CREATE TABLE DimMarketPrice (
    MarketPriceID INT PRIMARY KEY,
    DateKey INT,
    CropID INT,
    PricePerTon DECIMAL(10,2),
    FOREIGN KEY (DateKey) REFERENCES DimDate(DateKey),
    FOREIGN KEY (CropID) REFERENCES DimCrop(CropID)
)
""")

# Create DimHarvest table
cursor.execute("""
CREATE TABLE DimHarvest (
    HarvestID INT PRIMARY KEY,
    CropID INT,
    FarmID INT,
    HarvestDate DATE,
    QuantityTons DECIMAL(10,2),
    FOREIGN KEY (CropID) REFERENCES DimCrop(CropID),
    FOREIGN KEY (FarmID) REFERENCES DimFarm(FarmID)
)
""")

# Create FactFarmPerformance table
cursor.execute("""
CREATE TABLE FactFarmPerformance (
    DateKey INT,
    FarmID INT,
    CropID INT,
    TotalRevenue DECIMAL(12,2),
    TotalYield DECIMAL(12,2),
    MarketPrice DECIMAL(10,2),
    QuantityTons DECIMAL(10,2),
    FOREIGN KEY (DateKey) REFERENCES DimDate(DateKey),
    FOREIGN KEY (FarmID) REFERENCES DimFarm(FarmID),
    FOREIGN KEY (CropID) REFERENCES DimCrop(CropID)
)
""")

connection.commit()
cursor.close()
connection.close()

Transformation Function & ETL Pipeline Implementation

In [2]:
import cx_Oracle
import pandas as pd

# Establish connection to the OLTP Oracle database
dsn = cx_Oracle.makedsn('localhost', '1521', service_name='orcl')
conn = cx_Oracle.connect(user='c##dwh', password='1234', dsn=dsn)
oltp_cursor = conn.cursor()

df_FARMS = pd.read_sql("SELECT FARMID, NAME AS FARMNAME, LOCATION AS FARMLOCATION, SIZE_ACRES AS FARMSIZE FROM FARMS", conn)
df_CROPS = pd.read_sql("SELECT CROPID, NAME AS CROPNAME, FARMID, SEASON, YIELD_TONS AS YIELDTONS FROM CROPS", conn)
df_HARVEST = pd.read_sql("SELECT HARVESTID, CROPID, FARMID, QUANTITY_TONS AS QUANTITYTONS, HARVESTDATE FROM HARVEST", conn)
df_MARKET = pd.read_sql("SELECT MARKETID, CROPNAME, PRICEPERTON, MARKETDATE AS MARKETDATE FROM MARKETPRICES", conn)
conn.close()

def get_season(month):
    if month in [12, 1, 2]:
        return 'WINTER'
    elif month in [3, 4, 5]:
        return 'SPRING'
    elif month in [6, 7, 8]:
        return 'SUMMER'
    else:
        return 'FALL'

# Build DIMDATE from HARVESTDATE and MARKETDATE (monthly aggregation)
date_series = pd.concat([df_HARVEST['HARVESTDATE'], df_MARKET['MARKETDATE']]).dropna().unique()
dim_date_records = []
for d in date_series:
    d = pd.to_datetime(d)
    datekey = int(d.strftime("%Y%m"))
    dim_date_records.append({
        'DATEKEY': datekey,
        'MONTH': d.strftime("%B").upper(),
        'YEAR': d.year,
        'SEASON': get_season(d.month)
    })
df_DIMDATE = pd.DataFrame(dim_date_records).drop_duplicates(subset=['DATEKEY'])

# Build DIMFARM
df_DIMFARM = df_FARMS.copy()
# Columns are already uppercase from query; add FARMTYPE as default
df_DIMFARM['FARMTYPE'] = 'CONVENTIONAL'

# Build DIMCROP
df_DIMCROP = df_CROPS[['CROPID', 'CROPNAME', 'SEASON']].drop_duplicates()

# Build DIMMARKETPRICE by aggregating average PRICEPERTON for each unique DATEKEY and CROPNAME
df_MARKET['DATEKEY'] = pd.to_datetime(df_MARKET['MARKETDATE']).dt.strftime("%Y%m").astype(int)
df_DIMMARKET = df_MARKET.groupby(['DATEKEY', 'CROPNAME'], as_index=False)['PRICEPERTON'].mean()
df_DIMMARKET = pd.merge(df_DIMMARKET, df_DIMCROP[['CROPID', 'CROPNAME']], on='CROPNAME', how='left')
df_DIMMARKET['MARKETPRICEID'] = range(1, len(df_DIMMARKET) + 1)
df_DIMMARKET = df_DIMMARKET[['MARKETPRICEID', 'DATEKEY', 'CROPID', 'PRICEPERTON']]

# Build DIMHARVEST
df_DIMHARVEST = df_HARVEST.copy()

# Build FACTFARMPERFORMANCE: aggregate monthly per FARM and CROP
df_HARVEST['DATEKEY'] = pd.to_datetime(df_HARVEST['HARVESTDATE']).dt.strftime("%Y%m").astype(int)
FACT = df_HARVEST.groupby(['DATEKEY', 'FARMID', 'CROPID'])['QUANTITYTONS'].sum().reset_index()
MARKET_PRICE_AVG = df_DIMMARKET.groupby(['DATEKEY', 'CROPID'])['PRICEPERTON'].mean().reset_index()
FACT = pd.merge(FACT, MARKET_PRICE_AVG, on=['DATEKEY', 'CROPID'], how='left')
FACT['TOTALREVENUE'] = FACT['QUANTITYTONS'] * FACT['PRICEPERTON']
FACT['TOTALYIELD'] = FACT['QUANTITYTONS']
df_FACT = FACT[['DATEKEY', 'FARMID', 'CROPID', 'TOTALREVENUE', 'TOTALYIELD', 'PRICEPERTON', 'QUANTITYTONS']]
df_FACT = df_FACT.rename(columns={'PRICEPERTON': 'MARKETPRICE'})

# Connect to the DWH Oracle database to load star schema tables
dsn = cx_Oracle.makedsn('localhost', '1521', service_name='orcl')
dwh_conn = cx_Oracle.connect(user='c##dwh', password='1234', dsn=dsn)
dwh_cursor = dwh_conn.cursor()

def load_dataframe_to_oracle(df, table_name):
    data = [tuple(x) for x in df.to_numpy()]
    cols = ', '.join(df.columns)
    placeholders = ', '.join([f":{i+1}" for i in range(len(df.columns))])
    sql = f"INSERT INTO {table_name} ({cols}) VALUES ({placeholders})"
    dwh_cursor.executemany(sql, data)
    dwh_conn.commit()

# Replace NaN in numeric columns with 0
numeric_cols = ['TOTALREVENUE', 'TOTALYIELD', 'MARKETPRICE', 'QUANTITYTONS']
df_FACT[numeric_cols] = df_FACT[numeric_cols].fillna(0)

# Convert them to float explicitly
for col in numeric_cols:
    df_FACT[col] = df_FACT[col].astype(float)

load_dataframe_to_oracle(df_DIMDATE, "DIMDATE")
load_dataframe_to_oracle(df_DIMFARM, "DIMFARM")
load_dataframe_to_oracle(df_DIMCROP, "DIMCROP")
load_dataframe_to_oracle(df_DIMMARKET, "DIMMARKETPRICE")
load_dataframe_to_oracle(df_DIMHARVEST, "DIMHARVEST")
load_dataframe_to_oracle(df_FACT, "FACTFARMPERFORMANCE")

dwh_cursor.close()
dwh_conn.close()

  df_FARMS = pd.read_sql("SELECT FARMID, NAME AS FARMNAME, LOCATION AS FARMLOCATION, SIZE_ACRES AS FARMSIZE FROM FARMS", conn)
  df_CROPS = pd.read_sql("SELECT CROPID, NAME AS CROPNAME, FARMID, SEASON, YIELD_TONS AS YIELDTONS FROM CROPS", conn)
  df_HARVEST = pd.read_sql("SELECT HARVESTID, CROPID, FARMID, QUANTITY_TONS AS QUANTITYTONS, HARVESTDATE FROM HARVEST", conn)
  df_MARKET = pd.read_sql("SELECT MARKETID, CROPNAME, PRICEPERTON, MARKETDATE AS MARKETDATE FROM MARKETPRICES", conn)
