# Project: Data Pipeline with Pandas and MongoDB

## Architecture of the solution

This project uses a data pipeline in which:
- **Pandas** is used to read and transform the data from a CSV file.
- **MongoDB** acts as the document database to store the transformed data.
- The pipeline is linear: CSV → Pandas → MongoDB.

**Note:** Spark is not used in this implementation. If required, this pipeline could be extended with PySpark for distributed processing.


## Schema Design and Indexing Strategy

Each record is stored as a MongoDB document of the following shape (shown in `mongosh` notation, since `ISODate(...)` is not valid JSON):

```javascript
{
    "date": ISODate("2020-03-15T00:00:00Z"),
    "country": "Finland",
    "stats": {
        "cumulative_total_cases": 100,
        "daily_new_cases": 10,
        "active_cases": 80,
        "cumulative_total_deaths": 5,
        "daily_new_deaths": 1
    }
}
```

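This format keeps the two fields used for filtering, `date` and `country`, at the top level and groups the five counters under `stats`. Nested values remain queryable through dot notation, for example `stats.daily_new_cases`.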

### Indexes

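Indexes are created on:
- `date`
- `country`
- A compound index on `date` and `country`

These indexes support:
- Fast retrieval of all data for a given country (`country` index)
- Efficient filtering for reports on specific dates (`date` index)
- Lookups that filter on both `date` and `country` (compound index)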
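
How the three indexes listed above relate to each other can be illustrated with a simplified model of MongoDB's prefix rule: a compound index can serve a query that filters on its leading field or fields. The helper names `usable_prefix_length` and `candidate_indexes` are hypothetical and not part of pymongo, and the model only covers equality filters; the real query planner also weighs sorts, ranges, and selectivity.

```python
# Index names follow pymongo's default naming (field name + direction).
INDEXES = {
    "date_1": ["date"],
    "country_1": ["country"],
    "date_1_country_1": ["date", "country"],
}


def usable_prefix_length(index_keys, query_fields):
    """Count how many leading index keys are constrained by the query."""
    count = 0
    for key in index_keys:
        if key not in query_fields:
            break
        count += 1
    return count


def candidate_indexes(query_fields):
    """Return the names of indexes whose leading key appears in the query."""
    return [
        name
        for name, keys in INDEXES.items()
        if usable_prefix_length(keys, query_fields) > 0
    ]


print(candidate_indexes({"country"}))
print(candidate_indexes({"date"}))
print(candidate_indexes({"date", "country"}))
```

Under this model a country-only query cannot use `date_1_country_1`, because `country` is not its leading key, so the separate `country` index is needed. A date-only query, by contrast, matches both `date_1` and the compound index, which suggests the single-field `date` index may be redundant.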


## Step-by-step Implementation

This section details the data loading, cleaning, transformation, and insertion into MongoDB.


In [1]:
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017/")
db = client['Project']

In [2]:
# Step 1: Load data
import pandas as pd

# Load the CSV file into a DataFrame
df = pd.read_csv("Covid19.csv")

# Strip stray whitespace from column names before referencing any column
df.columns = [col.strip() for col in df.columns]

# Parse the date column so that df.info() below reports it as datetime64
df['date'] = pd.to_datetime(df['date'])


In [3]:
# Step 2: Convert and clean date and numeric columns
print(df.info())

# Fix column names (strip spaces just in case)
df.columns = [col.strip() for col in df.columns]

# Convert date column to datetime
df['date'] = pd.to_datetime(df['date'], errors='coerce')

# Make sure numerical columns are numbers
numerical_columns = [
    'cumulative_total_cases', 'daily_new_cases',
    'active_cases', 'cumulative_total_deaths', 'daily_new_deaths'
]

for col in numerical_columns:
    df[col] = pd.to_numeric(df[col], errors='coerce')

# Drop rows with missing critical data
df.dropna(subset=['date', 'country'], inplace=True)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 184787 entries, 0 to 184786
Data columns (total 7 columns):
 #   Column                   Non-Null Count   Dtype         
---  ------                   --------------   -----         
 0   date                     184787 non-null  datetime64[ns]
 1   country                  184787 non-null  object        
 2   cumulative_total_cases   184787 non-null  float64       
 3   daily_new_cases          174329 non-null  float64       
 4   active_cases             166747 non-null  float64       
 5   cumulative_total_deaths  178227 non-null  float64       
 6   daily_new_deaths         157850 non-null  float64       
dtypes: datetime64[ns](1), float64(5), object(1)
memory usage: 9.9+ MB
None
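
The non-null counts in the `df.info()` output above imply how many values are missing in each stats column. A short worked calculation, using only the numbers printed above (the variable names are illustrative):

```python
TOTAL_ROWS = 184787

# Non-null counts copied from the df.info() output.
non_null = {
    "cumulative_total_cases": 184787,
    "daily_new_cases": 174329,
    "active_cases": 166747,
    "cumulative_total_deaths": 178227,
    "daily_new_deaths": 157850,
}

# Missing values per column = total rows minus non-null values.
missing = {column: TOTAL_ROWS - count for column, count in non_null.items()}

for column, count in missing.items():
    share = 100 * count / TOTAL_ROWS
    print(f"{column}: {count} missing ({share:.1f}%)")
```

Since only `date` and `country` are treated as critical in the `dropna` call, rows with these gaps are kept and the missing values carry through to the later steps as `NaN`.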


In [4]:
# Step 3: Structure stats as a subdocument
df['stats'] = df.apply(lambda row: {
    "cumulative_total_cases": row["cumulative_total_cases"],
    "daily_new_cases": row["daily_new_cases"],
    "active_cases": row["active_cases"],
    "cumulative_total_deaths": row["cumulative_total_deaths"],
    "daily_new_deaths": row["daily_new_deaths"]
}, axis=1)

# Keep only essential columns for insertion
mongo_df = df[['date', 'country', 'stats']]
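
Because several stats columns contain missing values, the subdocuments built above can hold `NaN` floats, which MongoDB stores as the double value NaN rather than as `null`. If `null` is preferred, one possible approach is to build each document from a plain dict and map `NaN` to `None`. The sketch below is an alternative, not the notebook's method; `clean_value` and `to_document` are hypothetical helper names, and the sample row is made up for illustration.

```python
import math
from datetime import datetime

STAT_FIELDS = [
    "cumulative_total_cases",
    "daily_new_cases",
    "active_cases",
    "cumulative_total_deaths",
    "daily_new_deaths",
]


def clean_value(value):
    """Map float NaN to None (stored as BSON null); leave other values as-is."""
    if isinstance(value, float) and math.isnan(value):
        return None
    return value


def to_document(record):
    """Build the nested document from one flat row given as a dict."""
    return {
        "date": record["date"],
        "country": record["country"],
        "stats": {field: clean_value(record[field]) for field in STAT_FIELDS},
    }


# Illustrative flat row, shaped like one entry of df.to_dict("records").
row = {
    "date": datetime(2020, 3, 15),
    "country": "Finland",
    "cumulative_total_cases": 100.0,
    "daily_new_cases": 10.0,
    "active_cases": 80.0,
    "cumulative_total_deaths": 5.0,
    "daily_new_deaths": float("nan"),
}
doc = to_document(row)
print(doc["stats"])
```

Applied to the full dataset, this could look like `[to_document(r) for r in df.to_dict("records")]`, which also avoids the row-wise `df.apply` call.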

In [5]:
# Step 4: Insert data into MongoDB
from pymongo import MongoClient

# Connect to MongoDB
client = MongoClient("mongodb://localhost:27017/")
db = client["Project"]
collection = db["CovidData"]

# Convert to records
data_to_insert = mongo_df.to_dict("records")

# Insert the data
collection.insert_many(data_to_insert)

# Create indexes for query performance
collection.create_index([("date", 1)])
collection.create_index([("country", 1)])
collection.create_index([("date", 1), ("country", 1)])


'date_1_country_1'
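
Note that re-running the insertion cell calls `insert_many` again and stores every document a second time. A minimal sketch of a repeatable load is shown below, assuming that a `(country, date)` pair identifies exactly one row, which the source data does not guarantee. `upsert_documents` is a hypothetical helper; it relies only on pymongo's `Collection.replace_one(filter, replacement, upsert=True)`.

```python
def upsert_documents(collection, documents):
    """Insert or replace each document, matched on its (country, date) pair.

    `collection` is expected to behave like a pymongo Collection; only its
    replace_one(filter, replacement, upsert=True) method is used.
    Assumption: (country, date) is unique per row.
    """
    written = 0
    for doc in documents:
        key = {"country": doc["country"], "date": doc["date"]}
        # upsert=True inserts the document when no match exists.
        collection.replace_one(key, doc, upsert=True)
        written += 1
    return written
```

This version makes one round trip per document, so it will be slow for a collection of this size; batching the same operations through `bulk_write` with `ReplaceOne` requests is likely a better fit in practice.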

In [6]:
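# Step 5: Example queries
from datetime import datetime

# All data for Finland (find() returns a lazy cursor; iterate it to fetch documents)
finland_cursor = collection.find({"country": "Finland"})

# Data for Finland on a specific date
finland_on_date = collection.find({"country": "Finland", "date": datetime(2020, 3, 15)})

# Records with more than 100 new cases; nested fields need dot notation
for doc in collection.find({"stats.daily_new_cases": {"$gt": 100}}):
    print(doc)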
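
Filters like the ones above can also be combined. The sketch below builds a filter for one country over a date range, optionally restricted by a threshold on a nested stats field. `build_filter` is a hypothetical helper, and the dates and threshold are arbitrary examples.

```python
from datetime import datetime


def build_filter(country, start, end, min_new_cases=None):
    """Build a find() filter for one country and a half-open date range."""
    query = {"country": country, "date": {"$gte": start, "$lt": end}}
    if min_new_cases is not None:
        # Fields inside the stats subdocument are addressed with dot notation.
        query["stats.daily_new_cases"] = {"$gt": min_new_cases}
    return query


march_2020 = build_filter(
    "Finland", datetime(2020, 3, 1), datetime(2020, 4, 1), min_new_cases=100
)
print(march_2020)

# With a running MongoDB and the collection from Step 4:
# for doc in collection.find(march_2020).sort("date", 1):
#     print(doc)
```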


## MongoDB Query Performance Tuning

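We call `create_index()` on the fields that appear in query filters, `date` and `country`. With a matching index, MongoDB can find documents through an index scan rather than reading every document in the collection, which matters more as the collection grows (the CSV loaded above has 184,787 rows).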

You can use the following command to analyze query plans:
```python
collection.find({"country": "Finland"}).explain()
```
The output shows the winning query plan: an `IXSCAN` stage means an index was used, while `COLLSCAN` means MongoDB scanned the whole collection, which usually signals a missing or unsuitable index.
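
The dictionary returned by `explain()` can be checked programmatically. The following sketch walks the winning plan and collects every stage name it finds. `plan_stages` and `uses_index` are hypothetical helpers, and the sample plans are hand-written to resemble MongoDB's output rather than copied from a real run; the exact layout varies between server versions.

```python
def plan_stages(plan):
    """Recursively collect every "stage" name found in an explain() plan."""
    stages = []
    if isinstance(plan, dict):
        if "stage" in plan:
            stages.append(plan["stage"])
        for value in plan.values():
            stages.extend(plan_stages(value))
    elif isinstance(plan, list):
        for item in plan:
            stages.extend(plan_stages(item))
    return stages


def uses_index(explain_output):
    """Return True if the winning plan contains an index scan stage."""
    winning_plan = explain_output["queryPlanner"]["winningPlan"]
    return "IXSCAN" in plan_stages(winning_plan)


# Hand-written samples (assumed shapes, not real server output).
indexed_plan = {
    "queryPlanner": {
        "winningPlan": {
            "stage": "FETCH",
            "inputStage": {"stage": "IXSCAN", "indexName": "country_1"},
        }
    }
}
unindexed_plan = {"queryPlanner": {"winningPlan": {"stage": "COLLSCAN"}}}

print(uses_index(indexed_plan), uses_index(unindexed_plan))
```

Against a live database, the same check could be applied as `uses_index(collection.find({"country": "Finland"}).explain())`.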
