# Cargamos las librerias

In [1]:
import pyspark.sql.functions as F
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, DateType, DoubleType, DecimalType
from pyspark.sql.functions import *
from pyspark.sql.functions import year, month, dayofmonth
from pyspark.sql.functions import sqrt
from pyspark.sql.window import Window

# Cargamos los datos

En esta parte leemos los datos con la fución read añadiendo las opciones que requiera cada dataset de forma específica, introduciendo su ruta del bucket de cloud storage y seleccionamos los campos que nos interesan de cada uno.

In [2]:


# Leemos los datos introduciendo su ruta en el bucket de cloud storage y seleccionamos los campos que nos interesan de cada uno

#Para listings selecionamos que los nombres del campo estan en la primera fila
#Que ";" separa los campos 
#Y que infiera el esquema de los datos
listings = spark.read\
                 .option("header", "true")\
                 .option("delimiter", ";")\
                 .option("inferSchema", "true")\
                 .csv("gs://bucket-tfm-ucm/data/listings.csv")\
                 .select("id", "host_id", "neighbourhood", "neighbourhood_group", "latitude", "longitude",\
                         "room_type", "price", "minimum_nights", "number_of_reviews", "availability_365")
listings.show()

#Para reviews selecionamos que los nombres del campo estan en la primera fila
#Que "," separa los campos 
#Y que infiera el esquema de los datos
reviews = spark.read\
                 .option("header", "true")\
                 .option("delimiter", ",")\
                 .option("inferSchema", "true")\
                 .csv("gs://bucket-tfm-ucm/data/reviews.csv")

reviews.show()

#Para population selecionamos que los nombres del campo estan en la primera fila
#Y que infiera el esquema de los datos
population = spark.read\
                 .option("header", "true")\
                 .option("inferSchema", "true")\
                 .csv("gs://bucket-tfm-ucm/data/New_York_City_Population_by_Borough__1950_-_2040.csv")\
                 .select("Borough", "2020", "2020 - Boro share of NYC total")                 

population.show()

#Para population selecionamos que los nombres del campo estan en la primera fila
#Y que infiera el esquema de los datos
#projects = spark.read\
#                 .option("header", "true")\
#                 .option("inferSchema", "true")\
 #                .csv("gs://bucket-tfm-ucm/data/Active_Projects_Under_Construction.csv")\
  #               .select("Building ID","Borough")

#projects.show()

#Para covid selecionamos que los nombres del campo estan en la primera fila
#Y que infiera el esquema de los datos
covid = spark.read\
                 .option("header", "true")\
                 .option("inferSchema", "true")\
                 .csv("gs://bucket-tfm-ucm/data/COVID-19_Daily_Counts_of_Cases__Hospitalizations__and_Deaths.csv")\
                 .select("DATE_OF_INTEREST", "CASE_COUNT", "BX_CASE_COUNT","BK_CASE_COUNT","MN_CASE_COUNT","QN_CASE_COUNT","SI_CASE_COUNT")

covid.show()

#Para incidents selecionamos que los nombres del campo estan en la primera fila
#Y que infiera el esquema de los datos
incidents = spark.read\
                 .option("header", "true")\
                 .option("inferSchema", "true")\
                 .csv("gs://bucket-tfm-ucm/data/Fire_Incident_Dispatch_Data.csv")\
                 .select("STARFIRE_INCIDENT_ID", "INCIDENT_DATETIME", "INCIDENT_BOROUGH", "INCIDENT_CLASSIFICATION")

incidents.show()

#Para events selecionamos que los nombres del campo estan en la primera fila
#Y que infiera el esquema de los datos
events = spark.read\
                 .option("header", "true")\
                 .option("inferSchema", "true")\
                 .csv("gs://bucket-tfm-ucm/data/Parks_Special_Events.csv")\
                 .select("Borough", "Date and Time","Event Type")

events.show()

#Para crime selecionamos que los nombres del campo estan en la primera fila
#Y que infiera el esquema de los datos
crime = spark.read\
                 .option("header", "true")\
                 .option("inferSchema", "true")\
                 .csv("gs://bucket-tfm-ucm/data/NYPD_Complaint_Data_Historic.csv")\
                 .select("CMPLNT_NUM", "CMPLNT_FR_DT","BORO_NM", "LAW_CAT_CD", "Latitude", "Longitude")

crime.show()


+-----+-------+------------------+-------------------+--------+---------+---------------+-----+--------------+-----------------+----------------+
|   id|host_id|     neighbourhood|neighbourhood_group|latitude|longitude|      room_type|price|minimum_nights|number_of_reviews|availability_365|
+-----+-------+------------------+-------------------+--------+---------+---------------+-----+--------------+-----------------+----------------+
| 2595|   2845|           Midtown|          Manhattan|40.75356|-73.98559|Entire home/apt|  150|            30|               48|             334|
| 3831|   4869|Bedford-Stuyvesant|           Brooklyn|40.68494|-73.95765|Entire home/apt|   73|             1|              409|             214|
| 5121|   7356|Bedford-Stuyvesant|           Brooklyn|40.68535|-73.95512|   Private room|   60|            30|               50|             365|
| 5136|   7378|       Sunset Park|           Brooklyn|40.66265|-73.99454|Entire home/apt|  275|             5|              

# Preprocesado listings

In [3]:
#Mostramos que el dataset viene algo sucio
listings.orderBy("id").show()

# Como se puede ver los datos vienen con valores null y con registros que están mal asignados, como IDs no númericos
# convertimos todas las columnas que son susceptibles de ser integer a este tipo, para que se vuelvan null los registros erroneos 
#y así quitar todas las filas mal asignadas

#En esta transformación vamos a cambiar id a int utilizando withcolumn y la fución cast
#host_id a int utilizando withcolumn y la fución cast
#latitude a double utilizando withcolumn y la fución cast
#longitude a double utilizando withcolumn y la fución cast
#price a int utilizando withcolumn y la fución cast
#minimum_nights a int utilizando withcolumn y la fución cast
#number_of_reviews a int utilizando withcolumn y la fución cast
#availabity_ 365 a int utilizando withcolumn y la fución cast

