In [135]:
# Main notebook for the project
import os
import re
import shutil
from urllib.parse import urljoin, urlsplit

import pandas as pd
import requests
from bs4 import BeautifulSoup

def create_folder_if_not_exists(folder_name):
    """Create ``folder_name`` (including missing parent folders) if it is absent.

    Args:
        folder_name: Path of the directory to create.

    Returns:
        None.
    """
    # exist_ok=True makes the call idempotent and avoids the race between a
    # separate "does it exist?" check and the actual creation.
    os.makedirs(folder_name, exist_ok=True)

def download_file(url, folder, timeout=30):
    """Download ``url`` into ``folder``, named after the last segment of the URL path.

    The function is best-effort: failures are reported with ``print`` and the
    function returns without raising, so one bad link does not stop a batch.

    Args:
        url: Absolute URL of the file to fetch.
        folder: Existing directory the file is written into.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        None.
    """
    # Use only the path component so a query string or fragment never ends up
    # in the file name on disk.
    file_name = os.path.basename(urlsplit(url).path)
    if not file_name:
        print(f"Failed to download the file. No file name in URL: {url}")
        return
    file_path = os.path.join(folder, file_name)
    try:
        # Without a timeout a stalled server would block the notebook forever.
        response = requests.get(url, timeout=timeout)
    except requests.RequestException as e:
        print(f"Error during requests to {url} : {str(e)}")
        return
    if response.status_code == 200:
        with open(file_path, 'wb') as file:
            file.write(response.content)
    else:
        print(f"Failed to download the file. Status code: {response.status_code}")

def find_data_start_row(file_path):
    """Return the 0-based index of the first line that begins with three spaces.

    Args:
        file_path: Path of the text file to scan.

    Returns:
        The index of the first matching line, or 0 when no line matches.
    """
    with open(file_path, 'r') as handle:
        indented_rows = (
            row_number
            for row_number, text in enumerate(handle)
            if text.startswith("   ")
        )
        # Fall back to 0 when the file has no indented line at all.
        return next(indented_rows, 0)

def extract_links(url, href_contains, timeout=30):
    """Collect absolute URLs of all ``<a href>`` links on a page that contain a substring.

    Errors are reported with ``print``; in that case the links gathered so far
    (possibly none) are returned.

    Args:
        url: Page to fetch and scan.
        href_contains: Substring that a link's ``href`` must contain to be kept.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        List of absolute URLs, in document order.
    """
    urls = []
    try:
        # Without a timeout a stalled server would block the notebook forever;
        # a timeout surfaces as a RequestException handled below.
        response = requests.get(url, timeout=timeout)
        if response.ok:
            soup = BeautifulSoup(response.text, 'html.parser')
            for link in soup.find_all('a', href=True):
                if href_contains in link['href']:
                    # Resolve relative hrefs against the page URL.
                    urls.append(urljoin(url, link['href']))
        else:
            print(f"Error accessing page: Status code {response.status_code}")
    except requests.RequestException as e:
        print(f"Error during requests to {url} : {str(e)}")

    return urls

def download_main_dataset(url, output_folder, pattern):
    """Download every file linked from ``url`` whose href contains ``pattern``.

    Args:
        url: Page whose links are scanned.
        output_folder: Directory the files are saved into (created if missing).
        pattern: Substring a link's href must contain to be downloaded.
    """
    matching_links = extract_links(url, pattern)
    create_folder_if_not_exists(output_folder)
    for file_url in matching_links:
        download_file(file_url, output_folder)
        
def txt_to_csv(file_path, output_folder):    #Dataset 1
    """Convert one whitespace-delimited station text file to CSV.

    Lines before the first one that starts with three spaces are skipped (see
    ``find_data_start_row``); only the first seven columns are kept.

    Args:
        file_path: Path of the source text file.
        output_folder: Existing directory the CSV is written into.

    Returns:
        ``output_folder``, unchanged.
    """
    start_row = find_data_start_row(file_path)
    # sep=r'\s+' is the documented replacement for the deprecated
    # delim_whitespace=True option.
    df = pd.read_csv(file_path, skiprows=start_row, sep=r'\s+', usecols=[0, 1, 2, 3, 4, 5, 6])
    df = df.reset_index(drop=True)
    # Swap only the extension; str.replace would also rewrite a '.txt' that
    # happens to appear earlier in the file name.
    stem, _ = os.path.splitext(os.path.basename(file_path))
    output_file = os.path.join(output_folder, stem + '.csv')
    df.to_csv(output_file, index=False)
    return output_folder

