In [None]:
import os

import fsspec
import pandas as pd

In [None]:
# Data-lake locations (all paths end with '/')
root_dir = 'abfss://lakebeehaven@beehaven.dfs.core.windows.net/'
source = root_dir + 'silver/processing/'

silver_path = root_dir + 'silver/'

gold_processing = root_dir + 'gold/processing/'


# Credentials passed to every write to the data lake.
# You will find the account key here:
# datalake storage account -> Security+Networking -> Access keys
# Provide it through the BEEHAVEN_STORAGE_ACCOUNT_KEY environment variable rather
# than pasting it into the notebook, so the secret never ends up in version control.
# "aKey" is only a placeholder fallback.
storage_options = {
    "account_name": "beehaven",
    "account_key": os.environ.get("BEEHAVEN_STORAGE_ACCOUNT_KEY", "aKey"),
}

# Read the csv files from Silver/processing

In [None]:
# Data-lake filesystem handle (only the account name is passed; `storage_options` is not used here)
fs = fsspec.filesystem('abfss', account_name='beehaven')

# List every CSV file waiting in silver/processing
csv_files = fs.glob(f'{source}*.csv')

In [None]:
# Peek at the file name (without folders) of the first matched CSV
csv_files[0].rsplit('/', 1)[-1]

In [None]:
# Data-lake filesystem handle.
# NOTE(review): only the account name is passed, not `storage_options` (which holds
# the key), so reads rely on whatever credentials the runtime provides — confirm.
fs = fsspec.filesystem('abfss', account_name='beehaven')

# Every CSV file waiting in silver/processing (the folder held in `source`)
csv_files = fs.glob(f'{source}*.csv')
if not csv_files:
    print(f"Warning: no CSV files found in {source}")

# Read each CSV into `dfs`, keyed by its file stem (e.g. "flow_schwartau"), and
# collect the distinct locations, i.e. the part of the stem after the first '_'.
dfs = {}
locations = []
for file in csv_files:
    file_name = file.split('/')[-1]
    stem = file_name.split('.')[0]
    dfs[stem] = pd.read_csv(source + file_name)

    stem_parts = stem.split('_')
    if len(stem_parts) < 2:
        print(f"Warning: cannot derive a location from '{file_name}' (expected '<measure>_<location>.csv').")
        continue
    if stem_parts[1] not in locations:
        locations.append(stem_parts[1])

# Expose each frame as a module-level variable named after its key (e.g. `flow_schwartau`).
# The variables and the `dfs` entries are the same DataFrame objects.
for key, frame in dfs.items():
    globals()[key] = frame


# Create functions that clean the CSVs and write them as Parquet files to Silver (and to Gold/processing)

## Flow

In [None]:
def clean_flow(flow, place, write=True):
    """Split bee-flow counts into departures/arrivals, convert them to UTC and persist them.

    The raw frame appears to list all counts of one direction (ordered by
    timestamp) followed by all counts of the other direction. The first half of
    the rows is therefore treated as departures and the second half as arrivals.
    Both halves are converted from Europe/Berlin local time to UTC and merged
    on ``timestamp``. Every other column gets the suffix ``_out`` (departures)
    or ``_in`` (arrivals).

    Note: ``flow['timestamp']`` is parsed to datetime in place on the caller's frame.

    Parameters
    ----------
    flow : pandas.DataFrame
        Raw flow data with a ``timestamp`` column.
    place : str
        Location name used in the output file names.
    write : bool, default True
        When False, the Parquet files are not written (dry run).

    Returns
    -------
    pandas.DataFrame
        The merged departures/arrivals frame with UTC timestamps.
    """
    # Parse the timestamps (kept on the caller's frame, as later cells expect datetimes)
    flow['timestamp'] = pd.to_datetime(flow['timestamp'])

    # Split the rows into the two directions
    half = flow.shape[0] // 2
    departures = flow.iloc[:half].copy()
    arrivals = flow.iloc[half:].copy()

    def to_utc(timestamps):
        # Local time changes with the season, so normalise to UTC.
        # Already tz-aware input is only converted (tz_localize would raise on it).
        if timestamps.dt.tz is None:
            timestamps = timestamps.dt.tz_localize('Europe/Berlin', ambiguous='infer')
        return timestamps.dt.tz_convert('UTC')

    departures['timestamp'] = to_utc(departures['timestamp'])
    arrivals['timestamp'] = to_utc(arrivals['timestamp'])

    # Reassemble: one row per timestamp with both directions side by side
    flow_cl = departures.merge(arrivals, on='timestamp', suffixes=('_out', '_in'))

    if write:
        # One stamp for both copies so the silver and gold files can be matched up
        run_stamp = pd.Timestamp.now().strftime("%Y-%m-%dT%Hh%Mm%Ss.%f")
        # silver_path already ends with '/'
        flow_cl.to_parquet(f'{silver_path}flow/{place}__{run_stamp}.parquet', storage_options=storage_options, index=False)
        flow_cl.to_parquet(f'{gold_processing}{place}_flow__{run_stamp}.parquet', storage_options=storage_options, index=False)

    return flow_cl

