In [0]:
# DATABRICKS: Complete Simplified Pipeline (No ML Processing)

from pyspark.sql.functions import *
from pyspark.sql.types import *
import json
import re

print("🚀 Setting up simplified raw data collection pipeline...")


# Options handed to the "eventhubs" stream reader further down.
# CONNECTION_STRING is not assigned in this cell; it must already exist in the session.
ehConf = {}
ehConf['eventhubs.connectionString'] = (
    sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(CONNECTION_STRING)
)
ehConf['eventhubs.consumerGroup'] = "$Default"
# The starting position is passed to the connector as a JSON string.
# NOTE(review): offset "-1" presumably means "read from the start of the stream" —
# confirm against the Event Hubs connector documentation.
ehConf['eventhubs.startingPosition'] = json.dumps(
    dict(offset="-1", seqNo=-1, enqueuedTime=None, isInclusive=False)
)

# JSON Schema (keep unchanged)
# Layout used by from_json() to parse the message body; every field is nullable.
json_schema = (
    StructType()
    .add("post_id", StringType(), True)
    .add("text", StringType(), True)
    .add("text_preview", StringType(), True)
    .add("author", StringType(), True)
    .add("author_display_name", StringType(), True)
    .add("created_at", StringType(), True)
    .add("hashtags", ArrayType(StringType()), True)
    .add("mentions", ArrayType(StringType()), True)
    # Nested struct holding the per-post interaction counters.
    .add(
        "engagement",
        StructType()
        .add("likes", IntegerType(), True)
        .add("reposts", IntegerType(), True)
        .add("replies", IntegerType(), True),
        True,
    )
    # Nested struct describing how and when the post was collected.
    .add(
        "collection_metadata",
        StructType()
        .add("collected_at", StringType(), True)
        .add("collector_version", StringType(), True)
        .add("source", StringType(), True),
        True,
    )
)

def deep_clean_text(text):
    """Remove hashtags, URLs and @mentions from a post and collapse whitespace.

    Args:
        text: Raw post text, or None.

    Returns:
        The cleaned text with runs of whitespace collapsed to single spaces and no
        leading/trailing whitespace (may be an empty string), or None when ``text``
        is None.
    """
    if text is None:
        return None
    cleaned = re.sub(r'#\w+', '', text)
    # The dot after "www" is escaped so that only a literal "www." prefix is treated
    # as a URL. Unescaped, it matched any character and removed ordinary words that
    # merely contain "www" followed by more letters.
    cleaned = re.sub(r'http\S+|www\.\S+', '', cleaned)
    cleaned = re.sub(r'@\w+', '', cleaned)
    # split()/join collapses whitespace and already drops leading/trailing blanks.
    return ' '.join(cleaned.split())

# Wrap the Python cleaner so it can be applied to DataFrame string columns.
deep_clean_udf = udf(deep_clean_text, StringType())

