## End-to-End Cloud Data Pipeline: AWS & Snowflake Architecture

Initial setup: Importing core libraries for data manipulation, cloud connectivity (Boto3, Snowflake Connector), and geospatial processing (Shapely).

In [None]:
import json
import math
import numpy as np
import pandas as pd
import snowflake.connector
import time
import boto3
from shapely.geometry import Point, Polygon
import uuid
from datetime import datetime
from decimal import Decimal
from math import radians, sin, cos, sqrt, atan2

Establishing a secure connection to Snowflake using the Python Connector. The environment uses a pre-configured role with optimized privileges for DML (Insert/Delete) and DQL (Read) operations to ensure efficient pipeline execution.

In [None]:

conn = snowflake.connector.connect (
    user='PRACT3SNOWPYTHON',
    password='XXXXXXXX',
    account='XXXXXX.XXXXXX',
    warehouse='COMPUTE_WH',
    database='UEV_MU_ADM',
    schema='MODULO_2'
)

Audit Step: Verifying active grants and role permissions to ensure compliance with security protocols during the session.

In [None]:
cursor = conn.cursor()
cursor.execute("SHOW GRANTS TO USER PRACT3SNOWPYTHON")
grants = cursor.fetchall()
print("Tus permisos actuales:")
for grant in grants:
    print(grant)

Tus permisos actuales:
(datetime.datetime(2026, 1, 10, 2, 30, 21, 665000, tzinfo=<DstTzInfo 'America/Los_Angeles' PST-1 day, 16:00:00 STD>), 'USAGE', 'ROLE', 'ACCOUNTADMIN', 'ACCOUNTADMIN', 'USER', 'PRACT3SNOWPYTHON', 'false', '')
(datetime.datetime(2026, 1, 13, 1, 4, 14, 300000, tzinfo=<DstTzInfo 'America/Los_Angeles' PST-1 day, 16:00:00 STD>), 'USAGE', 'ROLE', 'PRACT3_ROLE', 'PRACT3_ROLE', 'USER', 'PRACT3SNOWPYTHON', 'false', 'ACCOUNTADMIN')
(datetime.datetime(2026, 1, 10, 4, 38, 38, 342000, tzinfo=<DstTzInfo 'America/Los_Angeles' PST-1 day, 16:00:00 STD>), 'USAGE', 'ROLE', 'QA_ROLE', 'QA_ROLE', 'USER', 'PRACT3SNOWPYTHON', 'false', 'ACCOUNTADMIN')


Connection Validation: Executing a limit-fetch query on an existing schema to confirm successful integration between the local environment and the Snowflake warehouse.

In [4]:
cursor.execute("SELECT * FROM C_1_0_0 LIMIT 10")

<snowflake.connector.cursor.SnowflakeCursor at 0x2b847761550>

Step 1: Extract, Transform, and Load (ETL). Filtering primary datasets to retain only critical features (ID, Latitude, Longitude, and Price). Data is converted to .parquet format to optimize storage costs and query performance in cloud environments.

In [None]:
# For Valencia
df_vlc = pd.read_csv('Valencia_data.csv', usecols=['id', 'latitude', 'longitude', 'price'])
print(f"Filas cargadas Valencia: {len(df_vlc)}")
print(df_vlc.head())

# Save the parquet
df_vlc.to_parquet('Valencia_data.parquet', index=False)
print("‚úì Archivo Valencia_data.parquet creado")

Filas cargadas Valencia: 7844
       id  latitude  longitude    price
0   48154  39.48375   -0.37502   $83.00
1  137143  39.36335   -0.31932  $390.00
2  149715  39.46746   -0.32813  $245.00
3  165971  39.46790   -0.38206  $124.00
4  182221  39.46343   -0.34325  $137.00
‚úì Archivo Valencia_data.parquet creado


In [None]:
# For Madrid
df_mad = pd.read_csv('Madrid_data.csv', usecols=['id', 'latitude', 'longitude', 'price'])
print(f"Filas cargadas Madrid: {len(df_mad)}")
print(df_mad.head())

# Save the parquet
df_mad.to_parquet('Madrid_data.parquet', index=False)
print("‚úì Archivo Madrid_data.parquet creado")

Filas cargadas Madrid: 25000
      id  latitude  longitude    price