## Humidity

In [None]:
def clean_humidity(humidity, place, write=True):
    """Clean the humidity readings of one location, convert them to UTC and persist them.

    Steps: parse ``timestamp``, take the absolute value of ``humidity`` (humidity
    cannot be negative), convert Europe/Berlin local time to UTC (ambiguous
    DST-change times are treated as summer time) and drop duplicate rows.

    Note: the ``timestamp`` and ``humidity`` columns of the caller's frame are
    overwritten in place. Timestamps that are already tz-aware are only
    converted, so the function can safely run again on the same frame.

    Parameters
    ----------
    humidity : pandas.DataFrame
        Raw data with ``timestamp`` and ``humidity`` columns.
    place : str
        Location name used in the output file names.
    write : bool, default True
        When False, the Parquet files are not written (dry run).

    Returns
    -------
    pandas.DataFrame
        The cleaned, de-duplicated frame with UTC timestamps.
    """
    humidity['timestamp'] = pd.to_datetime(humidity['timestamp'])

    # humidity values cannot be negative
    humidity['humidity'] = humidity['humidity'].abs()

    timestamps = humidity['timestamp']
    if timestamps.dt.tz is None:
        # any ambiguous times ARE in daylight saving (summer time)
        timestamps = timestamps.dt.tz_localize("Europe/Berlin", ambiguous=True)
    humidity['timestamp'] = timestamps.dt.tz_convert("UTC")

    # Null values are kept; use humidity.dropna(subset="humidity") to remove them if needed.

    # drop_duplicates returns a new frame; the caller's frame keeps all rows
    humidity = humidity.drop_duplicates()

    if write:
        # One stamp for both copies so the silver and gold files can be matched up
        run_stamp = pd.Timestamp.now().strftime("%Y-%m-%dT%Hh%Mm%Ss.%f")
        # silver_path already ends with '/'
        humidity.to_parquet(f'{silver_path}humidity/{place}__{run_stamp}.parquet', storage_options=storage_options, index=False)
        humidity.to_parquet(f'{gold_processing}{place}_humidity__{run_stamp}.parquet', storage_options=storage_options, index=False)

    return humidity

## Temperature

In [None]:
def clean_temperature(temperature, place, write=True):
    """Clean the temperature readings of one location, convert them to UTC and persist them.

    Steps: parse ``timestamp``, convert Europe/Berlin local time to UTC
    (ambiguous DST-change times are inferred from the ordering) and drop
    duplicate rows.

    Note: the ``timestamp`` column of the caller's frame is overwritten in
    place. Timestamps that are already tz-aware are only converted, so the
    function can safely run again on the same frame.

    Parameters
    ----------
    temperature : pandas.DataFrame
        Raw data with a ``timestamp`` column.
    place : str
        Location name used in the output file names.
    write : bool, default True
        When False, the Parquet files are not written (dry run).

    Returns
    -------
    pandas.DataFrame
        The cleaned, de-duplicated frame with UTC timestamps.
    """
    temperature['timestamp'] = pd.to_datetime(temperature['timestamp'])

    timestamps = temperature['timestamp']
    if timestamps.dt.tz is None:
        timestamps = timestamps.dt.tz_localize("Europe/Berlin", ambiguous="infer")
    temperature['timestamp'] = timestamps.dt.tz_convert("UTC")

    # Null values are kept; use temperature.dropna(subset="temperature") to remove them if needed.

    # drop_duplicates returns a new frame; the caller's frame keeps all rows
    temperature = temperature.drop_duplicates()

    if write:
        # One stamp for both copies so the silver and gold files can be matched up
        run_stamp = pd.Timestamp.now().strftime("%Y-%m-%dT%Hh%Mm%Ss.%f")
        # silver_path already ends with '/'
        temperature.to_parquet(f'{silver_path}temperature/{place}__{run_stamp}.parquet', storage_options=storage_options, index=False)
        temperature.to_parquet(f'{gold_processing}{place}_temperature__{run_stamp}.parquet', storage_options=storage_options, index=False)

    return temperature

