In [0]:
import os

In [0]:
# Read the storage account name, its access key and the container name from
# the notebook's secret scope, so no secret value appears in the notebook.
SECRET_SCOPE = "nyc-crimes-project"

datalake_name, datalake_key, container_name = (
    dbutils.secrets.get(scope=SECRET_SCOPE, key=secret_key)
    for secret_key in ("storage-account-name", "storage-account-key", "container-name")
)

In [0]:
# Mount the "ouro" folder of the blob container at /mnt/ouro, unless a mount
# with that mount point is already registered.
OURO_MOUNT_POINT = "/mnt/ouro"
mounted_points = [mount.mountPoint for mount in dbutils.fs.mounts()]

if OURO_MOUNT_POINT in mounted_points:
    print("A montagem '/mnt/ouro' já existe.")
else:
    # The account key is passed only through the mount configuration.
    account_key_config = {
        f"fs.azure.account.key.{datalake_name}.blob.core.windows.net": datalake_key
    }
    dbutils.fs.mount(
        source=f"wasbs://{container_name}@{datalake_name}.blob.core.windows.net/ouro",
        mount_point=OURO_MOUNT_POINT,
        extra_configs=account_key_config,
    )

In [0]:
# Load every Parquet file found under the mounted /mnt/ouro/ directory.
df = spark.read.format("parquet").load("/mnt/ouro/")

In [0]:
from pyspark.sql.functions import col, hour, dayofweek, year, month, date_format
from pyspark.sql.functions import col, count

# Columns that identify one location on one day; "date" is a copy of start_date.
location_day_keys = ["latitude", "longitude", "date"]
df = df.withColumn("date", col("start_date"))

# Count of non-null complaint_id values per (latitude, longitude, date).
df_aggregated = df.groupBy(*location_day_keys).agg(
    count("complaint_id").alias("crime_count")
)

# The left join keeps every row of df and adds the matching crime_count.
# It runs before the float cast below, so keys are compared in their
# original types.
df = df.join(df_aggregated, on=location_day_keys, how="left")

# Time-derived features taken from start_time / start_date.
df = (
    df.withColumn("start_hour", hour(col("start_time")))
    .withColumn("day_of_week", dayofweek(col("start_date")))
    .withColumn("month", month(col("start_date")))
    .withColumn("year", year(col("start_date")))
)

# Coordinates are cast to float.
df = (
    df.withColumn("latitude", col("latitude").cast("float"))
    .withColumn("longitude", col("longitude").cast("float"))
)

# Keep only the model inputs and the crime_count label.
model_columns = [
    "borough_name", "offense_description", "premises_type_description",
    "victim_race", "suspect_race", "latitude", "longitude",
    "start_hour", "day_of_week", "month", "year", "crime_count",
]
df = df.select(*model_columns)


In [0]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

# Preprocessing stages for the crime_count model.
#
# The original cell ended with Pipeline(stages=indexers + [encoder, assembler, rf]),
# which raised NameError on a fresh kernel: `assembler` was never defined in the
# notebook and `rf` is only created in the training cell. The missing assembler
# is defined here; the Pipeline itself is built in the training cell, once the
# regressor exists.

# String-valued columns that are indexed and then one-hot encoded.
categorical_cols = [
    "borough_name",
    "offense_description",
    "premises_type_description",
    "victim_race",
    "suspect_race",
]

# Numeric columns passed to the assembler unchanged.
# NOTE(review): reconstructed as "every selected column except the crime_count
# label" -- confirm this matches the feature set that was intended.
numeric_cols = ["latitude", "longitude", "start_hour", "day_of_week", "month", "year"]

indexed_cols = [f"{name}_indexed" for name in categorical_cols]
encoded_cols = [f"{name}_encoded" for name in categorical_cols]

# handleInvalid="keep" assigns unseen/invalid labels to an extra index instead
# of failing at transform time (per the Spark ML StringIndexer documentation).
# The comprehension variable is deliberately not called `col`, to avoid
# confusion with pyspark.sql.functions.col imported earlier in the notebook.
indexers = [
    StringIndexer(inputCol=name, outputCol=indexed, handleInvalid="keep")
    for name, indexed in zip(categorical_cols, indexed_cols)
]

encoder = OneHotEncoder(inputCols=indexed_cols, outputCols=encoded_cols)

# Combine the encoded categoricals and the numeric columns into the single
# "features" vector column that the regressor reads (featuresCol="features").
# handleInvalid is left at its default, so null feature values raise an error.
assembler = VectorAssembler(inputCols=encoded_cols + numeric_cols, outputCol="features")


In [0]:
from pyspark.ml.regression import RandomForestRegressor

# 70/30 train/test split with a fixed seed.
train_data, test_data = df.randomSplit([0.7, 0.3], seed=42)

# Random forest regressor predicting crime_count from the "features" vector.
rf = RandomForestRegressor(labelCol="crime_count", featuresCol="features")

# Full pipeline: string indexers, one-hot encoder, vector assembler, regressor.
pipeline = Pipeline(stages=[*indexers, encoder, assembler, rf])

# Fit all stages on the training split only.
model = pipeline.fit(train_data)


In [0]:
# Score the held-out split with the fitted pipeline. `predictions` was never
# assigned anywhere in the notebook, so this cell raised NameError on a fresh
# kernel; it is created here from the fitted model and the test split.
predictions = model.transform(test_data)

# Show the observed crime_count next to the model output. "prediction" is the
# regressor's default output column (no predictionCol is set on it).
predictions.select("latitude", "longitude", "crime_count", "prediction").show()