# Logistic Regression Model to Classify Real/Fake News Data

## Notebook Flow

- Load the Kaggle and API data from GCS to Mongo
- Combine them on Mongo
- Load random 100 rows from combined_data to notebook and fit a model
- Because it is random, I set a threshold of 50% for accuracy
- The function runs for 3 times for better score and breaks
- It then loads the test file, randomly picks 50 rows and predicts whether it is real/fake and creates a CSV file
- This predicted file is loaded back to Mongo & GCS

## Imports

In [26]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, regexp_replace, length, monotonically_increasing_id
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import pymongo
import random
import pandas as pd
import os
from datetime import datetime
from google.cloud import storage
from google.cloud import bigquery
import tempfile
from datetime import datetime

## Initializations

In [27]:
# # Initialize Spark Session
from pyspark.sql import SparkSession

# # Initialize Spark Session with optimized memory settings
spark = SparkSession.builder.appName('test').config("spark.driver.memory", "10g").getOrCreate()

25/03/02 21:20:04 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [None]:
# Initialize Spark Session with GCP configurations
def initialize_spark_with_gcp():
    spark = SparkSession.builder \
        .appName('fake_news_classifier') \
        .config("spark.driver.memory", "10g") \
        .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1,com.google.cloud:google-cloud-storage:2.20.1") \
        .getOrCreate()
   
    return spark

# MongoDB connection parameters with GCP connection
MONGO_URI = "mongodb+srv://bayanenisamanvithachowdary:<PASSWORD>@samtesting.rmc9m.mongodb.net/"
DB_NAME = "fake_news_db"
KAGGLE_COLLECTION = "kaggle_data"
API_COLLECTION = "api_data"
COMBINED_COLLECTION = "combined_data"
PREDICTIONS_COLLECTION = "predictions"

# GCP Storage bucket information
GCP_BUCKET_NAME = "testing694"  # Replace with your GCP bucket name
GCS_KAGGLE_FILE = "kaggle_datasets_full.csv"  # Path in your bucket
GCS_API_FILE = "API_datasets_full.csv"  # Path in your bucket
GCS_GNEWS_FILE = "gnews_dataset.csv"  # Path in your bucket

# In your Python code, explicitly specify the path to your service account key
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/Users/samanvitha/Downloads/crafty-academy-452602-i1-b2f4d1be2ef2.json"

## Functions to Execute the Flow

### Combining Data in MongoDB Atlas

In [29]:
def load_csv_from_gcs_to_mongodb(bucket_name, file_path, collection_name):
    """Load CSV data from Google Cloud Storage into MongoDB"""
    try:
        # Initialize GCS client
        storage_client = storage.Client()
        bucket = storage_client.get_bucket(bucket_name)
        blob = bucket.blob(file_path)
        
        # Download to a temporary file
        with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as temp_file:
            blob.download_to_filename(temp_file.name)
            temp_filepath = temp_file.name
        
        # Read CSV with Spark
        spark = initialize_spark_with_gcp()
        df = spark.read.csv(temp_filepath, header=True, inferSchema=True).limit(1000)
        
        # Convert to pandas for easier MongoDB insertion
        pandas_df = df.toPandas()
        
        # Connect to MongoDB
        client = pymongo.MongoClient(MONGO_URI)
        db = client[DB_NAME]
        collection = db[collection_name]
        
        # Delete existing data
        collection.delete_many({})
        
        # Insert data
        collection.insert_many(pandas_df.to_dict('records'))
        
        # Clean up temp file
        os.unlink(temp_filepath)
        
        print(f"Loaded {len(pandas_df)} records from GCS:{bucket_name}/{file_path} to {collection_name}")
        client.close()
        return True
    
    except Exception as e:
        print(f"Error loading data from GCS to MongoDB: {str(e)}")
        return False

