In [None]:
%pip install requests


Python interpreter will be restarted.
Python interpreter will be restarted.


In [None]:
import requests
from pyspark.sql import SparkSession
from io import BytesIO
from pyspark.sql import DataFrame
import pandas as pd

# Configurar la URL base para la API
base_url = "https://open.canada.ca/data/api/action/"

# Lista de datasets a descargar (nombre y ID)
datasets_to_download = [
    {"title": "Crime Statistics - Incidents and rates for selected offences", "id": "47466568-c38d-8bb9-26d9-331079dad727"}
]

def download_dataset_to_spark(dataset_id, title):
    endpoint = f"package_show?id={dataset_id}"
    response = requests.get(f"{base_url}{endpoint}")
    
    if response.status_code == 200:
        dataset_info = response.json()
        for resource in dataset_info['result']['resources']:
            if resource['format'].lower() == 'csv':
                csv_url = resource['url']
                csv_response = requests.get(csv_url)
                
                # Leer el CSV directamente en un DataFrame de pandas desde la respuesta
                csv_content = BytesIO(csv_response.content)
                df_pandas = pd.read_csv(csv_content, sep=';')  # Ajusta el separador si es necesario
                
                # Convertir el DataFrame de pandas a un DataFrame de Spark
                df_spark = spark.createDataFrame(df_pandas)
                
                # Mostrar algunos datos para verificar
                df_spark.show()
                
                # Guardar el DataFrame de Spark en DBFS (opcional)
                dbfs_path = f"/files/tables/{title.replace(' ', '_').lower()}.csv"
                df_spark.write.csv(dbfs_path, header=True)
                
                print(f"{title} descargado y cargado en Spark DataFrame")
                return df_spark
    else:
        print(f"Error al obtener el dataset {title}: {response.status_code}")

# Descargar y cargar el dataset en Databricks usando PySpark
for dataset in datasets_to_download:
    df_spark = download_dataset_to_spark(dataset['id'], dataset['title'])




+-----------------------------------------+
|GEOGRAPHY,YEAR,VIOLATIONS,INCIDENTS,RATES|
+-----------------------------------------+
|                     Nova Scotia [12],...|
|                     Nova Scotia [12],...|
|                     Nova Scotia [12],...|
|                     Nova Scotia [12],...|
|                     Nova Scotia [12],...|
|                     Nova Scotia [12],...|
|                     Nova Scotia [12],...|
|                     Nova Scotia [12],...|
|                     Nova Scotia [12],...|
|                     Nova Scotia [12],...|
|                     Nova Scotia [12],...|
|                     Nova Scotia [12],...|
|                     Nova Scotia [12],...|
|                     Nova Scotia [12],...|
|                     Nova Scotia [12],...|
|                     Nova Scotia [12],...|
|                     Nova Scotia [12],...|
|                     Nova Scotia [12],...|
|                     Nova Scotia [12],...|
|                     Amherst, N

In [None]:
df_spark.show()

+-----------------------------------------+
|GEOGRAPHY,YEAR,VIOLATIONS,INCIDENTS,RATES|
+-----------------------------------------+
|                     Nova Scotia [12],...|
|                     Nova Scotia [12],...|
|                     Nova Scotia [12],...|
|                     Nova Scotia [12],...|
|                     Nova Scotia [12],...|
|                     Nova Scotia [12],...|
|                     Nova Scotia [12],...|
|                     Nova Scotia [12],...|
|                     Nova Scotia [12],...|
|                     Nova Scotia [12],...|
|                     Nova Scotia [12],...|
|                     Nova Scotia [12],...|
|                     Nova Scotia [12],...|
|                     Nova Scotia [12],...|
|                     Nova Scotia [12],...|
|                     Nova Scotia [12],...|
|                     Nova Scotia [12],...|
|                     Nova Scotia [12],...|
|                     Nova Scotia [12],...|
|                     Amherst, N

In [None]:
from pyspark.sql.functions import split, col, expr, trim, concat_ws
from pyspark.sql.types import StructType, StructField, StringType

# Crear esquema y DataFrame inicial
schema = StructType([StructField("combined", StringType(), True)])
df = spark.createDataFrame(df_spark.rdd, schema)

# Mostrar los primeros registros
df.show(5, truncate=False)

# Dividir usando expresión regular, capturando comas fuera de quotes
df = df.withColumn("split_col", split(trim(col("combined")), r',(?=(?:[^"]*"[^"]*")*[^"]*$)'))

