In [0]:
%sql USE mpy.raster
-- Sets mpy.raster as the current schema; later cells read/write unqualified table names (e.g. 'fcmean_bronze') and presumably rely on this.



# Converting .TIF rasters to tabular data (bronze tables)

In [0]:
import os
import rasterio
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize Spark session (getOrCreate() reuses a session that already exists)
spark = SparkSession.builder.appName("RasterProcessing").getOrCreate()

# Define the directory containing the .tif files
tif_dir = "/Volumes/mpy/raster/tif_files/FC v mean/"
# os.listdir() returns entries in arbitrary order; sort them so the column
# order of the resulting table is the same on every run.
tif_files = [os.path.join(tif_dir, f) for f in sorted(os.listdir(tif_dir)) if f.endswith('.tif')]
raster_names = [os.path.splitext(os.path.basename(f))[0] for f in tif_files]  # for column names

# Read coordinates from Spark and collect to Pandas
coords = spark.read.table("mpy.raster.location_coordinates")
coords_pd = coords.select("Longitude", "Latitude", "Location", "LocationID", "Region").toPandas()

# Extract raster values for each coordinate.
# Each raster is opened in a `with` block so its file handle is closed again,
# and all points are sampled in a single pass per raster instead of opening a
# new sample generator for every (row, raster) pair.
coord_pairs = list(zip(coords_pd["Longitude"], coords_pd["Latitude"]))
sampled_by_raster = {}
for tif_file, raster_name in zip(tif_files, raster_names):
    with rasterio.open(tif_file) as src:
        # sample() yields one array of band values per point; keep the first band
        sampled_by_raster[raster_name] = [band_values[0] for band_values in src.sample(coord_pairs)]

# Combine extracted values with original data (one column per raster file)
values_df = pd.DataFrame(sampled_by_raster, index=coords_pd.index, columns=raster_names)
final_pd_df = pd.concat([coords_pd, values_df], axis=1)

# Convert back to Spark DataFrame
spark_df = spark.createDataFrame(final_pd_df)

# Save the final DataFrame as a table (overwrites any previous version)
spark_df.write.mode('overwrite').saveAsTable('mpy.raster.fcmean_bronze')

# Show in notebook (if supported)
spark_df.show()



In [0]:

# NOTE: this cell reuses the imports and the `spark` session created in the
# previous cell.

# Define the directory containing the .tif files
tif_dir = "/Volumes/mpy/raster/tif_files/FC sd/"
# os.listdir() returns entries in arbitrary order; sort them so the column
# order of the resulting table is the same on every run.
tif_files = [os.path.join(tif_dir, f) for f in sorted(os.listdir(tif_dir)) if f.endswith('.tif')]
raster_names = [os.path.splitext(os.path.basename(f))[0] for f in tif_files]  # for column names

# Read coordinates from Spark and collect to Pandas
coords = spark.read.table("mpy.raster.location_coordinates")
coords_pd = coords.select("Longitude", "Latitude", "Location", "LocationID", "Region").toPandas()

# Extract raster values for each coordinate.
# Each raster is opened in a `with` block so its file handle is closed again,
# and all points are sampled in a single pass per raster instead of opening a
# new sample generator for every (row, raster) pair.
coord_pairs = list(zip(coords_pd["Longitude"], coords_pd["Latitude"]))
sampled_by_raster = {}
for tif_file, raster_name in zip(tif_files, raster_names):
    with rasterio.open(tif_file) as src:
        # sample() yields one array of band values per point; keep the first band
        sampled_by_raster[raster_name] = [band_values[0] for band_values in src.sample(coord_pairs)]

# Combine extracted values with original data (one column per raster file)
values_df = pd.DataFrame(sampled_by_raster, index=coords_pd.index, columns=raster_names)
final_pd_df = pd.concat([coords_pd, values_df], axis=1)

# Convert back to Spark DataFrame
spark_df = spark.createDataFrame(final_pd_df)

# Save the final DataFrame as a table (overwrites any previous version)
spark_df.write.mode('overwrite').saveAsTable('mpy.raster.fcsd_bronze')

# Show in notebook (if supported)
spark_df.show()



# Reading the mean FC table

