In [0]:
%run ./01_Environment_Setup

🔧 CURRENT CONFIGURATION STATUS:
   Storage Account: irishhealthdata ✅
   Database: supply_chain_analysis ✅
   MLflow Experiment: /Users/u1025325052@gmail.com/supply_chain_risk_prediction ✅
   GNews API Key: ✅ CONFIGURED


* 'schema_extra' has been renamed to 'json_schema_extra'


✅ All libraries imported successfully!


🔧 Azure Environment Configuration:
   resource_group: irish-healthcare-agents-west-europe
   databricks_workspace: irish-healthcare-db
   storage_account: irishhealthdata
   location: westeurope
   container_name: supply-chain-data


✅ Container already exists: supply-chain-data
✅ Azure Storage connection established successfully!


✅ Database 'supply_chain_analysis' created


✅ Bronze layer tables created




In [0]:
# Additional imports for data ingestion
import requests
from datetime import datetime
import time
from pyspark.sql.functions import lit, udf
from pyspark.sql.types import DoubleType, StringType, TimestampType, IntegerType, ArrayType

print("✅ Environment imported from Notebook 1")

✅ Environment imported from Notebook 1


✅ Ports dimension table created with 17 major global ports


port_name,latitude,longitude,country,port_size,capacity_rating,region
Shanghai,31.2304,121.4737,China,Major,Very High,Asia
Singapore,1.2644,103.822,Singapore,Major,Very High,Asia
Shenzhen,22.5431,114.0579,China,Major,High,Asia
Ningbo-Zhoushan,29.8686,121.5433,China,Major,High,Asia
Hong Kong,22.3193,114.1694,China,Major,High,Asia
Busan,35.1796,129.0756,South Korea,Major,High,Asia
Qingdao,36.0671,120.3826,China,Major,Medium,Asia
Rotterdam,51.9225,4.47917,Netherlands,Major,High,Europe
Antwerp,51.2291,4.4053,Belgium,Major,High,Europe
Hamburg,53.5511,9.9937,Germany,Major,High,Europe


✅ MLflow experiment configured: /Users/u1025325052@gmail.com/supply_chain_risk_prediction


🔍 Running Environment Validation...
✅ Test 1: Database accessible
✅ Test 2: 3 tables exist
✅ Test 3: MLflow experiment configured
✅ Test 4: Azure Storage connected
✅ Test 5: Spark operations working

🎯 Validation Results: 5/5 tests passed
🚀 Environment is READY for data ingestion!


💾 Saving configuration with ACTUAL values:
   • GNews API Key in config: ✅ PRESENT
   • GNews API Key value: cd8e55ad6d...
   • GDACS URL: https://www.gdacs.org/gdacsapi/api/events/geteventlist/SEARCH
✅ Configuration saved to: /dbfs/FileStore/supply_chain/config.json

🔍 VERIFYING SAVED CONFIGURATION:
   • GNews API Key saved: cd8e55ad6d...
   • GDACS URL saved: https://www.gdacs.org/gdacsapi/api/events/geteventlist/SEARCH



    🎉 ENVIRONMENT SETUP COMPLETE!
    
    Next steps:
    1. ✅ Replace GNews API key in the configuration
    2. ➡️ Proceed to Notebook 2: Data Ingestion Pipeline
    3. 🔧 Configure Azure OpenAI (optional for now)
    
    Your Azure Resources:
    • Resource Group: irish-healthcare-agents-west-europe
    • Databricks: irish-healthcare-db
    • Storage: irishhealthdata
    • Location: West Europe
    


🔧 CURRENT CONFIGURATION STATUS:
   Storage Account: irishhealthdata ✅
   Database: supply_chain_analysis ✅
   MLflow Experiment: /Users/u1025325052@gmail.com/supply_chain_risk_prediction ✅
   GNews API Key: ✅ CONFIGURED


In [0]:
GNEWS_API_KEY = ""  # ← REPLACE THIS WITH YOUR REAL GNEWS KEY!
GDACS_URL = "https://www.gdacs.org/gdacsapi/api/events/geteventlist/SEARCH"

print("🔧 USING DIRECT API CONFIGURATION:")
print(f"   • GNews API: {'✅ CONFIGURED' if GNEWS_API_KEY != 'YOUR_ACTUAL_GNEWS_API_KEY_HERE' else '❌ NOT CONFIGURED - REPLACE THE KEY!'}")
print(f"   • GDACS API: ✅ READY")