# Añadir columna para contar el número de elementos después de la separación
df = df.withColumn("num_cols", expr("size(split_col)"))

# Mostrar filas problemáticas que no tienen 5 columnas
df_invalid = df.filter(col("num_cols") != 5)
df_invalid.show(truncate=False)

# Filtrar las filas válidas que tienen exactamente 5 columnas
df_valid = df.filter(col("num_cols") == 5)

# Asignar columnas correctas para las filas válidas
df_valid = df_valid.withColumn("Geography", trim(col("split_col").getItem(0))) \
                   .withColumn("Year", trim(col("split_col").getItem(1))) \
                   .withColumn("Violations", trim(col("split_col").getItem(2))) \
                   .withColumn("Incidents", trim(col("split_col").getItem(3))) \
                   .withColumn("Rates", trim(col("split_col").getItem(4))) \
                   .drop("split_col", "combined", "num_cols")

# Mostrar las filas válidas
df_valid.show(5, truncate=False)

# Ajustar filas problemáticas para que tengan exactamente 5 elementos
# Aquí unimos los primeros elementos que corresponden a "Geography" y los demás campos
df_fixed = df_invalid.withColumn("Geography", concat_ws(", ", col("split_col").getItem(0), col("split_col").getItem(1), col("split_col").getItem(2))) \
                     .withColumn("Year", trim(col("split_col").getItem(3))) \
                     .withColumn("Violations", trim(col("split_col").getItem(4))) \
                     .withColumn("Incidents", trim(col("split_col").getItem(5))) \
                     .withColumn("Rates", trim(col("split_col").getItem(6))) \
                     .drop("split_col", "num_cols", "combined")

# Mostrar las filas ajustadas
df_fixed.show(5, truncate=False)

# Unir DataFrames válidos y corregidos manualmente
df_final = df_valid.unionByName(df_fixed)

# Verificar el esquema final y mostrar resultados
df_final.printSchema()
df_final.show(50, truncate=False)

# Mostrar valores únicos de Geography para confirmar que están completos y correctos
df_final.select("Geography").distinct().show(50, truncate=False)
df_final.show(50)

+-------------------------------------------------------------------------------------+
|combined                                                                             |
+-------------------------------------------------------------------------------------+
|Nova Scotia [12],2010,Homicide [110],21,2.23                                         |
|Nova Scotia [12],2010,Attempted murder [1210],24,2.55                                |
|Nova Scotia [12],2010,"Sexual assault, level 3, aggravated [1310]",6,0.64            |
|Nova Scotia [12],2010,"Sexual assault, level 2, weapon or bodily harm [1320]",13,1.38|
|Nova Scotia [12],2010,"Sexual assault, level 1 [1330]",662,70.27                     |
+-------------------------------------------------------------------------------------+
only showing top 5 rows

+-------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------

In [None]:
# Filtrar filas donde Year sea un número válido (año) y Violations sea texto
from pyspark.sql.functions import col, regexp_replace

# Filtrar registros válidos para Year
df_clean = df_final.filter(col("Year").rlike("^[0-9]{4}$"))

# Eliminar espacios y comillas de la columna Year y asegurarse de que Year es numérica
df_clean = df_clean.withColumn("Year", regexp_replace(col("Year"), '"', '').cast("int"))

df_clean.show()

+----------------+----+--------------------+---------+------+
|       Geography|Year|          Violations|Incidents| Rates|
+----------------+----+--------------------+---------+------+
|Nova Scotia [12]|2010|      Homicide [110]|       21|  2.23|
|Nova Scotia [12]|2010|Attempted murder ...|       24|  2.55|
|Nova Scotia [12]|2010|"Sexual assault, ...|        6|  0.64|
|Nova Scotia [12]|2010|"Sexual assault, ...|       13|  1.38|
|Nova Scotia [12]|2010|"Sexual assault, ...|      662| 70.27|
|Nova Scotia [12]|2010|Total sexual viol...|       93|  9.87|
|Nova Scotia [12]|2010|"Assault, level 3...|       85|  9.02|
|Nova Scotia [12]|2010|"Assault, level 2...|     1212|128.65|
|Nova Scotia [12]|2010|"Assault, level 1...|     5780|613.54|
|Nova Scotia [12]|2010| Total robbery [160]|      490| 52.01|
|Nova Scotia [12]|2010|Total breaking an...|     5269| 559.3|
|Nova Scotia [12]|2010|Total theft of mo...|     1281|135.98|
|Nova Scotia [12]|2010|Total impaired dr...|     3431| 364.2|
|Nova Sc

