In [29]:
from pyspark.sql import SparkSession

# Add here your team number teamx
team = 16

# location of your Hive database in HDFS
warehouse = "project/hive/warehouse"

spark = SparkSession.builder\
        .appName("{} - spark ML".format(team))\
        .master("yarn")\
        .config("hive.metastore.uris", "thrift://hadoop-02.uni.innopolis.ru:9883")\
        .config("spark.sql.warehouse.dir", warehouse)\
        .config("spark.sql.avro.compression.codec", "snappy")\
        .enableHiveSupport()\
        .getOrCreate()

In [30]:
spark

In [35]:
spark.sql("SHOW DATABASES").show()
spark.sql("USE team16_projectdb").show()
spark.sql("SHOW TABLES").show()
spark.sql("SELECT * FROM team16_projectdb.crimes").show()

+--------------------+
|           namespace|
+--------------------+
|             default|
|             retake1|
|             root_db|
|                show|
|     team0_projectdb|
|    team11_projectdb|
|           team12_db|
|team12_hive_proje...|
|    team12_projectdb|
|    team13_projectdb|
|    team14_projectdb|
|    team15_projectdb|
|    team16_projectdb|
|    team17_projectdb|
|    team18_projectdb|
|    team19_projectdb|
|     team1_projectdb|
|    team20_projectdb|
| team21_projectdb_v2|
| team21_projectdb_v3|
+--------------------+
only showing top 20 rows

++
||
++
++

+----------------+----------------+-----------+
|       namespace|       tableName|isTemporary|
+----------------+----------------+-----------+
|team16_projectdb|          crimes|      false|
|team16_projectdb|crimes_optimized|      false|
|team16_projectdb|      q1_results|      false|
|team16_projectdb|      q2_results|      false|
|team16_projectdb|      q3_results|      false|
|team16_projectdb|      q

In [44]:
crimes_df = spark.table("team16_projectdb.crimes")
crimes_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- date: long (nullable = true)
 |-- block: string (nullable = true)
 |-- primary_type: string (nullable = true)
 |-- location_description: string (nullable = true)
 |-- arrest: boolean (nullable = true)
 |-- domestic: boolean (nullable = true)
 |-- beat: integer (nullable = true)
 |-- district: integer (nullable = true)
 |-- ward: double (nullable = true)
 |-- community_area: double (nullable = true)
 |-- fbi_code: string (nullable = true)
 |-- x_coordinate: double (nullable = true)
 |-- y_coordinate: double (nullable = true)



## Data preprocessing

In [45]:
crimes_df.head(5)

