Backend part of the NDS portal
Uses Spark to store the dataset and to run the queries that search and filter it
Uses FastAPI to build the REST APIs from which the front end receives data

In [1]:
# install the pyspark package
# Because this code runs in Google Colab, the dataset file must first be uploaded to the Files section of the Colab runtime
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=0098cf8af107ee0c7807f26ccfe55f689088456889c0663cd9bdef6776594b0f
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1


Importing all necessary packages/libraries

In [2]:
from pyspark.sql import SparkSession
from datetime import datetime, date
#import pandas as pd
from pyspark.sql import Row
import pyspark.pandas as ps



In [3]:
# initializing spark
spark = SparkSession.builder.getOrCreate()
def main():
    # first read the dataset and clean it with the pandas API on Spark (pyspark.pandas)
    df = ps.read_csv('us_disaster_declarations.csv')
    print(df.shape)
    df = df[['fema_declaration_string','state','declaration_date','fy_declared','incident_type','declaration_title',
    'incident_begin_date','incident_end_date','declaration_request_number','ih_program_declared','ia_program_declared','pa_program_declared',
    'hm_program_declared']]
    #itemfactors = spark.createDataFrame(model.itemFactors.rdd)
    df = df.dropna()
    print(df.shape)
    spark_df = df.to_spark()
    #spark_df.show()
    #spark_df.select("incident_type").distinct().show()
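    # in the run recorded below, the shape went from (63755, 23) to (63755, 13):
    # 23 columns reduced to 13, with no rows dropped by dropna()
    # after cleaning, register the Spark DataFrame as a temporary view so it can be queried with SQL
    spark_df.createOrReplaceTempView('test')
    # check that Spark is functioning properly by running a query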
    spark.sql('SELECT * from test where state = "GA"').show()


In [4]:
if __name__ == '__main__':
    main()



(63755, 23)
(63755, 13)




+-----------------------+-----+-------------------+-----------+-------------+--------------------+-------------------+--------------------+--------------------------+-------------------+-------------------+-------------------+-------------------+
|fema_declaration_string|state|   declaration_date|fy_declared|incident_type|   declaration_title|incident_begin_date|   incident_end_date|declaration_request_number|ih_program_declared|ia_program_declared|pa_program_declared|hm_program_declared|
+-----------------------+-----+-------------------+-----------+-------------+--------------------+-------------------+--------------------+--------------------------+-------------------+-------------------+-------------------+-------------------+
|                DR-1-GA|   GA|1953-05-02 00:00:00|       1953|      Tornado|             Tornado|1953-05-02 00:00:00|1953-05-02T00:00:00Z|                     53013|                  0|                  1|                  1|                  1|
|           

Install FastAPI

In [5]:
!pip install fastapi

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting fastapi
  Downloading fastapi-0.88.0-py3-none-any.whl (55 kB)
Collecting starlette==0.22.0
  Downloading starlette-0.22.0-py3-none-any.whl (64 kB)
Installing collected packages: starlette, fastapi
Successfully installed fastapi-0.88.0 starlette-0.22.0


Install ColabCode to run FastAPI in Google Colab
Note: after running this library several times, I noticed that the Colab runtime keeps disconnecting after some time.

In [1]:
!pip install colabcode

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting colabcode
  Downloading colabcode-0.3.0-py3-none-any.whl (5.0 kB)
Collecting nest-asyncio==1.4.3
  Downloading nest_asyncio-1.4.3-py3-none-any.whl (5.3 kB)
Collecting jupyterlab==3.0.7
  Downloading jupyterlab-3.0.7-py3-none-any.whl (8.3 MB)
Collecting uvicorn==0.13.1
  Downloading uvicorn-0.13.1-py3-none-any.whl (45 kB)
Collecting pyngrok>=5.0.0
  Downloading pyngrok-5.2.1.tar.gz (761 kB)
Collecting nbclassic~=0.2
  Downloading nbclassic-0.4.8-py3-none-any.whl (9.8 MB)
Collecting tornado>=6.1.0
  Downloading tornado-6.2-cp37-abi3-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl (423 kB)

Importing all packages

In [6]:
from fastapi import FastAPI
from colabcode import ColabCode
from fastapi.middleware.cors import CORSMiddleware

In [7]:
# execute a Spark SQL query and return the result as a JSON array string
def exec_spark_sql_query(query):
    # toJSON() yields one JSON object string per row
    rows = spark.sql(query).toJSON().collect()
    # join the rows into a JSON array; an empty result gives "[]" instead of an empty string
    return '[' + ','.join(rows) + ']'

# quick test that everything works so far
#ans = exec_spark_sql_query('SELECT * from test where state = "GA"')
#print(ans)

# configure the port on which the REST API server runs
cc = ColabCode(port=12000, code=False)
app = FastAPI()
# add CORS middleware to FastAPI so that browsers accept responses from our API
# without it, the browser blocks the website's requests to our API
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# A single REST endpoint handles every search; the query decides which filters are applied.
# With more time, I would have separated this into multiple APIs, which would be easier to
# manage and understand in code.
@app.get("/filter/{query}")
async def read_root(query: str):
    if query == "all":
        # This returns the whole dataset. We tried calling it from the front-end website, but the
        # server took too long to respond, so the front end no longer uses it.
        print("performing all")
        return {"data": exec_spark_sql_query('SELECT * from test')}
    # Query format: "statename disasterType year", with the literal "null" for a field
    # that is not provided, for example: "CA null null"
    # Splitting on spaces gives the state name, the disaster type and the year, in that order.
    # Pad with "null" so that a query with fewer than three parts does not raise an IndexError.
    allQuery = (query.split(" ") + ["null"] * 3)[:3]
    columns = ["state", "incident_type", "fy_declared"]
    # Build one condition per provided field. If several fields are provided, the dataset is
    # filtered on all of them (AND). The values are still inserted into the SQL text unchecked.
    conditions = [col + " = '" + val + "'" for col, val in zip(columns, allQuery) if val != "null"]
    if not conditions:
        # no field provided: return an empty result instead of falling through with None
        return {"data": "[]"}
    queryString = "SELECT * from test where " + " AND ".join(conditions)
    return {"data": exec_spark_sql_query(queryString)}
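The endpoint above places the values from the URL directly into the SQL text. A minimal sketch of how those values could be checked first is given below. It is not part of the original notebook: the helper name `build_where_clause` and the validation patterns (two uppercase letters for the state, letters only for the incident type, four digits for the year) are assumptions made for illustration and may be too strict for the real dataset.

```python
import re

# Columns in the order used by the "statename disasterType year" query format.
COLUMNS = ("state", "incident_type", "fy_declared")

# Assumed validation rules, for illustration only; adjust to the real data.
PATTERNS = {
    "state": re.compile(r"[A-Z]{2}"),
    "incident_type": re.compile(r"[A-Za-z]+"),
    "fy_declared": re.compile(r"\d{4}"),
}


def build_where_clause(query):
    """Turn 'CA null null' into "state = 'CA'" (hypothetical helper).

    Fields given as the literal 'null' are skipped. A value that does not
    match its pattern raises ValueError instead of reaching the SQL text.
    """
    # Pad with 'null' so that short queries do not raise IndexError.
    parts = (query.split(" ") + ["null"] * 3)[:3]
    conditions = []
    for column, value in zip(COLUMNS, parts):
        if value == "null":
            continue
        if not PATTERNS[column].fullmatch(value):
            raise ValueError(f"invalid value for {column}: {value!r}")
        conditions.append(f"{column} = '{value}'")
    return " AND ".join(conditions)


if __name__ == "__main__":
    print(build_where_clause("GA Tornado 1953"))
```

An empty return value means that no field was provided, so the caller can decide whether to return nothing or the whole dataset.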


In [None]:
# start the server so that the front end can call the REST API
# once it is running, you will see a console message like: Public URL: NgrokTunnel: "https://026b-34-82-59-139.ngrok.io" -> "http://localhost:12000"
# use the public URL from that message (e.g. "https://026b-34-82-59-139.ngrok.io") to call the REST API
cc.run_app(app=app)



INFO:     Started server process [160]
INFO:uvicorn.error:Started server process [160]
INFO:     Waiting for application startup.
INFO:uvicorn.error:Waiting for application startup.
INFO:     Application startup complete.
INFO:uvicorn.error:Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:12000 (Press CTRL+C to quit)
INFO:uvicorn.error:Uvicorn running on http://127.0.0.1:12000 (Press CTRL+C to quit)


Public URL: NgrokTunnel: "https://7f39-34-75-188-237.ngrok.io" -> "http://localhost:12000"
INFO:     12.156.141.229:0 - "GET /filter/null%20Flood%20null HTTP/1.1" 200 OK
INFO:     12.156.141.229:0 - "GET /filter/CA%20null%20null HTTP/1.1" 200 OK
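The two requests in the log above show the query format after URL encoding (`CA%20null%20null`). The sketch below shows how a client could build such a URL and decode the response. It is not taken from the front-end code: the function names and the placeholder base URL are assumptions. Because the endpoint returns the JSON array as a string inside the `data` field, the response has to be decoded twice.

```python
import json
from urllib.parse import quote


def build_filter_url(base_url, state=None, incident_type=None, year=None):
    """Build a /filter/ URL in the 'statename disasterType year' format.

    Fields that are not provided are sent as the literal 'null'.
    """
    fields = [state, incident_type, year]
    query = " ".join("null" if f is None else str(f) for f in fields)
    # quote() encodes each space as %20, as seen in the server log.
    return f"{base_url}/filter/{quote(query)}"


def decode_rows(response_body):
    """Decode the response text into a list of row dictionaries."""
    payload = json.loads(response_body)   # outer object: {"data": "<json text>"}
    return json.loads(payload["data"])    # inner text: the JSON array of rows


if __name__ == "__main__":
    # Placeholder host; replace it with the public ngrok URL printed by the server.
    print(build_filter_url("https://example.invalid", state="CA"))
```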