In [0]:
# Load the bronze mean-FC table (unqualified name: resolved against the
# current schema) and render it in the notebook.
sdf_fc_mean = spark.table('fcmean_bronze')
display(sdf_fc_mean)



In [0]:
# Identifier columns kept as-is while the per-raster columns are unpivoted.
var = ['x','y','Location','LocationID','Region']

# The bronze table is written with 'Longitude'/'Latitude' columns, while the
# id columns here (and the later merge with the SD table) use 'x'/'y'.
# Rename first so pd.melt() finds its id_vars; rename() ignores labels that
# are absent, so a table that already has 'x'/'y' still works.
sdf_fc_mean_pd = sdf_fc_mean.toPandas().rename(columns={'Longitude': 'x', 'Latitude': 'y'})

# Every remaining column is a raster value column. A list comprehension
# (rather than a set difference) keeps the original column order, so the
# melted output is deterministic.
value_var = [column for column in sdf_fc_mean_pd.columns if column not in var]

# Wide -> long: one row per (location, raster column); the raster column name
# lands in 'Week' and its value in 'FC_mean'.
sdf_fc_mean_pd = pd.melt(
    sdf_fc_mean_pd,
    id_vars=var,
    value_vars=value_var,
    var_name='Week',
    value_name='FC_mean'
)



In [0]:
# Inspect the melted (long-format) mean-FC frame.
display(sdf_fc_mean_pd)



In [0]:
# Reduce 'Week' to the last two characters of the melted column name, then
# order the rows by location and week.
sdf_fc_mean_pd = (
    sdf_fc_mean_pd
    .assign(Week=lambda frame: frame['Week'].str[-2:])
    .sort_values(by=['LocationID', 'Week'])
)
display(sdf_fc_mean_pd)



In [0]:
# Convert the long-format pandas frame back to Spark and persist it,
# replacing any existing 'fc_mean_silver' table.
sdf_fc_mean = spark.createDataFrame(sdf_fc_mean_pd)
(
    sdf_fc_mean.write
    .mode('overwrite')
    .saveAsTable('fc_mean_silver')
)



# Reading the SD FC table

In [0]:
# Load the bronze SD-FC table (unqualified name: resolved against the
# current schema) and render it in the notebook.
sdf_fc_sd = spark.table('fcsd_bronze')
display(sdf_fc_sd)



In [0]:
# Identifier columns kept as-is while the per-raster columns are unpivoted.
var = ['Longitude','Latitude','Location','LocationID','Region']

# Every other column is a raster value column. A list comprehension (rather
# than a set difference) keeps the table's column order, so the melted output
# is the same on every run.
value_var = [column for column in sdf_fc_sd.columns if column not in var]

sdf_fc_sd_pd = sdf_fc_sd.toPandas()

# Wide -> long: one row per (location, raster column); the raster column name
# lands in 'Week' and its value in 'FC_sd'.
sdf_fc_sd_pd = pd.melt(
    sdf_fc_sd_pd,
    id_vars=var,
    value_vars=value_var,
    var_name='Week',
    value_name='FC_sd'
)



In [0]:
# Inspect the melted (long-format) SD-FC frame.
display(sdf_fc_sd_pd)



In [0]:
# Reduce 'Week' to the last two characters of the melted column name, rename
# the coordinate columns to 'x'/'y' (the keys used by the later merge), and
# order the rows by location and week.
sdf_fc_sd_pd = (
    sdf_fc_sd_pd
    .assign(Week=lambda frame: frame['Week'].str[-2:])
    .rename(columns={'Latitude': 'y', 'Longitude': 'x'})
    .sort_values(by=['LocationID', 'Week'])
)
display(sdf_fc_sd_pd)



In [0]:
# Convert the long-format pandas frame back to Spark and persist it,
# replacing any existing 'fc_sd_silver' table.
sdf_fc_sd = spark.createDataFrame(sdf_fc_sd_pd)
(
    sdf_fc_sd.write
    .mode('overwrite')
    .saveAsTable('fc_sd_silver')
)



# Merging both mean and sd FC tables