def convert_txts_to_csvs(input_folder, output_folder):    #Dataset 1
    """Run ``txt_to_csv`` on every ``.txt`` entry found in ``input_folder``.

    Args:
        input_folder: Directory that is listed for ``.txt`` files.
        output_folder: Directory the CSVs are written into (created if missing).
    """
    create_folder_if_not_exists(output_folder)
    candidate_paths = (
        os.path.join(input_folder, entry) for entry in os.listdir(input_folder)
    )
    for txt_path in candidate_paths:
        if not txt_path.endswith('.txt'):
            continue
        txt_to_csv(txt_path, output_folder)

def clean_and_rename_csv_in_folder(input_folder, output_folder):    #Dataset 1
    """Rewrite each CSV in ``input_folder`` with a fixed header and missing-value markers cleaned.

    For every ``.csv`` entry: the first data row is dropped, the seven columns
    are renamed, the literal ``'---'`` is replaced by ``pd.NA``, and the result
    is saved under the same file name in ``output_folder``.

    Args:
        input_folder: Directory that is listed for ``.csv`` files.
        output_folder: Directory the cleaned CSVs are written into (created if missing).
    """
    station_columns = ['yyyy', 'mm', 'tmax (degC)', 'tmin (degC)', 'af (days)', 'rain (mm)', 'sun (hours)']

    create_folder_if_not_exists(output_folder)
    for entry in os.listdir(input_folder):
        source_path = os.path.join(input_folder, entry)
        if not source_path.endswith('.csv'):
            continue
        frame = pd.read_csv(source_path)
        # Row label 0 is removed before the new header is applied.
        frame = frame.drop(0).reset_index(drop=True)
        frame.columns = station_columns
        frame = frame.replace('---', pd.NA)
        frame.to_csv(os.path.join(output_folder, entry), index=False)

def process_happiness_excel(file_path, sheet_name, word):
    """Load one sheet of an Excel workbook and return it with normalized column names.

    The sheet is read without a header, skipping its first two rows. The first
    row whose first cell contains ``word`` is treated as the header row: it and
    everything above it are discarded, rows that are entirely empty are
    dropped, and the remaining columns are renamed positionally from a fixed
    list of names.

    Args:
        file_path: Path to the Excel workbook.
        sheet_name: Sheet to read (passed to ``pd.read_excel``).
        word: Text identifying the header row via its first cell.

    Returns:
        pandas.DataFrame with a fresh 0..n-1 index and the normalized column names.

    Raises:
        ValueError: If no row's first cell contains ``word``.
    """
    data = pd.read_excel(file_path, sheet_name=sheet_name, header=None, skiprows=2)

    col_names = [
        "Codes", "Area names", "Region", "Location", "Total % 0-4", "Total % 5-6", "Total % 7-8", "Total % 9-10",
        "Total Average rating", "Total Standard deviation", "0-4 CV", "0-4 Lower limit", "0-4 Upper limit",
        "5-6 CV", "5-6 Lower limit", "5-6 Upper limit", "7-8 CV", "7-8 Lower limit", "7-8 Upper limit",
        "9-10 CV", "9-10 Lower limit", "9-10 Upper limit", "Total CV", "Total Lower limit", "Total Upper limit",
        "Sample size"
    ]

    # Locate the header row; the sheet's own header text is not kept because
    # the columns are renamed positionally below.
    header_row = next(
        (i for i in range(len(data)) if word in str(data.iloc[i, 0])),
        None,
    )
    if header_row is None:
        raise ValueError(
            f"No header row containing {word!r} found in sheet {sheet_name!r} of {file_path}"
        )

    data = data.iloc[header_row + 1:].dropna(how="all").reset_index(drop=True)

    # Look at the file name only, so a "2012" in a directory name cannot
    # select the wrong layout. Files without "2012" in their name get the list
    # without "Total Standard deviation" (presumably those workbooks lack that
    # column -- confirm against the source data).
    if "2012" not in os.path.basename(file_path):
        col_names.remove("Total Standard deviation")

    data.columns = col_names[:len(data.columns)]

    return data