listings_df = listings\
.withColumn("id", F.col("id").cast('int'))\
.withColumn("host_id", F.col("host_id").cast('int'))\
.withColumn("latitude", F.col("latitude").cast('double'))\
.withColumn("longitude", F.col("longitude").cast('double'))\
.withColumn("price", F.col("price").cast('int'))\
.withColumn("minimum_nights", F.col("minimum_nights").cast('int'))\
.withColumn("number_of_reviews", F.col("number_of_reviews").cast('int'))\
.withColumn("availability_365", F.col("availability_365").cast('int'))

print("Total de registros")
print(listings_df.count())
#Eliminamos NAs y nulls con na.drop
listings_noNA = listings_df.na.drop()
print("Total de registros sin NAs")
print(listings_noNA.count())
#Eliminamos duplicados con dropduplicates
listings_noDup = listings_noNA.dropDuplicates()
print("Total de registros sin duplicidad")
print(listings_noDup.count())

#comprobamos los valores de minimum_nights con la función orderBy 
listings_noDup.orderBy(F.col("minimum_nights").desc()).show()

# Vamos a optar por quitar los apartamentos que cuyo minimum_nights sea mayor de un año porque son pocos registros y posiblemente esten mal,
# y tambien vamos a eliminar los registros cuyo valor de precio sea 0.
listings_filter = listings_noDup.filter(listings_noDup['price']>0)\
                  .filter(listings_noDup['minimum_nights']<366)

print("Total de registros después de filtrar")
print(listings_filter.count())

# Por último creamos nuevas columnas en funcion de otras que nos serán útiles para las tablas finales.
#Generamos la columna Distance valiendonos de la funcion de distancia entre dos puntos en un plano y las coordenadas de los Airbnb y las del centro de manhattan
#Generamos la columna PriceByTime multiplicando la columna price por number_of_reviews
#Generamos la columna Income que serán los beneficios mínimos, que es lo que podemos calcular con estos datos, multiplicado price bytimee por number of reviews
#Cambiamos de los id a string
listingsDF = listings_filter\
.withColumn("Distance", F.sqrt((F.col("latitude")-40.752298)**2 + (F.col("longitude")+73.985949)**2))\
.withColumn("PriceByTime",F.col("price") * F.col("minimum_nights"))\
.withColumn("Income",F.col("PriceByTime") * F.col("number_of_reviews"))\
.withColumn("id", F.col("id").cast('string'))\
.withColumn("host_id", F.col("host_id").cast('string'))

listingsDF.show()

+--------------------+--------------------+-------------+-------------------+----------+---------------+---------------+---------+--------------+-----------------+----------------+
|                  id|             host_id|neighbourhood|neighbourhood_group|  latitude|      longitude|      room_type|    price|minimum_nights|number_of_reviews|availability_365|
+--------------------+--------------------+-------------+-------------------+----------+---------------+---------------+---------+--------------+-----------------+----------------+
|                null|                null|         null|               null|      null|           null|           null|     null|          null|             null|            null|
|                null|                null|         null|               null|      null|           null|           null|     null|          null|             null|            null|
|                null|                null|         null|               null|      null|       

# Preprocesado de Reviews

In [4]:
#Mostramos la columnas de reviews
reviews.show()

print("Total de registros")
print(reviews.count())
#Eliminamos NAs y nulls con na.drop
reviews_noNA = reviews.na.drop()
print("Total de registros sin NAs")
print(reviews_noNA.count())

#la columna date fue inferida como timestamp pero no tenemos la hora del registro por lo que lo covertimos a formato date
#cambiamos date al formato date utilizando withcolumn y la fución to_date
reviews_todate = reviews_noNA.withColumn("date",to_date("date"))

#Generamos nuevas columnas de Year, Month y DayOfMonth con las funciones homonimas dentro de la función withcolumn
reviewsDF = reviews_todate\
            .withColumn('Year', year(col('date')))\
            .withColumn('Month', month(col('date')))\
            .withColumn('DayOfMonth', dayofmonth(col('date')))
    
    
reviewsDF.show(truncate=False) 

+----------+-------------------+
|listing_id|               date|
+----------+-------------------+
|      2595|2009-11-21 00:00:00|
|      2595|2009-12-05 00:00:00|
|      2595|2009-12-10 00:00:00|
|      2595|2010-04-09 00:00:00|
|      2595|2010-05-25 00:00:00|
|      2595|2012-05-07 00:00:00|
|      2595|2012-05-17 00:00:00|
|      2595|2012-08-18 00:00:00|
|      2595|2013-05-20 00:00:00|
|      2595|2014-05-21 00:00:00|
|      2595|2014-07-10 00:00:00|
|      2595|2014-09-28 00:00:00|
|      2595|2014-10-07 00:00:00|
|      2595|2014-10-18 00:00:00|
|      2595|2015-03-30 00:00:00|
|      2595|2015-04-21 00:00:00|
|      2595|2015-05-19 00:00:00|
|      2595|2015-09-21 00:00:00|
|      2595|2015-09-28 00:00:00|
|      2595|2016-04-11 00:00:00|
+----------+-------------------+
only showing top 20 rows

Total de registros
908803
Total de registros sin NAs
908803
+----------+----------+----+-----+----------+
|listing_id|date      |Year|Month|DayOfMonth|
+----------+----------+----+--

# Preprocesado de Population

In [5]:
population.printSchema()
#vamos a quitar la fila del total de población de Nueva York,
#ya que, no nos interesa y puede entorpecer los join que haremos más adelante
#para esto usamos la función filter
#también cambiamos el nobre de la columna 2020 por population_2020
populationDF = population.filter("Borough != 'NYC Total'")\
                         .withColumnRenamed("2020", "population_2020")
population.show()
populationDF.show()

root
 |-- Borough: string (nullable = true)
 |-- 2020: integer (nullable = true)
 |-- 2020 - Boro share of NYC total: double (nullable = true)

