##  A) Ingest the dataset using PySpark

## Step 1: Install Java, Spark & Required Packages

In [1]:
# Install Java and required dependencies
!apt-get install openjdk-11-jdk-headless -qq > /dev/null

# Download Spark properly
!wget -q https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop2.7.tgz

# Extract Spark
!tar -xvzf spark-3.2.1-bin-hadoop2.7.tgz

# Install dependencies
!pip install -q findspark pyspark


spark-3.2.1-bin-hadoop2.7/
spark-3.2.1-bin-hadoop2.7/LICENSE
spark-3.2.1-bin-hadoop2.7/NOTICE
spark-3.2.1-bin-hadoop2.7/R/
spark-3.2.1-bin-hadoop2.7/R/lib/
spark-3.2.1-bin-hadoop2.7/R/lib/SparkR/
spark-3.2.1-bin-hadoop2.7/R/lib/SparkR/DESCRIPTION
spark-3.2.1-bin-hadoop2.7/R/lib/SparkR/INDEX
spark-3.2.1-bin-hadoop2.7/R/lib/SparkR/Meta/
spark-3.2.1-bin-hadoop2.7/R/lib/SparkR/Meta/Rd.rds
spark-3.2.1-bin-hadoop2.7/R/lib/SparkR/Meta/features.rds
spark-3.2.1-bin-hadoop2.7/R/lib/SparkR/Meta/hsearch.rds
spark-3.2.1-bin-hadoop2.7/R/lib/SparkR/Meta/links.rds
spark-3.2.1-bin-hadoop2.7/R/lib/SparkR/Meta/nsInfo.rds
spark-3.2.1-bin-hadoop2.7/R/lib/SparkR/Meta/package.rds
spark-3.2.1-bin-hadoop2.7/R/lib/SparkR/NAMESPACE
spark-3.2.1-bin-hadoop2.7/R/lib/SparkR/R/
spark-3.2.1-bin-hadoop2.7/R/lib/SparkR/R/SparkR
spark-3.2.1-bin-hadoop2.7/R/lib/SparkR/R/SparkR.rdb
spark-3.2.1-bin-hadoop2.7/R/lib/SparkR/R/SparkR.rdx
spark-3.2.1-bin-hadoop2.7/R/lib/SparkR/help/
spark-3.2.1-bin-hadoop2.7/R/lib/SparkR/help/An

## Step 2: Set Environment Variables

In [2]:
import os

# Seting up environment variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop2.7"


## Step 3: Initialize Spark

In [3]:
import findspark
findspark.init()

from pyspark.sql import SparkSession

# Starting Spark session
spark = SparkSession.builder.appName("GlobalTerrorismETL").getOrCreate()


## Step 4: Upload and Ingest the Dataset

In [4]:
# Reading dataset
df = spark.read.csv("globalterrorismdb_0718dist.csv", header=True, inferSchema=True)


df.show(5)
df.printSchema()

print("Rows:", df.count())
print("Columns:", len(df.columns))


+------------+-----+------+----+----------+--------+----------+-------+------------------+------+--------------------+---------+-------------+---------+----------+-----------+--------+--------+-------+-----+-----+-----+---------+-----------+---------------+--------+-------+-------+-----------+--------------------+-----------+---------------+-----------+---------------+---------+--------------------+------------+--------------------+--------------------+--------------------+-------+------------------+---------+-------------+------------+----------------+-----+-------+-------+-----------+---------+-------------+------------+----------------+-----+-------+-------+-----------+--------------------+--------+------+---------+------+---------+------+-----------+-----------+-----------+----------+------+--------+-------+---------+-------------+------+----------+--------------+------+----------+--------------+---------+---------+-------------+------------+--------------------+---------+---------

## B) Basic Curation

## Null Handling

## Step 1: Check for Nulls Column-Wise

In [5]:
from pyspark.sql.functions import col, isnan, when, count


df.select([
    count(when(col(c).isNull(), c)).alias(c)
    for c in df.columns
]).show(n=1, truncate=False)