# Streaming DataFrame: Event Hubs messages -> parsed, filtered and enriched post rows.
raw_stream_df = (
    spark
    .readStream
    .format("eventhubs")
    .options(**ehConf)
    .load()

    # Parse JSON
    .select(
        from_json(col("body").cast("string"), json_schema).alias("data"),
        col("enqueuedTime").alias("eventhub_enqueued_time"),
        col("offset").alias("eventhub_offset"),
        col("partition").alias("eventhub_partition")
    )
    .select(
        col("data.*"),
        col("eventhub_enqueued_time"),
        col("eventhub_offset"),
        col("eventhub_partition")
    )

    # Basic cleaning and filtering (NO ML)
    .withColumn("text_cleaned", deep_clean_udf(col("text")))
    .withColumn("text_preview_cleaned", deep_clean_udf(col("text_preview")))
    .filter(col("text").isNotNull())
    .filter(length(col("text")) >= 20)  # Keep decent posts
    .filter(col("author").isNotNull())

    # Add derived columns (NO SENTIMENT ANALYSIS)
    .withColumn("received_at", current_timestamp())
    .withColumn("text_length", length(col("text")))
    .withColumn("text_cleaned_length", length(col("text_cleaned")))
    # size() does not return 0 for a NULL array (it yields -1, or NULL under ANSI mode),
    # so posts whose JSON omits the array are counted explicitly as 0.
    .withColumn("hashtag_count",
        when(col("hashtags").isNull(), lit(0)).otherwise(size(col("hashtags")))
    )
    .withColumn("mention_count",
        when(col("mentions").isNull(), lit(0)).otherwise(size(col("mentions")))
    )
    # Weighted sum: likes x1, replies x2, reposts x1.5; missing counters count as 0.
    .withColumn("total_engagement",
        coalesce(col("engagement.likes"), lit(0)) +
        coalesce(col("engagement.replies"), lit(0)) * 2 +
        coalesce(col("engagement.reposts"), lit(0)) * 1.5
    )

    # NOTE(review): dropDuplicates on a stream without a watermark keeps every post_id
    # seen so far in state; consider adding withWatermark() if state size becomes an
    # issue (doing so changes the checkpointed state, so it is not done here).
    .dropDuplicates(["post_id"])

    # Final column order written to the sink (text_preview_cleaned is not kept).
    .select(
        col("post_id"),
        col("text"),                    # Original text with hashtags
        col("text_cleaned"),            # Cleaned text for ML
        col("text_preview"),
        col("author"),
        col("author_display_name"),
        col("created_at"),
        col("hashtags"),
        col("mentions"),
        col("engagement"),
        col("collection_metadata"),
        col("received_at"),
        col("text_length"),
        col("text_cleaned_length"),
        col("hashtag_count"),
        col("mention_count"),
        col("total_engagement"),
        col("eventhub_enqueued_time"),
        col("eventhub_offset"),
        col("eventhub_partition")
    )
)

# New paths for raw data
RAW_CHECKPOINT_PATH = "/mnt/delta/checkpoints/bluesky_raw_posts"
RAW_DELTA_PATH = "/mnt/delta/tables/bluesky_raw_posts"

# Start streaming query (should be fast without ML)
# Each micro-batch is appended to the Delta location; a new batch is triggered
# every 10 seconds.
raw_query = (
    raw_stream_df.writeStream
    .format("delta")
    .outputMode("append")
    .options(
        checkpointLocation=RAW_CHECKPOINT_PATH,
        path=RAW_DELTA_PATH,
        mergeSchema="true",
    )
    .trigger(processingTime="10 seconds")
    .start()
)

# One print call, one message per line.
print(
    "✅ Simplified raw data pipeline started!",
    f"🆔 Query ID: {raw_query.id}",
    f"📁 Raw data location: {RAW_DELTA_PATH}",
    "🎯 Collecting raw data for college cluster ML processing",
    sep="\n",
)

# Wait a moment then create table
import time
time.sleep(30)

# Create database and table
# The table is registered on top of the Delta files the stream writes to.
spark.sql("CREATE DATABASE IF NOT EXISTS social_media")
spark.sql(
    f"""
    CREATE TABLE IF NOT EXISTS social_media.bluesky_raw_posts
    USING DELTA
    LOCATION '{RAW_DELTA_PATH}'
"""
)

print("📊 Raw posts table created: social_media.bluesky_raw_posts")

# Check pipeline status
time.sleep(30)
# Take a single snapshot of the latest progress report so both rates come from the
# same micro-batch (each access to lastProgress may return a newer report).
latest_progress = raw_query.lastProgress
if latest_progress:
    input_rate = latest_progress.get('inputRowsPerSecond', 0)
    processed_rate = latest_progress.get('processedRowsPerSecond', 0)
    print(f"📈 Processing: {input_rate} input/sec, {processed_rate} processed/sec")

    # Only describes the most recent micro-batch, not the whole run.
    if input_rate > 0:
        print("✅ Event Hubs data flowing successfully!")
    else:
        print("⚠️ No input data - check Azure Function")

