## Dataset Exploration:


In [48]:
#!pip install duckdb
#!pip3 install pyspark
#!sudo -u postgres psql -U postgres -c "ALTER USER postgres PASSWORD 'postgres';"
#!sudo -u postgres psql -U postgres -c 'DROP DATABASE IF EXISTS pyspark_postgres;'
#!sudo -u postgres psql -U postgres -c 'CREATE DATABASE pyspark_postgres;'
#!wget -O "postgresql.jar" "https://repo1.maven.org/maven2/org/postgresql/postgresql/42.7.3/postgresql-42.7.3.jar"
#!wget -O "duckdb.jar" "https://repo1.maven.org/maven2/org/duckdb/duckdb_jdbc/0.10.1/duckdb_jdbc-0.10.1.jar"


In [12]:
## Imports
import duckdb
import pyspark
from pyspark.sql import SparkSession
from pprint import pprint
from pyspark.sql.functions import isnan, when, count

## Connection to formatted database
jdbc_url = 'jdbc:duckdb:./../data/formatted_zone/barcelona.db'
driver = "org.duckdb.DuckDBDriver"

# SparkSession inicialitzation
spark = SparkSession.builder\
    .config("spark.jars", "duckdb.jar") \
    .appName("DataExploration") \
    .getOrCreate()

#### Criminal Dataset


In [18]:
df = spark.read \
  .format("jdbc") \
  .option("url", jdbc_url) \
  .option("driver", driver) \
  .option("query", "SELECT * FROM df_criminal_dataset") \
  .load()

df.show(2)

+----+-------+--------+--------------------+-----------------------+---------+----------+--------------------+-----------------------+---------------+------------+------------------------+-------------------------+--------+-------------------------+---------------+
| any|n_m_mes| nom_mes|    regi_policial_rp|rea_b_sica_policial_abp|prov_ncia|   comarca|            municipi|tipus_de_lloc_dels_fets|canal_dels_fets|tipus_de_fet|t_tol_del_fet_codi_penal|tipus_de_fet_codi_penal_o|mbit_fet|nombre_fets_o_infraccions|nombre_v_ctimes|
+----+-------+--------+--------------------+-----------------------+---------+----------+--------------------+-----------------------+---------------+------------+------------------------+-------------------------+--------+-------------------------+---------------+
|2021|     12|Desembre|RP  Metropolitana...|           ABP Badalona|Barcelona|Barcelonès|            Badalona|     Via pública urbana|     Presencial|    Delictes|          De les lesions|              

In [45]:
# Correct the column names
df = df.withColumnRenamed("n_m_mes", "num_mes") \
       .withColumnRenamed("regi_policial_rp", "regio_policial") \
       .withColumnRenamed("rea_b_sica_policial_abp", "area_basica_policial") \
       .withColumnRenamed("t_tol_del_fet_codi_penal", "titol_del_fet_codi_penal") \
       .withColumnRenamed("tipus_de_fet_codi_penal_o", "tipus_de_fet_codi_penal") \
       .withColumnRenamed("mbit_fet", "ambit_fet") \
       .withColumnRenamed("prov_ncia", "provincia") \
       .withColumnRenamed("nombre_v_ctimes", "nombre_victimes")

In [46]:
df.printSchema()

root
 |-- any: decimal(20,0) (nullable = true)
 |-- num_mes: decimal(20,0) (nullable = true)
 |-- nom_mes: string (nullable = true)
 |-- regio_policial: string (nullable = true)
 |-- area_basica_policial: string (nullable = true)
 |-- provincia: string (nullable = true)
 |-- comarca: string (nullable = true)
 |-- municipi: string (nullable = true)
 |-- tipus_de_lloc_dels_fets: string (nullable = true)
 |-- canal_dels_fets: string (nullable = true)
 |-- tipus_de_fet: string (nullable = true)
 |-- titol_del_fet_codi_penal: string (nullable = true)
 |-- tipus_de_fet_codi_penal: string (nullable = true)
 |-- ambit_fet: string (nullable = true)
 |-- nombre_fets_o_infraccions: decimal(20,0) (nullable = true)
 |-- nombre_victimes: double (nullable = true)



In [47]:
df.describe().show()

