<a href="https://colab.research.google.com/github/RajnishProgrammer/Google-Colab/blob/main/Stock_Data_Processing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Large-Scale Data Processing Pipeline
## Tech Stack: Python, Apache Kafka, Apache Spark, PostgreSQL, AWS S3
---
Description: Build a system that ingests, processes, and stores large volumes of data in real time.

Use Case: Useful for businesses handling streaming data (e.g., stock market, IoT data, social media feeds).

Key Features:
- Fetch data from APIs or Kafka streams.
- Process and clean data using Pandas/Spark.
- Store processed data in a database (PostgreSQL/BigQuery).
- Expose REST APIs for accessing processed data.



📌 Step 1: Define the Use Case

Before implementation, decide what kind of data you want to process.

Some options:

- Real-time stock market data 🏦 (from Alpha Vantage API)
- Twitter sentiment analysis 📢 (via Twitter API)
- IoT sensor data processing 🏭 (from MQTT brokers)
- Log processing for system monitoring 📊 (server logs)

🛠 Step 2: Tech Stack

- Python → Core language for data processing
- Apache Kafka → Message queue for real-time streaming
- Apache Spark → Distributed data processing
- PostgreSQL / BigQuery → Database for storage
- FastAPI / Flask → API layer to expose processed data
- AWS S3 → Storage for raw data

🚀 Step 3: Build the Pipeline

1️⃣ Data Ingestion

2️⃣ Data Processing & Transformation

3️⃣ Store Processed Data

4️⃣ Expose the Data via API
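
The first three stages can be read as plain functions chained together, with the API in stage 4 reading whatever stage 3 stored. The sketch below is only an outline under that assumption: `run_pipeline` and its `fetch`, `clean`, and `store` parameters are hypothetical names that do not appear in this notebook, and the usage lines replace the real API call and database with in-memory stand-ins.

```python
from typing import Callable, Iterable


def run_pipeline(
    symbol: str,
    fetch: Callable[[str], dict],
    clean: Callable[[dict], Iterable[tuple]],
    store: Callable[[Iterable[tuple]], int],
) -> int:
    """Run ingestion, cleaning and storage in order; return the rows stored."""
    raw = fetch(symbol)   # 1. data ingestion
    rows = clean(raw)     # 2. processing and transformation
    return store(rows)    # 3. storage


# Usage with in-memory stand-ins (hand-written data, not real API output).
fake_db: list = []


def fake_store(rows):
    before = len(fake_db)
    fake_db.extend(rows)
    return len(fake_db) - before


stored = run_pipeline(
    "IBM",
    fetch=lambda s: {"2025-02-04 10:00:00": {"open": "1.0", "close": "2.0"}},
    clean=lambda raw: [(ts, float(v["open"]), float(v["close"])) for ts, v in raw.items()],
    store=fake_store,
)
```

Passing the stages in as arguments keeps each one testable on its own and makes it easy to swap the API source for a Kafka consumer later.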


In [None]:
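# 01 Data Ingestion
import io
import json
import os

import pandas as pd
import requests


API_URL = 'https://www.alphavantage.co/query'
# Read the key from the environment rather than committing it to the notebook.
# ALPHAVANTAGE_API_KEY is an assumed variable name; set it before running this cell.
API_KEY = os.environ['ALPHAVANTAGE_API_KEY']

def fetch_stock_data(symbol):
  """Fetch 1-minute intraday bars for `symbol` and return the decoded JSON."""
  params = {
    'function': 'TIME_SERIES_INTRADAY',
    'symbol': symbol,
    'interval': '1min',
    'apikey': API_KEY
  }
  response = requests.get(API_URL, params=params, timeout=30)
  response.raise_for_status()  # fail early on HTTP errors
  return response.json()

json_data = fetch_stock_data('IBM')
json_str = json.dumps(json_data)
# Wrap the string in StringIO: passing literal JSON to read_json is deprecated since pandas 2.1.
data = pd.read_json(io.StringIO(json_str))
data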

In [None]:
data['Meta Data']

In [None]:
data['Time Series (1min)']
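
The `'Time Series (1min)'` key is only present when the request succeeds. As far as I know, Alpha Vantage reports invalid calls and rate limits inside the JSON body under keys such as `"Error Message"`, `"Note"`, or `"Information"`; treat that key list as an assumption to check against the API documentation. A minimal sketch of a guard, with the hypothetical helper `extract_time_series`:

```python
SERIES_KEY = "Time Series (1min)"
# Keys assumed to carry error or rate-limit messages; verify against the API docs.
PROBLEM_KEYS = ("Error Message", "Note", "Information")


def extract_time_series(payload: dict, series_key: str = SERIES_KEY) -> dict:
    """Return the time-series mapping, or raise ValueError with the API message."""
    if series_key in payload:
        return payload[series_key]
    for key in PROBLEM_KEYS:
        if key in payload:
            raise ValueError(f"API returned {key!r}: {payload[key]}")
    raise ValueError(f"Unexpected response keys: {sorted(payload)}")


# Usage with hand-written payloads (not real API output).
ok = extract_time_series({SERIES_KEY: {"2025-02-04 10:00:00": {"1. open": "1.0"}}})
try:
    extract_time_series({"Note": "rate limit reached"})
    message = ""
except ValueError as exc:
    message = str(exc)
```

Raising a descriptive error here is easier to debug than the bare `KeyError` that indexing the response directly would produce.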

In [None]:
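# 02 Data Processing & Transformation
def clean_data(raw_data):
  """Turn the raw API response into a float DataFrame indexed by timestamp."""
  # Each key of the time series is a timestamp, so transpose to get one row per minute.
  df = pd.DataFrame(raw_data['Time Series (1min)']).T
  df = df.astype(float)                # the API returns every value as a string
  df.index = pd.to_datetime(df.index)
  return df


# Reuse the response fetched during ingestion instead of calling the API again.
cleaned_data = clean_data(json_data)
print(cleaned_data)
print('Done Cleaning...')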

In [None]:
cleaned_data.columns = ['open', 'high', 'low', 'close', 'volume']

In [None]:
cleaned_data = cleaned_data.reset_index()

In [None]:
cleaned_data = cleaned_data.rename(columns={'index': 'timestamp'})

In [None]:
raw = json_data  # reuse the response fetched earlier rather than spending another API call
raw['Time Series (1min)']

In [None]:
temp = pd.DataFrame(raw['Time Series (1min)']).T
temp

In [None]:
temp = temp.astype(float)
temp

In [None]:
temp.index = pd.to_datetime(temp.index)
temp

In [None]:
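# 03 Store processed data
import os
import psycopg2

# Take the connection string from the environment instead of embedding credentials.
# DATABASE_URL is an assumed variable name, e.g. postgresql://user:password@host:port/dbname
conn = psycopg2.connect(os.environ['DATABASE_URL'])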

cursor = conn.cursor()
cursor.execute("""
    CREATE TABLE IF NOT EXISTS stock_prices(
      timestamp TIMESTAMP PRIMARY KEY,
      open_price FLOAT,
      close_price FLOAT
    )
""")
conn.commit()

In [None]:
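# timestamp is the primary key, so ON CONFLICT makes re-running this cell safe.
for index, row in cleaned_data.iterrows():
  cursor.execute("""
      INSERT INTO stock_prices (timestamp, open_price, close_price)
      VALUES (%s, %s, %s)
      ON CONFLICT (timestamp) DO NOTHING
  """, (row['timestamp'], float(row['open']), float(row['close'])))

conn.commit()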
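
Because `timestamp` is the primary key, loading the same minute twice needs a conflict rule, and a batch call avoids one statement per row. The sketch below illustrates both with the standard-library `sqlite3` module and an in-memory database as a stand-in for PostgreSQL, so it runs without a server. `ON CONFLICT ... DO NOTHING` exists in both systems (PostgreSQL 9.5+, SQLite 3.24+), but the placeholder differs: `?` in `sqlite3`, `%s` in `psycopg2`. The names `demo_conn`, `demo_rows`, and `INSERT_SQL` are hypothetical.

```python
import sqlite3

# Hand-written sample rows: (timestamp, open, close).
demo_rows = [
    ("2025-02-04 10:00:00", 1.0, 2.0),
    ("2025-02-04 10:01:00", 2.0, 3.0),
]

demo_conn = sqlite3.connect(":memory:")
demo_conn.execute("""
    CREATE TABLE IF NOT EXISTS stock_prices(
      timestamp TEXT PRIMARY KEY,
      open_price REAL,
      close_price REAL
    )
""")

INSERT_SQL = """
    INSERT INTO stock_prices (timestamp, open_price, close_price)
    VALUES (?, ?, ?)
    ON CONFLICT (timestamp) DO NOTHING
"""

demo_conn.executemany(INSERT_SQL, demo_rows)
demo_conn.executemany(INSERT_SQL, demo_rows)  # loading the same rows again changes nothing
demo_conn.commit()

row_count = demo_conn.execute("SELECT COUNT(*) FROM stock_prices").fetchone()[0]
demo_conn.close()
```

With `psycopg2`, the same statement with `%s` placeholders can be passed to `cursor.executemany`.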

In [None]:
! pip install fastapi

In [None]:
! pip install pyngrok
! pip install uvicorn

In [None]:
# 04 Expose the data via API
import os

from fastapi import FastAPI
from pyngrok import ngrok
import nest_asyncio
import uvicorn
import psycopg2

app = FastAPI()

def get_latest_data():
  # Open a short-lived connection per request and always close it.
  # DATABASE_URL is an assumed environment variable holding the connection string.
  conn = psycopg2.connect(os.environ['DATABASE_URL'])
  try:
    with conn.cursor() as cursor:
      cursor.execute("SELECT * FROM stock_prices ORDER BY timestamp DESC LIMIT 10")
      return cursor.fetchall()
  finally:
    conn.close()

@app.get('/')
def index():
  return {'result': 'Try this endpoint ---> /latest-prices'}

@app.get("/latest-prices")
def latest_prices():
  return get_latest_data()
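
`fetchall()` returns plain tuples, so `/latest-prices` responds with a list of unlabeled arrays. A minimal sketch of labeling each row before returning it is shown here. `rows_to_records` and `COLUMNS` are hypothetical names, and the column order is assumed to match the `CREATE TABLE` statement used earlier.

```python
from datetime import datetime

# Assumed to match the column order of the stock_prices table.
COLUMNS = ("timestamp", "open_price", "close_price")


def rows_to_records(rows, columns=COLUMNS):
    """Pair each value in a row with its column name."""
    return [dict(zip(columns, row)) for row in rows]


# Usage with a hand-written row shaped like a fetchall() result.
sample_rows = [(datetime(2025, 2, 4, 10, 0), 1.0, 2.0)]
records = rows_to_records(sample_rows)
```

An endpoint could then return `rows_to_records(get_latest_data())`, which gives API clients named fields instead of positional values.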

In [None]:
nest_asyncio.apply()

In [None]:
import os
ngrok.set_auth_token(os.environ['NGROK_AUTHTOKEN'])  # assumed env var; keeps the token out of the notebook

In [None]:
public_url = ngrok.connect(addr="8000", proto="http")
print(f'Public URL: {public_url}')

In [None]:
uvicorn.run(app, host='0.0.0.0', port=8000)

In [None]:
# Install required packages
# !pip install fastapi uvicorn nest-asyncio pyngrok psycopg2

# Import necessary modules
import os

from fastapi import FastAPI
import psycopg2
import nest_asyncio
import uvicorn
from pyngrok import ngrok

# Initialize FastAPI app
app = FastAPI()

# Connection string comes from the environment (DATABASE_URL is an assumed variable name)
DATABASE_URL = os.environ["DATABASE_URL"]

# Define endpoint to get latest stock prices
@app.get('/')
def index():
    return 'Hello World !!'

@app.get("/latest-prices")
def latest_prices():
    # Connect per request so a failed query cannot leave a shared cursor in a broken state
    conn = psycopg2.connect(DATABASE_URL)
    try:
        with conn.cursor() as cursor:
            cursor.execute("SELECT * FROM stock_prices ORDER BY timestamp DESC LIMIT 10")
            result = cursor.fetchall()
    finally:
        conn.close()
    return {"data": result}

# Apply nest_asyncio to avoid event loop issues in Colab
nest_asyncio.apply()

# Authenticate ngrok (NGROK_AUTHTOKEN is an assumed environment variable)
ngrok.set_auth_token(os.environ["NGROK_AUTHTOKEN"])

# Start ngrok tunnel
public_url = ngrok.connect(addr="8000", proto="http")
print(f"Public URL: {public_url}")

# Run FastAPI app
uvicorn.run(app, host="localhost", port=8000)

⏳ Step 4: Automate & Deploy

🔹 Automate with Airflow

🔹 Deploy to Cloud (AWS, GCP, or Azure)

In [None]:
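# Automate & Deploy
# This belongs in an Airflow DAG file where fetch_stock_data can be imported.
from airflow import DAG
from airflow.operators.python import PythonOperator  # Airflow 2 path; python_operator is deprecated
from datetime import datetime

# catchup=False stops Airflow from backfilling every hour since start_date.
dag = DAG("data_pipeline", start_date=datetime(2025, 2, 4), schedule_interval="@hourly", catchup=False)

# Only the ingestion step is scheduled here; cleaning and storage would be further tasks.
fetch_task = PythonOperator(
    task_id="fetch_stock_data",
    python_callable=fetch_stock_data,
    op_args=['IBM'],
    dag=dag
)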

🚀 Deploy to cloud:
- Package the pipeline and API with Docker
- Deploy the API on AWS Lambda or an EC2 instance
- Use AWS RDS or BigQuery for the database

🔮 What else:
- Add an ML model to predict trends
- Process more data sources
- Use Kafka streams for ingestion instead of polling the API
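
As a rough illustration of the Kafka idea, each price bar could be published as a JSON message. Everything specific here is an assumption rather than part of this notebook: the `kafka-python` package, a broker at `localhost:9092`, the topic name `stock-prices`, and the helper names `to_message` and `publish_bars`.

```python
import json


def to_message(symbol: str, timestamp: str, bar: dict) -> bytes:
    """Serialize one price bar as UTF-8 JSON, the value sent to Kafka."""
    record = {"symbol": symbol, "timestamp": timestamp, **bar}
    return json.dumps(record, sort_keys=True).encode("utf-8")


def publish_bars(symbol: str, series: dict,
                 topic: str = "stock-prices",
                 servers: str = "localhost:9092") -> int:
    """Send every bar in `series` to Kafka; requires `pip install kafka-python`."""
    # Imported lazily so to_message can be used without Kafka installed.
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers=servers)
    for timestamp, bar in series.items():
        # Keying by symbol routes one symbol's bars to the same partition, preserving order.
        producer.send(topic, key=symbol.encode("utf-8"),
                      value=to_message(symbol, timestamp, bar))
    producer.flush()   # block until buffered messages are sent
    producer.close()
    return len(series)


# Serialization can be checked without a broker (hand-written sample bar).
message = to_message("IBM", "2025-02-04 10:00:00", {"open": 1.0, "close": 2.0})
```

A consumer, for example a Spark Structured Streaming job, would then read the topic and apply the same cleaning and storage steps.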