# Verify that rows are actually landing in the registered table.
try:
    count_query = spark.sql("SELECT COUNT(*) as count FROM social_media.bluesky_raw_posts")
    row_count = count_query.collect()[0]['count']
    print(f"📊 Raw posts table now has: {row_count} posts")

    if row_count > 0:
        print("🎉 SUCCESS: Raw data pipeline working!")
        print("🚀 Ready for college cluster connection test")

        # Show sample data
        sample = spark.sql("""
            SELECT post_id, author, text, hashtags, received_at
            FROM social_media.bluesky_raw_posts
            ORDER BY received_at DESC
            LIMIT 5
        """)
        print("\n📋 Sample raw posts:")
        sample.show(truncate=False)
    else:
        print("⏳ Waiting for data to flow...")

except Exception as e:
    # Best effort: the table may not be queryable yet right after the stream starts.
    print(f"⚠️ Table check failed: {e}")
    print("Table may still be initializing...")

# Modules used by the GitHub export code later in the notebook
# (requests, json, datetime and time are referenced there).
import base64
import json
import time
from datetime import datetime, timedelta

import requests

# Closing banner for the pipeline setup cell.
print("\n" + "="*60)
print("🎯 DATABRICKS SIMPLIFIED PIPELINE COMPLETE!")
print("Next: Test college cluster connection to this table")
print("="*60)

🚀 Setting up simplified raw data collection pipeline...
✅ Simplified raw data pipeline started!
🆔 Query ID: c55441f4-b701-440a-8a60-e040814d5b2f
📁 Raw data location: /mnt/delta/tables/bluesky_raw_posts
🎯 Collecting raw data for college cluster ML processing
📊 Raw posts table created: social_media.bluesky_raw_posts
📈 Processing: 0.0 input/sec, 68.87052341597796 processed/sec
⚠️ No input data - check Azure Function
📊 Raw posts table now has: 9375 posts
🎉 SUCCESS: Raw data pipeline working!
🚀 Ready for college cluster connection test

📋 Sample raw posts:
+-------------+---------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------+-----------------------+
|post_id      |author                           |text                                         