0  21853  40.40381   -3.74130      NaN
1  30320  40.41476   -3.70418  $157.00
2  30959  40.41259   -3.70105      NaN
3  40916  40.42247   -3.70577  $143.00
4  62423  40.41884   -3.69655   $65.00
‚úì Archivo Madrid_data.parquet creado


In [None]:
# Read the parquet files and verify their correct saving
df_vlc = pd.read_parquet('Valencia_data.parquet')
print(f"Filas cargadas Valencia: {len(df_vlc)}")
print(df_vlc.head())
df_mad = pd.read_parquet('Madrid_data.parquet')
print(f"Filas cargadas Madrid: {len(df_mad)}")
print(df_mad.head())

Filas cargadas Valencia: 7844
       id  latitude  longitude    price
0   48154  39.48375   -0.37502   $83.00
1  137143  39.36335   -0.31932  $390.00
2  149715  39.46746   -0.32813  $245.00
3  165971  39.46790   -0.38206  $124.00
4  182221  39.46343   -0.34325  $137.00
Filas cargadas Madrid: 25000
      id  latitude  longitude    price
0  21853  40.40381   -3.74130     None
1  30320  40.41476   -3.70418  $157.00
2  30959  40.41259   -3.70105     None
3  40916  40.42247   -3.70577  $143.00
4  62423  40.41884   -3.69655   $65.00


------------------------------------------------------------------Step 2: Serverless Analytics with AWS.------------------------------------------------------------------------

Pre-requisites: Infrastructure provisioning including S3 partitioned buckets, Glue Crawlers for metadata discovery, and IAM policies for inter-service communication. 

Analytics: Executing SQL queries via Athena to identify high-value listings in Valencia and performing cross-city JOIN operations between Valencia and Madrid based on price parity.

"""
SELECT 

    id,

    latitude,

    longitude,

    price

FROM valencia

WHERE CAST(REPLACE(REPLACE(price, '$', ''), ',', '') AS DOUBLE) > 100

ORDER BY CAST(REPLACE(REPLACE(price, '$', ''), ',', '') AS DOUBLE) DESC;
"""

"""
SELECT 

    id,

    latitude,

    longitude,

    price

FROM valencia

ORDER BY CAST(REPLACE(REPLACE(price, '$', ''), ',', '') AS DOUBLE) DESC

LIMIT 10;
"""

"""
SELECT 

    v.id AS valencia_id,

    m.id AS madrid_id,

    v.price AS shared_price,

    v.latitude AS valencia_lat,

    v.longitude AS valencia_lon,

    m.latitude AS madrid_lat,

    m.longitude AS madrid_lon

FROM valencia v

INNER JOIN madrid m

    ON v.price = m.price

ORDER BY CAST(REPLACE(REPLACE(v.price, '$', ''), ',', '') AS DOUBLE) DESC; """

------------------------------------------------------------------Step 3: Warehouse Ingestion-------------------------------------------------------------------------------

Automating the data flow from AWS S3 to Snowflake. This phase involves configuring Storage Integrations, External Stages, and specialized File Formats (Parquet) to execute high-speed 'COPY INTO' operations.


---------------------------------------UPDATE THE STORAGE INTEGRATION----------------------------------------------------------

USE DATABASE UEV_MU_ADM;

USE SCHEMA MODULO_2;

ALTER STORAGE INTEGRATION UEV_MU_ADM_PRACT_1

  SET STORAGE_ALLOWED_LOCATIONS = (
    
    's3://uev-mu-adm-modulo-2-snow/kaggle/c_1_0_0/1_0_0/'
  );

-- Verification of the update

DESC INTEGRATION UEV_MU_ADM_PRACT_1;

---------------------------------------CREATE A STAGE----------------------------------------------------------

-- Stage that reference the Valencia partition

CREATE OR REPLACE STAGE UEV_MU_ADM.MODULO_2.VALENCIA_STAGE

  STORAGE_INTEGRATION = UEV_MU_ADM_PRACT_1

  URL = 's3://uev-mu-adm-modulo-2-snow/kaggle/c_1_0_0/1_0_0/valencia/';

-- Check the status of the parquet in the stage

LIST @UEV_MU_ADM.MODULO_2.VALENCIA_STAGE;

---------------------------------------CREATE FINAL TABLE----------------------------------------------------------