In [None]:
# Filtrar registros válidos para Violations (no deben ser solo números)
df_clean = df_clean.filter(~col("Violations").rlike("^[0-9]+$"))

# Asegurarse de que la columna Violations no tenga valores erróneos como años
df_clean = df_clean.withColumn("Violations", regexp_replace(col("Violations"), '"', '').cast("string"))

df_clean.show()


+----------------+----+--------------------+---------+------+
|       Geography|Year|          Violations|Incidents| Rates|
+----------------+----+--------------------+---------+------+
|Nova Scotia [12]|2010|      Homicide [110]|       21|  2.23|
|Nova Scotia [12]|2010|Attempted murder ...|       24|  2.55|
|Nova Scotia [12]|2010|Sexual assault, l...|        6|  0.64|
|Nova Scotia [12]|2010|Sexual assault, l...|       13|  1.38|
|Nova Scotia [12]|2010|Sexual assault, l...|      662| 70.27|
|Nova Scotia [12]|2010|Total sexual viol...|       93|  9.87|
|Nova Scotia [12]|2010|Assault, level 3,...|       85|  9.02|
|Nova Scotia [12]|2010|Assault, level 2,...|     1212|128.65|
|Nova Scotia [12]|2010|Assault, level 1 ...|     5780|613.54|
|Nova Scotia [12]|2010| Total robbery [160]|      490| 52.01|
|Nova Scotia [12]|2010|Total breaking an...|     5269| 559.3|
|Nova Scotia [12]|2010|Total theft of mo...|     1281|135.98|
|Nova Scotia [12]|2010|Total impaired dr...|     3431| 364.2|
|Nova Sc

In [None]:
# Verificar los valores únicos en Year y Violations después de la limpieza
df_clean.select("Year").distinct().orderBy("Year").show()
df_clean.select("Violations").distinct().orderBy("Violations").show()


+----+
|Year|
+----+
|2010|
|2011|
|2012|
|2013|
|2014|
|2015|
|2016|
|2017|
|2018|
|2019|
|2020|
|2021|
|2022|
+----+

+--------------------+
|          Violations|
+--------------------+
|Assault, level 1 ...|
|Assault, level 2,...|
|Assault, level 3,...|
|Attempted murder ...|
|      Homicide [110]|
|Possession, cocai...|
|Sexual assault, l...|
|Sexual assault, l...|
|Sexual assault, l...|
|Total Drug violat...|
|Total breaking an...|
|Total cannabis, t...|
|Total cocaine, tr...|
|Total impaired dr...|
|Total other Contr...|
| Total robbery [160]|
|Total sexual viol...|
|Total theft of mo...|
|Total, possession...|
+--------------------+



In [None]:
# Mostrar los valores únicos de Geography para verificar que están completos
df_final.select("Geography").distinct().show(50, truncate=False)

+----------------------------------------------------------------+
|Geography                                                       |
+----------------------------------------------------------------+
|Nova Scotia [12]                                                |
|Lunenburg County,  Nova Scotia,  Royal Canadian Mounted Police  |
|Chester,  Nova Scotia,  Royal Canadian Mounted Police           |
|Halifax County,  Nova Scotia,  Royal Canadian Mounted Police    |
|Sheet Harbour,  Nova Scotia,  Royal Canadian Mounted Police     |
|Digby,  Nova Scotia,  municipal [12012]                         |
|Halifax Metropolitan Area,  Nova Scotia,  municipal [12099]     |
|Kentville,  Nova Scotia,  municipal [12023]                     |
|Yarmouth,  Nova Scotia,  municipal [12057]                      |
|Lunenburg,  Nova Scotia,  Royal Canadian Mounted Police         |
|Wolfville,  Nova Scotia,  municipal [12056]                     |
|Meteghan (Clare),  Nova Scotia,  Royal Canadian Mounted Polic

In [None]:
# Asumiendo que los nombres de las columnas en la fila incorrecta están en mayúsculas
df_final = df_final.filter(df_final.Geography != "Geography")

# Verificar que la fila fue eliminada correctamente
df_final.show(5, truncate=False)