## Weight

In [None]:
def clean_weight(weight, place, write=True):
    """Clean the weight readings of one location, convert them to UTC and persist them.

    Steps: parse ``timestamp``, take the absolute value of ``weight`` (weight
    cannot be negative), convert Europe/Berlin local time to UTC (ambiguous
    DST-change times are treated as summer time) and drop duplicate rows.

    Note: the ``timestamp`` and ``weight`` columns of the caller's frame are
    overwritten in place. Timestamps that are already tz-aware are only
    converted, so the function can safely run again on the same frame.

    Parameters
    ----------
    weight : pandas.DataFrame
        Raw data with ``timestamp`` and ``weight`` columns.
    place : str
        Location name used in the output file names.
    write : bool, default True
        When False, the Parquet files are not written (dry run).

    Returns
    -------
    pandas.DataFrame
        The cleaned, de-duplicated frame with UTC timestamps.
    """
    weight['timestamp'] = pd.to_datetime(weight['timestamp'])

    # weight values cannot be negative
    weight['weight'] = weight['weight'].abs()

    timestamps = weight['timestamp']
    if timestamps.dt.tz is None:
        # any ambiguous times ARE in daylight saving (summer time)
        timestamps = timestamps.dt.tz_localize("Europe/Berlin", ambiguous=True)
    weight['timestamp'] = timestamps.dt.tz_convert("UTC")

    # Null values are kept; use weight.dropna(subset="weight") to remove them if needed.

    # drop_duplicates returns a new frame; the caller's frame keeps all rows
    weight = weight.drop_duplicates()

    if write:
        # One stamp for both copies so the silver and gold files can be matched up
        run_stamp = pd.Timestamp.now().strftime("%Y-%m-%dT%Hh%Mm%Ss.%f")
        # silver_path already ends with '/'
        weight.to_parquet(f'{silver_path}weight/{place}__{run_stamp}.parquet', storage_options=storage_options, index=False)
        weight.to_parquet(f'{gold_processing}{place}_weight__{run_stamp}.parquet', storage_options=storage_options, index=False)

    return weight

## Weather