CREATE TABLE IF NOT EXISTS UEV_MU_ADM.MODULO_2.VALENCIA_LISTINGS (
  
    ID VARCHAR(100),
  
    LATITUDE NUMBER(10,6),
  
    LONGITUDE NUMBER(10,6),
  
    PRICE NUMBER(10,2),
  
    LOAD_TIMESTAMP TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);

---------------------------------------CREATE FILE FORMAT----------------------------------------------------------

CREATE OR REPLACE FILE FORMAT UEV_MU_ADM.MODULO_2.PARQUET_FORMAT

  TYPE = 'PARQUET';

---------------------------------------COPY DATA TO THE TABLE----------------------------------------------------------

-- loading of the parquet to the table

COPY INTO UEV_MU_ADM.MODULO_2.VALENCIA_LISTINGS (ID, LATITUDE, LONGITUDE, PRICE)

FROM (

  SELECT 

    $1:id::VARCHAR,

    $1:latitude::NUMBER(10,6),

    $1:longitude::NUMBER(10,6),

    TO_NUMBER(REPLACE(REPLACE($1:price::VARCHAR, '$', ''), ',', ''), 10, 2)

  FROM @UEV_MU_ADM.MODULO_2.VALENCIA_STAGE/FileYear=2026/FileMonth=01/FileDay=16/Valencia_data.parquet

)

FILE_FORMAT = UEV_MU_ADM.MODULO_2.PARQUET_FORMAT;

---------------------------------------VERIFY THE LOADED DATA----------------------------------------------------------

-- Check the number of rows

SELECT COUNT(*) FROM UEV_MU_ADM.MODULO_2.VALENCIA_LISTINGS;

-- Check the first 10 rows

SELECT * FROM UEV_MU_ADM.MODULO_2.VALENCIA_LISTINGS 

ORDER BY LOAD_TIMESTAMP DESC 

LIMIT 10;


------------------------------------------------------------------Step 4: Data Transformation & Task Automation-------------------------------------------------------------------------------

For this step we need to:

1. Create a Transient Table for high-performance temporary storage.

2. Develop a SQL Stored Procedure to calculate the 90th percentile of listing prices.