+-------+-----+------+----+----------+--------+----------+-------+-----------+------+----------+---------+----+--------+---------+-----------+--------+--------+-------+-----+-----+-----+---------+-----------+---------------+--------+-------+-------+-----------+---------------+-----------+---------------+-----------+---------------+---------+-------------+------------+----------------+-----+-------+-------+-----------+---------+-------------+------------+----------------+-----+-------+-------+-----------+---------+-------------+------------+----------------+-----+-------+-------+-----------+-----+--------+------+---------+------+---------+------+-----------+-----------+-----------+----------+------+--------+-------+---------+-------------+------+----------+--------------+------+----------+--------------+---------+---------+-------------+------------+----------------+---------+-------------+------------+----------------+---------+-------------+------------+----------------+---------+----

## Step 2: Apply Null Handling

In [6]:
# Drop rows with critical nulls
df_cleaned = df.dropna(subset=["eventid", "iyear", "country_txt", "attacktype1_txt"])

# Fill numeric columns with 0
df_cleaned = df_cleaned.fillna({'nkill': 0, 'nwound': 0})

# Fill text columns with 'Unknown'
df_cleaned = df_cleaned.fillna({'summary': 'Unknown', 'city': 'Unknown'})


## Step 3: Confirm No Nulls Left (Optional Check)

In [7]:
df_cleaned.select([
    count(when(col(c).isNull(), c)).alias(c)
    for c in df_cleaned.columns
]).show(n=1, truncate=False)


+-------+-----+------+----+----------+--------+----------+-------+-----------+------+----------+---------+----+--------+---------+-----------+--------+--------+-------+-----+-----+-----+---------+-----------+---------------+--------+-------+-------+-----------+---------------+-----------+---------------+-----------+---------------+---------+-------------+------------+----------------+-----+-------+-------+-----------+---------+-------------+------------+----------------+-----+-------+-------+-----------+---------+-------------+------------+----------------+-----+-------+-------+-----------+-----+--------+------+---------+------+---------+------+-----------+-----------+-----------+----------+------+--------+-------+---------+-------------+------+----------+--------------+------+----------+--------------+---------+---------+-------------+------------+----------------+---------+-------------+------------+----------------+---------+-------------+------------+----------------+---------+----

## Deduplication

## Step 1: Remove Fully Duplicate Rows (all columns identical)

In [8]:
df_deduped = df_cleaned.dropDuplicates()


## Step 2: Remove Based on a Key Column (e.g., eventid)

In [9]:
df_deduped = df_cleaned.dropDuplicates(["eventid"])


## Step 3: See the Effect

In [10]:
print("Before deduplication:", df_cleaned.count())
print("After deduplication :", df_deduped.count())


Before deduplication: 36803
After deduplication : 36803


## Feature Engineering

In [11]:
from pyspark.sql.functions import col, when, concat_ws

# Fill nkill and nwound with 0 if nulls still exist
df_fe = df_deduped.fillna({'nkill': 0, 'nwound': 0})

# Add 'casualties' = nkill + nwound
df_fe = df_fe.withColumn("casualties", col("nkill") + col("nwound"))

# Add 'is_successful' = True if success == 1
df_fe = df_fe.withColumn("is_successful", when(col("success") == 1, True).otherwise(False))

# Add 'is_suicide_attack' = True if suicide == 1
df_fe = df_fe.withColumn("is_suicide_attack", when(col("suicide") == 1, True).otherwise(False))

# Add 'year_month' = "iyear-imonth"
df_fe = df_fe.withColumn("year_month", concat_ws("-", col("iyear"), col("imonth")))


In [12]:
df_fe.select("iyear", "imonth", "nkill", "nwound", "casualties",
             "success", "is_successful",
             "suicide", "is_suicide_attack", "year_month").show(10)


+-----+------+-----+------+----------+-------+-------------+-------+-----------------+----------+
|iyear|imonth|nkill|nwound|casualties|success|is_successful|suicide|is_suicide_attack|year_month|
+-----+------+-----+------+----------+-------+-------------+-------+-----------------+----------+
| 1970|     7|    1|     0|       1.0|      1|         true|      0|            false|    1970-7|
| 1970|     0|    0|     0|       0.0|      1|         true|      0|            false|    1970-0|
| 1970|     1|    1|     0|       1.0|      1|         true|      0|            false|    1970-1|
| 1970|     1|    0|     0|       0.0|      1|         true|      0|            false|    1970-1|
| 1970|     1|    0|     0|       0.0|      1|         true|      0|            false|    1970-1|
| 1970|     1|    0|     0|       0.0|      1|         true|      0|            false|    1970-1|
| 1970|     1|    0|     0|       0.0|      0|        false|      0|            false|    1970-1|
| 1970|     1|    0|