[Row(id=4786321, date=1072904460000, block='082XX S COLES AVE', primary_type='THEFT', location_description='RESIDENCE', arrest=False, domestic=False, beat=424, district=4, ward=7.0, community_area=46.0, fbi_code='06', x_coordinate=None, y_coordinate=None),
 Row(id=4676906, date=1046466000000, block='004XX W 42ND PL', primary_type='OTHER OFFENSE', location_description='RESIDENCE', arrest=False, domestic=True, beat=935, district=9, ward=11.0, community_area=61.0, fbi_code='26', x_coordinate=1173974.0, y_coordinate=1876757.0),
 Row(id=4789749, date=1087714800000, block='025XX N KIMBALL AVE', primary_type='OFFENSE INVOLVING CHILDREN', location_description='RESIDENCE', arrest=False, domestic=False, beat=1413, district=14, ward=35.0, community_area=22.0, fbi_code='20', x_coordinate=None, y_coordinate=None),
 Row(id=4789765, date=1104426000000, block='045XX W MONTANA ST', primary_type='THEFT', location_description='OTHER', arrest=False, domestic=False, beat=2521, district=25, ward=31.0, commu

In [None]:
crimes_df["

## Training model

In [47]:
pip install lightgbm

Defaulting to user installation because normal site-packages is not writeable
Collecting lightgbm
  Downloading lightgbm-4.3.0.tar.gz (1.7 MB)
     |████████████████████████████████| 1.7 MB 1.1 MB/s            
[?25h  Installing build dependencies ... [?25lerror
[31m  ERROR: Command errored out with exit status 1:
   command: /usr/bin/python3 /usr/local/lib/python3.6/site-packages/pip install --ignore-installed --no-user --prefix /tmp/pip-build-env-m5xrywwa/overlay --no-warn-script-location --no-binary :none: --only-binary :none: -i https://pypi.org/simple -- 'scikit-build-core>=0.4.4'
       cwd: None
  Complete output (2 lines):
  ERROR: Could not find a version that satisfies the requirement scikit-build-core>=0.4.4 (from versions: none)
  ERROR: No matching distribution found for scikit-build-core>=0.4.4
  ----------------------------------------[0m
[?25h  Downloading lightgbm-4.2.0.tar.gz (1.7 MB)
     |████████████████████████████████| 1.7 MB 70.3 MB/s            ██████████▉

In [49]:
dataset = crimes_df

In [110]:
crimes_df = spark.table("team16_projectdb.crimes")
crimes_df

6170812

In [111]:
crimes_df

DataFrame[id: int, date: bigint, block: string, primary_type: string, location_description: string, arrest: boolean, domestic: boolean, beat: int, district: int, ward: double, community_area: double, fbi_code: string, x_coordinate: double, y_coordinate: double]

In [191]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# 1. Convert timestamp
crime_df = crime_df.withColumn("datetime", F.from_unixtime(F.col("date") / 1000))
crime_df = crime_df.withColumn("timestamp", F.col("datetime").cast("long"))

# 2. Create target: days until next crime in same district
w = Window.partitionBy("district").orderBy("datetime")
crime_df = crime_df.withColumn("next_datetime", F.lead("datetime").over(w)) \
                   .withColumn("days_until_next", 
                               (F.col("next_datetime").cast("long") - F.col("datetime").cast("long")) / (24*3600)) \
                   .filter(F.col("days_until_next").isNotNull())

# 3. Drop irrelevant columns
cols_to_drop = ["id", "date", "block", "fbi_code", "arrest", "domestic", "next_datetime"]
crime_df = crime_df.drop(*cols_to_drop)

# 4. Add useful features
crime_df = crime_df.withColumn("hour", F.hour("datetime")) \
                   .withColumn("day_of_week", F.dayofweek("datetime")) \
                   .withColumn("month", F.month("datetime"))

# 5. Drop missing values
crime_df = crime_df.dropna(subset=[
    "x_coordinate", "y_coordinate", "ward", "community_area", 
    "primary_type", "location_description", "district"
])

# 6. Encode categorical columns
cat_cols = ["primary_type", "location_description", "district"]
for col in cat_cols:
    indexer = StringIndexer(inputCol=col, outputCol=col + "_idx").fit(crime_df)
    crime_df = indexer.transform(crime_df)

# 7. Assemble features
features = [
    "hour", "day_of_week", "month", "x_coordinate", "y_coordinate",
    "ward", "community_area"
] + [col + "_idx" for col in cat_cols]

assembler = VectorAssembler(inputCols=features, outputCol="features")
crime_df = assembler.transform(crime_df)

# 8. Time-based split (70% train, 30% test)
cutoff = crime_df.approxQuantile("timestamp", [0.7], 0.01)[0]
train_df = crime_df.filter(F.col("timestamp") < cutoff)
test_df = crime_df.filter(F.col("timestamp") >= cutoff)

# 9. Train Linear Regression
lr = LinearRegression(featuresCol="features", labelCol="days_until_next")
model = lr.fit(train_df)

# 10. Predict and evaluate
predictions = model.transform(test_df)
evaluator = RegressionEvaluator(labelCol="days_until_next", predictionCol="prediction")

mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
rmse = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})

print(f"\nModel Performance:")
print(f"MAE: {mae:.2f} days")
print(f"RMSE: {rmse:.2f} days")
print(f"R²: {r2:.2f}")


Py4JJavaError: An error occurred while calling o3918.fit.
: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:

org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.lang.reflect.Constructor.newInstance(Constructor.java:423)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
py4j.ClientServerConnection.run(ClientServerConnection.java:106)
java.lang.Thread.run(Thread.java:750)

The currently active SparkContext was created at:

(No active SparkContext.)
         
	at org.apache.spark.SparkContext.assertNotStopped(SparkContext.scala:118)
	at org.apache.spark.SparkContext.defaultParallelism(SparkContext.scala:2520)
	at org.apache.spark.SparkContext.defaultMinPartitions(SparkContext.scala:2529)
	at org.apache.spark.sql.hive.HadoopTableReader.<init>(TableReader.scala:84)
	at org.apache.spark.sql.hive.execution.HiveTableScanExec.hadoopReader$lzycompute(HiveTableScanExec.scala:110)
	at org.apache.spark.sql.hive.execution.HiveTableScanExec.hadoopReader(HiveTableScanExec.scala:105)
	at org.apache.spark.sql.hive.execution.HiveTableScanExec.$anonfun$doExecute$1(HiveTableScanExec.scala:207)
	at org.apache.spark.util.Utils$.withDummyCallSite(Utils.scala:2587)
	at org.apache.spark.sql.hive.execution.HiveTableScanExec.doExecute(HiveTableScanExec.scala:207)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
	at org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:526)
	at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:454)
	at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:453)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:497)
	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:50)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:750)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD$lzycompute(ShuffleExchangeExec.scala:135)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD(ShuffleExchangeExec.scala:135)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture$lzycompute(ShuffleExchangeExec.scala:140)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture(ShuffleExchangeExec.scala:139)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.$anonfun$submitShuffleJob$1(ShuffleExchangeExec.scala:68)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.submitShuffleJob(ShuffleExchangeExec.scala:68)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.submitShuffleJob$(ShuffleExchangeExec.scala:67)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.submitShuffleJob(ShuffleExchangeExec.scala:115)
	at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture$lzycompute(QueryStageExec.scala:170)
	at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture(QueryStageExec.scala:170)
	at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.doMaterialize(QueryStageExec.scala:172)
	at org.apache.spark.sql.execution.adaptive.QueryStageExec.materialize(QueryStageExec.scala:82)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5(AdaptiveSparkPlanExec.scala:258)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5$adapted(AdaptiveSparkPlanExec.scala:256)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:256)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:228)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:370)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:343)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3715)
	at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2971)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3706)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
	at org.apache.spark.sql.Dataset.collect(Dataset.scala:2971)
	at org.apache.spark.ml.feature.StringIndexer.countByValue(StringIndexer.scala:204)
	at org.apache.spark.ml.feature.StringIndexer.sortByFreq(StringIndexer.scala:212)
	at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:242)
	at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:145)
	at sun.reflect.GeneratedMethodAccessor208.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)


In [101]:
cat_cols = ["primary_type", "location_description", "district"]
for col in cat_cols:
    indexer = StringIndexer(inputCol=col, outputCol=col + "_idx").fit(crimes_df)
    crimes_df = indexer.transform(crimes_df)

# Feature list
features = ["hour", "day_of_week", "month", "x_coordinate", "y_coordinate",
            "ward", "community_area", "crimes_last_week"] + [c + "_idx" for c in cat_cols]

# Assemble features

assembler = VectorAssembler(inputCols=features, outputCol="features")


In [108]:
crimes_df.count()

0

In [102]:
crimes_df = assembler.transform(crimes_df)