In [30]:
def combine_and_aggregate_mongodb_data():
    """Combine data from both collections and perform aggregation"""
    try:
        # Connect to MongoDB
        client = pymongo.MongoClient(MONGO_URI)
        db = client[DB_NAME]
        
        # Get data from both collections
        kaggle_data = list(db[KAGGLE_COLLECTION].find())
        api_data = list(db[API_COLLECTION].find())
        
        # Ensure we have common fields
        combined_data = []
        
        for item in kaggle_data:
            if 'text' in item and 'label' in item:
                # Basic cleaning
                if isinstance(item['text'], str) and len(item['text']) > 10:
                    combined_data.append({
                        'source': 'kaggle',
                        'text': item['text'],
                        'label': item['label'],
                        'timestamp': datetime.now()
                    })
        
        for item in api_data:
            if 'text' in item and 'label' in item:
                # Basic cleaning
                if isinstance(item['text'], str) and len(item['text']) > 10:
                    combined_data.append({
                        'source': 'api',
                        'text': item['text'],
                        'label': item['label'],
                        'timestamp': datetime.now()
                    })
        
        # Delete existing combined data
        db[COMBINED_COLLECTION].delete_many({})
        
        # Insert combined data
        if combined_data:
            db[COMBINED_COLLECTION].insert_many(combined_data)
            print(f"Combined {len(combined_data)} records in MongoDB")
        
        client.close()
        return True
    
    except Exception as e:
        print(f"Error combining data in MongoDB: {str(e)}")
        return False

### Load Sample from Combined Data into Notebook

In [31]:
def load_random_sample_from_mongodb(collection_name, sample_size=1000):
    """Load a random sample from MongoDB to Spark DataFrame"""
    try:
        # Connect to MongoDB
        client = pymongo.MongoClient(MONGO_URI)
        db = client[DB_NAME]
        collection = db[collection_name]
        
        # Count documents
        total_docs = collection.count_documents({})
        
        if total_docs == 0:
            print(f"No documents found in {collection_name}")
            return None
        
        # Determine sample size
        actual_sample_size = min(sample_size, total_docs)
        
        # Get random sample
        random_sample = list(collection.aggregate([
            {"$match": {"text": {"$exists": True}, "label": {"$exists": True}}},
            {"$sample": {"size": actual_sample_size}}
        ]))
        
        # Convert to pandas DataFrame
        pandas_df = pd.DataFrame(random_sample)
        
        # Keep only necessary columns
        if 'text' in pandas_df.columns and 'label' in pandas_df.columns:
            pandas_df = pandas_df[['text', 'label']]
            
            # Convert to Spark DataFrame
            spark_df = spark.createDataFrame(pandas_df)
            print(f"Loaded {len(pandas_df)} random samples from {collection_name}")
            
            client.close()
            return spark_df
        else:
            print("Required columns not found in the data")
            client.close()
            return None
    
    except Exception as e:
        print(f"Error loading random sample from MongoDB: {str(e)}")
        return None

### Training & Evaluating Logistic Regression Model

In [32]:
def train_and_evaluate_model(df, iterations=1):
    """Train and evaluate the model with multiple attempts if needed"""
    best_accuracy = 0
    best_model = None
    
    for i in range(iterations):
        try:
            print(f"Training attempt {i+1}/{iterations}")
            
            # Clean data
            clean_df = df.dropna(subset=["text", "label"])
            clean_df = clean_df.filter(length(col("text")) > 10)
            clean_df = clean_df.withColumn("text", lower(col("text")))
            
            # Convert string labels to numeric indices
            label_indexer = StringIndexer(inputCol="label", outputCol="indexedLabel").setHandleInvalid("skip")
            
            # Text processing pipeline
            tokenizer = Tokenizer(inputCol="text", outputCol="words")
            remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
            
            # Use CountVectorizer
            vectorizer = CountVectorizer(inputCol="filtered_words", outputCol="features", 
                                       vocabSize=1000, minDF=2.0)
            
            # Logistic Regression model
            lr = LogisticRegression(maxIter=15, regParam=0.05, elasticNetParam=0.7,
                                  labelCol="indexedLabel", featuresCol="features")
            
            # Create pipeline
            pipeline = Pipeline(stages=[label_indexer, tokenizer, remover, vectorizer, lr])
            
            # Split the data - use different random seeds for each attempt
            train_data, test_data = clean_df.randomSplit([0.8, 0.2], seed=42+i)
            
            # Fit the model
            model = pipeline.fit(train_data)
            
            # Make predictions
            predictions = model.transform(test_data)
            
            # Evaluate model
            evaluator = MulticlassClassificationEvaluator(
                labelCol="indexedLabel", 
                predictionCol="prediction", 
                metricName="accuracy"
            )
            
            accuracy = evaluator.evaluate(predictions)
            print(f"Model Accuracy (Attempt {i+1}): {accuracy:.4f}")
            
            # Keep track of the best model
            if accuracy > best_accuracy:
                best_accuracy = accuracy
                best_model = model
                
            # If accuracy is good enough, break early
            if accuracy > 0.65:
                break
                
        except Exception as e:
            print(f"Error in training attempt {i+1}: {str(e)}")
    
    return best_model, best_accuracy

