# Big Data Project: Hipparcos Dataset Analysis with MongoDB and MapReduce

This notebook demonstrates the Big Data project workflow:
- Ingest the Hipparcos dataset into MongoDB.
- Perform data preprocessing and cleaning.
- Apply MapReduce for analysis.
- Visualize insights.

Dataset: the Hipparcos star catalogue (118k+ stars), read from the local file `hipparcos-voidmain.csv` (obtained from Kaggle or a similar source).

## 1. Setup and Imports

In [None]:
import pymongo
from pymongo import MongoClient
from bson.code import Code
import pandas as pd
import matplotlib.pyplot as plt

## 2. Connect to MongoDB

In [None]:
# Connect to MongoDB (expected to be a local instance on the default port,
# e.g. one running in Docker).
client = MongoClient("localhost", 27017)

# MongoClient connects lazily: constructing it succeeds even when no server
# is listening. A ping forces a real round trip, so the "Connected" message
# below is only printed when the server actually answered; otherwise this
# raises and the failure shows up here rather than several cells later.
client.admin.command("ping")

# Create/select database and collection
db = client["hipparcos_db"]
collection = db["stars"]

print("Connected to MongoDB")
print("Database:", db.name)
print("Collection:", collection.name)

## 3. Inspect the Dataset

In [None]:
# Load the Hipparcos dataset from the CSV file next to the notebook
df = pd.read_csv("hipparcos-voidmain.csv")

# Inspect the data: size, a sample of rows, column dtypes, and null counts
print("Dataset shape:", df.shape)
print("\nFirst 5 rows:")
print(df.head())
print("\nData info:")
# DataFrame.info() writes its report to stdout itself and returns None, so it
# must not be wrapped in print() (that appended a stray "None" line).
df.info()
print("\nMissing values per column:")
print(df.isnull().sum())

## 4. Data Preprocessing and Cleaning

In [None]:
# Columns that must be numeric before the data is stored
numeric_cols = ['Vmag', 'Plx', 'pmRA', 'pmDE', 'RAdeg', 'DEdeg']

# --- Missing-value handling ---
# Stars with no spectral type recorded get the placeholder label 'Unknown'
spectral_types = df['SpType']
df['SpType'] = spectral_types.fillna('Unknown')

# Stars with no parallax recorded get 0 (treated here as distant stars)
parallaxes = df['Plx']
df['Plx'] = parallaxes.fillna(0)

# --- Type conversion ---
# Coerce each measurement column to a numeric dtype; entries that cannot be
# parsed become NaN and are then replaced by 0.
for column_name in numeric_cols:
    coerced = pd.to_numeric(df[column_name], errors='coerce')
    df[column_name] = coerced.fillna(0)

# Report how many nulls are left across the whole frame
remaining_missing = df.isnull().sum().sum()
print("Preprocessing complete. Missing values after cleaning:")
print(remaining_missing, "total missing values")

# Prepare data for MongoDB: one dict per row, keyed by column name
data = df.to_dict(orient='records')
print(f"Prepared {len(data)} documents for ingestion")

## 5. Ingest Data into MongoDB

In [None]:
# Ingest data into MongoDB.
#
# insert_many() always appends, so re-running this cell (or the whole
# notebook) against a populated collection would store every star again and
# inflate all downstream counts. Ingestion is therefore skipped when the
# collection already holds documents.
existing_count = collection.count_documents({})

if existing_count:
    print(f"Collection {collection.name} already holds {existing_count} documents; skipping ingestion")
elif not data:
    # insert_many() rejects an empty document list, so report it explicitly.
    print("No documents to ingest")
else:
    try:
        insert_result = collection.insert_many(data)
        print("Data ingestion successful!")
        # Report what the server acknowledged rather than what was attempted.
        print(f"Inserted {len(insert_result.inserted_ids)} documents into {collection.name}")
    except pymongo.errors.PyMongoError as e:
        # Only driver/server failures are reported and tolerated here;
        # programming errors (e.g. a NameError) are no longer swallowed.
        print("Error during ingestion:", e)

# Verify insertion
doc_count = collection.count_documents({})
print(f"Total documents in collection: {doc_count}")
if doc_count != len(data):
    # Surfaces partial loads and leftovers from earlier runs.
    print(f"Warning: expected {len(data)} documents, found {doc_count}")

## 6. Apply MapReduce: Example 1 - Count Stars by Spectral Type

In [None]:
# MapReduce: Count stars by spectral type

# Map step (JavaScript executed by the server): emit one (SpType, 1) pair
# for every star document.
map_function = Code("""
function() {
    emit(this.SpType, 1);
}
""")

# Reduce step: add up the 1s emitted for each spectral type.
reduce_function = Code("""
function(key, values) {
    return Array.sum(values);
}
""")

# Run MapReduce
# Collection.map_reduce() was removed in PyMongo 4.0, so the mapReduce
# database command is issued directly; this form works on PyMongo 3.x and
# 4.x alike. The {"replace": ...} output mode overwrites the result
# collection on every run, which keeps the cell safe to re-execute.
# NOTE: the mapReduce command itself is deprecated since MongoDB 5.0 in
# favour of the aggregation pipeline.
result_collection = db["spectral_type_count"]
db.command(
    "mapReduce",
    collection.name,
    map=map_function,
    reduce=reduce_function,
    out={"replace": result_collection.name},
)

# Fetch and display results (largest counts first)
results = list(result_collection.find().sort("value", -1).limit(10))
print("Top 10 spectral types by count:")
for res in results:
    # Server-side JavaScript returns the sums as doubles; show them as
    # whole-number counts.
    print(f"{res['_id']}: {int(res['value'])} stars")