In [0]:
# Inner-join the long mean and SD frames on location identity plus week, so
# each surviving row carries both FC_mean and FC_sd.
df_fc_mean_sd = sdf_fc_mean_pd.merge(
    sdf_fc_sd_pd,
    how='inner',
    on=['x', 'y', 'Location', 'LocationID', 'Region', 'Week'],
)
display(df_fc_mean_sd)



In [0]:
# Convert the merged pandas frame to Spark and persist it, replacing any
# existing 'fc_mean_sd_gold' table.
sdf_fc_mean_sd = spark.createDataFrame(df_fc_mean_sd)
(
    sdf_fc_mean_sd.write
    .mode('overwrite')
    .saveAsTable('fc_mean_sd_gold')
)



# Visualisation

In [0]:
# NOTE(review): absolute path on a local Windows drive; this cell only runs on
# a machine that has this file. Consider a configurable data directory.
# 'Week' is read as text: default type inference would turn a value such as
# "01" into the integer 1, and the plotting cell compares weeks against the
# string '01' to pick the initially visible trace.
df = pd.read_csv(
    "C://Users/UmeshU/OneDrive - AkzoNobel/Documents/C Squirt Raft Data/FC_MEAN_SD.csv",
    dtype={"Week": str},
)

In [None]:
# Drop every row that has a missing value in any column before plotting.
df = df.dropna()

In [None]:
# `go` is used throughout this cell but is not imported anywhere in the
# notebook's import cell, so import it here to keep the cell runnable on a
# fresh kernel.
import plotly.graph_objects as go

data = df
# Create the initial figure
fig = go.Figure()

# One trace per unique week; tolist() yields plain Python values so the
# visibility flags below are ordinary booleans.
weeks = data['Week'].unique().tolist()

# Week shown when the figure first opens: week 1 if present (matching both the
# text '01' and a numeric 1), otherwise the first available week. Comparing
# only against the literal '01' would leave every trace hidden whenever the
# Week column is not that exact string.
initial_week = next(
    (w for w in weeks if str(w).zfill(2) == '01'),
    weeks[0] if weeks else None,
)

# Add a scattermapbox trace for each unique week
for week in weeks:
    week_data = data[data['Week'] == week]
    fig.add_trace(go.Scattermapbox(
        lat=week_data['Latitude'],
        lon=week_data['Longitude'],
        mode='markers',
        marker=go.scattermapbox.Marker(
            size=week_data['Mean_FC'] / 2,  # Scale size for better visibility
            color=week_data['Mean_FC'],
            colorscale='Viridis',
            showscale=True,
            colorbar=dict(title="Mean FC")
        ),
        text=week_data['Location'] + "<br>Mean FC: " + week_data['Mean_FC'].astype(str) + "<br>SD FC: " + week_data['SD_FC'].astype(str),
        hoverinfo='text',
        name=f"Week {week}",
        visible=bool(week == initial_week)  # Show only the initial week at first
    ))

# Configure layout for the map
# NOTE(review): the top margin is 0 while the dropdown below is anchored at
# y=1.15 (above the plot area) - confirm the title and menu render as intended.
fig.update_layout(
    mapbox=dict(
        style="carto-positron",
        zoom=1,
        center={"lat": data["Latitude"].mean(), "lon": data["Longitude"].mean()},
    ),
    title="Weekly Mean FC Across Locations",
    margin={"r":0,"t":0,"l":0,"b":0},
)

# Create dropdown menu for each week: selecting a week makes its trace the
# only visible one.
dropdown_buttons = [
    dict(
        label=f"Week {week}",
        method="update",
        args=[{"visible": [w == week for w in weeks]}],  # Set visibility based on the selected week
    )
    for week in weeks
]

# Add the dropdown menu to the layout
fig.update_layout(
    updatemenus=[dict(
        buttons=dropdown_buttons,
        # Keep the highlighted button in sync with the initially visible trace.
        active=weeks.index(initial_week) if weeks else 0,
        direction="down",
        showactive=True,
        x=0.1,
        xanchor="left",
        y=1.15,
        yanchor="top"
    )]
)

# Save as HTML
fig.write_html("weekly_mean_fc_map_with_dropdown.html")

# Display in Jupyter Notebook (optional)
fig.show()
