In [2]:
'''
1️⃣ Load data (Parquet/CSV).
2️⃣ Feature Engineering (convert media_type to numerical, extract day/hour from timestamp).
3️⃣ Assemble features using VectorAssembler.
4️⃣ Train a Random Forest model (regression or classification).
5️⃣ Evaluate model performance (RMSE for regression, Accuracy/AUC for classification).
'''

'\n1️⃣ Load data (Parquet/CSV).\n2️⃣ Feature Engineering (convert media_type to numerical, extract day/hour from timestamp).\n3️⃣ Assemble features using VectorAssembler.\n4️⃣ Train a Random Forest model (regression or classification).\n5️⃣ Evaluate model performance (RMSE for regression, Accuracy/AUC for classification).\n'

In [3]:
# from pyspark.sql import  SparkSession 

# spark = SparkSession.builder.appName('pipeline').getOrCreate() 

# df_csv = spark.read.csv('/workspaces/machine_learning-projects-/pending_work/pyspark task/small_media_data.csv',
#                         header = True, inferSchema = True)

# num_partitions = df_csv.rdd.getNumPartitions()  
# print(num_partitions) 

In [None]:
df_csv = df_csv.repartition(num_partitions + 10)
df_csv.write.option('compression','snappy').mode('overwrite').parquet('/workspaces/machine_learning-projects-/pending_work/pyspark task/small_media_data.parquet') 

                                                                                

In [100]:
df_parquet = spark.read.parquet('/workspaces/machine_learning-projects-/pending_work/pyspark task/small_media_data.parquet')


'''
# if you do that in sql 

df_parquet.createOrReplaceTempView('media_table')
df_parquet = spark.sql(
    """
        with temp_temp as (
            select 
                post_id, 
                user_id, 
                cast(timestamp as date) as dates, 
                cast(date_format(timestamp,"M") as int)  as months , 
                likes, 
                case when likes > 500 then 1 else 0 end as engagement_score, 
                case when media_type = 'image' then 1 
                    when media_type = 'text' then 2 
                    else 3 end as media_type_encoded

            from media_table 
        )
        select * from temp_temp;
    """
)
'''

from pyspark.sql.functions import when,cast , col , date_format,cast 
from pyspark.sql.types  import DateType  

df_parquet = df_parquet.withColumn('timestamp' , col('timestamp').cast(DateType())).withColumnRenamed('timestamp','date')
df_parquet = df_parquet.withColumn('months' , date_format('date', 'M').cast('int'))
df_parquet= df_parquet.withColumn('engagement_score', when(col('likes') > 500 ,1).otherwise(0))
df_parquet = df_parquet.withColumn('media_type',when(col('media_type') == 'image', 1).when(col('media_type') == 'video',2).otherwise(3)).withColumnRenamed('media_type', 'media_type_encoded')
df_parquet.show()


+-------+-------+----------+-----+------+--------+------------------+------+----------------+
|post_id|user_id|      date|likes|shares|comments|media_type_encoded|months|engagement_score|
+-------+-------+----------+-----+------+--------+------------------+------+----------------+
|    371|    126|2025-03-12|  516|   473|       5|                 1|     3|               1|
|    997|     34|2025-03-05|  259|   266|     208|                 2|     3|               0|
|    823|    105|2025-03-02|  462|    62|     241|                 1|     3|               0|
|    771|    103|2025-03-22|  845|     1|     274|                 3|     3|               1|
|    594|     17|2025-03-23|  212|   156|     182|                 3|     3|               0|
|    133|      8|2025-03-13|  123|   313|     182|                 3|     3|               0|
|    321|     47|2025-03-12|  811|    20|     132|                 2|     3|               1|
|    928|     57|2025-03-04|  771|   277|     250|          

In [None]:

train , test = df_parquet.randomSplit([0.8,0.2], seed= 42)

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import  RandomForestClassifier

train = train.drop('post_id','user_id','date')

column_list = train.columns
column_list.remove('engagement_score')

vector = VectorAssembler(
    inputCols = column_list,
    outputCol = 'vector_list'
)
train_transformed = vector.transform(train)
test_transformed = vector.transform(test)


In [None]:
from pyspark.ml.

In [None]:

model = RandomForestClassifier(
    featuresCol = 'vector_list', 
    labelCol = 'engagement_score'
)
model = model.fit(train_transformed)


In [119]:
from pyspark.ml.evaluation import  BinaryClassificationEvaluator

prediction = model.transform(test_transformed)
prediction.show()
evaluator = BinaryClassificationEvaluator(
    labelCol = 'engagement_score',
    rawPredictionCol = 'probability', 
    metricName = 'areaUnderROC'
)
roc = evaluator.evaluate(prediction)
print(f'roc : {roc}')

+-------+-------+----------+-----+------+--------+------------------+------+----------------+--------------------+--------------------+--------------------+----------+
|post_id|user_id|      date|likes|shares|comments|media_type_encoded|months|engagement_score|         vector_list|       rawPrediction|         probability|prediction|
+-------+-------+----------+-----+------+--------+------------------+------+----------------+--------------------+--------------------+--------------------+----------+
|      3|     29|2025-03-23|  969|   197|     148|                 3|     3|               1|[969.0,197.0,148....|[0.03698201760166...|[0.00184910088008...|       1.0|
|      8|      1|2025-03-14|  537|   358|      45|                 2|     3|               1|[537.0,358.0,45.0...|[0.55787569833490...|[0.02789378491674...|       1.0|
|     12|     90|2025-03-10|  880|   324|     146|                 3|     3|               1|[880.0,324.0,146....|[0.04165491479792...|[0.00208274573989...|    