### Predictions on GNews Data

In [36]:
def predict_gnews_data_from_gcs_in_memory(model, accuracy, bucket_name=GCP_BUCKET_NAME, file_path=GCS_GNEWS_FILE):
    """Make predictions on gnews data using in-memory approach"""
    if accuracy <= 0.5:
        print("Model accuracy below threshold (0.5). Skipping predictions.")
        return False
    try:
        # Initialize GCS client
        from google.cloud import storage
        storage_client = storage.Client()
        bucket = storage_client.get_bucket(bucket_name)
        blob = bucket.blob(file_path)
        
        print(f"Downloading content from GCS: {bucket_name}/{file_path}")
        # Download content directly to memory
        content = blob.download_as_string()
        
        # Use StringIO to create a file-like object
        import io
        csv_file = io.StringIO(content.decode('utf-8'))
        
        # Read with pandas
        print("Loading data with pandas")
        pandas_df = pd.read_csv(csv_file)
        
        # Ensure we have the required columns
        required_columns = ["text"]
        if not all(col in pandas_df.columns for col in required_columns):
            print(f"GNews dataset missing required columns. Available columns: {pandas_df.columns}")
            return False
        
        # Select only needed columns and limit to 50 rows
        available_columns = ["text"]
        if "source" in pandas_df.columns:
            available_columns.append("source")
            
        pandas_sample = pandas_df[available_columns].head(50)
        
        # Add a dummy label column (required by the model)
        pandas_sample['label'] = 0
        
        # Basic text cleaning for prediction
        pandas_sample = pandas_sample[pandas_sample['text'].str.len() > 10]
        pandas_sample['text'] = pandas_sample['text'].str.lower()
        
        # Initialize Spark
        spark = initialize_spark_with_gcp()
        
        # Convert to Spark DataFrame
        print("Converting to Spark DataFrame")
        gnews_sample = spark.createDataFrame(pandas_sample)
        
        # Make predictions
        print("Making predictions")
        predictions = model.transform(gnews_sample)
        
        # Get the label indexer to map back prediction indices to original labels
        from pyspark.ml.feature import StringIndexer
        label_indexer_model = None
        for stage in model.stages:
            if isinstance(stage, StringIndexer):
                label_indexer_model = stage
                break
        
        # Select relevant columns including predicted label
        from pyspark.sql.functions import col
        select_cols = [col(c) for c in available_columns]
        select_cols.append(col("prediction").alias("predicted_label_idx"))
        
        # Add original column names back to result
        result_df = predictions.select(*select_cols)
        
        # If we have the label indexer model, map predictions back to original labels
        if label_indexer_model is not None:
            # Get the mapping of index to label
            label_mapping = {idx: label for idx, label in enumerate(label_indexer_model.labels)}
            
            # Register user-defined function to map indices to labels
            from pyspark.sql.functions import udf
            from pyspark.sql.types import StringType
            
            @udf(StringType())
            def idx_to_label(idx):
                return label_mapping.get(float(idx), "unknown")
            
            result_df = result_df.withColumn("predicted_label", idx_to_label(col("predicted_label_idx")))
            result_df = result_df.drop("predicted_label_idx")
        
        # Collect results from Spark (force evaluation)
        print("Collecting results")
        results_pandas = result_df.toPandas()
        
        # Save results back to GCS
        print("Saving results back to GCS")
        output_filename = f"gnews_predictions_{int(accuracy*100)}.csv"
        csv_buffer = io.StringIO()
        results_pandas.to_csv(csv_buffer, index=False)
        
        # Upload the results
        output_blob = bucket.blob(f"Task2/results/{output_filename}")
        output_blob.upload_from_string(csv_buffer.getvalue())
        
        # Save to MongoDB
        print("Saving to MongoDB")
        client = pymongo.MongoClient(MONGO_URI)
        db = client[DB_NAME]
        
        # Delete existing predictions
        db[PREDICTIONS_COLLECTION].delete_many({})
        
        # Insert new predictions
        db[PREDICTIONS_COLLECTION].insert_many(results_pandas.to_dict('records'))
        
        print(f"Saved {len(results_pandas)} predictions to MongoDB and GCS at {bucket_name}/Task2/results/{output_filename}")
        client.close()
        return True
        
    except Exception as e:
        print(f"Error making predictions: {str(e)}")
        import traceback
        traceback.print_exc()
        return False