+----------------+----+-------------------------------------------------------+---------+-----+
|Geography       |Year|Violations                                             |Incidents|Rates|
+----------------+----+-------------------------------------------------------+---------+-----+
|Nova Scotia [12]|2010|Homicide [110]                                         |21       |2.23 |
|Nova Scotia [12]|2010|Attempted murder [1210]                                |24       |2.55 |
|Nova Scotia [12]|2010|"Sexual assault, level 3, aggravated [1310]"           |6        |0.64 |
|Nova Scotia [12]|2010|"Sexual assault, level 2, weapon or bodily harm [1320]"|13       |1.38 |
|Nova Scotia [12]|2010|"Sexual assault, level 1 [1330]"                       |662      |70.27|
+----------------+----+-------------------------------------------------------+---------+-----+
only showing top 5 rows



file_path = 'dbfs:/FileStore/tables/crime_statistics_-_incidents_and_rates_for_selected_offences.csv/'

In [None]:
# Mostrar los valores únicos en la columna 'Incidents' para identificar posibles problemas
df_final.select("Incidents").distinct().show(100, truncate=False)


+---------+
|Incidents|
+---------+
|1412     |
|727      |
|686      |
|8        |
|22       |
|3095     |
|85       |
|4769     |
|214      |
|221      |
|126      |
|228      |
|6        |
|60       |
|220      |
|93       |
|3273     |
|3431     |
|231      |
|108      |
|5780     |
|679      |
|1212     |
|490      |
|24       |
|1281     |
|1299     |
|13       |
|191      |
|21       |
|662      |
|91       |
|3136     |
|469      |
|5269     |
|5567     |
|69       |
|34       |
|443      |
|4829     |
|4608     |
|609      |
|5        |
|61       |
|659      |
|17       |
|131      |
|640      |
|3197     |
|136      |
|2899     |
|292      |
|1326     |
|193      |
|122      |
|2715     |
|4091     |
|1390     |
|217      |
|3071     |
|9        |
|624      |
|5284     |
|222      |
|4        |
|249      |
|1118     |
|1083     |
|72       |
|337      |
|216      |
|574      |
|138      |
|3        |
|133      |
|2862     |
|610      |
|1082     |
|3689     |
|280      |
|247

In [None]:
from pyspark.sql.types import StructType, StructField, StringType

# Definir un esquema inicial con todas las columnas como StringType
schema = StructType([
    StructField("Geography", StringType(), True),
    StructField("Year", StringType(), True),
    StructField("Violations", StringType(), True),
    StructField("Incidents", StringType(), True),
    StructField("Rates", StringType(), True)
])

# Cargar el DataFrame con el esquema StringType para evitar problemas de conversión inmediata
df_final = spark.createDataFrame(df_final.rdd, schema)

# Mostrar las primeras filas para verificar el contenido
df_final.show(10, truncate=False)
df_final.printSchema()

+----------------+----+-------------------------------------------------------+---------+------+
|Geography       |Year|Violations                                             |Incidents|Rates |
+----------------+----+-------------------------------------------------------+---------+------+
|Nova Scotia [12]|2010|Homicide [110]                                         |21       |2.23  |
|Nova Scotia [12]|2010|Attempted murder [1210]                                |24       |2.55  |
|Nova Scotia [12]|2010|"Sexual assault, level 3, aggravated [1310]"           |6        |0.64  |
|Nova Scotia [12]|2010|"Sexual assault, level 2, weapon or bodily harm [1320]"|13       |1.38  |
|Nova Scotia [12]|2010|"Sexual assault, level 1 [1330]"                       |662      |70.27 |
|Nova Scotia [12]|2010|Total sexual violations against children [130]         |93       |9.87  |
|Nova Scotia [12]|2010|"Assault, level 3, aggravated [1410]"                  |85       |9.02  |
|Nova Scotia [12]|2010|"Assaul

In [None]:
from pyspark.sql.types import IntegerType, DoubleType

# Convertir las columnas con tipos de datos correctos
df_clean = df_clean.withColumn("Incidents", col("Incidents").cast(IntegerType())) \
                   .withColumn("Rates", col("Rates").cast(DoubleType()))

# Verificar nuevamente el esquema y los datos
df_clean.printSchema()
df_clean.show(10, truncate=False)

root
 |-- Geography: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Violations: string (nullable = true)
 |-- Incidents: integer (nullable = true)
 |-- Rates: double (nullable = true)

+----------------+----+-----------------------------------------------------+---------+------+
|Geography       |Year|Violations                                           |Incidents|Rates |
+----------------+----+-----------------------------------------------------+---------+------+
|Nova Scotia [12]|2010|Homicide [110]                                       |21       |2.23  |
|Nova Scotia [12]|2010|Attempted murder [1210]                              |24       |2.55  |
|Nova Scotia [12]|2010|Sexual assault, level 3, aggravated [1310]           |6        |0.64  |
|Nova Scotia [12]|2010|Sexual assault, level 2, weapon or bodily harm [1320]|13       |1.38  |
|Nova Scotia [12]|2010|Sexual assault, level 1 [1330]                       |662      |70.27 |
|Nova Scotia [12]|2010|Total sexual

