In [1]:
# 1. تثبيت PySpark
!pip install pyspark

# 2. الاستيراد والتهيئة
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import NumericType, StringType
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

spark = SparkSession.builder.appName("Flight_Delay_Fixed").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

print("---  بدأ تشغيل PySpark ---")

# 3. تحميل البيانات
try:
    df = spark.read.csv("flightsData.csv", header=True, inferSchema=True)
    print(f" تم تحميل البيانات: {df.count()} صف")
except:
    print(" خطأ: تأكد من رفع ملف flightsData.csv!")

# 4. تنظيف البيانات
print("---  تنظيف البيانات ---")

# حذف الأعمدة غير الضرورية
cols_to_drop = ['tailnum', 'id', 'year', 'time_hour', 'flight', 'name']
for c in cols_to_drop:
    if c in df.columns:
        df = df.drop(c)

# تعويض القيم المفقودة (Nulls)
num_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, NumericType)]
cat_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, StringType)]

# تعويض الأرقام بـ 0 والنصوص بـ Unknown
df = df.na.fill(0, subset=num_cols)
df = df.na.fill("Unknown", subset=cat_cols)

# 5. هندسة الميزات
print("---  هندسة الميزات ---")

# أ) الهدف
df = df.withColumn("label", F.when(F.col("arr_delay") > 0, 1.0).otherwise(0.0))

# ب) اللوغاريتم (مع حماية من القيم السالبة والصفر)
# نستخدم when للتأكد أننا لا نأخذ لوغاريتم لرقم <= 0
df = df.withColumn("dep_delay_log",
                   F.when(F.col("dep_delay") > 0, F.log1p(F.col("dep_delay")))
                   .otherwise(0.0))

# ج) مراحل Pipeline
stages = []

# تحويل النصوص
for c in cat_cols:
    indexer = StringIndexer(inputCol=c, outputCol=f"{c}_idx", handleInvalid="keep") # keep مهم جداً
    encoder = OneHotEncoder(inputCol=f"{c}_idx", outputCol=f"{c}_vec")
    stages += [indexer, encoder]

# تجميع الميزات
exclude = ['label', 'arr_delay'] + cat_cols
input_cols = [c for c in df.columns if c not in exclude]
final_cols = input_cols + [f"{c}_vec" for c in cat_cols]

# التعديل هنا: handleInvalid="skip"
# هذا يمنع الخطأ ويخبر Spark بتجاهل أي صف فيه مشكلة بدلاً من إيقاف الكود
assembler = VectorAssembler(inputCols=final_cols, outputCol="features_raw", handleInvalid="skip")

scaler = StandardScaler(inputCol="features_raw", outputCol="features_scaled")
stages += [assembler, scaler]

# 6. التدريب
print("---  التدريب (Logistic Regression) ---")

lr = LogisticRegression(featuresCol="features_scaled", labelCol="label", maxIter=20)
stages.append(lr)

pipeline = Pipeline(stages=stages)

# تقسيم البيانات
train, test = df.randomSplit([0.8, 0.2], seed=42)

# Fit
model = pipeline.fit(train)

# 7. النتائج
print("---  النتائج ---")
predictions = model.transform(test)

acc = MulticlassClassificationEvaluator(metricName="accuracy").evaluate(predictions)
auc = BinaryClassificationEvaluator(metricName="areaUnderROC").evaluate(predictions)

print("\n" + "="*40)
print(f" Accuracy (الدقة): {acc:.2%}")
print(f" ROC-AUC (الجودة): {auc:.4f}")
print("="*40)

---  بدأ تشغيل PySpark ---
 تم تحميل البيانات: 336776 صف
---  تنظيف البيانات ---
---  هندسة الميزات ---
---  التدريب (Logistic Regression) ---
---  النتائج ---

 Accuracy (الدقة): 84.18%
 ROC-AUC (الجودة): 0.9029
