In [0]:
# Location of the raw Chicago parking-ticket file (CSV-formatted despite the .txt suffix).
file_path = "/Volumes/hamzeh_databricks_workspace/default/hamzeh-volume/ChicagoParkingTickets.txt"

# Treat the first row as the header and let Spark infer each column's type.
df = spark.read.options(header="true", inferSchema="true").csv(file_path)

# Preview the first five rows.
df.show(5)


+-------------------+------------------+------------------+------------------+--------------+-----------------+------------------+-----------------------+--------------------------------+------------+----+------+-----+---------------+----------+-------------------+-------+------------+--------------------+
|        Issued_date|    Community_Name|            Sector|              Side|Hardship_Index|Per_capita_income|Percent_unemployed|Percent_without_diploma|Percent_households_below_poverty|Neighborhood|Ward| Tract|  ZIP|Police_District|Plate_Type|License_Plate_State|Unit_ID|Violation_ID|PaymentIsOutstanding|
+-------------------+------------------+------------------+------------------+--------------+-----------------+------------------+-----------------------+--------------------------------+------------+----+------+-----+---------------+----------+-------------------+-------+------------+--------------------+
|2000-01-29 09:10:00|            Austin|      Other W Side|         West Sid

In [0]:
# Persist the loaded tickets as the Delta table `parking_tickets`;
# "overwrite" replaces whatever the table previously contained.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("parking_tickets")
)


In [0]:
from pyspark.sql.functions import year

# Column names and the types Spark inferred for them.
df.printSchema()

# A bare `df.count()` in the middle of a cell is evaluated (a full scan) and then
# thrown away, because only a cell's final expression is echoed. Print it so the
# row count actually appears in the output.
print("Row count:", df.count())

# Ten communities with the most tickets, busiest first.
(
    df.groupBy("Community_Name")
    .count()
    .orderBy("count", ascending=False)
    .show(10)
)

# Number of tickets issued per calendar year, in chronological order.
(
    df.groupBy(year("Issued_date").alias("year"))
    .count()
    .orderBy("year")
    .show()
)


root
 |-- Issued_date: timestamp (nullable = true)
 |-- Community_Name: string (nullable = true)
 |-- Sector: string (nullable = true)
 |-- Side: string (nullable = true)
 |-- Hardship_Index: double (nullable = true)
 |-- Per_capita_income: double (nullable = true)
 |-- Percent_unemployed: double (nullable = true)
 |-- Percent_without_diploma: double (nullable = true)
 |-- Percent_households_below_poverty: double (nullable = true)
 |-- Neighborhood: string (nullable = true)
 |-- Ward: integer (nullable = true)
 |-- Tract: integer (nullable = true)
 |-- ZIP: integer (nullable = true)
 |-- Police_District: integer (nullable = true)
 |-- Plate_Type: string (nullable = true)
 |-- License_Plate_State: string (nullable = true)
 |-- Unit_ID: integer (nullable = true)
 |-- Violation_ID: integer (nullable = true)
 |-- PaymentIsOutstanding: integer (nullable = true)

+---------------+-----+
| Community_Name|count|
+---------------+-----+
|Near North Side|84386|
|           Loop|68700|
|      Lak

In [0]:
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col

# Single seed shared by the train/test split and by the random forest, so both
# reported AUC values are reproducible from a fresh session.
SEED = 42

file_path = "/Volumes/hamzeh_databricks_workspace/default/hamzeh-volume/ChicagoParkingTickets.txt"

# Re-read the raw file so this cell does not depend on `df` from an earlier cell.
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(file_path)

# Socio-economic predictors and the binary target. The feature list is defined
# once and reused for both the column selection and the VectorAssembler so the
# two can never drift apart.
feature_cols = [
    "Per_capita_income",
    "Percent_unemployed",
    "Percent_households_below_poverty",
    "Hardship_Index"
]
label_col = "PaymentIsOutstanding"
selected_cols = feature_cols + [label_col]

# Keep only the modelling columns and drop rows with a null in any of them.
df_ml = df.select(*selected_cols).dropna()
df_ml.show(5)

# Expose the target as a double-typed `label` column for the classifiers below.
df_ml = df_ml.withColumn(
    "label",
    col(label_col).cast("double")
)

# Pack the predictor columns into the single vector column the estimators read.
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features"
)

df_final = assembler.transform(df_ml).select("features", "label")
df_final.show(5)

# 80/20 train/test split, seeded for repeatability.
train_df, test_df = df_final.randomSplit([0.8, 0.2], seed=SEED)

# --- Logistic regression baseline ---
lr = LogisticRegression(featuresCol="features", labelCol="label")

model = lr.fit(train_df)

predictions = model.transform(test_df)
predictions.select("features", "label", "prediction", "probability").show(5)

# Area under the ROC curve, computed from each model's rawPrediction column;
# the same evaluator is reused for the random forest below.
evaluator = BinaryClassificationEvaluator(
    labelCol="label",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)

auc = evaluator.evaluate(predictions)
print("AUC:", auc)

# --- Random forest ---
# The forest is a randomized estimator; without an explicit seed its AUC is not
# pinned and may differ between sessions even though the split itself is seeded.
rf = RandomForestClassifier(featuresCol="features", labelCol="label", seed=SEED)
rf_model = rf.fit(train_df)

rf_predictions = rf_model.transform(test_df)

auc_rf = evaluator.evaluate(rf_predictions)
print("Random Forest AUC:", auc_rf)





+-----------------+------------------+--------------------------------+--------------+--------------------+
|Per_capita_income|Percent_unemployed|Percent_households_below_poverty|Hardship_Index|PaymentIsOutstanding|
+-----------------+------------------+--------------------------------+--------------+--------------------+
|          15957.0|              22.6|                            28.6|          73.0|                   1|
|          18881.0|              24.0|                            27.8|          60.0|                   0|
|          43198.0|               6.6|                            14.7|          10.0|                   0|
|          15089.0|              13.1|                            20.5|          71.0|                   0|
|          19713.0|              20.8|                            16.9|          48.0|                   0|
+-----------------+------------------+--------------------------------+--------------+--------------------+
only showing top 5 rows
+---