In [0]:
class SimpleBatchExporter:
    """Export rows of social_media.bluesky_raw_posts to a GitHub repository in batches.

    Rows are tracked through the boolean column ``exported_to_github``. Each call to
    export_next_batch() uploads the oldest unmarked rows as one JSON file via the
    GitHub contents API and then marks those rows as exported.

    The class relies on the notebook-level names ``spark``, ``requests``, ``json`` and
    ``datetime`` being available when its methods run.
    """

    # Temp view holding the post IDs of the batch that is currently being marked.
    _BATCH_IDS_VIEW = "github_export_batch_ids"

    def __init__(self, github_username, repo_name, personal_token, batch_size=50):
        """Store the repository coordinates and credentials.

        Args:
            github_username: Owner of the target repository.
            repo_name: Name of the target repository.
            personal_token: GitHub token sent in the Authorization header.
            batch_size: Maximum number of posts per exported file (default 50).
        """
        self.username = github_username
        self.repo_name = repo_name
        self.token = personal_token
        self.api_base = f"https://api.github.com/repos/{github_username}/{repo_name}"
        # Coerced to int because the value is interpolated into a SQL LIMIT clause.
        self.batch_size = int(batch_size)

    def setup_export_column_once(self):
        """One-time setup: Add export tracking column (Delta Lake compatible).

        Adds ``exported_to_github`` when it is missing, sets it to FALSE wherever it
        is NULL, then prints the resulting schema and the export progress. Errors are
        printed, not raised.
        """
        try:
            print("📋 Current table schema:")
            current_schema = spark.sql("DESCRIBE social_media.bluesky_raw_posts")
            current_schema.show()

            # Check if column already exists
            columns = [row.col_name for row in current_schema.collect()]
            if 'exported_to_github' in columns:
                print("✅ exported_to_github column already exists!")
            else:
                print("🔧 Step 1: Adding exported_to_github column...")
                spark.sql("""
                    ALTER TABLE social_media.bluesky_raw_posts 
                    ADD COLUMN exported_to_github BOOLEAN
                """)
                print("✅ Column added!")

            print("⏭️ Skipping default value setup (not needed)")

            # Step 2: Initialize all existing posts as not exported
            print("🔄 Initializing all existing posts as FALSE...")
            spark.sql("""
                UPDATE social_media.bluesky_raw_posts 
                SET exported_to_github = FALSE 
                WHERE exported_to_github IS NULL
            """)

            print("✅ All existing posts marked as not exported!")

            # Verify final schema
            print("📋 Final table schema:")
            updated_schema = spark.sql("DESCRIBE social_media.bluesky_raw_posts")
            updated_schema.show()

            # Show counts
            self.show_progress()

        except Exception as e:
            print(f"⚠️ Column setup error: {e}")
            print("🔍 Error details:", str(e))

    def export_next_batch(self):
        """Export the next batch of unmarked posts (up to ``batch_size``).

        Returns:
            True when a batch was uploaded AND its rows were marked as exported.
            False when nothing is pending, the upload failed, the rows could not be
            marked, or any other error occurred (details are printed).
        """

        print(f"🔄 Looking for next {self.batch_size} unmarked posts...")

        # Dead simple query: oldest unmarked posts first
        query = f"""
        SELECT 
            post_id, text, text_cleaned, text_preview, author, author_display_name,
            created_at, hashtags, mentions, engagement, collection_metadata,
            received_at, text_length, text_cleaned_length, hashtag_count,
            mention_count, total_engagement
        FROM social_media.bluesky_raw_posts
        WHERE exported_to_github = FALSE OR exported_to_github IS NULL
        ORDER BY received_at ASC
        LIMIT {self.batch_size}
        """

        try:
            # NOTE(review): nested columns (arrays/structs) may come back from toPandas()
            # as objects that json.dumps(default=str) stringifies in upload_to_github —
            # confirm the exported JSON shape is what downstream consumers expect.
            df = spark.sql(query).toPandas()

            if len(df) == 0:
                print("🎉 All posts exported! Nothing left to process.")
                return False

            print(f"📤 Found {len(df)} unmarked posts to export")

            # Get post IDs for marking later
            post_ids = df['post_id'].tolist()

            # Create batch
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

            batch_data = {
                'batch_info': {
                    'timestamp': timestamp,
                    'post_count': len(df),
                    'batch_size_requested': self.batch_size,
                    'batch_size_actual': len(df),
                    'export_method': 'simple_batch_oldest_first',
                    'posts_remaining_estimate': self.estimate_remaining_posts()
                },
                'posts': df.to_dict('records')
            }

            # Upload to GitHub
            if not self.upload_to_github(batch_data, timestamp):
                print("❌ Upload failed - posts remain unmarked")
                return False

            # Only report success when the rows really were marked; otherwise the same
            # posts would be silently uploaded again on the next run.
            if not self.mark_posts_exported(post_ids):
                print("⚠️ Batch uploaded but posts could not be marked - they will be exported again")
                return False

            print(f"✅ Batch exported: {len(df)} posts")
            print(f"🏷️ Marked {len(post_ids)} posts as exported")

            # Show progress
            self.show_progress()
            return True

        except Exception as e:
            print(f"❌ Export batch failed: {e}")
            return False

    def mark_posts_exported(self, post_ids):
        """Mark specific posts as exported.

        The IDs are loaded into a temp view and applied with MERGE, so values that
        originate from external post data are never spliced into the SQL text.

        Args:
            post_ids: Iterable of post_id strings. Non-string entries (e.g. None for
                rows without an ID) are skipped because they cannot be matched.

        Returns:
            True when the update succeeded (or there was nothing to mark),
            False when it failed. Errors are printed, not raised.
        """
        post_ids = list(post_ids)
        if not post_ids:
            return True

        # De-duplicate while keeping order: MERGE rejects several source rows that
        # match the same target row.
        unique_ids = list(dict.fromkeys(pid for pid in post_ids if isinstance(pid, str)))
        skipped = len(post_ids) - len([pid for pid in post_ids if isinstance(pid, str)])
        if skipped:
            print(f"⚠️ {skipped} posts without a usable post_id cannot be marked")
        if not unique_ids:
            return False

        try:
            ids_df = spark.createDataFrame([(pid,) for pid in unique_ids], "post_id string")
            ids_df.createOrReplaceTempView(self._BATCH_IDS_VIEW)

            spark.sql(f"""
                MERGE INTO social_media.bluesky_raw_posts AS target
                USING {self._BATCH_IDS_VIEW} AS batch
                ON target.post_id = batch.post_id
                WHEN MATCHED THEN UPDATE SET target.exported_to_github = TRUE
            """)

            print(f"✅ {len(post_ids)} posts marked as exported")
            return True

        except Exception as e:
            print(f"❌ Failed to mark posts: {e}")
            return False

    def estimate_remaining_posts(self):
        """Quick count of remaining posts.

        Returns:
            The number of rows not yet marked as exported, or the string "unknown"
            when the count query fails.
        """
        try:
            result = spark.sql("""
                SELECT COUNT(*) as remaining 
                FROM social_media.bluesky_raw_posts 
                WHERE exported_to_github = FALSE OR exported_to_github IS NULL
            """).collect()

            return result[0]['remaining']

        except Exception:
            # Best effort only: the estimate is informational metadata in the batch.
            return "unknown"

    def show_progress(self):
        """Show current export progress (totals, percentage, batches still needed)."""
        try:
            progress = spark.sql("""
                SELECT 
                    COUNT(*) as total_posts,
                    COUNT(CASE WHEN exported_to_github = TRUE THEN 1 END) as exported,
                    COUNT(CASE WHEN exported_to_github = FALSE OR exported_to_github IS NULL THEN 1 END) as remaining,
                    ROUND(COUNT(CASE WHEN exported_to_github = TRUE THEN 1 END) * 100.0 / COUNT(*), 1) as percent_done
                FROM social_media.bluesky_raw_posts
            """).collect()[0]

            print(f"📊 Progress: {progress['exported']}/{progress['total_posts']} posts exported ({progress['percent_done']}%)")
            print(f"📋 Remaining: {progress['remaining']} posts")

            if progress['remaining'] > 0:
                # Ceiling division: a partially filled last batch still needs a run.
                batches_remaining = (progress['remaining'] + self.batch_size - 1) // self.batch_size
                print(f"⏱️ Est. {batches_remaining} more batches needed")

        except Exception as e:
            print(f"⚠️ Progress check failed: {e}")

    def upload_to_github(self, batch_data, timestamp):
        """Upload batch to GitHub as data/incremental/batch_<timestamp>.json.

        Args:
            batch_data: Dict with 'batch_info' and 'posts'; serialized with
                json.dumps(default=str).
            timestamp: String used in the file name and the commit message.

        Returns:
            True when GitHub answers 200 or 201, False otherwise (including on
            exceptions, which are printed rather than raised).
        """
        try:
            import base64

            json_content = json.dumps(batch_data, indent=2, default=str)
            encoded_content = base64.b64encode(json_content.encode()).decode()

            filename = f"data/incremental/batch_{timestamp}.json"

            headers = {
                "Authorization": f"token {self.token}",
                "Accept": "application/vnd.github.v3+json"
            }

            payload = {
                "message": f"Simple batch: {batch_data['batch_info']['post_count']} posts ({timestamp})",
                "content": encoded_content,
                "branch": "main"
            }

            upload_url = f"{self.api_base}/contents/{filename}"
            # A timeout keeps a stalled connection from blocking the export forever.
            response = requests.put(upload_url, headers=headers, json=payload, timeout=30)

            if response.status_code in [200, 201]:
                print(f"✅ Uploaded: {filename}")
                return True
            else:
                print(f"❌ Upload failed: {response.status_code}")
                return False

        except Exception as e:
            print(f"❌ Upload error: {e}")
            return False

