In [1]:
from pyspark.sql import SparkSession, DataFrame

In [2]:
from pyspark.sql.functions import col
from pyspark.sql import functions as F
from pyspark.storagelevel import StorageLevel

In [3]:
from google.cloud import storage
from google.cloud import bigquery
import pandas as pd

In [None]:
# Abrimos la sesión de Spark que vamos a necesitar...

In [4]:
spark = SparkSession.builder \
    .appName("spark-learning") \
    .getOrCreate()

24/02/15 12:40:40 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [None]:
# Abrimos el storage_client para poder trabajar con GCS

In [5]:
# Crear una instancia del cliente de Google Cloud Storage
storage_client = storage.Client()

# Listar los buckets en el proyecto
buckets = list(storage_client.list_buckets())

In [6]:
buckets 

[<Bucket: airbnb_equifax>,
 <Bucket: dataproc-staging-europe-west4-662485454498-ndsis5lp>,
 <Bucket: dataproc-temp-europe-west4-662485454498-zcn4vjdp>]

In [7]:
bucket_name = "airbnb_equifax"

In [8]:
# Obtener la instancia del bucket
bucket = storage_client.get_bucket(bucket_name)

# Listar objetos en el bucket
blobs = list(bucket.list_blobs())

In [9]:
# Imprimir los paths de GCS
for blob in blobs:
    print(f'gs://{bucket_name}/{blob.name}')

gs://airbnb_equifax/calendar.csv
gs://airbnb_equifax/calendar.csv.gz
gs://airbnb_equifax/listings.csv
gs://airbnb_equifax/neighbourhoods.csv
gs://airbnb_equifax/reviews.csv


In [10]:
gcs_path = "gs://airbnb_equifax/calendar.csv"

In [None]:
# Elegimos leer desde GCS el archivo csv que contiene los datos de calendar

In [11]:
calendar = spark.read.csv(gcs_path, header=True, inferSchema=True)

                                                                                

In [14]:
# calendar.storageLevel.useMemory

False

In [24]:
df = pd.read_csv(gcs_path)

In [32]:
type(df) --> esta en la memoria de mi maquina

pandas.core.frame.DataFrame

In [31]:
type(calendar) --> esto no esta, excepto que le diga que lo quieron traer

pyspark.sql.dataframe.DataFrame

In [15]:
# Crear una instancia del cliente de BigQuery
bq_client = bigquery.Client()

In [16]:
# Especificar la consulta de BigQuery
query = "SELECT * FROM `bigquery-learning-405922.airbnb.neighbourhoods`"

In [17]:
# Ejecutar la consulta en BigQuery
query_job = bq_client.query(query)
# Obtener los resultados de la consulta como un DataFrame de pandas
neighbourhoods = query_job.to_dataframe()

In [18]:
new_header = neighbourhoods.iloc[0] 
neighbourhoods = neighbourhoods[1:] 
neighbourhoods.columns = new_header 

In [19]:
neighbourhoods.head()

Unnamed: 0,neighbourhood_group,neighbourhood
1,Arganzuela,Acacias
2,Arganzuela,Atocha
3,Arganzuela,Chopera
4,Arganzuela,Delicias
5,Arganzuela,Imperial


In [20]:
del neighbourhoods

In [21]:
# Especificar el nombre completo de la tabla en formato proyecto.dataset.tabla
table_id = 'bigquery-learning-405922.airbnb.listings'

# Leer la tabla directamente en un DataFrame de Spark
listings = spark.read.format("bigquery").option("table", table_id).load()

In [22]:
# Especificar el nombre completo de la tabla en formato proyecto.dataset.tabla
table_id = 'bigquery-learning-405922.airbnb.neighbourhoods'

# Leer la tabla directamente en un DataFrame de Spark
neighbourhoods = spark.read.format("bigquery").option("table", table_id).load()

In [23]:
# Especificar el nombre completo de la tabla en formato proyecto.dataset.tabla
table_id = 'bigquery-learning-405922.airbnb.calendar'

# Leer la tabla directamente en un DataFrame de Spark
calendar = spark.read.format("bigquery").option("table", table_id).load()

In [24]:
# Especificar el nombre completo de la tabla en formato proyecto.dataset.tabla
table_id = 'bigquery-learning-405922.airbnb.reviews'

# Leer la tabla directamente en un DataFrame de Spark
reviews = spark.read.format("bigquery").option("table", table_id).load()

In [25]:
all_configs = spark.sparkContext.getConf().getAll()
for config in all_configs:
    print(config)