3. Implement a Snowflake Task (CRON-scheduled) to automate daily data refreshments and warehouse management.`


---------------------------------------CREATE A TRANSIENT TABLE----------------------------------------------------------

CREATE OR REPLACE TRANSIENT TABLE UEV_MU_ADM.MODULO_2.VALENCIA_TOP_90_PRICES (

    ID VARCHAR(100),
    
    LATITUDE NUMBER(10,6),
    
    LONGITUDE NUMBER(10,6),
    
    PRICE NUMBER(10,2),
    
    PERCENTILE_RANK NUMBER(5,2),
    
    LOAD_TIMESTAMP TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);

---------------------------------------CREATE A STORED PROCEDURE----------------------------------------------------------


CREATE OR REPLACE PROCEDURE UEV_MU_ADM.MODULO_2.LOAD_TOP_90_PRICES()

RETURNS STRING

LANGUAGE SQL

AS

$$

BEGIN
    
    TRUNCATE TABLE UEV_MU_ADM.MODULO_2.VALENCIA_TOP_90_PRICES;
    
    
    INSERT INTO UEV_MU_ADM.MODULO_2.VALENCIA_TOP_90_PRICES (ID, LATITUDE, LONGITUDE, PRICE, PERCENTILE_RANK)
    
    SELECT 
    
        ID,
    
        LATITUDE,
    
        LONGITUDE,
    
        PRICE,
    
        PERCENT_RANK() OVER (ORDER BY PRICE DESC) * 100 AS PERCENTILE_RANK
    
    FROM UEV_MU_ADM.MODULO_2.VALENCIA_LISTINGS
    
    QUALIFY PERCENT_RANK() OVER (ORDER BY PRICE DESC) <= 0.90;
    
    RETURN 'Top 90% de precios cargado exitosamente';

END;

$$;

---------------------------------------CREATE TASK----------------------------------------------------------


CREATE WAREHOUSE IF NOT EXISTS COMPUTE_WH

  WITH WAREHOUSE_SIZE = 'X-SMALL'

  AUTO_SUSPEND = 60

  AUTO_RESUME = TRUE;


CREATE OR REPLACE TASK UEV_MU_ADM.MODULO_2.TASK_LOAD_TOP_90_PRICES
  
  WAREHOUSE = COMPUTE_WH
  
  SCHEDULE = 'USING CRON 0 2 * * * UTC'  -- Todos los d√≠as a las 2 AM UTC
  
  AS
  
  CALL UEV_MU_ADM.MODULO_2.LOAD_TOP_90_PRICES();


ALTER TASK UEV_MU_ADM.MODULO_2.TASK_LOAD_TOP_90_PRICES RESUME;

---------------------------------------EXECUTE MANUALLY (ONLY THE FIRST TIME)----------------------------------------------------------

CALL UEV_MU_ADM.MODULO_2.LOAD_TOP_90_PRICES();

---------------------------------------VERIFY RESULTS----------------------------------------------------------

SELECT COUNT(*) as total_top_90 FROM UEV_MU_ADM.MODULO_2.VALENCIA_TOP_90_PRICES;


SELECT 

    (SELECT COUNT(*) FROM UEV_MU_ADM.MODULO_2.VALENCIA_LISTINGS) as total_original,
    
    (SELECT COUNT(*) FROM UEV_MU_ADM.MODULO_2.VALENCIA_TOP_90_PRICES) as total_top_90,
    
    ROUND((SELECT COUNT(*) FROM UEV_MU_ADM.MODULO_2.VALENCIA_TOP_90_PRICES) * 100.0 / 
          
          (SELECT COUNT(*) FROM UEV_MU_ADM.MODULO_2.VALENCIA_LISTINGS), 2) as porcentaje;


SELECT 
    
    MIN(PRICE) as precio_minimo_top90,
    
    MAX(PRICE) as precio_maximo_top90,
    
    AVG(PRICE) as precio_promedio_top90

FROM UEV_MU_ADM.MODULO_2.VALENCIA_TOP_90_PRICES;


SELECT * FROM UEV_MU_ADM.MODULO_2.VALENCIA_TOP_90_PRICES

ORDER BY PRICE DESC

LIMIT 10;

---------------------------------------REVIEW THE TASK----------------------------------------------------------


SELECT *

FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(

    TASK_NAME => 'TASK_LOAD_TOP_90_PRICES',

    SCHEDULED_TIME_RANGE_START => DATEADD('day', -7, CURRENT_TIMESTAMP())

))

ORDER BY SCHEDULED_TIME DESC;


SHOW TASKS LIKE 'TASK_LOAD_TOP_90_PRICES' IN SCHEMA UEV_MU_ADM.MODULO_2;

EXECUTE TASK UEV_MU_ADM.MODULO_2.TASK_LOAD_TOP_90_PRICES;

---------------------------------------PAUSE/EXECUTE/DROP THE TASK----------------------------------------------------------


ALTER TASK UEV_MU_ADM.MODULO_2.TASK_LOAD_TOP_90_PRICES SUSPEND;


ALTER TASK UEV_MU_ADM.MODULO_2.TASK_LOAD_TOP_90_PRICES RESUME;


DROP TASK UEV_MU_ADM.MODULO_2.TASK_LOAD_TOP_90_PRICES;


Automation Logic: Using CRON expressions (e.g., '0 * * * * UTC') to ensure the pipeline remains autonomous and data stays updated without manual intervention.

SCHEDULE = 'USING CRON 0 * * * * UTC'

------------------------------------------------------------------Step 5: Semi-Structured Data Handling (GeoJSON)-------------------------------------------------------------------------------

Processing Semi-Structured Data. Ingesting GeoJSON files into Snowflake using the VARIANT data type. This allows for schema-on-read flexibility and efficient parsing of complex JSON geometries within a relational warehouse.

In [8]:
create_table_query_variant = """
CREATE TABLE IF NOT EXISTS UEV_MU_ADM.MODULO_2.GEOJSON_WITH_VARIANT (
    ID INT AUTOINCREMENT,
    CITY VARCHAR(100) NOT NULL,
    GEOJSON_DATA VARIANT,  -- Columna VARIANT para almacenar JSON completo
    LOAD_TIMESTAMP TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
)
"""

cursor.execute(create_table_query_variant)
print("‚úÖ Tabla GEOJSON_WITH_VARIANT creada")

# Leer el archivo GeoJSON
with open('Valencia_geojson.json', 'r') as f:
    geojson_data = json.load(f)

# Insertar el GeoJSON completo en la columna VARIANT
insert_variant_query = """
INSERT INTO UEV_MU_ADM.MODULO_2.GEOJSON_WITH_VARIANT (CITY, GEOJSON_DATA)
SELECT 'Valencia', PARSE_JSON(%s)
"""

cursor.execute(insert_variant_query, (json.dumps(geojson_data),))
conn.commit()
print("‚úÖ GeoJSON completo insertado")

# Consultar y extraer datos del VARIANT
cursor.execute("""
SELECT 
    CITY,
    GEOJSON_DATA:type::STRING as geometry_type,
    GEOJSON_DATA:features[0]:geometry:coordinates[0] as coordinates
FROM UEV_MU_ADM.MODULO_2.GEOJSON_WITH_VARIANT
""")

for row in cursor:
    print(row)

‚úÖ Tabla GEOJSON_WITH_VARIANT creada
‚úÖ GeoJSON completo insertado
('Valencia', 'FeatureCollection', '[\n  [\n    -0.37093004277016917,\n    39.47664277831467\n  ],\n  [\n    -0.3753860808744207,\n    39.479222421744\n  ],\n  [\n    -0.37888723908901056,\n    39.48075783178862\n  ],\n  [\n    -0.3840597244534081,\n    39.48011312643061\n  ],\n  [\n    -0.3871626530358583,\n    39.478454640403044\n  ],\n  [\n    -0.390226561200393,\n    39.47747202511246\n  ],\n  [\n    -0.3888334463088654,\n    39.47394023652714\n  ],\n  [\n    -0.38628724389192826,\n    39.471882574627756\n  ],\n  [\n    -0.38437754229090615,\n    39.469118454509584\n  ],\n  [\n    -0.38318395190526644,\n    39.467183522452046\n  ],\n  [\n    -0.38246796887293044,\n    39.46491052053989\n  ],\n  [\n    -0.3808367392757077,\n    39.463497607593695\n  ],\n  [\n    -0.37805174582172185,\n    39.463159684893895\n  ],\n  [\n    -0.3739935579543783,\n    39.46398905109291\n  ],\n  [\n    -0.3705321703033633,\n    39.46537

------------------------------------------------------------------Step 6: Geospatial Intelligence & Proximity Analysis-------------------------------------------------------------------------------

For the following step, we need to:

1- Implementing the Haversine formula for precise distance calculations.

2- Defining urban perimeters using Shapely Polygons.

3- Filtering listings strictly located within administrative boundaries and calculating proximity to the city center (Plaza del Ayuntamiento).

In [None]:


# ===================== Calculate the distance (Haversine) =====================
def calcular_distancia_km(lat1, lon1, lat2, lon2):
    """
    Calcula la distancia en kil√≥metros entre dos puntos usando la f√≥rmula de Haversine
    """
    R = 6371  # Radio de la Tierra en kil√≥metros
    
    lat1_rad = radians(lat1)
    lat2_rad = radians(lat2)
    delta_lat = radians(lat2 - lat1)
    delta_lon = radians(lon2 - lon1)
    
    a = sin(delta_lat/2)**2 + cos(lat1_rad) * cos(lat2_rad) * sin(delta_lon/2)**2
    c = 2 * atan2(sqrt(a), sqrt(1-a))
    
    distancia = R * c
    return round(distancia, 2)

# ===================== CENTER OF VALENCIA =====================
# Coordenates (Plaza del Ayuntamiento)
CENTRO_VALENCIA_LAT = 39.4699
CENTRO_VALENCIA_LON = -0.3763

print(f"üìç Centro de Valencia: ({CENTRO_VALENCIA_LAT}, {CENTRO_VALENCIA_LON})")

# ===================== READ TRANSIENT TABLE OF VALENCIA =====================
query_valencia = """
SELECT 
    ID,
    LATITUDE,
    LONGITUDE,
    PRICE,
    PERCENTILE_RANK
FROM UEV_MU_ADM.MODULO_2.VALENCIA_TOP_90_PRICES
"""

cursor.execute(query_valencia)
df_valencia = cursor.fetch_pandas_all()
print(f"\n‚úÖ Apartamentos cargados desde Snowflake: {len(df_valencia)}")

# ===================== READ GEOJSON DATA=====================
query_geojson = """
SELECT 
    ORDEN,
    CITY,
    LATITUDE,
    LONGITUDE
FROM UEV_MU_ADM.MODULO_2.GEOJSON
ORDER BY ORDEN
"""

cursor.execute(query_geojson)
df_geojson = cursor.fetch_pandas_all()
print(f"\n‚úÖ Puntos del pol√≠gono GeoJSON: {len(df_geojson)}")
print(df_geojson.head())

# ===================== CREATE POLYGON OF VALENCIA =====================
# Convert GeoJSON coordinates to a Shapely polygon
coordenadas_poligono = [(row['LONGITUDE'], row['LATITUDE']) for _, row in df_geojson.iterrows()]
poligono_valencia = Polygon(coordenadas_poligono)

print(f"\nüó∫Ô∏è Pol√≠gono creado con {len(coordenadas_poligono)} puntos")
print(f"√Årea del pol√≠gono: {poligono_valencia.area:.6f} grados cuadrados")

# ===================== FILTER APARTMENTS WITHIN THE AREA =====================
apartamentos_dentro = []

for _, row in df_valencia.iterrows():
    punto = Point(row['LONGITUDE'], row['LATITUDE'])
    
    if poligono_valencia.contains(punto):
        # Calculate distance to the center of Valencia
        distancia = calcular_distancia_km(
            row['LATITUDE'], 
            row['LONGITUDE'],
            CENTRO_VALENCIA_LAT,
            CENTRO_VALENCIA_LON
        )
        
        apartamentos_dentro.append({
            'ID': row['ID'],
            'LATITUDE': row['LATITUDE'],
            'LONGITUDE': row['LONGITUDE'],
            'PRICE': row['PRICE'],
            'PERCENTILE_RANK': row['PERCENTILE_RANK'],
            'DISTANCIA_AL_CENTRO_KM': distancia,
            'DENTRO_DEL_AREA': True
        })

# ===================== CREATE FINAL DATAFRAME =====================
df_apartamentos_filtrados = pd.DataFrame(apartamentos_dentro)
df_apartamentos_filtrados = df_apartamentos_filtrados.dropna(
    subset=[
        'LATITUDE',
        'LONGITUDE',
        'PRICE',
        'PERCENTILE_RANK',
        'DISTANCIA_AL_CENTRO_KM'
    ]
)

print(f"\nüè† Apartamentos dentro del √°rea de Valencia: {len(df_apartamentos_filtrados)}")
print(f"üìä Porcentaje dentro del √°rea: {len(df_apartamentos_filtrados) / len(df_valencia) * 100:.2f}%")

# ===================== STATISTICS =====================
if len(df_apartamentos_filtrados) > 0:
    print("\nüìà ESTAD√çSTICAS:")
    print(f"Distancia m√≠nima al centro: {df_apartamentos_filtrados['DISTANCIA_AL_CENTRO_KM'].min():.2f} km")
    print(f"Distancia m√°xima al centro: {df_apartamentos_filtrados['DISTANCIA_AL_CENTRO_KM'].max():.2f} km")
    print(f"Distancia promedio al centro: {df_apartamentos_filtrados['DISTANCIA_AL_CENTRO_KM'].mean():.2f} km")
    print(f"Precio promedio: ${df_apartamentos_filtrados['PRICE'].mean():.2f}")
    
    print("\nüèÜ Top 10 apartamentos m√°s cercanos al centro:")
    print(df_apartamentos_filtrados.nsmallest(10, 'DISTANCIA_AL_CENTRO_KM')[
        ['ID', 'PRICE', 'DISTANCIA_AL_CENTRO_KM', 'LATITUDE', 'LONGITUDE']
    ])
    
    print("\nüí∞ Top 10 apartamentos m√°s caros dentro del √°rea:")
    print(df_apartamentos_filtrados.nlargest(10, 'PRICE')[
        ['ID', 'PRICE', 'DISTANCIA_AL_CENTRO_KM', 'LATITUDE', 'LONGITUDE']
    ])
else:
    print("\n‚ö†Ô∏è No se encontraron apartamentos dentro del √°rea definida")


üìç Centro de Valencia: (39.4699, -0.3763)

‚úÖ Apartamentos cargados desde Snowflake: 7072

‚úÖ Puntos del pol√≠gono GeoJSON: 30
   ORDEN      CITY   LATITUDE  LONGITUDE
0      0    Madrid  40.424564  -3.711566
1      0  Valencia  39.476643  -0.370930
2      1    Madrid  40.422141  -3.714362
3      1  Valencia  39.479222  -0.375386
4      2    Madrid  40.417244  -3.712703

üó∫Ô∏è Pol√≠gono creado con 30 puntos
√Årea del pol√≠gono: 0.018114 grados cuadrados

üè† Apartamentos dentro del √°rea de Valencia: 1563
üìä Porcentaje dentro del √°rea: 22.10%

üìà ESTAD√çSTICAS:
Distancia m√≠nima al centro: 0.03 km
Distancia m√°xima al centro: 3.26 km
Distancia promedio al centro: 0.76 km
Precio promedio: $153.95

üèÜ Top 10 apartamentos m√°s cercanos al centro:
                      ID   PRICE  DISTANCIA_AL_CENTRO_KM   LATITUDE  LONGITUDE
665  1512890541606203698   168.0                    0.03  39.470011  -0.376613
268             48823848  1256.0                    0.05  39.470350  -0.37

------------------------------------------------------------------Step 7: Reverse ETL & S3 Export-------------------------------------------------------------------------------

Syncing processed geospatial insights back to Snowflake and performing a Reverse ETL to export the final refined dataset to AWS S3, maintaining a partitioned architecture for downstream consumption.

In [None]:
# ===================== SAVE THE RESULTS IN SNOWFLAKE=====================

#Insert INTO Snowflake
create_filtered_table = """
CREATE OR REPLACE TABLE UEV_MU_ADM.MODULO_2.VALENCIA_FILTERED_BY_AREA (
    ID VARCHAR(100),
    LATITUDE NUMBER(10,6),
    LONGITUDE NUMBER(10,6),
    PRICE NUMBER(10,2),
    PERCENTILE_RANK NUMBER(5,2),
    DISTANCIA_AL_CENTRO_KM NUMBER(10,2),
    DENTRO_DEL_AREA BOOLEAN,
    LOAD_TIMESTAMP TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
)
"""

cursor.execute(create_filtered_table)
print("\n‚úÖ Tabla VALENCIA_FILTERED_BY_AREA creada en Snowflake")

# Insert data into the new table
data_to_insert = [
    (
        row['ID'],
        float(row['LATITUDE']),
        float(row['LONGITUDE']),
        float(row['PRICE']),
        float(row['PERCENTILE_RANK']),
        float(row['DISTANCIA_AL_CENTRO_KM']),
        True
    )
    for _, row in df_apartamentos_filtrados.iterrows()
]

insert_query = """
INSERT INTO UEV_MU_ADM.MODULO_2.VALENCIA_FILTERED_BY_AREA 
(ID, LATITUDE, LONGITUDE, PRICE, PERCENTILE_RANK, DISTANCIA_AL_CENTRO_KM, DENTRO_DEL_AREA)
VALUES (%s, %s, %s, %s, %s, %s, %s)
"""

cursor.executemany(insert_query, data_to_insert)
conn.commit()
print(f"‚úÖ {len(data_to_insert)} apartamentos insertados en Snowflake")

# ===================== VIEW FINAL DATAFRAME =====================
print("\nüìã DATAFRAME FINAL (primeras 20 filas):")
print(df_apartamentos_filtrados.head(20))



‚úÖ Tabla VALENCIA_FILTERED_BY_AREA creada en Snowflake
‚úÖ 1563 apartamentos insertados en Snowflake

üìã DATAFRAME FINAL (primeras 20 filas):
                      ID   LATITUDE  LONGITUDE    PRICE  PERCENTILE_RANK  \
266  1484165221264769599  39.474504  -0.384803  10000.0            11.05   
267  1264452543403641618  39.471844  -0.394216   9143.0            11.17   
268             48823848  39.470350  -0.376530   1256.0            11.76   
269             25360723  39.467160  -0.368720   1252.0            11.77   
270             48837508  39.470100  -0.374850   1215.0            11.81   
271  1420861031773004759  39.465469  -0.376300   1000.0            11.83   
272   783233881834500810  39.472845  -0.384255    765.0            11.93   
273   812051449237491927  39.472642  -0.379957    742.0            12.00   
274   597542675554668322  39.473732  -0.372222    689.0            12.04   
275             51860040  39.474630  -0.374320    665.0            12.06   
276             13

This final snowflake table was then uploaded to S3. To do this, we went to snowflake and entered the following code:



COPY INTO @UEV_MU_ADM.MODULO_2.VALENCIA_STAGE

FROM (

    SELECT

        ID,

        LATITUDE,

        LONGITUDE,

        PRICE,

        PERCENTILE_RANK,

        LOAD_TIMESTAMP,

        'FileYear=' || YEAR(CURRENT_DATE()) ||

        '/FileMonth=' || LPAD(MONTH(CURRENT_DATE()), 2, '0') ||

        '/FileDay=' || LPAD(DAY(CURRENT_DATE()), 2, '0') AS partition_path

    FROM UEV_MU_ADM.MODULO_2.VALENCIA_TOP_90_PRICES
)

PARTITION BY (partition_path)

FILE_FORMAT = UEV_MU_ADM.MODULO_2.PARQUET_FORMAT



--We check the data
LIST @UEV_MU_ADM.MODULO_2.VALENCIA_STAGE;

------------------------------------------------------------------Step 8: NoSQL Integration (Amazon DynamoDB)-------------------------------------------------------------------------------

Deploying the geospatial results to Amazon DynamoDB for low-latency access.

Infrastructure: Provisioning tables using Boto3 (Python SDK).

Transformation: Recursively converting Python float types to Decimals to ensure compatibility with DynamoDB's strict data modeling.`

