In [2]:
!pip install pyspark
!pip install seaborn matplotlib


Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m3.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=95f2bfb7dece468ff6dfd6781367daec2f7ace6d64dbf42f0f79244329150796
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


In [3]:
# Step 1: Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline

# Step 2: Create a SparkSession
spark = SparkSession.builder.appName("LogisticRegressionFBLive").getOrCreate()

# Step 3: Load data into DataFrame (fb_live_thailand.csv)
data = spark.read.csv("fb_live_thailand.csv", header=True, inferSchema=True)

# Step 4: Use StringIndexer for 'status_type' and 'status_published' to create indexes
indexer_status_type = StringIndexer(inputCol="status_type", outputCol="status_type_ind")
indexer_status_published = StringIndexer(inputCol="status_published", outputCol="status_published_ind")

# Fit and transform the data with the indexers
indexed_data = indexer_status_type.fit(data).transform(data)
indexed_data = indexer_status_published.fit(indexed_data).transform(indexed_data)

# Step 5: Use VectorAssembler to create a feature vector of 'status_type_ind' and 'status_published_ind'
assembler = VectorAssembler(inputCols=["status_type_ind", "status_published_ind"], outputCol="features")

# Step 6: Create Logistic Regression where 'status_type_ind' is the label
lr = LogisticRegression(featuresCol="features", labelCol="status_type_ind", maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Step 7: Create a pipeline with VectorAssembler and LogisticRegression
pipeline = Pipeline(stages=[assembler, lr])

# Step 8: Split the data into train and test datasets
train_data, test_data = indexed_data.randomSplit([0.8, 0.2], seed=1234)

# Step 9: Fit the train data into the pipeline to create the model
lr_model = pipeline.fit(train_data)

# Step 10: Use the model to transform the test data to get predictions
predictions = lr_model.transform(test_data)

# Step 11: Show 5 rows of the predictions DataFrame
predictions.select("status_type_ind", "prediction").show(5)

# Step 12: Use MulticlassClassificationEvaluator to evaluate the model
evaluator = MulticlassClassificationEvaluator(labelCol="status_type_ind", predictionCol="prediction")

# Evaluate Accuracy
accuracy = evaluator.setMetricName("accuracy").evaluate(predictions)
print(f"Accuracy: {accuracy}")

# Evaluate Precision
precision = evaluator.setMetricName("weightedPrecision").evaluate(predictions)
print(f"Weighted Precision: {precision}")

# Evaluate Recall
recall = evaluator.setMetricName("weightedRecall").evaluate(predictions)
print(f"Weighted Recall: {recall}")

# Evaluate F1 Score
f1 = evaluator.setMetricName("f1").evaluate(predictions)
print(f"F1 Score: {f1}")


+---------------+----------+
|status_type_ind|prediction|
+---------------+----------+
|            0.0|       0.0|
|            0.0|       0.0|
|            0.0|       0.0|
|            0.0|       0.0|
|            0.0|       0.0|
+---------------+----------+
only showing top 5 rows

Accuracy: 0.5778748180494906
Weighted Precision: 0.35540867972989576
Weighted Recall: 0.5778748180494906
F1 Score: 0.44012719955040336


#Decision tree classification

In [10]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline

# Create a SparkSession
spark = SparkSession.builder.appName("DecisionTreeClassificationExample").getOrCreate()

# Load data file into DataFrame (assuming the file is named 'fb_live_thailand.csv' and located in the current directory)
df = spark.read.csv("fb_live_thailand.csv", header=True, inferSchema=True)

# Check the initial DataFrame schema
df.printSchema()

# Use StringIndexer to index categorical columns
status_type_indexer = StringIndexer(inputCol="status_type", outputCol="status_type_ind")
status_published_indexer = StringIndexer(inputCol="status_published", outputCol="status_published_ind")

# Transform the data to add indexed columns
df_indexed = status_type_indexer.fit(df).transform(df)
df_indexed = status_published_indexer.fit(df_indexed).transform(df_indexed)

# Use OneHotEncoder to encode the indexed columns
encoder = OneHotEncoder(inputCols=["status_type_ind", "status_published_ind"], outputCols=["status_type_vec", "status_published_vec"])

# Use VectorAssembler to create a features vector
assembler = VectorAssembler(inputCols=["status_type_vec", "status_published_vec"], outputCol="features")

# Create a Decision Tree Classifier
dt = DecisionTreeClassifier(labelCol="status_type_ind", featuresCol="features")

# Create a pipeline
pipeline = Pipeline(stages=[encoder, assembler, dt])

# Create train and test datasets
train_data, test_data = df_indexed.randomSplit([0.7, 0.3], seed=42)

# Fit the pipeline model on the training data
pipeline_model = pipeline.fit(train_data)

# Use the pipeline model to make predictions
predictions = pipeline_model.transform(test_data)

# Show 5 rows of the predictions DataFrame
predictions.select("status_type_ind", "prediction").show(5)

# Create a MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol="status_type_ind", predictionCol="prediction")

# Evaluate metrics
accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
precision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})
recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})
f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})

# Show metrics
print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1 Score: {f1}")

# Calculate and show Test Error
test_error = 1.0 - accuracy
print(f"Test Error: {test_error}")

# Stop the SparkSession
spark.stop()

root
 |-- status_id: string (nullable = true)
 |-- status_type: string (nullable = true)
 |-- status_published: string (nullable = true)
 |-- num_reactions: integer (nullable = true)
 |-- num_comments: integer (nullable = true)
 |-- num_shares: integer (nullable = true)
 |-- num_likes: integer (nullable = true)
 |-- num_loves: integer (nullable = true)
 |-- num_wows: integer (nullable = true)
 |-- num_hahas: integer (nullable = true)
 |-- num_sads: integer (nullable = true)
 |-- num_angrys: integer (nullable = true)

+---------------+----------+
|status_type_ind|prediction|
+---------------+----------+
|            0.0|       0.0|
|            0.0|       0.0|
|            0.0|       0.0|
|            0.0|       0.0|
|            0.0|       0.0|
+---------------+----------+
only showing top 5 rows

Accuracy: 1.0
Precision: 1.0
Recall: 1.0
F1 Score: 1.0
Test Error: 0.0