# Time-based split
quantile_cutoff = crimes_df.approxQuantile("timestamp", [0.7], 0.01)[0]
train_df = crimes_df.filter(F.col("timestamp") < quantile_cutoff)
test_df = crimes_df.filter(F.col("timestamp") >= quantile_cutoff)



IndexError: list index out of range

In [None]:
# Train Linear Regression
lr = LinearRegression(featuresCol="features", labelCol="days_until_next")
model = lr.fit(train_df)

# Predict
predictions = model.transform(test_df)

# Evaluate
evaluator = RegressionEvaluator(labelCol="days_until_next", predictionCol="prediction")
mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
rmse = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})

print(f"\nModel Performance:")
print(f"MAE: {mae:.2f} days")
print(f"RMSE: {rmse:.2f} days")
print(f"R²: {r2:.2f}")

In [117]:
crimes_df

DataFrame[id: int, date: bigint, block: string, primary_type: string, location_description: string, arrest: boolean, domestic: boolean, beat: int, district: int, ward: double, community_area: double, fbi_code: string, x_coordinate: double, y_coordinate: double]

In [140]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
crime_df = spark.table("team16_projectdb.crimes")

# 1. Convert timestamp
crime_df = crime_df.withColumn("datetime", F.from_unixtime(F.col("date") / 1000))
crime_df = crime_df.withColumn("timestamp", F.col("datetime").cast("long"))

In [142]:
crime_df

DataFrame[id: int, date: bigint, block: string, primary_type: string, location_description: string, arrest: boolean, domestic: boolean, beat: int, district: int, ward: double, community_area: double, fbi_code: string, x_coordinate: double, y_coordinate: double, datetime: string, timestamp: bigint]

In [138]:
# 2. Create target: days until next crime in same district
w = Window.partitionBy("district").orderBy("datetime")
crime_df = crime_df.withColumn("next_datetime", F.lead("datetime").over(w)) \
                   .withColumn("days_until_next", 
                               (F.col("next_datetime").cast("long") - F.col("datetime").cast("long")) / (24*3600)) \
                   .filter(F.col("days_until_next").isNotNull())



In [141]:
crime_df.count()

6170812

In [120]:
# 6. Encode categorical columns
cat_cols = ["primary_type", "location_description", "district"]
for col in cat_cols:
    indexer = StringIndexer(inputCol=col, outputCol=col + "_idx").fit(crime_df)
    crime_df = indexer.transform(crime_df)

# 7. Assemble features
features = [
    "hour", "day_of_week", "month", "x_coordinate", "y_coordinate",
    "ward", "community_area"
] + [col + "_idx" for col in cat_cols]

assembler = VectorAssembler(inputCols=features, outputCol="features")
crime_df = assembler.transform(crime_df)

# 8. Time-based split (70% train, 30% test)
cutoff = crime_df.approxQuantile("timestamp", [0.7], 0.01)[0]
train_df = crime_df.filter(F.col("timestamp") < cutoff)
test_df = crime_df.filter(F.col("timestamp") >= cutoff)

IndexError: list index out of range

In [None]:


# 9. Train Linear Regression
lr = LinearRegression(featuresCol="features", labelCol="days_until_next")
model = lr.fit(train_df)

# 10. Predict and evaluate
predictions = model.transform(test_df)
evaluator = RegressionEvaluator(labelCol="days_until_next", predictionCol="prediction")

mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
rmse = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})

print(f"\nModel Performance:")
print(f"MAE: {mae:.2f} days")
print(f"RMSE: {rmse:.2f} days")
print(f"R²: {r2:.2f}")


In [146]:
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, lead, when
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# ── 1. Spark session ────────────────────────────────────────────────────────
team = 16
warehouse = "project/hive/warehouse"

spark = SparkSession.builder\
    .appName(f"{team} - spark ML")\
    .master("yarn")\
    .config("hive.metastore.uris", "thrift://hadoop-02.uni.innopolis.ru:9883")\
    .config("spark.sql.warehouse.dir", warehouse)\
    .config("spark.sql.avro.compression.codec", "snappy")\
    .enableHiveSupport()\
    .getOrCreate()

# ── 2. Load Hive table ───────────────────────────────────────────────────────
# adjust to your actual database/table
df = spark.table("team16_projectdb.crimes")

# ── 3. Drop irrelevant columns ───────────────────────────────────────────────
df = df.drop("id", "block", "location_description", "x_coordinate", "y_coordinate")

# ── 4. Convert 'date' (ms since epoch) → seconds as DOUBLE ───────────────────
df = df.withColumn("ts", (col("date")/1000).cast("double"))

# ── 5. Build the target: next crime’s timestamp ──────────────────────────────
w = Window.orderBy("ts")
df = df.withColumn("ts_next", lead("ts", 1).over(w))
df = df.na.drop(subset=["ts_next"])

# ── 6. Encode categoricals with StringIndexer ───────────────────────────────
for inp, out in [("primary_type", "primary_type_idx"),
                 ("fbi_code",    "fbi_code_idx")]:
    idx = StringIndexer(inputCol=inp, outputCol=out, handleInvalid="keep")
    df = idx.fit(df).transform(df)

# ── 7. Cast booleans to ints ────────────────────────────────────────────────
df = df.withColumn("arrest",  when(col("arrest")  == True, 1.0).otherwise(0.0))
df = df.withColumn("domestic",when(col("domestic")== True, 1.0).otherwise(0.0))

# ── 8. Assemble features ────────────────────────────────────────────────────
feature_cols = [
    "ts",
    "primary_type_idx",
    "fbi_code_idx",
    "arrest",
    "domestic",
    "beat",
    "district",
    "ward",
    "community_area"
]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df = assembler.transform(df)

