### 🧱 STAGE 1 — BASE RUNTIME ONLY (NO BIG DATA YET)

- What you install
  - Java 11
  - Python 3.10 (venv)

- "sudo apt update"
- "sudo apt install openjdk-11-jdk -y"
- Set Java 11 as the default (choose the Java 11 entry in each prompt)
   - sudo update-alternatives --config java
   - sudo update-alternatives --config javac


- Set JAVA_HOME (append these lines to ~/.bashrc so every new shell picks them up)
   - "export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64"
   - "export PATH=$JAVA_HOME/bin:$PATH"


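- Apply the changes to the current shell
   - "source ~/.bashrc"
- Verify JAVA_HOME
   - "echo $JAVA_HOME" (should print /usr/lib/jvm/java-11-openjdk-amd64)
   - "java -version" (should report version 11)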

- Python 3.10 was already installed; created and activated a venv
   - "python3.10 -m venv venv"
   - "source venv/bin/activate"
- Verified with "python --version"
  - 3.10.13

___

### 🧪 STAGE 2 — PRODUCER WITHOUT KAFKA (CRITICAL)

- What you install
   - Python libraries only:
      - requests
      - pandas

- created producer/coingecko_producer.py
___

### 🧱 STAGE 3 — ADD KAFKA (NO SPARK, NO HADOOP)

- What you install
  - ZooKeeper (bundled with the Kafka download; no separate install)
  - Kafka

##### Download Command

- "cd ~/Downloads"
- "wget https://archive.apache.org/dist/kafka/3.4.0/kafka_2.13-3.4.0.tgz"

#### Extract and Move

- "tar -xzf kafka_2.13-3.4.0.tgz"
- "mv kafka_2.13-3.4.0 ~/kafka"


#### Start ZooKeeper

- "cd ~/kafka"
- "bin/zookeeper-server-start.sh config/zookeeper.properties"


##### Leave this terminal running. In another terminal, start the Kafka broker:

- "cd ~/kafka"
- "bin/kafka-server-start.sh config/server.properties"


##### Sanity Check (IMPORTANT). Open a third terminal:

- "cd ~/kafka"
- "bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092"


#### Create Kafka Topic

- from ~/kafka

```bash
# One partition and replication factor 1: sufficient for a single local broker.
bin/kafka-topics.sh \
  --create \
  --topic crypto_prices \
  --bootstrap-server localhost:9092 \
  --partitions 1 \
  --replication-factor 1
```

- Verify:
```bash
# Lists the broker's topics; crypto_prices should be among them.
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
```

- Expected output: crypto_prices



___

##### Update Producer (Kafka Enabled)

```python
import json
import time
from datetime import datetime, timezone

import requests
from kafka import KafkaProducer

# CoinGecko "simple price" endpoint and the query parameters it is polled with.
COINGECKO_URL = "https://api.coingecko.com/api/v3/simple/price"
ASSETS = [  # CoinGecko coin ids, joined into the comma-separated "ids" parameter
    "bitcoin",
    "ethereum",
    "cardano",
    "solana",
    "dogecoin",
]
CURRENCY = "usd"  # quote currency; also the key read from each asset's price entry
POLL_INTERVAL = 10  # seconds to sleep between polling cycles

def _to_json_bytes(value):
    """Encode an event as UTF-8 JSON bytes for the Kafka message value."""
    return json.dumps(value).encode("utf-8")


# Module-level producer used by main(), configured for the broker on localhost:9092.
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=_to_json_bytes,
)

def fetch_prices():
    """Return the latest CoinGecko prices for ASSETS quoted in CURRENCY.

    The decoded JSON body is returned as-is; the caller indexes each asset's
    entry by CURRENCY. A non-2xx response raises ``requests.HTTPError``, and
    the request gives up after 5 seconds.
    """
    query = {"ids": ",".join(ASSETS), "vs_currencies": CURRENCY}
    resp = requests.get(COINGECKO_URL, params=query, timeout=5)
    resp.raise_for_status()
    return resp.json()

def main():
    """Poll CoinGecko forever and publish one Kafka event per asset.

    Each cycle fetches the latest prices, stamps them with a single
    timezone-aware UTC timestamp, sends one JSON event per asset to the
    ``crypto_prices`` topic, and flushes before sleeping POLL_INTERVAL
    seconds. An error during a cycle is printed and polling continues.
    Ctrl+C ends the loop, and the producer is closed on the way out so
    buffered messages are not abandoned.
    """
    print("CoinGecko Producer → Kafka started")
    try:
        while True:
            try:
                data = fetch_prices()
                # Aware UTC time: isoformat() then carries "+00:00", so the
                # consumer does not have to guess the zone (utcnow() is naive).
                timestamp = datetime.now(timezone.utc).isoformat()

                for asset, price in data.items():
                    price_usd = price.get(CURRENCY)
                    if price_usd is None:
                        # Skip only this asset instead of aborting the batch.
                        print(f"WARN: no {CURRENCY} price for {asset}")
                        continue
                    event = {
                        "asset": asset,
                        "price_usd": price_usd,
                        "event_time": timestamp,
                    }
                    producer.send("crypto_prices", value=event)
                    print("Sent:", event)

                # Push this batch out before sleeping.
                producer.flush()
            except Exception as e:
                # Best-effort poller: report the failure and retry next cycle.
                print("ERROR:", e)

            time.sleep(POLL_INTERVAL)
    except KeyboardInterrupt:
        print("Producer stopped")
    finally:
        # Release the broker connections; close() waits for pending sends.
        producer.close()


if __name__ == "__main__":
    main()
```

___

- Install the Kafka client library (inside the activated venv) if not already done:
- `pip install kafka-python`

- run the producer
- `python producer/coingecko_producer.py`

___

- Verify Kafka Consumer (Critical Proof)
```bash
# Each JSON event sent by the producer should be printed here;
# --from-beginning also replays events already stored in the topic.
cd ~/kafka
bin/kafka-console-consumer.sh \
  --topic crypto_prices \
  --bootstrap-server localhost:9092 \
  --from-beginning
```

___

### STAGE 4 — SPARK CONSUMER (LOCAL, NO CLUSTER, NO HADOOP)

- This stage introduces exactly ONE new component: Apache Spark (local mode)


- "cd ~"
- "wget https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz"


- "tar -xvzf spark-3.5.0-bin-hadoop3.tgz"
- "mv spark-3.5.0-bin-hadoop3 spark"
- verify
   - "ls ~/spark"



- "export SPARK_HOME=~/spark" (add both exports to ~/.bashrc to make them permanent)
- "export PATH=$SPARK_HOME/bin:$PATH"
- "spark-submit --version"

___

- "cd ~/codes/Cc_Pipe"
- "nano spark_kafka_consumer.py"


```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Spark session. No master is set here, so spark-submit's --master or its default applies.
spark = SparkSession.builder.appName("CryptoKafkaConsumer").getOrCreate()
spark.sparkContext.setLogLevel("WARN")  # keep console output limited to warnings and batches

# Fields of the JSON events published by the CoinGecko producer.
price_schema = StructType([
    StructField("asset", StringType(), True),
    StructField("price_usd", DoubleType(), True),
    StructField("event_time", StringType(), True),
])

# Streaming Kafka source; startingOffsets=latest means a fresh query skips
# records that were already in the topic.
raw_events = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "crypto_prices")
    .option("startingOffsets", "latest")
    .load()
)

# Decode the binary Kafka `value` to text, parse it with the schema,
# and expand the parsed struct into top-level columns.
prices = raw_events.select(
    from_json(col("value").cast("string"), price_schema).alias("data")
).select("data.*")

# Print every micro-batch to the console without truncating column values.
console_query = (
    prices.writeStream.outputMode("append")
    .format("console")
    .option("truncate", "false")
    .start()
)

console_query.awaitTermination()
```

___

##### RUN SPARK STREAMING JOB

```bash
# --packages downloads the Kafka connector at launch. Its version (3.5.0) should
# match the Spark install above; _2.12 is the Scala build — assumes the default
# Scala 2.12 distribution of spark-3.5.0-bin-hadoop3 (confirm via spark-submit --version).
spark-submit \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 \
  spark_kafka_consumer.py
```