+----------------+-------+------------------------------+
|         Borough|   2020|2020 - Boro share of NYC total|
+----------------+-------+------------------------------+
|       NYC Total|8550971|                         100.0|
|           Bronx|1446788|                         16.92|
|        Brooklyn|2648452|                         30.97|
|       Manhattan|1638281|                         19.16|
|          Queens|2330295|                         27.25|
|   Staten Island| 487155|                           5.7|
+----------------+-------+------------------------------+

+----------------+---------------+------------------------------+
|         Borough|population_2020|2020 - Boro share of NYC total|
+----------------+---------------+------------------------------+
|           Bronx|        1446788|                         16.92|
|        Br

# Limpieza de Covid

In [6]:
covid.show()

#Pasamos las fechas de string a date utilizando withcolumn y la fución to_date
covid_todate = covid.withColumn("DATE_OF_INTEREST",to_date(col("DATE_OF_INTEREST"),"MM/dd/yyyy"))

print("Total de casos covid")
print(covid_todate.count())
#Eliminamos NAs y nulls con na.drop
covid_noNA = covid_todate.na.drop()
print("Total de casos covid sin NAs")
print(covid_noNA.count())
covid_noNA.show(n=20, truncate=False)

# creamos una tabla que va a calcular la suma de los casos diarios en los últimos tres meses registrados en cada distrito
# utilizamos fiter para filtrar por fecha 
# Con agg indicamos que cada columna será la suma de los valores de cada campo
covid_noNA_sum = covid_noNA.filter(covid_noNA['DATE_OF_INTEREST']>"2022-01-06")\
              .agg({'BX_CASE_COUNT': 'sum', 'BK_CASE_COUNT': 'sum','MN_CASE_COUNT': 'sum','QN_CASE_COUNT': 'sum','SI_CASE_COUNT': 'sum'})

covid_noNA_sum.show()

#cambiamos los nombres de las columnas para evitar errores con la función withColumnRenamed
covid_noNA_sum_rn = covid_noNA_sum.withColumnRenamed("sum(BX_CASE_COUNT)", "sum_BX")\
                               .withColumnRenamed("sum(SI_CASE_COUNT)", "sum_SI",)\
                               .withColumnRenamed("sum(MN_CASE_COUNT)", "sum_MN")\
                               .withColumnRenamed("sum(QN_CASE_COUNT)", "sum_QN")\
                               .withColumnRenamed("sum(BK_CASE_COUNT)", "sum_BK")

covid_noNA_sum_rn.show()

#Por último colocamos las columnas como filas creando una vista temporal con createOrReplaxceTempView y aplicando la función stack
covid_noNA_sum_rn.createOrReplaceTempView("sumas")
covidDF = spark.sql("select stack(5,'Bronx', sum_BX, 'Brooklyn', sum_BK, 'Queens', sum_QN, 'Manhattan', sum_MN, 'Staten Island', sum_SI)as(covid_Borough, N_covid_cases) from sumas")

covidDF.show()


+----------------+----------+-------------+-------------+-------------+-------------+-------------+
|DATE_OF_INTEREST|CASE_COUNT|BX_CASE_COUNT|BK_CASE_COUNT|MN_CASE_COUNT|QN_CASE_COUNT|SI_CASE_COUNT|
+----------------+----------+-------------+-------------+-------------+-------------+-------------+
|      02/29/2020|         1|            0|            0|            1|            0|            0|
|      03/01/2020|         0|            0|            0|            0|            0|            0|
|      03/02/2020|         0|            0|            0|            0|            0|            0|
|      03/03/2020|         1|            0|            0|            0|            1|            0|
|      03/04/2020|         5|            0|            1|            2|            2|            0|
|      03/05/2020|         3|            0|            3|            0|            0|            0|
|      03/06/2020|         8|            2|            1|            3|            1|            1|


# Limpieza de Incidents

In [7]:
#Pasamos las fechas de string a date utilizando withcolumn y la fución to_date
#id a string utilizando withcolumn y la fución cast
incidents_todate =incidents\
.withColumn("STARFIRE_INCIDENT_ID", F.col("STARFIRE_INCIDENT_ID").cast('string'))\
.withColumn("INCIDENT_DATETIME",to_date(col("INCIDENT_DATETIME"),"MM/dd/yyyy"))

incidents_todate.printSchema()


#cambiamos los nombres de las columnas para evitar errores con la función when y otherwise
incidents_BORO = incidents_todate.withColumn("INCIDENT_BOROUGH", F.when(F.col("INCIDENT_BOROUGH") == "BROOKLYN", "Brooklyn")\
                                                               .when(F.col("INCIDENT_BOROUGH") == "QUEENS", "Queens")\
                                                               .when(F.col("INCIDENT_BOROUGH") == "BRONX", "Bronx")\
                                                               .when(F.col("INCIDENT_BOROUGH") == "RICHMOND / STATEN ISLAND", "Staten Island")\
                                                               .otherwise("Manhattan"))
print("Total de incidentes")
print(incidents_BORO.count())
#Eliminamos NAs y nulls con na.drop
incidents_noNA = incidents_BORO.na.drop()
print("Total de incidentes sin NAs")
print(incidents_noNA.count())

incidents_noNA.show()

#Filtramos para quedarnos con los incidentes de los últimos 5 años registrados con filter
incidents_filter = incidents_noNA.filter(incidents_noNA['INCIDENT_DATETIME']>"2016-05-04")

#Creamos dos tablas diferentes para la generación de las tablas finales

# incidentes por distritos
#agrupamos por distritos con groupBy y añadimos una columna con el conteo de los registros con agg
incidentsDF_BORO = incidents_filter.groupBy("INCIDENT_BOROUGH").agg(count("INCIDENT_BOROUGH").alias("N_incidents"))

incidentsDF_BORO.show()

# incidentes por año mes y día
#Generamos nuevas columnas de Year, Month y DayOfMonth con las funciones homonimas dentro de la función withcolumn
incidentsDF_YMD = incidents_filter\
            .withColumn('Year', year(col('INCIDENT_DATETIME')))\
            .withColumn('Month', month(col('INCIDENT_DATETIME')))\
            .withColumn('DayOfMonth', dayofmonth(col('INCIDENT_DATETIME')))\
            .filter(F.col('Year')>2015)