# ── 9. Split data ───────────────────────────────────────────────────────────
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

# ── 10. Train Linear Regression ─────────────────────────────────────────────



In [151]:
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features",
    handleInvalid="skip"      # <— skip any row that has null in any feature
)
df = assembler.transform(df)


IllegalArgumentException: Output column features already exists.

In [153]:
train_df.count()

Py4JJavaError: An error occurred while calling o2298.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 298.0 failed 4 times, most recent failure: Lost task 0.3 in stage 298.0 (TID 447) (hadoop-02.uni.innopolis.ru executor 2): org.apache.spark.SparkException: Failed to execute user defined function (VectorAssembler$$Lambda$3750/53548035: (struct<ts:double,primary_type_idx:double,fbi_code_idx:double,arrest:double,domestic:double,beat_double_VectorAssembler_1a0fe3dd03ee:double,district_double_VectorAssembler_1a0fe3dd03ee:double,ward:double,community_area:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:136)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithoutKey_1$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithoutKey_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:350)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Encountered null while assembling a row with handleInvalid = "error". Consider
removing nulls from dataset or using handleInvalid = "keep" or "skip".
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1(VectorAssembler.scala:291)
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1$adapted(VectorAssembler.scala:260)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
	at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:260)
	at org.apache.spark.ml.feature.VectorAssembler.$anonfun$transform$6(VectorAssembler.scala:143)
	... 20 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2450)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2399)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2398)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2398)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1156)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1156)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1156)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2638)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2580)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2569)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2224)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2245)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2264)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2289)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:410)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:343)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:371)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:343)
	at org.apache.spark.sql.Dataset.$anonfun$count$1(Dataset.scala:3012)
	at org.apache.spark.sql.Dataset.$anonfun$count$1$adapted(Dataset.scala:3011)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3706)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
	at org.apache.spark.sql.Dataset.count(Dataset.scala:3011)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function (VectorAssembler$$Lambda$3750/53548035: (struct<ts:double,primary_type_idx:double,fbi_code_idx:double,arrest:double,domestic:double,beat_double_VectorAssembler_1a0fe3dd03ee:double,district_double_VectorAssembler_1a0fe3dd03ee:double,ward:double,community_area:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:136)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithoutKey_1$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithoutKey_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:350)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: org.apache.spark.SparkException: Encountered null while assembling a row with handleInvalid = "error". Consider
removing nulls from dataset or using handleInvalid = "keep" or "skip".
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1(VectorAssembler.scala:291)
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1$adapted(VectorAssembler.scala:260)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
	at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:260)
	at org.apache.spark.ml.feature.VectorAssembler.$anonfun$transform$6(VectorAssembler.scala:143)
	... 20 more


In [148]:
lr = LinearRegression(featuresCol="features", labelCol="ts_next")
model = lr.fit(train_df)



Py4JJavaError: An error occurred while calling o2338.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 289.0 failed 4 times, most recent failure: Lost task 0.3 in stage 289.0 (TID 429) (hadoop-02.uni.innopolis.ru executor 2): org.apache.spark.SparkException: Failed to execute user defined function (VectorAssembler$$Lambda$3750/53548035: (struct<ts:double,primary_type_idx:double,fbi_code_idx:double,arrest:double,domestic:double,beat_double_VectorAssembler_1a0fe3dd03ee:double,district_double_VectorAssembler_1a0fe3dd03ee:double,ward:double,community_area:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:136)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
	at scala.collection.TraversableOnce.aggregate(TraversableOnce.scala:260)
	at scala.collection.TraversableOnce.aggregate$(TraversableOnce.scala:260)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1431)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$3(RDD.scala:1230)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$5(RDD.scala:1231)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:863)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:863)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Encountered null while assembling a row with handleInvalid = "error". Consider
removing nulls from dataset or using handleInvalid = "keep" or "skip".
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1(VectorAssembler.scala:291)
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1$adapted(VectorAssembler.scala:260)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
	at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:260)
	at org.apache.spark.ml.feature.VectorAssembler.$anonfun$transform$6(VectorAssembler.scala:143)
	... 31 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2450)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2399)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2398)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2398)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1156)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1156)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1156)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2638)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2580)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2569)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2224)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2319)
	at org.apache.spark.rdd.RDD.$anonfun$fold$1(RDD.scala:1183)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.fold(RDD.scala:1177)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$1(RDD.scala:1246)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1222)
	at org.apache.spark.ml.optim.WeightedLeastSquares.fit(WeightedLeastSquares.scala:107)
	at org.apache.spark.ml.regression.LinearRegression.trainWithNormal(LinearRegression.scala:451)
	at org.apache.spark.ml.regression.LinearRegression.$anonfun$train$1(LinearRegression.scala:345)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.regression.LinearRegression.train(LinearRegression.scala:327)
	at org.apache.spark.ml.regression.LinearRegression.train(LinearRegression.scala:184)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:151)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function (VectorAssembler$$Lambda$3750/53548035: (struct<ts:double,primary_type_idx:double,fbi_code_idx:double,arrest:double,domestic:double,beat_double_VectorAssembler_1a0fe3dd03ee:double,district_double_VectorAssembler_1a0fe3dd03ee:double,ward:double,community_area:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:136)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
	at scala.collection.TraversableOnce.aggregate(TraversableOnce.scala:260)
	at scala.collection.TraversableOnce.aggregate$(TraversableOnce.scala:260)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1431)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$3(RDD.scala:1230)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$5(RDD.scala:1231)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:863)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:863)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: org.apache.spark.SparkException: Encountered null while assembling a row with handleInvalid = "error". Consider
