In [1]:
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session used by this notebook.
spark = (
    SparkSession.builder
    .appName("UsedCarAnalysis")
    .getOrCreate()
)

# Load the CSV into a DataFrame: the first line is treated as the header
# and column types are inferred from the data.
df = spark.read.options(header=True, inferSchema=True).csv("used_car_dataset.csv")

# Expose the same data as an RDD of Row objects for the RDD examples.
rdd = df.rdd


AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/content/used_car_dataset.csv.

In [None]:
# Peek at the first five rows of the RDD.
rdd.take(num=5)


In [None]:
# Project every row to a (name, price) tuple and preview the first five pairs.
name_price_pairs = rdd.map(lambda record: (record["name"], record["price"]))
name_price_pairs.take(5)


In [None]:
# Keep only the cars priced below 500,000 and preview five of them.
# Rows with a missing price are skipped explicitly: in Python 3 comparing
# None with an int raises TypeError inside the worker and fails the job.
# Skipping them also mirrors the DataFrame filter on the same condition,
# where a null price never satisfies the comparison.
rdd.filter(
    lambda row: row['price'] is not None and row['price'] < 500000
).take(5)


In [None]:
# Count the number of rows in the RDD.
rdd.count()


In [None]:
# Extract the car names, drop repeated values and preview five of them.
car_names = rdd.map(lambda record: record["name"])
car_names.distinct().take(5)


In [3]:
# Display the first five rows of the DataFrame as a text table.
df.show(n=5)

NameError: name 'df' is not defined

In [None]:
# Show only the name and price columns for the first five rows.
name_price_df = df.select("name", "price")
name_price_df.show(n=5)


In [None]:
# Preview five cars priced under 500,000 (where() is an alias of filter()).
cheaper_cars_df = df.where(df["price"] < 500000)
cheaper_cars_df.show(n=5)


In [None]:
# Group the listings by car name once and reuse the grouping for both summaries.
listings_by_name = df.groupBy("name")

# Number of listings per name.
listings_by_name.count().show(n=5)

# Mean price per name.
listings_by_name.avg("price").show(n=5)


In [None]:
# Compute summary statistics for the DataFrame and print them.
summary_stats_df = df.describe()
summary_stats_df.show()


In [None]:
# Return the first row of the RDD.
rdd.first()


In [None]:
# collect() pulls every row of the RDD back to the driver as a Python list.
# Keep the list in a variable and display only a short slice, so the cell
# output is not flooded with the entire dataset.
collected_rows = rdd.collect()
print("rows collected:", len(collected_rows))
collected_rows[:5]


In [None]:
from operator import add

# Sum all listed prices.
# - Missing prices are filtered out first, because None + int raises TypeError.
# - fold(0, add) is used instead of reduce(add) so an RDD with no usable
#   prices yields 0 rather than raising ValueError on an empty reduce.
(
    rdd.map(lambda row: row['price'])
    .filter(lambda price: price is not None)
    .fold(0, add)
)


In [None]:
# Tally how many rows carry each fuel value; the result is returned to the driver.
fuel_values = rdd.map(lambda record: record["fuel"])
fuel_values.countByValue()


In [None]:
# Split every car name on whitespace and flatten the pieces into a single
# RDD of words, then preview the first five words.
name_words = rdd.flatMap(lambda record: record["name"].split())
name_words.take(5)


In [None]:
# Draw roughly 10% of the rows without replacement and preview five of them.
# A fixed seed makes the sample reproducible across kernel restarts
# (42 matches the seed used for the train/test split in the modelling cell).
rdd.sample(False, 0.1, seed=42).take(5)


In [None]:
# Highest price found in the RDD.
price_values = rdd.map(lambda record: record["price"])
price_values.max()


In [None]:
# Lowest price found in the RDD.
price_values = rdd.map(lambda record: record["price"])
price_values.min()


In [None]:
# Check whether the RDD contains no rows at all.
rdd.isEmpty()


In [None]:
# Pair every row with its position in the RDD and preview the first five pairs.
indexed_rows = rdd.zipWithIndex()
indexed_rows.take(5)