incidentsDF_YMD.show()

root
 |-- STARFIRE_INCIDENT_ID: string (nullable = true)
 |-- INCIDENT_DATETIME: date (nullable = true)
 |-- INCIDENT_BOROUGH: string (nullable = true)
 |-- INCIDENT_CLASSIFICATION: string (nullable = true)

Total de incidentes
8538438
Total de incidentes sin NAs
8538418
+--------------------+-----------------+----------------+-----------------------+
|STARFIRE_INCIDENT_ID|INCIDENT_DATETIME|INCIDENT_BOROUGH|INCIDENT_CLASSIFICATION|
+--------------------+-----------------+----------------+-----------------------+
|     500127850130001|       2005-01-01|   Staten Island|   Carbon Monoxide -...|
|     500133070120471|       2005-01-01|           Bronx|   Medical MFA - PD ...|
|     500103630140001|       2005-01-01|        Brooklyn|   Medical - Assist ...|
|     500175150150001|       2005-01-01|          Queens|   Non-Medical MFA -...|
|     500171620150003|       2005-01-01|          Queens|   Sprinkler System ...|
|     500116870110001|       2005-01-01|       Manhattan|   Utility Emer

# Limpieza de Events

In [8]:
events.printSchema()

#Pasamos las fechas de string a date con la función to_date y withcolumn
#cambiamos el nombre de la columna borough para evitar errores en el join con la función withColumnRenamed
events_todate = events.withColumn("Date and Time",to_date(col("Date and Time"),"MM/dd/yyyy"))\
                      .withColumnRenamed("Borough", "event_borough")

print("Total de eventos")
print(events_todate.count())
#Eliminamos NAs y nulls con na.drop
events_noNA = events_todate.na.drop()
print("Total de eventos sin NAs")
print(events_noNA.count())

events_noNA.show()


#Creamos dos tablas diferentes para la generación de las tablas finales

# eventos por distritos
#Filtramos para obteber los eventos en los últimos tres meses registrados con la función filter
events_filter = events_noNA.filter(events_noNA['Date and Time']>"2021-09-18")
#agrupamos por distrito con groupBy y contamos el número de eventos con agg
eventsDF_BORO = events_filter.groupBy("event_borough").agg(count("event_borough").alias("N_events"))

eventsDF_BORO.show()

# eventos por fechas
#Filtramos para obteber los eventos en los últimos cinco años registrados con filter
events_filter2 = events_noNA.filter(events_noNA['Date and Time']>"2016-12-18")

#Generamos nuevas columnas de Year, Month y DayOfMonth con las funciones homonimas dentro de la función withcolumn
eventsDF_YMD = events_filter2\
            .withColumn('Year', year(F.col('Date and Time')))\
            .withColumn('Month', month(F.col('Date and Time')))\
            .withColumn('DayOfMonth', dayofmonth(F.col('Date and Time')))


eventsDF_YMD.show()

root
 |-- Borough: string (nullable = true)
 |-- Date and Time: string (nullable = true)
 |-- Event Type: string (nullable = true)

Total de eventos
5529
Total de eventos sin NAs
5529
+-------------+-------------+--------------------+
|event_borough|Date and Time|          Event Type|
+-------------+-------------+--------------------+
|    Manhattan|   2018-04-12|Tournament/Compet...|
|    Manhattan|   2018-01-20|         Local Event|
|        Bronx|   2018-06-11|Community Based E...|
|    Manhattan|   2019-06-07|Community Based E...|
|    Manhattan|   2019-06-08|Agency Produced E...|
|Staten Island|   2019-06-08|Community Based E...|
|Staten Island|   2019-06-01|Community Based E...|
|    Manhattan|   2019-06-06|Community Based E...|
|    Manhattan|   2019-06-05|Agency Produced E...|
|    Manhattan|   2019-06-04|Community Based E...|
|    Manhattan|   2019-06-06|Community Based E...|
|     Brooklyn|   2019-06-12|Agency Produced E...|
|    Manhattan|   2019-06-11|Community Based E...|


# Limpieza de Crime

In [9]:
crime.printSchema()

#Pasamos las fechas de string a date con la función to_date y withcolumn
#cambiamos el tipo de la columna CMPLNT_NUM a string utilizando withcolumn y la fución cast
crime_todate =crime\
.withColumn("CMPLNT_NUM", F.col("CMPLNT_NUM").cast('string'))\
.withColumn("CMPLNT_FR_DT",to_date(col("CMPLNT_FR_DT"),"MM/dd/yyyy"))

#cambiamos los nombres de las columnas para evitar errores con la función when y otherwise
crime_BORO = crime_todate.withColumn("BORO_NM", F.when(F.col("BORO_NM") == "BROOKLYN", "Brooklyn")\
                                                               .when(F.col("BORO_NM") == "QUEENS", "Queens")\
                                                               .when(F.col("BORO_NM") == "BRONX", "Bronx")\
                                                               .when(F.col("BORO_NM") == "STATEN ISLAND", "Staten Island")\
                                                               .otherwise("Manhattan"))

print("Total de crimenes")
print(crime_BORO.count())
#Eliminamos NAs y nulls con na.drop
crime_noNA = crime_BORO.na.drop()
print("Total de crimenes sin NAs")
print(crime_noNA.count())

#Filtramos para obteber los crimenes en los últimos cinco años registrados con filter
crime_filter = crime_noNA.filter(crime_noNA['CMPLNT_FR_DT']>"2015-12-31")


#Creamos tres tablas diferentes para la generación de las tablas finales

#localización de los crimenes
#seleccionamos las columnas que nos interezan con select y filtramos por fecha con filter
loc_crime = crime_filter.select("CMPLNT_NUM","Latitude","Longitude","LAW_CAT_CD","CMPLNT_FR_DT").filter(F.col("CMPLNT_FR_DT")>"2020-06-31")
loc_crime.show()