# =============================================================================
# AUTOMATED 10-MINUTE BATCH PROCESSING
# =============================================================================

def automated_batch_export(interval_seconds=600, max_batches=None):
    """Run batch exports on a fixed interval (every 10 minutes by default).

    Creates a SimpleBatchExporter from the notebook-level GITHUB_USERNAME, REPO_NAME
    and GITHUB_TOKEN, runs the one-time column setup, then repeatedly exports one
    batch and sleeps.

    Args:
        interval_seconds: Pause between two export cycles, in seconds.
        max_batches: Number of cycles to run; None (default) loops until interrupted.
    """

    exporter = SimpleBatchExporter(GITHUB_USERNAME, REPO_NAME, GITHUB_TOKEN)

    # One-time setup
    exporter.setup_export_column_once()

    # ":g" prints 10 rather than 10.0 for the default interval.
    interval_minutes = f"{interval_seconds / 60:g}"

    print(f"🚀 Starting automated {interval_minutes}-minute batch exports...")
    print(f"📦 Batch size: {exporter.batch_size} posts per export")

    completed_cycles = 0
    while max_batches is None or completed_cycles < max_batches:
        try:
            print(f"\n{'='*60}")
            print(f"⏰ Batch export at {datetime.now()}")
            print(f"{'='*60}")

            success = exporter.export_next_batch()

            if success:
                print("✅ Batch export completed successfully")
            else:
                # export_next_batch() returns False both when nothing is pending and
                # when the export failed, so do not claim to be "all caught up".
                print("📭 Nothing exported this cycle (no pending posts, or the export failed - see messages above)")

        except Exception as e:
            print(f"❌ Batch export error: {e}")

        completed_cycles += 1
        if max_batches is not None and completed_cycles >= max_batches:
            # Last requested cycle is done; skip the final wait.
            break

        print(f"⏸️ Waiting {interval_minutes} minutes until next batch...")
        time.sleep(interval_seconds)