In [None]:
# List the DataFrame's column names.
df.columns


In [None]:
# Print the DataFrame schema (column names and their types).
df.printSchema()


In [None]:
from pyspark.sql.functions import col

# Add a derived column holding the price divided by 1000 and preview five rows.
# withColumn returns a new DataFrame; `df` itself is left unchanged.
price_in_thousands = col("price") / 1000
df.withColumn("price_in_k", price_in_thousands).show(n=5)


In [None]:
# Preview the data without the seller_type column (`df` itself is not modified).
without_seller_type_df = df.drop("seller_type")
without_seller_type_df.show(n=5)


In [None]:
# Show the five most expensive cars: sort by price, highest first
# (sort() and orderBy() are the same operation).
priciest_first_df = df.sort("price", ascending=False)
priciest_first_df.show(n=5)


In [None]:
# Remove fully duplicated rows and preview five of the remaining ones
# (drop_duplicates() is the snake_case alias of dropDuplicates()).
unique_rows_df = df.drop_duplicates()
unique_rows_df.show(n=5)


In [None]:
# Imported here so this cell does not depend on an earlier cell having
# brought `col` into the kernel namespace.
from pyspark.sql.functions import col

# Select the price column under the name car_price and preview five rows.
df.select(col("price").alias("car_price")).show(5)


In [None]:
# Replace missing values with 0 and preview five rows
# (df.na.fill is the same operation as df.fillna).
filled_df = df.na.fill(0)
filled_df.show(n=5)


In [None]:
# Import the functions module under an alias rather than
# `from pyspark.sql.functions import max`, which would replace Python's
# built-in max() for every cell executed afterwards in this kernel.
from pyspark.sql import functions as F

# Overall average price and the largest km_driven value, in one aggregation.
df.agg(F.avg("price"), F.max("km_driven")).show()


In [None]:
from pyspark.sql.functions import avg, col

# Overall average price across all rows.
average_price_df = df.select(avg("price"))
average_price_df.show()

# Keep the cars priced at 510000 or more, most expensive first.
# where()/sort() are aliases of filter()/orderBy().
price_column = col("price")
filtered_df = df.where(price_column >= 510000)
sorted_df = filtered_df.sort(price_column.desc())

sorted_df.show()


In [None]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# 1. Set up Spark (getOrCreate reuses the session if one is already running).
spark = SparkSession.builder.appName("UsedCarML").getOrCreate()

# 2. Read the data. `df` stays the raw frame; derived frames get their own
#    names so re-running this cell or earlier cells gives the same results.
df = spark.read.csv("used_car_dataset.csv", header=True, inferSchema=True)

# 3. Clean: drop rows that have a null in any column the model uses.
model_input_df = df.na.drop(subset=["price", "year", "km_driven", "fuel", "transmission"])

# 4. Split BEFORE fitting any transformer, so nothing learned from the test
#    rows (including the category-to-index mapping) leaks into training.
train_data, test_data = model_input_df.randomSplit([0.8, 0.2], seed=42)

# 5. Convert the categorical columns to numeric indices.
#    handleInvalid="keep" assigns an extra index to categories that appear
#    only in the test split instead of failing at transform time.
# NOTE(review): the indices are fed to the regression as plain numbers;
# consider one-hot encoding them if the category order is not meaningful.
indexers = [
    StringIndexer(inputCol="fuel", outputCol="fuel_index", handleInvalid="keep"),
    StringIndexer(inputCol="transmission", outputCol="trans_index", handleInvalid="keep"),
]

# 6. Assemble the feature vector.
feature_cols = ["year", "km_driven", "fuel_index", "trans_index"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# 7. Linear regression model, chained after the preprocessing stages.
lr = LinearRegression(featuresCol="features", labelCol="price")
pipeline = Pipeline(stages=[*indexers, assembler, lr])

# Fitting the pipeline fits the indexers and the regression on the training
# split only. The fitted regression itself is available as model.stages[-1].
model = pipeline.fit(train_data)

# 8. Evaluate on the held-out split.
predictions = model.transform(test_data)
evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)

print("Root Mean Squared Error (RMSE):", rmse)