# crimenes por distritos
#agrupamos por distrito con groupBy y con agg contamos el número de registros
crimeDF_BORO = crime_filter.groupBy("BORO_NM").agg(count("BORO_NM").alias("N_crimes"))

crimeDF_BORO.show()

# crimenes por fechas
#Generamos nuevas columnas de Year, Month y DayOfMonth con las funciones homonimas dentro de la función withcolumn
#filtramos para años mayores de 2015 con filter
crimeDF_YMD = crime_filter\
            .withColumn('Year', year(F.col('CMPLNT_FR_DT')))\
            .withColumn('Month', month(F.col('CMPLNT_FR_DT')))\
            .withColumn('DayOfMonth', dayofmonth(F.col('CMPLNT_FR_DT')))\
            .filter(F.col('Year')>2015)

crimeDF_YMD.show()

root
 |-- CMPLNT_NUM: integer (nullable = true)
 |-- CMPLNT_FR_DT: string (nullable = true)
 |-- BORO_NM: string (nullable = true)
 |-- LAW_CAT_CD: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)

Total de crimenes
7375993
Total de crimenes sin NAs
7358006
+----------+------------------+------------------+-----------+------------+
|CMPLNT_NUM|          Latitude|         Longitude| LAW_CAT_CD|CMPLNT_FR_DT|
+----------+------------------+------------------+-----------+------------+
| 808857971| 40.81573205300003|-73.94542041099999|     FELONY|  2020-12-22|
| 990133707| 40.65132343000005|-73.95696100099997|     FELONY|  2020-09-02|
| 420348781|40.877554428000046|-73.87293947099994|     FELONY|  2020-07-20|
| 462496511| 40.83778161800007|-73.91945797099999|     FELONY|  2020-07-02|
| 921351410|40.799466801000044|-73.95153053599995|     FELONY|  2020-11-04|
| 138981745| 40.67970040800003|-73.77604736799998|     FELONY|  2020-12-31|
|

# Carga de tablas definitivas

### Apartamentos mas cercanos al centro de Manhattan con precio y noches mínimas

In [10]:
#ordenamos con orderBy y seleccionamos las comlumnas que nos interesen
near_Airbnb = listingsDF.orderBy(F.col("Distance").asc()).select("id","Distance","PriceByTime","price","minimum_nights")
near_Airbnb.show()

#Guardamos la tabla en el storage con la función write
#hacemos que se guarde en un solo documento con repation(1)
#near_Airbnb.repartition(1).write.option("header",True).csv("gs://bucket-tfm-ucm/ETL/apart_cerc")

+--------+--------------------+-----------+-----+--------------+
|      id|            Distance|PriceByTime|price|minimum_nights|
+--------+--------------------+-----------+-----+--------------+
|42518749|2.313114783144816...|      15000|  500|            30|
|52687730|6.124744892562974E-4|       1635| 1635|             1|
|52687752|6.616381186043515E-4|       1270| 1270|             1|
|42197478|7.325742283231752E-4|       7500|  250|            30|
|15900336|8.993358660793196E-4|       4050|  135|            30|
|49011008|9.066449139463447E-4|        677|  677|             1|
|52687700|9.235935253073965E-4|        818|  818|             1|
|49011001| 9.67122019184211E-4|        298|  298|             1|
|49011009|9.796249282230682E-4|       1016| 1016|             1|
|22376956|0.001003884953563...|       9900|  330|            30|
|49010997|0.001058454061355...|       1355| 1355|             1|
|46336790|0.001063120407107...|         90|   90|             1|
|46334868|0.0011467192332

### Tabla con tasa de criminalidad, tasa de contagios, nº de obras activas, nº de incidentes, nº de eventos por distrito

In [12]:
#Primero creamos una tabla para calcular la tasa de criminalidad por medio de un join de populationDF y crimeDf_BORO
crime_rateDF = populationDF.join(crimeDF_BORO,trim(populationDF.Borough) == trim(crimeDF_BORO.BORO_NM),"inner")

#El calculo de la tasa sera por cada 100000 habitantes para los 5 últimos años registrados
crime_rateDF = crime_rateDF.withColumn("Crime_Rate",(F.col("N_crimes") / F.col("population_2020"))*100000).select("BORO_NM","Crime_Rate" )
crime_rateDF.show()

#Despues creamos una tabla para calcular la tasa de contagios por medio de un join de populationDF y covidDf_BORO
covid_rateDF = populationDF.join(covidDF,trim(populationDF.Borough) == trim(covidDF.covid_Borough),"inner")

#El calculo de la tasa sera por cada 100000 habitantes para los 3 últimos meses registrados
covid_rateDF = covid_rateDF.withColumn("Covid_rate",(F.col("N_covid_cases") / F.col("population_2020"))*100000).select("covid_Borough","Covid_rate" )
covid_rateDF.show()

#Creamos la tabla definitiva co un join de las nuevas más las ateriormente creadas
boroDF = populationDF.join(covid_rateDF,trim(populationDF.Borough) == trim(covid_rateDF.covid_Borough),"inner")\
                     .join(crime_rateDF,trim(populationDF.Borough) == trim(crime_rateDF.BORO_NM),"inner")\
                     .join(incidentsDF_BORO,trim(populationDF.Borough) == trim(incidentsDF_BORO.INCIDENT_BOROUGH),"inner")\
                     .join(eventsDF_BORO,trim(populationDF.Borough) == trim(eventsDF_BORO.event_borough),"inner")\
                     .select("Borough", "population_2020", "Covid_rate", "Crime_Rate", "N_incidents", "N_events")

boroDF.show()

#Guardamos la tabla en el storage con la función write
#hacemos que se guarde en un solo documento con repation(1)
#boroDF.repartition(1).write.option("header",True).csv("gs://bucket-tfm-ucm/ETL/boro_rates")

+-------------+------------------+
|      BORO_NM|        Crime_Rate|
+-------------+------------------+
|       Queens| 19492.25312675005|
|     Brooklyn|24999.697936757017|
|Staten Island| 20373.59772556989|
|    Manhattan| 33954.98086103666|
|        Bronx| 34446.23538486633|
+-------------+------------------+