In [None]:
def get_weater_data(place):
    """Download hourly weather for ``place`` from the Bright Sky API and persist it.

    The requested period runs from the earliest to the latest ``timestamp`` in
    this location's loaded frames only. These are the entries of the
    module-level ``dfs`` whose key has ``place`` as its second ``_``-separated
    part, e.g. ``flow_schwartau``. Naive timestamps are interpreted as
    Europe/Berlin local time, matching the clean_* functions; tz-aware ones are
    converted to UTC.

    Side effects: the raw JSON response is archived under
    ``bronze/archive/weather/``. The flattened frame is written as Parquet to
    ``silver/weather/`` and ``gold/processing/``.

    Parameters
    ----------
    place : str
        Key of ``coords`` below ("schwartau" or "wurzburg"); any other value
        raises ``KeyError``.

    Returns
    -------
    pandas.DataFrame or None
        One row per weather record, plus a ``<measure>_source_distance`` column
        per measure. Returns ``None`` if no timestamps are available for
        ``place`` or the API call does not return HTTP 200.
    """
    import json

    import requests

    coords = {
        "schwartau": {"lat": 53.919444, "lon": 10.6975},
        "wurzburg": {"lat": 49.783333, "lon": 9.933333},
    }
    location = coords[place]  # fail fast on unknown places

    # Measures that get no "<measure>_source_distance" column
    measures_to_ignore = ["timestamp", "source_id", "condition", "precipitation_probability",
                          "precipitation_probability_6h", "icon", "fallback_source_ids"]
    # Columns removed from the final frame (ignored if the API omits one of them)
    columns_to_drop = ["source_id", "visibility", "condition", "icon", "precipitation_probability",
                       "precipitation_probability_6h", "fallback_source_ids"]

    # --- 1. Period to request: span of this location's timestamps, in UTC ---
    timestamps = []
    for key, df in dfs.items():
        # Only use this place's frames: other locations may still hold unparsed
        # string timestamps when this runs.
        if key.split('_')[1:2] != [place] or "timestamp" not in df.columns:
            continue
        ts = pd.to_datetime(df["timestamp"])
        if ts.dt.tz is None:
            ts = ts.dt.tz_localize('Europe/Berlin', ambiguous='NaT', nonexistent='NaT')
        ts = ts.dt.tz_convert('UTC').dropna()
        if not ts.empty:
            timestamps.append(ts)

    if not timestamps:
        print(f"No timestamps found for '{place}'; weather data not requested.")
        return None

    start_date = min(ts.min() for ts in timestamps)
    end_date = max(ts.max() for ts in timestamps)

    # --- 2. Request the weather data ---
    response = requests.get(
        "https://api.brightsky.dev/weather",
        headers={"Accept": "application/json"},
        params=location | {"date": start_date.isoformat(), "last_date": end_date.isoformat()},
        timeout=60,
    )
    if response.status_code != 200:
        print(f"Failure to gather weather data for period {start_date} to {end_date}.\nReason: {response.text}")
        return None
    weather = response.json()

    # --- 3. Archive the raw response in bronze ---
    raw_sink = root_dir + "bronze/archive/weather/"
    with fsspec.open(
        raw_sink + f"{place}__{start_date.strftime('%Y-%m-%d')}-{end_date.strftime('%Y-%m-%d')}.json",
        "w",
        **storage_options,
    ) as f:
        json.dump(weather, f, indent=4)

    # --- 4. Flatten: add the distance of the station that supplied each measure ---
    sources_df = pd.DataFrame(weather["sources"])

    def source_distance(source_id):
        return sources_df.loc[sources_df["id"] == source_id, "distance"].values[0]

    for record in weather["weather"]:
        fallback_ids = record.get("fallback_source_ids") or {}
        distances = {}
        for measure in record:
            if measure in measures_to_ignore:
                continue
            # A measure taken from a fallback station reports that station's distance
            source_id = fallback_ids[measure] if measure in fallback_ids else record["source_id"]
            distances[f"{measure}_source_distance"] = source_distance(source_id)
        record.update(distances)

    weather_df = pd.DataFrame(weather["weather"]).drop(columns=columns_to_drop, errors="ignore")
    weather_df["timestamp"] = pd.to_datetime(weather_df["timestamp"])

    # --- 5. Persist to silver and gold/processing with one shared run stamp ---
    run_stamp = pd.Timestamp.now().strftime("%Y-%m-%dT%Hh%Mm%Ss.%f")
    processed_sink = root_dir + 'silver/weather/'
    weather_df.to_parquet(f'{processed_sink}{place}__{run_stamp}.parquet', storage_options=storage_options, index=False)
    weather_df.to_parquet(f'{gold_processing}{place}_weather__{run_stamp}.parquet', storage_options=storage_options, index=False)

    return weather_df

# Clean and Write the files

In [None]:
# Clean every measurement of each expected location, then fetch matching weather data.
# Each clean_* function writes its own output to Silver and Gold/processing.
# Order matters: the weather period is derived from the frames the cleaners just parsed.
cleaners = {
    "flow": clean_flow,
    "humidity": clean_humidity,
    "temperature": clean_temperature,
    "weight": clean_weight,
}

for place in ("schwartau", "wurzburg"):
    if place not in locations:
        print(f"Warning: '{place}' is missing in locations.")
        continue

    # Input frames are keyed "<measure>_<place>" in `dfs`
    missing = [f"{measure}_{place}" for measure in cleaners if f"{measure}_{place}" not in dfs]
    if missing:
        print(f"Warning: skipping '{place}'; missing input files for {missing}.")
        continue

    for measure, clean in cleaners.items():
        clean(dfs[f"{measure}_{place}"], place)
    get_weater_data(place)


# Release Spark pool

In [None]:
# Stop the notebook's Spark session so the Spark pool is released.
# NOTE(review): `mssparkutils` is not imported in this notebook — presumably provided
# by the runtime (Synapse); confirm before running elsewhere.
mssparkutils.session.stop()