In [98]:
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [None]:
from streamlit_idealista.config import  INPUT_DATA_PATH, INPUT_OPERATION_TYPES_PATH, INPUT_TYPOLOGY_TYPES_PATH, INPUT_OPERATION_TYPES_PATH, INPUT_SUPERILLES_INTERVENTIONS_GEOJSON, INPUT_DTYPES_COUPLED_JSON_PATH, INPUT_INE_CENSUSTRACT_GEOJSON 

# import streamlit as st
# import folium as folium
# from folium.plugins import Draw
# from streamlit_folium import st_folium
# from pathlib import Path
# from shapely.geometry import GeometryCollection, shape
# from shapely.ops import transform
from typing import Union,Optional,List
# import numpy as np
import pandas as pd
from pathlib import Path
import json
import geopandas as gpd
import shapely
# from prophet import Prophet
import plotly.graph_objects as go
# from plotly.subplots import make_subplots
# from pyproj import Transformer
# from PIL import Image

In [None]:
# load data
def load_dtypes(dtypes_path: Path) -> dict:
    """Load the JSON document at ``dtypes_path`` and return the parsed object.

    In this notebook the result is passed as the ``dtype`` argument of
    ``pd.read_csv`` by the CSV loaders.

    Args:
        dtypes_path (Path): Location of the JSON file.

    Returns:
        dict: The parsed JSON content.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    # JSON is UTF-8 by specification; do not depend on the platform's default
    # locale encoding (which would garble non-ASCII keys on some systems).
    with open(dtypes_path, 'r', encoding='utf-8') as f:
        return json.load(f)
# Shared dtype mapping: the CSV loaders below read this module-level name,
# so it must be defined before any of them is called.
dtypes_coupled_dict = load_dtypes(INPUT_DTYPES_COUPLED_JSON_PATH)

def load_main_data(main_data_path: Path) -> pd.DataFrame:
    """Read the semicolon-separated main dataset into a DataFrame.

    Column dtypes are taken from the module-level ``dtypes_coupled_dict``.

    Args:
        main_data_path (Path): Location of the CSV file.

    Returns:
        pd.DataFrame: The parsed file.
    """
    read_options = {"sep": ';', "dtype": dtypes_coupled_dict}
    return pd.read_csv(main_data_path, **read_options)
# Raw main dataset; later passed to process_df to build processed_df.
df = load_main_data(INPUT_DATA_PATH)

def load_censustract_geojson(censustract_geojson_path: Path) -> gpd.GeoDataFrame:
    """Read the census-tract layer and normalise its ``CENSUSTRACT`` column.

    The codes are round-tripped through ``int`` and then ``str``, so the
    resulting strings carry no leading zeros.

    Args:
        censustract_geojson_path (Path): Location of the file to read.

    Returns:
        gpd.GeoDataFrame: The layer with ``CENSUSTRACT`` stored as strings.
    """
    tracts = gpd.read_file(censustract_geojson_path)
    normalised_codes = tracts['CENSUSTRACT'].astype(int).astype(str)
    tracts['CENSUSTRACT'] = normalised_codes
    return tracts
# Census-tract polygons with CENSUSTRACT as string; used for the spatial intersections further down.
gdf_ine = load_censustract_geojson(INPUT_INE_CENSUSTRACT_GEOJSON)

def load_operation_types(operation_types_path: Path) -> pd.DataFrame:
    """Read the semicolon-separated operation-type lookup table.

    Column dtypes are taken from the module-level ``dtypes_coupled_dict``.

    Args:
        operation_types_path (Path): Location of the CSV file.

    Returns:
        pd.DataFrame: The parsed lookup table.
    """
    operation_types = pd.read_csv(
        operation_types_path,
        sep=";",
        dtype=dtypes_coupled_dict,
    )
    return operation_types
# Lookup table read by process_df as a module-level name (joined on its ID column).
operation_types_df = load_operation_types(INPUT_OPERATION_TYPES_PATH)

def load_typology_types(typology_types_path: Path) -> pd.DataFrame:
    """Read the semicolon-separated typology-type lookup table.

    Column dtypes are taken from the module-level ``dtypes_coupled_dict``.

    Args:
        typology_types_path (Path): Location of the CSV file.

    Returns:
        pd.DataFrame: The parsed lookup table.
    """
    column_dtypes = dtypes_coupled_dict
    return pd.read_csv(typology_types_path, dtype=column_dtypes, sep=";")
# Lookup table read by process_df as a module-level name (joined on its ID column).
typology_types_df = load_typology_types(INPUT_TYPOLOGY_TYPES_PATH)

def load_interventions(interventions_path: Path) -> gpd.GeoDataFrame:
    """Read the interventions layer from ``interventions_path``.

    Args:
        interventions_path (Path): Location of the file to read.

    Returns:
        gpd.GeoDataFrame: The layer exactly as returned by ``gpd.read_file``.
    """
    interventions = gpd.read_file(interventions_path)
    return interventions
# Interventions layer; also read as a module-level name by get_censustracts_by_intervention_name.
interventions_gdf =  load_interventions(INPUT_SUPERILLES_INTERVENTIONS_GEOJSON)

def process_df(df: pd.DataFrame) -> pd.DataFrame:
    """Attach readable operation and typology labels to the listings frame.

    Works on a deep copy of ``df``. ``ADOPERATIONID`` and ``ADTYPOLOGYID`` are
    cast to ``int`` and left-joined against the ``ID`` column of the
    module-level lookup tables ``operation_types_df`` and ``typology_types_df``.
    Each lookup's ``SHORTNAME`` becomes a categorical label column
    (``ADOPERATION`` / ``ADTYPOLOGY``), the matching ID column is converted to
    categorical, and the lookup's ``DESCRIPTION`` column is dropped.

    Args:
        df (pd.DataFrame): Frame containing ``ADOPERATIONID`` and
            ``ADTYPOLOGYID`` columns.

    Returns:
        pd.DataFrame: A new frame with the ``ADOPERATION`` and ``ADTYPOLOGY``
        columns appended.

    Raises:
        pandas.errors.MergeError: If a lookup table has duplicate ``ID`` values
            (the joins are validated as many-to-one).
    """
    listings = df.copy(deep=True).astype({'ADOPERATIONID': 'int', 'ADTYPOLOGYID': 'int'})

    # Both lookups expose SHORTNAME and DESCRIPTION, so the first lookup's
    # columns are renamed/dropped before the second lookup is joined.
    operation_lookup = operation_types_df.set_index('ID')
    with_operation = listings.join(operation_lookup, on='ADOPERATIONID', how="left", validate="m:1")
    with_operation = with_operation.rename(columns={'SHORTNAME': 'ADOPERATION'})
    with_operation = with_operation.astype({'ADOPERATION': 'category', 'ADOPERATIONID': 'category'})
    with_operation = with_operation.drop(columns="DESCRIPTION")

    typology_lookup = typology_types_df.set_index('ID')
    with_typology = with_operation.join(typology_lookup, on='ADTYPOLOGYID', how="left", validate="m:1")
    with_typology = with_typology.rename(columns={'SHORTNAME': 'ADTYPOLOGY'})
    with_typology = with_typology.astype({'ADTYPOLOGY': 'category', 'ADTYPOLOGYID': 'category'})
    return with_typology.drop(columns="DESCRIPTION")
# Listings enriched with ADOPERATION / ADTYPOLOGY labels; requires both lookup tables to be loaded first.
processed_df = process_df(df)

In [None]:
def get_timeseries_of_census_tracts(df: pd.DataFrame, 
                                    censustract_list: Optional[List[str]] = None, 
                                    operation: str = "mean") -> Optional[pd.DataFrame]:
  """
  Build a per-period asking-price table for a set of census tracts.

  Rows of ``df`` whose ``CENSUSTRACT`` is in ``censustract_list`` are grouped
  by ``PERIOD`` and ``ADOPERATION``; ``UNITPRICE_ASKING`` is aggregated with
  ``operation`` and the result is pivoted so that each ``ADOPERATION`` value
  becomes a column indexed by ``PERIOD``.

  Args:
    df (pd.DataFrame): Frame with ``CENSUSTRACT``, ``PERIOD``, ``ADOPERATION``
      and ``UNITPRICE_ASKING`` columns.
    censustract_list (Optional[List[str]]): Census tracts to keep.
    operation (str): Aggregation to apply, ``"mean"`` or ``"median"``.

  Returns:
    Optional[pd.DataFrame]: The pivoted table, or None when
      ``censustract_list`` is None.

  Raises:
    ValueError: If ``operation`` is neither ``"mean"`` nor ``"median"``.
  """
  if censustract_list is None:
    return None

  allowed_operations = ("mean", "median")
  if operation not in allowed_operations:
    raise ValueError("Operation must be 'mean' or 'median'")

  # Keep only the rows that belong to the requested tracts.
  in_requested_tracts = df["CENSUSTRACT"].isin(censustract_list)
  grouped = df[in_requested_tracts].groupby(["PERIOD", "ADOPERATION"], observed=False)

  # One aggregated price per (PERIOD, ADOPERATION), flattened back to columns.
  price_long = grouped.agg({"UNITPRICE_ASKING": operation}).reset_index()

  # Wide layout: one row per PERIOD, one column per ADOPERATION value.
  return price_long.pivot(index="PERIOD", columns="ADOPERATION", values="UNITPRICE_ASKING")

def get_impacted_censustracts(geometries: Union[shapely.geometry.GeometryCollection, None],
                              ine_gdf: gpd.GeoDataFrame
                               ) -> Optional[List[str]]:
  """
  List the census tracts whose geometry intersects the given area.

  Args:
    geometries (shapely.geometry.GeometryCollection): The area to test
      against, or None.
    ine_gdf (gpd.GeoDataFrame): Layer with a ``geometry`` column and a
      ``CENSUSTRACT`` column.

  Returns:
    Optional[List[str]]: Unique ``CENSUSTRACT`` values of the intersecting
      rows, or None when ``geometries`` is None.
  """
  if geometries is None:
    return None

  # Boolean row selector: True where the tract polygon intersects the area.
  intersects_area = ine_gdf['geometry'].intersects(geometries)
  impacted_codes = ine_gdf.loc[intersects_area, 'CENSUSTRACT']
  return impacted_codes.unique().tolist()

def filter_data_per_district(df: pd.DataFrame, gdf: "gpd.GeoDataFrame") -> pd.DataFrame:

  """
  Filter idealista df to the censustracts of the districts contained in the given geopandas
  (e.g. for interventions_gdf)

  The district and municipality codes are sliced out of the string form of
  ``CENSUSTRACT`` (characters 4-5 and 0-3 respectively) and matched against
  ``gdf.DISTRITO`` and the int-then-str form of ``gdf['PROVMUN']``.

  The input ``df`` is left untouched: the helper columns are added to a new
  frame instead of being written into the caller's object.

  Args:
    df (pd.DataFrame): The dataframe containing idealista data; needs a
      ``CENSUSTRACT`` column.
    gdf (gpd.GeoDataFrame): GeoPandas with ``DISTRITO`` and ``PROVMUN`` columns.

  Returns:
    pd.DataFrame: The matching rows, including the helper columns
      ``district`` and ``munucipality`` (spelling kept for backward
      compatibility with existing consumers of the returned frame).
  """

  tract_codes = df['CENSUSTRACT'].astype(str)

  # assign() returns a new frame, so the caller's df gains no extra columns
  # and no SettingWithCopyWarning is raised when df is itself a slice.
  enriched = df.assign(
      district=tract_codes.str[4:6],
      munucipality=tract_codes.str[0:4],
  )

  in_district = enriched['district'].isin(gdf['DISTRITO'])
  # PROVMUN goes through int first so it loses leading zeros before comparison.
  in_municipality = enriched['munucipality'].isin(gdf['PROVMUN'].astype(int).astype(str))

  return enriched[in_district & in_municipality]


def get_censustracts_by_intervention_name(name):
  """
  Return the census tracts recorded for the intervention titled ``name``.

  Looks up the module-level ``interventions_gdf`` by its ``TITOL_WO`` column.

  Args:
    name: Value to match exactly against ``TITOL_WO``.

  Returns:
    list: ``CENSUSTRACT`` values of the matching rows, converted to int and
      then to str (one entry per row; empty if nothing matches).
  """
  title_matches = interventions_gdf['TITOL_WO'] == name
  tract_codes = interventions_gdf.loc[title_matches, 'CENSUSTRACT']
  return tract_codes.astype(int).astype(str).tolist()

In [None]:
# Put both layers in the same CRS (EPSG:4326) before they are intersected
# with each other in the cells below. Both names are rebound in place.
interventions_gdf = interventions_gdf.to_crs("EPSG:4326")
gdf_ine = gdf_ine.to_crs("EPSG:4326")

In [None]:
# All distinct intervention titles, in order of first appearance.
options=list(interventions_gdf["TITOL_WO"].unique())
# Select one intervention by position. The slice yields a one-element list
# (or an empty list if there are fewer than 7 titles).
# NOTE(review): index 6 is hard-coded and depends on row order — consider selecting by title.
geometry_selection = options[6:7]
print(geometry_selection)

In [None]:
# Rows of the interventions layer whose title is in the current selection.
filtered_interventions_gdf = interventions_gdf[interventions_gdf["TITOL_WO"].isin(geometry_selection)]

# Merge the selected geometries once and reuse the result for both
# intersection tests instead of recomputing the union each time.
selected_area = filtered_interventions_gdf["geometry"].union_all()
impacted_censustracts = get_impacted_censustracts(selected_area, gdf_ine)

# Same intersection test as get_impacted_censustracts, but keeping the full
# rows (including geometry); copied so later column additions do not write
# into gdf_ine.
mask = gdf_ine['geometry'].intersects(selected_area)
impacted_censustracts_gdf = gdf_ine[mask].copy()

In [None]:
# 'md' = first six characters of the tract code. Presumably municipality (4) + district (2),
# matching the [0:4] / [4:6] slices used in filter_data_per_district — TODO confirm.
impacted_censustracts_gdf['md'] = impacted_censustracts_gdf['CENSUSTRACT'].astype(str).str[0:6]
# NOTE: this adds the 'md' column to the module-level gdf_ine in place.
gdf_ine['md'] = gdf_ine['CENSUSTRACT'].astype(str).str[0:6]

# Every tract sharing an 'md' prefix with at least one impacted tract.
district_gdf = gdf_ine[gdf_ine['md'].isin(impacted_censustracts_gdf['md'])]

In [None]:
# Interactive map: impacted tracts in red create the map object `m`, then the
# selected interventions and the district tracts (grey) are added onto the same map.
# The last call is the cell's final expression, so its return value is what gets displayed.
m = impacted_censustracts_gdf.explore(color = 'red')
filtered_interventions_gdf.explore(m = m)
district_gdf.explore(m= m, color='grey')

In [None]:
# Census-tract codes for three scopes, each normalised via int -> str (no leading zeros):
#   census_intervention: tracts listed on the selected intervention rows
#   census_impacted:     tracts whose polygon intersects the selected intervention area
#   census_district:     tracts sharing the 'md' prefix with an impacted tract
# NOTE(review): assumes processed_df['CENSUSTRACT'] uses the same string format — confirm.
census_intervention = list(filtered_interventions_gdf['CENSUSTRACT'].astype(int).astype(str))
census_impacted = list(impacted_censustracts_gdf['CENSUSTRACT'].astype(int).astype(str))
census_district = list(district_gdf['CENSUSTRACT'].astype(int).astype(str))

In [None]:
from plotly.subplots import make_subplots

In [None]:
# Figure with a secondary y-axis available; every trace below is drawn on the
# primary axis (secondary_y=False).
fig = make_subplots(specs=[[{"secondary_y": True}]])

# Mean asking price per period for each scope (default operation="mean").
df_intervention_ts = get_timeseries_of_census_tracts(processed_df, census_intervention)
df_imp_ts = get_timeseries_of_census_tracts(processed_df, census_impacted)
df_district_ts = get_timeseries_of_census_tracts(processed_df, census_district)

# One "sale" line per scope, added in the same order as before. Traces are
# named so the legend tells them apart instead of showing "trace 0/1/2".
for trace_name, scope_ts in (
    ("Intervention tracts", df_intervention_ts),
    ("Impacted tracts", df_imp_ts),
    ("District tracts", df_district_ts),
):
    fig.add_trace(
        go.Scatter(x=scope_ts["sale"].index, y=scope_ts["sale"].values, name=trace_name),
        secondary_y=False,
    )

# Keep the figure as the last expression so the notebook still renders it.
fig