+-------------+------------------+
|covid_Borough|        Covid_rate|
+-------------+------------------+
|        Bronx|2536.1697774656686|
|     Brooklyn|2877.4922105441215|
|    Manhattan| 2596.807263222854|
|       Queens| 2981.468011560768|
|Staten Island|3284.5808828812183|
+-------------+------------------+

+----------------+---------------+------------------+------------------+-----------+--------+
|         Borough|population_2020|        Covid_rate|        Crime_Rate|N_incidents|N_events|
+----------------+---------------+------------------+------------------+-----------+--------+
|          Queens|        2330295| 2981.468011560768| 19492.25312675005|     556434|  

### Barrios con precio medio más alto y distrito al que pertenecen

In [13]:
#Vamos a seleccionar solo los apartamentos con unas noches minimas de 10 o menos porque tienen más interes turístico
#filtramos las noche con filter
#agrupamos por barrio y distrito con groupBy
#añadimos la columna avg_price que sera la media de price por barrio con agg
#ordenamos con orderBy
borough_pricesDF = listingsDF.filter(listingsDF['minimum_nights']<11)\
                             .groupBy("neighbourhood","neighbourhood_group")\
                             .agg(F.mean("price").alias("avg_price"))\
                             .orderBy(F.col("avg_price").desc())
                            
borough_pricesDF.show()
#Guardamos la tabla en el storage con la función write
#hacemos que se guarde en un solo documento con repation(1)
#borough_pricesDF.repartition(1).write.option("header",True).csv("gs://bucket-tfm-ucm/ETL/barrios_avg")

+-----------------+-------------------+------------------+
|    neighbourhood|neighbourhood_group|         avg_price|
+-----------------+-------------------+------------------+
|  Jamaica Estates|             Queens|            1519.2|
|        Riverdale|              Bronx| 574.3333333333334|
|Flatiron District|          Manhattan|511.10526315789474|
|          Tribeca|          Manhattan| 483.2926829268293|
|             SoHo|          Manhattan| 400.0777777777778|
| Brooklyn Heights|           Brooklyn| 346.9259259259259|
|     West Village|          Manhattan| 341.1587301587302|
|      Willowbrook|      Staten Island|             309.0|
|          Chelsea|          Manhattan|307.90595611285266|
|         Neponsit|             Queens|             295.0|
|             NoHo|          Manhattan| 290.3333333333333|
|          Midtown|          Manhattan| 286.0120627261761|
|         Red Hook|           Brooklyn| 285.0882352941176|
|Greenwich Village|          Manhattan| 281.826923076923

### Nº de reviews agrupados por fecha

Inicialmente estas tablas las calculabamos con los valores de año y mes pero tableau trabaja mejor con las fechas directamente

In [14]:
#agrupamos por fecha con groupBy
#contamos los registros con agg
#ordenamos con OrderBy
reviews_countbydate = reviewsDF.groupBy("date").agg(count("*").alias("N_reviews")).orderBy(F.col("N_reviews").desc())
reviews_countbydate.show()

#Guardamos la tabla en el storage con la función write
#hacemos que se guarde en un solo documento con repation(1)
#reviews_countbydate.repartition(1).write.option("header",True).csv("gs://bucket-tfm-ucm/ETL/reviews_cbd")

+----------+---------+
|      date|N_reviews|
+----------+---------+
|2021-12-12|     1966|
|2021-10-31|     1901|
|2021-11-28|     1869|
|2021-12-05|     1813|
|2021-11-14|     1755|
|2021-12-19|     1653|
|2021-09-06|     1603|
|2021-11-21|     1554|
|2021-11-07|     1545|
|2021-10-24|     1480|
|2020-01-01|     1450|
|2021-10-17|     1439|
|2021-12-13|     1376|
|2021-11-01|     1373|
|2021-10-03|     1365|
|2021-10-10|     1360|
|2019-09-02|     1336|
|2021-08-01|     1307|
|2020-01-02|     1304|
|2021-09-26|     1301|
+----------+---------+
only showing top 20 rows



### Nº de incidentes agrupados por fecha

In [15]:
#agrupamos por fecha con groupBy
#contamos los registros con agg
#ordenamos con OrderBy
incidents_countbydate = incidentsDF_YMD.groupBy("INCIDENT_DATETIME").agg(count("*").alias("N_incidents")).orderBy(F.col("N_incidents").desc())
incidents_countbydate.show()

#Guardamos la tabla en el storage con la función write
#hacemos que se guarde en un solo documento con repation(1)
#incidents_countbydate.repartition(1).write.option("header",True).csv("gs://bucket-tfm-ucm/ETL/incidents_cbd")

+-----------------+-----------+
|INCIDENT_DATETIME|N_incidents|
+-----------------+-----------+
|       2020-08-04|       4084|
|       2018-01-08|       2995|
|       2018-03-02|       2982|
|       2018-01-09|       2864|
|       2018-01-04|       2845|
|       2020-08-05|       2667|
|       2018-03-07|       2592|
|       2018-01-12|       2538|
|       2018-01-07|       2408|
|       2018-01-03|       2384|
|       2018-01-11|       2332|
|       2018-01-02|       2300|
|       2018-03-08|       2287|
|       2018-01-01|       2285|
|       2019-01-22|       2260|
|       2020-07-10|       2259|
|       2018-09-12|       2244|
|       2018-01-10|       2201|
|       2018-11-15|       2174|
|       2018-01-05|       2172|
+-----------------+-----------+
only showing top 20 rows



### Nº de crimenes agrupados por fecha

In [16]:
#agrupamos por fecha con groupBy
#contamos los registros con agg
#ordenamos con OrderBy
crimes_countbydate = crimeDF_YMD.groupBy("CMPLNT_FR_DT").agg(count("*").alias("N_crimes")).orderBy(F.col("N_crimes").desc())
crimes_countbydate.show()

#Guardamos la tabla en el storage con la función write
#hacemos que se guarde en un solo documento con repation(1)
#crimes_countbydate.repartition(1).write.option("header",True).csv("gs://bucket-tfm-ucm/ETL/crimes_cbd")