## Store cleaned data to a local or mock S3 bucket

## Step 1: Choose a mock S3 path

In [13]:
mock_s3_path = "/content/mock_s3_bucket/global_terrorism_cleaned"


## Step 2: Save the DataFrame as a CSV

In [14]:
df_fe.write \
    .option("header", True) \
    .mode("overwrite") \
    .csv(mock_s3_path)


## Step 3: (Optional) Check If File Saved

In [15]:
import os


os.listdir("/content/mock_s3_bucket/global_terrorism_cleaned")


['.part-00001-9cd5a7cc-f30c-4428-8a2b-1a4c660bf692-c000.csv.crc',
 '_SUCCESS',
 'part-00001-9cd5a7cc-f30c-4428-8a2b-1a4c660bf692-c000.csv',
 '.part-00000-9cd5a7cc-f30c-4428-8a2b-1a4c660bf692-c000.csv.crc',
 '._SUCCESS.crc',
 'part-00000-9cd5a7cc-f30c-4428-8a2b-1a4c660bf692-c000.csv']

## Build and expose a simple FastAPI endpoint that fetches sample records from this cleaned dataset

In [16]:
!pip install fastapi uvicorn pyngrok nest-asyncio pandas




In [17]:
import pandas as pd
import glob


file_paths = glob.glob("/content/mock_s3_bucket/global_terrorism_cleaned/part-*.csv")


dfs = []
for fp in file_paths:
    try:
        df = pd.read_csv(fp, on_bad_lines='skip', engine='python')
        dfs.append(df)
    except Exception as e:
        print(f"Skipped file {fp} due to error: {e}")


df_sample = pd.concat(dfs, ignore_index=True)


df_sample.head()


Unnamed: 0,eventid,iyear,imonth,iday,approxdate,extended,resolution,country,country_txt,region,...,dbsource,INT_LOG,INT_IDEO,INT_MISC,INT_ANY,related,casualties,is_successful,is_suicide_attack,year_month
0,197000000001,1970,7,2,,0,,58,Dominican Republic,2,...,PGIS,0,0,0,0,,1.0,True,False,1970-7
1,197001000003,1970,1,0,,0,,101,Japan,4,...,PGIS,-9,-9,1,1,,0.0,True,False,1970-1
2,197001020001,1970,1,2,,0,,218,Uruguay,3,...,PGIS,0,0,0,0,,0.0,False,False,1970-1
3,197001110001,1970,1,11,,0,,65,Ethiopia,11,...,PGIS,0,1,1,1,,1.0,True,False,1970-1
4,197001200001,1970,1,20,,0,,83,Guatemala,2,...,PGIS,-9,-9,1,1,,1.0,True,False,1970-1


In [18]:
!ngrok config add-authtoken 2yz3hDIGGRz3GFEoYzfvXIMr3Gn_7EyABDX2tJETUvY8iZznu


Authtoken saved to configuration file: /root/.config/ngrok/ngrok.yml


In [19]:
from fastapi import FastAPI
from typing import Optional
import nest_asyncio
from pyngrok import ngrok
import uvicorn

nest_asyncio.apply()


app = FastAPI()

@app.get("/")
def root():
    return {"message": "🎉 Global Terrorism Cleaned API is running!"}

@app.get("/sample")
def get_sample(n: Optional[int] = 5):
    return df_sample.sample(n).to_dict(orient="records")


public_url = ngrok.connect(8000)
print("🎯 Your FastAPI endpoint is available at:", public_url)


uvicorn.run(app, port=8000)




🎯 Your FastAPI endpoint is available at: NgrokTunnel: "https://ad73-34-125-105-140.ngrok-free.app" -> "http://localhost:8000"


INFO:     Started server process [1216]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
INFO:     Shutting down
INFO:     Waiting for application shutdown.
INFO:     Application shutdown complete.
INFO:     Finished server process [1216]


In [21]:
from fastapi import FastAPI
from typing import Optional
import nest_asyncio
from pyngrok import ngrok
import uvicorn
import threading

# Apply asyncio patch
nest_asyncio.apply()