if GNEWS_API_KEY == "YOUR_ACTUAL_GNEWS_API_KEY_HERE":
    print("\n🚨 ACTION REQUIRED:")
    print("   Please replace 'YOUR_ACTUAL_GNEWS_API_KEY_HERE' with your real GNews API key")
    print("   Get your free API key from: https://gnews.io/")


🔧 USING DIRECT API CONFIGURATION:
   • GNews API: ✅ CONFIGURED
   • GDACS API: ✅ READY


In [0]:
# Define consistent schemas for Delta tables
gdacs_bronze_schema = StructType([
    StructField("event_id", StringType(), True),
    StructField("event_type", StringType(), True),
    StructField("event_name", StringType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("severity", StringType(), True),
    StructField("country", StringType(), True),
    StructField("start_date", TimestampType(), True),
    StructField("end_date", TimestampType(), True),
    StructField("alert_level", StringType(), True),
    StructField("population_affected", IntegerType(), True),
    StructField("insert_timestamp", TimestampType(), True),
    StructField("data_source", StringType(), True)
])

news_bronze_schema = StructType([
    StructField("article_id", StringType(), True),
    StructField("title", StringType(), True),
    StructField("description", StringType(), True),
    StructField("content", StringType(), True),
    StructField("published_at", TimestampType(), True),
    StructField("source", StringType(), True),
    StructField("url", StringType(), True),
    StructField("keywords", ArrayType(StringType()), True),
    StructField("sentiment_score", DoubleType(), True),
    StructField("insert_timestamp", TimestampType(), True),
    StructField("data_source", StringType(), True)
])

# Create tables with explicit schemas
def create_bronze_tables_with_schema():
    """Create bronze tables with explicit schemas"""
    
    spark.sql("DROP TABLE IF EXISTS supply_chain_analysis.bronze_gdacs_alerts")
    spark.sql("DROP TABLE IF EXISTS supply_chain_analysis.bronze_supply_chain_news")
    
    spark.createDataFrame([], gdacs_bronze_schema) \
        .write.format("delta").saveAsTable("supply_chain_analysis.bronze_gdacs_alerts")
    
    spark.createDataFrame([], news_bronze_schema) \
        .write.format("delta").saveAsTable("supply_chain_analysis.bronze_supply_chain_news")
    
    print("✅ Bronze tables created with explicit schemas")

create_bronze_tables_with_schema()

✅ Bronze tables created with explicit schemas


In [0]:
def fetch_real_gdacs_disasters():
    """Fetch REAL disaster data from GDACS API - No fallback to sample data"""
    try:
        print("🌪️ Fetching REAL disaster data from GDACS API...")
        
        # Parameters for recent events - broader search to get real data
        params = {
            "fromDate": (datetime.now() - timedelta(days=30)).strftime("%Y-%m-%d"),  # Last 30 days
            "toDate": datetime.now().strftime("%Y-%m-%d"),
            "alertLevel": "Green;Orange;Red",  # Include all alert levels
            "eventType": "EQ;TC;FL;VO;DR;WF"  # All event types
        }
        
        print(f"   • API Endpoint: {GDACS_URL}")
        print(f"   • Search Period: Last 30 days")
        print(f"   • Alert Levels: All")
        
        response = requests.get(GDACS_URL, params=params, timeout=30)
        print(f"   • Response Status: {response.status_code}")
        
        if response.status_code == 200:
            data = response.json()
            features = data.get('features', [])
            print(f"   • Found {len(features)} features in API response")
            
            events = []
            for i, feature in enumerate(features):
                props = feature.get('properties', {})
                geometry = feature.get('geometry', {})
                
                # Parse coordinates
                coords = geometry.get('coordinates', [0, 0])
                longitude = float(coords[0]) if len(coords) > 0 else 0.0
                latitude = float(coords[1]) if len(coords) > 1 else 0.0
                
                # Parse dates
                start_date_str = props.get('fromdate', '')
                end_date_str = props.get('todate', '')
                
                try:
                    start_date = datetime.fromisoformat(start_date_str.replace('Z', '+00:00')) if start_date_str else datetime.now()
                    end_date = datetime.fromisoformat(end_date_str.replace('Z', '+00:00')) if end_date_str else datetime.now()
                except:
                    start_date = datetime.now()
                    end_date = datetime.now()
                
                # Only include events with valid coordinates
                if latitude != 0.0 and longitude != 0.0:
                    event = (
                        props.get('eventid', f"GDACS_{i}"),
                        props.get('eventtype', 'Unknown'),
                        props.get('name', 'Unnamed Event'),
                        float(latitude),
                        float(longitude),
                        props.get('severity', 'Unknown'),
                        props.get('country', 'Unknown'),
                        start_date,
                        end_date,
                        props.get('alertlevel', 'Green'),
                        int(props.get('population', 0)),
                        datetime.now(),
                        'GDACS_API_REAL'
                    )
                    events.append(event)
            
            if events:
                df = spark.createDataFrame(events, gdacs_bronze_schema)
                print(f"✅ Successfully collected {len(events)} REAL disaster events")
                
                # Show statistics about collected data
                event_types = df.select("event_type").distinct().collect()
                countries = df.select("country").distinct().collect()
                alert_levels = df.select("alert_level").distinct().collect()
                
                print(f"   • Event Types: {[row.event_type for row in event_types]}")
                print(f"   • Countries: {[row.country for row in countries[:5]]}{'...' if len(countries) > 5 else ''}")
                print(f"   • Alert Levels: {[row.alert_level for row in alert_levels]}")
                
                return df
            else:
                print("❌ No valid events found in GDACS API response")
                print("   • This could mean:")
                print("     - No recent disasters in the search criteria")
                print("     - API response format changed")
                print("     - Network/connectivity issues")
                
                # Create empty DataFrame with proper schema instead of sample data
                empty_df = spark.createDataFrame([], gdacs_bronze_schema)
                print("   • Returning empty DataFrame to maintain data integrity")
                return empty_df
                
        else:
            print(f"❌ GDACS API returned error status: {response.status_code}")
            if hasattr(response, 'text'):
                print(f"   • Response: {response.text[:200]}...")
            
            # Return empty DataFrame instead of sample data
            empty_df = spark.createDataFrame([], gdacs_bronze_schema)
            return empty_df
            
    except Exception as e:
        print(f"❌ GDACS API error: {str(e)}")
        
        # Return empty DataFrame instead of sample data
        empty_df = spark.createDataFrame([], gdacs_bronze_schema)
        return empty_df

In [0]:
# MAGIC %md
# MAGIC ## 📰 REAL GNews Data Collection - COMPLETELY FIXED

# COMMAND ----------

def fetch_real_supply_chain_news():
    """Fetch REAL news from GNews API - COMPLETELY FIXED VERSION"""
    # Check if API key is properly configured
    if GNEWS_API_KEY == "YOUR_ACTUAL_GNEWS_API_KEY_HERE":
        print("❌ GNews API key not configured")
        print("   • Please replace 'YOUR_ACTUAL_GNEWS_API_KEY_HERE' with your real API key")
        empty_df = spark.createDataFrame([], news_bronze_schema)
        return empty_df
    
    try:
        print("📰 Fetching REAL supply chain news from GNews API...")
        print(f"   • API Key: {GNEWS_API_KEY[:8]}...{GNEWS_API_KEY[-4:]}")
        
        url = "https://gnews.io/api/v4/search"
        all_articles = []
        
        # Search queries for supply chain issues
        search_queries = [
            "port congestion shipping",
            "supply chain disruption",
            "maritime logistics",
            "shipping delays", 
            "container shipping"
        ]
        
        for query in search_queries:
            params = {
                'q': query,
                'lang': 'en',
                'max': 10,
                'apikey': GNEWS_API_KEY
            }
            
            print(f"   • Searching: '{query}'")
            
            try:
                response = requests.get(url, params=params, timeout=15)
                print(f"      • Status: {response.status_code}")
                
                if response.status_code == 200:
                    data = response.json()
                    articles = data.get('articles', [])
                    all_articles.extend(articles)
                    print(f"      ✅ Found {len(articles)} articles")
                    
                    if articles:
                        title = articles[0].get('title', 'No title')
                        print(f"      • Sample: '{title[:60]}...'")
                    
                    time.sleep(1)  # Rate limiting
                    
                elif response.status_code == 401:
                    print("      ❌ 401 Unauthorized - Invalid API key")
                    break
                else:
                    print(f"      ❌ API error: {response.status_code}")
                    
            except Exception as e:
                print(f"      ⚠️ Request failed: {str(e)}")
        
        if all_articles:
            processed_articles = []
            for i, article in enumerate(all_articles):
                try:
                    # Enhanced sentiment analysis - COMPLETELY FIXED
                    content = f"{article.get('title', '')} {article.get('description', '')}"
                    sentiment_score = fixed_sentiment_analysis(content)
                    
                    # Parse date safely
                    published_at = parse_news_date(article.get('publishedAt', ''))
                    
                    # Extract keywords safely
                    keywords = extract_supply_chain_keywords(content)
                    
                    # Create article data
                    news_article = (
                        f"REAL_NEWS_{i}_{int(datetime.now().timestamp())}",  # article_id
                        article.get('title', 'No Title'),                    # title
                        article.get('description', 'No Description'),        # description
                        article.get('content', 'No Content'),                # content
                        published_at,                                        # published_at
                        article.get('source', {}).get('name', 'Unknown Source'),  # source
                        article.get('url', ''),                             # url
                        keywords,                                           # keywords
                        float(sentiment_score),                             # sentiment_score
                        datetime.now(),                                     # insert_timestamp
                        'GNEWS_API_REAL'                                    # data_source
                    )
                    processed_articles.append(news_article)
                except Exception as e:
                    print(f"      ⚠️ Failed to process article {i}: {str(e)}")
                    continue
            
            if processed_articles:
                # Create DataFrame with explicit schema
                df = spark.createDataFrame(processed_articles, news_bronze_schema)
                print(f"✅ Processed {len(processed_articles)} REAL news articles")
                
                # Show statistics
                sources = df.select("source").distinct().collect()
                avg_sentiment = df.agg(avg("sentiment_score")).collect()[0][0] or 0.0
                
                print(f"   • Sources: {[row.source for row in sources[:3]]}{'...' if len(sources) > 3 else ''}")
                print(f"   • Avg Sentiment: {avg_sentiment:.2f}")
                
                return df
            else:
                print("❌ No articles could be processed successfully")
                empty_df = spark.createDataFrame([], news_bronze_schema)
                return empty_df
        else:
            print("❌ No articles found in GNews API response")
            empty_df = spark.createDataFrame([], news_bronze_schema)
            return empty_df
            
    except Exception as e:
        print(f"❌ GNews API error: {str(e)}")
        import traceback
        print(f"   • Full error: {traceback.format_exc()}")
        empty_df = spark.createDataFrame([], news_bronze_schema)
        return empty_df

def fixed_sentiment_analysis(text):
    """COMPLETELY FIXED sentiment analysis without min() error"""
    if not text or not isinstance(text, str):
        return 0.0
        
    # Negative impact keywords
    severe_negative = {
        'strike': -1.0, 'closed': -0.9, 'shutdown': -0.9, 'crisis': -0.8,
        'collapse': -0.8, 'emergency': -0.7, 'bankruptcy': -0.9, 'war': -1.0
    }
    
    negative_keywords = {
        'delay': -0.6, 'congestion': -0.6, 'disruption': -0.7, 'shortage': -0.6,
        'backlog': -0.5, 'bottleneck': -0.5, 'protest': -0.6, 'waiting': -0.4,
        'slowdown': -0.5, 'problem': -0.4, 'issue': -0.4, 'challenge': -0.3
    }
    
    positive_keywords = {
        'recovery': 0.7, 'improved': 0.6, 'efficient': 0.5, 'solution': 0.5,
        'resolved': 0.6, 'normalized': 0.5, 'reopened': 0.6, 'cleared': 0.5,
        'growth': 0.4, 'expansion': 0.4, 'success': 0.5, 'breakthrough': 0.6
    }
    
    text_lower = text.lower()
    sentiment_score = 0.0
    keyword_count = 0
    
    # Check severe negative keywords
    for word, weight in severe_negative.items():
        if word in text_lower:
            sentiment_score += weight
            keyword_count += 1
    
    # Check regular negative keywords
    for word, weight in negative_keywords.items():
        if word in text_lower:
            sentiment_score += weight
            keyword_count += 1
    
    # Check positive keywords
    for word, weight in positive_keywords.items():
        if word in text_lower:
            sentiment_score += weight
            keyword_count += 1
    
    # Normalize score - COMPLETELY FIXED: No min() with multiple arguments
    if keyword_count > 0:
        final_score = sentiment_score / keyword_count
        
        # Clamp between -1 and 1 using proper if-else (no min() with multiple args)
        if final_score > 1.0:
            return 1.0
        elif final_score < -1.0:
            return -1.0
        else:
            return final_score
    else:
        return 0.0

def extract_supply_chain_keywords(text):
    """Extract relevant supply chain keywords"""
    if not text or not isinstance(text, str):
        return []
        
    keywords = [
        'port', 'shipping', 'logistics', 'supply chain', 'cargo', 'container',
        'maritime', 'freight', 'vessel', 'terminal', 'customs', 'export', 'import',
        'trade', 'shipping line', 'carrier', 'warehouse', 'distribution',
        'transport', 'delivery', 'inventory', 'procurement'
    ]
    
    text_lower = text.lower()
    found_keywords = []
    
    for kw in keywords:
        if kw in text_lower:
            found_keywords.append(kw)
    
    # Return top 8 keywords maximum
    return found_keywords[:8]

def parse_news_date(date_string):
    """Parse news date string safely"""
    try:
        if date_string:
            # Handle various date formats
            if 'T' in date_string and 'Z' in date_string:
                return datetime.fromisoformat(date_string.replace('Z', '+00:00'))
            else:
                # Try other common formats
                return datetime.fromisoformat(date_string)
        else:
            return datetime.now()
    except:
        return datetime.now()

In [0]:
# MAGIC %md
# MAGIC ## 🚀 Data Ingestion Execution - WITH FIXED SENTIMENT

# COMMAND ----------

print("🚀 STARTING REAL DATA INGESTION PIPELINE...")
print("=" * 60)
print("📝 NOTE: Using REAL APIs only - No sample data generation")

# Store MLflow run ID for later use
mlflow_run_id = None

try:
    # Start MLflow run to track data collection
    with mlflow.start_run(run_name="real_data_ingestion_fixed_sentiment") as run:
        mlflow_run_id = run.info.run_id
        print(f"🔬 MLflow Run Started: {mlflow_run_id}")
        
        # Track ingestion parameters
        mlflow.log_param("data_source_gdacs", "API_REAL")
        mlflow.log_param("data_source_news", "API_REAL")
        mlflow.log_param("ingestion_timestamp", datetime.now().isoformat())
        mlflow.log_param("sentiment_analysis", "FIXED_VERSION")
        
        print("\n1. Ingesting GDACS Disaster Data...")
        start_time = time.time()
        gdacs_df = fetch_real_gdacs_disasters()
        gdacs_duration = time.time() - start_time
        
        gdacs_df.write.mode("overwrite").saveAsTable("supply_chain_analysis.bronze_gdacs_alerts")
        mlflow.log_metric("gdacs_events_collected", gdacs_df.count())
        mlflow.log_metric("gdacs_ingestion_seconds", gdacs_duration)
        
        print(f"   ✅ Stored {gdacs_df.count()} REAL disaster events ({gdacs_duration:.1f}s)")
        
        print("\n2. Ingesting Supply Chain News...")
        start_time = time.time()
        news_df = fetch_real_supply_chain_news()  # This now uses the FIXED version
        news_duration = time.time() - start_time
        
        news_df.write.mode("overwrite").saveAsTable("supply_chain_analysis.bronze_supply_chain_news")
        mlflow.log_metric("news_articles_collected", news_df.count())
        mlflow.log_metric("news_ingestion_seconds", news_duration)
        
        print(f"   ✅ Stored {news_df.count()} REAL news articles ({news_duration:.1f}s)")
        
        # Log additional metrics
        total_data = gdacs_df.count() + news_df.count()
        mlflow.log_metric("total_data_points", total_data)
        mlflow.log_metric("overall_ingestion_seconds", gdacs_duration + news_duration)
        
        print(f"🎉 REAL DATA INGESTION COMPLETED! Total data points: {total_data}")

except Exception as e:
    print(f"❌ MLflow run error: {e}")
    print("🔄 Continuing with data ingestion without MLflow...")
    
    print("\n1. Ingesting GDACS Disaster Data...")
    gdacs_df = fetch_real_gdacs_disasters()
    gdacs_df.write.mode("overwrite").saveAsTable("supply_chain_analysis.bronze_gdacs_alerts")
    print(f"   ✅ Stored {gdacs_df.count()} REAL disaster events")
    
    print("\n2. Ingesting Supply Chain News...")
    news_df = fetch_real_supply_chain_news()  # This now uses the FIXED version
    news_df.write.mode("overwrite").saveAsTable("supply_chain_analysis.bronze_supply_chain_news")
    print(f"   ✅ Stored {news_df.count()} REAL news articles")

🚀 STARTING REAL DATA INGESTION PIPELINE...
📝 NOTE: Using REAL APIs only - No sample data generation
🔬 MLflow Run Started: 67a32be91f8c46108d00eb9befc9d30c

1. Ingesting GDACS Disaster Data...
🌪️ Fetching REAL disaster data from GDACS API...
   • API Endpoint: https://www.gdacs.org/gdacsapi/api/events/geteventlist/SEARCH
   • Search Period: Last 30 days
   • Alert Levels: All
   • Response Status: 200
   • Found 100 features in API response
✅ Successfully collected 100 REAL disaster events
   • Event Types: ['TC', 'EQ', 'WF', 'FL']
   • Countries: ['Russia', 'Philippines', 'Fiji', 'Northern East Pacfic Rise', 'South Of Panama']...
   • Alert Levels: ['Orange', 'Green']
   ✅ Stored 100 REAL disaster events (2.9s)

2. Ingesting Supply Chain News...
📰 Fetching REAL supply chain news from GNews API...
   • API Key: cd8e55ad...c34d
   • Searching: 'port congestion shipping'
      • Status: 200
      ✅ Found 0 articles
   • Searching: 'supply chain disruption'
      • Status: 200
      ✅ Foun

In [0]:
def analyze_real_data_quality():
    """Analyze quality of REAL ingested data"""
    print("📊 REAL DATA QUALITY ANALYSIS")
    print("=" * 50)
    
    # GDACS Data Analysis
    gdacs_table = spark.table("supply_chain_analysis.bronze_gdacs_alerts")
    gdacs_count = gdacs_table.count()
    
    print("🌪️ GDACS Disaster Data:")
    print(f"   • Total Events: {gdacs_count}")
    
    if gdacs_count > 0:
        event_types = gdacs_table.select("event_type").distinct().collect()
        countries = gdacs_table.select("country").distinct().collect()
        data_source = gdacs_table.select("data_source").first()[0]
        
        print(f"   • Event Types: {[row.event_type for row in event_types]}")
        print(f"   • Countries: {[row.country for row in countries[:3]]}{'...' if len(countries) > 3 else ''}")
        print(f"   • Data Source: {data_source}")
    else:
        print("   • ⚠️ No disaster data collected from API")
    
    # News Data Analysis
    news_table = spark.table("supply_chain_analysis.bronze_supply_chain_news")
    news_count = news_table.count()
    
    print("\n📰 Supply Chain News:")
    print(f"   • Total Articles: {news_count}")
    
    if news_count > 0:
        sources = news_table.select("source").distinct().collect()
        avg_sentiment = news_table.agg(avg("sentiment_score")).collect()[0][0]
        data_source = news_table.select("data_source").first()[0]
        
        print(f"   • Sources: {[row.source for row in sources[:3]]}{'...' if len(sources) > 3 else ''}")
        print(f"   • Avg Sentiment: {avg_sentiment:.2f}")
        print(f"   • Data Source: {data_source}")
    else:
        print("   • ⚠️ No news data collected from API")
    
    return gdacs_count, news_count

gdacs_count, news_count = analyze_real_data_quality()

📊 REAL DATA QUALITY ANALYSIS
🌪️ GDACS Disaster Data:
   • Total Events: 100
   • Event Types: ['EQ', 'WF', 'TC', 'FL']
   • Countries: ['Russia', 'Philippines', 'South Of Kermadec Islands']...
   • Data Source: GDACS_API_REAL

📰 Supply Chain News:
   • Total Articles: 39
   • Sources: ['Wilmington News Journal', 'Fast Company', 'The Hindu Business Line']...
   • Avg Sentiment: -0.33
   • Data Source: GNEWS_API_REAL


In [0]:
print("🔍 PREVIEW OF REAL DATA COLLECTED")
print("=" * 50)

if gdacs_count > 0:
    print("\n🌪️ Recent Disaster Alerts:")
    display(spark.sql("""
    SELECT event_id, event_type, event_name, country, alert_level, start_date 
    FROM supply_chain_analysis.bronze_gdacs_alerts 
    ORDER BY start_date DESC 
    LIMIT 5
    """))
else:
    print("\n🌪️ No disaster data available from GDACS API")

if news_count > 0:
    print("\n📰 Recent Supply Chain News:")
    display(spark.sql("""
    SELECT article_id, title, source, sentiment_score, published_at 
    FROM supply_chain_analysis.bronze_supply_chain_news 
    ORDER BY published_at DESC 
    LIMIT 5
    """))
else:
    print("\n📰 No news data available from GNews API")

🔍 PREVIEW OF REAL DATA COLLECTED

🌪️ Recent Disaster Alerts:


event_id,event_type,event_name,country,alert_level,start_date
1503491,EQ,Earthquake in Fiji,Fiji,Green,2025-10-04T18:50:54Z
1503487,EQ,Earthquake in Russia,Russia,Green,2025-10-04T17:43:53Z
1503482,EQ,Earthquake in Off East Coast Of Kamchatka,Off East Coast Of Kamchatka,Green,2025-10-04T17:38:01Z
1503467,EQ,Earthquake in Japan,Japan,Green,2025-10-04T15:21:09Z
1503456,EQ,Earthquake in Japan,Japan,Green,2025-10-04T14:15:19Z



📰 Recent Supply Chain News:


article_id,title,source,sentiment_score,published_at
REAL_NEWS_29_1759611147,"Falling ocean shipping rates put carrier profits at risk, analysts say",The Economic Times,-0.9,2025-10-04T05:01:00Z
REAL_NEWS_30_1759611147,China eyes Arctic shortcut as top container lines stay away,The Economic Times,0.0,2025-10-04T04:36:00Z
REAL_NEWS_31_1759611147,"Falling ocean shipping rates put carrier profits at risk, analysts say",Reuters,0.0,2025-10-03T18:37:38Z
REAL_NEWS_32_1759611147,LNG row splits Europe as US challenges net-zero shipping deal,Euractiv,-0.3,2025-10-01T04:00:19Z
REAL_NEWS_33_1759611147,Global container shipping lines making waves,The Hindu Business Line,0.0,2025-09-30T16:05:59Z


In [0]:
# Update configuration with REAL ingestion results
config_path = "/dbfs/FileStore/supply_chain/config.json"

try:
    with open(config_path, "r") as f:
        config = json.load(f)
    
    config['ingestion_completed'] = True
    config['ingestion_timestamp'] = datetime.now().isoformat()
    config['gdacs_events'] = gdacs_count
    config['news_articles'] = news_count
    config['real_data_only'] = True
    config['sample_data_used'] = False
    
    if mlflow_run_id:
        config['mlflow_run_id'] = mlflow_run_id
    
    with open(config_path, "w") as f:
        json.dump(config, f, indent=2)
    
    print("✅ Configuration updated with REAL data ingestion results")
    
except Exception as e:
    print(f"❌ Failed to update configuration: {e}")

✅ Configuration updated with REAL data ingestion results


In [0]:

print("""
🎉 REAL DATA INGESTION COMPLETED!

📊 REAL DATA SUMMARY:
• Disaster Alerts: {} events
• News Articles: {} articles
• Data Source: ✅ REAL APIs ONLY
• Sample Data: ❌ NOT USED
• Data Integrity: ✅ MAINTAINED

🔧 API PERFORMANCE:
• GDACS API: {} 
• GNews API: {}

📈 NEXT STEPS:
• Proceed to Notebook 3 for Feature Engineering
• Even with limited data, we can build the ML pipeline
• Real-world data scenarios include empty/limited responses
• System designed for real production use cases

💡 PRODUCTION READY:
• No synthetic data contamination
• Real API error handling
• Empty dataset management
• Production-grade data pipeline
""".format(
    gdacs_count, news_count,
    "✅ DATA" if gdacs_count > 0 else "⚠️ NO DATA",
    "✅ DATA" if news_count > 0 else "⚠️ NO DATA"
))


🎉 REAL DATA INGESTION COMPLETED!

📊 REAL DATA SUMMARY:
• Disaster Alerts: 100 events
• News Articles: 39 articles
• Data Source: ✅ REAL APIs ONLY
• Sample Data: ❌ NOT USED
• Data Integrity: ✅ MAINTAINED

🔧 API PERFORMANCE:
• GDACS API: ✅ DATA 
• GNews API: ✅ DATA

📈 NEXT STEPS:
• Proceed to Notebook 3 for Feature Engineering
• Even with limited data, we can build the ML pipeline
• Real-world data scenarios include empty/limited responses
• System designed for real production use cases

💡 PRODUCTION READY:
• No synthetic data contamination
• Real API error handling
• Empty dataset management
• Production-grade data pipeline