+------------+--------+
|CMPLNT_FR_DT|N_crimes|
+------------+--------+
|  2016-01-01|    2345|
|  2017-01-01|    2179|
|  2018-01-01|    2074|
|  2019-01-01|    2064|
|  2016-06-01|    1949|
|  2020-06-01|    1857|
|  2016-04-01|    1823|
|  2017-06-01|    1814|
|  2018-09-01|    1736|
|  2020-01-01|    1733|
|  2016-07-01|    1712|
|  2018-06-01|    1698|
|  2017-07-01|    1685|
|  2017-12-01|    1681|
|  2018-11-01|    1669|
|  2017-09-01|    1658|
|  2016-08-01|    1656|
|  2017-10-06|    1630|
|  2016-11-01|    1623|
|  2017-05-01|    1623|
+------------+--------+
only showing top 20 rows



### Nº de eventos agrupados por fecha

In [17]:
#agrupamos por fecha con groupBy
#contamos los registros con agg
#ordenamos con OrderBy
events_countbydate = eventsDF_YMD.groupBy("Date and Time").agg(count("*").alias("N_events")).orderBy(F.col("N_events").desc())
events_countbydate.show()

#Guardamos la tabla en el storage con la función write
#hacemos que se guarde en un solo documento con repation(1)
#events_countbydate.repartition(1).write.option("header",True).csv("gs://bucket-tfm-ucm/ETL/events_cbd")

+-------------+--------+
|Date and Time|N_events|
+-------------+--------+
|   2020-07-21|      63|
|   2020-08-18|      60|
|   2020-08-05|      57|
|   2020-08-25|      52|
|   2020-08-11|      51|
|   2020-07-22|      50|
|   2020-07-28|      49|
|   2020-08-20|      46|
|   2020-07-14|      46|
|   2020-07-29|      45|
|   2020-07-20|      45|
|   2020-09-01|      44|
|   2020-07-16|      44|
|   2020-07-30|      43|
|   2020-07-23|      43|
|   2020-09-08|      42|
|   2020-08-26|      42|
|   2020-08-27|      41|
|   2020-08-10|      41|
|   2020-07-15|      41|
+-------------+--------+
only showing top 20 rows



### Nº de eventos agrupados por fecha

In [18]:
#agrupamos por fecha con groupBy
#contamos los registros con agg
#ordenamos con OrderBy
covid_cases_countbydate = covid_noNA.select("DATE_OF_INTEREST","CASE_COUNT").orderBy(F.col("CASE_COUNT").desc())
covid_cases_countbydate.show()

#Guardamos la tabla en el storage con la función write
#hacemos que se guarde en un solo documento con repation(1)
#covid_cases_countbydate.repartition(1).write.option("header",True).csv("gs://bucket-tfm-ucm/ETL/covid_cbd")

+----------------+----------+
|DATE_OF_INTEREST|CASE_COUNT|
+----------------+----------+
|      2022-01-03|     54701|
|      2021-12-27|     50877|
|      2021-12-29|     50111|
|      2021-12-28|     49133|
|      2022-01-04|     48449|
|      2021-12-30|     47031|
|      2022-01-05|     39639|
|      2022-01-06|     35990|
|      2022-01-02|     32380|
|      2021-12-23|     31736|
|      2021-12-22|     31282|
|      2021-12-21|     30312|
|      2021-12-31|     29012|
|      2021-12-20|     27892|
|      2022-01-10|     24149|
|      2022-01-07|     23031|
|      2021-12-26|     22851|
|      2022-01-12|     18204|
|      2022-01-11|     17920|
|      2021-12-24|     17379|
+----------------+----------+
only showing top 20 rows



### Ingresos minimos de cada dueño y nº de apartamentos que posee

In [19]:
#agrupamos por host_id
#contamos los registros y sumamos los beneficios con agg
#ordenamos con orderBy
host_income = listingsDF.groupBy("host_id").agg(F.count("host_id").alias("N_AirBnB"),F.sum("Income").alias("Income")).orderBy(F.col("Income").desc())
#filtramos para quedarnos con los dueños que tengan mas de 5 Airbnb con filter
host_income = host_income.filter(host_income['N_AirBnB']>5)
host_income.show()

#Guardamos la tabla en el storage con la función write
#hacemos que se guarde en un solo documento con repation(1)
#host_income.repartition(1).write.option("header",True).csv("gs://bucket-tfm-ucm/ETL/incomes_host")

+---------+--------+--------+
|  host_id|N_AirBnB|  Income|
+---------+--------+--------+
|   836168|      10|11400000|
|116839989|       8| 6365156|
|  7245581|      19| 3669237|
| 61391963|     105| 2692860|
| 51501835|     186| 2459240|
| 66326553|       7| 2020380|
|224815152|      10| 1670700|
|100238132|      12| 1505616|
|  2119276|      33| 1501050|
|  4291007|      19| 1302450|
|219517861|      49| 1199104|
|158969505|     209| 1126872|
|  5162192|      13| 1102500|
| 16437254|      19| 1066740|
|  9293730|      32| 1043670|
| 50760546|      28| 1002505|
| 51548122|      50|  986910|
|120762452|      83|  986280|
| 23772724|      43|  974340|
|   116382|       7|  969570|
+---------+--------+--------+
only showing top 20 rows



### Porcentaje de cada tipo de Airbnb

In [20]:
#agrupamos por tipo de habitación con groupBy
#con withColumn creamos la columna percent que primero sera la división de N_room_type entre las suma del total de la columna
# y despues el porcentaje multiplicado por 100
room_type = listingsDF.groupBy("room_type").agg(F.count("id").alias("N_room_type"))\
                      .withColumn('percent', F.col('N_room_type')/F.sum('N_room_type').over(Window.partitionBy()))\
                      .withColumn('percent', F.col('percent')*100)

room_type.show()
#Guardamos la tabla en el storage con la función write
#hacemos que se guarde en un solo documento con repation(1)
#room_type.repartition(1).write.option("header",True).csv("gs://bucket-tfm-ucm/ETL/roomtype_per")