# Configuration
import os

GITHUB_USERNAME = "AlexanderHuynhKoehler" 
REPO_NAME = "bluesky-data-pipeline"
# SECURITY: a personal access token was previously hardcoded on this line. Any token
# that has been stored in a notebook must be treated as leaked and revoked on GitHub.
# The token is now read from the GITHUB_TOKEN environment variable (on Databricks this
# can be populated from a secret scope in the cluster configuration).
GITHUB_TOKEN = os.environ.get("GITHUB_TOKEN")
if not GITHUB_TOKEN:
    raise RuntimeError(
        "GITHUB_TOKEN is not set - provide the GitHub token through the environment "
        "instead of hardcoding it in the notebook"
    )

exporter = SimpleBatchExporter(GITHUB_USERNAME, REPO_NAME, GITHUB_TOKEN)


# Smoke test: export a single batch before starting the recurring loop.
print("🧪 Testing single batch export...")
success = exporter.export_next_batch()

if success:
    print("🎉 Single batch test successful!")
    print("\n🤖 Ready for automated 10-minute exports:")
else:
    print("❌ Test failed or no posts to export")

📋 Current table schema:
+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|             post_id|              string|   NULL|
|                text|              string|   NULL|
|        text_cleaned|              string|   NULL|
|        text_preview|              string|   NULL|
|              author|              string|   NULL|
| author_display_name|              string|   NULL|
|          created_at|              string|   NULL|
|            hashtags|       array<string>|   NULL|
|            mentions|       array<string>|   NULL|
|          engagement|struct<likes:int,...|   NULL|
| collection_metadata|struct<collected_...|   NULL|
|         received_at|           timestamp|   NULL|
|         text_length|                 int|   NULL|
| text_cleaned_length|                 int|   NULL|
|       hashtag_count|                 int|   NULL|
|       mention_count|                 i

In [0]:
# Start the recurring export loop. The loop inside automated_batch_export() has no exit
# condition, so this cell keeps running (one batch, then a sleep) until it is interrupted.
automated_batch_export()