('spark.eventLog.enabled', 'true')
('spark.dynamicAllocation.minExecutors', '1')
http://hjkh6yhxlnfhlmb4uup5zj6nlq-dot-europe-west4.dataproc.googleusercontent.com:80/gateway/default/yarn/proxy/application_1707998419727_0001')
('spark.dataproc.sql.joinConditionReorder.enabled', 'true')
('spark.eventLog.dir', 'gs://dataproc-temp-europe-west4-662485454498-zcn4vjdp/037393d0-4992-4d9e-91cf-99e3863493a9/spark-job-history')
('spark.yarn.historyServer.address', 'cluster-f0de-m:18080')
('spark.dataproc.sql.local.rank.pushdown.enabled', 'true')
('spark.history.fs.logDirectory', 'gs://dataproc-temp-europe-west4-662485454498-zcn4vjdp/037393d0-4992-4d9e-91cf-99e3863493a9/spark-job-history')
('spark.yarn.unmanagedAM.enabled', 'true')
('spark.ui.filters', 'org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter')
('spark.sql.optimizer.runtime.bloomFilter.join.pattern.enabled', 'true')
('spark.metrics.namespace', 'app_name:${spark.app.name}.app_id:${spark.app.id}')
('spark.dataproc.sql.optimizer.jo

In [26]:
# Esta configuración controla la cantidad de memoria que se asigna al proceso del controlador (driver) de Spark.
# El controlador es responsable de coordinar la ejecución de tareas en el clúster.
spark.conf.get("spark.executor.memory")

'2893m'

In [27]:
spark.conf.get("spark.executor.cores")

'1'

In [28]:
# Esta configuración controla la cantidad de memoria que se asigna a cada ejecutor en el clúster.
spark.conf.get("spark.driver.memory")

'2048m'

In [52]:
calendar.explain(True)

== Parsed Logical Plan ==
Relation [listing_id#658L,date#659,available#660,price#661L,adjusted_price#662L,minimum_nights#663L,maximum_nights#664L] com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation@13fbc168

== Analyzed Logical Plan ==
listing_id: bigint, date: date, available: boolean, price: bigint, adjusted_price: bigint, minimum_nights: bigint, maximum_nights: bigint
Relation [listing_id#658L,date#659,available#660,price#661L,adjusted_price#662L,minimum_nights#663L,maximum_nights#664L] com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation@13fbc168

== Optimized Logical Plan ==
Relation [listing_id#658L,date#659,available#660,price#661L,adjusted_price#662L,minimum_nights#663L,maximum_nights#664L] com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation@13fbc168

== Physical Plan ==
*(1) Scan com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation@13fbc168 [listing_id#658L,date#659,available#660,price#661L,adjusted_price#662L,minimum_nights#663L,maximum_

In [53]:
# Listar los nombres de las variables en el entorno global
global_variables = globals()

# Filtrar las variables que son instancias de DataFrame de Spark
dataframes_spark = [(var_name, var_value) for var_name, var_value in global_variables.items() if isinstance(var_value, DataFrame)]

# [k for (k, v) in globals().items() if isinstance(v, DataFrame)]
# Imprimir los nombres de los DataFrames de Spark
for df_name, df_value in dataframes_spark:
    print(f"Nombre del DataFrame: {df_name}")

Nombre del DataFrame: _17
Nombre del DataFrame: calendar
Nombre del DataFrame: listings
Nombre del DataFrame: neighbourhoods
Nombre del DataFrame: reviews


In [29]:
[k for (k, v) in globals().items() if isinstance(v, DataFrame)]

['calendar', 'listings', 'neighbourhoods', 'reviews']

Ejemplo : Encontrar el promedio y la desviación estándar del precio por barrio. (tabla listings)

In [30]:
avg_price_by_neighbourhood = listings.groupBy('neighbourhood') \
                                .agg(F.round(F.avg('price'),2).alias('avg_price'), \
                                     F.stddev('price').alias('stddev_price'))

In [40]:
avg_price_by_neighbourhood.rdd.getNumPartitions()

200

In [41]:
avg_repar= avg_price_by_neighbourhood.repartition(10)

In [42]:
avg_repar.rdd.getNumPartitions()



10

In [63]:
display(avg_price_by_neighbourhood.show(5, truncate=False))

+---------------+---------+------------------+
|neighbourhood  |avg_price|stddev_price      |
+---------------+---------+------------------+
|Hellín         |237.69   |399.5739745167484 |
|Butarque       |53.09    |43.43952092978956 |
|Palos de Moguer|107.01   |196.60877351178573|
|Rejas          |138.32   |213.78520708322225|
|Pacífico       |89.36    |85.40661890158263 |
+---------------+---------+------------------+
only showing top 5 rows



None

In [100]:
avg_price_by_neighbourhood.limit(5).toPandas()

Unnamed: 0,neighbourhood,avg_price,stddev_price
0,Hellín,237.69,399.573975
1,Butarque,53.09,43.439521
2,Palos de Moguer,107.01,196.608774
3,Rejas,138.32,213.785207
4,Pacífico,89.36,85.406619


In [None]:
Ejemplo : Encontrar la proporción de días ocupados para cada mes.

In [43]:
# Agregar una columna 'month' al DataFrame con la información del calendario
availability_by_month = calendar.withColumn('month', F.month('date'))

# Calcular la ocupación mensual
monthly_occupancy = (
    availability_by_month
    .groupBy('month')
    .agg(
        F.count(F.when(availability_by_month.available == 'f', 1)).alias('occupied_days'),
        F.count('listing_id').alias('total_days')
    )
)

# Calcular la tasa de ocupación mensual
monthly_occupancy = monthly_occupancy.withColumn('occupancy_rate', monthly_occupancy.occupied_days / monthly_occupancy.total_days)

In [44]:
monthly_occupancy.show(5)



+-----+-------------+----------+------------------+
|month|occupied_days|total_days|    occupancy_rate|
+-----+-------------+----------+------------------+
|   12|       386370|    769637|0.5020158854109145|
|    1|       370274|    769637|0.4811021299651654|
|    6|       429154|    744810|0.5761925860286516|
|    3|       408782|    769637|0.5311361070218817|
|    5|       408534|    769637|0.5308138771914552|
+-----+-------------+----------+------------------+
only showing top 5 rows



                                                                                

EJERCICIO 2 SESIÓN 1
Queremos encontrar las ciudades (neighbourhood_group) con un promedio de precio superior a 100$ por noche,
pero solo queremos incluir las ciudades que tienen al menos 100 listados.

SELECT
  neighbourhood_group,
  AVG(price) AS avg_price,
  COUNT(*) AS num_listings
FROM
  `<<PROYECT_ID>>.<<DATASET_ID>>.listings`
GROUP BY
  neighbourhood_group
HAVING
  AVG(price) > 100 AND COUNT(*) >= 100
ORDER BY
  avg_price DESC;

In [118]:
result_df = (
    listings
    .groupBy('neighbourhood_group')
    .agg(
        F.avg('price').alias('avg_price'),
        F.count('*').alias('num_listings')
    )
    .filter((F.avg('price') > 100) & (F.count('*') >= 100))
    .orderBy('avg_price', ascending=False)
)

In [123]:
result_df.limit(5).toPandas()

Unnamed: 0,neighbourhood_group,avg_price,num_listings
0,San Blas - Canillejas,253.896127,568
1,Salamanca,163.553864,1708
2,Moncloa - Aravaca,141.950292,684
3,Hortaleza,139.68431,529
4,Centro,136.024324,10607


Encuentra el número total de reseñas para cada listado en Madrid. Muestra el nombre del listado y la cantidad de reseñas.

In [92]:
ejer_1 = (
    listings.alias("l")
    .join(reviews.alias("r"), col("l.id") == col("r.listing_id"))
    .groupBy("l.name")
    .agg(F.count("r.listing_id").alias("cantidad_resenas"))
    .orderBy(col("cantidad_resenas").desc())
)

In [91]:
result.limit(3).toPandas()

                                                                                

Unnamed: 0,name,cantidad_resenas
0,Rental unit in Madrid · ★4.68 · 1 bedroom · 1 ...,4895
1,Rental unit in Madrid · ★4.75 · 1 bedroom · 2 ...,4783
2,Rental unit in Madrid · ★4.88 · 1 bedroom · 2 ...,4218


In [128]:
precios_enero = (
    calendar
    .filter(F.month('DATE') == 1)  # Extracción del mes desde la columna 'DATE'
    .groupBy('listing_id')
    .agg(F.avg('price').alias('precio_promedio_enero'))
)

In [45]:
def calcular_precio_promedio_por_mes(spark, dataframe, mes):
    """
    Calcula el precio promedio para un mes específico.

    Parameters:
    - spark: Objeto SparkSession.
    - dataframe: DataFrame de PySpark.
    - mes: Número del mes (1 para enero, 2 para febrero, etc.).

    Returns:
    - DataFrame con listing_id y precio_promedio.
    """
    precios_por_mes = (
        dataframe
        .filter(F.month('DATE') == mes)  # Extracción del mes desde la columna 'DATE'
        .groupBy('listing_id')
        .agg(F.avg('price').alias(f'precio_promedio_mes_{mes}'))
    )
    return precios_por_mes

In [46]:
calcular_precio_promedio_por_mes(spark,calendar, 6).limit(5).toPandas()

                                                                                

Unnamed: 0,listing_id,precio_promedio_mes_6
0,949719529014418407,406.4
1,23917540,217.866667
2,562980972202430078,15.0
3,48648725,16.6
4,49186036,16.0


In [47]:
gcs_path

'gs://airbnb_equifax/calendar.csv'

In [48]:
calcular_precio_promedio_por_mes(spark,calendar, 6).write.parquet('gs://airbnb_equifax/precio_promedio')

                                                                                

In [58]:
calcular_precio_promedio_por_mes(spark,calendar, 6).write.format("bigquery") \
        .option("table", "bigquery-learning-405922.airbnb.precio_promedio") \
        .option("temporaryGcsBucket", "tmp-spark-bucket") \
        .save()

                                                                                