def process__excel_folder(input_folder, output_folder, sheet_name, word):
    """Convert one sheet of every ``.xls`` file in ``input_folder`` to a CSV.

    Each ``.xls`` file is first copied next to itself with an ``.xlsx``
    extension; the copy is parsed with ``process_happiness_excel`` and the
    result is saved in ``output_folder`` under the original name with ``.xls``
    replaced by ``.csv``.

    Args:
        input_folder: Directory that is listed for ``.xls`` files.
        output_folder: Directory the CSVs are written into (created if missing).
        sheet_name: Sheet to read from each workbook.
        word: Header-row marker forwarded to ``process_happiness_excel``.
    """
    create_folder_if_not_exists(output_folder)
    for xls_name in os.listdir(input_folder):
        if not xls_name.endswith(".xls"):
            continue
        xlsx_path = os.path.join(input_folder, xls_name[:-3] + "xlsx")
        shutil.copy(os.path.join(input_folder, xls_name), xlsx_path)
        frame = process_happiness_excel(xlsx_path, sheet_name, word)
        csv_path = os.path.join(output_folder, xls_name.replace(".xls", ".csv"))
        frame.to_csv(csv_path, index=False)
        print(f"Processed and saved: {xls_name}")

Dataset 1 Download and Schema Enforcement

In [41]:
# Download and convert the files for dataset 1
# Three folders, one per stage: downloaded .txt files, converted CSVs, cleaned CSVs.
output_folder_ds1_txt = '../data/ds1/stations_txt'
output_folder_ds1_csv = '../data/ds1/stations_csv'
output_folder_ds1_csv_clean = '../data/ds1/stations_csv_clean'
station_url = 'https://www.metoffice.gov.uk/research/climate/maps-and-data/historic-station-data'
# Links on the page whose href contains this substring are downloaded.
pattern_station = '.txt'

# The steps must run in this order: each one reads the folder the previous one wrote.
download_main_dataset(station_url, output_folder_ds1_txt, pattern_station)
convert_txts_to_csvs(output_folder_ds1_txt, output_folder_ds1_csv)
clean_and_rename_csv_in_folder(output_folder_ds1_csv, output_folder_ds1_csv_clean)

Dataset 2 Download and Schema Enforcement

In [137]:
# Download and convert the files for dataset 2
# Downloaded workbooks go to the first folder, converted CSVs to the second.
output_folder_ds2_xlsx = '../data/ds2/census_xlsx'
output_folder_ds2_csv = '../data/ds2/census_csv'
# NOTE(review): defined but not used by any cell visible in this notebook.
output_folder_ds2_csv_clean = '../data/ds2/census_csv_clean'
census_url = "https://www.ons.gov.uk/peoplepopulationandcommunity/wellbeing/datasets/personalwellbeingestimatesgeographicalbreakdown"
# Links on the page whose href contains this substring are downloaded.
pattern_census = 'tcm'

# Download first, then convert the 'Happiness' sheet of each .xls file;
# 'Codes' marks the header row inside the sheet.
download_main_dataset(census_url, output_folder_ds2_xlsx, pattern_census)
process__excel_folder(output_folder_ds2_xlsx, output_folder_ds2_csv, 'Happiness', 'Codes')

Processed and saved: 20112012referencetabletcm77332122.xls
Processed and saved: 20122013referencetable_tcm77-332116.xls
Processed and saved: geographicbreakdownreferencetable_tcm77-417203.xls
Processed and saved: referencetable1geographicalbreakdown_tcm77-378058.xls