In [None]:
dynamodb = boto3.resource('dynamodb', region_name='eu-west-1')

table = dynamodb.create_table(
    TableName='Valencia_GeoJSON',
    KeySchema=[
        {'AttributeName': 'pk', 'KeyType': 'HASH'},
        {'AttributeName': 'sk', 'KeyType': 'RANGE'}
    ],
    AttributeDefinitions=[
        {'AttributeName': 'pk', 'AttributeType': 'S'},
        {'AttributeName': 'sk', 'AttributeType': 'S'}
    ],
    BillingMode='PAY_PER_REQUEST'
)

table.wait_until_exists()
print("‚úÖ Tabla creada")


‚úÖ Tabla creada


In [None]:


def float_to_decimal(obj):
    """
    Convierte recursivamente floats a Decimal
    (necesario para DynamoDB)
    """
    if isinstance(obj, float):
        return Decimal(str(obj))
    elif isinstance(obj, list):
        return [float_to_decimal(i) for i in obj]
    elif isinstance(obj, dict):
        return {k: float_to_decimal(v) for k, v in obj.items()}
    else:
        return obj

# ===================== CONFIG =====================
TABLE_NAME = 'Valencia_GeoJSON'
REGION = 'eu-west-1'
GEOJSON_FILE = 'Valencia_geojson.json'