### Execution of the Flow

In [37]:
def main_with_gcp():
    """Main workflow using GCP"""
    # Step 1: Load data from GCS into MongoDB
    kaggle_loaded = load_csv_from_gcs_to_mongodb(
        GCP_BUCKET_NAME, 
        GCS_KAGGLE_FILE, 
        KAGGLE_COLLECTION,
        
    )
    
    api_loaded = load_csv_from_gcs_to_mongodb(
        GCP_BUCKET_NAME, 
        GCS_API_FILE, 
        API_COLLECTION
    )

    api_loaded = load_csv_from_gcs_to_mongodb(
        GCP_BUCKET_NAME, 
        GCS_GNEWS_FILE, 
        API_COLLECTION
    )
    
    if not (kaggle_loaded and api_loaded):
        print("Failed to load data from GCS to MongoDB. Exiting.")
        return
    
    # Step 2: Combine and aggregate data (unchanged from your original code)
    if not combine_and_aggregate_mongodb_data():
        print("Failed to combine data in MongoDB. Exiting.")
        return
    
    # Step 3: Load random sample for training (unchanged)
    train_df = load_random_sample_from_mongodb(COMBINED_COLLECTION, 100)
    
    if train_df is None:
        print("Failed to load training sample. Exiting.")
        return
    
    # Step 4: Train and evaluate model (unchanged)
    model, accuracy = train_and_evaluate_model(train_df, iterations=3)
    
    if model is None:
        print("Failed to train model. Exiting.")
        return
        
    print(f"Best model accuracy: {accuracy:.4f}")
    
    # Step 5: If accuracy > 0.5, make predictions on gnews data from GCS
    if accuracy > 0.5:
        predict_gnews_data_from_gcs_in_memory(model, accuracy)
    else:
        print("Model accuracy below threshold. Loading more data...")
        # Load more data
        larger_train_df = load_random_sample_from_mongodb(COMBINED_COLLECTION, 1000)
        
        if larger_train_df is not None:
            # Try again with more data
            model, accuracy = train_and_evaluate_model(larger_train_df, iterations=2)
            
            if model is not None and accuracy > 0.5:
                predict_gnews_data_from_gcs_in_memory(model, accuracy)
            else:
                print("Still couldn't achieve accuracy > 0.5. Stopping.")
        else:
            print("Failed to load larger training sample. Exiting.")

if __name__ == "__main__":
    main_with_gcp()

                                                                                

Loaded 1000 records from GCS:testing694/kaggle_datasets_full.csv to kaggle_data
Loaded 1000 records from GCS:testing694/API_datasets_full.csv to api_data
Loaded 1000 records from GCS:testing694/gnews_dataset.csv to api_data
Combined 959 records in MongoDB
Loaded 100 random samples from combined_data
Training attempt 1/3
Model Accuracy (Attempt 1): 0.5000
Training attempt 2/3
Model Accuracy (Attempt 2): 0.4000
Training attempt 3/3
Model Accuracy (Attempt 3): 1.0000
Best model accuracy: 1.0000
Downloading content from GCS: testing694/gnews_dataset.csv
Loading data with pandas
Converting to Spark DataFrame
Making predictions
Collecting results
Saving results back to GCS
Saving to MongoDB
Saved 50 predictions to MongoDB and GCS at testing694/Task2/results/gnews_predictions_100.csv