+---------------+-----------+------------------+
|      room_type|N_room_type|           percent|
+---------------+-----------+------------------+
|    Shared room|        560| 1.476403901924598|
|   Private room|      16926| 44.62430793567097|
|Entire home/apt|      20271| 53.44318481413129|
|     Hotel room|        173|0.4561033482731347|
+---------------+-----------+------------------+



### Porcentaje de cada tipo de crimen según su gravedad

In [21]:
#agrupamos por tipo de crimen con groupBy
#con withColumn creamos la columna percent que primero sera la división de N_crime_type entre las suma del total de la columna
# y despues el porcentaje multiplicado por 100
crime_type_per= loc_crime.groupBy("LAW_CAT_CD").agg(F.count("CMPLNT_NUM").alias("N_crime_type"))\
                      .withColumn('percent', F.col('N_crime_type')/F.sum('N_crime_type').over(Window.partitionBy()))\
                      .withColumn('percent', F.col('percent')*100)

crime_type_per.show()
#Guardamos la tabla en el storage con la función write
#hacemos que se guarde en un solo documento con repation(1)
#crime_type_per.repartition(1).write.option("header",True).csv("gs://bucket-tfm-ucm/ETL/crime_type_per")

+-----------+------------+------------------+
| LAW_CAT_CD|N_crime_type|           percent|
+-----------+------------+------------------+
|  VIOLATION|       34639|16.532787315587758|
|     FELONY|       69086| 32.97393528926054|
|MISDEMEANOR|      105792|  50.4932773951517|
+-----------+------------+------------------+



### Porcentaje de cada tipo de evento al aire libre

In [22]:
#agrupamos por tipo de evento con groupBy y contamos los registros con agg
#con withColumn creamos la columna percent que primero sera la división de N_event_type entre las suma del total de la columna
# y despues el porcentaje multiplicado por 100
event_type_per = eventsDF_YMD.groupBy("Event Type").agg(F.count("*").alias("N_event_type"))\
                      .withColumn('percent', F.col('N_event_type')/F.sum('N_event_type').over(Window.partitionBy()))\
                      .withColumn('percent', F.col('percent')*100)

event_type_per.show()
#Guardamos la tabla en el storage con la función write
#hacemos que se guarde en un solo documento con repation(1)
#event_type_per.repartition(1).write.option("header",True).csv("gs://bucket-tfm-ucm/ETL/event_type_per")

+--------------------+------------+------------------+
|          Event Type|N_event_type|           percent|
+--------------------+------------+------------------+
|Community Based E...|        3482| 62.97703020437693|
|Tournament/Compet...|          80|1.4469162597214686|
|Agency Produced E...|        1084|  19.6057153192259|
|         Local Event|         523| 9.459215047929101|
|          Open House|         360| 6.511123168746609|
+--------------------+------------+------------------+



### Localización Airbnb

In [23]:
# Seleccionamos las columnas que nos interesan con select
loc_listing = listingsDF.select("id", "latitude", "longitude", "neighbourhood_group","room_type")
loc_listing.show()

#Guardamos la tabla en el storage con la función write
#hacemos que se guarde en un solo documento con repation(1)
#loc_listing.repartition(1).write.option("header",True).csv("gs://bucket-tfm-ucm/ETL/loc_listing")

+-------+--------+---------+-------------------+---------------+
|     id|latitude|longitude|neighbourhood_group|      room_type|
+-------+--------+---------+-------------------+---------------+
|   9657|40.72712|-73.98598|          Manhattan|Entire home/apt|
| 223930|40.67319|-73.96804|           Brooklyn|Entire home/apt|
| 311356|40.70167|-73.90927|             Queens|   Private room|
| 476983| 40.8239|-73.94242|          Manhattan|   Private room|
|1294721|40.71265|-73.96578|           Brooklyn|Entire home/apt|
|1813829|40.82486|-73.94378|          Manhattan|   Private room|
|1901154|40.82643|-73.94208|          Manhattan|   Private room|
|2055559|40.73673|-73.95655|           Brooklyn|    Shared room|
|2093304|40.69304|-73.97148|           Brooklyn|Entire home/apt|
|2231814|40.70986|-73.94186|           Brooklyn|Entire home/apt|
|2551299|40.68514|-73.99464|           Brooklyn|Entire home/apt|
|2708371|40.72783|-73.94175|           Brooklyn|Entire home/apt|
|2977232|40.79366|-73.945

### Localización crimen

In [24]:
# Seleccionamos las columnas que nos interesan con select
loc_crime_last6month = loc_crime.select("CMPLNT_NUM","Latitude","Longitude","LAW_CAT_CD")
loc_crime_last6month.show()

#Guardamos la tabla en el storage con la función write
#hacemos que se guarde en un solo documento con repation(1)
#loc_crime_last6month.repartition(1).write.option("header",True).csv("gs://bucket-tfm-ucm/ETL/loc_crime_last6month")

+----------+------------------+------------------+-----------+
|CMPLNT_NUM|          Latitude|         Longitude| LAW_CAT_CD|
+----------+------------------+------------------+-----------+
| 808857971| 40.81573205300003|-73.94542041099999|     FELONY|
| 990133707| 40.65132343000005|-73.95696100099997|     FELONY|
| 420348781|40.877554428000046|-73.87293947099994|     FELONY|
| 462496511| 40.83778161800007|-73.91945797099999|     FELONY|
| 921351410|40.799466801000044|-73.95153053599995|     FELONY|
| 138981745| 40.67970040800003|-73.77604736799998|     FELONY|
| 520396698|40.654879667000046|-73.96190341499994|     FELONY|
| 437035202| 40.64807745300004|-73.89783444799998|     FELONY|
| 857699799| 40.64472613100002|-74.07748315899995|     FELONY|
| 858151258| 40.83530661300006|-73.91290193999998|MISDEMEANOR|
| 192899882|40.652262403000066|-73.76050942399996|     FELONY|
| 542360560|40.738403706000035|-74.00386257599996|     FELONY|
| 132334653| 40.77429164100005|     -73.957296737|     