# ===================== CONNECTION =====================
dynamodb = boto3.resource('dynamodb', region_name=REGION)
table = dynamodb.Table(TABLE_NAME)

# ===================== READ GEOJSON =====================
with open(GEOJSON_FILE, 'r', encoding='utf-8') as f:
    geojson_data = json.load(f)

features = geojson_data.get('features', [])
print(f"üìç Features encontrados: {len(features)}")

# ===================== INGEST =====================
# Auxiliary function to recursively convert floats to decimal
def dict_to_decimal(obj):
    if isinstance(obj, list):
        return [dict_to_decimal(i) for i in obj]
    elif isinstance(obj, dict):
        return {k: dict_to_decimal(v) for k, v in obj.items()}
    elif isinstance(obj, float):
        # We use str(obj) to avoid floating-point precision errors
        return Decimal(str(obj))
    return obj

with table.batch_writer() as batch:
    for feature in features:
        feature_id = str(uuid.uuid4())

        geometry = dict_to_decimal(feature.get('geometry', {}))
        properties = dict_to_decimal(feature.get('properties', {}))

        item = {
            'pk': feature_id,
            'sk': geometry.get('type', 'UNKNOWN'),
            'city': properties.get('city', 'Valencia'),
            'geometry': geometry,
            'properties': properties,
            'created_at': datetime.utcnow().isoformat()
        }

        batch.put_item(Item=item)

print("‚úÖ GeoJSON cargado correctamente con tipos Decimal")

üìç Features encontrados: 1


  'created_at': datetime.utcnow().isoformat()


‚úÖ GeoJSON cargado correctamente con tipos Decimal


# Additional comments
Evidence of the creation process will be stipulated in the pdf called results.