+-------+------------------+------------------+--------+--------------------+--------------------+---------+--------------+--------------------+-----------------------+---------------+--------------------+------------------------+-----------------------+--------------------+-------------------------+------------------+
|summary|               any|           num_mes| nom_mes|      regio_policial|area_basica_policial|provincia|       comarca|            municipi|tipus_de_lloc_dels_fets|canal_dels_fets|        tipus_de_fet|titol_del_fet_codi_penal|tipus_de_fet_codi_penal|           ambit_fet|nombre_fets_o_infraccions|   nombre_victimes|
+-------+------------------+------------------+--------+--------------------+--------------------+---------+--------------+--------------------+-----------------------+---------------+--------------------+------------------------+-----------------------+--------------------+-------------------------+------------------+
|  count|              2289|         

In [48]:
# Counting the NaN values
nan_counts = df.select([count(when(isnan(c), c)).alias(c) for c in df.columns])
nan_counts.show()

+---+-------+-------+--------------+--------------------+---------+-------+--------+-----------------------+---------------+------------+------------------------+-----------------------+---------+-------------------------+---------------+
|any|num_mes|nom_mes|regio_policial|area_basica_policial|provincia|comarca|municipi|tipus_de_lloc_dels_fets|canal_dels_fets|tipus_de_fet|titol_del_fet_codi_penal|tipus_de_fet_codi_penal|ambit_fet|nombre_fets_o_infraccions|nombre_victimes|
+---+-------+-------+--------------+--------------------+---------+-------+--------+-----------------------+---------------+------------+------------------------+-----------------------+---------+-------------------------+---------------+
|  0|      0|      0|             0|                   0|        0|      0|       0|                      0|              0|           0|                       0|                      0|        0|                        0|              0|
+---+-------+-------+--------------+--------

In [54]:
# Filter by region in Barcelona: [ABP Eixample, ABP Sants-Montjuïc, ABP Les Corts, ABP Barcelona]
df_filtered = df.filter(df["area_basica_policial"].isin("ABP Eixample", "ABP Sants-Montjuïc", "ABP Les Corts", "ABP Barcelona"))
df_filtered.show(4)

+----+-------+-------+--------------------+--------------------+---------+----------+---------+-----------------------+---------------+------------+------------------------+-----------------------+-----------+-------------------------+---------------+
| any|num_mes|nom_mes|      regio_policial|area_basica_policial|provincia|   comarca| municipi|tipus_de_lloc_dels_fets|canal_dels_fets|tipus_de_fet|titol_del_fet_codi_penal|tipus_de_fet_codi_penal|  ambit_fet|nombre_fets_o_infraccions|nombre_victimes|
+----+-------+-------+--------------------+--------------------+---------+----------+---------+-----------------------+---------------+------------+------------------------+-----------------------+-----------+-------------------------+---------------+
|2020|      1|  Gener|RP Metropolitana ...|        ABP Eixample|Barcelona|Barcelonès|Barcelona|      Zona d'oci/lúdica|     Presencial|    Delictes|    Delictes contra e...|   Robatori amb viol...|LGTBI-fòbia|                        1|         

## Airbnb Dataset

In [58]:
df_aribnb = spark.read \
  .format("jdbc") \
  .option("url", jdbc_url) \
  .option("driver", driver) \
  .option("query", "SELECT * FROM df_airbnb_listings") \
  .load()

df_aribnb.show(4)

+--------+--------------------+--------------+------------+--------------------+--------------------+--------------------+--------------------+-------------------+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------+--------------------+--------+--------------------+------------+----------+--------------------+--------------------+------------------+------------------+--------------------+--------------------+--------------------+------------------+-------------------+-------------------------+--------------------+--------------------+-----------------+----------------------+----------------------------+---------+---------+-------+---------+----------------+------------+-------+------------------+------------------+-------------+---------------+------------+---------+--------+----+--------+--------------------+-----------+-----+------------+-------------+-----

In [59]:
df_aribnb.printSchema()