removing nulls from dataset or using handleInvalid = "keep" or "skip".
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1(VectorAssembler.scala:291)
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1$adapted(VectorAssembler.scala:260)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
	at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:260)
	at org.apache.spark.ml.feature.VectorAssembler.$anonfun$transform$6(VectorAssembler.scala:143)
	... 31 more


In [None]:
# ── 11. Evaluate ────────────────────────────────────────────────────────────
pred = model.transform(test_df)
mae = RegressionEvaluator(labelCol="ts_next", predictionCol="prediction", metricName="mae")\
      .evaluate(pred)
r2  = RegressionEvaluator(labelCol="ts_next", predictionCol="prediction", metricName="r2")\
      .evaluate(pred)

print(f"Test MAE: {mae:.2f} seconds")
print(f"Test R² : {r2:.4f}")

# ── (Optional) Save model ───────────────────────────────────────────────────
# model.save(f"hdfs:///models/team{team}/linear_crime_time")

In [164]:
spark = SparkSession.builder\
    .appName(f"{team} - spark ML")\
    .master("yarn")\
    .config("hive.metastore.uris", "thrift://hadoop-02.uni.innopolis.ru:9883")\
    .config("spark.sql.warehouse.dir", warehouse)\
    .config("spark.sql.avro.compression.codec", "snappy")\
    .enableHiveSupport()\
    .getOrCreate()

# 2. Load the table
df = spark.table("team16_projectdb.crimes")
df.head(2)


[Row(id=4786321, date=1072904460000, block='082XX S COLES AVE', primary_type='THEFT', location_description='RESIDENCE', arrest=False, domestic=False, beat=424, district=4, ward=7.0, community_area=46.0, fbi_code='06', x_coordinate=None, y_coordinate=None),
 Row(id=4676906, date=1046466000000, block='004XX W 42ND PL', primary_type='OTHER OFFENSE', location_description='RESIDENCE', arrest=False, domestic=True, beat=935, district=9, ward=11.0, community_area=61.0, fbi_code='26', x_coordinate=1173974.0, y_coordinate=1876757.0)]

In [165]:
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, lead, when
from pyspark.ml.feature import StringIndexer, RFormula
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# 1. Spark session
team = 16
warehouse = "project/hive/warehouse"

spark = SparkSession.builder\
    .appName(f"{team} - spark ML")\
    .master("yarn")\
    .config("hive.metastore.uris", "thrift://hadoop-02.uni.innopolis.ru:9883")\
    .config("spark.sql.warehouse.dir", warehouse)\
    .config("spark.sql.avro.compression.codec", "snappy")\
    .enableHiveSupport()\
    .getOrCreate()

# 2. Load the table
df = spark.table("team16_projectdb.crimes")

# 3. Drop useless columns
df = df.drop("id", "block", "location_description", "x_coordinate", "y_coordinate")

# 4. Convert 'date' → seconds timestamp
df = df.withColumn("ts", (col("date")/1000).cast("double"))

In [168]:
1046466000.0 - 1087714800.0

-41248800.0

In [166]:
df.head(5)

[Row(date=1072904460000, primary_type='THEFT', arrest=False, domestic=False, beat=424, district=4, ward=7.0, community_area=46.0, fbi_code='06', ts=1072904460.0),
 Row(date=1046466000000, primary_type='OTHER OFFENSE', arrest=False, domestic=True, beat=935, district=9, ward=11.0, community_area=61.0, fbi_code='26', ts=1046466000.0),
 Row(date=1087714800000, primary_type='OFFENSE INVOLVING CHILDREN', arrest=False, domestic=False, beat=1413, district=14, ward=35.0, community_area=22.0, fbi_code='20', ts=1087714800.0),
 Row(date=1104426000000, primary_type='THEFT', arrest=False, domestic=False, beat=2521, district=25, ward=31.0, community_area=20.0, fbi_code='06', ts=1104426000.0),
 Row(date=1051736400000, primary_type='THEFT', arrest=False, domestic=False, beat=2233, district=22, ward=34.0, community_area=49.0, fbi_code='06', ts=1051736400.0)]

In [155]:

# 5. Create target ts_next
w = Window.orderBy("ts")
df = df.withColumn("ts_next", lead("ts", 1).over(w))
df = df.na.drop(subset=["ts_next"])

# 6. Ensure no nulls in predictors
non_null_cols = ["ts", "primary_type", "fbi_code", "arrest", "domestic",
                 "beat", "district", "ward", "community_area", "ts_next"]
df = df.na.drop(subset=non_null_cols)

# 7. Cast booleans to numeric
df = df.withColumn("arrest",  when(col("arrest")  == True, 1.0).otherwise(0.0))
df = df.withColumn("domestic",when(col("domestic")== True, 1.0).otherwise(0.0))

# 8. Index categoricals
for inp in ("primary_type", "fbi_code"):
    idx = StringIndexer(inputCol=inp, outputCol=inp + "_idx", handleInvalid="keep")
    df = idx.fit(df).transform(df)

# 9. Use RFormula to build features & label
#    ts_next ~ ts + primary_type_idx + fbi_code_idx + arrest + domestic + beat + district + ward + community_area
formula = RFormula(
    formula="ts_next ~ ts + primary_type_idx + fbi_code_idx + arrest + domestic + beat + district + ward + community_area",
    featuresCol="features",
    labelCol="ts_next",
    handleInvalid="skip"
)
df2 = formula.fit(df).transform(df)

# 10. Split train/test
train_df, test_df = df2.randomSplit([0.8, 0.2], seed=42)

In [156]:
train_df.count()

4443931

In [163]:
train_df.head(2)