# Define FastAPI app
app = FastAPI()

@app.get("/")
def root():
    return {"message": "🎉 FastAPI + PySpark ETL API is running!"}

@app.get("/sample")
def sample(n: Optional[int] = 5):
    return df_sample.sample(n).to_dict(orient="records")

# Function to run app (must bind to 0.0.0.0!)
def start_server():
    uvicorn.run(app, host="0.0.0.0", port=8000)

# Start FastAPI server in background
server_thread = threading.Thread(target=start_server)
server_thread.start()

# Create ngrok tunnel
public_url = ngrok.connect(8000)
print("🚀 API is live at:", public_url)


INFO:     Started server process [1216]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
ERROR:    [Errno 98] error while attempting to bind on address ('0.0.0.0', 8000): address already in use
INFO:     Waiting for application shutdown.
INFO:     Application shutdown complete.


🚀 API is live at: NgrokTunnel: "https://f289-34-125-105-140.ngrok-free.app" -> "http://localhost:8000"


## Use Airflow for orchestration (can be mock/dry-run)

In [22]:
!pip install apache-airflow==2.6.3 --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.6.3/constraints-3.8.txt"


Collecting apache-airflow==2.6.3
  Downloading apache_airflow-2.6.3-py3-none-any.whl.metadata (119 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/119.5 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m119.5/119.5 kB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting alembic<2.0,>=1.6.3 (from apache-airflow==2.6.3)
  Downloading alembic-1.11.1-py3-none-any.whl.metadata (7.2 kB)
Collecting argcomplete>=1.10 (from apache-airflow==2.6.3)
  Downloading argcomplete-3.1.1-py3-none-any.whl.metadata (16 kB)
Collecting asgiref (from apache-airflow==2.6.3)
  Downloading asgiref-3.7.2-py3-none-any.whl.metadata (9.2 kB)
Collecting attrs>=22.1.0 (from apache-airflow==2.6.3)
  Downloading attrs-23.1.0-py3-none-any.whl.metadata (11 kB)
Collecting blinker (from apache-airflow==2.6.3)
  Downloading blinker-1.6.2-py3-none-any.whl.metadata (2.0 kB)
Collecting cattrs>=22.1.0 (from apache-airflow==2.6.3)
  Downloading c

ERROR:asyncio:Task exception was never retrieved
future: <Task finished name='Task-1' coro=<Server.serve() done, defined at /usr/local/lib/python3.11/dist-packages/uvicorn/server.py:68> exception=KeyboardInterrupt()>
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/uvicorn/main.py", line 580, in run
    server.run()
  File "/usr/local/lib/python3.11/dist-packages/uvicorn/server.py", line 66, in run
    return asyncio.run(self.serve(sockets=sockets))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/nest_asyncio.py", line 30, in run
    return loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/nest_asyncio.py", line 92, in run_until_complete
    self._run_once()
  File "/usr/local/lib/python3.11/dist-packages/nest_asyncio.py", line 133, in _run_once
    handle._run()
  File "/usr/lib/python3.11/asyncio/events.py", line 84, in _run
    se

In [1]:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime


def ingest():
    print("✅ Ingested CSV using PySpark")

def clean():
    print("✅ Nulls handled & deduplication done")

def feature_engineer():
    print("✅ Feature engineering completed")

def save_to_s3():
    print("✅ Cleaned data saved to mock S3 bucket")

def start_api():
    print("✅ FastAPI launched (simulated)")

# DAG definition
dag = DAG(
    'mini_etl_pipeline',
    description='Simulated ETL using Airflow',
    schedule_interval=None,
    start_date=datetime(2025, 1, 1),
    catchup=False,
)

# Task definitions
t1 = PythonOperator(task_id='ingest', python_callable=ingest, dag=dag)
t2 = PythonOperator(task_id='clean', python_callable=clean, dag=dag)
t3 = PythonOperator(task_id='feature_engineering', python_callable=feature_engineer, dag=dag)
t4 = PythonOperator(task_id='save_to_s3', python_callable=save_to_s3, dag=dag)
t5 = PythonOperator(task_id='start_api', python_callable=start_api, dag=dag)

# Task flow
t1 >> t2 >> t3 >> t4 >> t5


<Task(PythonOperator): start_api>