In [131]:
# Re-runs the dataset 2 conversion without downloading again. Same call as the
# last line of the dataset 2 cell above; relies on the folder variables defined there.
process__excel_folder(output_folder_ds2_xlsx, output_folder_ds2_csv, 'Happiness', 'Codes')

In [114]:
def process_happiness_excel(file_path, sheet_name, word):
    """Load one sheet of an Excel workbook and return it with normalized column names.

    NOTE: this notebook also defines ``process_happiness_excel`` in its first
    cell; when cells run top to bottom, this later definition is the one in effect.

    The sheet is read without a header, skipping its first two rows. The first
    row whose first cell contains ``word`` is treated as the header row: it and
    everything above it are discarded, rows that are entirely empty are
    dropped, and the remaining columns are renamed positionally from a fixed
    list of names.

    Args:
        file_path: Path to the Excel workbook.
        sheet_name: Sheet to read (passed to ``pd.read_excel``).
        word: Text identifying the header row via its first cell.

    Returns:
        pandas.DataFrame with a fresh 0..n-1 index and the normalized column names.

    Raises:
        ValueError: If no row's first cell contains ``word``.
    """
    data = pd.read_excel(file_path, sheet_name=sheet_name, header=None, skiprows=2)

    col_names = [
        "Codes", "Area names", "Region", "Location", "Total % 0-4", "Total % 5-6", "Total % 7-8", "Total % 9-10",
        "Total Average rating", "Total Standard deviation", "0-4 CV", "0-4 Lower limit", "0-4 Upper limit",
        "5-6 CV", "5-6 Lower limit", "5-6 Upper limit", "7-8 CV", "7-8 Lower limit", "7-8 Upper limit",
        "9-10 CV", "9-10 Lower limit", "9-10 Upper limit", "Total CV", "Total Lower limit", "Total Upper limit",
        "Sample size"
    ]

    # Find the first row where the first cell contains the word. The sheet's
    # own header text is not kept because the columns are renamed positionally below.
    header_row = next(
        (i for i in range(len(data)) if word in str(data.iloc[i, 0])),
        None,
    )
    if header_row is None:
        raise ValueError(
            f"No header row containing {word!r} found in sheet {sheet_name!r} of {file_path}"
        )

    # Keep the rows below the header, drop rows that are completely NaN, and
    # reset the index.
    data = data.iloc[header_row + 1:].dropna(how="all").reset_index(drop=True)

    # Look at the file name only, so a "2012" in a directory name cannot
    # select the wrong layout. Files without "2012" in their name get the list
    # without "Total Standard deviation" (presumably those workbooks lack that
    # column -- confirm against the source data).
    if "2012" not in os.path.basename(file_path):
        col_names.remove("Total Standard deviation")

    # Assign the new column names to the dataframe
    data.columns = col_names[:len(data.columns)]

    return data


In [109]:
# Scratch list of 26 short column labels, used in the cells below to check
# what deleting index 9 does. No function in this notebook reads it.
c = [
        "Codes", "Area names", "Region", "Location", "0-4", "5-6", "7-8", "9-10", 
        "Average rating", "Standard deviation", "CV", "Lower limit", "Upper limit", 
        "CV", "Lower limit", "Upper limit", "CV", "Lower limit", "Upper limit", 
        "CV", "Lower limit", "Upper limit", "CV", "Lower limit", "Upper limit", 
        "Sample size"
    ]


In [111]:
# Display the scratch list. The saved output lacks 'Standard deviation', so the
# `del c[9]` cell had already been run when this output was produced.
c

['Codes',
 'Area names',
 'Region',
 'Location',
 '0-4',
 '5-6',
 '7-8',
 '9-10',
 'Average rating',
 'CV',
 'Lower limit',
 'Upper limit',
 'CV',
 'Lower limit',
 'Upper limit',
 'CV',
 'Lower limit',
 'Upper limit',
 'CV',
 'Lower limit',
 'Upper limit',
 'CV',
 'Lower limit',
 'Upper limit',
 'Sample size']

In [110]:
# Remove the element at index 9 ('Standard deviation' in the freshly defined list).
# Not idempotent: running this cell again removes the next element as well.
del c[9]