[Row(date=978296400000, primary_type='CRIM SEXUAL ASSAULT', arrest=0.0, domestic=0.0, beat=211, district=2, ward=4.0, community_area=35.0, fbi_code='02', ts=978296400.0, ts_next=978296400.0, primary_type_idx=15.0, fbi_code_idx=16.0, features=DenseVector([978296400.0, 15.0, 16.0, 0.0, 0.0, 211.0, 2.0, 4.0, 35.0])),
 Row(date=978296400000, primary_type='CRIM SEXUAL ASSAULT', arrest=0.0, domestic=0.0, beat=422, district=4, ward=7.0, community_area=46.0, fbi_code='02', ts=978296400.0, ts_next=978296400.0, primary_type_idx=15.0, fbi_code_idx=16.0, features=DenseVector([978296400.0, 15.0, 16.0, 0.0, 0.0, 422.0, 4.0, 7.0, 46.0]))]

In [158]:
# 11. Train Linear Regression
lr = LinearRegression(featuresCol="features", labelCol="ts_next")
model = lr.fit(train_df)

# 12. Evaluate
pred = model.transform(test_df)
mae = RegressionEvaluator(labelCol="ts_next", predictionCol="prediction", metricName="mae")\
      .evaluate(pred)
r2  = RegressionEvaluator(labelCol="ts_next", predictionCol="prediction", metricName="r2")\
      .evaluate(pred)

print(f"Test MAE: {mae:.2f} seconds")
print(f"Test R² : {r2:.4f}")

Test MAE: 106.74 seconds
Test R² : 1.0000


In [190]:
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, lead, when
from pyspark.ml.feature import StringIndexer, RFormula
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# 1. Spark session
team = 16
warehouse = "project/hive/warehouse"

spark = SparkSession.builder \
    .appName(f"{team} - spark ML") \
    .master("yarn") \
    .config("hive.metastore.uris", "thrift://hadoop-02.uni.innopolis.ru:9883") \
    .config("spark.sql.warehouse.dir", warehouse) \
    .config("spark.sql.avro.compression.codec", "snappy") \
    .enableHiveSupport() \
    .getOrCreate()

# 2. Load the table
df = spark.table("team16_projectdb.crimes")

# 3. Drop useless columns
df = df.drop("id", "block", "location_description", "x_coordinate", "y_coordinate")

# 4. Convert 'date' → seconds timestamp
df = df.withColumn("ts", (col("date") / 1000).cast("double"))

# 5. Create target: time until next crime in the same district
w = Window.partitionBy("district").orderBy("ts")
df = df.withColumn("ts_next", lead("ts", 1).over(w))
df = df.na.drop(subset=["ts_next"])
df = df.withColumn("label", col("ts_next") - col("ts"))

# 6. Ensure no nulls in predictors
non_null_cols = ["ts", "primary_type", "fbi_code", "arrest", "domestic",
                 "beat", "district", "ward", "community_area", "label"]
df = df.na.drop(subset=non_null_cols)

# 7. Cast booleans to numeric
df = df.withColumn("arrest", when(col("arrest") == True, 1.0).otherwise(0.0))
df = df.withColumn("domestic", when(col("domestic") == True, 1.0).otherwise(0.0))

# 8. Index categoricals
for inp in ("primary_type", "fbi_code"):
    idx = StringIndexer(inputCol=inp, outputCol=inp + "_idx", handleInvalid="keep")
    df = idx.fit(df).transform(df)

# 9. Use RFormula to build features & label
formula = RFormula(
    formula="label ~ ts + primary_type_idx + fbi_code_idx + arrest + domestic + beat + district + ward + community_area",
    featuresCol="features",
    labelCol="label",
    handleInvalid="skip"
)


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/lib/python3.6/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib64/python3.6/socket.py", line 586, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [170]:
df2 = formula.fit(df).transform(df)

# 10. Split train/test
train_df, test_df = df2.randomSplit([0.8, 0.2], seed=42)




In [171]:
train_df.head(1)

[Row(date=978296400000, primary_type='CRIM SEXUAL ASSAULT', arrest=0.0, domestic=0.0, beat=1622, district=16, ward=45.0, community_area=10.0, fbi_code='02', ts=978296400.0, ts_next=978296400.0, label=0.0, primary_type_idx=15.0, fbi_code_idx=16.0, features=DenseVector([978296400.0, 15.0, 16.0, 0.0, 0.0, 1622.0, 16.0, 45.0, 10.0]))]

In [172]:
# 11. Train linear regression
lr = LinearRegression(featuresCol="features", labelCol="label")
model = lr.fit(train_df)

# 12. Evaluate
predictions = model.transform(test_df)
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)

print(f"Root Mean Squared Error (RMSE): {rmse}")

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/lib/python3.6/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib64/python3.6/socket.py", line 586, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [192]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, min as spark_min, monotonically_increasing_id
from pyspark.ml.feature import StringIndexer, RFormula
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# 1. Spark session
team = 16
warehouse = "project/hive/warehouse"

spark = SparkSession.builder \
    .appName(f"{team} - spark ML") \
    .master("yarn") \
    .config("hive.metastore.uris", "thrift://hadoop-02.uni.innopolis.ru:9883") \
    .config("spark.sql.warehouse.dir", warehouse) \
    .config("spark.sql.avro.compression.codec", "snappy") \
    .enableHiveSupport() \
    .getOrCreate()

# 2. Load table
df = spark.table("team16_projectdb.crimes")

# 3. Drop useless cols
df = df.drop("block", "location_description", "x_coordinate", "y_coordinate")

# 4. Convert 'date' → seconds
df = df.withColumn("ts", (col("date") / 1000).cast("double"))

# 5. Add a stable row_id so we can re-join after aggregation
df = df.withColumn("row_id", monotonically_increasing_id())