In [None]:
# Crear la vista limpia para usar con SQL
df_clean.createOrReplaceTempView("clean_crime_data")

# Consulta SQL para contar incidentes por año y región
spark.sql("""
    SELECT Year, Geography, SUM(Incidents) as Total_Incidents
    FROM clean_crime_data
    GROUP BY Year, Geography
    ORDER BY Year, Geography
""").show(100, truncate=False)


+----+---------------------------------------------------------------------------------------+---------------+
|Year|Geography                                                                              |Total_Incidents|
+----+---------------------------------------------------------------------------------------+---------------+
|2010|Amherst,  Nova Scotia,  municipal [12001]                                              |397            |
|2010|Annapolis Royal,  Nova Scotia,  municipal [12002]                                      |18             |
|2010|Berwick,  Nova Scotia,  municipal [12004]                                              |null           |
|2010|Bridgewater,  Nova Scotia,  municipal [12006]                                          |255            |
|2010|Cape Breton Region,  Nova Scotia,  municipal [12018]                                   |1885           |
|2010|Clark's Harbour,  Nova Scotia,  municipal [12008]                                      |null           |
|

In [None]:
# Consulta SQL para contar la frecuencia de violaciones
spark.sql("""
    SELECT Violations, COUNT(*) as Frequency
    FROM clean_crime_data
    GROUP BY Violations
    ORDER BY Frequency DESC
""").show(10, truncate=False)


+------------------------------------------------------------------------+---------+
|Violations                                                              |Frequency|
+------------------------------------------------------------------------+---------+
|Assault, level 2, weapon or bodily harm [1420]                          |377      |
|Sexual assault, level 2, weapon or bodily harm [1320]                   |377      |
|Assault, level 1 [1430]                                                 |377      |
|Sexual assault, level 1 [1330]                                          |377      |
|Total impaired driving [910]                                            |377      |
|Sexual assault, level 3, aggravated [1310]                              |377      |
|Total breaking and entering [210]                                       |377      |
|Possession, cocaine [4120]                                              |377      |
|Total, possession, other Controlled Drugs and Substances Act dru

In [None]:
# Spark SQL
spark.sql("""
    SELECT Violations, AVG(Rates) as Avg_Rate
    FROM clean_crime_data
    GROUP BY Violations
    ORDER BY Avg_Rate DESC
""").show(10, truncate=False)


+--------------------------------------------------------------------------------+------------------+
|Violations                                                                      |Avg_Rate          |
+--------------------------------------------------------------------------------+------------------+
|Assault, level 1 [1430]                                                         |738.5789285714286 |
|Total breaking and entering [210]                                               |343.8455395683454 |
|Total impaired driving [910]                                                    |316.63115107913666|
|Total Drug violations [401]                                                     |249.00297872340425|
|Assault, level 2, weapon or bodily harm [1420]                                  |143.5349645390071 |
|Total theft of motor vehicle [220]                                              |109.22042253521127|
|Sexual assault, level 1 [1330]                                                  |

In [None]:
# Spark SQL
spark.sql("""
    SELECT Geography, SUM(Incidents) as Total_Incidents
    FROM clean_crime_data
    GROUP BY Geography
    ORDER BY Total_Incidents DESC
""").show(10, truncate=False)

+-----------------------------------------------------------+---------------+
|Geography                                                  |Total_Incidents|
+-----------------------------------------------------------+---------------+
|Nova Scotia [12]                                           |242646         |
|Halifax Metropolitan Area,  Nova Scotia,  municipal [12099]|95346          |
|Cape Breton Region,  Nova Scotia,  municipal [12018]       |19004          |
|New Glasgow Region,  Nova Scotia,  municipal [12058]       |4512           |
|Truro,  Nova Scotia,  municipal [12052]                    |4209           |
|Amherst,  Nova Scotia,  municipal [12001]                  |3970           |
|Bridgewater,  Nova Scotia,  municipal [12006]              |2536           |
|Kentville,  Nova Scotia,  municipal [12023]                |2197           |
|Stellarton,  Nova Scotia,  municipal [12044]               |1078           |
|Westville,  Nova Scotia,  municipal [12054]                |741