root
 |-- id: string (nullable = true)
 |-- listing_url: string (nullable = true)
 |-- scrape_id: string (nullable = true)
 |-- last_scraped: string (nullable = true)
 |-- name: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- space: string (nullable = true)
 |-- description: string (nullable = true)
 |-- experiences_offered: string (nullable = true)
 |-- neighborhood_overview: string (nullable = true)
 |-- notes: string (nullable = true)
 |-- transit: string (nullable = true)
 |-- access: string (nullable = true)
 |-- interaction: string (nullable = true)
 |-- house_rules: string (nullable = true)
 |-- thumbnail_url: string (nullable = true)
 |-- medium_url: string (nullable = true)
 |-- picture_url: long (nullable = true)
 |-- xl_picture_url: string (nullable = true)
 |-- host_id: string (nullable = true)
 |-- host_url: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- host_since: string (nullable = true)
 |-- host_location: string (nullable =

In [None]:
df_aribnb.describe().show()

In [60]:
nan_counts = df_aribnb.select([count(when(isnan(c), c)).alias(c) for c in df.columns])
nan_counts.show()

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `geolocation`.`lon` cannot be resolved. Did you mean one of the following? [`geolocation.lon`, `geolocation.lat`, `host_location`, `description`, `interaction`].;
'Aggregate [count(CASE WHEN isnan(cast(id#9365 as double)) THEN id END) AS id#9913L, count(CASE WHEN isnan(cast(listing_url#9366 as double)) THEN listing_url END) AS listing_url#9915L, count(CASE WHEN isnan(cast(scrape_id#9367 as double)) THEN scrape_id END) AS scrape_id#9917L, count(CASE WHEN isnan(cast(last_scraped#9368 as double)) THEN last_scraped END) AS last_scraped#9919L, count(CASE WHEN isnan(cast(name#9369 as double)) THEN name END) AS name#9921L, count(CASE WHEN isnan(cast(summary#9370 as double)) THEN summary END) AS summary#9923L, count(CASE WHEN isnan(cast(space#9371 as double)) THEN space END) AS space#9925L, count(CASE WHEN isnan(cast(description#9372 as double)) THEN description END) AS description#9927L, count(CASE WHEN isnan(cast(experiences_offered#9373 as double)) THEN experiences_offered END) AS experiences_offered#9929L, count(CASE WHEN isnan(cast(neighborhood_overview#9374 as double)) THEN neighborhood_overview END) AS neighborhood_overview#9931L, count(CASE WHEN isnan(cast(notes#9375 as double)) THEN notes END) AS notes#9933L, count(CASE WHEN isnan(cast(transit#9376 as double)) THEN transit END) AS transit#9935L, count(CASE WHEN isnan(cast(access#9377 as double)) THEN access END) AS access#9937L, count(CASE WHEN isnan(cast(interaction#9378 as double)) THEN interaction END) AS interaction#9939L, count(CASE WHEN isnan(cast(house_rules#9379 as double)) THEN house_rules END) AS house_rules#9941L, count(CASE WHEN isnan(cast(thumbnail_url#9380 as double)) THEN thumbnail_url END) AS thumbnail_url#9943L, count(CASE WHEN isnan(cast(medium_url#9381 as double)) THEN medium_url END) AS medium_url#9945L, count(CASE WHEN isnan(cast(picture_url#9382L as double)) THEN picture_url END) AS picture_url#9947L, count(CASE WHEN isnan(cast(xl_picture_url#9383 as double)) THEN xl_picture_url END) AS xl_picture_url#9949L, count(CASE WHEN isnan(cast(host_id#9384 as double)) THEN host_id END) AS host_id#9951L, count(CASE WHEN isnan(cast(host_url#9385 as double)) THEN host_url END) AS host_url#9953L, count(CASE WHEN isnan(cast(host_name#9386 as double)) THEN host_name END) AS host_name#9955L, count(CASE WHEN isnan(cast(host_since#9387 as double)) THEN host_since END) AS host_since#9957L, count(CASE WHEN isnan(cast(host_location#9388 as double)) THEN host_location END) AS host_location#9959L, ... 67 more fields]
+- Relation [id#9365,listing_url#9366,scrape_id#9367,last_scraped#9368,name#9369,summary#9370,space#9371,description#9372,experiences_offered#9373,neighborhood_overview#9374,notes#9375,transit#9376,access#9377,interaction#9378,house_rules#9379,thumbnail_url#9380,medium_url#9381,picture_url#9382L,xl_picture_url#9383,host_id#9384,host_url#9385,host_name#9386,host_since#9387,host_location#9388,... 67 more fields] JDBCRelation((SELECT * FROM df_airbnb_listings) SPARK_GEN_SUBQ_207) [numPartitions=1]


24/04/17 20:39:52 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 410941 ms exceeds timeout 120000 ms
24/04/17 20:39:52 WARN SparkContext: Killing executors is not supported by current scheduler.
24/04/17 20:39:47 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$

In [None]:
spark.stop()