# 6. Self-join to find the next ts in the same district
#    for each row d1, only consider d2 with same district and d2.ts > d1.ts
joined = df.alias("d1").join(
    df.alias("d2"),
    (col("d1.district") == col("d2.district")) & (col("d2.ts") > col("d1.ts")),
    how="left"
)

# 7. For each d1.row_id, pick the minimum d2.ts (the very next timestamp)
next_ts = joined.groupBy(col("d1.row_id").alias("row_id")) \
                .agg(spark_min(col("d2.ts")).alias("ts_next"))


In [193]:
# 8. Attach ts_next back onto the original df
df = df.join(next_ts, on="row_id", how="left")

# 9. Drop rows without any next crime in that district
df = df.na.drop(subset=["ts_next"])

# 10. Compute the actual time gap label
df = df.withColumn("label", col("ts_next") - col("ts"))

# 11. Clean up and drop helper columns
df = df.drop("row_id", "date", "ts_next")

# 12. Drop any rows with nulls in our predictors or label
required = ["ts", "primary_type", "fbi_code", "arrest", "domestic",
            "beat", "district", "ward", "community_area", "label"]
df = df.na.drop(subset=required)



In [194]:
# 13. Cast booleans to numeric
df = df.withColumn("arrest",  when(col("arrest")  == True, 1.0).otherwise(0.0))
df = df.withColumn("domestic",when(col("domestic")== True, 1.0).otherwise(0.0))

# 14. Index categorical columns
for inp in ("primary_type", "fbi_code"):
    idx = StringIndexer(inputCol=inp, outputCol=inp + "_idx", handleInvalid="keep")
    df = idx.fit(df).transform(df)

# 15. Build a feature vector & label with RFormula
formula = RFormula(
    formula="label ~ ts + primary_type_idx + fbi_code_idx + arrest + domestic + "
            "beat + district + ward + community_area",
    featuresCol="features",
    labelCol="label",
    handleInvalid="skip"
)
df2 = formula.fit(df).transform(df)

# 16. Split into train/test
train_df, test_df = df2.randomSplit([0.8, 0.2], seed=42)

Py4JJavaError: An error occurred while calling o4006.fit.
: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:

org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.lang.reflect.Constructor.newInstance(Constructor.java:423)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
py4j.ClientServerConnection.run(ClientServerConnection.java:106)
java.lang.Thread.run(Thread.java:750)

The currently active SparkContext was created at:

(No active SparkContext.)
         
	at org.apache.spark.SparkContext.assertNotStopped(SparkContext.scala:118)
	at org.apache.spark.SparkContext.defaultParallelism(SparkContext.scala:2520)
	at org.apache.spark.SparkContext.defaultMinPartitions(SparkContext.scala:2529)
	at org.apache.spark.sql.hive.HadoopTableReader.<init>(TableReader.scala:84)
	at org.apache.spark.sql.hive.execution.HiveTableScanExec.hadoopReader$lzycompute(HiveTableScanExec.scala:110)
	at org.apache.spark.sql.hive.execution.HiveTableScanExec.hadoopReader(HiveTableScanExec.scala:105)
	at org.apache.spark.sql.hive.execution.HiveTableScanExec.$anonfun$doExecute$1(HiveTableScanExec.scala:207)
	at org.apache.spark.util.Utils$.withDummyCallSite(Utils.scala:2587)
	at org.apache.spark.sql.hive.execution.HiveTableScanExec.doExecute(HiveTableScanExec.scala:207)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
	at org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:526)
	at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:454)
	at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:453)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:497)
	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:50)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:750)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD$lzycompute(ShuffleExchangeExec.scala:135)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD(ShuffleExchangeExec.scala:135)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture$lzycompute(ShuffleExchangeExec.scala:140)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture(ShuffleExchangeExec.scala:139)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.$anonfun$submitShuffleJob$1(ShuffleExchangeExec.scala:68)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.submitShuffleJob(ShuffleExchangeExec.scala:68)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.submitShuffleJob$(ShuffleExchangeExec.scala:67)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.submitShuffleJob(ShuffleExchangeExec.scala:115)
	at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture$lzycompute(QueryStageExec.scala:170)
	at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture(QueryStageExec.scala:170)
	at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.doMaterialize(QueryStageExec.scala:172)
	at org.apache.spark.sql.execution.adaptive.QueryStageExec.materialize(QueryStageExec.scala:82)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5(AdaptiveSparkPlanExec.scala:258)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5$adapted(AdaptiveSparkPlanExec.scala:256)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:256)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:228)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:370)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:343)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3715)
	at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2971)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3706)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
	at org.apache.spark.sql.Dataset.collect(Dataset.scala:2971)
	at org.apache.spark.ml.feature.StringIndexer.countByValue(StringIndexer.scala:204)
	at org.apache.spark.ml.feature.StringIndexer.sortByFreq(StringIndexer.scala:212)
	at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:242)
	at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:145)
	at sun.reflect.GeneratedMethodAccessor208.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)


In [None]:
# 17. Train your Linear Regression
lr    = LinearRegression(featuresCol="features", labelCol="label")
model = lr.fit(train_df)

# 18. Evaluate on test set
predictions = model.transform(test_df)
evaluator = RegressionEvaluator(labelCol="label",
                                predictionCol="prediction",
                                metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"Test RMSE: {rmse:.2f} seconds")

In [196]:
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, lead, when
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# ── 1. Spark session ────────────────────────────────────────────────────────
team = 16
warehouse = "project/hive/warehouse"
spark = SparkSession.builder\
    .appName(f"{team} - spark ML")\
    .master("yarn")\
    .config("hive.metastore.uris", "thrift://hadoop-02.uni.innopolis.ru:9883")\
    .config("spark.sql.warehouse.dir", warehouse)\
    .config("spark.sql.avro.compression.codec", "snappy")\
    .enableHiveSupport()\
    .getOrCreate()

# ── 2. Load & prep ──────────────────────────────────────────────────────────
df = spark.table("team16_projectdb.crimes")\
    .drop("block", "location_description", "x_coordinate", "y_coordinate")\
    .withColumn("ts", (col("date")/1000).cast("double"))

# compute next‐in‐district & label
w = Window.partitionBy("district").orderBy("ts")
df = df.withColumn("ts_next", lead("ts",1).over(w))\
       .na.drop(subset=["ts_next"])\
       .withColumn("label", col("ts_next") - col("ts"))\
       .drop("date","ts_next")\
       .na.drop(subset=["ts","primary_type","fbi_code","arrest",
                         "domestic","beat","district","ward",
                         "community_area","label"])

# cast booleans
df = df.withColumn("arrest",  when(col("arrest")  == True, 1.0).otherwise(0.0))\
       .withColumn("domestic",when(col("domestic")== True, 1.0).otherwise(0.0))

# cache before the expensive split/index
df.cache()

# ── 3. Split early ──────────────────────────────────────────────────────────
train_df, test_df = df.randomSplit([0.8,0.2], seed=42)
train_df.cache()
test_df.cache()

# ── 4. Fit indexers on train only ──────────────────────────────────────────
indexers = []
for col_in in ("primary_type","fbi_code"):
    idx = StringIndexer(inputCol=col_in, outputCol=col_in+"_idx",
                        handleInvalid="keep").fit(train_df)
    train_df = idx.transform(train_df)
    test_df  = idx.transform(test_df)

# ── 5. Assemble features ───────────────────────────────────────────────────
assembler = VectorAssembler(
    inputCols=[
        "ts",
        "primary_type_idx",
        "fbi_code_idx",
        "arrest",
        "domestic",
        "beat",
        "district",
        "ward",
        "community_area"
    ],
    outputCol="features",
    handleInvalid="skip"    # skip any rows with still‐nulls
)
train_df = assembler.transform(train_df).select("features","label")
test_df  = assembler.transform(test_df).select("features","label")



Py4JJavaError: An error occurred while calling o4176.fit.
: java.lang.NullPointerException
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.needToCopyObjectsBeforeShuffle(ShuffleExchangeExec.scala:222)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.prepareShuffleDependency(ShuffleExchangeExec.scala:384)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency$lzycompute(ShuffleExchangeExec.scala:173)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency(ShuffleExchangeExec.scala:167)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture$lzycompute(ShuffleExchangeExec.scala:143)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture(ShuffleExchangeExec.scala:139)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.$anonfun$submitShuffleJob$1(ShuffleExchangeExec.scala:68)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.submitShuffleJob(ShuffleExchangeExec.scala:68)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.submitShuffleJob$(ShuffleExchangeExec.scala:67)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.submitShuffleJob(ShuffleExchangeExec.scala:115)
	at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture$lzycompute(QueryStageExec.scala:170)
	at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture(QueryStageExec.scala:170)
	at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.doMaterialize(QueryStageExec.scala:172)
	at org.apache.spark.sql.execution.adaptive.QueryStageExec.materialize(QueryStageExec.scala:82)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5(AdaptiveSparkPlanExec.scala:258)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5$adapted(AdaptiveSparkPlanExec.scala:256)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:256)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:228)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:370)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:343)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3715)
	at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2971)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3706)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
	at org.apache.spark.sql.Dataset.collect(Dataset.scala:2971)
	at org.apache.spark.ml.feature.StringIndexer.countByValue(StringIndexer.scala:204)
	at org.apache.spark.ml.feature.StringIndexer.sortByFreq(StringIndexer.scala:212)
	at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:242)
	at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:145)
	at sun.reflect.GeneratedMethodAccessor208.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)


In [None]:

# ── 6. Train Linear Regression ─────────────────────────────────────────────
lr_model = LinearRegression(featuresCol="features", labelCol="label")\
           .fit(train_df)

# ── 7. Evaluate ─────────────────────────────────────────────────────────────


In [198]:
train_df.count()

Py4JJavaError: An error occurred while calling o4172.count.
: java.lang.NullPointerException
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.needToCopyObjectsBeforeShuffle(ShuffleExchangeExec.scala:222)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.prepareShuffleDependency(ShuffleExchangeExec.scala:384)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency$lzycompute(ShuffleExchangeExec.scala:173)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency(ShuffleExchangeExec.scala:167)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture$lzycompute(ShuffleExchangeExec.scala:143)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture(ShuffleExchangeExec.scala:139)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.$anonfun$submitShuffleJob$1(ShuffleExchangeExec.scala:68)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.submitShuffleJob(ShuffleExchangeExec.scala:68)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.submitShuffleJob$(ShuffleExchangeExec.scala:67)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.submitShuffleJob(ShuffleExchangeExec.scala:115)
	at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture$lzycompute(QueryStageExec.scala:170)
	at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture(QueryStageExec.scala:170)
	at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.doMaterialize(QueryStageExec.scala:172)
	at org.apache.spark.sql.execution.adaptive.QueryStageExec.materialize(QueryStageExec.scala:82)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5(AdaptiveSparkPlanExec.scala:258)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5$adapted(AdaptiveSparkPlanExec.scala:256)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:256)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:228)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:370)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:343)
	at org.apache.spark.sql.Dataset.$anonfun$count$1(Dataset.scala:3012)
	at org.apache.spark.sql.Dataset.$anonfun$count$1$adapted(Dataset.scala:3011)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3706)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
	at org.apache.spark.sql.Dataset.count(Dataset.scala:3011)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
